问题描述
当前正在从事Spring Batch项目,我们在应用程序启动时启动工作。我的春季批处理工作(多线程步骤)包括:
但是,在研究表明最佳的解决方案是使用spring集成ServiceActivator来启动作业之后,我们现在希望在将消息添加到队列时就开始作业。我面临的问题是,一旦启动作业并执行了Jms项目读取器,消息就不再在队列中,因为它们已被ChannelPublishingJmsMessageListener占用。
我的代码:
@Bean
public JmsItemReader<Message> reader() {
JmsItemReader<Message> itemReader = new JmsItemReader<>();
itemReader.setItemType(Message.class);
itemReader.setjmstemplate(jmstemplate());
return itemReader;
}
// Jobs et Steps
@Bean
Step stepDetectionIncoherencesLiq(@Autowired StepBuilderFactory steps) {
int threadSize = Integer.parseInt(env.getProperty(PropertyConstant.THREAD_POOL_SIZE));
return steps.get("stepDetectionIncoherencesLiq").<Message,DetectionIncoherenceLiqJmsOut>chunk(1)
.reader(reader())
.processor(processor())
.writer(writer()).readerIsTransactionalQueue().faultTolerant()
.taskExecutor(taskExecutor()).throttleLimit(threadSize)
.listener(stepListener()).build();
}
@Bean
// @DependsOn({"getJobRepository"})
Job job(@Autowired JobBuilderFactory jobs,@Qualifier("stepDetectionIncoherencesLiq") Step stepDetectionIncoherencesLiq) {
LOGGER.error("Creation bean job ");
return jobs.get("job")
.incrementer(new RunIdIncrementer())
.start(stepDetectionIncoherencesLiq).build();
}
具有spring集成实现:
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer defmessageListenerContainer = new SimpleMessageListenerContainer();
defmessageListenerContainer.setConnectionFactory((ConnectionFactory) queueConnectionFactory().getobject());
defmessageListenerContainer.setDestination((Destination) jmsQueue().getobject());
// defmessageListenerContainer.setSessionTransacted(true);
return defmessageListenerContainer;
}
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel outputChannel() {
return new DirectChannel();
}
@Bean
public JmsMessageDrivenEndpoint jmsMessageDrivenEndpoilnt() {
ChannelPublishingJmsMessageListener channelPublishingJml = new ChannelPublishingJmsMessageListener();
channelPublishingJml.setRequestChannel(inputChannel());
// channelPublishingJml.setReplyChannel(outputChannel());
channelPublishingJml.setMessageConverter(new JmsMessageConverter());
return new JmsMessageDrivenEndpoint(messageListenerContainer(),channelPublishingJml);
}
@Bean
public IntegrationFlow myFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from("outputChannel").handle(jobLaunchingGateway) .handle(logger()).get();
}
@Bean
JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
return new JobLaunchingGateway(jobLauncher);
}
和服务激活器:
@ServiceActivator(inputChannel = "inputChannel",outputChannel = "outputChannel")
public JobLaunchRequest process(DetectionIncoherenceLiqJmsOut jmsOut) {
log.info("Starting Job");
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
return new JobLaunchRequest(job,jobParametersBuilder.toJobParameters());
}
我是Spring集成的新手,这很困难,我有以下问题:
- 如何在不使用消息的情况下触发工作?
- 任何人都可以提供频道项目阅读器或tasklet的代码吗?我可以从通道而不是队列中读取消息吗?
- 将jms消息转换为模型对象并将其传递给作业参数是否很好? 感谢您的帮助。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)