使用 spring 云流功能进行条件路由

问题描述

在旧的命令式编程类型被弃用后,我遇到了一些问题。 我有两个微服务(一个作为发布者,另一个作为订阅者)并且以旧方式,使用注释 @StreamListener(target = "events",condition = "headers['type']=='consumerPermissionEvent'") 我能够有两个函数只侦听该记录,现在我不知道该怎么做.

我正在阅读所有文档 event routing 并尝试使用路由表达式,但两个消费者正在阅读所有记录。

一个微服务的应用yaml:

spring:
  cloud:
    stream:
      bindings:
        output: 
          destination: topicEvents

秒应用yaml为:

spring:
  cloud:
    function:
      routing-expression: headers['type']
      deFinition: consumerPermissionEvent;consumerApiEvent
    stream:
      bindings:
        consumerPermissionEvent-in-0:
          destination: topicUsers
        consumerApiEvent-in-0:
          destination: topicUsers

我从第一个这样的微服务发送:

@Autowired
private StreamBridge bridge;

public void send(PermissionEvent event){
    Message<PermissionEvent> message = MessageBuilder.withPayload(event)
            .setHeader("type","consumerPermissionEvent").build();
    bridge.send("output",message);
}

第二个微服务有两个消费者:

    @Bean
    public Consumer<Message<ApiEvent>> consumerApiEvent() {
        return e -> log.debug("READED API EVENT: {}",e.getPayload());
    }

    @Bean
    public Consumer<Message<PermissionEvent>> consumerPermissionEvent() {
        return e -> log.debug("READED PERMISSION EVENT: {}",e.getPayload());
    }

以及第二个微服务的输出日志:

[KafkaConsumerDestination{consumerDestinationName='topicUsers',partitions=1,dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED API EVENT: ApiEvent(apiId=null)
[KafkaConsumerDestination{consumerDestinationName='topicUsers',dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED PERMISSION EVENT: PermissionEvent(userRole=roleUseradsf)

知道怎么做吗?

提前致谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)