项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
private SendMessageBatchResult mockBatchResult(int batchSize,int expectedSuccessCount) {
SendMessageBatchResult mockResult = Mockito.mock(SendMessageBatchResult.class);
List<SendMessageBatchResultEntry> successfulEntries = new ArrayList<SendMessageBatchResultEntry>();
for (int i = 0; i < expectedSuccessCount; i++) {
successfulEntries.add(new SendMessageBatchResultEntry().withId(String.valueOf(i + 1)));
}
when(mockResult.getSuccessful()).thenReturn(successfulEntries);
List<BatchResultErrorEntry> failedEntries = new ArrayList<BatchResultErrorEntry>();
for (int i = expectedSuccessCount; i < batchSize; i++) {
failedEntries.add(
new BatchResultErrorEntry().withId(String.valueOf(i + 1)).withCode("401").withSenderFault(true)
.withMessage("Invalid binary character"));
}
when(mockResult.getFailed()).thenReturn(failedEntries);
return mockResult;
}
项目:aws-sdk-java-resources
文件:SQSResourcesIntegrationTest.java
/**
* Tests sending messages using batch operation and retrieve them. Also
* tests setting the queue attributes and retrieving them.
*/
@Test
@Ignore
public void testQueueSubResourceAndAttributes() throws InterruptedException {
/**
* Trying to get the message which is deleted. Here there is no service
* call made,a new sub resource is created with the given handle. So,* this wont be returning null.
*/
Message message = queue.getMessage("invalid-recepient-handle");
assertNotNull(message);
try {
message.getAttributes();
fail("An unsupported operation exception must be thrown as load operation is no supported on message attribute");
} catch (UnsupportedOperationException use) { }
SendMessageBatchResult sendMessageBatchResult = queue
.sendMessages(new SendMessageBatchRequest()
.withEntries(new SendMessageBatchRequestEntry("msg1",TEST_MESSAGE)));
SendMessageBatchResultEntry sendMessageBatchResultEntry = sendMessageBatchResult
.getSuccessful().get(0);
List<Message> messages = waitForMessagesFromQueue(null);
assertNotNull(messages);
assertEquals(1,messages.size());
message = messages.get(0);
assertMessage(TEST_MESSAGE,sendMessageBatchResultEntry.getMessageId(),sendMessageBatchResultEntry.getMD5OfMessageBody(),message);
queue.setAttributes(ImmutableMapParameter.of("MaximumMessageSize","2048"));
assertTrue(queue.getAttributes().containsKey("MaximumMessageSize"));
}
项目:awslocal
文件:DirectorySQS.java
@Override
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException {
DirectorySQSQueue queue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(),false);
//lists for reporting
List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>();
List<SendMessageBatchResultEntry> batchResultEntries = new ArrayList<>();
//attempt to change the visibility on each
for (SendMessageBatchRequestEntry batchRequestEntry : sendMessageBatchRequest.getEntries()) {
try {
final int invisibilityDelay = Objects.firstNonNull(batchRequestEntry.getDelaySeconds(),0);//0 is amazon spec default
Message sentMessage = queue.send(batchRequestEntry.getMessageBody(),invisibilityDelay);
batchResultEntries.add(new SendMessageBatchResultEntry().
withId(batchRequestEntry.getId()).
withMessageId(sentMessage.getMessageId()).
withMD5OfMessageBody(sentMessage.getMD5OfBody()));
} catch (IOException e) {
BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry().
withSenderFault(false).
withId(batchRequestEntry.getId()).
withMessage(e.getMessage());
batchResultErrorEntries.add(batchResultErrorEntry);
}
}
return new SendMessageBatchResult().
withFailed(batchResultErrorEntries).
withSuccessful(batchResultEntries);
}
项目:unitstack
文件:MockSqsTest.java
@Test
public void testBulkSendDelete_shouldWork() {
// create queue
CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue"));
// send batch
SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one")
.withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}");
SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two")
.withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}");
SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three")
.withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}");
// verify send batch result
SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl())
.withEntries(ImmutableList.of(firstRequest,secondRequest,thirdRequest)));
assertNotNull("verify that batch send returned ok",sendResult);
assertTrue("no request failed",sendResult.getFailed().isEmpty());
assertEquals("verify successfull message count",3,sendResult.getSuccessful().size());
SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get();
assertEquals("verify correct message MD5",getAwsMessageMD5("{\"XOXO\":234}"),firstResultEntry.getMD5OfMessageBody());
assertNotNull("verify message id exists",firstResultEntry.getMessageId());
ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4));
// delete batch
List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>();
deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests));
// verify delete batch result
assertNotNull("verify that batch delete returned ok",deleteBatchResult);
assertTrue("no request failed",deleteBatchResult.getFailed().isEmpty());
assertEquals("verify successfull message count",deleteBatchResult.getSuccessful().size());
assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty());
for(Message message : receivedMessagesResult.getMessages()) {
assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty());
}
// cleanup
getQueues().remove("tea-earl-grey-queue");
}