Kafka流和Spring Cloud流-处理器效率

问题描述

我想确认一下我对让多个处理器从一个Kafka Stream源读取效率的理解。我相信如果要根据谓词逻辑执行两个不同的处理,示例1中的以下内容将是最有效的。谓词查看值的内容(此处为Notification对象)。如果示例1中的以下每个处理器中都有一个断点,则说明每个传入的Notification都调用了每个Function。而在示例2中,只有在满足谓词逻辑的情况下,才调用process2函数。

示例1

@Bean
public Function<KStream<String,Notification>,KStream<String,Notification>> process1() {

    return input -> input
            .branch(PREDICATE_FOR_OUT_0,PREDICATE_FOR_OUT_1);
}

@Bean
public Function<KStream<String,EnrichedNotification>> process2() {
    return input -> input
            .filter(PREDICATE_FOR_OUT_2);
            .map((key,value) ->.........; //different additional processing to map to EnrichedNotification type
}

是否没有必要执行以下操作并尝试将一个处理器的输出路由到另一个处理器? (不确定是否有可能)

示例2(概念性) 我可能会这样想,因为我来自使用纯Kafka。这里process1有一个3向分支。其中两个分支转到各自的流,然后转到主题,但是第三个分支需要进一步处理才能将其路由到主题。

@Bean
public Function<KStream<String,Notification>[]> process1() {

    return input -> input
            .branch(PREDICATE_FOR_OUT_0,PREDICATE_FOR_OUT_1,PREDICATE_FOR_OUT_2);
}

我们是否可以将PREDICATE_FOR_OUT_2的分支路由到process2中。这意味着只有在满足PREDICATE_FOR_OUT_2的情况下才会调用process2

@Bean
public Function<KStream<String,EnrichedNotification>> process2() {
    return input -> input
            .map((key,value) ->.........; //different additional processing to map to EnrichedNotification type
}

我的想法是示例2是多余的(无论如何实际上是不可能的),因为Kafka Streams提供了抽象和功能

解决方法

我认为您的示例的两种情况都可以完成工作,但是存在一些差异。在第一个示例中,您有两个函数,都从同一个Kafka主题接收数据,第二个函数在路由到输出主题之前执行一些附加的逻辑。在第二个示例中,您再次具有两个功能。在第一个函数中,您有3个分支,每个分支都将数据发送到Kafka主题(我假设它们是3个不同的主题)。然后,在第二个函数中,您将从第一个函数的第三个输出主题接收数据。在执行示例2的第二个功能中的逻辑之后,将其发送到该分支的最终目标。您将为第二个示例引入一个额外的主题。我认为您的第一个示例更具可读性和简洁性。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...