Spring Cloud Kinesis Binder如何为生产者处理错误-根据文档,它不起作用

问题描述

我已经遵循以下文档,并且有一个制作人和一个消费者在Kinesis Stream上工作得很好。我想了解在发生任何异常的情况下如何处理Producer(源)中的错误

我已经按照Spring Stream错误处理文档尝试了以下方法

@StreamListener("errorChannel")
@ServiceActivator(inputChannel = "errorChannel")

两者均不起作用。我在生产者方法中显式抛出RuntimeException,并期望它将在“ errorChannel”中,但无法接收到它。

如果有人成功完成此操作,请帮助我弄清楚或与我分享方法

解决方法

注意:几个月前,我遇到了这个确切的问题。我们在用Kafka代替Kinesis。既然,您已经标记了spring-cloud-stream-binder-kafka,那么我在相同的地方给出输入。

您可以为生产者编写自定义的回叫,并且该回叫可以告诉您消息是失败还是成功发布。失败时,记录消息的元数据。

下面是一个小的代码段,无法更好地解释我刚才所说的回叫。

请注意,这是给卡夫卡的。相应地更改Kinesis的代码。

public class ProduceToKafka {

  private ProducerRecord<String,String> message = null;

 // TracerBulletProducer class has producer properties
  private KafkaProducer<String,String> myProducer = TracerBulletProducer
      .createProducer();

  public void publishMessage(String string) {

    ProducerRecord<String,String> message = new ProducerRecord<>(
        "topicName",string);

    myProducer.send(message,new MyCallback(message.key(),message.value()));
  }

  class MyCallback implements Callback {

    private final String key;
    private final String value;

    public MyCallback(String key,String value) {
      this.key = key;
      this.value = value;
    }


    @Override
    public void onCompletion(RecordMetadata metadata,Exception exception) {
      if (exception == null) {
        log.info("--------> All good !!");
      } else {
        log.info("--------> not so good  !!");
        log.info(metadata.toString());
        log.info("" + metadata.serializedValueSize());
        log.info(exception.getMessage());

      }
    }
  }

}

我写了一篇中篇文章,介绍了在Spring抽象的不同级别处理生产者失败的情况。这可以为您提供帮助。看看这个 : https://medium.com/@akhil.ghatiki/kafka-producer-failure-handling-at-multiple-levels-of-spring-abstractions-e530edb02a6c