正在向ReactiveStream.publisher

问题描述

我正在构建一个quarkus应用程序,它将接收和发送来自Rabbitmq的消息。

现在我可以从代理接收消息了,但是问题是我迷失了,该消息仅应由通过频道订阅的人接收。

我有一堂课对此进行了扩展

import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
public final class RabbitMQConnector extends RabbitMQConnectorConfig implements IncomingConnectorFactory {

    private static final Logger log = Logger.getLogger(RabbitMQConnector.class.getName());
    public static final String CONNECTOR_NAME = "rabbitmq";

    @Inject
    private ExecutionHolder executionHolder;

    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        final RabbitMQConnectorIncomingConfiguration ic = new RabbitMQConnectorIncomingConfiguration(config);

        final List<Message<RabbitMQMessage>> messages = new ArrayList<>();

        start(vetx(),ic,rabbitMQMessage -> {
            log.info("Got message: " + rabbitMQMessage.body().toString());
            messages.add(Message.of(rabbitMQMessage));
        });

        return ReactiveStreams.frompublisher(Multi.createFrom().iterable(messages));
    }

    private Vertx vetx() {
        return this.executionHolder.vertx();
    }

}

我可以确认我可以从日志中收到消息。

2020-10-01 06:40:53,643 INFO  [io.sma.rea.mes.rab.RabbitMQConnector] (vert.x-eventloop-thread-1) Got message: message from q2
2020-10-01 06:41:45,071 INFO  [io.sma.rea.mes.rab.RabbitMQConnector] (vert.x-eventloop-thread-1) Got message: message from q1
2020-10-01 06:41:53,637 INFO  [io.sma.rea.mes.rab.RabbitMQConnector] (vert.x-eventloop-thread-1) Got message: rasdf22222222222222

但是我的问题是我有一个基于频道/队列侦听/订阅的配置

mp.messaging.incoming.q1.connector=rabbitmq

mp.messaging.incoming.q2.connector=rabbitmq

现在我需要实现的是针对 q1 的所有消息,此方法应该捕获它。

@Incoming("q1")
public void sample(Long test){
   System.out.print("got from one" + test + " ");
}

q2

相同
@Incoming("q1")
public void sampleTwo(Long test){
 System.out.print(" got from two" + test + " ");
}

但用于Got message: rasdf22222222222222 我希望没人能做到这一点,因为我没有为此创建一个@Incoming并在属性文件中进行配置

最后,我设置了queue = channel,所以在这种情况下

mp.messaging.incoming.q1.connector=rabbitmq

通道为q1,因此队列也为q1。其他也一样

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)