Problem description
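I have an ETL pipeline that transforms files in S3. S3 event notifications send messages to SQS, and a Fargate task polls the queue for messages. My problem is that messages go unreceived for a variable amount of time, up to 99 seconds.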
I poll for messages every 2 seconds. I can confirm in the console that the messages are in the queue, but they are not read for a long time. I also log how long each message sat in the queue, computed as the difference between ApproximateFirstReceiveTimestamp and SentTimestamp:

```go
package main

import (
	"log"
	"os"
	"strconv"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	queueURL := os.Getenv("QUEUE_URL")
	csvBucket := os.Getenv("CSV_BUCKET_NAME")
	_ = csvBucket // used by the elided S3 download/upload code

	sess, err := session.NewSession()
	if err != nil {
		log.Fatalf("create session failed: %v", err)
	}
	svc := sqs.New(sess)

	for {
		time.Sleep(2 * time.Second)
		msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
			AttributeNames: []*string{
				aws.String(sqs.MessageSystemAttributeNameSenderId),
				aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
				aws.String(sqs.MessageSystemAttributeNameApproximateReceiveCount),
				aws.String(sqs.MessageSystemAttributeNameApproximateFirstReceiveTimestamp),
				aws.String(sqs.MessageSystemAttributeNameSequenceNumber),
				aws.String(sqs.MessageSystemAttributeNameMessageDeduplicationId),
				aws.String(sqs.MessageSystemAttributeNameMessageGroupId),
				aws.String(sqs.MessageSystemAttributeNameAwstraceHeader),
			},
			MessageAttributeNames: []*string{
				aws.String(sqs.QueueAttributeNameAll),
			},
			QueueUrl:            aws.String(queueURL),
			MaxNumberOfMessages: aws.Int64(10),
			VisibilityTimeout:   aws.Int64(30), // 30 seconds
			WaitTimeSeconds:     aws.Int64(0),  // 0 means short polling
		})
		if err != nil {
			log.Printf("receive message failed: %v", err)
			return
		}
		if len(msgResult.Messages) == 0 {
			continue
		}
		// Todo: process all results; only the first message is handled here.
		log.Printf("amount of messages %v", len(msgResult.Messages))
		msg := msgResult.Messages[0]
		// Both attributes are epoch timestamps in milliseconds.
		i, err := strconv.ParseInt(*msg.Attributes["ApproximateFirstReceiveTimestamp"], 10, 64)
		if err != nil {
			log.Printf("failed to parse int: %v", err)
			return
		}
		tm1 := time.Unix(0, i*int64(time.Millisecond))
		ii, err := strconv.ParseInt(*msg.Attributes["SentTimestamp"], 10, 64)
		if err != nil {
			panic(err)
		}
		tm2 := time.Unix(0, ii*int64(time.Millisecond))
		log.Printf("seconds the message was unprocessed in the queue: %v", tm1.Sub(tm2).Seconds())
		/* some code to download from s3 and upload to another bucket */
		_, err = svc.DeleteMessage(&sqs.DeleteMessageInput{
			QueueUrl:      aws.String(queueURL),
			ReceiptHandle: msg.ReceiptHandle,
		})
		if err != nil {
			log.Println("delete failed")
			continue
		}
		log.Println("message delete succeeded")
	}
}
```
timestamp,message
1599145270790,2020/09/03 15:01:10 seconds the message was unprocessed in the queue: 0.909
1599145426542,2020/09/03 15:03:46 seconds the message was unprocessed in the queue: 19.835
1599145472884,2020/09/03 15:04:32 seconds the message was unprocessed in the queue: 4.721
1599145611897,2020/09/03 15:06:51 seconds the message was unprocessed in the queue: 24.793
1599145720293,2020/09/03 15:08:40 seconds the message was unprocessed in the queue: 40.296
1599145930662,2020/09/03 15:12:10 seconds the message was unprocessed in the queue: 66.736
1599145997155,2020/09/03 15:13:17 seconds the message was unprocessed in the queue: 10.961
1599146249316,2020/09/03 15:17:29 seconds the message was unprocessed in the queue: 99.084
1599146319998,2020/09/03 15:18:39 seconds the message was unprocessed in the queue: 35.015
1599146361620,2020/09/03 15:19:21 seconds the message was unprocessed in the queue: 17.353
1599146438448,2020/09/03 15:20:38 seconds the message was unprocessed in the queue: 45.878
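The delay figures above are the difference between two SQS system attributes, both epoch timestamps in milliseconds. Below is a minimal, stdlib-only sketch of that calculation. It assumes the attributes have already been dereferenced into a plain `map[string]string` (the SDK itself returns `map[string]*string`), and the helper names `msToTime` and `queueDelay` are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// msToTime converts an SQS epoch-millisecond attribute string to a time.Time.
func msToTime(s string) (time.Time, error) {
	ms, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return time.Time{}, err
	}
	return time.Unix(0, ms*int64(time.Millisecond)), nil
}

// queueDelay (hypothetical helper) returns how long a message waited between
// being sent and first being received, using the SentTimestamp and
// ApproximateFirstReceiveTimestamp system attributes.
func queueDelay(attrs map[string]string) (time.Duration, error) {
	sent, err := msToTime(attrs["SentTimestamp"])
	if err != nil {
		return 0, fmt.Errorf("SentTimestamp: %w", err)
	}
	first, err := msToTime(attrs["ApproximateFirstReceiveTimestamp"])
	if err != nil {
		return 0, fmt.Errorf("ApproximateFirstReceiveTimestamp: %w", err)
	}
	return first.Sub(sent), nil
}

func main() {
	// Made-up attribute values, chosen to give a 0.909 s delay.
	attrs := map[string]string{
		"SentTimestamp":                    "1599145269000",
		"ApproximateFirstReceiveTimestamp": "1599145269909",
	}
	d, err := queueDelay(attrs)
	if err != nil {
		panic(err)
	}
	fmt.Printf("seconds the message was unprocessed in the queue: %.3f\n", d.Seconds())
}
```

Returning a `time.Duration` instead of a float keeps the unit explicit; call `.Seconds()` only when formatting the log line.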
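I'm lost as to what I'm doing wrong. After processing, I upload the result to another bucket whose notifications go to a queue that is consumed by a Lambda function, and there the message is always picked up in about one second.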
CDK code that defines the pipeline:
```typescript
super(scope, id);
const dlq = new sqs.Queue(this, 'DeadLetterQueue', {
  retentionPeriod: cdk.Duration.days(14),
});
const q = new sqs.Queue(this, 'ProcessingQueue', {
  deadLetterQueue: { queue: dlq, maxReceiveCount: 3 },
});
const queueFargate = new ecsPatterns.QueueProcessingFargateService(this, 'Service', {
  queue: q,
  vpc: props.vpc,
  memoryLimitMiB: 512,
  cpu: 256,
  image: ecs.ContainerImage.fromAsset('services/convert'),
  platformVersion: ecs.FargatePlatformVersion.VERSION1_4,
  desiredTaskCount: 1,
  environment: {
    QUEUE_URL: q.queueUrl,
    CSV_BUCKET_NAME: props.convertBucket.bucketName,
  },
});
props.uploadBucket.addEventNotification(s3.EventType.OBJECT_CREATED, new s3n.SqsDestination(queueFargate.sqsQueue));
props.uploadBucket.grantRead(queueFargate.service.taskDefinition.taskRole);
props.convertBucket.grantWrite(queueFargate.service.taskDefinition.taskRole);
```
Version of AWS SDK for Go?
github.com/aws/aws-sdk-go v1.34.6
Go 1.14
Expected behavior
Messages should be read as soon as they arrive in the queue.
Solution
This is likely a result of using short polling instead of long polling. According to the AWS documentation on {{3}}:
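"With short polling, the ReceiveMessage request queries only a subset of the servers (based on a weighted random distribution) to find messages that are available to include in the response. Amazon SQS sends the response right away, even if the query found no messages."
Long polling, on the other hand, will:
"Reduce false empty responses by querying all Amazon SQS servers rather than a subset of them."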
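To make the quoted difference concrete, here is a toy model. It is not how SQS is implemented internally: one message is stored on one of several hypothetical servers, a short poll inspects a random subset of them (uniformly random here, a simplification of the weighted distribution AWS describes), and a long poll inspects every server. The names `cluster`, `shortPoll`, `longPoll`, and `emptyRate` are illustrative only.

```go
package main

import (
	"fmt"
	"math/rand"
)

// cluster is a toy model: cluster[i] holds the messages stored on server i.
type cluster [][]string

// shortPoll returns the messages found on k randomly chosen servers (k <= len(c)).
func shortPoll(c cluster, k int, rng *rand.Rand) []string {
	var found []string
	for _, i := range rng.Perm(len(c))[:k] {
		found = append(found, c[i]...)
	}
	return found
}

// longPoll returns the messages found on every server.
func longPoll(c cluster) []string {
	var found []string
	for _, msgs := range c {
		found = append(found, msgs...)
	}
	return found
}

// emptyRate estimates how often a short poll over k of n servers comes back
// empty even though exactly one message is waiting somewhere in the cluster.
func emptyRate(n, k, trials int, seed int64) float64 {
	rng := rand.New(rand.NewSource(seed))
	empty := 0
	for t := 0; t < trials; t++ {
		c := make(cluster, n)
		c[rng.Intn(n)] = []string{"msg"}
		if len(shortPoll(c, k, rng)) == 0 {
			empty++
		}
	}
	return float64(empty) / float64(trials)
}

func main() {
	fmt.Printf("2 of 8 servers queried: %.2f of polls empty\n", emptyRate(8, 2, 10000, 1))
	fmt.Printf("8 of 8 servers queried: %.2f of polls empty\n", emptyRate(8, 8, 10000, 1))
}
```

In this model, sampling 2 of 8 servers misses the waiting message about three times out of four, while querying all servers never does; each miss in the question's loop then costs another 2-second sleep.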
You can implement long polling in your code by passing a nonzero value for WaitTimeSeconds (your code currently passes 0) and removing the two-second sleep from each iteration.
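A stdlib-only sketch of that change follows. It assumes a hypothetical `Receiver` interface that wraps one `ReceiveMessage` call; with aws-sdk-go v1, an implementation would set `WaitTimeSeconds: aws.Int64(waitSeconds)` in `sqs.ReceiveMessageInput`. The loop has no `time.Sleep`: each call waits up to 20 seconds, the maximum SQS allows, and returns as soon as a message is available. Unlike the question's loop, it also handles every message in a batch rather than only `Messages[0]`. `Message`, `Receiver`, `pollLoop`, `errStop`, and `fakeReceiver` are illustrative names, not SDK types.

```go
package main

import (
	"errors"
	"fmt"
)

// maxWaitSeconds is the largest WaitTimeSeconds value SQS accepts.
const maxWaitSeconds = 20

// Message is a hypothetical stand-in for *sqs.Message.
type Message struct {
	Body string
}

// Receiver is a hypothetical abstraction over one ReceiveMessage call.
// A real implementation would pass waitSeconds as WaitTimeSeconds.
type Receiver interface {
	Receive(waitSeconds int64) ([]Message, error)
}

// errStop can be returned by handle to end the loop cleanly.
var errStop = errors.New("stop polling")

// pollLoop long-polls r with no sleep between calls and hands every message
// in each batch to handle. It returns nil on errStop, or the first other error.
func pollLoop(r Receiver, handle func(Message) error) error {
	for {
		msgs, err := r.Receive(maxWaitSeconds)
		if err != nil {
			return err
		}
		for _, m := range msgs {
			if err := handle(m); err != nil {
				if errors.Is(err, errStop) {
					return nil
				}
				return err
			}
		}
	}
}

// fakeReceiver replays canned batches and records the wait times it was given.
type fakeReceiver struct {
	batches [][]Message
	waits   []int64
}

func (f *fakeReceiver) Receive(waitSeconds int64) ([]Message, error) {
	f.waits = append(f.waits, waitSeconds)
	if len(f.batches) == 0 {
		return nil, errors.New("no more batches")
	}
	b := f.batches[0]
	f.batches = f.batches[1:]
	return b, nil
}

func main() {
	// An empty batch (a long poll that timed out) followed by two messages.
	f := &fakeReceiver{batches: [][]Message{{}, {{Body: "a"}, {Body: "b"}}}}
	err := pollLoop(f, func(m Message) error {
		fmt.Println("processing", m.Body)
		if m.Body == "b" {
			return errStop
		}
		return nil
	})
	fmt.Println(err, f.waits)
}
```

The `fakeReceiver` stands in for SQS so the loop can run without AWS credentials; in the real service, `Receive` would call `svc.ReceiveMessage`, and each message would still be deleted after it is processed, as the question's code does.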