专用绑定的Spring Cloud Stream路由表达式

问题描述

我想要一个绑定,该绑定连接到特定的队列/主题,并根据特定的标头条目路由到正确的功能

在这种情况下,我找不到任何示例。我尝试了几种方法,但没有一个方法获得成功。

例如,这不起作用:

spring:
  cloud:
    function:
      routing:
        enabled: true
    stream:
      function:
        routing:
          enabled: true
        deFinition: myConsumer;myOtherConsumer;
        bindings:
          myConsumer-in-0:
            destination: myTopic
            group:  myGroup
            binder: mybroker
            routing-expression: "headers['MyRoutingInfo'] == 'even' ? 'myEvenConsumer' : 'myOddConsumer'"
          myOtherConsumer-in-0: #without specific routing

赞赏每个具体的例子

解决方法

消费者不“路由”消息,而是从队列中消费消息。生产者使用s.c.s.rabbit.bindings.producer-out-0.producer.routing-key-expression路由消息​​。

,

我终于找到了实现目标的方法。但是我不确定这是这样做的方法:

    spring:
      cloud:
        function:
          routing:
            enabled: true
          routing-expression: "headers['MyRouting'] == 'odd' ? 'oddConsumer' : 'evenConsumer'"
        stream:
          function:
            definition: myConsumer;myOtherConsumer;
            bindings:
              myConsumer-in-0:
                destination: myTopic
                group:  myGroup
                binder: myBroker
              myOtherConsumer-in-0: #without specific routing

使用以下豆子:

@Bean
public Consumer<Message<byte[]>> myConsumer(final RoutingFunction routingFunction) {
        return message -> {
           LOG.info("Sending to routingFunction");
           routingFunction.apply(message);
        };
}

@Bean
public Consumer<byte[]> evenConsumer() {
      return (payload) -> LOG.info("even got: {}",new String(payload));
}

@Bean
public Consumer<byte[]> oddConsumer() {
    return (payload) -> LOG.info("odd got: {}",new String(payload));
}
,

要启用路由,默认情况下会创建名为 functionRouter 的绑定。

根据文档:

RoutingFunction 注册在 FunctionCatalog 下 功能路由器。为了简单和一致性,您还可以参考 RoutingFunction.FUNCTION_NAME 常量。

下面的配置应该可以正常工作:

spring:
  cloud:
    stream:
      function:
        definition: functionRouter;
        routing:
          enabled: true
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        functionRouter-in-0:
          destination: my.topic
          group: my.topic.group
    function:
      routing-expression: "headers['type'] == 'even' ? 'evenConsumer' : 'oddConsumer'"

您不需要创建偶数和奇数消费者函数定义。

,

您实际上不需要在 spring.cloud.stream.function.routing.enabled=true 文件中提供 application.properties 参数来使路由工作,因为只要您提供 routing-expression 参数它就会自动工作 - 请参阅: spring cloud stream documentation