如何在融合的Kafka中调试AvroConsumer?

问题描述

我正在尝试从python读取Kafka,但收到的消息是None,CLI中没有错误。 我正在使用通过腻子转发到目标主机的端口,而不是通过telnet测试端口-很好。 而且,我在Debian(WSL)上使用kafkacat,效果很好!

kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081

我正在使用PyCharm,我的代码在文本下方。我该如何调试?

from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError

topics = ['topic1','topic2']
c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'smallest','schema.registry.url': 'http://localhost:28081','api.version.request': True
})

c.subscribe(topics)
tp = TopicPartition(topics[0],0)
c.assign([tp])

while True:
    try:
        msg = c.poll(1)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg,e))
        break

    if msg is None:
        print('Message None')
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()

解决方法

我要做的第一件事是使用kafka-avro-console-consumer工具确保在您的主题上出现消息。

然后在您的应用中,您可以尝试提高日志级别:

c = AvroConsumer({
    # ... your config here
    'log_level': 7,'debug': 'all',})

您可以在此处查看不同的参数:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

但是我相信您的问题与分配分区的方式有关。如果您使用subscribe,则群集会自动将分区分配给您的使用者。您可以在订阅时添加回叫,可以看到哪些分区已分配给您的使用者,但您不必自己做。参见https://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html#confluent_kafka.Consumer.subscribe

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...