SQS message sits idle in the queue for up to 99 seconds before ReceiveMessage picks it up

Problem description

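I have an ETL process that transforms files in S3. An S3 event notification sends a message to SQS, and a Fargate task polls the queue for messages. My problem is that messages are not received for a variable amount of time, up to 99 seconds.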

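I poll for messages every 2 seconds. I can confirm in the console that the messages are in the queue, but they go unread for a long time. I also log how long each message sat in the queue, computed as the difference between MessageSystemAttributeNameApproximateFirstReceiveTimestamp and MessageSystemAttributeNameSentTimestamp: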
    package main

    import (
        "log"
        "os"
        "strconv"
        "time"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/sqs"
    )

    func main() {
        queueURL := os.Getenv("QUEUE_URL")
        csvBucket := os.Getenv("CSV_BUCKET_NAME")
        _ = csvBucket // used by the elided S3 copy code below

        sess, err := session.NewSession()
        if err != nil {
            log.Fatalf("failed to create AWS session: %v", err)
        }
        svc := sqs.New(sess)

        for {
            // Short polling: sleep 2 seconds between ReceiveMessage calls.
            time.Sleep(2 * time.Second)

            msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
                AttributeNames: []*string{
                    aws.String(sqs.MessageSystemAttributeNameSenderId),
                    aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
                    aws.String(sqs.MessageSystemAttributeNameApproximateReceiveCount),
                    aws.String(sqs.MessageSystemAttributeNameApproximateFirstReceiveTimestamp),
                    aws.String(sqs.MessageSystemAttributeNameSequenceNumber),
                    aws.String(sqs.MessageSystemAttributeNameMessageDeduplicationId),
                    aws.String(sqs.MessageSystemAttributeNameMessageGroupId),
                    aws.String(sqs.MessageSystemAttributeNameAwstraceHeader),
                },
                MessageAttributeNames: []*string{
                    aws.String(sqs.QueueAttributeNameAll),
                },
                QueueUrl:            aws.String(queueURL),
                MaxNumberOfMessages: aws.Int64(10),
                VisibilityTimeout:   aws.Int64(30), // 30 seconds
                WaitTimeSeconds:     aws.Int64(0),  // 0 means short polling
            })
            if err != nil {
                log.Printf("receive message failed: %v", err)
                return
            }

            if len(msgResult.Messages) == 0 {
                continue
            }
            // Todo: process all results (only the first message is handled below)
            log.Printf("amount of messages %v", len(msgResult.Messages))

            // Both attributes are epoch milliseconds encoded as decimal strings.
            i, err := strconv.ParseInt(*msgResult.Messages[0].Attributes["ApproximateFirstReceiveTimestamp"], 10, 64)
            if err != nil {
                log.Printf("failed to parse int: %v", err)
                return
            }
            tm1 := time.Unix(0, i*int64(time.Millisecond))

            ii, err := strconv.ParseInt(*msgResult.Messages[0].Attributes["SentTimestamp"], 10, 64)
            if err != nil {
                panic(err)
            }
            tm2 := time.Unix(0, ii*int64(time.Millisecond))

            log.Printf("seconds the message was unprocessed in the queue: %v", tm1.Sub(tm2).Seconds())

            /* some code to download from s3 and upload to another bucket */

            _, err = svc.DeleteMessage(&sqs.DeleteMessageInput{
                QueueUrl:      aws.String(queueURL),
                ReceiptHandle: msgResult.Messages[0].ReceiptHandle,
            })
            if err != nil {
                log.Println("delete failed")
                continue
            }
            log.Println("message delete succeeded")
        }
    }
        
timestamp,message
1599145270790,2020/09/03 15:01:10 seconds the message was unprocessed in the queue: 0.909
1599145426542,2020/09/03 15:03:46 seconds the message was unprocessed in the queue: 19.835
1599145472884,2020/09/03 15:04:32 seconds the message was unprocessed in the queue: 4.721
1599145611897,2020/09/03 15:06:51 seconds the message was unprocessed in the queue: 24.793
1599145720293,2020/09/03 15:08:40 seconds the message was unprocessed in the queue: 40.296
1599145930662,2020/09/03 15:12:10 seconds the message was unprocessed in the queue: 66.736
1599145997155,2020/09/03 15:13:17 seconds the message was unprocessed in the queue: 10.961
1599146249316,2020/09/03 15:17:29 seconds the message was unprocessed in the queue: 99.084
1599146319998,2020/09/03 15:18:39 seconds the message was unprocessed in the queue: 35.015
1599146361620,2020/09/03 15:19:21 seconds the message was unprocessed in the queue: 17.353
1599146438448,2020/09/03 15:20:38 seconds the message was unprocessed in the queue: 45.878

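I'm quite lost as to what I'm doing wrong. After this step I upload the result to another bucket, which feeds a queue that is picked up by a Lambda function, and that step consistently completes in about a second.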

The CDK code that defines the pipeline:

    super(scope, id);

    const dlq = new sqs.Queue(this, 'DeadLetterQueue', {
        retentionPeriod: cdk.Duration.days(14),
    });

    const q = new sqs.Queue(this, 'ProcessingQueue', {
        deadLetterQueue: {
            queue: dlq,
            maxReceiveCount: 3,
        },
    });

    const queueFargate = new ecsPatterns.QueueProcessingFargateService(this, 'Service', {
        queue: q,
        vpc: props.vpc,
        memoryLimitMiB: 512,
        cpu: 256,
        image: ecs.ContainerImage.fromAsset('services/convert'),
        platformVersion: ecs.FargatePlatformVersion.VERSION1_4,
        desiredTaskCount: 1,
        environment: {
            QUEUE_URL: q.queueUrl,
            CSV_BUCKET_NAME: props.convertBucket.bucketName,
        },
    });

    props.uploadBucket.addEventNotification(s3.EventType.OBJECT_CREATED, new s3n.SqsDestination(queueFargate.sqsQueue));
    props.uploadBucket.grantRead(queueFargate.service.taskDefinition.taskRole);
    props.convertBucket.grantWrite(queueFargate.service.taskDefinition.taskRole);

Version of AWS SDK for Go?

github.com/aws/aws-sdk-go v1.34.6

Go 1.14

Expected behavior

Messages should be read as soon as they arrive in the queue.

Solution

This is likely the result of using short polling instead of long polling. According to the AWS documentation on short and long polling:

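With short polling, the ReceiveMessage request queries only a subset of the servers (based on a weighted random distribution) to find messages that are available to include in the response. Amazon SQS sends the response right away, even if the query found no messages.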

Long polling, on the other hand, will:

Reduce false empty responses by querying all Amazon SQS servers rather than a subset of them.

You can implement long polling in your code by passing a non-zero value for WaitTimeSeconds in the ReceiveMessageInput and removing the two-second sleep in each iteration.