问题描述
我正在使用KPL的addUserRecord发送图书。我要:
- 知道何时实际处理了给定作者的所有书籍(需要更新审核表)
- 记录每本书及其ISBN的成功和失败
我尝试了两件事。
我创建了ISBN到将来对象的映射。对于成功,效果很好,但是对于失败,我无法访问UserRecordResult对象。
Map<String,ListenableFuture<UserRecordResult>> sendResultFuturesMap = new HashMap<>();
ListenableFuture<UserRecordResult> resultFuture = producer.addUserRecord(kinesisStream,partitionKey,data);
sendResultFuturesMap.put(key,resultFuture);
...
// Verify
for(Map.Entry<String,ListenableFuture<UserRecordResult>> entry : sendResultFuturesMap.entrySet()) {
String key = entry.getKey();
ListenableFuture<UserRecordResult> resultFuture = entry.getValue();
UserRecordResult result = null;
try {
result = resultFuture.get();
} catch (Exception e) {
// I always get an ExecutionException that just says "com.amazonaws.services.kinesis.producer.UserRecordFailedException"
// the if below is never executed,so I never get the UserRecordResult object
if (e instanceof UserRecordFailedException) {
result = ((UserRecordFailedException) e).getResult();
}
}
}
我尝试了回调,并且可以成功和失败都获取UserRecordResult对象,但是我不确定如何将其链接到我的ISBN。
ListenableFuture<UserRecordResult> resultFuture = producer.addUserRecord(kinesisStream,data);
com.google.common.util.concurrent.Futures.addCallback(resultFuture,kinesisSentCallback,resultFutureThreadPool);
private static FutureCallback<UserRecordResult> kinesisSentCallback = new FutureCallback<>() {
...
@Override
public void onFailure(Throwable t) {
//Works fine. I'm able to get the UserRecordResult object
if (t instanceof UserRecordFailedException) {
UserRecordFailedException e = (UserRecordFailedException) t;
UserRecordResult result = e.getResult();
// How do I get ISBN from the result object?
}
}
我应该为每个消息创建一个新的回调吗?看来效率低下。 ppl如何做到?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)