问题描述
我有一条消息SomeMessage
看起来像这样:
class SomeMessage{
id,title
}
当前,我根据ID汇总邮件。消息将在10秒后释放。
.aggregate(
a ->
a
.outputProcessor(messageProcessor())
.messageStore(messageGroupStore())
.correlationStrategy(correlationStrategy())
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10)))
.handle(amqpOutboundEndpoint)
我需要的是一种基于title
属性来限制消息的方法。如果为title=="A"
,则仍应等待10秒钟进行汇总;如果为title=="B"
,则应等待60秒以进行聚合,并且不应立即将其发送到amqpOutboundEndpoint
,但应进行一些限制(例如,每条包含title=="B"
的消息之间要间隔30秒)。 / p>
什么是最好的方法?是否在AmqpOutboundEndpoint
上进行限制?
更新
.groupTimeout(messageGroup -> {
if(anyMessageInGroupHasTitleB(messageGroup)){
return TimeUnit.SECONDS.toMillis(60);
}
else {
return TimeUnit.SECONDS.toMillis(10);
}
}))
.route(
(Function<SomeMessage,Boolean>) ec ->
ec.getTitle().equals("B"),m -> m.subFlowMapping(true,sf ->
sf.channel(channels -> channels.queue(1))
.bridge(e -> e.poller(Pollers
.fixedDelay(60,TimeUnit.SECONDS)
.maxMessagesPerPoll(1)
))
).subFlowMapping(false,IntegrationFlowDeFinition::bridge))
.handle(amqpOutboundEndpoint)
解决方法
使用groupTimeoutExpression()
而不是固定的超时时间...
payload.title == 'A' ? 10000 : 30000