项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Checks that stopping a container that serves two queues completes in under 200 ms, which
 * the assertion message describes as shorter than stopping one queue after the other.
 *
 * <p>Set-up: two listener beans are registered, and the container's {@code stopQueue} is
 * overridden to release a per-queue latch before delegating to the real implementation.
 * The stubbed {@code receiveMessage} call for each queue signals that it has been entered,
 * waits up to one second for that queue's latch and then throws an
 * {@code OverLimitException}.
 *
 * <p>The test only measures something meaningful if both stubbed {@code receiveMessage}
 * calls have been entered before {@code stop()} is timed, so that precondition is asserted
 * explicitly instead of being silently ignored.
 *
 * @throws Exception if set-up fails or the test thread is interrupted while waiting on a latch
 */
@Test
public void stop_withContainerHavingMultipleQueuesRunning_shouldStopQueuesInParallel() throws Exception {
    // Arrange
    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener", TestMessageListener.class);
    applicationContext.registerSingleton("anotherTestMessageListener", AnotherTestMessageListener.class);

    // One latch per queue, released as soon as the container asks that queue to stop.
    CountDownLatch testQueueCountdownLatch = new CountDownLatch(1);
    CountDownLatch anotherTestQueueCountdownLatch = new CountDownLatch(1);
    // Counts the stubbed receiveMessage calls (one per queue) that have been entered.
    CountDownLatch spinningThreadsStarted = new CountDownLatch(2);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {

        @Override
        public void stopQueue(String logicalQueueName) {
            // Release the matching stub first, then let the real stop logic run.
            if ("testQueue".equals(logicalQueueName)) {
                testQueueCountdownLatch.countDown();
            } else if ("anotherTestQueue".equals(logicalQueueName)) {
                anotherTestQueueCountdownLatch.countDown();
            }
            super.stopQueue(logicalQueueName);
        }
    };

    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);
    container.setBackOffTime(100);
    container.setQueueStopTimeout(5000);

    QueueMessageHandler messageHandler = new QueueMessageHandler();
    messageHandler.setApplicationContext(applicationContext);
    container.setMessageHandler(messageHandler);

    mockGetQueueUrl(sqs, "testQueue", "http://testQueue.amazonaws.com");
    mockGetQueueUrl(sqs, "anotherTestQueue", "http://anotherTestQueue.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs, "http://testQueue.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs, "http://anotherTestQueue.amazonaws.com");

    when(sqs.receiveMessage(new ReceiveMessageRequest("http://testQueue.amazonaws.com")
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenAnswer((Answer<ReceiveMessageResult>) invocation -> {
                spinningThreadsStarted.countDown();
                // Result deliberately ignored: the one-second cap only keeps the stub from
                // blocking indefinitely; the exception is thrown either way.
                testQueueCountdownLatch.await(1, TimeUnit.SECONDS);
                throw new OverLimitException("Boom");
            });
    when(sqs.receiveMessage(new ReceiveMessageRequest("http://anotherTestQueue.amazonaws.com")
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenAnswer((Answer<ReceiveMessageResult>) invocation -> {
                spinningThreadsStarted.countDown();
                // Result deliberately ignored, same reasoning as for the first queue.
                anotherTestQueueCountdownLatch.await(1, TimeUnit.SECONDS);
                throw new OverLimitException("Boom");
            });

    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    container.start();

    // Remember whether both stubs were entered in time. The check is asserted only after
    // stop() has run, so the container is always shut down even when the precondition fails.
    boolean bothPollersSpinning = spinningThreadsStarted.await(1, TimeUnit.SECONDS);
    StopWatch stopWatch = new StopWatch();

    // Act
    stopWatch.start();
    container.stop();
    stopWatch.stop();

    // Assert
    assertTrue("Both queues must be polling before the stop time is measured", bothPollersSpinning);
    assertTrue("Stop time must be shorter than stopping one queue after the other",
            stopWatch.getTotalTimeMillis() < 200);
}