从Node.js服务器从ServiceBus接收BrokeredMessage

问题描述

我正在从新的nodejs服务器读取现有Azure ServiceBus中的消息。

这是将消息从.NET服务器发送到ServiceBus的方式:

var topicClient = TopicClient.CreateFromConnectionString(serviceBusConnectionString,routingKey);    
var brokeredMessage = new BrokeredMessage(message.ToJson());
topicClient.Send(brokeredMessage);

topicClient是Microsoft.ServiceBus.Messaging.TopicClient的地方

我正在使用以下方法使用azure-sb软件包在nodejs服务器上读取消息:

sbClient = ServiceBusClient.createFromConnectionString(connectionString)  
subscriptionClient = this.sbClient.createSubscriptionClient(topicName,subscriptionName);
receiver = this.subscriptionClient.createReceiver(ReceiveMode.receiveAndDelete);
messages = await this.receiver.receiveMessages(10,10);
console.log(messages.map(message => { message.body }));

message.body是一个缓冲区,当我执行message.body.toString('utf-8')时,我会得到类似的东西:

@string3http://schemas.microsoft.com/2003/10/Serialization/��{VALID JSON}

我当然对介于两者之间的有效JSON感兴趣。 在.net服务器中,我们只需做brokeredMessage.GetBody()并获取对象,那么nodejs上是否有一种简单的方法可以做到这一点?

解决方法

根据我的测试,如果我们使用标准库Microsoft.Azure.ServiceBus在.Net应用程序中发送消息,它将直接JSON解析节点应用程序中的消息

例如

这是我发送消息的C#代码:

class Program
{
    static void Main(string[] args)
    {
        string connectionString = "Endpoint=sb://...";
        var client = new TopicClient(connectionString,"");
        var payload = JsonConvert.SerializeObject(new DemoMessage() { Title = $"hello!!! {DateTime.Now}" });
        var serviceBusMessage = new Message(Encoding.UTF8.GetBytes(payload));
        serviceBusMessage.SessionId = Guid.NewGuid().ToString("D");

        client.SendAsync(serviceBusMessage).Wait();

    }

    private class DemoMessage
    {
        public DemoMessage()
        {
        }

        public string Title { get; set; }
    }

这是我的Node.js代码,用于接收消息:

const { ServiceBusClient,ReceiveMode } = require("@azure/service-bus");

// Define connection string and related Service Bus entity names here
const connectionString =
  "Endpoint=sb://";
const topicName = "***";
const subscriptionName = "***";

async function main() {
  const sbClient = ServiceBusClient.createFromConnectionString(
    connectionString,);
  const subscriptionClient = sbClient.createSubscriptionClient(
    topicName,subscriptionName,);
  const receiver = subscriptionClient.createReceiver(ReceiveMode.receiveAndDelete);

  try {
    const messages = await receiver.receiveMessages(1);
    console.log("Received messages:");
    console.log(messages.map((message) => message.body));
    await subscriptionClient.close();
  } finally {
    await sbClient.close();
  }
}

main().catch((err) => {
  console.log("Error occurred: ",err);
});

enter image description here

此外,如果您仍然使用库WindowsAzure.ServiceBus,则需要使用BrokeredMessage(Stream messageBodyStream,bool ownsStream)来初始化对象。

因为我们使用BrokeredMessage(platload)进行初始化,所以它将使用带有二进制XmlDictionaryWriter的DataContractSerializer来初始化对象。因此,有效载荷正在使用带有二进制XmlDictionaryWriter的DataContractSerializer进行序列化,这就是解释消息正文为何在其开始时具有类型指示@string3http://schemas.microsoft.com/2003/10/Serialization/的原因。

例如 这是我的C#代码,用于发送消息:

var client =TopicClient.CreateFromConnectionString(connectionString,"test");
            var payload = JsonConvert.SerializeObject(new DemoMessage() { Title = $"hello BrokeredMessage!!! {DateTime.Now}" });
            using (Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(payload))) {
                var serviceBusMessage = new BrokeredMessage(stream,true);
                await client.SendAsync(serviceBusMessage);

            }

我使用相同的代码接收 enter image description here

有关更多详细信息,请参阅here

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...