Kafka Stream使用相同的谓词将消息发送到多个分支

问题描述

我需要给两个不同的分支提供相同的消息,但要从单个流中读取。两个分支都应该获取所有消息(即,我对两个分支的谓词都相同。

我尝试了下面的代码,但是我看到只有第一个分支正在收到消息,而第二个分支却没有得到消息。

 KStream<String,byte[]>[] branches = builder.<String,byte[]>stream("source-topic)
                .branch((key,val) -> true,// this goes to output topic1
                        (key,val) -> true); // this goes to output topic2

// branch[0] map on logic1Ondata and send to topic1
branches[0].map(logic1OnData).filter(
                (key,value) -> {
                    if (key == null || value == null)
                        return false;
                    return value.data() != null;
                }).to("topic1",Produced.with(Serdes.String(),Serdes.String())

// branch[1] map on logic2Ondata and send to topic2
branches[1].map(logic2OnData).filter(
                (key,value) -> {
                    if (key == null || value == null)
                        return false;
                    return value.data() != null;
                }).to("topic2",Serdes.String())

期望两个分支都应从“源主题”中获得相同的消息。每个分支分别对logic1OnData和logic2OnData进行验证,并写入topic1和topic2 但是我只看到branch [0]接收消息,而没有消息到branch [1]

如果不能选择分支,我想知道是否还有其他方法。

编辑: 分支是另外一种情况。因此,如果第一个谓词匹配,则不会考虑休息。因此,由于这个原因,只有topic1才能获得上述逻辑的消息。

代替下面的方法,我可以将消息分配给多个路径。

KStream<String,byte[]>[] stream = builder.<String,byte[]>stream("source-topic);

stream.map(logic1OnData).filter(
                (key,Serdes.String())

stream.map(logic2OnData).filter(
                (key,Serdes.String())

此更改将所有消息发送到两个地图。进一步过滤并写入相应主题。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...