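Problem description
My AvroConsumer from the confluent_kafka.avro module always raises 'dict' object has no attribute 'get_by_id' when polling.
However, when I poll with the plain Consumer from confluent_kafka, I do get the binary serialized messages.
The ccloud CLI also works fine with Kafka.
Do you know why the confluent_kafka Avro client is not working? Is it because of my configuration?
I am using confluent-kafka==1.5.0.
Here is a sample of my Python code: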
from confluent_kafka.avro import AvroConsumer
conf = {
    'bootstrap.servers': MY_BT_SERVERS,
    'sasl.mechanisms': "PLAIN",
    'security.protocol': "SASL_SSL",
    'sasl.username': API_KEY,
    'sasl.password': API_PASSWORD,
    'group.id': 'group_id',
    'auto.offset.reset': 'earliest'
}
schema_registry_conf = {
    'url': SR_ENDPOINT,
    'basic.auth.user.info': "USER_INFO",
    'schema.registry.basic.auth.user.info': f"{SR_API_KEY}:{SR_API_SECRET}"
}
consumer = AvroConsumer(config=conf, schema_registry=schema_registry_conf)
consumer.subscribe(["my-topic"])
message = consumer.poll(5)
It raises:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-22-95673a1ff746> in <module>
----> message = consumer.poll(5)
lib/python3.7/site-packages/confluent_kafka/avro/__init__.py in poll(self,timeout)
164 try:
165 if message.value() is not None:
--> 166 decoded_value = self._serializer.decode_message(message.value(),is_key=False)
167 message.set_value(decoded_value)
168 if message.key() is not None:
/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py in decode_message(self,message,is_key)
229 if magic != MAGIC_BYTE:
230 raise SerializerError("message does not start with magic byte")
--> 231 decoder_func = self._get_decoder_func(schema_id,payload,is_key)
232 return decoder_func(payload)
/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py in _get_decoder_func(self,schema_id,is_key)
161 # fetch writer schema from schema reg
162 try:
--> 163 writer_schema_obj = self.registry_client.get_by_id(schema_id)
164 except ClientError as e:
165 raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id,str(e)))
AttributeError: 'dict' object has no attribute 'get_by_id'
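As a possible clue, every serialized message I poll starts with the odd bytes \x00\x00\x01\x86\xa1\ which I have to strip off when deserializing the data manually.
Thanks for your help!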
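Solution
Your mistake is here: schema_registry=schema_registry_conf
You are passing a dict, whereas you should pass an instance of the registry client. As the traceback shows, the serializer calls get_by_id on whatever object it was given, and a dict has no such method.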
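Applied to the consumer from the question, a minimal sketch could look like the following. It assumes the same legacy confluent_kafka.avro API (confluent-kafka 1.5.0). The helper names `build_registry_conf` and `make_consumer` are hypothetical, and the `basic.auth.credentials.source` / `basic.auth.user.info` pair is an assumption about how the registry credentials are meant to be supplied, so adjust it to your setup.

```python
def build_registry_conf(url, api_key, api_secret):
    """Return a config dict for CachedSchemaRegistryClient (hypothetical helper)."""
    return {
        'url': url,
        # Assumption: credentials are passed as "key:secret" with source USER_INFO.
        'basic.auth.credentials.source': 'USER_INFO',
        'basic.auth.user.info': f"{api_key}:{api_secret}",
    }


def make_consumer(kafka_conf, registry_conf):
    """Build an AvroConsumer backed by a registry client instance, not a dict."""
    # Imported lazily so the config helper can be used without the client installed.
    from confluent_kafka.avro import AvroConsumer, CachedSchemaRegistryClient

    registry_client = CachedSchemaRegistryClient(registry_conf)
    return AvroConsumer(config=kafka_conf, schema_registry=registry_client)
```

Usage would then be `consumer = make_consumer(conf, build_registry_conf(SR_ENDPOINT, SR_API_KEY, SR_API_SECRET))`, followed by the same subscribe and poll calls as in the question.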
Producer example
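import os

from confluent_kafka import Producer
from confluent_kafka.avro import CachedSchemaRegistryClient
# The legacy Avro serializer class is MessageSerializer; aliased to match the name used below
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer

registry_conf = {
    'url': os.environ.get('SCHEMA_REGISTRY', 'http://localhost:8081')
}
schema_registry = CachedSchemaRegistryClient(registry_conf)  # a client instance, not a dict
avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema  # keep a reference to the bound method

# topic, key, value_schema and delivery_report are assumed to be defined elsewhere
p = Producer(...)
value = ...
value_payload = serialize_avro(topic, value_schema, value, is_key=False)
p.produce(topic, key=key, value=value_payload, callback=delivery_report)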
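
As for the leading bytes mentioned in the question: they are consistent with the Confluent Schema Registry wire format, in which a payload starts with one magic byte (0) followed by a 4-byte big-endian schema ID, and the Avro-encoded body comes after that. The sketch below only splits that header off. `split_confluent_header` is a hypothetical helper, not part of confluent_kafka, and the body bytes in the sample are made up.

```python
import struct

MAGIC_BYTE = 0


def split_confluent_header(raw):
    """Split a framed message into (schema_id, avro_payload)."""
    if len(raw) < 5:
        raise ValueError("message too short to carry a schema header")
    # '>bI': one signed byte (magic) + 4-byte unsigned big-endian int (schema ID).
    magic, schema_id = struct.unpack('>bI', raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("message does not start with magic byte")
    return schema_id, raw[5:]


# The five bytes from the question, followed by a made-up body.
schema_id, payload = split_confluent_header(b'\x00\x00\x01\x86\xa1' + b'body')
print(schema_id)  # 100001
```

With a working AvroConsumer this framing is handled for you, so stripping it by hand should only be needed when polling with the plain Consumer.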