AWS Kinesis发送失败原因

问题描述

我正在使用KPL的addUserRecord发送图书。我要:

  1. 知道何时实际处理了给定作者的所有书籍(需要更新审核表)
  2. 记录每本书及其ISBN的成功和失败

我尝试了两件事。

我创建了ISBN到将来对象的映射。对于成功,效果很好,但是对于失败,我无法访问UserRecordResult对象。

    Map<String,ListenableFuture<UserRecordResult>> sendResultFuturesMap = new HashMap<>();    
ListenableFuture<UserRecordResult> resultFuture = producer.addUserRecord(kinesisStream,partitionKey,data);
sendResultFuturesMap.put(key,resultFuture);
...
// Verify
for(Map.Entry<String,ListenableFuture<UserRecordResult>> entry : sendResultFuturesMap.entrySet()) {
  String key = entry.getKey();
  ListenableFuture<UserRecordResult> resultFuture = entry.getValue();
  UserRecordResult result = null;
  try {
    result = resultFuture.get();
  } catch (Exception e) {
    // I always get an ExecutionException that just says "com.amazonaws.services.kinesis.producer.UserRecordFailedException"
    // the if below is never executed,so I never get the UserRecordResult object
    if (e instanceof UserRecordFailedException) {
      result = ((UserRecordFailedException) e).getResult();
    }
  }
  
}

我尝试了回调,并且可以成功和失败都获取UserRecordResult对象,但是我不确定如何将其链接到我的ISBN。

ListenableFuture<UserRecordResult> resultFuture = producer.addUserRecord(kinesisStream,data);
com.google.common.util.concurrent.Futures.addCallback(resultFuture,kinesisSentCallback,resultFutureThreadPool);

private static FutureCallback<UserRecordResult> kinesisSentCallback = new FutureCallback<>() {
  ...
  @Override
  public void onFailure(Throwable t) {
    //Works fine. I'm able to get the UserRecordResult object 
    if (t instanceof UserRecordFailedException) {
      UserRecordFailedException e = (UserRecordFailedException) t;
      UserRecordResult result = e.getResult();
      // How do I get ISBN from the result object?
    }
}

我应该为每个消息创建一个新的回调吗?看来效率低下。 ppl如何做到?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)