将单个json从azure物联网中心存储到datalake2

问题描述

添加了物联网集线器和设备。来自物联网中心的所有数据均以json格式保存到数据湖2。工作正常,但是如果设备一次发送多个消息,则将其保存在单个json中。这会带来一些麻烦...有没有办法将每个消息事件保存在单独的json中?我浏览了iot hub的设置,但一无所获。

解决方法

在IoT中心路由机制中,没有这样的设置总是将单个消息转发到存储。基本上,可以通过azure函数在流管道使用者( IoTHubTrigger )或事件网格订户( EventGridTrigger )中实现此要求。

更新

以下是 IoTHubTrigger 函数的示例,其输出blob绑定到Data Lake Storage Gen2的容器:

run.csx:

#r "Microsoft.Azure.EventHubs"
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"

using System;
using System.IO;
using System.Text;
using System.Linq;
using Microsoft.Azure.EventHubs;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static async Task Run(EventData ed,CloudBlockBlob outputBlob,ILogger log)
{   
    //log.LogInformation($"DeviceId = {ed.SystemProperties["iothub-connection-device-id"]}\r\n{JObject.Parse(Encoding.ASCII.GetString(ed.Body))}");  

    var msg = new { 
        EnqueuedTimeUtc = ed.SystemProperties["iothub-enqueuedtime"],Properties = ed.Properties,SystemProperties = new {
          connectionDeviceId = ed.SystemProperties["iothub-connection-device-id"],connectionAuthMethod = ed.SystemProperties["iothub-connection-auth-method"],connectionDeviceGenerationId = ed.SystemProperties["iothub-connection-auth-generation-id"],enqueuedTime = ed.SystemProperties["iothub-enqueuedtime"]   
        },Body = JObject.Parse(Encoding.ASCII.GetString(ed.Body))
    };

    byte[] buffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));
    await outputBlob.UploadFromStreamAsync(new MemoryStream(buffer));

    await Task.CompletedTask;
}

function.json:

{
  "bindings": [
    {
      "name": "ed","connection": "rk2020iot_IOTHUB","eventHubName": "rk2020iot_IOTHUBNAME","consumerGroup": "function","cardinality": "one","direction": "in","type": "eventHubTrigger"
    },{
      "name": "outputBlob","path": "iot/rk2020iot/{DateTime}.json","connection": "rk2020datalake2_STORAGE","direction": "out","type": "blob"
    }
  ]
}