问题描述
尝试创建一个将事件列表发送到 kafka 主题的反应式方法(使用 @Outgoing 注释)
例如
@Outgoing("kafkatopic01")
public Multi<List<Thing>> poll() {
return Multi.createFrom()
.ticks()
.every(Duration.ofSeconds(10))
.onOverflow().drop()
.map(tick -> (List<Things>) ds.getData())
[...]
“ds.getData()” - 在上面的例子中 - 返回一个事件列表(“Thing”) - 从 JPQL 命名查询 - 发送到一个主题。
问题:我如何编写上述代码...
"return Multi.createFrom()..."
...这样返回的列表不是作为单个对象发送到“@Outgoing”主题?
换句话说,我如何修改上面的“Multi”流,以便“Thing”事件列表单独发送,而不是作为单个对象发送
卡夫卡 夸克 1.11.0.CR1 爪哇11
解决方法
看起来这个“.onItem().disjoint()”完成了我正在寻找的......
return Multi.createFrom()
.ticks()
.every(Duration.ofSeconds(10))
.onOverflow().drop()
.map(tick -> {
List<Thing> list = (List<Thing>) polldata.get(time.get("datetime").atZone(ZoneId.of("America/New_York")));
time.put("datetime",(list.size() == 0 ? time.get("datetime") : Instant.now()));
return list;
})
.onItem()
.<Thing>disjoint();