限制rabbitmq每秒出队的任务

问题描述

我在 RabbitMQ 中有一个消费者-生产者方式的队列,它可以作为基本的循环队列正常工作。

我的问题是我试图限制每秒处理的请求数量,因为当我将项目出列时,我会向 DO 空间发出请求,如果我在一秒钟内发出 750 个或更多请求,该空间将阻止我的 IP .我使用 goroutines 同时使项目出列,但我只想每秒一次出列 500 个项目以避免达到该限制。这需要考虑当前正在出队的项目(即我不能只是从队列中提取 500 个项目然后延迟到下一秒),基本上在它运行出队代码之前,它需要等待以确保有在那一秒内出列的请求还没有超过 500 个。到目前为止,我有这段代码,但它似乎不能正常工作(注意我正在测试每秒 2 个请求,而不是现在 500 个)。每隔一段时间它就会有很长的延迟(比如 20+ 秒),我不确定它是否正确计算了限制。请注意,我很确定这里不需要预取选项,因为它限制了每秒传入的消息数量在这里我只想限制每秒并发出队的消息。

import (
   "os"
   "fmt"
   "github.com/streadway/amqp"
   "golang.org/x/time/rate"
   "context"
)


// Rate-limit => 2 req/s
const (
   workers = 2
)

func failOnErrorWorker(err error,msg string) {
   if err != nil {
       fmt.Println(msg)
       fmt.Println(err)
   }
}

func main() {
   // Get the env variables for the queue name and connection string
   queueName := os.Getenv("QUEUE_NAME")
   connectionString := os.Getenv("CONNECTION_STRING")

   // Set up rate limiter and context
   limiter := rate.NewLimiter(2,1)
   ctx := context.Background()

   // Connect to the rabbitmq instance
   conn,err := amqp.Dial(connectionString)
   failOnErrorWorker(err,"Failed to connect to RabbitMQ")
   defer conn.Close()

   // Open a channel for the queue
   ch,err := conn.Channel()
   failOnErrorWorker(err,"Failed to open a channel")
   defer ch.Close()

   // Consume the messages from this queue
   msgs,err := ch.Consume(
       queueName,// queue
       "",// consumer
       false,// auto-ack
       false,// exclusive
       false,// no-local
       false,// no-wait
       nil,// args
   )
   failOnErrorWorker(err,"Failed to register a consumer")

   forever := make(chan bool)

   go func() {
       for d := range msgs {
           // Wait until there are less than 2 workers per second
           limiter.Wait(ctx)
           go func() {
               // Dequeue the item and ackNowledge the message
               DeQueue(d.Body)
               d.Ack(false)
           } ()
       }
   }()

   fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
   // Continually run the worker
   <-forever
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...