用于将事件中心消息发送到 Blob 存储的 Azure 事件中心捕获的替代方法?

问题描述

有什么方法可以将我的事件中心数据发送到 Azure 中的 blob 存储,这些数据是通过 Postman 通过 Postman 以 JSON 格式发送的吗? 我已经尝试使用 EventHub 的 Capture 功能,但不幸的是,数据以 Avro 格式保存,我真的很难再次将其转换回原始的 JSON 格式。

因此,我想将我的 EventHub 数据直接发送到某种 Blob 存储,这将使我的 Event Hub 消息保持其原始 JSON 格式,然后我可以使用 Azure 函数(获取 Http 触发器)从我的 SPA 通过前端通信。

另外,我是否必须为容器中的每条消息创建一个新的 blob?因为我认为我无法将它们全部写入一个 blob,因为当我同时触发 get HTTP 函数时,我将无法通过前端检索我的数据。

是否有事件中心捕获的替代方案?使用普通 blob 存储是最好的解决方案吗?我已经阅读了一些关于 Azure Timeseries Insights 和 CosmosDB 的文章,但我不确定这些是否是解决我的问题的最佳方法。

解决方法

所以问题是我最初通过邮递员将其作为原始数据发送:

通过 Postman 以 JSON 形式发送的原始数据:

{
   "id":1,"receiver":"2222222222222","message":{
      "Name":"testing","PersonId":2,"CarId":2,"GUID":"1s3q1d-s546dq1-8e22e","LineId":2,"SvcId":2,"Lat":-64.546547,"Lon":-64.546547,"TimeStamp":"2021-03-18T08:29:36.758Z","Recorder":"dq65ds4qdezzer","Env":"DEV"
   },"operator":20404,"sender":"MSISDN","binary":1,"sent":"2021-03-18T08:29:36.758Z"
}

一旦它被事件中心捕获捕获,它就会转换为 Avro 文件。 我正在尝试通过使用 fastavro 并将其转换为 JSON 格式来检索数据。 问题是我没有取回最初由 Postman 发送的原始数据。我找不到将其转换回原始状态的方法,为什么 Avro 还会从 Postman 向我发送其他信息? 我可能需要找到一种方法将“身体”设置为仅转换。但出于某种原因,它还在正文中添加了“字节” 我只是想找回通过 Postman 发送的原始原始数据。

init.py(Azure 函数)

    import logging
    import os
    import string
    import json
    import uuid
    import avro.schema
    import tempfile
    import azure.functions as func
    from azure.storage.blob import BlobServiceClient,BlobClient,ContainerClient,__version__
    from avro.datafile import DataFileReader,DataFileWriter
    from avro.io import DatumReader,DatumWriter
    from fastavro import reader,json_writer
    
    
    #Because the Apache Python avro package is written in pure Python,it is relatively slow,therefoer I make use of fastavro
    def avroToJson(avroFile):
        with open("json_file.json","w") as json_file:
            with open(avroFile,"rb") as avro_file:
                avro_reader = reader(avro_file)
                json_writer(json_file,avro_reader.writer_schema,avro_reader)
    
    
    def main(req: func.HttpRequest) -> func.HttpResponse:
      logging.info('Python HTTP trigger function processed a request.')
      print('Processor started using path ' + os.getcwd())
      connect_str = "###########"
      container = ContainerClient.from_connection_string(connect_str,container_name="####")
      blob_list = container.list_blobs() # List the blobs in the container.
      for blob in blob_list:
          # Content_length == 508 is an empty file,so process only content_length > 508 (skip empty files).
          if blob.size > 508:
              print('Downloaded a non empty blob: ' + blob.name)
              # Create a blob client for the blob.
              blob_client = ContainerClient.get_blob_client(container,blob=blob.name)
              # Construct a file name based on the blob name.
              cleanName = str.replace(blob.name,'/','_')
              cleanName = os.getcwd() + '\\' + cleanName
              # Download file
              with open(cleanName,"wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                  my_file.write(blob_client.download_blob().readall())# Write blob contents into the file.
                  
              avroToJson(cleanName)
              with open('json_file.json','r') as file:
                   jsonStr = file.read()
            
      return func.HttpResponse(jsonStr,status_code=200)

预期结果:

{
   "id":1,"sent":"2021-03-18T08:29:36.758Z"
}

实际结果:

{
   "SequenceNumber":19,"Offset":"10928","EnqueuedTimeUtc":"4/1/2021 8:43:19 AM","SystemProperties":{
      "x-opt-enqueued-time":{
         "long":1617266599145
      }
   },"Properties":{
      "Postman-Token":{
         "string":"37ff4cc6-9124-45e5-ba9d-######e"
      }
   },"Body":{
      "bytes":"{\r\n  \"id\": 1,\r\n  \"receiver\": \"2222222222222\",\r\n  \"message\": {\r\n    \"Name\": \"testing\",\r\n    \"PersonId\": 2,\r\n    \"CarId\": 2,\r\n    \"GUID\": \"1s3q1d-s546dq1-8e22e\",\r\n    \"LineId\": 2,\r\n    \"SvcId\": 2,\r\n    \"Lat\": -64.546547,\r\n    \"Lon\": -64.546547,\r\n    \"TimeStamp\": \"2021-03-18T08:29:36.758Z\",\r\n    \"Recorder\": \"dq65ds4qdezzer\",\r\n    \"Env\": \"DEV\"\r\n  },\r\n  \"operator\": 20404,\r\n  \"sender\": \"MSISDN\",\r\n  \"binary\": 1,\r\n  \"sent\": \"2021-03-29T08:29:36.758Z\"\r\n}"
   }
}

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...