问题描述
我正在尝试使用一个写入器写入成功的记录,而另一个写入器写入失败的记录。
我已经编写了实现ItemWriter的BillerOrderWriter类。我放了一些日志语句,可以看到它写成功billerOrderId或失败的billerOrderId。但是,似乎它没有调用DatabasetoCsvFileJobConfig或SuccessOrdersToCsvFileJobConfig。
public class BillerOrderWriter implements ItemWriter<BillerOrder>{
private static Logger log = LoggerFactory.getLogger("BillerOrderWriter.class");
@Autowired
SuccessfulOrdersToCsvFileJobConfig successfulOrdersToCsvFileJobConfig;
@Autowired
DatabasetoCsvFileJobConfig databasetoCsvFileJobConfig;
@Override
public void write(List<? extends BillerOrder> items) throws Exception {
for (BillerOrder item : items) {
log.info("item = " + item.toString());
if (item.getResult().equals("SUCCESS")) {
log.info(" Success billerOrderId = " + item.getBillerOrderId());
successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter();
} else {
log.info("Failed billerOrderId = " + item.getBillerOrderId());
databasetoCsvFileJobConfig.databaseCsvItemWriter();
}
}
}
}
这是BatchConfig类。
@Bean
public BillerOrderWriter billerOrderWriter() {
return new BillerOrderWriter();
}
@Bean
public Job importJobOrder(JobCompletionNotificationListner listener,Step step1) {
return jobBuilderFactory.get("importJobOrder")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean(name="step1")
public Step step1(BillerOrderWriter billerOrderWriter) {
return stepBuilderFactory.get("step1")
.<BillerOrder,BillerOrder> chunk(10)
.reader((ItemReader<? extends BillerOrder>) reader())
.processor(processor())
.writer(billerOrderWriter)
.build();
}
这是我的成功撰写者和失败撰写者的课程。
@Configuration
public class SuccessfulOrdersToCsvFileJobConfig {
private static Logger log = LoggerFactory.getLogger("SuccessfulOrdersToCsvFileJobConfig.class");
@Bean
public ItemWriter<BillerOrder> successfulDatabaseCsvItemWriter() {
log.info("Entering SuccessfulOrdersToCsvFileJobConfig...");
FlatFileItemWriter<BillerOrder> csvFileWriter = new FlatFileItemWriter<>();
String exportFileHeader = "BillerOrderId;SuccessMessage";
OrderWriter headerWriter = new OrderWriter(exportFileHeader);
csvFileWriter.setHeaderCallback(headerWriter);
String exportFilePath = "/tmp/SuccessBillerOrderIdForRetry.csv";
csvFileWriter.setResource(new FileSystemResource(exportFilePath));
LineAggregator<BillerOrder> lineAggregator = createOrderLineAggregator();
csvFileWriter.setLineAggregator(lineAggregator);
return csvFileWriter;
}
private LineAggregator<BillerOrder> createOrderLineAggregator() {
log.info("Entering createOrderLineAggregator...");
DelimitedLineAggregator<BillerOrder> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(";");
FieldExtractor<BillerOrder> fieldExtractor = createOrderFieldExtractor();
lineAggregator.setFieldExtractor(fieldExtractor);
return lineAggregator;
}
private FieldExtractor<BillerOrder> createOrderFieldExtractor() {
log.info("Entering createOrderFieldExtractor...");
BeanWrapperFieldExtractor<BillerOrder> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] {"billerOrderId","successMessage"});
return extractor;
}
}
@Configuration
public class DatabasetoCsvFileJobConfig {
private static Logger log = LoggerFactory.getLogger("DatabasetoCsvFileJobConfig.class");
@Bean
public ItemWriter<BillerOrder> databaseCsvItemWriter() {
log.info("Entering databaseCsvItemWriter...");
FlatFileItemWriter<BillerOrder> csvFileWriter = new FlatFileItemWriter<>();
String exportFileHeader = "BillerOrderId;ErrorMessage";
OrderWriter headerWriter = new OrderWriter(exportFileHeader);
csvFileWriter.setHeaderCallback(headerWriter);
String exportFilePath = "/tmp/FailedBillerOrderIdForRetry.csv";
csvFileWriter.setResource(new FileSystemResource(exportFilePath));
LineAggregator<BillerOrder> lineAggregator = createOrderLineAggregator();
csvFileWriter.setLineAggregator(lineAggregator);
return csvFileWriter;
}
private LineAggregator<BillerOrder> createOrderLineAggregator() {
log.info("Entering createOrderLineAggregator...");
DelimitedLineAggregator<BillerOrder> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(";");
FieldExtractor<BillerOrder> fieldExtractor = createOrderFieldExtractor();
lineAggregator.setFieldExtractor(fieldExtractor);
return lineAggregator;
}
private FieldExtractor<BillerOrder> createOrderFieldExtractor() {
log.info("Entering createOrderFieldExtractor...");
BeanWrapperFieldExtractor<BillerOrder> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] {"billerOrderId","errorMessage"});
return extractor;
}
}
这是我的工作完成侦听器类。
@Component
public class JobCompletionNotificationListner extends JobExecutionListenerSupport {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(JobCompletionNotificationListner.class);
@Override
public void afterJob(JobExecution jobExecution) {
log.info("In afterJob ...");
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
DatabasetoCsvFileJobConfig databasetoCsvFileJobConfig = new DatabasetoCsvFileJobConfig();
SuccessfulOrdersToCsvFileJobConfig successfulOrdersToCsvFileJobConfig = new SuccessfulOrdersToCsvFileJobConfig();
}
}
}
解决方法
在您的BillerOrderWriter#write
方法中,应该编写对项进行实际写入操作的代码到数据接收器。但是对于您而言,您正在调用successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter();
和databaseToCsvFileJobConfig.databaseCsvItemWriter();
来创建项目编写者bean。您应该注入那些委托编写者,并在需要时调用其write
方法,例如:
public class BillerOrderWriter implements ItemWriter<BillerOrder>{
private ItemWriter<BillerOrder> successfulDatabaseCsvItemWriter;
private ItemWriter<BillerOrder> databaseCsvItemWriter;
// constructor with successfulDatabaseCsvItemWriter + databaseCsvItemWriter
@Override
public void write(List<? extends BillerOrder> items) throws Exception {
for (BillerOrder item : items) {
if (item.getResult().equals("SUCCESS")) {
successfulDatabaseCsvItemWriter.write(Collections.singletonList(item));
} else {
databaseCsvItemWriter.write(Collections.singletonList(item));
}
}
}
}
,
我代替BillerOrderWriter编写BillerOrderClassifier类。
public class BillerOrderClassifier implements Classifier<BillerOrder,ItemWriter<? super BillerOrder>> {
private static final long serialVersionUID = 1L;
private ItemWriter<BillerOrder> successItemWriter;
private ItemWriter<BillerOrder> failedItemWriter;
public BillerOrderClassifier(ItemWriter<BillerOrder> successItemWriter,ItemWriter<BillerOrder> failedItemWriter) {
this.successItemWriter = successItemWriter;
this.failedItemWriter = failedItemWriter;
}
@Override
public ItemWriter<? super BillerOrder> classify(BillerOrder billerOrder) {
return billerOrder.getResult().equals("SUCCESS") ? successItemWriter : failedItemWriter;
}
}
在BatchConfiguration中,我编写了classifierBillerOrderCompositeItemWriter方法。
@Bean
public ClassifierCompositeItemWriter<BillerOrder> classifierBillerOrderCompositeItemWriter() throws Exception {
ClassifierCompositeItemWriter<BillerOrder> compositeItemWriter = new ClassifierCompositeItemWriter<>();
compositeItemWriter.setClassifier(new BillerOrderClassifier(successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter(),databaseToCsvFileJobConfig.databaseCsvItemWriter()));
return compositeItemWriter;
}
@Bean(name="step1")
public Step step1() throws Exception{
return stepBuilderFactory.get("step1")
.<BillerOrder,BillerOrder> chunk(10)
.reader((ItemReader<? extends BillerOrder>) reader())
.processor(processor())
.writer(classifierBillerOrderCompositeItemWriter())
.stream(successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter())
.stream(databaseToCsvFileJobConfig.databaseCsvItemWriter())
.build();
}