问题描述
我正在尝试配置一个轮询器,该轮询器查询Bean以每X秒将一个List放入通道。该通道具有下游流,该流将列表拆分并输出到pub / sub通道(进一步的异步流) 我该如何确保在任何给定时间仅执行一次流程执行且轮询器必须等待/阻塞,直到流程完成,直到准备进行下一次轮询为止(固定速率/延迟)?
<int:channel id="configListChannel" />
<task:executor id="pollExecutor" pool-size="1" queue-capacity="1" rejection-policy="ABORT" />
<int:inbound-channel-adapter expression="configMap().values()" auto-startup="true" channel="configListChannel">
<int:poller fixed-delay="30" time-unit="SECONDS" task-executor="pollExecutor"/>
</int:inbound-channel-adapter>
<task:executor id="configExecutor" pool-size="5"/>
<int:channel id="configChannel" >
<int:dispatcher task-executor="configExecutor"/>
</int:channel>
<int:chain input-channel="configListChannel" output-channel="configChannel" id="configChain">
<int:splitter/>
<int:filter expression="payload.enablePolling"/>
</int:chain>
... configChannel
上的进一步异步流发送出站消息
是否有异步轮询的阻塞轮询器的任何示例,并使用屏障来信号传递完整的轮询器线程?一次也只能进行一次民意调查。
解决方法
我建议您实施ReceiveMessageAdvice
(从5.3或AbstractMessageSourceAdvice
开始)。 afterReceive()
应该按原样返回消息,但是beforeReceive()
应该检查某些状态,如果您目前无法轮询,则返回false
。
您可能不需要为此任务设置障碍,而是使用简单的AtomicBoolean
bean来检查beforeReceive()
到false
的状态并将其返回到true
当您在下游完成任务时。