问题描述
我需要给两个不同的分支提供相同的
我尝试了下面的代码,但是我看到只有第一个分支正在收到消息,而第二个分支却没有得到消息。
KStream<String,byte[]>[] branches = builder.<String,byte[]>stream("source-topic)
.branch((key,val) -> true,// this goes to output topic1
(key,val) -> true); // this goes to output topic2
// branch[0] map on logic1Ondata and send to topic1
branches[0].map(logic1OnData).filter(
(key,value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic1",Produced.with(Serdes.String(),Serdes.String())
// branch[1] map on logic2Ondata and send to topic2
branches[1].map(logic2OnData).filter(
(key,value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic2",Serdes.String())
期望两个分支都应从“源主题”中获得相同的消息。每个分支分别对logic1OnData和logic2OnData进行验证,并写入topic1和topic2 但是我只看到branch [0]接收消息,而没有消息到branch [1]
如果不能选择分支,我想知道是否还有其他方法。
编辑: 分支是另外一种情况。因此,如果第一个谓词匹配,则不会考虑休息。因此,由于这个原因,只有topic1才能获得上述逻辑的消息。
代替下面的方法,我可以将消息分配给多个路径。
KStream<String,byte[]>[] stream = builder.<String,byte[]>stream("source-topic);
stream.map(logic1OnData).filter(
(key,Serdes.String())
stream.map(logic2OnData).filter(
(key,Serdes.String())
此更改将所有消息发送到两个地图。进一步过滤并写入相应主题。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)