Spring Integration Aggregator Throttler

问题描述

我有一条消息SomeMessage看起来像这样:

class SomeMessage{
 id,title
}

当前,我根据ID汇总邮件。消息将在10秒后释放。

.aggregate(
            a ->
                a 
                .outputProcessor(messageProcessor())
                .messageStore(messageGroupStore())
                .correlationStrategy(correlationStrategy())
                .expireGroupsUponCompletion(true)
                .sendPartialResultOnExpiry(true)
                .groupTimeout(TimeUnit.SECONDS.toMillis(10)))
        .handle(amqpOutboundEndpoint)

我需要的是一种基于title属性来限制消息的方法。如果为title=="A",则仍应等待10秒钟进行汇总;如果为title=="B",则应等待60秒以进行聚合,并且不应立即将其发送到amqpOutboundEndpoint,但应进行一些限制(例如,每条包含title=="B"的消息之间要间隔30秒)。 / p>

什么是最好的方法?是否在AmqpOutboundEndpoint上进行限制?

更新

.groupTimeout(messageGroup -> {
                      if(anyMessageInGroupHasTitleB(messageGroup)){
                        return TimeUnit.SECONDS.toMillis(60);
                      }
                      else {
                        return TimeUnit.SECONDS.toMillis(10);
                      }
                    }))
        .route(
            (Function<SomeMessage,Boolean>) ec ->
            ec.getTitle().equals("B"),m -> m.subFlowMapping(true,sf ->
            sf.channel(channels -> channels.queue(1))
                .bridge(e -> e.poller(Pollers
                    .fixedDelay(60,TimeUnit.SECONDS)
                    .maxMessagesPerPoll(1)
                ))
        ).subFlowMapping(false,IntegrationFlowDeFinition::bridge))
    .handle(amqpOutboundEndpoint)

解决方法

使用groupTimeoutExpression()而不是固定的超时时间...

payload.title == 'A' ? 10000 : 30000

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...