问题描述
我正在使用 spring-amqp,并使用 consumerBatchEnabled 接收以下链接中提到的一批事件: https://docs.spring.io/spring-amqp/docs/2.2.5.RELEASE/reference/html/#receiving-batch
并注册监听器如下:
struct Material
{
std::string name;
std::optional<size_t> albedo;
std::optional<size_t> normal;
std::optional<size_t> Metalness;
std::optional<size_t> roughness;
std::optional<size_t> ao; // ambient occlusion
bool hasAlphaChannel = false;
};
还有如下定义的配置类
import org.springframework.messaging.Message;
@RabbitListener(queues = "batch.2",containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
//code here to process events
}
但是,当我发布事件时,我得到 ClassCastException,java.util.ArrayList 无法转换为 org.springframework.amqp.core.Message:
堆栈跟踪
@Bean
@Autowired
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAckNowledgeMode(AckNowledgeMode.MANUAL);
factory.setPrefetchCount(1000);
factory.setBatchListener(true);
factory.setBatchSize(1000);
factory.setConsumerBatchEnabled(true);
factory.setReceiveTimeout(1000l);
configurer.configure(factory,connectionFactory);
return factory;
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
没有consumerBatchEnabled就没有问题,并且能够接收和处理Message
Execution of Rabbit message listener Failed.","logger_name":"org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler","thread_name":"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1","level":"WARN","level_value":30000,"stack_trace":"java.lang.classCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message\n\tat brave.spring.rabbit.TracingRabbitListenerAdvice.invoke(TracingRabbitListenerAdvice.java:75)\n\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)\n\tat org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)\n\tat org.springframework.amqp.rabbit.listener.$Proxy210.invokeListener(UnkNown Source)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1537)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1532)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1472)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1037)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1026)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:923)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1298)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1204)\n\tat java.lang.Thread.run(Thread.java:748)\n"}
{"timestamp":"2021-06-10T13:47:56.851+0000","message":"Restarting Consumer
解决方法
这是一个已知问题 https://github.com/openzipkin/brave/issues/1240。 Bug 在库版本 5.12.4 中得到解决