AsyncItemWriter-将自定义类展开到Future中时,面对ClassCastException

问题描述

我正在编写一个春季批处理作业,该作业读取一些数据并将其写入文件。数据分为3个块,在合并之前,每个块都需要写入一个单独的文件中。阅读和汇总工作正常。

但是,当异步编写器抛出错误时, java.lang.classCastException:class package.Custom无法转换为类java.util.concurrent.Future

似乎我的自定义DTO并没有展开为Future对象。 这是我的简化代码

 @Bean
    public Step workerStep() throws Exception {
        return stepBuilderFactory.get(WORKER_BEAN)
                .<Custom,Custom>chunk(CHUNK_SIZE)
                .reader(reader(null))
                .writer(writer())
                .taskExecutor(threadPoolExecutor)
                .throttleLimit(THRottLE_LIMIT)
                .build();
    }

    @Bean
    @StepScope
    public Customreader reader(@Value("#{stepExecutionContext['currentMod']}") Long currentMod) {
        if ( ... some logic ... ) {
            return null;
        }
        return new Customreader();
    }

    @Bean
    @StepScope
    public AsyncItemWriter writer() throws Exception {
        AsyncItemWriter<Custom> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(delegateWriter(null));
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean(destroyMethod=EMPTY)
    @StepScope
    public CustomWriter<Custom> delegateWriter(@Value("#{stepExecutionContext['currentMod']}") Long currentMod) {
        CustomWriter<Custom> customWriter = new CustomWriter<>();

        customWriter.setLineAggregator(new DelimitedLineAggregator<Custom>() {
            {
                setDelimiter(COMMA);
                setFieldExtractor(new BeanWrapperFieldExtractor<Custom>() {
                    {
                        setNames( ... get names logic ... );
                    }
                });
            }
        });

        customWriter.setResource(new FileSystemResource(name));
        
        // ... header call-back logic ...
        
        customWriter.setAppendAllowed(true);
        customWriter.setShouldDeleteIfEmpty(true);
        customWriter.setShouldDeleteIfExists(true);
        return customWriter;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor(THREAD_NAME_PREFIX);
    } 

解决方法

AsyncItemWriter期望将项目包装在Future中(请参见Javadoc),这通常由AsyncItemProcessor完成。 AsyncItemWriterAsyncItemProcessor一起用于实现派生/合并方案,请参见Asynchronous Processors部分。