问题描述
这是我的 sub.go 示例:
package main
import (
"fmt"
nats "github.com/nats-io/nats.go"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
nc,_ := nats.Connect("127.0.0.1:4222")
c,_ := nats.NewEncodedConn(nc,nats.JSON_ENCODER)
c.QueueSubscribe("subject_toto","queue_titi",func(_,reply string,message *Message) {
fmt.Printf("%+v\n",message)
var response Response
response.Msg = "message received"
response.Status = "Ok"
c.Publish("reply",response)
c.Flush()
})
c.Flush()
}
func main() {
fmt.Println("begin")
go start()
defer c.Close()
fmt.Scanln()
nc.Drain()
// Close connection
nc.Close()
fmt.Println("done")
}
它运行得很好,所以现在我想向这个队列发布一条消息,这是我的 pub.go:
package main
import (
"fmt"
nats "github.com/nats-io/nats.go"
"time"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
var err error
var message Message
var response Response
nc,nats.JSON_ENCODER)
message.Status = "Ok"
message.Msg = "hello"
err = c.Request("subject_toto",message,&response,6*time.Second)
if err != nil {
fmt.Printf("%+v\n",err)
}
fmt.Printf("%+v\n","response")
fmt.Printf("%+v\n",response)
defer c.Close()
}
func main() {
fmt.Println("begin")
start()
fmt.Println("done")
}
但是当我尝试向它发布时,我的回复是空的:
响应{状态:消息:}
答案的开头之一似乎使用了 PublishRequest,但似乎我只能向服务器发送字符串而不是结构。
解决方法
问题出在这一行:
c.Publish("reply",response)
当您发送请求并且您的订阅者要响应时,应该有一个称为收件箱的“回复主题”。此收件箱设置在处理程序函数的 reply
参数中。
因此,您必须将响应发布到由 reply
中处理程序函数的 QueueSubscribe()
arg 返回的主题中,因此将该行更改为:
c.Publish(reply,response)
reply arg 的值对于通信很重要,是这样的:_INBOX.bw5EtJShBTI9OQdvxFOBlz.VxsGBcjH
这是 sub 的功能版本:
package main
import (
"encoding/json"
"fmt"
nats "github.com/nats-io/nats.go"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
nc,_ := nats.Connect("127.0.0.1:4222")
c,_ := nats.NewEncodedConn(nc,nats.JSON_ENCODER)
c.QueueSubscribe("subject_toto","queue_titi",func(msg *nats.Msg) {
var message Message
err := json.Unmarshal([]byte(msg.Data),&message)
if err != nil {
fmt.Printf("%+v\n",err)
}
fmt.Printf("%+v\n","message from pub")
fmt.Printf("%+v\n",message)
var response Response
response.Msg = "message received"
response.Status = "Ok"
fmt.Printf("%+v\n","response to sub")
fmt.Printf("%+v\n",response)
byteConfApi,err2 := json.Marshal(response)
if err2 != nil {
fmt.Printf("%+v\n",err2)
}
msg.Respond(byteConfApi)
c.Flush()
})
}
func main(){
fmt.Println("begin")
go start()
defer c.Close()
fmt.Scanln()
nc.Drain()
// Close connection
nc.Close()
fmt.Println("done")
}