问题描述
我想要一个绑定,该绑定连接到特定的队列/主题,并根据特定的标头条目路由到正确的功能。
在这种情况下,我找不到任何示例。我尝试了几种方法,但没有一个方法获得成功。
例如,这不起作用:
spring:
cloud:
function:
routing:
enabled: true
stream:
function:
routing:
enabled: true
deFinition: myConsumer;myOtherConsumer;
bindings:
myConsumer-in-0:
destination: myTopic
group: myGroup
binder: mybroker
routing-expression: "headers['MyRoutingInfo'] == 'even' ? 'myEvenConsumer' : 'myOddConsumer'"
myOtherConsumer-in-0: #without specific routing
赞赏每个具体的例子
解决方法
消费者不“路由”消息,而是从队列中消费消息。生产者使用s.c.s.rabbit.bindings.producer-out-0.producer.routing-key-expression
路由消息。
我终于找到了实现目标的方法。但是我不确定这是这样做的方法:
spring:
cloud:
function:
routing:
enabled: true
routing-expression: "headers['MyRouting'] == 'odd' ? 'oddConsumer' : 'evenConsumer'"
stream:
function:
definition: myConsumer;myOtherConsumer;
bindings:
myConsumer-in-0:
destination: myTopic
group: myGroup
binder: myBroker
myOtherConsumer-in-0: #without specific routing
使用以下豆子:
@Bean
public Consumer<Message<byte[]>> myConsumer(final RoutingFunction routingFunction) {
return message -> {
LOG.info("Sending to routingFunction");
routingFunction.apply(message);
};
}
@Bean
public Consumer<byte[]> evenConsumer() {
return (payload) -> LOG.info("even got: {}",new String(payload));
}
@Bean
public Consumer<byte[]> oddConsumer() {
return (payload) -> LOG.info("odd got: {}",new String(payload));
}
,
要启用路由,默认情况下会创建名为 functionRouter 的绑定。
根据文档:
RoutingFunction 注册在 FunctionCatalog 下 功能路由器。为了简单和一致性,您还可以参考 RoutingFunction.FUNCTION_NAME 常量。
下面的配置应该可以正常工作:
spring:
cloud:
stream:
function:
definition: functionRouter;
routing:
enabled: true
kafka:
binder:
brokers:
- localhost:9092
bindings:
functionRouter-in-0:
destination: my.topic
group: my.topic.group
function:
routing-expression: "headers['type'] == 'even' ? 'evenConsumer' : 'oddConsumer'"
您不需要创建偶数和奇数消费者函数定义。
,您实际上不需要在 spring.cloud.stream.function.routing.enabled=true
文件中提供 application.properties
参数来使路由工作,因为只要您提供 routing-expression
参数它就会自动工作 - 请参阅: spring cloud stream documentation