将Avro消息推送到kafka主题

问题描述

我正在尝试使用 kafka-node-avro

库可将数据推送到现有主题。 我已经使用curl和主题将模式添加到SchemaRegistry中。 我收到以下错误

Error: Topic name or id is needed to build Schema
    at new Shema (/work/node_modules/kafka-node-avro/lib/schema.js:7:41)
    at Object.Pool.getByName (/work/node_modules/kafka-node-avro/lib/schemaPool.js:70:10)
    at new Promise (<anonymous>)

我的代码段如下:

const Settings = {
    "kafka" : {
        "kafkaHost" : config.KafkaHost
    },"schema": {
        "registry" : config.KafkaRegistry
    }
};

console.log("settings registry: ",config.KafkaRegistry);
console.log("settings kafkaHost: ",config.KafkaHost)
KafkaAvro.init(Settings).then( kafka => {
const producer = kafka.addProducer();
let payloads = [
    {
        topic: 'classifier-response-test',messages: JSON.stringify(kafkaData)
    }
];
producer.send(payloads).then( success => {
// Message was sent encoded with Avro Schema
    console.log("message sent ! Awesome ! :) ")
},error => {
        // Something wrong happen
        console.log("There seems that there is a mistake ! try Again ;) ")
        console.log(error)
    });
},error => {
    // something wrong happen
    console.log("There seems that there is a global mistake ! try Again ;) ")
    console.log(error)
});

解决方法

问题是我们需要将主题列表放在架构设置中,以在主题和架构注册表之间建立链接。 我们可以输入模式ID或主题名称。

const kafkaSettings  = {
    "kafka" : {
      "kafkaHost" : config.KafkaHost
    },"schema": {
      "registry" : config.KafkaRegistry,"topics": [
        {"name": "classifier-response-test" }
    ]
    }
  };
,

该库尚未准备好send一条消息列表,这是为什么它试图在发送机制中将模式动态添加到模式池时抱怨的原因。最简单的解决方案是一次发送1条消息,在您的代码示例中可能就像

const Settings = {
  "kafka" : {
    "kafkaHost" : config.KafkaHost
  },"schema": {
    "registry" : config.KafkaRegistry
  }
};

KafkaAvro.init(Settings).then( kafka => {
  kafka.send({
    topic: 'classifier-response-test',messages: JSON.stringify(kafkaData)
  }).then( success => {
    // Message was sent encoded with Avro Schema
    console.log("message sent ! Awesome ! :) ")
  },error => {
    // Something wrong happen
    console.log("There seems that there is a mistake ! try Again ;) ")
    console.log(error)
  });
},error => {
  // something wrong happen
  console.log("There seems that there is a global mistake ! try Again ;) ")
  console.log(error)
});

感谢您使用 kafka-node-avro