Avro Producer发送没有密钥架构的密钥

问题描述

我在Python 2.7中使用Avro Producer。我需要发送带有键和值的消息, 该值在主题中具有Avro-Schema,但该键没有Avro-Schema(由于键的原因,我无法添加Schema-传统原因)。

这是我的代码:

def main():
    kafkaBrokers = os.environ.get('KAFKA_BROKERS')
    schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
    topic = os.environ.get('KAFKA_TOPIC')

    subject = '${}-value'.format(topic)
    sr = CachedSchemaRegistryClient(schemaRegistry)

    schema = sr.get_latest_schema(subject).schema

    value_schema = avro.loads(str(schema))

    url = 'test.com'

    value = {'url': u'test.com','priority': 10}

    avroProducer = AvroProducer({
        'bootstrap.servers': kafkaBrokers,'schema.registry.url': schemaRegistry
    },default_value_schema=value_schema)


    key = 1638895406382020875
    
    avroProducer.produce(topic=topic,value=value,key=key)
    avroProducer.flush()

我收到以下错误:

raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key

如果我从生产函数中删除密钥:

avroProducer.produce(topic=topic,value=value)

有效。

如何在没有模式的情况下发送密钥?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)