问题描述
我正在尝试连接到kafka服务器。身份验证基于GSSAPI。
/opt/app-root/src/server/node_modules/node-rdkafka/lib/error.js:411
return new LibrdKafkaError(e);
^
Error: broker transport failure
at Function.createLibrdkafkaError (/opt/app-root/src/server/node_modules/node-rdkafka/lib/error.js:411:10)
at /opt/app-root/src/server/node_modules/node-rdkafka/lib/client.js:350:28
这是我的test_kafka.js:
const Kafka = require('node-rdkafka');
const kafkaConf = {
'group.id': 'espdev2','enable.auto.commit': true,'Metadata.broker.list': 'br01','security.protocol': 'SASL_SSL','sasl.kerberos.service.name': 'kafka','sasl.kerberos.keytab': 'svc_esp_kafka_nonprod.keytab','sasl.kerberos.principal': '[email protected]','debug': 'all','enable.ssl.certificate.verification': true,//'ssl.certificate.location': 'some-root-ca.cer','ssl.ca.location': 'some-root-ca.cer',//'ssl.key.location': 'svc_esp_kafka_nonprod.keytab',};
const topics = 'hello1';
console.log(Kafka.features);
let readStream = new Kafka.KafkaConsumer.createReadStream(kafkaConf,{ "auto.offset.reset": "earliest" },{ topics })
readStream.on('data',function (message) {
const messageString = message.value.toString();
console.log(`Consumed message on Stream: ${messageString}`);
});
解决方法
您可以查看此问题以解释此错误: https://github.com/edenhill/librdkafka/issues/1987
来自@edenhill:
作为基于librdkafka的客户端的一般规则:正确配置了群集和客户端,可以忽略所有错误,因为它们很可能是临时错误,librdkafka会尝试自动恢复。
在这种情况下;如果组协调器请求失败,它将在500毫秒内重试(使用状态为Up的任何代理)。如果在丢失的心跳使成员资格(session.timeout.ms)超时之前找到新的协调器,则当前的分配和组成员资格将不受影响。 自动偏移提交将被暂停,直到找到新的协调器为止。
在将来的版本中,我们将扩展错误类型以包括严重性,使应用程序可以愉快地忽略非终端错误。此时,应用程序应该考虑所有错误,而不是信息错误。