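Spring Batch partitioned job aggregation problem

Problem description

We are migrating from Spring Batch 2.x to 4.x. We have many partitioned jobs, each of which is configured with the following pieces: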

<bean id="xxx.partitioned.jms.handler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="xxx.partitioned.jms.requests"/>
            <property name="receiveTimeout" value="${xxx.partitioned.timeout}"/>
        </bean>
    </property>
    <property name="stepName" value="xxx.partitioned.applycredits"/>
    <property name="gridSize" value="${xxx.partitioned.gridSize}"/>
</bean>

<int:channel id="xxx.partitioned.jms.requests">
    <int:dispatcher task-executor="springbatch.partitioned.jms.taskExecutor"/>
</int:channel>
<int:channel id="xxx.partitioned.jms.reply" />
                              
<int-jms:outbound-gateway 
    id="xxx.gateway"
    auto-startup="false"
    connection-factory="springbatch.jmsConnectionFactory" 
    request-channel="xxx.partitioned.jms.requests" 
    request-destination="springbatch.partition.jms.requestsQueue" 
    reply-channel="xxx.partitioned.jms.reply"
    reply-destination="springbatch.partition.jms.repliesQueue" 
    receive-timeout="${xxx.partitioned.timeout}" 
    correlation-key="JMSCorrelationID" >
    <int-jms:reply-listener cache-level="0" />        
</int-jms:outbound-gateway>

<int:aggregator 
    input-channel="xxx.partitioned.jms.reply" 
    ref="xxx.partitioned.jms.handler"
    release-strategy="partitionReleaseStrategy"
/>

<bean id="partitionReleaseStrategy"
    class="org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy" 
/>

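The problem we are seeing is that the step never completes. I turned up the log level and looked at the messages that were sent, received and processed, and at the replies that came back. In most cases the aggregator picks up the last message and the job continues. Sometimes, however, I can see that the aggregator received the messages, but MessageChannelPartitionHandler never gets the aggregated message. The logs show these lines (one set for each partition in the grid):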

2020-08-23 18:55:08,679 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-1) preSend on channel 'xxx.partitioned.jms.reply',message: 
2020-08-23 18:55:08,679 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-1) org.springframework.integration.config.AggregatorFactoryBean#112 received message: 
2020-08-23 18:55:08,679 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-1) Handling message with correlationKey 
2020-08-23 18:55:08,694 TRACE [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-1) Adding message to group 

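I see that the number of messages for a given correlation key equals the grid size.....

But sometimes the following line never appears.... and the batch job is left stuck.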

2020-08-23 18:55:08,694 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-1) Completing group with correlationKey 
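
The manual check described above (count the "Adding message to group" lines per correlation key, then look for a matching "Completing group" line) could be scripted roughly as follows. This is only a sketch: the class name AggregatorLogCheck and the method incompleteGroups are hypothetical, and both regular expressions assume the line shapes seen in this post, including a correlation key wrapped in square brackets on the "Completing group" line, which the excerpt here does not show.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AggregatorLogCheck {

    // Assumed line shapes, modelled on the log excerpts in this post.
    private static final Pattern ADDED =
            Pattern.compile("Adding message to group \\[ ?SimpleMessageGroup\\{groupId=([^,}]+)");
    private static final Pattern COMPLETED =
            Pattern.compile("Completing group with correlationKey \\[([^\\]]+)\\]");

    /**
     * Returns correlation key -> number of "Adding message to group" lines,
     * restricted to keys for which no "Completing group" line was found.
     */
    public static Map<String, Integer> incompleteGroups(List<String> logLines) {
        Map<String, Integer> added = new LinkedHashMap<>();
        // First pass: count additions per correlation key.
        for (String line : logLines) {
            Matcher m = ADDED.matcher(line);
            if (m.find()) {
                added.merge(m.group(1), 1, Integer::sum);
            }
        }
        // Second pass: drop every key that was reported as completed.
        for (String line : logLines) {
            Matcher m = COMPLETED.matcher(line);
            if (m.find()) {
                added.remove(m.group(1));
            }
        }
        return added;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical usage: java AggregatorLogCheck server.log
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        incompleteGroups(lines).forEach((key, count) ->
                System.out.println(key + " -> " + count + " message(s) added, never completed"));
    }
}
```

A key that shows up in the result with a count equal to the grid size would match the stuck case described here.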

Any ideas why the aggregator does not complete the partitioned job?

************ UPDATE *************

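I took a closer look at the "Adding message to group" log lines.

For this one partitioned job there appear to be 2 different message groups, and the messages are delivered to them alternately.....

All of the lines have the same correlation ID ...

How could 2 MessageStores have been created?

Here are the log lines..... Note that the first two "Adding message to group" entries show an empty group (messages=[]). After that, successive messages ping-pong between the two groups.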

2020-08-23 18:58:05,696 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-20) preSend on channel 'xxx.jms.reply',message: GenericMessage [payload=StepExecution: id=2515378,version=4,name=policy.step.partitioned.createaudit:5,status=COMPLETED,exitStatus=COMPLETED,readCount=1,filterCount=0,writeCount=0 readSkipCount=0,writeSkipCount=1,processSkipCount=0,commitCount=2,rollbackCount=2,exitDescription=,headers={sequenceNumber=31,sequenceSize=50,jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue],priority=4,jms_timestamp=1598223485697,replyChannel=org.springframework.integration.channel.QueueChannel@abd62a7,jms_redelivered=false,JMSXDeliveryCount=1,jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue],correlationId=164882:policy.step.partitioned.createaudit,id=4bd943fc-467c-eaf1-7364-dd86136c5d24,jms_messageId=ID:1adf657c-e594-11ea-a7e4-00505695e074,timestamp=1598223485696}]
2020-08-23 18:58:05,696 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-20) org.springframework.integration.config.AggregatorFactoryBean#43 received message: GenericMessage [payload=StepExecution: id=2515378,...
2020-08-23 18:58:05,697 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-20) Handling message with correlationKey [164882:policy.step.partitioned.createaudit]: GenericMessage [payload=StepExecution: id=2515378,...
2020-08-23 18:58:05,697 TRACE [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-20) Adding message to group [ SimpleMessageGroup{groupId=164882:policy.step.partitioned.createaudit,messages=[],timestamp=1598223485697,lastModified=0}]
2020-08-23 18:58:05,697 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-20) postSend (sent=true) on channel 'xxx.jms.reply',...
2020-08-23 18:58:05,710 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-48) preSend on channel 'xxx.jms.reply',message: GenericMessage [payload=StepExecution: id=2515380,version=3,name=policy.step.partitioned.createaudit:32,readCount=0,writeSkipCount=0,commitCount=1,rollbackCount=0,headers={sequenceNumber=38,jms_timestamp=1598223485717,id=340ff8b7-c200-6a2f-5d9c-d01942f620d8,jms_messageId=ID:1ae42020-e594-11ea-8f8a-005056958f37,timestamp=1598223485710}]
2020-08-23 18:58:05,711 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-48) org.springframework.integration.config.AggregatorFactoryBean#105 received message: GenericMessage [payload=StepExecution: id=2515380,...
2020-08-23 18:58:05,711 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-48) Handling message with correlationKey [164882:policy.step.partitioned.createaudit]: GenericMessage [payload=StepExecution: id=2515380,...
2020-08-23 18:58:05,711 TRACE [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-48) Adding message to group [ SimpleMessageGroup{groupId=164882:policy.step.partitioned.createaudit,timestamp=1598223485711,...
2020-08-23 18:58:05,712 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-48) postSend (sent=true) on channel 'xxx.jms.reply',...
2020-08-23 18:58:05,964 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-21) preSend on channel 'xxx.jms.reply',message: GenericMessage [payload=StepExecution: id=2515373,name=policy.step.partitioned.createaudit:10,headers={sequenceNumber=5,jms_timestamp=1598223485949,id=f263c03e-5e6b-8370-02a6-23464d140fbf,jms_messageId=ID:1b06000d-e594-11ea-a5ea-005056955e17,timestamp=1598223485964}]
2020-08-23 18:58:05,964 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-21) org.springframework.integration.config.AggregatorFactoryBean#43 received message: GenericMessage [payload=StepExecution: id=2515373,...
2020-08-23 18:58:05,964 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-21) Handling message with correlationKey [164882:policy.step.partitioned.createaudit]: GenericMessage [payload=StepExecution: id=2515373,...
2020-08-23 18:58:05,965 TRACE [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-21) Adding message to group [ SimpleMessageGroup{groupId=164882:policy.step.partitioned.createaudit,messages=[GenericMessage [payload=StepExecution: id=2515378,timestamp=1598223485696}]],lastModified=1598223485697}]
2020-08-23 18:58:05,966 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-21) postSend (sent=true) on channel 'xxx.jms.reply',timestamp=1598223485964}]
2020-08-23 18:58:06,111 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-23) preSend on channel 'xxx.jms.reply',message: GenericMessage [payload=StepExecution: id=2515383,name=policy.step.partitioned.createaudit:46,headers={sequenceNumber=30,jms_timestamp=1598223486104,id=76a8c91c-b8c2-448f-4d22-37df6e391205,jms_messageId=ID:1b1d5905-e594-11ea-b002-00505695053a,timestamp=1598223486111}]
2020-08-23 18:58:06,111 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-23) org.springframework.integration.config.AggregatorFactoryBean#105 received message: GenericMessage [payload=StepExecution: id=2515383,...
2020-08-23 18:58:06,111 DEBUG [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-23) Handling message with correlationKey [164882:policy.step.partitioned.createaudit]: GenericMessage [payload=StepExecution: id=2515383,...
2020-08-23 18:58:06,111 TRACE [org.springframework.integration.aggregator.AggregatingMessageHandler] (springbatch.partitioned.jms.taskExecutor-23) Adding message to group [ SimpleMessageGroup{groupId=164882:policy.step.partitioned.createaudit,messages=[GenericMessage [payload=StepExecution: id=2515380,timestamp=1598223485710}]],lastModified=1598223485711}]
2020-08-23 18:58:06,112 DEBUG [org.springframework.integration.channel.DirectChannel] (springbatch.partitioned.jms.taskExecutor-23) postSend (sent=true) on channel 'xxx.jms.reply',timestamp=1598223486111}]
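
One possible reading of the excerpt above, offered as an assumption rather than a confirmed diagnosis: the "received message" lines name two different handler instances (the ids ending in #43 and #105), and each appears to keep its own message group. If two aggregators really are subscribed to the same DirectChannel, its default round-robin dispatch would alternate the replies between them, so neither in-memory group could ever reach sequenceSize=50. The sketch below only simulates that effect in plain Java. ToyAggregator and anyGroupReleased are hypothetical stand-ins, not Spring Integration classes.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoAggregatorsSketch {

    /** Stand-in for an aggregator that keeps its own in-memory message group. */
    static final class ToyAggregator {
        final List<Integer> group = new ArrayList<>();
        boolean released;

        void handle(int sequenceNumber, int sequenceSize) {
            group.add(sequenceNumber);
            // Sequence-size style release: only when this group holds every reply.
            if (group.size() == sequenceSize) {
                released = true;
            }
        }
    }

    /**
     * Dispatches sequenceSize replies round-robin over the subscribers and
     * reports whether any single subscriber collected the whole sequence.
     */
    public static boolean anyGroupReleased(int sequenceSize, int subscriberCount) {
        List<ToyAggregator> subscribers = new ArrayList<>();
        for (int i = 0; i < subscriberCount; i++) {
            subscribers.add(new ToyAggregator());
        }
        for (int seq = 1; seq <= sequenceSize; seq++) {
            // Round-robin: consecutive replies go to alternating subscribers.
            subscribers.get((seq - 1) % subscriberCount).handle(seq, sequenceSize);
        }
        return subscribers.stream().anyMatch(a -> a.released);
    }

    public static void main(String[] args) {
        System.out.println("1 subscriber:  " + anyGroupReleased(50, 1)); // true
        System.out.println("2 subscribers: " + anyGroupReleased(50, 2)); // false
    }
}
```

With a single subscriber the group fills and is released. With two, each group stops at 25 of 50 messages, which would look like the stuck step described in this post. Whether a second aggregator is actually registered on the reply channel here would still need to be checked against the full application context.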
