How do I bind RabbitListeners to CloudAMQP?

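Problem description

I'm having trouble implementing RabbitMQ messaging between my two applications (web/worker). My RabbitMQ service is hosted on CloudAMQP (a Heroku add-on). However, every @RabbitListener I declare seems to try to connect to localhost instead of the cloud service.

After adding the following component to my worker application: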

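import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class TaskConsumer {
    // Returns the request payload unchanged; the return value is sent back as the reply.
    @RabbitListener(queues = "worker.rpc.requests", containerFactory = "rabbitListenerContainerFactory")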
    public String fetch(String p) {
        return p;
    }
}

I get the following error:

2021-07-05 14:38:23.006  INFO 18840 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-07-05 14:38:32.145  WARN 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2021-07-05 14:38:32.145  INFO 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@32c8d668: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
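
The log shows the container dialing localhost:5672 rather than the hosted broker, so one thing worth ruling out is whether the worker process can see the broker URL at all. The following is a minimal diagnostic sketch using only the JDK. `BrokerTargetCheck` and `describe` are hypothetical names, and `CLOUDAMQP_URL` is assumed to be the variable the Heroku add-on sets. It reports only scheme, host and port, so credentials stay out of the log.

```java
import java.net.URI;
import java.util.Map;

/** Hypothetical diagnostic helper; not part of Spring AMQP. */
final class BrokerTargetCheck {

    // Assumed name of the environment variable provided by the add-on.
    static final String ENV_NAME = "CLOUDAMQP_URL";

    /** Describes the broker address found in the given environment map. */
    static String describe(Map<String, String> env) {
        String raw = env.get(ENV_NAME);
        if (raw == null || raw.trim().isEmpty()) {
            return ENV_NAME + " is not set";
        }
        URI uri = URI.create(raw.trim());
        StringBuilder target = new StringBuilder();
        target.append(uri.getScheme()).append("://").append(uri.getHost());
        // URI.getPort() returns -1 when the URL carries no explicit port.
        if (uri.getPort() != -1) {
            target.append(':').append(uri.getPort());
        }
        return ENV_NAME + " points at " + target;
    }

    public static void main(String[] args) {
        // Prints what this process sees in its real environment.
        System.out.println(describe(System.getenv()));
    }
}
```

If this reports the variable as unset in the worker, the connection settings never reach the connection factory, whatever the rest of the configuration looks like.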

How can I bind the RabbitListener so that it connects to the AMQP environment? Here is my configuration:

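import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration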
@EnableRabbit
public class RabbitConfig {

    protected final String workerQueueName = "worker.rpc.requests";
    protected final String routingKeyName = "rpc";
    protected final String directExcName = "worker.exchange";

    @Bean
    public ConnectionFactory connectionFactory() {
        final URI ampqUrl;
        try {
            ampqUrl = new URI(getEnvOrThrow("CLOUDAMQP_URL"));
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }

        final CachingConnectionFactory factory = new CachingConnectionFactory();
        // Limit the split to 2 so a ':' inside the password is kept intact.
        final String[] credentials = ampqUrl.getUserInfo().split(":", 2);
        factory.setUsername(credentials[0]);
        factory.setPassword(credentials[1]);
        factory.setHost(ampqUrl.getHost());
        factory.setPort(ampqUrl.getPort());
        factory.setVirtualHost(ampqUrl.getPath().substring(1));

        try {
            // setUri also enables TLS when the scheme is amqps.
            factory.getRabbitConnectionFactory().setUri(ampqUrl);
        } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
            // Fail fast instead of continuing with a partially configured factory.
            throw new IllegalStateException("Could not apply the AMQP URL", e);
        }

        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        // Configure the factory itself: a container created by hand here is discarded,
        // and the queues to consume from come from the @RabbitListener annotation.
        factory.setConcurrentConsumers(50);
        // The maximum must not be lower than concurrentConsumers.
        factory.setMaxConcurrentConsumers(100);
        factory.setStartConsumerMinInterval(3000L);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(this.workerQueueName);
        template.setDefaultReceiveQueue(this.workerQueueName);
        return template;
    }

    @Bean
    public Queue queue() {
        return new Queue(this.workerQueueName);
    }

    @Bean
    public DirectExchange direct() {
        return new DirectExchange(this.directExcName);
    }

    @Bean
    public Binding binding(DirectExchange direct, Queue queue) {
        return BindingBuilder.bind(queue)
                .to(direct)
                .with(this.routingKeyName);
    }

    /**
     * Required for executing administration functions against an AMQP broker.
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    private static String getEnvOrThrow(String name) {
        final String env = System.getenv(name);
        if (env == null) {
            throw new IllegalStateException("Environment variable [" + name + "] is not set.");
        }
        return env;
    }

}
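
The connectionFactory() bean above takes the broker URL apart by hand, and two details of java.net.URI are easy to miss when doing so: getPort() returns -1 when the URL has no explicit port, and splitting the user info on every ':' truncates a password that contains one. The sketch below shows one way to handle both using only the JDK. `AmqpUrlParts` is a hypothetical helper, not part of Spring AMQP or the RabbitMQ client. The fallback ports 5672 and 5671 are the standard AMQP and AMQP-over-TLS ports, and treating a missing path as the virtual host "/" is an assumption that follows the usual client convention.

```java
import java.net.URI;

/** Hypothetical helper that splits an AMQP URL into its connection settings. */
final class AmqpUrlParts {
    final String host;
    final int port;
    final String username;
    final String password;
    final String virtualHost;

    private AmqpUrlParts(String host, int port, String username,
                         String password, String virtualHost) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
    }

    static AmqpUrlParts parse(String url) {
        URI uri = URI.create(url);
        boolean tls = "amqps".equalsIgnoreCase(uri.getScheme());

        // getPort() returns -1 when no port is given; fall back to the standard ports.
        int port = uri.getPort() != -1 ? uri.getPort() : (tls ? 5671 : 5672);

        // Split on the first ':' only, so a ':' inside the password survives.
        String userInfo = uri.getUserInfo();
        String[] credentials = userInfo == null ? new String[0] : userInfo.split(":", 2);
        String username = credentials.length > 0 ? credentials[0] : "";
        String password = credentials.length > 1 ? credentials[1] : "";

        // Assumption: no path at all means the default virtual host "/".
        String path = uri.getPath();
        String virtualHost = (path == null || path.isEmpty()) ? "/" : path.substring(1);

        return new AmqpUrlParts(uri.getHost(), port, username, password, virtualHost);
    }

    public static void main(String[] args) {
        // Placeholder URL for illustration only.
        AmqpUrlParts parts = parse("amqps://user:secret@host.example.com/myvhost");
        System.out.println(parts.host + ":" + parts.port + " vhost=" + parts.virtualHost);
    }
}
```

The resulting values could then be passed to setHost, setPort, setUsername, setPassword and setVirtualHost on the CachingConnectionFactory, in the same way the configuration above does.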

