项目:conductor
文件:SQSObservableQueue.java
/**
 * Deletes the given messages from the queue with a single batch request.
 *
 * @param messages the messages to delete; may be {@code null} or empty
 * @return the ids of the entries reported as failed by the batch result, or
 *         {@code null} when there was nothing to delete
 */
private List<String> delete(List<Message> messages) {
    if (messages == null || messages.isEmpty()) {
        return null;
    }

    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest().withQueueUrl(queueURL);
    List<DeleteMessageBatchRequestEntry> requestEntries = request.getEntries();
    for (Message message : messages) {
        requestEntries.add(new DeleteMessageBatchRequestEntry()
                .withId(message.getId())
                .withReceiptHandle(message.getReceipt()));
    }

    DeleteMessageBatchResult response = client.deleteMessageBatch(request);
    List<String> failedIds = response.getFailed().stream()
            .map(failed -> failed.getId())
            .collect(Collectors.toList());
    logger.debug("Failed to delete: {}", failedIds);
    return failedIds;
}
项目:emodb
文件:SQSScanWorkflow.java
/**
 * Releases completed scan ranges by deleting their messages from the
 * complete-scan-range queue.
 *
 * <p>SQS rejects a DeleteMessageBatch request that carries more than 10 entries,
 * so the deletions are sent in chunks of at most 10.
 *
 * @param completions the completed scan ranges to release; every element is cast
 *                    to {@code QueueScanRangeComplete}
 */
@Override
public void releaseCompleteScanRanges(Collection<ScanRangeComplete> completions) {
    if (completions.isEmpty()) {
        return;
    }

    // Maximum number of entries SQS accepts in one DeleteMessageBatch call.
    final int maxBatchSize = 10;

    int id = 0;
    List<DeleteMessageBatchRequestEntry> entries = Lists.newArrayListWithCapacity(completions.size());
    for (ScanRangeComplete completion : completions) {
        entries.add(
                new DeleteMessageBatchRequestEntry()
                        // Entry ids only have to be unique within a single request.
                        .withId(String.valueOf(id++))
                        .withReceiptHandle(((QueueScanRangeComplete) completion).getMessageId()));
    }

    String queueUrl = getQueueUrl(_completeScanRangeQueue);
    for (List<DeleteMessageBatchRequestEntry> batch : Lists.partition(entries, maxBatchSize)) {
        _sqs.deleteMessageBatch(new DeleteMessageBatchRequest()
                .withQueueUrl(queueUrl)
                .withEntries(batch));
    }
}
项目:micro-genie
文件:Poller.java
/**
 * Converts the received SQS messages, hands them to the handler as one batch and
 * then deletes every received SQS message from the queue.
 *
 * @param queue       the queue name passed on to the message conversion
 * @param sqsMessages the raw messages received from SQS
 */
private void handleMessages(final String queue, final List<com.amazonaws.services.sqs.model.Message> sqsMessages) {
    final List<Message> converted = fromSqsMessages(queue, sqsMessages);
    if (!CollectionUtil.hasElements(converted)) {
        return;
    }
    this.handler.handleBatch(converted);

    // TODO: allow the caller to specify which messages to delete.
    final List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<DeleteMessageBatchRequestEntry>();
    for (final com.amazonaws.services.sqs.model.Message received : sqsMessages) {
        toDelete.add(new DeleteMessageBatchRequestEntry(received.getMessageId(), received.getReceiptHandle()));
    }

    // TODO: the deletion should respect the handler's feedback about which
    // messages may be removed; for now the whole batch is deleted.
    final DeleteMessageBatchRequest deleteRequest =
            new DeleteMessageBatchRequest(this.queueUrl).withEntries(toDelete);
    this.client.deleteMessageBatch(deleteRequest);
}
项目:aws-sdk-java-resources
文件:SQSResourcesIntegrationTest.java
/**
 * Sends one message through the queue resource, receives it back, checks its
 * contents and attributes against the send result, and finally deletes it
 * with a single-entry batch request.
 */
@Test
@Ignore
public void testSendReceiveDelete() throws InterruptedException {
    SendMessageResult sent = queue.sendMessage(TEST_MESSAGE);
    assertNotNull(sent);
    assertNotNull(sent.getMessageId());

    List<Message> received = waitForMessagesFromQueue(null);
    assertNotNull(received);
    assertEquals(1, received.size());

    Message onlyMessage = received.get(0);
    assertMessage(TEST_MESSAGE, sent.getMessageId(), sent.getMD5OfMessageBody(), onlyMessage);

    DeleteMessageBatchRequestEntry deleteEntry =
            new DeleteMessageBatchRequestEntry("msg1", onlyMessage.getReceiptHandle());
    queue.deleteMessages(new DeleteMessageBatchRequest().withEntries(deleteEntry));
}
项目:awslocal
文件:DirectorySQS.java
/**
 * Deletes each entry of the batch from the directory-backed queue, one at a time.
 *
 * <p>An {@link IOException} raised for one entry does not abort the batch: the
 * entry is recorded as a sender-fault failure and processing continues.
 *
 * @param deleteMessageBatchRequest the queue URL and the entries to delete
 * @return a result listing the successfully deleted entry ids and the failed entries
 */
@Override
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonClientException {
    DirectorySQSQueue targetQueue = getQueueFromUrl(deleteMessageBatchRequest.getQueueUrl(), false);

    // Outcome of every entry, reported back to the caller.
    List<BatchResultErrorEntry> failed = new ArrayList<>();
    List<DeleteMessageBatchResultEntry> succeeded = new ArrayList<>();

    for (DeleteMessageBatchRequestEntry requestEntry : deleteMessageBatchRequest.getEntries()) {
        String entryId = requestEntry.getId();
        try {
            targetQueue.delete(requestEntry.getReceiptHandle());
            succeeded.add(new DeleteMessageBatchResultEntry().withId(entryId));
        } catch (IOException e) {
            failed.add(new BatchResultErrorEntry()
                    .withSenderFault(true)
                    .withId(entryId)
                    .withMessage(e.getMessage()));
        }
    }

    return new DeleteMessageBatchResult().withFailed(failed).withSuccessful(succeeded);
}
项目:async-sqs
文件:DeleteMessageBatchAction.java
/**
 * Builds a batch delete request for the given queue.
 *
 * @param queueUrl the URL of the queue to delete from
 * @param entries  delete entries keyed by the batch entry id to use for each of them
 * @return a request with one batch entry per map entry
 */
@VisibleForTesting
static DeleteMessageBatchRequest createRequest(String queueUrl, Map<String, DeleteMessageEntry> entries) {
    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest().withQueueUrl(queueUrl);
    return request.withEntries(
            entries.entrySet().stream()
                    .map(idAndEntry -> new DeleteMessageBatchRequestEntry()
                            .withId(idAndEntry.getKey())
                            .withReceiptHandle(idAndEntry.getValue().getReceiptHandle()))
                    .collect(Collectors.toList()));
}
项目:aws-codecommit-trigger-plugin
文件:RequestFactoryImpl.java
/**
 * Creates a batch delete request containing one entry per message.
 *
 * @param queueUrl the URL of the queue the messages belong to
 * @param messages the messages to delete
 * @return the populated batch delete request
 */
@Override
public DeleteMessageBatchRequest createDeleteMessageBatchRequest(String queueUrl, List<Message> messages) {
    final DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest(queueUrl);
    final List<DeleteMessageBatchRequestEntry> batchEntries = new ArrayList<>(messages.size());
    for (final Message current : messages) {
        batchEntries.add(this.createDeleteMessageBatchRequestEntry(current));
    }
    batchRequest.setEntries(batchEntries);
    return batchRequest;
}
/**
 * Hands the body of every message to the collector and then deletes the
 * messages that were queued for deletion.
 *
 * <p>A message is queued for deletion when the collector reports success or when
 * decoding throws. Messages with an empty body are skipped and not deleted, and
 * messages whose accept call fails are left on the queue.
 *
 * @param messages the messages received from the queue
 */
private void process(final List<Message> messages) {
    if (messages.size() == 0) return;

    final List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>();
    int count = 0;
    for (Message message : messages) {
        // Batch entry id: the position of the message within this batch.
        final String deleteId = String.valueOf(count++);
        try {
            String stringBody = message.getBody();
            if (stringBody.isEmpty()) continue;
            // allow plain-text json, but permit base64 encoded thrift or json
            byte[] spans = stringBody.charAt(0) == '['
                    ? stringBody.getBytes(UTF_8)
                    : Base64.decode(stringBody);
            // NOTE(review): onSuccess only affects this batch if the callback runs
            // before delete(toDelete) below - confirm acceptSpans is synchronous here.
            collector.acceptSpans(spans, DETECTING_DECODER, new Callback<Void>() {
                @Override public void onSuccess(Void value) {
                    toDelete.add(new DeleteMessageBatchRequestEntry(deleteId, message.getReceiptHandle()));
                }
                @Override public void onError(Throwable t) {
                    // don't delete messages. this will allow accept calls retry once the
                    // messages are marked visible by sqs.
                    logger.log(Level.WARNING, "collector accept Failed", t);
                }
            });
        } catch (RuntimeException | Error e) {
            logger.log(Level.WARNING, "message decoding Failed", e);
            // Delete the message even though it could not be decoded; presumably a
            // retry would fail the same way.
            toDelete.add(new DeleteMessageBatchRequestEntry(deleteId, message.getReceiptHandle()));
        }
    }
    delete(toDelete);
}
/**
 * Pulls a number of messages from an SQS queue.
 *
 * <p>Messages are fetched in batches of at most {@code MAX_MESSAGES}; every
 * received batch is deleted from the queue right after its bodies are collected.
 * Each batch asks only for the messages still missing, so the result never holds
 * more than {@code numberOfMessages} bodies. If SQS reports an error, it is
 * logged and the bodies collected so far are returned.
 *
 * @param queueURL the URL of the SQS queue
 * @param numberOfMessages the number of messages to pull
 * @return a list of message bodies; empty if the URL is blank or nothing was received
 */
public static List<String> pullMessages(String queueURL, int numberOfMessages) {
    List<String> messages = new ArrayList<>();
    if (!StringUtils.isBlank(queueURL)) {
        try {
            int batchSteps = 1;
            if (numberOfMessages > MAX_MESSAGES) {
                // ceil(numberOfMessages / MAX_MESSAGES)
                batchSteps = (numberOfMessages / MAX_MESSAGES) + ((numberOfMessages % MAX_MESSAGES > 0) ? 1 : 0);
            }
            for (int i = 0; i < batchSteps; i++) {
                // Cap each request at what is still missing so the last batch
                // cannot push the total above numberOfMessages.
                int maxForBatch = Math.min(MAX_MESSAGES, numberOfMessages - messages.size());
                List<Message> list = getClient().receiveMessage(new ReceiveMessageRequest(queueURL).
                        withMaxNumberOfMessages(maxForBatch).withWaitTimeSeconds(POLLING_INTERVAL)).getMessages();
                if (list != null && !list.isEmpty()) {
                    List<DeleteMessageBatchRequestEntry> del = new ArrayList<>();
                    for (Message msg : list) {
                        messages.add(msg.getBody());
                        del.add(new DeleteMessageBatchRequestEntry(msg.getMessageId(), msg.getReceiptHandle()));
                    }
                    getClient().deleteMessageBatch(queueURL, del);
                }
            }
        } catch (AmazonServiceException ase) {
            logException(ase);
        } catch (AmazonClientException ace) {
            logger.error("Could not reach SQS. {}", ace.toString());
        }
    }
    return messages;
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Deletes the given batch entries without capturing the raw result.
 *
 * @param entries the batch entries to delete
 * @return the result of the two-argument overload, invoked with a {@code null} capture
 */
@Override
public DeleteMessageBatchResult deleteMessages(
        List<DeleteMessageBatchRequestEntry> entries) {
    ResultCapture<DeleteMessageBatchResult> noCapture = null;
    return deleteMessages(entries, noCapture);
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Wraps the entries in a {@code DeleteMessageBatchRequest} and delegates to the
 * request-based overload.
 *
 * @param entries   the batch entries to delete
 * @param extractor the result capture handed through to the request-based overload
 * @return the result of the request-based overload
 */
@Override
public DeleteMessageBatchResult deleteMessages(
        List<DeleteMessageBatchRequestEntry> entries, ResultCapture<DeleteMessageBatchResult> extractor) {
    return deleteMessages(new DeleteMessageBatchRequest().withEntries(entries), extractor);
}
/**
 * Acknowledges the given messages by issuing a single
 * <code>deleteMessageBatch</code> call.
 *
 * <p>For every receipt handle one element is polled from the queue of
 * unacknowledged messages. Nothing happens when the list is {@code null} or empty.
 *
 * @param queueUrl       the URL of the queue the messages were received from
 * @param receiptHandles the receipt handles of the messages to acknowledge
 */
@Override
public void action(String queueUrl, List<String> receiptHandles) throws JMSException {
    if (receiptHandles == null || receiptHandles.isEmpty()) {
        return;
    }

    List<DeleteMessageBatchRequestEntry> batchEntries = new ArrayList<DeleteMessageBatchRequestEntry>();
    int nextEntryId = 0;
    for (String handle : receiptHandles) {
        // Remove the message from queue of unAckMessages
        unAckMessages.poll();

        batchEntries.add(new DeleteMessageBatchRequestEntry(Integer.toString(nextEntryId), handle));
        nextEntryId++;
    }

    /*
     * TODO: If one of the batch calls fails, the remaining messages of the
     * batch will not be deleted; they become visible again and are delivered
     * as duplicates after the visibility timeout expires.
     */
    amazonSQSClient.deleteMessageBatch(new DeleteMessageBatchRequest(queueUrl, batchEntries));
}
项目:s3mper
文件:AlertJanitor.java
/**
 * Deletes the given messages from a queue with one batch request.
 *
 * @param queue    the URL of the queue to delete from
 * @param messages the messages to delete
 */
private void delete(String queue, List<Message> messages) {
    List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>();
    for (Message message : messages) {
        DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry()
                .withId(message.getMessageId())
                .withReceiptHandle(message.getReceiptHandle());
        entries.add(entry);
    }

    log.info(format("Deleting %s messages", entries.size()));

    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
    request.setEntries(entries);
    request.setQueueUrl(queue);
    sqs.deleteMessageBatch(request);
}
项目:unitstack
文件:MockSqsTest.java
/**
 * Sends three messages in one batch, receives them, deletes them in one batch
 * and verifies that both the message queue and the invisibility queues of the
 * mock are empty afterwards.
 */
@Test
public void testBulkSendDelete_shouldWork() {
    // create queue
    CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue"));
    // send batch
    SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one")
            .withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}");
    SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two")
            .withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}");
    SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three")
            .withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}");
    // verify send batch result
    SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl())
            .withEntries(ImmutableList.of(firstRequest, secondRequest, thirdRequest)));
    assertNotNull("verify that batch send returned ok", sendResult);
    assertTrue("no request Failed", sendResult.getFailed().isEmpty());
    assertEquals("verify successfull message count", 3, sendResult.getSuccessful().size());
    SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get();
    assertEquals("verify correct message MD5", getAwsMessageMD5("{\"XOXO\":234}"), firstResultEntry.getMD5OfMessageBody());
    assertNotNull("verify message id exists", firstResultEntry.getMessageId());
    ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4));
    List<Message> receivedMessages = receivedMessagesResult.getMessages();
    assertEquals("verify received message count", 3, receivedMessages.size());
    // delete batch - one entry per received message, each with its own receipt handle
    List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>();
    deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessages.get(0).getReceiptHandle()));
    deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessages.get(1).getReceiptHandle()));
    deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessages.get(2).getReceiptHandle()));
    DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests));
    // verify delete batch result
    assertNotNull("verify that batch delete returned ok", deleteBatchResult);
    assertTrue("no request Failed", deleteBatchResult.getFailed().isEmpty());
    assertEquals("verify successfull message count", 3, deleteBatchResult.getSuccessful().size());
    assertTrue("queue must be empty after removal", getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty());
    for (Message message : receivedMessages) {
        assertTrue("invisibility-queue must be empty after removal", getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty());
    }
    // cleanup
    getQueues().remove("tea-earl-grey-queue");
}
项目:aws-codecommit-trigger-plugin
文件:RequestFactoryImpl.java
/**
 * Creates the batch delete entry for a single message, using the message id
 * as the entry id.
 *
 * @param message the message to delete
 * @return an entry carrying the message's id and receipt handle
 */
private DeleteMessageBatchRequestEntry createDeleteMessageBatchRequestEntry(final Message message) {
    return new DeleteMessageBatchRequestEntry()
            .withReceiptHandle(message.getReceiptHandle())
            .withId(message.getMessageId());
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
/**
 * Starts an asynchronous batch delete on the underlying client and exposes its
 * outcome through {@code Observable.from}.
 *
 * @param queueUrl the URL of the queue to delete from
 * @param entries  the batch entries to delete
 * @return an observable built from the client's asynchronous call
 */
public Observable<DeleteMessageBatchResult> deleteMessageBatchAsync(
        String queueUrl,
        List<DeleteMessageBatchRequestEntry> entries) {
    return Observable.from(
            sqsClient.deleteMessageBatchAsync(queueUrl, entries));
}
/**
 * Deletes the given entries from this instance's queue in one batch call.
 *
 * @param entries the batch entries to delete
 * @return the batch result returned by the client
 */
private DeleteMessageBatchResult delete(
        List<DeleteMessageBatchRequestEntry> entries) {
    return client.deleteMessageBatch(
            queueUrl,
            entries);
}
项目:sqs-retryqueue
文件:SQSRetryQueue.java
/**
 * Receive loop of the retry queue.
 *
 * <p>While listening, the loop first waits on the monitor for up to one
 * visibility timeout, then drains the queue: it repeatedly receives up to 10
 * messages, passes each body to the message consumer and batch-deletes the
 * messages the consumer handled without throwing. Messages whose processing
 * throws are logged and left on the queue.
 */
protected void doReceive() {
    // This is where the interesting stuff happens
    while (isListening()) {
        synchronized (this.monitor) {
            try {
                this.monitor.wait(this.getVisibilityTimeout() * 1000);
            } catch (InterruptedException ignored) {
                // An interrupt only ends the wait early; the queue is drained below and
                // isListening() decides whether the loop goes on.
                // NOTE(review): the interrupt flag is not restored - confirm this is intended.
            }
        }
        boolean messagesReceived = false;
        do {
            ReceiveMessageRequest request = new ReceiveMessageRequest().withQueueUrl(this.queueUrl)
                    .withWaitTimeSeconds(1).withMaxNumberOfMessages(10);
            ReceiveMessageResult result = sqs.receiveMessage(request);
            List<Message> messages = result.getMessages();
            messagesReceived = messages.size() > 0;
            if (!messagesReceived) {
                break;
            }
            List<DeleteMessageBatchRequestEntry> deletes = new ArrayList<DeleteMessageBatchRequestEntry>();
            for (Message message : messages) {
                String messageBody = message.getBody();
                try {
                    this.messageConsumer.accept(messageBody);
                    // Only successfully consumed messages are scheduled for deletion.
                    DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(
                            UUID.randomUUID().toString(), message.getReceiptHandle());
                    deletes.add(entry);
                } catch (Throwable exp) {
                    Logger.getLogger(getSqsQueueName()).log(Level.WARNING, "Could not process message: " + messageBody, exp);
                }
            }
            if (!deletes.isEmpty()) {
                DeleteMessageBatchRequest deleteBatch = new DeleteMessageBatchRequest(this.queueUrl, deletes);
                sqs.deleteMessageBatch(deleteBatch);
            }
        } while (messagesReceived);
    }
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
 * <p>
 * Deletes up to ten messages from the specified queue. This is a batch
 * version of DeleteMessage. The result of the delete action on each message
 * is reported individually in the response. Also deletes the message
 * payloads from Amazon S3 when necessary.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, you should check for batch errors
 * even when the call returns an HTTP status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters. These lists are
 * specified using the param.n notation. Values of n are integers starting
 * from 1. For example, a parameter list with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param deleteMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            DeleteMessageBatch service method on AmazonSQS. Receipt
 *            handles of its entries are rewritten in place when large
 *            payload support is enabled.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If the request is {@code null}, or if any internal errors
 *             are encountered inside the client while attempting to make
 *             the request or handle the response. For example if a
 *             network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) {
    if (deleteMessageBatchRequest == null) {
        String errorMessage = "deleteMessageBatchRequest cannot be null.";
        LOG.error(errorMessage);
        throw new AmazonClientException(errorMessage);
    }

    deleteMessageBatchRequest.getRequestClientOptions().appendUserAgent(
            SQSExtendedClientConstants.USER_AGENT_HEADER);

    if (clientConfiguration.isLargePayloadSupportEnabled()) {
        for (DeleteMessageBatchRequestEntry batchEntry : deleteMessageBatchRequest.getEntries()) {
            String handle = batchEntry.getReceiptHandle();
            if (isS3ReceiptHandle(handle)) {
                // Remove the S3 payload first, then hand SQS the original handle.
                // NOTE(review): helper name is spelled "getorigReceiptHandle" here -
                // confirm it matches the declaration elsewhere in the class.
                deleteMessagePayloadFromS3(handle);
                batchEntry.setReceiptHandle(getorigReceiptHandle(handle));
            } else {
                batchEntry.setReceiptHandle(handle);
            }
        }
    }

    return super.deleteMessageBatch(deleteMessageBatchRequest);
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientBase.java
/**
 * <p>
 * Deletes up to ten messages from the specified queue. This is a batch
 * version of DeleteMessage. The result of the delete action on each message
 * is reported individually in the response.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, check the individual results in
 * the response for errors.
 * </p>
 * <p>
 * <b>NOTE:</b> List parameters are written in the param.n notation; a
 * parameter list with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of receipt handles for the messages to be deleted.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(
        String queueUrl,
        List<DeleteMessageBatchRequestEntry> entries)
        throws AmazonServiceException, AmazonClientException {
    // Pure delegation to the wrapped client.
    return amazonSqsToBeExtended.deleteMessageBatch(queueUrl, entries);
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
* <p>
* Deletes up to ten messages from the specified queue. This is a batch
* version of DeleteMessage. The result of the delete action on each message
* is reported individually in the response. Also deletes the message
* payloads from Amazon S3 when necessary.
* </p>
* <p>
* <b>IMPORTANT:</b> Because the batch request can result in a combination
* of successful and unsuccessful actions,a parameter list with two elements looks like this:
* </p>
* <p>
* <code>&Attribute.1=this</code>
* </p>
* <p>
* <code>&Attribute.2=that</code>
* </p>
*
* @param queueUrl
* The URL of the Amazon SQS queue to take action on.
* @param entries
* A list of receipt handles for the messages to be deleted.
*
* @return The response from the DeleteMessageBatch service method,List<DeleteMessageBatchRequestEntry> entries) {
DeleteMessageBatchRequest deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueUrl,entries);
return deleteMessageBatch(deleteMessageBatchRequest);
}
项目:aws-sdk-java-resources
文件:Queue.java
/**
 * Convenience form of the <code>DeleteMessages</code> action that takes the
 * batch entries directly instead of a request object.
 *
 * @param entries the batch entries to delete
 * @return the result of the batch delete
 * @see #deleteMessages(DeleteMessageBatchRequest)
 */
DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry> entries);
项目:aws-sdk-java-resources
文件:Queue.java
/**
 * Convenience form of the <code>DeleteMessages</code> action that takes the
 * batch entries directly, together with a result capture.
 *
 * @param entries   the batch entries to delete
 * @param extractor the result capture passed along with the call
 * @return the result of the batch delete
 * @see #deleteMessages(DeleteMessageBatchRequest,ResultCapture)
 */
DeleteMessageBatchResult deleteMessages(
        List<DeleteMessageBatchRequestEntry> entries,
        ResultCapture<DeleteMessageBatchResult> extractor);