如何使用“Multi”流解析 JPQL resultList() 的输出以将单个项目发送到主题而不是整个列表对象

问题描述

尝试创建一个将事件列表发送到 kafka 主题的反应式方法(使用 @Outgoing 注释)

例如

@Outgoing("kafkatopic01")
public Multi<List<Thing>> poll() {

    return Multi.createFrom()
            .ticks()
            .every(Duration.ofSeconds(10))
            .onOverflow().drop()
            .map(tick -> (List<Things>) ds.getData())
            [...]
            

“ds.getData()” - 在上面的例子中 - 返回一个事件列表(“Thing”) - 从 JPQL 命名查询 - 发送到一个主题

问题:我如何编写上述代码...

"return Multi.createFrom()..."  

...这样返回的列表不是作为单个对象发送到“@Outgoing”主题

换句话说,我如何修改上面的“Multi”流,以便“Thing”事件列表单独发送,而不是作为单个对象发送

卡夫卡 夸克 1.11.0.CR1 爪哇11

解决方法

看起来这个“.onItem().disjoint()”完成了我正在寻找的......

return Multi.createFrom()
        .ticks()
        .every(Duration.ofSeconds(10))
        .onOverflow().drop()
        .map(tick -> { 
            List<Thing> list =  (List<Thing>) polldata.get(time.get("datetime").atZone(ZoneId.of("America/New_York")));
            time.put("datetime",(list.size() == 0 ? time.get("datetime") : Instant.now()));
            return list;
        })
        .onItem()
        .<Thing>disjoint();