Kafka Producer失败-意外错误代码:87

问题描述

尝试将Avro格式的消息发送到Kafka并使用它。直到一些研究添加Thread.sleep(16000)以便生产者等待消息后,它才发送消息。但是,它再次停止工作。是org.apache.kafka.common.protocol.Errors - Unexpected error code: 87. Failed to produce messages to topic

有什么建议吗?我的下面的代码

public class AvroAutomationTest3IT {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroAutomationTest3IT.class);
    private static Properties producerProps;
    private final String topic = "topic.one";

    String schemaPath = "src/test/resources/automation-tests/sample-avro.avsc";

    // subject convention is "<topic-name>-value"
    String subject = topic + "-value";

    // avsc json string.
    String schema = null;

    // kafka broker list.
    private static String brokers = "xxx:9093";
    // schema registry url.
    private static String registry = "xxx:8081";

    private static Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();

    @BeforeAll
    public static void setUp() throws IOException {
        producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers);
        producerProps.put("acks","1");
        producerProps.put("reconnect.backoff.ms","5000");
        producerProps.put("retry.backoff.ms","1000");
        producerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,registry);
        //producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.put(KEY_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class.getName());
        // producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"io.confluent.kafka.serializers.KafkaAvroSerializer");
        //  producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        producerProps.put(CLIENT_ID_CONFIG,"AvroProducer");
        // producerProps.put(ProducerConfig.ACKS_CONFIG,"0");
        // producerProps.put(ProducerConfig.RETRIES_CONFIG,"0");

        //configure the KafkaAvroSerializer
        producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG,KafkaAvroSerializer.class.getName());
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");

        //consumer properties
        producerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers);
        producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,ByteArrayDeserializer.class.getName());
        producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ByteArrayDeserializer.class.getName());
        producerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"NewConsumer");

        producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10);

        //sslConfig
        producerProps.put("security.protocol","SSL");
        producerProps.put("ssl.truststore.location","C:/Users/xx/truststore.jks");
        producerProps.put("ssl.truststore.password","expeditors");
        producerProps.put("ssl.keystore.location","C:/Users/xx/xx.jks");
        producerProps.put("ssl.keystore.password","xxx");
        producerProps.put("ssl.key.password","xxx");

    }

    @Test
         public void avroTest() throws Exception {
        sendMessage();
        Thread.sleep(16000);

        readMessage();


    }

    public void readMessage() {
        KafkaConsumer<String,byte[]> consumer = new KafkaConsumer<>(producerProps);
        consumer.subscribe(Collections.singletonList(topic));
        try {
            ConsumerRecords<String,byte[]> records = consumer.poll(Duration.ofMillis(15000));

            // assertEquals(2,records.count(),"Expected 2 record");
            for (ConsumerRecord<String,byte[]> record : records) {
                try {
                    JsonElement el = this.parseAvroMessage(topic,record.value());

                    System.out.printf("offset = %d,value = %s\n",record.offset(),el);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.commitAsync();
            consumer.close(Duration.ofMillis(3000));
        }
    }

    private JsonElement parseAvroMessage(String topic,byte[] value) {
        HashMap<String,String> configs = new HashMap<>();
        configs.put("schema.registry.url",registry);

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(configs,true);
        Object obj = deserializer.deserialize(topic,value);
        return gson.fromJson(obj.toString(),JsonElement.class);
    }

    public void sendMessage() throws IOException {

        // construct kafka producer.
        Producer<Integer,GenericRecord> producer = new KafkaProducer<Integer,GenericRecord>(producerProps);


        // message key.
        // int userIdInt = 1;
        // message value,avro generic record.
        GenericRecord record = this.buildRecord();

        // send avro message to the topic page-view-event.
        producer.send(new ProducerRecord<Integer,GenericRecord>("visibility.avro.topic.source.one",null,record));
        // producer.flush();
    }

    public GenericRecord buildRecord() throws IOException {

        // avsc json string.
        String schemaString = null;

        FileInputStream inputStream = new FileInputStream(schemaPath);
        try {
            schemaString = IOUtils.toString(inputStream,StandardCharsets.UTF_8);
        } finally {
            inputStream.close();
        }

        // avro schema.
        Schema schema = new Schema.Parser().parse(schemaString);

        GenericRecord metadata = new GenericData.Record(schema.getField("metadata").schema());
        metadata.put("version","1");
        metadata.put("eventName","event.name");

        GenericRecord data = new GenericData.Record(schema.getField("data").schema());

        data.put("name","Bob");
        data.put("age",25);

        GenericRecord record = new GenericData.Record(schema);
        record.put("metadata",metadata);
        record.put("data",data);

        return record;
    }

}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)