问题描述
我是 kafka 的新手。我定义了以下状态存储:
StoreBuilder<keyvalueStore<String,String>> storeBuilder=Stores.keyvalueStoreBuilder(
Stores.inMemorykeyvalueStore(storeName),Serdes.String(),Serdes.String());
builder.addStateStore(storeBuilder);
我想将记录删除到商店中,但我不能这样做。我曾尝试使用墓碑,但它会向我插入带有键和空值的新记录,而不是删除所有内容。我试过做 stream.cleanup() 但它甚至没有去。我的商店越来越多,我不知道该怎么做。 这是我使用商店尝试取消记录的一段代码:
private String checkAndUpdateStateStore(Vessel v) throws ParseException {
store.put(v.getVessel(),null);
//store.delete(v.getVessel());
return v.getVessel();
}
另一个问题是,在存储中,我希望对于同一个键,旧值必须替换为新值,而不是为同一个键创建新记录。 提前感谢那些让我摆脱困境的人
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)