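Problem description
I am trying to unit test an advice on a poller that blocks execution of the MongoDB channel adapter until a certain condition is met (= all messages from this batch have been processed).
The flow looks as follows: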
IntegrationFlows.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory,
                new Query().with(Sort.by(Sort.Direction.DESC, "modifiedDate")).limit(1))
                .collectionName("Metadata")
                .entityClass(Metadata.class)
                .expectSingleResult(true),
        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(pollingIntervalSeconds))
                .advice(this.advices.waitUntilCompletedAdvice())))
        .handle((p, h) -> {
            this.advices.waitUntilCompletedAdvice().setWait(true);
            return p;
        })
        .handle(doSomething())
        .channel(Channels.DOCUMENT_HEADER.name())
        .get();
And the following advice bean:
@Bean
public WaitUntilCompletedAdvice waitUntilCompletedAdvice() {
    DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(Duration.ofSeconds(1));
    return new WaitUntilCompletedAdvice(trigger);
}
The advice itself:
public class WaitUntilCompletedAdvice extends SimpleActiveIdleMessageSourceAdvice {
    AtomicBoolean wait = new AtomicBoolean(false);
    public WaitUntilCompletedAdvice(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }
    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        if (getWait())
            return false;
        return true;
    }
    public boolean getWait() {
        return wait.get();
    }
    public void setWait(boolean newWait) {
        if (getWait() == newWait)
            return;
        while (true) {
            if (wait.compareAndSet(!newWait, newWait)) {
                return;
            }
        }
    }
}
I am using the following test to test the flow:
@Test
public void testClaimPoollingAdapterFlow() throws Exception {
    // given
    ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
    CountDownLatch receiveLatch = new CountDownLatch(1);
    MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
    this.mockIntegrationContext.substituteMessageHandlerFor("retrieveDocumentHeader", mockMessageHandler);
    LocalDateTime modifiedDate = LocalDateTime.now();
    ProcessingMetadata data = Metadata.builder()
            .modifiedDate(modifiedDate)
            .build();
    assert !this.advices.waitUntilCompletedAdvice().getWait();
    // when
    itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));
    // then
    assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
    verify(mockMessageHandler).handleMessage(any());
    assertThat(captor.getValue().getPayload()).isEqualTo(modifiedDate);
    assert this.advices.waitUntilCompletedAdvice().getWait();
}
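This works fine, but when I send another message to the input channel, it still processes the message without taking the advice into account.
Is this the expected behavior? If so, how can I use a unit test to verify that the poller really waits for the advice?
Solution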
itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));
This bypasses the poller and sends the message directly.
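To make the bypass concrete, here is a minimal plain-Java sketch. It is not Spring Integration code: WaitGate and GatedPoller are hypothetical stand-ins for the advice and the polling endpoint, and a BlockingQueue plays the role of the channel. It assumes the advice is consulted only inside the poll cycle, so a message put on the channel directly never reaches it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class PollerBypassSketch {

    // Hypothetical stand-in for the advice: only the wait flag, no Spring types.
    public static class WaitGate {
        private final AtomicBoolean wait = new AtomicBoolean(false);

        // Mirrors the question's beforeReceive(): false means "skip this poll".
        public boolean beforeReceive() {
            return !wait.get();
        }

        public void setWait(boolean value) {
            wait.set(value);
        }
    }

    // Hypothetical stand-in for the polling endpoint.
    // The gate is checked here and nowhere else.
    public static class GatedPoller {
        private final WaitGate gate;
        private final Supplier<String> source;
        private final BlockingQueue<String> channel;

        public GatedPoller(WaitGate gate, Supplier<String> source, BlockingQueue<String> channel) {
            this.gate = gate;
            this.source = source;
            this.channel = channel;
        }

        // Runs one poll cycle; returns true if a message was put on the channel.
        public boolean pollOnce() {
            if (!gate.beforeReceive()) {
                return false;
            }
            channel.add(source.get());
            return true;
        }
    }

    public static void main(String[] args) {
        WaitGate gate = new WaitGate();
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        GatedPoller poller = new GatedPoller(gate, () -> "polled", channel);

        gate.setWait(true);
        // The gate is closed, so the poll cycle is skipped.
        System.out.println("poll ran: " + poller.pollOnce());

        // A direct send never asks the gate, so the message still arrives.
        channel.add("direct");
        System.out.println("messages on channel: " + channel.size());

        gate.setWait(false);
        // With the gate open, the poll cycle delivers a message again.
        System.out.println("poll ran: " + poller.pollOnce());
        System.out.println("messages on channel: " + channel.size());
    }
}
```

In this model the only way to observe the gate is to go through pollOnce(), which is why a test that sends to the input channel cannot tell whether the advice is working.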
You can unit test the configured advice by calling beforeReceive() from the test.
Or you can create a dummy test flow that uses the same advice
IntegrationFlows.from(() -> "foo", e -> e.poller(...))
...
and verify that only one message is sent.
Example implementation:
@Test
public void testWaitingActivate() {
    // given
    this.advices.waitUntilCompletedAdvice().setWait(true);
    // when
    Message<ProcessingMetadata> receive = (Message<ProcessingMetadata>) testChannel.receive(3000);
    // then
    assertThat(receive).isNull();
}
@Test
public void testWaitingInactive() {
    // given
    this.advices.waitUntilCompletedAdvice().setWait(false);
    // when
    Message<ProcessingMetadata> receive = (Message<ProcessingMetadata>) testChannel.receive(3000);
    // then
    assertThat(receive).isNotNull();
}
@Configuration
@EnableIntegration
public static class Config {
    @Autowired
    private Advices advices;
    @Bean
    public PollableChannel testChannel() {
        return new QueueChannel();
    }
    @Bean
    public IntegrationFlow fakeFlow() {
        this.advices.waitUntilCompletedAdvice().setWait(true);
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
                .advice(this.advices.waitUntilCompletedAdvice()))).channel("testChannel").get();
    }
}