Azure Blob 存储/NodeJS - 从缓冲区读取 Avro 文件

问题描述

我已经使用捕获功能将事件中心输出到 Blob 存储 - 这会将我们作为 .avro 文件放入队列的任何内容输出

如果我下载这个文件并尝试使用像 avro-js 这样的库来解析它,我没有问题 - 我可以很好地读取文件并按照我认为合适的方式处理内容

但是 - 在使用 Node 处理 Azure Blob 存储时,我想处理已下载的文件。读取文件时返回的格式是缓冲区,但我找不到使用库成功解析此文件方法(找不到正确的方法,如果有的话)。

用于从 Azure 下载 blob 的代码,省略了几位:

const { BlobServiceClient } = require('@azure/storage-blob');
const blobServiceClient = BlobServiceClient.fromConnectionString(AZURE_STORAGE_CONNECTION_STRING);
const containerClient = blobServiceClient.getContainerClient("data");

const blockBlobClient = containerClient.getBlockBlobClient(blob.name);
    
const downloadBlockBlobResponse = await blockBlobClient.download(0);

输出到控制台时的缓冲区片段:

<Buffer 4f 62 6a 01 04 14 61 76 72 6f 2e 63 6f 64 65 63 08 6e 75 6c 6c 16 61 76 72 6f 2e 73 63 68 65 6d 61 ec 06 7b 22 74 79 70 65 22 3a 22 72 65 63 6f 72 64 ... 589 more bytes>

转换为字符串时的内容(粘贴图像,因为乱码输出不正确):

enter image description here

曾尝试将 .avro 文件作为纯文本读取,虽然它们大部分都可以,但有一些字符是乱码,因此它不会作为 JSON 读出(我不想做出假设在内容上尝试拉出消息正文)。

有没有人基于 Buffers 成功从 Azure 中提取 .avro 内容? 我在网上看到很多关于将这些加载到 Spark 或 Kafka 的指南,但不仅仅是读取流中的文件

谢谢!

解决方法

关于这个问题,我们可以使用包avsc来解析带有buffer的avro文件。详情请参阅here

例如

const avro = require("avsc");
const {
  BlobServiceClient,StorageSharedKeyCredential,} = require("@azure/storage-blob");

const accountName = "";
const accountKey =
  "";
async function main() {
  const creds = new StorageSharedKeyCredential(accountName,accountKey);
  const blobServiceClient = new BlobServiceClient(
    `https://${accountName}.blob.core.windows.net`,creds
  );
  const containerClient = blobServiceClient.getContainerClient("");
  const blockBlobClient = containerClient.getBlockBlobClient(
    ""
  );

  const downloadBlockBlobResponse = await blockBlobClient.download(0);
  const buf = await streamToBuffer(
    downloadBlockBlobResponse.readableStreamBody
  );

  const decoder = new avro.streams.BlockDecoder({
    parseHook: (schema) => {
      console.log("the avro file schema:");
      console.log(schema);
      return avro.Type.forSchema(schema,{ wrapUnions: true });
    },});

  decoder.on("data",(data) => {
    console.log(data);
  });
  decoder.end(buf);
}
async function streamToBuffer(readableStream) {
  return new Promise((resolve,reject) => {
    const chunks = [];
    readableStream.on("data",(data) => {
      chunks.push(data instanceof Buffer ? data : Buffer.from(data));
    });
    readableStream.on("end",() => {
      resolve(Buffer.concat(chunks));
    });
    readableStream.on("error",reject);
  });
}
main();

enter image description here