项目:generic-queue
文件:AwsSQS.java
/**
 * Returns the approximate number of visible messages in the queue.
 *
 * <p>Only the {@code ApproximateNumberOfMessages} attribute is requested, since it is the
 * only value this method reads.
 *
 * @return the value of the {@code ApproximateNumberOfMessages} queue attribute
 * @throws Exception if the request fails, or the attribute is absent or not a number
 */
@Override
public long size() throws Exception {
    final String attributeName = "ApproximateNumberOfMessages";
    // Ask for the single attribute we need instead of "All".
    List<String> attributeNames = new ArrayList<String>();
    attributeNames.add(attributeName);
    GetQueueAttributesRequest request = new GetQueueAttributesRequest(queueUrl);
    request.setAttributeNames(attributeNames);
    Map<String,String> attributes = client.getQueueAttributes(request).getAttributes();
    String value = (attributes == null) ? null : attributes.get(attributeName);
    if (value == null) {
        // Fail with a message that names the problem rather than a bare parse error on null.
        throw new IllegalStateException(
                "Queue attribute " + attributeName + " was not returned for " + queueUrl);
    }
    // Parse directly as long so the count is never narrowed through int.
    return Long.parseLong(value);
}
项目:sqs-utils
文件:QueueFactoryTest.java
/**
 * Checks that the factory builds a {@code Queue} carrying the requested name, the URL
 * reported by the SQS client and the attribute map reported by the SQS client.
 */
@Test
public void testGet() {
    // given: a client that answers any URL / attribute request with fixed values
    HashMap<String,String> expectedAttributes = new HashMap<>();
    expectedAttributes.put("1", "3");
    expectedAttributes.put("hi", "ho");
    GetQueueAttributesResult stubbedAttributes = mock(GetQueueAttributesResult.class);
    when(stubbedAttributes.getAttributes()).thenReturn(expectedAttributes);
    GetQueueUrlResult stubbedUrl = mock(GetQueueUrlResult.class);
    when(stubbedUrl.getQueueUrl()).thenReturn("url1");
    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(stubbedAttributes);
    when(amazonSQS.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(stubbedUrl);
    QueueName requestedName = new QueueName("q1");

    // when
    Queue created = uut.get(requestedName);

    // then
    assertEquals("url1", created.getUrl());
    assertEquals("q1", created.getName().getId());
    assertEquals(expectedAttributes, created.getQueueAttributes());
}
项目:paradox-nakadi-consumer
文件:SQSFailedEventSourceTest.java
/**
 * Checks that {@code getSize()} returns the numeric value of the
 * {@code ApproximateNumberOfMessages} attribute reported by the SQS client.
 */
@Test
public void testShouldReturnTotalNumberOfFailedEvents() {
    final String expectedCount = RandomStringUtils.randomNumeric(4);

    final SdkHttpMetadata httpMetadata = mock(SdkHttpMetadata.class);
    when(httpMetadata.getHttpStatusCode()).thenReturn(200);

    final Map<String,String> queueAttributes = new HashMap<>();
    queueAttributes.put(QueueAttributeName.ApproximateNumberOfMessages.name(), expectedCount);

    final GetQueueAttributesResult stubbedResult = new GetQueueAttributesResult();
    stubbedResult.setAttributes(queueAttributes);
    stubbedResult.setSdkHttpMetadata(httpMetadata);
    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(stubbedResult);

    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(expectedCount));
}
项目:s3_video
文件:AWSAdapter.java
/**
 * Subscribes an SQS queue to an SNS topic.
 *
 * <p>Looks up the queue ARN, sets the queue's {@code Policy} attribute to a policy with a
 * single statement allowing {@code SendMessage} when the source ARN is the topic, and then
 * creates the SNS subscription. The {@code Policy} attribute is written with only this one
 * statement.
 *
 * @param snsTopicArn ARN of the topic to subscribe to
 * @param sqsQueueUrl URL of the queue that should receive the topic's messages
 * @return the subscription ARN reported by SNS
 */
public String subscribeQueueToTopic(String snsTopicArn,String sqsQueueUrl){
    // Resolve the queue ARN; both the policy and the subscription need it.
    GetQueueAttributesRequest arnRequest = new GetQueueAttributesRequest(sqsQueueUrl)
            .withAttributeNames(QueueAttributeName.QueueArn.toString());
    String sqsQueueArn = sqsClient.getQueueAttributes(arnRequest)
            .getAttributes()
            .get(QueueAttributeName.QueueArn.toString());

    Statement allowTopicToSend = new Statement(Effect.Allow)
            .withId("topic-subscription-" + snsTopicArn)
            .withPrincipals(Principal.AllUsers)
            .withActions(SQSActions.SendMessage)
            .withResources(new Resource(sqsQueueArn))
            .withConditions(ConditionFactory.newSourceArnCondition(snsTopicArn));
    Policy policy = new Policy().withStatements(allowTopicToSend);
    String policyJson = policy.toJson();
    logger.debug("Policy: " + policyJson);

    Map<String,String> policyAttributes = new HashMap<String,String>();
    policyAttributes.put(QueueAttributeName.Policy.toString(), policyJson);
    sqsClient.setQueueAttributes(new SetQueueAttributesRequest(sqsQueueUrl, policyAttributes));

    SubscribeRequest subscribeRequest = new SubscribeRequest()
            .withEndpoint(sqsQueueArn)
            .withProtocol("sqs")
            .withTopicArn(snsTopicArn);
    return snsClient.subscribe(subscribeRequest).getSubscriptionArn();
}
项目:support
文件:SQS.java
/**
 * Returns the approximate number of visible messages in the named queue.
 *
 * <p>The queue URL is obtained by issuing a {@code CreateQueueRequest} for the name. Only the
 * {@code ApproximateNumberOfMessages} attribute is requested, since it is the only value read.
 *
 * @param name name of the queue
 * @return the reported message count, or {@code -1} if anything goes wrong
 */
public static int getCount(String name) {
    final String attributeName = "ApproximateNumberOfMessages";
    try {
        String queueUrl = getConnection().createQueue(
                new CreateQueueRequest(name)).getQueueUrl();
        // Request just the attribute we read rather than "All".
        List<String> attributeNames = new ArrayList<String>();
        attributeNames.add(attributeName);
        GetQueueAttributesRequest request = new GetQueueAttributesRequest(queueUrl);
        request.setAttributeNames(attributeNames);
        // NOTE(review): the URL is resolved via getConnection() but the attributes are read
        // through the sqs field; presumably both are the same client - confirm.
        Map<String,String> attributes = sqs.getQueueAttributes(request).getAttributes();
        return Integer.parseInt(attributes.get(attributeName));
    } catch (Exception e) {
        // Best-effort lookup: any failure (including a missing attribute) is reported as -1.
        e.printStackTrace();
        return -1;
    }
}
项目:spring-integration-aws
文件:SqsExecutor.java
/**
 * Applies the configured permissions to the queue.
 *
 * <p>Does nothing when no permissions are configured. Otherwise the queue's {@code Policy}
 * attribute is fetched and handed to {@code AwsUtil.addPermissions} together with a handler
 * that issues an {@code AddPermissionRequest} for each permission it is given.
 */
private void addPermissions() {
    if (permissions == null || permissions.isEmpty()) {
        return;
    }
    GetQueueAttributesRequest policyRequest =
            new GetQueueAttributesRequest(queueUrl, Arrays.asList("Policy"));
    GetQueueAttributesResult policyResult = sqsClient.getQueueAttributes(policyRequest);
    AwsUtil.AddPermissionHandler grantOnQueue = new AwsUtil.AddPermissionHandler() {
        @Override
        public void execute(Permission p) {
            AddPermissionRequest grant = new AddPermissionRequest()
                    .withQueueUrl(queueUrl)
                    .withLabel(p.getLabel())
                    .withAWSAccountIds(p.getAwsAccountIds())
                    .withActions(p.getActions());
            sqsClient.addPermission(grant);
        }
    };
    AwsUtil.addPermissions(policyResult.getAttributes(), permissions, grantOnQueue);
}
项目:unitstack
文件:MockSqsTest.java
/**
 * Checks that each of the listed SQS operations returns a non-null result from the mock.
 */
@Test
public void testNonInjectableMocks_shouldReturnNormal() {
    ChangeMessageVisibilityBatchRequest visibilityBatch = new ChangeMessageVisibilityBatchRequest();
    assertNotNull(sqs.changeMessageVisibilityBatch(visibilityBatch));

    AddPermissionRequest addPermission = new AddPermissionRequest()
            .withActions("one")
            .withAWSAccountIds("two","three")
            .withLabel("four")
            .withQueueUrl("five");
    assertNotNull(sqs.addPermission(addPermission));

    ListDeadLetterSourceQueuesRequest deadLetterSources =
            new ListDeadLetterSourceQueuesRequest().withQueueUrl("ten");
    assertNotNull(sqs.listDeadLetterSourceQueues(deadLetterSources));

    GetQueueAttributesRequest getAttributes = new GetQueueAttributesRequest()
            .withAttributeNames(ImmutableList.of("eleven"))
            .withQueueUrl("twelve");
    assertNotNull(sqs.getQueueAttributes(getAttributes));

    SetQueueAttributesRequest setAttributes = new SetQueueAttributesRequest()
            .withAttributes(ImmutableMap.of("thirteen","fourteen"))
            .withQueueUrl("fifteen");
    assertNotNull(sqs.setQueueAttributes(setAttributes));
}
项目:sqs-utils
文件:QueueFactory.java
/**
 * Looks up an existing queue by name and returns it with its URL and attributes.
 *
 * @param queueName name of the queue to resolve
 * @return a {@code Queue} built from the name, the resolved URL and the map returned for
 *         an "All" attribute request
 */
public Queue get(@NonNull QueueName queueName) {
    GetQueueUrlRequest lookup = new GetQueueUrlRequest().withQueueName(queueName.getId());
    String resolvedUrl = amazonSQS.getQueueUrl(lookup).getQueueUrl();

    GetQueueAttributesRequest allAttributes =
            new GetQueueAttributesRequest(resolvedUrl, Collections.singletonList("All"));
    Map<String,String> queueAttributes =
            amazonSQS.getQueueAttributes(allAttributes).getAttributes();

    return new Queue(queueName, resolvedUrl, queueAttributes);
}
项目:paradox-nakadi-consumer
文件:SQSFailedEventSource.java
/**
 * Returns the approximate number of messages in the queue.
 *
 * @return the value of the {@code ApproximateNumberOfMessages} attribute, or {@code 0} when
 *         the attribute map is {@code null} or does not contain that attribute
 */
@Override
public long getSize() {
    final String countKey = QueueAttributeName.ApproximateNumberOfMessages.name();
    final GetQueueAttributesRequest countRequest =
            new GetQueueAttributesRequest(queueUrl, Collections.singletonList(countKey));
    final GetQueueAttributesResult countResult = amazonSQS.getQueueAttributes(countRequest);

    if (countResult.getAttributes() == null) {
        return 0L;
    }
    return Long.parseLong(countResult.getAttributes().getOrDefault(countKey, "0"));
}
项目:paradox-nakadi-consumer
文件:SQSFailedEventSourceTest.java
/**
 * Checks that {@code getSize()} returns 0 when the client reports an empty attribute map.
 */
@Test
public void testShouldReturnDefaultTotalNumberOfFailedEvents() {
    final SdkHttpMetadata httpMetadata = mock(SdkHttpMetadata.class);
    when(httpMetadata.getHttpStatusCode()).thenReturn(200);

    final GetQueueAttributesResult emptyAttributesResult = new GetQueueAttributesResult();
    emptyAttributesResult.setAttributes(new HashMap<>());
    emptyAttributesResult.setSdkHttpMetadata(httpMetadata);
    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(emptyAttributesResult);

    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(0L));
}
项目:paradox-nakadi-consumer
文件:SQSFailedEventSourceTest.java
/**
 * Checks that {@code getSize()} returns 0 when no attributes were set on the result at all.
 */
@Test
public void testShouldReturnDefaultTotalNumberOfFailedEventsWhenThereIsNoQueueAttributes() {
    final SdkHttpMetadata httpMetadata = mock(SdkHttpMetadata.class);
    when(httpMetadata.getHttpStatusCode()).thenReturn(200);

    // Deliberately no setAttributes(...) call on this result.
    final GetQueueAttributesResult resultWithoutAttributes = new GetQueueAttributesResult();
    resultWithoutAttributes.setSdkHttpMetadata(httpMetadata);
    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(resultWithoutAttributes);

    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(0L));
}
项目:pipeline
文件:AmazonSQSIterator.java
/**
 * Return the approximate number of visible messages in the SQS queue at {@code queueUrl}.
 *
 * <p>Any exception raised while querying or parsing is logged and reported as {@code 0}.
 *
 * @return the value of the {@code NUM_MESSAGES_KEY} attribute ({@code 0} when it is absent),
 *         or {@code 0} on failure
 */
private int getNumMessages() {
    try {
        final GetQueueAttributesRequest countRequest =
                new GetQueueAttributesRequest(queueUrl).withAttributeNames(NUM_MESSAGES_KEY);
        final GetQueueAttributesResult countResult = sqs.getQueueAttributes(countRequest);
        final String reported = countResult.getAttributes().getOrDefault(NUM_MESSAGES_KEY,"0");
        final int parsed = Integer.parseInt(reported);
        LOGGER.info("Approximately {} messages in queue",parsed);
        return parsed;
    } catch (Exception e) {
        LOGGER.error("Unable to get approximate number of messages",e);
        return 0;
    }
}
项目:amazon-cloudengine
文件:RemoteWorker.java
/**
 * Returns the approximate number of visible messages in a queue.
 *
 * @param sqs client used to query the queue
 * @param queueUrl URL of the queue
 * @return the value of the {@code ApproximateNumberOfMessages} attribute
 * @throws NumberFormatException if the attribute is absent or not a number
 */
public static int getQueueSize(AmazonSQS sqs,String queueUrl){
    final String attributeName = "ApproximateNumberOfMessages";
    Collection<String> attributeNames = new ArrayList<String>();
    attributeNames.add(attributeName);
    GetQueueAttributesRequest getAttributesRequest = new GetQueueAttributesRequest(queueUrl)
            .withAttributeNames(attributeNames);
    // Read through the Map interface. getAttributes() is only declared to return a Map, so
    // downcasting it to HashMap can fail with ClassCastException for other implementations.
    String reported = sqs.getQueueAttributes(getAttributesRequest)
            .getAttributes()
            .get(attributeName);
    return Integer.parseInt(reported);
}
项目:amazon-cloudengine
文件:SQS.java
/**
 * Returns the approximate number of visible messages in the queue at {@code queueURL}.
 *
 * @return the value of the {@code ApproximateNumberOfMessages} attribute
 */
public int getApproximateQueueSize() {
    final String key = "ApproximateNumberOfMessages";
    Collection<String> requested = new ArrayList<String>();
    requested.add(key);
    GetQueueAttributesRequest request =
            new GetQueueAttributesRequest(queueURL).withAttributeNames(requested);
    Map<String,String> reported = sqs.getQueueAttributes(request).getAttributes();
    return Integer.parseInt(reported.get(key));
}
项目:amazon-cloudengine
文件:SQS.java
/**
 * Returns the approximate number of not-visible messages in the queue at {@code queueURL}.
 *
 * @return the value of the {@code ApproximateNumberOfMessagesNotVisible} attribute
 */
public int getApproximateNotVisibleMessageNum() {
    final String key = "ApproximateNumberOfMessagesNotVisible";
    Collection<String> requested = new ArrayList<String>();
    requested.add(key);
    GetQueueAttributesRequest request =
            new GetQueueAttributesRequest(queueURL).withAttributeNames(requested);
    Map<String,String> reported = sqs.getQueueAttributes(request).getAttributes();
    return Integer.parseInt(reported.get(key));
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Checks that, with one handler method and no explicit executor, the container creates a
 * {@code ThreadPoolTaskExecutor} whose max pool size is
 * {@code handlerCount * (maxNumberOfMessages + 1)}.
 */
@Test
public void testWithDefaultTaskExecutorAndOneHandler() throws Exception {
    int maxMessages = 10;
    QueueMessageHandler.MappingInformation testQueueMapping =
            new QueueMessageHandler.MappingInformation(Collections.singleton("testQueue"),SqsMessageDeletionPolicy.ALWAYS);
    Map<QueueMessageHandler.MappingInformation,HandlerMethod> handlerMethods =
            Collections.singletonMap(testQueueMapping,null);

    AmazonSQSAsync sqsStub = mock(AmazonSQSAsync.class,withSettings().stubOnly());
    when(sqsStub.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
    when(sqsStub.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(new GetQueueUrlResult().withQueueUrl("testQueueUrl"));

    QueueMessageHandler handlerStub = mock(QueueMessageHandler.class);
    when(handlerStub.getHandlerMethods()).thenReturn(handlerMethods);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setMaxNumberOfMessages(maxMessages);
    container.setAmazonSqs(sqsStub);
    container.setMessageHandler(handlerStub);
    container.afterPropertiesSet();

    int expectedMaxPoolSize = handlerMethods.size() * (maxMessages + 1);
    ThreadPoolTaskExecutor defaultExecutor = (ThreadPoolTaskExecutor) container.getTaskExecutor();
    assertNotNull(defaultExecutor);
    assertEquals(expectedMaxPoolSize,defaultExecutor.getMaxPoolSize());
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Provides a stub-only SQS client mock with a URL registered for "testQueue" and empty
 * results for receive-message and queue-attribute requests.
 */
@Bean
public AmazonSQSAsync amazonSQS() {
    AmazonSQSAsync sqsStub = mock(AmazonSQSAsync.class,withSettings().stubOnly());
    mockGetQueueUrl(sqsStub,"testQueue","http://testQueue.amazonaws.com");

    ReceiveMessageResult noMessages = new ReceiveMessageResult();
    when(sqsStub.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(noMessages);

    GetQueueAttributesResult noAttributes = new GetQueueAttributesResult();
    when(sqsStub.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(noAttributes);

    return sqsStub;
}
项目:spring-cloud-aws
文件:MessageListenerContainerTest.java
/**
 * Checks that starting the container with one listener registers "testQueue" with a receive
 * request carrying the resolved URL and the configured max-messages, visibility-timeout and
 * wait-time values.
 */
@Test
public void receiveMessageRequests_withOneElement_created() throws Exception {
    AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();
    AmazonSQSAsync mock = mock(AmazonSQSAsync.class,withSettings().stubOnly());
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    container.setAmazonSqs(mock);
    // The handler is set once; a throw-away mock handler used to be set first and then
    // immediately overwritten by this one.
    container.setMessageHandler(messageHandler);
    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("messageListener",MessageListener.class);
    container.setMaxNumberOfMessages(11);
    container.setVisibilityTimeout(22);
    container.setWaitTimeOut(33);
    messageHandler.setApplicationContext(applicationContext);
    when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))).
            thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com"));
    when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    container.start();
    Map<String,QueueAttributes> registeredQueues = container.getRegisteredQueues();
    // Look the queue up once instead of repeating the lookup in every assertion.
    QueueAttributes testQueue = registeredQueues.get("testQueue");
    assertEquals("http://testQueue.amazonaws.com",testQueue.getReceiveMessageRequest().getQueueUrl());
    assertEquals(11L,testQueue.getReceiveMessageRequest().getMaxNumberOfMessages().longValue());
    assertEquals(22L,testQueue.getReceiveMessageRequest().getVisibilityTimeout().longValue());
    assertEquals(33L,testQueue.getReceiveMessageRequest().getWaitTimeSeconds().longValue());
}
项目:spring-cloud-aws
文件:MessageListenerContainerTest.java
@Test
public void receiveMessageRequests_withMultipleElements_created() throws Exception {
AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();
AmazonSQSAsync mock = mock(AmazonSQSAsync.class,withSettings().stubOnly());
container.setAmazonSqs(mock);
StaticApplicationContext applicationContext = new StaticApplicationContext();
QueueMessageHandler messageHandler = new QueueMessageHandler();
messageHandler.setApplicationContext(applicationContext);
container.setMessageHandler(messageHandler);
applicationContext.registerSingleton("messageListener",MessageListener.class);
applicationContext.registerSingleton("anotherMessageListener",AnotherMessageListener.class);
container.setMaxNumberOfMessages(11);
container.setVisibilityTimeout(22);
container.setWaitTimeOut(33);
when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))).
thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com"));
when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("anotherTestQueue"))).
thenReturn(new GetQueueUrlResult().withQueueUrl("http://anotherTestQueue.amazonaws.com"));
when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
messageHandler.afterPropertiesSet();
container.afterPropertiesSet();
container.start();
Map<String,registeredQueues.get("testQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue());
assertEquals("http://anotherTestQueue.amazonaws.com",registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getQueueUrl());
assertEquals(11L,registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getMaxNumberOfMessages().longValue());
assertEquals(22L,registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getVisibilityTimeout().longValue());
assertEquals(33L,registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue());
}
项目:spring-cloud-aws
文件:MessageListenerContainerTest.java
/**
 * Checks that a queue whose URL lookup throws {@code DestinationResolutionException} is
 * skipped with a single warning, while the other queue is still registered.
 */
@Test
public void receiveMessageRequests_withDestinationResolverThrowingException_shouldLogWarningAndNotCreateRequest() throws Exception {
    // Arrange
    AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();
    Logger loggerMock = container.getLogger();
    // The mock creation and handler/context wiring had been truncated into a single invalid
    // mock(...) call; the setup is restored to match the multiple-listener test in this file.
    // NOTE(review): reconstructed from the sibling test - confirm against the project source.
    AmazonSQSAsync mock = mock(AmazonSQSAsync.class,withSettings().stubOnly());
    container.setAmazonSqs(mock);
    StaticApplicationContext applicationContext = new StaticApplicationContext();
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    messageHandler.setApplicationContext(applicationContext);
    container.setMessageHandler(messageHandler);
    applicationContext.registerSingleton("messageListener",MessageListener.class);
    applicationContext.registerSingleton("anotherMessageListener",AnotherMessageListener.class);
    when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))).
            thenThrow(new DestinationResolutionException("Queue not found"));
    when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("anotherTestQueue"))).
            thenReturn(new GetQueueUrlResult().withQueueUrl("http://anotherTestQueue.amazonaws.com"));
    when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    // Act
    container.start();
    // Assert
    ArgumentCaptor<String> logMsgArgCaptor = ArgumentCaptor.forClass(String.class);
    verify(loggerMock).warn(logMsgArgCaptor.capture());
    Map<String,QueueAttributes> registeredQueues = container.getRegisteredQueues();
    assertFalse(registeredQueues.containsKey("testQueue"));
    assertEquals("Ignoring queue with name 'testQueue' as it does not exist.",logMsgArgCaptor.getValue());
    assertEquals("http://anotherTestQueue.amazonaws.com",registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getQueueUrl());
}
项目:izettle-toolbox
文件:AmazonSNSSubscriptionSetup.java
/**
 * Looks up the ARN of the queue at the given URL.
 *
 * @param amazonSQS client used for the lookup
 * @param queueURL URL of the queue
 * @return the value of the queue's {@code QueueArn} attribute
 */
private static String getSQSQueueARN(AmazonSQS amazonSQS,String queueURL) {
    GetQueueAttributesRequest arnRequest = new GetQueueAttributesRequest()
            .withQueueUrl(queueURL)
            .withAttributeNames(QueueAttributeName.QueueArn);
    // This call will throw if the queue does not exist.
    GetQueueAttributesResult arnResult = amazonSQS.getQueueAttributes(arnRequest);
    return arnResult.getAttributes().get(QueueAttributeName.QueueArn.name());
}
项目:awslocal
文件:DirectorySQS.java
/**
 * Returns the requested attributes of a directory-backed queue.
 *
 * <p>Only {@code QueueArn} is implemented. Every other requested name is rejected. This
 * includes "All", the approximate message counts, timeouts, timestamps, "Policy",
 * "MaximumMessageSize", "MessageRetentionPeriod", "DelaySeconds" and
 * "ReceiveMessageWaitTimeSeconds".
 *
 * @param getQueueAttributesRequest request naming the queue URL and the wanted attributes
 * @return a result containing the supported attributes that were requested
 * @throws UnsupportedOperationException if any requested attribute is not implemented
 */
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonClientException {
    DirectorySQSQueue queue = getQueueFromUrl(getQueueAttributesRequest.getQueueUrl(),false);
    Map<String,String> resolved = Maps.newHashMap();
    List<String> notImplemented = Lists.newArrayList();
    for (String requested : getQueueAttributesRequest.getAttributeNames()) {
        if (requested.equals("QueueArn")) {
            resolved.put("QueueArn",BASE_ARN + queue.getQueuePath().getFileName());
        } else {
            // Known-but-unimplemented names and unknown names are treated alike.
            notImplemented.add(requested);
        }
    }
    if (!notImplemented.isEmpty()) {
        throw new UnsupportedOperationException("attributes not implemented: " + notImplemented);
    }
    return new GetQueueAttributesResult().withAttributes(resolved);
}
项目:awslocal
文件:TestSQSClient.java
/**
 * Creates a queue, requests its {@code QueueArn} attribute and checks that the text after
 * the last ':' of the ARN equals the queue name.
 */
public void getQueueArnFromAttributes() {
    String queueName = someQueueName();
    String queueUrl = _amazonSQS.createQueue(new CreateQueueRequest(queueName)).getQueueUrl();

    List<String> requestedAttributes = ImmutableList.of("QueueArn");
    GetQueueAttributesRequest arnRequest = new GetQueueAttributesRequest()
            .withQueueUrl(queueUrl)
            .withAttributeNames(requestedAttributes);
    Map<String,String> resultAttributes = _amazonSQS.getQueueAttributes(arnRequest).getAttributes();

    String queueArn = resultAttributes.get("QueueArn");
    int lastColon = queueArn.lastIndexOf(":");
    String queueNameFromArn = queueArn.substring(lastColon + 1);
    Assert.assertEquals(queueNameFromArn,queueName);
}
项目:cfnassist
文件:QueuePolicyManager.java
/**
 * Fetches the configured attributes of a queue and verifies that the ARN is among them.
 * The ARN is needed to create the SNS subscription.
 *
 * @param url URL of the queue
 * @return the attribute map returned by SQS
 * @throws MissingArgumentException if the map has no entry for {@code QUEUE_ARN_KEY}
 */
public Map<String,String> getQueueAttributes(String url) throws MissingArgumentException {
    GetQueueAttributesRequest request = new GetQueueAttributesRequest(url);
    request.setAttributeNames(attributeNames);
    Map<String,String> attributes = sqsClient.getQueueAttributes(request).getAttributes();
    if (attributes.containsKey(QUEUE_ARN_KEY)) {
        return attributes;
    }
    String msg = "Missing arn attirbute,tried attribute with name: " + QUEUE_ARN_KEY;
    logger.error(msg);
    throw new MissingArgumentException(msg);
}
项目:spring-integration-aws
文件:SqsExecutor.java
/**
 * Reads the queue's ARN attribute from SQS and stores it in {@code queueArn}.
 */
private void resolveQueueArn() {
    GetQueueAttributesRequest arnRequest = new GetQueueAttributesRequest(queueUrl)
            .withAttributeNames(Collections.singletonList(QUEUE_ARN_KEY));
    GetQueueAttributesResult arnResult = sqsClient.getQueueAttributes(arnRequest);
    queueArn = arnResult.getAttributes().get(QUEUE_ARN_KEY);
}
项目:async-sqs
文件:GetQueueAttributesAction.java
/**
 * Builds the request for all attributes of the given queue.
 *
 * @param queueUrl URL of the queue
 * @return a request for {@code queueUrl} whose attribute names are the single entry "All"
 */
@VisibleForTesting
static GetQueueAttributesRequest createRequest(String queueUrl) {
    GetQueueAttributesRequest allAttributesRequest =
            new GetQueueAttributesRequest(queueUrl,Collections.singletonList("All"));
    return allAttributesRequest;
}
项目:async-sqs
文件:GetQueueAttributesActionTest.java
/**
 * Checks that {@code createRequest} targets the given URL and asks for "All" attributes.
 */
@Test
public void testCreateRequest() {
    GetQueueAttributesRequest created = GetQueueAttributesAction.createRequest(QUEUE_URL);

    assertThat(created.getQueueUrl())
            .isEqualTo(QUEUE_URL);
    assertThat(created.getAttributeNames())
            .isEqualTo(Collections.singletonList("All"));
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
/**
 * Issues an asynchronous get-queue-attributes call and exposes its result as an
 * {@code Observable}.
 *
 * @param request the attributes request to send
 * @return an {@code Observable} created from the value returned by
 *         {@code sqsClient.getQueueAttributesAsync(request)}
 */
public Observable<GetQueueAttributesResult> getQueueAttributesAsync(GetQueueAttributesRequest request) {
    return Observable.from(
            sqsClient.getQueueAttributesAsync(request)
    );
}
项目:distributed-image-classification
文件:Queue.java
/**
 * Returns the approximate number of visible messages in the queue at {@code _queueURL}.
 *
 * @return the value of the {@code ApproximateNumberOfMessages} attribute
 */
public int getNumberOfItems() {
    final String countAttribute = "ApproximateNumberOfMessages";
    GetQueueAttributesRequest countRequest =
            new GetQueueAttributesRequest(_queueURL,Arrays.asList(countAttribute));
    GetQueueAttributesResult countResult = _sqs.getQueueAttributes(countRequest);
    String reported = countResult.getAttributes().get(countAttribute);
    return Integer.parseInt(reported);
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Loads using the given request, without a result capture.
 *
 * @param request the attributes request to use
 * @return whatever the two-argument {@code load} returns for a {@code null} extractor
 */
@Override
public boolean load(GetQueueAttributesRequest request) {
    ResultCapture<GetQueueAttributesResult> noCapture = null;
    return load(request,noCapture);
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Loads using the given request by delegating to the underlying {@code resource}.
 *
 * @param request the attributes request to use
 * @param extractor capture passed through to {@code resource.load}; may be {@code null}
 * @return the value returned by {@code resource.load(request, extractor)}
 */
@Override
public boolean load(GetQueueAttributesRequest request,ResultCapture<GetQueueAttributesResult> extractor) {
    boolean loaded = resource.load(request,extractor);
    return loaded;
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Checks that a started container delivers a received message to the handler within one
 * second.
 */
@Test
public void testSimpleReceiveMessage() throws Exception {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class,withSettings().stubOnly());
    container.setAmazonSqs(sqs);
    CountDownLatch countDownLatch = new CountDownLatch(1);
    QueueMessageHandler messageHandler = new QueueMessageHandler() {
        @Override
        public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
            countDownLatch.countDown();
            assertEquals("messageContent",message.getPayload());
        }
    };
    container.setMessageHandler(messageHandler);
    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener",TestMessageListener.class);
    messageHandler.setApplicationContext(applicationContext);
    container.setBeanName("testContainerName");
    messageHandler.afterPropertiesSet();
    // Pass the queue name explicitly, as the other mockGetQueueUrl call sites in this file do.
    // NOTE(review): assumes TestMessageListener listens on "testQueue" - confirm.
    mockGetQueueUrl(sqs,"testQueue","http://testSimpleReceiveMessage.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs,"http://testSimpleReceiveMessage.amazonaws.com");
    container.afterPropertiesSet();
    // First poll returns two messages, every later poll returns none.
    when(sqs.receiveMessage(new ReceiveMessageRequest("http://testSimpleReceiveMessage.amazonaws.com").withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent"),new Message().withBody("messageContent")))
            .thenReturn(new ReceiveMessageResult());
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
    container.start();
    assertTrue(countDownLatch.await(1,TimeUnit.SECONDS));
    container.stop();
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Checks that with two listeners on two queues, each listener receives the message that was
 * delivered on its own queue.
 */
@Test
public void listener_withMultipleMessageHandlers_shouldBeCalled() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {
        @Override
        protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) {
            countDownLatch.countDown();
            super.executeMessage(stringMessage);
        }
    };
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class,withSettings().stubOnly());
    container.setAmazonSqs(sqs);
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    container.setMessageHandler(messageHandler);
    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener",TestMessageListener.class);
    applicationContext.registerSingleton("anotherTestMessageListener",AnotherTestMessageListener.class);
    // Pass the queue name explicitly, matching the "anotherTestQueue" call just below.
    // NOTE(review): assumes TestMessageListener listens on "testQueue" - confirm.
    mockGetQueueUrl(sqs,"testQueue","http://listener_withMultipleMessageHandlers_shouldBeCalled.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs,"http://listener_withMultipleMessageHandlers_shouldBeCalled.amazonaws.com");
    mockGetQueueUrl(sqs,"anotherTestQueue","http://listener_withMultipleMessageHandlers_shouldBeCalled.another.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs,"http://listener_withMultipleMessageHandlers_shouldBeCalled.another.amazonaws.com");
    messageHandler.setApplicationContext(applicationContext);
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    // Each queue yields one message on the first poll and nothing afterwards.
    when(sqs.receiveMessage(new ReceiveMessageRequest("http://listener_withMultipleMessageHandlers_shouldBeCalled.amazonaws.com").withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent")))
            .thenReturn(new ReceiveMessageResult());
    when(sqs.receiveMessage(new ReceiveMessageRequest("http://listener_withMultipleMessageHandlers_shouldBeCalled.another.amazonaws.com").withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("anotherMessageContent")))
            .thenReturn(new ReceiveMessageResult());
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
    container.start();
    assertTrue(countDownLatch.await(2L,TimeUnit.SECONDS));
    container.stop();
    assertEquals("messageContent",applicationContext.getBean(TestMessageListener.class).getMessage());
    assertEquals("anotherMessageContent",applicationContext.getBean(AnotherTestMessageListener.class).getMessage());
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Checks that an SQS message attribute ("SenderId") is passed to the handler as a message
 * header.
 */
@Test
public void messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders() throws Exception {
    // Arrange
    CountDownLatch countDownLatch = new CountDownLatch(1);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {
        @Override
        protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) {
            countDownLatch.countDown();
            super.executeMessage(stringMessage);
        }
    };
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class,withSettings().stubOnly());
    container.setAmazonSqs(sqs);
    // A spy, so the call to handleMessage can be verified and its argument captured.
    QueueMessageHandler messageHandler = spy(new QueueMessageHandler());
    container.setMessageHandler(messageHandler);
    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener",TestMessageListener.class);
    // Pass the queue name explicitly, as the other mockGetQueueUrl call sites in this file do.
    // NOTE(review): assumes TestMessageListener listens on "testQueue" - confirm.
    mockGetQueueUrl(sqs,"testQueue","http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs,"http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com");
    messageHandler.setApplicationContext(applicationContext);
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    when(sqs.receiveMessage(new ReceiveMessageRequest("http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com").withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent").withAttributes(Collections.singletonMap("SenderId","ID"))))
            .thenReturn(new ReceiveMessageResult());
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
    // Act
    container.start();
    // Assert
    assertTrue(countDownLatch.await(2L,TimeUnit.SECONDS));
    container.stop();
    verify(messageHandler).handleMessage(this.stringMessageCaptor.capture());
    assertEquals("ID",this.stringMessageCaptor.getValue().getHeaders().get("SenderId"));
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Checks that a content-type message attribute holding a MIME type string is passed to the
 * handler as a {@code MimeType} header.
 */
@Test
public void messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader() throws Exception {
    // Arrange
    CountDownLatch countDownLatch = new CountDownLatch(1);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {
        @Override
        protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) {
            countDownLatch.countDown();
            super.executeMessage(stringMessage);
        }
    };
    // The mock creation and handler/context wiring had been truncated into a single mock(...)
    // call, leaving messageHandler and applicationContext undeclared; the setup is restored
    // to match the sibling "withMessageWithAttributes" test in this file.
    // NOTE(review): reconstructed from the sibling test - confirm against the project source.
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class,withSettings().stubOnly());
    container.setAmazonSqs(sqs);
    // A spy, so the call to handleMessage can be verified and its argument captured.
    QueueMessageHandler messageHandler = spy(new QueueMessageHandler());
    container.setMessageHandler(messageHandler);
    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener",TestMessageListener.class);
    mockGetQueueUrl(sqs,"testQueue","http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs,"http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com");
    messageHandler.setApplicationContext(applicationContext);
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    MimeType mimeType = new MimeType("text","plain",Charset.forName("UTF-8"));
    when(sqs.receiveMessage(new ReceiveMessageRequest("http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com").withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent")
                    .withAttributes(Collections.singletonMap("SenderId","ID"))
                    .withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE,new MessageAttributeValue().withDataType("String")
                            .withStringValue(mimeType.toString())))))
            .thenReturn(new ReceiveMessageResult());
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
    // Act
    container.start();
    // Assert
    assertTrue(countDownLatch.await(2L,TimeUnit.SECONDS));
    container.stop();
    verify(messageHandler).handleMessage(this.stringMessageCaptor.capture());
    assertEquals(mimeType,this.stringMessageCaptor.getValue().getHeaders().get(MessageHeaders.CONTENT_TYPE));
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Checks that when the first receive call throws, the container polls again and the handler
 * still gets a message within one second.
 */
@Test
public void receiveMessage_throwsAnException_operationShouldBeRetried() throws Exception {
    // Arrange
    Level previous = disableLogging();
    try {
        AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class,withSettings().stubOnly());
        // First poll fails, the next one returns two messages.
        when(amazonSqs.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(new RuntimeException("Boom!"))
                .thenReturn(new ReceiveMessageResult()
                        .withMessages(new Message().withBody("messageContent"),new Message().withBody("messageContent")));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        QueueMessageHandler messageHandler = new QueueMessageHandler() {
            @Override
            public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
                countDownLatch.countDown();
                assertEquals("messageContent",message.getPayload());
            }
        };
        StaticApplicationContext applicationContext = new StaticApplicationContext();
        applicationContext.registerSingleton("testMessageListener",TestMessageListener.class);
        messageHandler.setApplicationContext(applicationContext);
        // Pass the queue name explicitly, as the other mockGetQueueUrl call sites in this file do.
        // NOTE(review): assumes TestMessageListener listens on "testQueue" - confirm.
        mockGetQueueUrl(amazonSqs,"testQueue","http://receiveMessage_throwsAnException_operationShouldBeRetried.amazonaws.com");
        messageHandler.afterPropertiesSet();
        when(amazonSqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setBackOffTime(0);
        container.setAmazonSqs(amazonSqs);
        container.setMessageHandler(messageHandler);
        container.setAutoStartup(false);
        container.afterPropertiesSet();
        // Act
        container.start();
        // Assert
        assertTrue(countDownLatch.await(1,TimeUnit.SECONDS));
        container.stop();
    } finally {
        // Restore the log level even when the test fails.
        setLogLevel(previous);
    }
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Stub implementation that ignores the request and returns a new, empty result.
 *
 * @param getQueueAttributesRequest ignored
 * @return a freshly constructed {@code GetQueueAttributesResult}
 */
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonClientException {
    GetQueueAttributesResult emptyResult = new GetQueueAttributesResult();
    return emptyResult;
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Stubs the redrive-policy attribute request for {@code queueUrl} so that it returns a
 * result containing a placeholder JSON redrive policy.
 *
 * @param sqs the mock to stub
 * @param queueUrl URL of the queue the stubbed request targets
 */
private static void mockGetQueueAttributesWithRedrivePolicy(AmazonSQSAsync sqs,String queueUrl) {
    GetQueueAttributesRequest redriveRequest =
            new GetQueueAttributesRequest(queueUrl).withAttributeNames(QueueAttributeName.RedrivePolicy);
    GetQueueAttributesResult redriveResult =
            new GetQueueAttributesResult().addAttributesEntry(QueueAttributeName.RedrivePolicy.toString(),"{\"some\": \"JSON\"}");
    when(sqs.getQueueAttributes(redriveRequest)).thenReturn(redriveResult);
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
/**
 * Stubs the redrive-policy attribute request for {@code queueUrl} so that it returns an
 * empty result.
 *
 * @param sqs the mock to stub
 * @param queueUrl URL of the queue the stubbed request targets
 */
private static void mockGetQueueAttributesWithEmptyResult(AmazonSQSAsync sqs,String queueUrl) {
    GetQueueAttributesRequest redriveRequest =
            new GetQueueAttributesRequest(queueUrl).withAttributeNames(QueueAttributeName.RedrivePolicy);
    GetQueueAttributesResult emptyResult = new GetQueueAttributesResult();
    when(sqs.getQueueAttributes(redriveRequest)).thenReturn(emptyResult);
}
项目:izettle-toolbox
文件:AmazonSNSSubscriptionSetup.java
/**
 * Adds a statement to the queue's policy that allows {@code SendMessage} on the queue when
 * the source ARN is the given topic, keeping any statements the policy already has.
 *
 * @param amazonSQS client used to read and write the queue policy
 * @param queueURL URL of the queue whose policy is updated
 * @param queueARN ARN of the queue, used as the statement's resource
 * @param topicARN ARN of the topic, used in the statement's source-ARN condition
 */
private static void allowSQSQueueToReceiveMessagesFromSNSTopic(
    AmazonSQS amazonSQS,String queueURL,String queueARN,String topicARN
) {
    final String policyKey = QueueAttributeName.Policy.name();
    GetQueueAttributesRequest policyRequest = new GetQueueAttributesRequest()
            .withQueueUrl(queueURL)
            .withAttributeNames(QueueAttributeName.Policy);
    String existingPolicyJson = amazonSQS.getQueueAttributes(policyRequest).getAttributes().get(policyKey);

    // Start from the statements already on the queue; an absent policy means none exist yet.
    final List<Statement> statements = (existingPolicyJson == null)
            ? new ArrayList<Statement>()
            : new ArrayList<Statement>(Policy.fromJson(existingPolicyJson).getStatements());

    Statement allowTopicToSend = new Statement(Statement.Effect.Allow)
            .withPrincipals(Principal.AllUsers)
            .withResources(new Resource(queueARN))
            .withActions(SQSActions.SendMessage)
            .withConditions(ConditionFactory.newSourceArnCondition(topicARN));
    statements.add(allowTopicToSend);

    Policy updatedPolicy = new Policy();
    updatedPolicy.setStatements(statements);
    Map<String,String> updatedAttributes = new HashMap<>();
    updatedAttributes.put(policyKey,updatedPolicy.toJson());

    // Note that if the queue already has this policy,this will do nothing.
    SetQueueAttributesRequest policyUpdate = new SetQueueAttributesRequest()
            .withQueueUrl(queueURL)
            .withAttributes(updatedAttributes);
    amazonSQS.setQueueAttributes(policyUpdate);
}