Azure节点物联网SDK如何设置MQTT异常的回调

问题描述

我在多个项目中使用了用于C的Azure IOT SDK。有相当不错的文档,而且很清楚如何注册回调以在无法发送消息或MQTT连接断开时得到通知。查看节点的javascript SDK,我不知道如何注册类似的回调。例如,创建客户端后,我可以调用client.open(onConnect),并在建立连接时调用onConnect方法。我还可以致电client.sendEvent(message,...)发送消息,如果成功则通知该消息已排队。但是:

  • 如果mqtt连接中断,我如何注册回调通知
  • 如何注册一个回调以告知是否成功接收到数据包(QOS 1或2)或发送失败?
  • 如何指定我想要的QOS级别?

谢谢

解决方法

首先,关于IoT中心中的MQTT QoS支持的几点注意事项:

  • IoT中心不支持QoS 2消息。如果设备应用发布具有QoS 2的消息,则IoT中心将关闭网络连接。当一个 设备应用程序订阅了具有QoS 2的主题,IoT Hub最多授予 SUBACK数据包中的QoS级别1。之后,物联网中心交付 使用QoS 1发送到设备的消息。
  • 默认情况下,设备SDK会以QoS 1连接到IoT中心,以与IoT中心交换消息。

有关详细信息,请参见Communicate with your IoT hub using the MQTT protocol

现在要问的问题,您有2个选择:

使用Azure IoT SDK:

如果您想使用IoT hub device sdk,则以下是official sample中处理断开连接情况的参考。关于与QoS 1相关的交换(如PUBACK确认),它不允许在较低级别进行,但在内部进行处理以确保有保证的消息传递。要直接使用MQTT,请参阅我的答案的后面部分。

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

'use strict';

const Protocol = require('azure-iot-device-mqtt').Mqtt;
// Uncomment one of these transports and then change it in fromConnectionString to test other transports
// const Protocol = require('azure-iot-device-amqp').AmqpWs;
// const Protocol = require('azure-iot-device-http').Http;
// const Protocol = require('azure-iot-device-amqp').Amqp;
// const Protocol = require('azure-iot-device-mqtt').MqttWs;
const Client = require('azure-iot-device').Client;
const Message = require('azure-iot-device').Message;

// String containing Hostname,Device Id & Device Key in the following formats:
//  "HostName=<iothub_host_name>;DeviceId=<device_id>;SharedAccessKey=<device_key>"
const deviceConnectionString = process.env.DEVICE_CONNECTION_STRING;
let sendInterval;

function disconnectHandler () {
  clearInterval(sendInterval);
  client.open().catch((err) => {
    console.error(err.message);
  });
}

// The AMQP and HTTP transports have the notion of completing,rejecting or abandoning the message.
// For example,this is only functional in AMQP and HTTP:
// client.complete(msg,printResultFor('completed'));
// If using MQTT calls to complete,reject,or abandon are no-ops.
// When completing a message,the service that sent the C2D message is notified that the message has been processed.
// When rejecting a message,the service that sent the C2D message is notified that the message won't be processed by the device. the method to use is client.reject(msg,callback).
// When abandoning the message,IoT Hub will immediately try to resend it. The method to use is client.abandon(msg,callback).
// MQTT is simpler: it accepts the message by default,and doesn't support rejecting or abandoning a message.
function messageHandler (msg) {
  console.log('Id: ' + msg.messageId + ' Body: ' + msg.data);
  client.complete(msg,printResultFor('completed'));
}

function generateMessage () {
  const windSpeed = 10 + (Math.random() * 4); // range: [10,14]
  const temperature = 20 + (Math.random() * 10); // range: [20,30]
  const humidity = 60 + (Math.random() * 20); // range: [60,80]
  const data = JSON.stringify({ deviceId: 'myFirstDevice',windSpeed: windSpeed,temperature: temperature,humidity: humidity });
  const message = new Message(data);
  message.properties.add('temperatureAlert',(temperature > 28) ? 'true' : 'false');
  return message;
}

function errorCallback (err) {
  console.error(err.message);
}

function connectCallback () {
  console.log('Client connected');
  // Create a message and send it to the IoT Hub every two seconds
  sendInterval = setInterval(() => {
    const message = generateMessage();
    console.log('Sending message: ' + message.getData());
    client.sendEvent(message,printResultFor('send'));
  },2000);

}

// fromConnectionString must specify a transport constructor,coming from any transport package.
let client = Client.fromConnectionString(deviceConnectionString,Protocol);

client.on('connect',connectCallback);
client.on('error',errorCallback);
client.on('disconnect',disconnectHandler);
client.on('message',messageHandler);

client.open()
.catch(err => {
  console.error('Could not connect: ' + err.message);
});

// Helper function to print results in the console
function printResultFor(op) {
  return function printResult(err,res) {
    if (err) console.log(op + ' error: ' + err.toString());
    if (res) console.log(op + ' status: ' + res.constructor.name);
  };
}

直接使用MQTT:

Using the MQTT protocol directly (as a device)中所述,您还可以使用端口8883上的MQTT协议连接到公用设备端点。为此,您可以使用任何可用的标准MQTT库(例如https://www.npmjs.com/package/mqtt)。但是,请注意以下几点以作一些特殊考虑:

在CONNECT数据包中,设备应使用以下值:

  • 对于ClientId字段,请使用deviceId。
  • 对于“用户名”字段,使用{iothubhostname} / {device_id} /?api-version = 2018-06-30,其中 {iothubhostname}是IoT中心的完整CName。例如,如果 IoT中心的名称为contoso.azure-devices.net,如果 您的设备是MyDevice01,完整的“用户名”字段应包含:contoso.azure-devices.net/MyDevice01/?api-version=2018-06-30
  • 对于“密码”字段,请使用SAS令牌。 SAS令牌的格式与HTTPS和AMQP协议的格式相同:SharedAccessSignature sig={signature-string}&se={expiry}&sr={URL-encoded-resourceURI}

注意

如果使用X.509证书认证,则SAS令牌密码为 不需要。有关更多信息,请参见Set up X.509 security in your Azure IoT Hub 并按照TLS/SSL configuration section中的代码说明进行操作。

有关如何生成SAS令牌的更多信息,请参阅设备 Using IoT Hub security tokens部分。