问题描述
我是第一次尝试rabbitMQ,在学习了所有教程之后,对于简单的事情来说这似乎是一件容易的事,但并不是那么容易。
我用 Go 开发了一个程序,与 RPC 示例非常相似,但双方都手动确认。
在发布者上,我只发布 int 值,接收者将值除以 2,成功后,确认消息并回复 ReplyTo 路由键。然后,在原始发布者上,如果 CorrelationId 匹配,则处理响应,否则 拒绝 并重新排队,以便消息不会被丢弃(然后由正确的发布者使用) .
问题在于,原始发布者出于某种原因停止从频道读取消息。你能帮我理解这背后的原因吗?
P.S.:客户端脚本可以像这样运行:go run rpc_client.go 13
客户端(发布者并等待 RPC 返回)
package main
import (
"github.com/streadway/amqp"
"log"
"os"
"strconv"
)
var exchangeName string = "exchangeName"
var serverRoutingKey string = "routeServer"
var responseRoutingKey string = "routeResponse"
var queueResponseName string = "queueResponseName"
// Initializes rabbitMQ connection,channel and exchange
func Start() (*amqp.Connection,*amqp.Channel,error) {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
return nil,nil,err
}
ch,err := conn.Channel()
if err != nil {
return nil,err
}
err = ch.ExchangeDeclare(
exchangeName,// name
"direct",// type
true,// durable
false,// auto-deleted
false,// internal
false,// no-wait
nil,// arguments
)
if err != nil {
return nil,err
}
return conn,ch,nil
}
// Publish a message to execute a RPC and return response from the procedure
func Send(ch *amqp.Channel,counter int,multiplier int) (res string,err error) {
// Use the same (named) queue to receive the result of RPC for multiple instances and differ them by the correlation id
q,err := ch.QueueDeclare(
queueResponseName,// name
true,// delete when unused
false,// exclusive
false,// Nowait
nil,// arguments
)
if err != nil {
return "",err
}
err = ch.Qos(
1,// prefetch count
0,// prefetch size
false,// global
)
if err != nil {
return "",err
}
err = ch.QueueBind(
q.Name,// queue name
responseRoutingKey,// routing key
exchangeName,// exchange
false,//arguments
)
if err != nil {
return "",err
}
msgs,err := ch.Consume(
q.Name,// queue
"",// consumer
false,// auto-ack
false,// no-local
false,// args
)
if err != nil {
return "",err
}
counterID := strconv.Itoa(counter * multiplier)
log.Printf("->Calling RPC with value: %s",counterID)
err = ch.Publish(
exchangeName,// exchange
serverRoutingKey,// routing key
false,// mandatory
false,// immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,// Do not lose messages if no queue created yet
ContentType: "text/plain",CorrelationId: counterID,ReplyTo: responseRoutingKey,Body: []byte(counterID),})
if err != nil {
return "",err
}
for d := range msgs {
log.Println(" <-d.CorrelationId:",d.CorrelationId)
if d.CorrelationId == counterID {
log.Println(" ->ACK")
d.Ack(true)
res = string(d.Body)
break
} else {
// Requeue message to be consumed by the correct consumer. Otherwise it would be discarded because it was already read
log.Println(" ->REJECT")
d.Reject(false) // requeue
}
}
return res,nil
}
// Read argument from console to use as multiplier
func readMultiplier(args []string) (int,error) {
if (len(args) < 2) || os.Args[1] == "" {
return 1,nil
} else {
return strconv.Atoi(os.Args[1])
}
}
func main() {
// Just read an argument from console to use as multiplier
m,err := readMultiplier(os.Args)
if err != nil {
return
}
// Configure connection,channel and exchange on rabbitMQ
conn,err := Start()
if err != nil {
return
}
defer conn.Close()
defer ch.Close()
// A counter to differ sequential pusblished messages (I kNow this won't guarantee its uniqueness but this is for demo purposes only)
var cnt int = 1
// Send multiple and sequential messages
for {
log.Printf("Counter: %d",cnt)
res,err := Send(ch,cnt,m)
if err != nil {
log.Println("Error")
return
}
log.Printf("<-RPC returned: %s",res)
cnt++
}
}
服务器(消费者和返回响应的那个)
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"strconv"
)
var exchangeName string = "exchangeName"
var serverRoutingKey string = "routeServer"
// Initializes rabbitMQ connection,nil
}
// Consumes messages from publishers,execute a division by 2 with body value and return the result
func Receive(ch *amqp.Channel) error {
q,err := ch.QueueDeclare(
"rpc_queue",// arguments
)
if err != nil {
return err
}
err = ch.QueueBind(
q.Name,// queue name
serverRoutingKey,//arguments
)
if err != nil {
return err
}
err = ch.Qos(
1,// global
)
if err != nil {
return err
}
msgs,// args
)
if err != nil {
return err
}
// Go routine to consume messages and replay back with the value divided by 2 (just a dummy operation for demo purposes)
go func() {
for d := range msgs {
c,err := strconv.ParseFloat(string(d.Body),64)
if err != nil {
// discard messages that can't be converted
d.Nack(
false,// multiple
false) // requeue
}
// Publish response to ReplyTo routing key
err = ch.Publish(
exchangeName,// exchange
d.ReplyTo,// routing key
false,// mandatory
false,// immediate
amqp.Publishing{
ContentType: "text/plain",CorrelationId: d.CorrelationId,Body: []byte(fmt.Sprintf("%f",c/2)),})
if err != nil {
//
d.Nack(
false,// multiple
true) // requeue
continue
}
d.Ack(false)
}
}()
return nil
}
func main() {
// Configure connection,err := Start()
if err != nil {
return
}
defer conn.Close()
defer ch.Close()
forever := make(chan bool)
// Continuously consume values from publishers
err = Receive(ch)
if err != nil {
return
}
log.Printf(" [*] Awaiting RPC requests")
<-forever
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)