项目:async-sqs
文件:SendMessageBatchAction.java
@VisibleForTesting
static SendMessageBatchRequest createRequest(String queueUrl,Map<String,SendMessageEntry> entries) {
return new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(entries.entrySet().stream().map(keyValue -> {
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry()
.withId(keyValue.getKey())
.withMessageBody(keyValue.getValue().getBody());
keyValue.getValue().getDelay()
.ifPresent((delay) -> entry.setDelaySeconds((int) delay.getSeconds()));
return entry;
}).collect(Collectors.toList())
);
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSender.java
private String buildErrorMessage(List<SendMessageBatchRequestEntry> batchRequestEntries,List<BatchResultErrorEntry> errors) {
StringBuilder errorMessage = new StringBuilder();
int count = 0;
for (BatchResultErrorEntry error : errors) {
if (count > 0) {
errorMessage.append(",");
}
SendMessageBatchRequestEntry failedRequestEventEntry =
findRequestEventEntryById(batchRequestEntries,error.getId());
String messageBody = failedRequestEventEntry == null ? null : failedRequestEventEntry.getMessageBody();
errorMessage.append("[" + error.toString() + ",{messageBody:" + "\"" + messageBody + "\"}]");
count++;
}
return errorMessage.toString();
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
* Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests happy path scenario.
* <p>
* <pre>
* Inputs:
* channel = never empty
* batchSize = 5
* maxMessageSize = 10 Bytes
* each message size = 2 Bytes
*
* Expected Output:
* number of batches = 1
* number of messages in batch = 5
* </pre>
*/
@Test
public void testCreateBatches() throws Exception {
BatchSQSMsgSender sqsMsgSender =
new BatchSQSMsgSender("https://some-fake/url","us-east-1","someAwsAccessKey","someAwsSecretKey",5,10);
byte[] mockMsgPayload = {'A','b'};
Event mockEvent = Mockito.mock(Event.class);
when(mockEvent.getBody()).thenReturn(mockMsgPayload);
Channel mockChannel = Mockito.mock(Channel.class);
when(mockChannel.take()).thenReturn(mockEvent);
List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);
Assert.assertNotNull(batches);
Assert.assertEquals(1,batches.size());
List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
Assert.assertNotNull(msgEntries);
Assert.assertEquals(5,msgEntries.size());
assertCorrectPayloadInEntries(mockMsgPayload,msgEntries);
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
* Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
* channel is empty after first event.
* <p>
* <pre>
* Inputs:
* channel = 1 Event (Empty after first take)
* batchSize = 5
* maxMessageSize = 10 Bytes
*
* Expected Output:
* number of batches = 1
* number of messages in batch = 1
* </pre>
*/
@Test
public void testCreateBatchesEmptyChannelAfterFirstTake() throws Exception {
BatchSQSMsgSender sqsMsgSender =
new BatchSQSMsgSender("https://some-fake/url",10);
byte[] mockMsgPayload = {'A','b'};
Event mockEvent = Mockito.mock(Event.class);
when(mockEvent.getBody()).thenReturn(mockMsgPayload);
Channel mockChannel = Mockito.mock(Channel.class);
when(mockChannel.take()).thenReturn(mockEvent).thenReturn(null);
List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);
Assert.assertNotNull(batches);
Assert.assertEquals(1,batches.size());
List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
Assert.assertNotNull(msgEntries);
Assert.assertEquals(1,msgEntries);
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
* Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
* channel is empty after the last take for the batch.
* <p>
* <pre>
* Inputs:
* channel = 5 Events (Empty after 5th take)
* batchSize = 5
* maxMessageSize = 10 Bytes
*
* Expected Output:
* number of batches = 1
* number of messages in batch = 5
* </pre>
*/
@Test
public void testCreateBatchesEmptyChannelAfterLastTake() throws Exception {
BatchSQSMsgSender sqsMsgSender =
new BatchSQSMsgSender("https://some-fake/url",'b'};
Event mockEvent = Mockito.mock(Event.class);
when(mockEvent.getBody()).thenReturn(mockMsgPayload);
Channel mockChannel = Mockito.mock(Channel.class);
when(mockChannel.take()).thenReturn(mockEvent,mockEvent,null);
List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);
Assert.assertNotNull(batches);
Assert.assertEquals(1,msgEntries);
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
* Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
* channel is empty in the middle of taking events for the batch
* <p>
* <pre>
* Inputs:
* channel = 3 Events (Empty after 3rd take)
* batchSize = 5
* maxMessageSize = 10 Bytes
*
* Expected Output:
* number of batches = 1
* number of messages in batch = 3
* </pre>
*/
@Test
public void testCreateBatchesEmptyChannelInTheMiddle() throws Exception {
BatchSQSMsgSender sqsMsgSender =
new BatchSQSMsgSender("https://some-fake/url",batches.size());
List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
Assert.assertNotNull(msgEntries);
Assert.assertEquals(3,msgEntries);
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
* Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
* channel is not empty but contains events with empty body in the middle of taking events for the batch
* <p>
* <pre>
* Inputs:
* channel = 4 Events (3 Events with Body and 4th Event empty)
* batchSize = 5
* maxMessageSize = 10 Bytes
*
* Expected Output:
* number of batches = 1
* number of messages in batch = 3
* </pre>
*/
@Test
public void testCreateBatchesEmptyEventInTheMiddle() throws Exception {
BatchSQSMsgSender sqsMsgSender =
new BatchSQSMsgSender("https://some-fake/url",'b'};
byte[] mockEmptyMsgPayload = {};
Event mockEvent = Mockito.mock(Event.class);
Event mockEmptyEvent = Mockito.mock(Event.class);
when(mockEvent.getBody()).thenReturn(mockMsgPayload);
when(mockEmptyEvent.getBody()).thenReturn(mockEmptyMsgPayload);
Channel mockChannel = Mockito.mock(Channel.class);
when(mockChannel.take()).thenReturn(mockEvent,mockEmptyEvent);
List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);
Assert.assertNotNull(batches);
Assert.assertEquals(1,msgEntries);
}
项目:micro-genie
文件:SqsProducer.java
/***
* Categorize the messages into batches per queue
* @param messages
* @return messageBatches - belonging to one or more queues
*/
private Map<String,List<SendMessageBatchRequestEntry>> createBatchesForQueues(final List<Message> messages) {
final Map<String,List<SendMessageBatchRequestEntry>> messageBatches = new HashMap<String,List<SendMessageBatchRequestEntry>>();
for(Message message : messages){
final Map<String,MessageAttributeValue> attributes = this.toMessageAttrs(message);
final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry()
.withId(message.getId())
.withMessageAttributes(attributes)
.withMessageBody(message.getBody());
if(!messageBatches.containsKey(message.getQueue())){
messageBatches.put(message.getQueue(),new ArrayList<SendMessageBatchRequestEntry>());
}
messageBatches.get(message.getQueue()).add(entry);
}
return messageBatches;
}
项目:izettle-toolbox
文件:QueueServiceSender.java
/**
* Posts many messages to queue,with a message envelope that makes them look like they
* were sent through Amazon SNS.
*
* @param messages list of messages to post
* @param eventName the value that will be used as "subject" in the SNS envelope
* @throws MessagingException Failed to post messages.
*/
@Override
public <T> void postBatch(Collection<T> messages,String eventName) throws MessagingException {
if (empty(eventName)) {
throw new MessagingException("Cannot publish message with empty eventName!");
}
try {
Collection<SendMessageBatchRequestEntry> allEntries = new ArrayList<>(messages.size());
int messageIdInBatch = 0;
for (T message : messages) {
++messageIdInBatch;
String messageBody = wrapInSNSMessage(message,eventName);
allEntries.add(new SendMessageBatchRequestEntry(String.valueOf(messageIdInBatch),messageBody));
}
sendMessageBatch(allEntries);
} catch (AmazonServiceException | IOException | CryptographyException e) {
throw new MessagingException("Failed to post messages: " + messages.getClass(),e);
}
}
项目:conductor
文件:SQSObservableQueue.java
void publishMessages(List<Message> messages) {
logger.info("Sending {} messages",messages.size());
SendMessageBatchRequest batch = new SendMessageBatchRequest(queueURL);
messages.stream().forEach(msg -> {
SendMessageBatchRequestEntry sendr = new SendMessageBatchRequestEntry(msg.getId(),msg.getPayload());
batch.getEntries().add(sendr);
});
logger.info("sending {}",batch.getEntries().size());
SendMessageBatchResult result = client.sendMessageBatch(batch);
logger.info("send result {}",result.getFailed().toString());
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSender.java
/**
* Handles SQS send message batch result and throws EventDeliveryException to cause the flume transaction to fail
* and let flume retry the whole batch in case all the messages in the batch failed to be delivered to SQS.
* Currently,this method does just logs errors and skips the messages in case some messages from the batched failed
* to be delivered but some succeeded (i.e.,partial batch failure).
* <p>
* TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure
*
* @param batchRequest The SQS SendMessageBatchRequest
* @param batchResult The SQS SendMessageBatchResult
*
* @throws EventDeliveryException In case all the messages in the batch failed to be delivered to SQS
*/
protected void handleResult(SendMessageBatchRequest batchRequest,SendMessageBatchResult batchResult)
throws EventDeliveryException {
List<SendMessageBatchRequestEntry> batchRequestEntries = batchRequest.getEntries();
List<BatchResultErrorEntry> errors = batchResult.getFailed();
int attemptedCount = batchRequestEntries == null ? 0 : batchRequestEntries.size();
int errorCount = errors == null ? 0 : errors.size();
if (errorCount > 0) {
String errorMessage = buildErrorMessage(batchRequestEntries,errors);
if (attemptedCount == errorCount) {
// if it was a non-empty batch and if all the messages in the batch have errors then fail the whole
// batch and let flume rollback the transaction and retry it
// Just throw the EventDeliveryException. This will eventually cause the channel's transaction to
// rollback.
throw new EventDeliveryException(errorMessage);
}
else {
// TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure
// Just log the error message and let flume drop failed messages in case of partial batch failures
LOG.error(errorMessage);
}
}
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSender.java
private SendMessageBatchRequestEntry findRequestEventEntryById(List<SendMessageBatchRequestEntry> entries,String id) {
SendMessageBatchRequestEntry foundEntry = null;
if (entries != null) {
for (SendMessageBatchRequestEntry entry : entries) {
if (entry.getId().equals(id)) {
foundEntry = entry;
break;
}
}
}
return foundEntry;
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
* Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests invalid characters not
* allowed by the SQS. See [http://docs.aws.amazon
* .com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html]
* for list of valid characters allowed by SQS.
* <p>
* <p>
* <pre>
* Inputs:
* channel = never empty. with messages containing invalid characters.
*
* Expected Output:
* The sink messages should not contain invalid characters
* </pre>
*/
@Test
public void testInvalidCharacters() throws Exception {
// See
// http://stackoverflow.com/questions/16688523/aws-sqs-valid-characters
// http://stackoverflow.com/questions/1169754/amazon-sqs-invalid-binary-character-in-message-body
// https://forums.aws.amazon.com/thread.jspa?messageID=459090
// http://stackoverflow.com/questions/16329695/invalid-binary-character-when-transmitting-protobuf-net
// -messages-over-aws-sqs
byte invalidCharByte = 0x1C;
String mockMsg = "Test with some invalid chars at the end 0%2F>^F";
byte[] origPayloadWithInvalidChars = ArrayUtils.add(mockMsg.getBytes(),invalidCharByte);
BatchSQSMsgSender sqsMsgSender =
new BatchSQSMsgSender("https://some-fake/url",1,origPayloadWithInvalidChars.length);
Event mockEvent = Mockito.mock(Event.class);
when(mockEvent.getBody()).thenReturn(origPayloadWithInvalidChars);
Channel mockChannel = Mockito.mock(Channel.class);
when(mockChannel.take()).thenReturn(mockEvent);
List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);
List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
assertCorrectPayloadInEntries(new String(origPayloadWithInvalidChars).trim().getBytes(),msgEntries);
// Make sure that the message being sent by the sink doesn't contain the invalid characters
for (SendMessageBatchRequestEntry entry : msgEntries) {
Assert.assertNotNull(entry);
Assert.assertTrue(ArrayUtils.contains(new String(origPayloadWithInvalidChars).getBytes(),invalidCharByte));
Assert.assertTrue(!ArrayUtils.contains(entry.getMessageBody().getBytes(),invalidCharByte));
}
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
* Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
* specified <i>batchSize</i> can not be fit into the specified <i>maxMessageSize</i>
* <p>
* <pre>
* Inputs:
* channel = never empty
* batchSize = 5
* maxMessageSize = 10 Bytes
* each message size = 3 Bytes
*
* Expected Output:
* number of batches = 2
* number of messages in batch 1 = 3
* number of messages in batch 2 = 2
* </pre>
*/
@Test
public void testCreateBatchesExceedingSize() throws Exception {
BatchSQSMsgSender sqsMsgSender =
new BatchSQSMsgSender("https://some-fake/url",'b','~'};
Event mockEvent = Mockito.mock(Event.class);
when(mockEvent.getBody()).thenReturn(mockMsgPayload);
Channel mockChannel = Mockito.mock(Channel.class);
when(mockChannel.take()).thenReturn(mockEvent);
List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);
Assert.assertNotNull(batches);
Assert.assertEquals(2,batches.size());
List<SendMessageBatchRequestEntry> msgEntries1 = batches.get(0).getEntries();
Assert.assertNotNull(msgEntries1);
Assert.assertEquals(3,msgEntries1.size());
List<SendMessageBatchRequestEntry> msgEntries2 = batches.get(1).getEntries();
Assert.assertNotNull(msgEntries2);
Assert.assertEquals(2,msgEntries2.size());
assertCorrectPayloadInEntries(mockMsgPayload,msgEntries2);
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
* Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
* specified <i>batchSize</i> can not fit into the specified <i>maxMessageSize</i> and channel gets empty after
* certain number of events "takes".
* <p>
* <pre>
* Inputs:
* channel = 4 Events
* batchSize = 5
* maxMessageSize = 10 Bytes
* each message size = 3 Bytes
*
* Expected Output:
* number of batches = 2
* number of messages in batch 1 = 3
* number of messages in batch 2 = 1
* </pre>
*/
@Test
public void testCreateBatchesExceedingSizeLimitedChannel() throws Exception {
BatchSQSMsgSender sqsMsgSender =
new BatchSQSMsgSender("https://some-fake/url",10);
byte[] mockMsgPayload = {'^','@','~'};
Event mockEvent = Mockito.mock(Event.class);
when(mockEvent.getBody()).thenReturn(mockMsgPayload);
Channel mockChannel = Mockito.mock(Channel.class);
when(mockChannel.take()).thenReturn(mockEvent,null);
List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);
Assert.assertNotNull(batches);
Assert.assertEquals(2,msgEntries1.size());
List<SendMessageBatchRequestEntry> msgEntries2 = batches.get(1).getEntries();
Assert.assertNotNull(msgEntries2);
Assert.assertEquals(1,msgEntries2);
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
private void assertCorrectPayloadInEntries(byte[] mockMsgPayload,List<SendMessageBatchRequestEntry> msgEntries)
throws UnsupportedEncodingException {
for (SendMessageBatchRequestEntry entry : msgEntries) {
Assert.assertNotNull(entry);
Assert.assertEquals(new String(mockMsgPayload,"UTF-8"),entry.getMessageBody());
}
}
项目:aws-doc-sdk-examples
文件:SendReceiveMessages.java
public static void main(String[] args)
{
final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
try {
CreateQueueResult create_result = sqs.createQueue(QUEUE_NAME);
} catch (AmazonSQSException e) {
if (!e.getErrorCode().equals("QueueAlreadyExists")) {
throw e;
}
}
String queueUrl = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();
SendMessageRequest send_msg_request = new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody("hello world")
.withDelaySeconds(5);
sqs.sendMessage(send_msg_request);
// Send multiple messages to the queue
SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(
new SendMessageBatchRequestEntry(
"msg_1","Hello from message 1"),new SendMessageBatchRequestEntry(
"msg_2","Hello from message 2")
.withDelaySeconds(10));
sqs.sendMessageBatch(send_batch_request);
// receive messages from the queue
List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();
// delete messages from the queue
for (Message m : messages) {
sqs.deleteMessage(queueUrl,m.getReceiptHandle());
}
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEntry batchEntry) {
checkMessageAttributes(batchEntry.getMessageAttributes());
String s3Key = UUID.randomUUID().toString();
// Read the content of the message from message body
String messageContentStr = batchEntry.getMessageBody();
Long messageContentSize = getStringSizeInBytes(messageContentStr);
// Add a new message attribute as a flag
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setDataType("Number");
messageAttributeValue.setStringValue(messageContentSize.toString());
batchEntry.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME,messageAttributeValue);
// Store the message content in S3.
storeTextInS3(s3Key,messageContentStr,messageContentSize);
LOG.info("S3 object created,Bucket name: " + clientConfiguration.getS3BucketName() + ",Object key: " + s3Key
+ ".");
// Convert S3 pointer (bucket name,key,etc) to JSON string
MessageS3Pointer s3Pointer = new MessageS3Pointer(clientConfiguration.getS3BucketName(),s3Key);
String s3PointerStr = getJSONFromS3Pointer(s3Pointer);
// Storing S3 pointer in the message body.
batchEntry.setMessageBody(s3PointerStr);
return batchEntry;
}
项目:CliDispatcher
文件:SQSClient.java
/**
* @todo Roba schifosa!
*/
public void sendBulkMessage(List<String> messages){
List<SendMessageBatchRequestEntry> entries = new ArrayList<>(10);
Integer i=0;
for(String m : messages){
entries.add(new SendMessageBatchRequestEntry((i++).toString(),m));
}
c.sendMessageBatch(queueUrl,entries);
}
项目:amazon-cloudengine
文件:ServerThread.java
public void remoteBatchSend(BufferedReader in) throws ParseException{
//Batch sending task to remote workers
List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>();
String message;
final int batchSize = 10;
try {
JSONParser parser=new JSONParser();
while ((message = in.readLine()) != null) {
JSONArray taskList = (JSONArray)parser.parse(message);
for(int i=0; i< taskList.size(); i++){
JSONObject task = (JSONObject)taskList.get(i);
msg_cnt++;
entries.add(new SendMessageBatchRequestEntry()
.withId(Integer.toString(msg_cnt))
.withMessageBody(task.toString()));
}
if(entries.size() == batchSize){
jobQ.batchSend(entries);
entries.clear();
}
}
if(!entries.isEmpty()){
jobQ.batchSend(entries);
entries.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
项目:para
文件:AWSQueueUtils.java
/**
* Pushes a number of messages in batch to an SQS queue.
* @param queueURL the URL of the SQS queue
* @param messages the massage bodies
*/
public static void pushMessages(String queueURL,List<String> messages) {
if (!StringUtils.isBlank(queueURL) && messages != null) {
// only allow strings - ie JSON
try {
int j = 0;
List<SendMessageBatchRequestEntry> msgs = new ArrayList<>(MAX_MESSAGES);
for (int i = 0; i < messages.size(); i++) {
String message = messages.get(i);
if (!StringUtils.isBlank(message)) {
msgs.add(new SendMessageBatchRequestEntry().
withMessageBody(message).
withId(Integer.toString(i)));
}
if (++j >= MAX_MESSAGES || i == messages.size() - 1) {
if (!msgs.isEmpty()) {
getClient().sendMessageBatch(queueURL,msgs);
msgs.clear();
}
j = 0;
}
}
} catch (AmazonServiceException ase) {
logException(ase);
} catch (AmazonClientException ace) {
logger.error("Could not reach SQS. {}",ace.toString());
}
}
}
项目:micro-genie
文件:SqsProducer.java
/***
* Submit the batches of messages
* @param messageBatches
*/
private void submitBatches(
final Map<String,List<SendMessageBatchRequestEntry>> messageBatches) {
for(Entry<String,List<SendMessageBatchRequestEntry>> queueBatchEntry : messageBatches.entrySet()){
final String queueUrl = this.queueAdmin.getQueueUrl(queueBatchEntry.getKey());
final SendMessageBatchRequest batch = new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(queueBatchEntry.getValue());
final SendMessageBatchResult batchResult = this.sqs.sendMessageBatch(batch);
this.logFailures(batchResult.getFailed());
}
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
@Override
public SendMessageBatchResult sendMessages(
List<SendMessageBatchRequestEntry> entries) {
return sendMessages(entries,(ResultCapture<SendMessageBatchResult>)null);
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
@Override
public SendMessageBatchResult sendMessages(
List<SendMessageBatchRequestEntry> entries,ResultCapture<SendMessageBatchResult> extractor) {
SendMessageBatchRequest request = new SendMessageBatchRequest()
.withEntries(entries);
return sendMessages(request,extractor);
}
项目:aws-sdk-java-resources
文件:SQSResourcesIntegrationTest.java
/**
* Tests sending messages using batch operation and retrieve them. Also
* tests setting the queue attributes and retrieving them.
*/
@Test
@Ignore
public void testQueueSubResourceAndAttributes() throws InterruptedException {
/**
* Trying to get the message which is deleted. Here there is no service
* call made,a new sub resource is created with the given handle. So,* this wont be returning null.
*/
Message message = queue.getMessage("invalid-recepient-handle");
assertNotNull(message);
try {
message.getAttributes();
fail("An unsupported operation exception must be thrown as load operation is no supported on message attribute");
} catch (UnsupportedOperationException use) { }
SendMessageBatchResult sendMessageBatchResult = queue
.sendMessages(new SendMessageBatchRequest()
.withEntries(new SendMessageBatchRequestEntry("msg1",TEST_MESSAGE)));
SendMessageBatchResultEntry sendMessageBatchResultEntry = sendMessageBatchResult
.getSuccessful().get(0);
List<Message> messages = waitForMessagesFromQueue(null);
assertNotNull(messages);
assertEquals(1,messages.size());
message = messages.get(0);
assertMessage(TEST_MESSAGE,sendMessageBatchResultEntry.getMessageId(),sendMessageBatchResultEntry.getMD5OfMessageBody(),message);
queue.setAttributes(ImmutableMapParameter.of("MaximumMessageSize","2048"));
assertTrue(queue.getAttributes().containsKey("MaximumMessageSize"));
}
项目:spring-cloud-aws
文件:MessageListenerContainerAwsTest.java
@Override
public void run() {
List<SendMessageBatchRequestEntry> messages = new ArrayList<>();
for (int i = 0; i < BATCH_MESSAGE_SIZE; i++) {
messages.add(new SendMessageBatchRequestEntry(Integer.toString(i),new StringBuilder().append("message_").append(i).toString()));
}
this.amazonSqs.sendMessageBatch(new SendMessageBatchRequest(this.queueUrl,messages));
this.countDownLatch.countDown();
}
项目:izettle-toolbox
文件:QueueServiceSender.java
private void sendMessageBatch(Collection<SendMessageBatchRequestEntry> messages) {
for (Collection<SendMessageBatchRequestEntry> batch : partition(messages,MAX_BATCH_SIZE)) {
final SendMessageBatchResult sendMessageBatchResult =
amazonSQS.sendMessageBatch(new SendMessageBatchRequest(queueUrl,new ArrayList<>(batch)));
final List<BatchResultErrorEntry> failed = sendMessageBatchResult.getFailed();
if (!failed.isEmpty()) {
try {
Set<String> failedMessageIds =
failed.stream().map(BatchResultErrorEntry::getId).collect(Collectors.toSet());
final Map<String,SendMessageBatchRequestEntry> failedMessageIdToMessage =
batch.stream().filter(failedMessageIds::contains).collect(Collectors.toMap(
SendMessageBatchRequestEntry::getId,Function.identity()
));
failed.stream().forEach(failMessage -> {
final SendMessageBatchRequestEntry failedEntry =
failedMessageIdToMessage.get(failMessage.getId());
if (failedEntry != null) {
final String messageBody = failedEntry.getMessageBody();
LOG.error(
"Failed to send message,due to {},message content : {} ",failMessage,messageBody
);
}
});
} catch (Exception e) {
LOG.error("Failed to log failed to send messages",e);
}
}
}
}
项目:izettle-toolbox
文件:QueueServiceSenderTest.java
@Test
public void postBatchShouldSendMessagesWithSNSEnvelope() throws Exception {
// Arrange
when(mockAmazonSQS.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(mock(SendMessageBatchResult.class));
ArgumentCaptor<SendMessageBatchRequest> captor = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
// Act
messagePublisher.postBatch(
Arrays.asList(
new TestMessage("Hello"),new TestMessage("world")
),"subject"
);
// Assert
verify(mockAmazonSQS).sendMessageBatch(captor.capture());
SendMessageBatchRequest sendMessageBatchRequest = captor.getValue();
assertThat(sendMessageBatchRequest.getQueueUrl()).isEqualTo("queueUrl");
List<SendMessageBatchRequestEntry> entries = sendMessageBatchRequest.getEntries();
assertThat(entries.size()).isEqualTo(2);
ObjectMapper mapper = new ObjectMapper();
AmazonSNSMessage msg1 = mapper.readValue(entries.get(0).getMessageBody(),AmazonSNSMessage.class);
assertThat(msg1.getSubject()).isEqualTo("subject");
assertThat(msg1.getMessage()).isEqualTo("{\"message\":\"Hello\"}");
AmazonSNSMessage msg2 = mapper.readValue(entries.get(1).getMessageBody(),AmazonSNSMessage.class);
assertThat(msg2.getSubject()).isEqualTo("subject");
assertThat(msg2.getMessage()).isEqualTo("{\"message\":\"world\"}");
}
项目:awslocal
文件:DirectorySQS.java
@Override
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException {
DirectorySQSQueue queue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(),false);
//lists for reporting
List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>();
List<SendMessageBatchResultEntry> batchResultEntries = new ArrayList<>();
//attempt to change the visibility on each
for (SendMessageBatchRequestEntry batchRequestEntry : sendMessageBatchRequest.getEntries()) {
try {
final int invisibilityDelay = Objects.firstNonNull(batchRequestEntry.getDelaySeconds(),0);//0 is amazon spec default
Message sentMessage = queue.send(batchRequestEntry.getMessageBody(),invisibilityDelay);
batchResultEntries.add(new SendMessageBatchResultEntry().
withId(batchRequestEntry.getId()).
withMessageId(sentMessage.getMessageId()).
withMD5OfMessageBody(sentMessage.getMD5OfBody()));
} catch (IOException e) {
BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry().
withSenderFault(false).
withId(batchRequestEntry.getId()).
withMessage(e.getMessage());
batchResultErrorEntries.add(batchResultErrorEntry);
}
}
return new SendMessageBatchResult().
withFailed(batchResultErrorEntries).
withSuccessful(batchResultEntries);
}
项目:unitstack
文件:MockSqsTest.java
@Test
public void testBulkSendDelete_shouldWork() {
// create queue
CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue"));
// send batch
SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one")
.withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}");
SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two")
.withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}");
SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three")
.withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}");
// verify send batch result
SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl())
.withEntries(ImmutableList.of(firstRequest,secondRequest,thirdRequest)));
assertNotNull("verify that batch send returned ok",sendResult);
assertTrue("no request failed",sendResult.getFailed().isEmpty());
assertEquals("verify successfull message count",3,sendResult.getSuccessful().size());
SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get();
assertEquals("verify correct message MD5",getAwsMessageMD5("{\"XOXO\":234}"),firstResultEntry.getMD5OfMessageBody());
assertNotNull("verify message id exists",firstResultEntry.getMessageId());
ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4));
// delete batch
List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>();
deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests));
// verify delete batch result
assertNotNull("verify that batch delete returned ok",deleteBatchResult);
assertTrue("no request failed",deleteBatchResult.getFailed().isEmpty());
assertEquals("verify successfull message count",deleteBatchResult.getSuccessful().size());
assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty());
for(Message message : receivedMessagesResult.getMessages()) {
assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty());
}
// cleanup
getQueues().remove("tea-earl-grey-queue");
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
public Observable<SendMessageBatchResult> sendMessageBatchAsync(String queueUrl,List<SendMessageBatchRequestEntry> entries) {
return Observable.from(sqsClient.sendMessageBatchAsync(queueUrl,entries));
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
private boolean isLarge(SendMessageBatchRequestEntry batchEntry) {
int msgAttributesSize = getMsgAttributesSize(batchEntry.getMessageAttributes());
long msgBodySize = getStringSizeInBytes(batchEntry.getMessageBody());
long totalMsgSize = msgAttributesSize + msgBodySize;
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold());
}
项目:amazon-cloudengine
文件:SQSService.java
public void batchSend(List<SendMessageBatchRequestEntry> entries){
try {
// Send batch messages
//System.out.println("\nSending a message to jobQueue.\n");
SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl);
batchRequest.setEntries(entries);
SendMessageBatchResult batchResult = sqs.sendMessageBatch(batchRequest);
// sendMessageBatch can return successfully,and yet individual batch
// items fail. So,make sure to retry the failed ones.
if (!batchResult.getFailed().isEmpty()) {
//System.out.println("Retry sending failed messages...");
List<SendMessageBatchRequestEntry> failedEntries = new ArrayList<SendMessageBatchRequestEntry>();
Iterator<SendMessageBatchRequestEntry> iter = entries.iterator();
while(iter.hasNext()){
if(batchResult.getFailed().contains(iter.next())){
failedEntries.add((SendMessageBatchRequestEntry) iter.next());
}
}
batchRequest.setEntries(failedEntries);
sqs.sendMessageBatch(batchRequest);
}
} catch (AmazonServiceException ase) {
System.out.println("Caught an AmazonServiceException,which means your request made it " +
"to Amazon SQS,but was rejected with an error response for some reason.");
System.out.println("Error Message: " + ase.getMessage());
System.out.println("HTTP Status Code: " + ase.getStatusCode());
System.out.println("AWS Error Code: " + ase.getErrorCode());
System.out.println("Error Type: " + ase.getErrorType());
System.out.println("Request ID: " + ase.getRequestId());
} catch (AmazonClientException ace) {
System.out.println("Caught an AmazonClientException,which means the client encountered " +
"a serious internal problem while trying to communicate with SQS,such as not " +
"being able to access the network.");
System.out.println("Error Message: " + ace.getMessage());
}
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientBase.java
/**
* <p>
* Delivers up to ten messages to the specified queue. This is a batch
* version of SendMessage. The result of the send action on each message is
* reported individually in the response. The maximum allowed individual
* message size is 256 KB (262,144 bytes).
* </p>
* <p>
* The maximum total payload size (i.e.,the sum of all a batch's individual
* message lengths) is also 256 KB (262,144 bytes).
* </p>
* <p>
* If the <code>DelaySeconds</code> parameter is not specified for an entry,* the default for the queue is used.
* </p>
* <p>
* <b>IMPORTANT:</b>The following list shows the characters (in Unicode)
* that are allowed in your message,according to the W3C XML specification.
* For more information,go to http://www.faqs.org/rfcs/rfc1321.html. If you
* send any characters that are not included in the list,your request will
* be rejected. #x9 | #xA | #xD | [#x20 to #xD7FF] | [#xE000 to #xFFFD] |
* [#x10000 to #x10FFFF]
* </p>
* <p>
* <b>IMPORTANT:</b> Because the batch request can result in a combination
* of successful and unsuccessful actions,you should check for batch errors
* even when the call returns an HTTP status code of 200.
* </p>
* <p>
* <b>NOTE:</b>Some API actions take lists of parameters. These lists are
* specified using the param.n notation. Values of n are integers starting
* from 1. For example,a parameter list with two elements looks like this:
* </p>
* <p>
* <code>&Attribute.1=this</code>
* </p>
* <p>
* <code>&Attribute.2=that</code>
* </p>
*
* @param queueUrl
* The URL of the Amazon SQS queue to take action on.
* @param entries
* A list of <a>SendMessageBatchRequestEntry</a> items.
*
* @return The response from the SendMessageBatch service method,as
* returned by AmazonSQS.
*
* @throws BatchEntryIdsNotDistinctException
* @throws TooManyEntriesInBatchRequestException
* @throws BatchRequestTooLongException
* @throws UnsupportedOperationException
* @throws InvalidBatchEntryIdException
* @throws EmptyBatchRequestException
*
* @throws AmazonClientException
* If any internal errors are encountered inside the client
* while attempting to make the request or handle the response.
* For example if a network connection is not available.
* @throws AmazonServiceException
* If an error response is returned by AmazonSQS indicating
* either a problem with the data in the request,or a server
* side issue.
*/
public SendMessageBatchResult sendMessageBatch(String queueUrl,List<SendMessageBatchRequestEntry> entries)
throws AmazonServiceException,AmazonClientException {
return amazonSqsToBeExtended.sendMessageBatch(queueUrl,entries);
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
* <p>
* Delivers up to ten messages to the specified queue. This is a batch
* version of SendMessage. The result of the send action on each message is
* reported individually in the response. Uploads message payloads to Amazon
* S3 when necessary.
* </p>
* <p>
* If the <code>DelaySeconds</code> parameter is not specified for an entry,a parameter list with two elements looks like this:
* </p>
* <p>
* <code>&Attribute.1=this</code>
* </p>
* <p>
* <code>&Attribute.2=that</code>
* </p>
*
* @param queueUrl
* The URL of the Amazon SQS queue to take action on.
* @param entries
* A list of <a>SendMessageBatchRequestEntry</a> items.
*
* @return The response from the SendMessageBatch service method,as
* returned by AmazonSQS.
*
* @throws BatchEntryIdsNotDistinctException
* @throws TooManyEntriesInBatchRequestException
* @throws BatchRequestTooLongException
* @throws UnsupportedOperationException
* @throws InvalidBatchEntryIdException
* @throws EmptyBatchRequestException
*
* @throws AmazonClientException
* If any internal errors are encountered inside the client
* while attempting to make the request or handle the response.
* For example if a network connection is not available.
* @throws AmazonServiceException
* If an error response is returned by AmazonSQS indicating
* either a problem with the data in the request,List<SendMessageBatchRequestEntry> entries) {
SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest(queueUrl,entries);
return sendMessageBatch(sendMessageBatchRequest);
}
项目:micro-genie
文件:SqsProducer.java
/**
* Submit a batch of messages to SQS. The messages can be destined for different queues. This
* method will categories the messages in batches according to what queue they around bound for. After
* categorization of the messages into batches,each batch will be sent serially.
*
* failures will be written to the error log.
*
* @param messages - The batches of messages which can be destined for one or more queues
*/
@Override
public void submitBatch(final List<Message> messages) {
final Map<String,List<SendMessageBatchRequestEntry>> messageBatches = this.createBatchesForQueues(messages);
this.submitBatches(messageBatches);
}
项目:aws-sdk-java-resources
文件:Queue.java
/**
* The convenient method form for the <code>SendMessages</code> action.
*
* @see #sendMessages(SendMessageBatchRequest)
*/
SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry>
entries);
项目:aws-sdk-java-resources
文件:Queue.java
/**
* The convenient method form for the <code>SendMessages</code> action.
*
* @see #sendMessages(SendMessageBatchRequest,ResultCapture)
*/
SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry>
entries,ResultCapture<SendMessageBatchResult> extractor);