与JMS的Spring Batch集成

问题描述

当前正在从事Spring Batch项目,我们在应用程序启动时启动工作。我的春季批处理工作(多线程步骤)包括

  • 使用JmsItemReader读取jms消息
  • 处理消息,将其转换为模型对象。
  • 在提交数据库之前先在writer中调用一些rest服务。

但是,在研究表明最佳的解决方案是使用spring集成ServiceActivator来启动作业之后,我们现在希望在将消息添加到队列时就开始作业。我面临的问题是,一旦启动作业并执行了Jms项目读取器,消息就不再在队列中,因为它们已被ChannelPublishingJmsMessageListener占用。

我的代码

 @Bean
    public JmsItemReader<Message> reader() {
        JmsItemReader<Message> itemReader = new JmsItemReader<>();
        itemReader.setItemType(Message.class);
        itemReader.setjmstemplate(jmstemplate());
        return itemReader;
    }

 // Jobs et Steps
    @Bean
    Step stepDetectionIncoherencesLiq(@Autowired StepBuilderFactory steps) {

        int threadSize = Integer.parseInt(env.getProperty(PropertyConstant.THREAD_POOL_SIZE));

        return steps.get("stepDetectionIncoherencesLiq").<Message,DetectionIncoherenceLiqJmsOut>chunk(1)
                .reader(reader())
                .processor(processor())
                .writer(writer()).readerIsTransactionalQueue().faultTolerant()
                                   .taskExecutor(taskExecutor()).throttleLimit(threadSize)
                                    .listener(stepListener()).build();
    }

 @Bean
    // @DependsOn({"getJobRepository"})
    Job job(@Autowired JobBuilderFactory jobs,@Qualifier("stepDetectionIncoherencesLiq") Step stepDetectionIncoherencesLiq) {

        LOGGER.error("Creation bean job ");

        return jobs.get("job")
                 .incrementer(new RunIdIncrementer())
                .start(stepDetectionIncoherencesLiq).build();
    }

具有spring集成实现:

public SimpleMessageListenerContainer messageListenerContainer() {

        SimpleMessageListenerContainer defmessageListenerContainer = new SimpleMessageListenerContainer();
        defmessageListenerContainer.setConnectionFactory((ConnectionFactory) queueConnectionFactory().getobject());
        defmessageListenerContainer.setDestination((Destination) jmsQueue().getobject());
//        defmessageListenerContainer.setSessionTransacted(true);

        return defmessageListenerContainer;
    }

    @Bean
    public MessageChannel inputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public JmsMessageDrivenEndpoint jmsMessageDrivenEndpoilnt() {
        ChannelPublishingJmsMessageListener channelPublishingJml = new ChannelPublishingJmsMessageListener();
        channelPublishingJml.setRequestChannel(inputChannel());
//        channelPublishingJml.setReplyChannel(outputChannel());
        channelPublishingJml.setMessageConverter(new JmsMessageConverter());
        return new JmsMessageDrivenEndpoint(messageListenerContainer(),channelPublishingJml);
    }
    
    @Bean
    public IntegrationFlow myFlow(JobLaunchingGateway jobLaunchingGateway) {
        return IntegrationFlows.from("outputChannel").handle(jobLaunchingGateway)   .handle(logger()).get();
    }

    @Bean
    JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
        return new JobLaunchingGateway(jobLauncher);
    }

和服务激活器:

@ServiceActivator(inputChannel = "inputChannel",outputChannel = "outputChannel")
public JobLaunchRequest process(DetectionIncoherenceLiqJmsOut jmsOut) {

    log.info("Starting Job");

    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    return new JobLaunchRequest(job,jobParametersBuilder.toJobParameters());
}

我是Spring集成的新手,这很困难,我有以下问题:

  • 如何在不使用消息的情况下触发工作?
  • 任何人都可以提供频道项目阅读器或tasklet代码吗?我可以从通道而不是队列中读取消息吗?
  • 将jms消息转换为模型对象并将其传递给作业参数是否很好? 感谢您的帮助。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)