问题描述
我在Python 2.7中使用Avro Producer。我需要发送带有键和值的消息, 该值在主题中具有Avro-Schema,但该键没有Avro-Schema(由于键的原因,我无法添加Schema-传统原因)。
这是我的代码:
def main():
kafkaBrokers = os.environ.get('KAFKA_BROKERS')
schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
topic = os.environ.get('KAFKA_TOPIC')
subject = '${}-value'.format(topic)
sr = CachedSchemaRegistryClient(schemaRegistry)
schema = sr.get_latest_schema(subject).schema
value_schema = avro.loads(str(schema))
url = 'test.com'
value = {'url': u'test.com','priority': 10}
avroProducer = AvroProducer({
'bootstrap.servers': kafkaBrokers,'schema.registry.url': schemaRegistry
},default_value_schema=value_schema)
key = 1638895406382020875
avroProducer.produce(topic=topic,value=value,key=key)
avroProducer.flush()
我收到以下错误:
raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key
如果我从生产函数中删除密钥:
avroProducer.produce(topic=topic,value=value)
有效。
如何在没有模式的情况下发送密钥?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)