Spring Integration:如何对测试者建议进行单元测试

问题描述

我正在尝试对轮询器上的建议进行单元测试,该建议将阻止mongo通道适配器的执行,直到满足特定条件(=处理此批中的所有消息)为止。

流程如下:

IntegrationFlows.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory,new Query().with(Sort.by(Sort.Direction.DESC,"modifiedDate")).limit(1))
                    .collectionName("Metadata")
                    .entityClass(Metadata.class)
                    .expectSingleResult(true),e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(pollingIntervalSeconds))
                    .advice(this.advices.waitUntilCompletedAdvice())))
            .handle((p,h) -> {
                this.advices.waitUntilCompletedAdvice().setWait(true);
                return p;
            })
            .handle(doSomething())
            .channel(Channels.DOCUMENT_HEADER.name())
            .get();

还有以下建议豆:

@Bean
public WaitUntilCompletedAdvice waitUntilCompletedAdvice() {
    DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(Duration.ofSeconds(1));
    return new WaitUntilCompletedAdvice(trigger);
}

建议本身:

public class WaitUntilCompletedAdvice extends SimpleActiveIdleMessageSourceAdvice {

    AtomicBoolean wait = new AtomicBoolean(false);

    public WaitUntilCompletedAdvice(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        if (getWait())
            return false;
        return true;
    }

    public boolean getWait() {
        return wait.get();
    }

    public void setWait(boolean newWait) {
        if (getWait() == newWait)
            return;

        while (true) {
            if (wait.compareAndSet(!newWait,newWait)) {
                return;
            }
        }
    }
}

我正在使用以下测试来测试流程:

    @Test
    public void testClaimPoollingAdapterFlow() throws Exception {
        // given
        ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
        CountDownLatch receiveLatch = new CountDownLatch(1);
        MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
        this.mockIntegrationContext.substituteMessageHandlerFor("retrieveDocumentHeader",mockMessageHandler);
        LocalDateTime modifiedDate = LocalDateTime.Now();
        ProcessingMetadata data = Metadata.builder()
                .modifiedDate(modifiedDate)
                .build();
        assert !this.advices.waitUntilCompletedAdvice().getWait();

        // when
        itf.getInputChannel().send(new Genericmessage<>(Mono.just(data)));

        // then
        assertthat(receiveLatch.await(10,TimeUnit.SECONDS)).isTrue();
        verify(mockMessageHandler).handleMessage(any());
        assertthat(captor.getValue().getPayload()).isEqualTo(modifiedDate);
        assert this.advices.waitUntilCompletedAdvice().getWait();
    }

这可以正常工作,但是当我向输入通道发送另一条消息时,它仍然在不考虑建议的情况下处理该消息。

这是预期的行为吗?如果是这样,我如何使用单元测试来验证轮询器确实在等待该建议?

解决方法

itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));

这会绕过轮询器并直接发送消息。

您可以通过从测试中调用beforeReceive()来对已配置的建议进行单元测试

或者您可以使用相同的建议创建虚拟测试流程

IntegationFlows.from(() -> "foo",e -> e.poller(...))
       ...

并确认仅发送了一封邮件。

,

示例实现:

@Test
public void testWaitingActivate() {
    // given
    this.advices.waitUntilCompletedAdvice().setWait(true);

    // when
    Message<ProcessingMetadata> receive = (Message<ProcessingMetadata>) testChannel.receive(3000);

    // then
    assertThat(receive).isNull();
}

@Test
public void testWaitingInactive() {
    // given
    this.advices.waitUntilCompletedAdvice().setWait(false);

    // when
    Message<ProcessingMetadata> receive = (Message<ProcessingMetadata>) testChannel.receive(3000);

    // then
    assertThat(receive).isNotNull();
}

@Configuration
@EnableIntegration
public static class Config {

    @Autowired
    private Advices advices;

    @Bean
    public PollableChannel testChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow fakeFlow() {
        this.advices.waitUntilCompletedAdvice().setWait(true);
        return IntegrationFlows.from(() -> "foo",e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
                .advice(this.advices.waitUntilCompletedAdvice()))).channel("testChannel").get();
    }
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...