如何在 kafka 控制台生产者中添加密钥序列化器和值序列化器

问题描述

我在 spring boot kafka producer application.yaml 中设置了以下属性

消费者属性

        key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

生产者属性

        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

我必须从 kafka 控制台生产者那里产生消息,例如- kafka-console-producer --bootstrap-server confluent-cp-kafka:9092 --topic TSTTOPIC --producer-property key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

但它不工作,当我从控制台生产者产生消息时,我在消费者日志中收到如下错误

enter image description here

解决方法

您不能在 CLI 上使用冒号。

如果您想使用您的属性文件,则将 --producer.configproducer.properties 文件一起传递

否则,您可以将 kafka-avro-console-producer--producer-property key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer 一起使用

至于 Avro 序列化程序,您似乎缺少任何 key.schemavalue.schema + schema.registry.url,它们只是 kakfa-avro-console-producer 读取的属性,并且会解释为什么您的Avro 消费者将无法读取数据(以明文形式发送)