问题描述
我正在使用 spring 数据 redis 订阅“任务”redis 流来处理任务。 出于某种原因,即使我明确提供了一个线程池 TaskExecutor,redis 流消费者也只产生一个线程并一次按顺序处理一条消息。
我希望它将线程的创建委托给提供的线程池,并产生一个线程,直到线程池配置的限制。我可以看到它正在使用给定的 TaskExecutor,但它不会产生多个线程。
即使我没有指定自己的 taskExecutor,并且它在内部默认为 SimpleAsyncTaskExecutor,问题仍然继续。任务一次一个、一个接一个地依次处理,即使它们是长期任务。
我在这里遗漏了什么?
@Bean
public Subscription
redisTaskStreamListenerContainer(
RedisConnectionFactory connectionFactory,@Qualifier("task") Redistemplate<String,Task<TransportEnvelope>> redistemplate,@Qualifier("task") StreamListener<String,MapRecord<String,String,String>> listener,@Qualifier("task") Executor taskListenerExecutor) {
StreamMessageListenerContainerOptions<String,String>>
containerOptions = StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(consumerPollTimeOutInMilli))
.batchSize(consumerReadBatchSize)
.executor(taskListenerExecutor)
.build();
StreamMessageListenerContainer<String,String>> container =
StreamMessageListenerContainer.create(connectionFactory,containerOptions);
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readOptions
=
StreamMessageListenerContainer.StreamReadRequest
.builder(StreamOffset.create(streamName,ReadOffset.lastConsumed()))
//turn off auto shutdown of stream consumer if an error occurs.
.cancelOnError((ex) -> false)
.consumer(Consumer.from(groupId,consumerId))
.build();
Subscription subscription = container.register(readOptions,listener);
container.start();
return subscription;
}
@Bean
@Qualifier("task")
public Executor redisListenerThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(30);
threadPoolTaskExecutor.setMaxPoolSize(50);
threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
threadPoolTaskExecutor.setThreadNamePrefix("redis-listener-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolTaskExecutor;
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)