项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProviderTest.java
@Test
public void oneQueue() {
AmazonSQS amazonSQS = mock(AmazonSQS.class);
// return one queue
when(amazonSQS.listQueues(any(ListQueuesRequest.class)))
.thenReturn(new ListQueuesResult().withQueueUrls("test-foo"));
// return 3 messages from the queue
when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class)))
.thenReturn(new ReceiveMessageResult().withMessages(newMessage("foo"),newMessage("foo"),newMessage("foo")));
AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS,"test",1,60 * 1000);
List<Message> messages = provider.next();
assertMessages(messages,3,"foo");
verify(amazonSQS).listQueues(any(ListQueuesRequest.class));
verify(amazonSQS).receiveMessage(any(ReceiveMessageRequest.class));
}
项目:unitstack
文件:MockSqsTest.java
@Test
public void testCreateGetUrlListQueue_shouldCreateReturnUrlAndListQueue() {
// create first queue
CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue"));
assertNotNull("verify that,on creation,queue url was returned",createdQueue.getQueueUrl());
// create other queues
CreateQueueResult secondTeaQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-mate-queue"));
CreateQueueResult anotherQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("coffee-queue"));
// get queue url
GetQueueUrlResult queueUrlResult = sqs.getQueueUrl(new GetQueueUrlRequest()
.withQueueName("tea-earl-grey-queue").withQueueOwnerAWSAccountId("some owner"));
assertNotNull("verify that,on fetch,queueUrlResult.getQueueUrl());
// get all queues
ListQueuesResult allQueues = sqs.listQueues();
assertEquals("verify all queues are returned",allQueues.getQueueUrls().size());
assertTrue("verify that all queues contain first queue",allQueues.getQueueUrls().contains(createdQueue.getQueueUrl()));
assertTrue("verify that all queues contain second tea queue",allQueues.getQueueUrls().contains(secondTeaQueue.getQueueUrl()));
assertTrue("verify that all queues contain coffee queue",allQueues.getQueueUrls().contains(anotherQueue.getQueueUrl()));
// get only queues that start with 'tea'
ListQueuesResult teaQueues = sqs.listQueues(new ListQueuesRequest("tea"));
assertEquals("verify only tea queues are returned",2,teaQueues.getQueueUrls().size());
assertTrue("verify that tea queues contain first queue",teaQueues.getQueueUrls().contains(createdQueue.getQueueUrl()));
assertTrue("verify that tea queues contain second tea queue",teaQueues.getQueueUrls().contains(secondTeaQueue.getQueueUrl()));
assertNotNull("verify that delete queue returned ok",sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(queueUrlResult.getQueueUrl())));
assertFalse("verify that the queue was removed",sqs.listQueues().getQueueUrls().stream()
.anyMatch( queueUrl -> StringUtils.equals(queueUrl,queueUrlResult.getQueueUrl()) ));
// cleanup
getQueues().remove("tea-earl-grey-queue");
getQueues().remove("tea-mate-queue");
getQueues().remove("coffee-queue");
}
项目:aws-sdk-java-resources
文件:SQSImpl.java
@Override
public QueueCollection getQueues(ListQueuesRequest request) {
ResourceCollectionImpl result = service.getCollection("Queues",request);
if (result == null) return null;
return new QueueCollectionImpl(result);
}
项目:awslocal
文件:DirectorySQS.java
@Override
public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonClientException {
List<String> queueUrls = Lists.newArrayListWithCapacity(_queuesByUrl.size());
try (DirectoryStream<Path> queuePaths = Files.newDirectoryStream(_rootDirectory.toPath())) {
for (Path queuePath : queuePaths) {
if (listQueuesRequest.getQueueNamePrefix() == null || queuePath.getFileName().toString().startsWith(listQueuesRequest.getQueueNamePrefix())) {
queueUrls.add(queuePath.toUri().toString());
}
}
} catch (IOException e) {
throw new AmazonServiceException("could not get queue list",e);
}
return new ListQueuesResult().withQueueUrls(queueUrls);
}
项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProvider.java
private void updateAvailableQueues() {
try {
ListQueuesResult result = sqs.listQueues(new ListQueuesRequest(queuePrefix));
List<String> availableQueues = Lists.newArrayList(Iterables.filter(result.getQueueUrls(),include));
Collections.sort(availableQueues,queueComparator);
messageProviders.clear();
for (String queueUrl : availableQueues) {
messageProviders.add(new AmazonSQSMessageProvider(sqs,queueUrl,waitTimeSeconds));
}
} catch (AmazonClientException e) {
LOG.error("An error occurred while listing SQS queues: {}",e);
}
}
项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProviderTest.java
@Test
public void noQueues() {
AmazonSQS amazonSQS = mock(AmazonSQS.class);
// return an empty list
when(amazonSQS.listQueues(any(ListQueuesRequest.class))).thenReturn(new ListQueuesResult());
AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS,10 * 1000);
List<Message> empty = provider.next();
Assert.assertEquals(0,empty.size());
verify(amazonSQS).listQueues(any(ListQueuesRequest.class));
}
项目:unitstack
文件:MockSqsTest.java
@Test
public void testListQueues_withEmptyRequestParams_shouldWork() {
assertNotNull(sqs.listQueues(new ListQueuesRequest()));
}
项目:conductor
文件:SQSObservableQueue.java
private List<String> listQueues(String queueName) {
ListQueuesRequest listQueuesRequest = new ListQueuesRequest().withQueueNamePrefix(queueName);
ListQueuesResult resultList = client.listQueues(listQueuesRequest);
List<String> queueUrls = resultList.getQueueUrls().stream().filter(queueUrl -> queueUrl.contains(queueName)).collect(Collectors.toList());
return queueUrls;
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
public Observable<ListQueuesResult> listQueuesAsync(ListQueuesRequest request) {
return Observable.from(sqsClient.listQueuesAsync(request));
}
项目:aws-sdk-java-resources
文件:SQSImpl.java
@Override
public QueueCollection getQueues(String queueNamePrefix) {
return getQueues( new ListQueuesRequest(queueNamePrefix));
}
项目:aws-sdk-java-resources
文件:SQSImpl.java
@Override
public QueueCollection getQueues() {
return getQueues((ListQueuesRequest)null);
}
项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProviderTest.java
@Test
public void drainThreeQueues() {
AmazonSQS amazonSQS = mock(AmazonSQS.class);
// return three queues
when(amazonSQS.listQueues(any(ListQueuesRequest.class)))
.thenReturn(new ListQueuesResult().withQueueUrls("test-A","test-C","test-B","test-D"));
// each queue has N messages to return and then 0 messages
when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class)))
.thenAnswer(new Answer<Object>() {
// fake that a queue has no messages left with flags
boolean aDone = false;
boolean bDone = false;
boolean cDone = false;
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) invocation.getArguments()[0];
if (receiveMessageRequest.getQueueUrl().equals("test-A")) {
// return 3 messages for A,then no more messages
if (!aDone) {
aDone = true;
return new ReceiveMessageResult().withMessages(newMessage("A"),newMessage("A"),newMessage("A"));
}
} else if (receiveMessageRequest.getQueueUrl().equals("test-B")) {
// return 4 messages for B,then no more messages
if (!bDone) {
bDone = true;
return new ReceiveMessageResult().withMessages(newMessage("B"),newMessage("B"),newMessage("B"));
}
} else if (receiveMessageRequest.getQueueUrl().equals("test-C")) {
// return 1 message for C,then no more messages
if (!cDone) {
cDone = true;
return new ReceiveMessageResult().withMessages(newMessage("C"));
}
}
// fall through to return 0 messages
return new ReceiveMessageResult().withMessages();
}
});
// verify the order of next objects by counting the number of messages we return
AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS,TimeUnit.MINUTES.toMillis(30));
// for queues with messages,1 receive returns messages,1 receive returns none and removes the provider
assertMessages(provider.next(),"A");
assertMessages(provider.next(),4,"B");
assertMessages(provider.next(),"C");
// for queues with no messages,"D-but-nothing-really");
// when we have no queues left,there are no messages
assertMessages(provider.next(),"nothing");
assertMessages(provider.next(),"nothing");
// we only checked for queues once here
verify(amazonSQS,times(1)).listQueues(any(ListQueuesRequest.class));
// the total number of receives sent to AWS
verify(amazonSQS,times(7)).receiveMessage(any(ReceiveMessageRequest.class));
}
项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProviderTest.java
@Test
public void updatingQueues() {
AmazonSQS amazonSQS = mock(AmazonSQS.class);
// return queues
when(amazonSQS.listQueues(any(ListQueuesRequest.class)))
.thenReturn(
new ListQueuesResult().withQueueUrls("test-A","test-D"),new ListQueuesResult().withQueueUrls("test-C","test-D"));
// always return messages from these queues
when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) invocation.getArguments()[0];
return new ReceiveMessageResult().withMessages(newMessage(receiveMessageRequest.getQueueUrl()),newMessage(receiveMessageRequest.getQueueUrl()));
}
});
// check for new queues every 30 minutes,should be way longer than this test runs so we can simulate elapsed time
final long intervalNs = TimeUnit.MINUTES.toNanos(30);
// simulate the passage of time
final Iterator<Long> time = Lists.newArrayList(
0L,// start at 0,always an initial update
1 * intervalNs / 2,// update
1 * intervalNs,// no update
1 * intervalNs + (intervalNs / 2),// update
2 * intervalNs,// no update
3 * intervalNs + 1,// should update
3 * intervalNs + 2 // no update
).iterator();
AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS,TimeUnit.NANOSECONDS.toMillis(intervalNs)) {
@Override
public long currentNanoTime() {
return time.next();
}
};
// no update on constructor
verify(amazonSQS,times(0)).listQueues(any(ListQueuesRequest.class));
// update on first next
provider.next();
verify(amazonSQS,times(1)).listQueues(any(ListQueuesRequest.class));
// no update
provider.next();
verify(amazonSQS,times(1)).listQueues(any(ListQueuesRequest.class));
// update
provider.next();
verify(amazonSQS,times(2)).listQueues(any(ListQueuesRequest.class));
// no update
provider.next();
verify(amazonSQS,times(2)).listQueues(any(ListQueuesRequest.class));
// update
provider.next();
verify(amazonSQS,times(3)).listQueues(any(ListQueuesRequest.class));
// no update
provider.next();
verify(amazonSQS,times(3)).listQueues(any(ListQueuesRequest.class));
}
项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProviderTest.java
@Test
public void filteredQueues() {
AmazonSQS amazonSQS = mock(AmazonSQS.class);
// return queues
when(amazonSQS.listQueues(any(ListQueuesRequest.class)))
.thenReturn(
new ListQueuesResult().withQueueUrls("test-A",newMessage(receiveMessageRequest.getQueueUrl()));
}
});
AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS,0)
.withInclude(new Predicate<String>() {
@Override
public boolean apply(String input) {
return input.equals("test-B");
}
});
// no update on constructor
verify(amazonSQS,times(0)).listQueues(any(ListQueuesRequest.class));
// update on every next,only test-B messages
assertMessages(provider.next(),"test-B");
verify(amazonSQS,times(1)).listQueues(any(ListQueuesRequest.class));
// update on every next,times(2)).listQueues(any(ListQueuesRequest.class));
// update on every next,should be no messages when there is no B
assertMessages(provider.next(),"nothing");
verify(amazonSQS,times(3)).listQueues(any(ListQueuesRequest.class));
}
项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProviderTest.java
@Test
public void sortedQueuesReverse() {
AmazonSQS amazonSQS = mock(AmazonSQS.class);
// return queues
when(amazonSQS.listQueues(any(ListQueuesRequest.class)))
.thenReturn(
new ListQueuesResult().withQueueUrls("test-A",new ListQueuesResult().withQueueUrls("test-A","test-B"),"test-e","test-D","test-F","test-A"));
// always return messages from these queues
when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) invocation.getArguments()[0];
return new ReceiveMessageResult().withMessages(newMessage(receiveMessageRequest.getQueueUrl()),0)
.withQueueComparator(new Comparator<String>() {
@Override
public int compare(String s1,String s2) {
// reverse sorting
return s2.compareTo(s1);
}
});
// no update on constructor
verify(amazonSQS,only test-D messages
assertMessages(provider.next(),"test-D");
verify(amazonSQS,only test-C messages
assertMessages(provider.next(),"test-C");
verify(amazonSQS,only test-e messages
assertMessages(provider.next(),"test-e");
verify(amazonSQS,times(3)).listQueues(any(ListQueuesRequest.class));
}
项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProviderTest.java
@Test
public void sortedQueuesReverseIgnoreCase() {
AmazonSQS amazonSQS = mock(AmazonSQS.class);
// return queues
when(amazonSQS.listQueues(any(ListQueuesRequest.class)))
.thenReturn(
new ListQueuesResult().withQueueUrls("test-A",String s2) {
// reverse sorting,ignore case
return s2.compareToIgnoreCase(s1);
}
});
// no update on constructor
verify(amazonSQS,only test-F messages
assertMessages(provider.next(),"test-F");
verify(amazonSQS,times(3)).listQueues(any(ListQueuesRequest.class));
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientBase.java
/**
* <p>
* Returns a list of your queues. The maximum number of queues that can be
* returned is 1000. If you specify a value for the optional
* <code>QueueNamePrefix</code> parameter,only queues with a name beginning
* with the specified value are returned.
* </p>
*
* @param listQueuesRequest
* Container for the necessary parameters to execute the
* ListQueues service method on AmazonSQS.
*
* @return The response from the ListQueues service method,as returned by
* AmazonSQS.
*
*
* @throws AmazonClientException
* If any internal errors are encountered inside the client
* while attempting to make the request or handle the response.
* For example if a network connection is not available.
* @throws AmazonServiceException
* If an error response is returned by AmazonSQS indicating
* either a problem with the data in the request,or a server
* side issue.
*/
public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonServiceException,AmazonClientException {
return amazonSqsToBeExtended.listQueues(listQueuesRequest);
}
项目:aws-sdk-java-resources
文件:SQS.java
/**
* Retrieves the Queues collection referenced by this resource.
*/
QueueCollection getQueues(ListQueuesRequest request);