Spring Data Redis - StreamMessageListenerContainer 只产生一个线程

问题描述

我正在使用 spring 数据 redis 订阅“任务”redis 流来处理任务。 出于某种原因,即使我明确提供了一个线程池 TaskExecutor,redis 流消费者也只产生一个线程并一次按顺序处理一条消息。

我希望它将线程的创建委托给提供的线程池,并产生一个线程,直到线程池配置的限制。我可以看到它正在使用给定的 TaskExecutor,但它不会产生多个线程。

即使我没有指定自己的 taskExecutor,并且它在内部认为 SimpleAsyncTaskExecutor,问题仍然继续。任务一次一个一个一个地依次处理,即使它们是长期任务。

在这里遗漏了什么?

@Bean
public Subscription
      redisTaskStreamListenerContainer(
      RedisConnectionFactory connectionFactory,@Qualifier("task") Redistemplate<String,Task<TransportEnvelope>> redistemplate,@Qualifier("task") StreamListener<String,MapRecord<String,String,String>> listener,@Qualifier("task") Executor taskListenerExecutor) {

 
    StreamMessageListenerContainerOptions<String,String>>
        containerOptions = StreamMessageListenerContainerOptions.builder()
        .pollTimeout(Duration.ofMillis(consumerPollTimeOutInMilli))
          .batchSize(consumerReadBatchSize)
          .executor(taskListenerExecutor)
          .build();

    StreamMessageListenerContainer<String,String>> container =
        StreamMessageListenerContainer.create(connectionFactory,containerOptions);

    StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readOptions
        =
        StreamMessageListenerContainer.StreamReadRequest
            .builder(StreamOffset.create(streamName,ReadOffset.lastConsumed()))
            //turn off auto shutdown of stream consumer if an error occurs.
            .cancelOnError((ex) -> false)
            .consumer(Consumer.from(groupId,consumerId))
            .build();


    Subscription subscription = container.register(readOptions,listener);
    container.start();
    return subscription;
  }


  @Bean
  @Qualifier("task")
  public Executor redisListenerThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(30);
    threadPoolTaskExecutor.setMaxPoolSize(50);
    threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
    threadPoolTaskExecutor.setThreadNamePrefix("redis-listener-");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return threadPoolTaskExecutor;
  }

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)