问题描述
我要完成read sub-folders/directory
中given folder/path
的工作/任务。路径是动态的,我们从Controller
获得。目前,我使用tasklet
,有3个小任务,一个读取子目录,另一个处理它以准备要保存到DB的对象,最后一个将处理后的数据对象写入数据库。
这些文件夹可以包含任意数量的子文件夹。目前,我已使用以下代码:
Path start = Paths.get("x:\\data\\");
Stream<Path> stream = Files.walk(start,1);
List<String> collect = stream
.map(String::valueOf)
.sorted()
.collect(Collectors.toList());
一次读取所有子文件夹。
为此,我遵循了https://www.baeldung.com/spring-batch-tasklet-chunk
实现的这个tasklet
示例。这是正确的方法吗?我还需要使用多线程异步运行Job。
由于可能存在大量子文件夹,因此so there can be huge number of
行or
列表 of data to process and write to the database.
请提出适当的方法。
我正在学习Spring Batch
,也已经在file read/process/write
上做了一些示例,并为此使用了Chunk
方法。
但是我的工作是读取文件夹/路径的子目录,因此我无法决定采用哪种方法。
解决方法
我有一个类似的情况:我需要从文件夹中读取所有文件,进行处理并写入db(Doc)
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,Step masterStep) {
return jobBuilderFactory.get("MainJob")
.incrementer(new RunIdIncrementer())
.flow(masterStep)
.end()
.build();
}
@Bean
public Step mainStep(StepBuilderFactory stepBuilderFactory,JdbcBatchItemWriter<Transaction> writer,ItemReader<String> reader,TransactionItemProcessor processor) {
return stepBuilderFactory.get("Main")
.<String,Transaction>chunk(2)
.reader(reader)
.processor(processor)
.writer(writer)
**.taskExecutor(jobTaskExecutor())**
.listener(new ItemReaderListener())
.build();
}
@Bean
public TaskExecutor jobTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(2);
taskExecutor.setMaxPoolSize(10);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
@Bean
@StepScope
public ItemReader<String> reader(@Value("#{stepExecution}") StepExecution stepExecution) throws IOException {
Path start = Paths.get("D:\\test");
List<String> inputFile = Files.walk(start,1)
.map(String::valueOf)
.sorted()
.collect(Collectors.toList());
return new IteratorItemReader<>(inputFile);
}
@Bean
@StepScope
public TransactionItemProcessor processor(@Value("#{stepExecution}") StepExecution stepExecution) {
return new TransactionItemProcessor();
}
@Bean
@StepScope
public JdbcBatchItemWriter<Transaction> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Transaction>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO transaction (id,date,type) VALUES (:id,:date,:type)")
.dataSource(dataSource)
.build();
}
}