来自双方确认的 RPC

问题描述

我是第一次尝试rabbitMQ,在学习了所有教程之后,对于简单的事情来说这似乎是一件容易的事,但并不是那么容易。

我用 Go 开发了一个程序,与 RPC 示例非常相似,但双方都手动确认。

在发布者上,我只发布 int 值,接收者将值除以 2,成功后,确认消息并回复 ReplyTo 路由键。然后,在原始发布者上,如果 CorrelationId 匹配,则处理响应,否则 拒绝 并重新排队,以便消息不会被丢弃(然后由正确的发布者使用) .

问题在于,原始发布者出于某种原因停止从频道读取消息。你能帮我理解这背后的原因吗?

P.S.:客户端脚本可以像这样运行:go run rpc_client.go 13

客户端(发布者并等待 RPC 返回)

package main

import (
    "github.com/streadway/amqp"
    "log"
    "os"
    "strconv"
)

var exchangeName string = "exchangeName"

var serverRoutingKey string = "routeServer"

var responseRoutingKey string = "routeResponse"
var queueResponseName string = "queueResponseName"

// Initializes rabbitMQ connection,channel and exchange
func Start() (*amqp.Connection,*amqp.Channel,error) {
    conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return nil,nil,err
    }

    ch,err := conn.Channel()
    if err != nil {
        return nil,err
    }

    err = ch.ExchangeDeclare(
        exchangeName,// name
        "direct",// type
        true,// durable
        false,// auto-deleted
        false,// internal
        false,// no-wait
        nil,// arguments
    )
    if err != nil {
        return nil,err
    }

    return conn,ch,nil
}

// Publish a message to execute a RPC and return response from the procedure
func Send(ch *amqp.Channel,counter int,multiplier int) (res string,err error) {
    // Use the same (named) queue to receive the result of RPC for multiple instances and differ them by the correlation id
    q,err := ch.QueueDeclare(
        queueResponseName,// name
        true,// delete when unused
        false,// exclusive
        false,// Nowait
        nil,// arguments
    )
    if err != nil {
        return "",err
    }

    err = ch.Qos(
        1,// prefetch count
        0,// prefetch size
        false,// global
    )
    if err != nil {
        return "",err
    }

    err = ch.QueueBind(
        q.Name,// queue name
        responseRoutingKey,// routing key
        exchangeName,// exchange
        false,//arguments
    )
    if err != nil {
        return "",err
    }

    msgs,err := ch.Consume(
        q.Name,// queue
        "",// consumer
        false,// auto-ack
        false,// no-local
        false,// args
    )
    if err != nil {
        return "",err
    }

    counterID := strconv.Itoa(counter * multiplier)
    log.Printf("->Calling RPC with value: %s",counterID)
    err = ch.Publish(
        exchangeName,// exchange
        serverRoutingKey,// routing key
        false,// mandatory
        false,// immediate
        amqp.Publishing{
            DeliveryMode:  amqp.Persistent,// Do not lose messages if no queue created yet
            ContentType:   "text/plain",CorrelationId: counterID,ReplyTo:       responseRoutingKey,Body:          []byte(counterID),})
    if err != nil {
        return "",err
    }

    for d := range msgs {
        log.Println("   <-d.CorrelationId:",d.CorrelationId)
        if d.CorrelationId == counterID {
            log.Println("   ->ACK")
            d.Ack(true)
            res = string(d.Body)
            break
        } else {
            // Requeue message to be consumed by the correct consumer. Otherwise it would be discarded because it was already read
            log.Println("   ->REJECT")
            d.Reject(false) // requeue
        }
    }

    return res,nil
}

// Read argument from console to use as multiplier
func readMultiplier(args []string) (int,error) {
    if (len(args) < 2) || os.Args[1] == "" {
        return 1,nil
    } else {
        return strconv.Atoi(os.Args[1])
    }
}

func main() {
    // Just read an argument from console to use as multiplier
    m,err := readMultiplier(os.Args)
    if err != nil {
        return
    }

    // Configure connection,channel and exchange on rabbitMQ
    conn,err := Start()
    if err != nil {
        return
    }
    defer conn.Close()
    defer ch.Close()

    // A counter to differ sequential pusblished messages (I kNow this won't guarantee its uniqueness but this is for demo purposes only)
    var cnt int = 1
    // Send multiple and sequential messages
    for {
        log.Printf("Counter: %d",cnt)
        res,err := Send(ch,cnt,m)
        if err != nil {
            log.Println("Error")
            return
        }

        log.Printf("<-RPC returned: %s",res)
        cnt++
    }

}

服务器(消费者和返回响应的那个)

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "strconv"
)

var exchangeName string = "exchangeName"

var serverRoutingKey string = "routeServer"

// Initializes rabbitMQ connection,nil
}

// Consumes messages from publishers,execute a division by 2 with body value and return the result
func Receive(ch *amqp.Channel) error {
    q,err := ch.QueueDeclare(
        "rpc_queue",// arguments
    )
    if err != nil {
        return err
    }

    err = ch.QueueBind(
        q.Name,// queue name
        serverRoutingKey,//arguments
    )
    if err != nil {
        return err
    }

    err = ch.Qos(
        1,// global
    )
    if err != nil {
        return err
    }

    msgs,// args
    )
    if err != nil {
        return err
    }

    // Go routine to consume messages and replay back with the value divided by 2 (just a dummy operation for demo purposes)
    go func() {
        for d := range msgs {
            c,err := strconv.ParseFloat(string(d.Body),64)
            if err != nil {
                // discard messages that can't be converted
                d.Nack(
                    false,// multiple
                    false) // requeue
            }

            // Publish response to ReplyTo routing key
            err = ch.Publish(
                exchangeName,// exchange
                d.ReplyTo,// routing key
                false,// mandatory
                false,// immediate
                amqp.Publishing{
                    ContentType:   "text/plain",CorrelationId: d.CorrelationId,Body:          []byte(fmt.Sprintf("%f",c/2)),})
            if err != nil {
                //
                d.Nack(
                    false,// multiple
                    true)  // requeue

                continue
            }

            d.Ack(false)
        }
    }()

    return nil
}

func main() {
    // Configure connection,err := Start()
    if err != nil {
        return
    }
    defer conn.Close()
    defer ch.Close()

    forever := make(chan bool)

    // Continuously consume values from publishers
    err = Receive(ch)
    if err != nil {
        return
    }

    log.Printf(" [*] Awaiting RPC requests")
    <-forever
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)