通过KakfaTemplate的send方法返回的ListenableFuture使用回调时,无法捕获响应

问题描述

我正在使用KafkaTemplate,它提供了异步发送方法,该方法返回Future,然后在侦听器中注册回调以异步接收发送结果,但是onFailure我需要捕获消息并将其存储在GCS存储桶中,但是由于某些未知原因,我无法这样做。以下是代码段-:

public void send(String message) {

ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(Topic,message);
future.addCallback(new ListenableFutureCallback<SendResult<String,String>>() {

  @Override
  public void onSuccess(SendResult<String,String> result) {
    LOG.info("message received is : " + result.toString());
  }

  @Override
  public void onFailure(Throwable ex) {
      String dateFolder = dateFormat.format(new Date());
      
      Storage storage = StorageOptions.getDefaultInstance().getService();
      String failureFolder = failureDirectory + "/testing" + "/" + dateFolder;
      String bucket = failureFolder.split("/")[2];
      BlobId blobId = BlobId.of(bucket,failureFolder.split(bucket + "/")[1]);
      BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
      Blob blob = storage.create(blobInfo,message.getBytes());
      
      LOG.info("Writing to failure folder successful...");
  }
});

}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)