项目:conductor
文件:SQSObservableQueue.java
/**
 * Deletes the given messages from the queue with a single batch request.
 *
 * @param messages messages to delete; may be {@code null} or empty
 * @return the ids of the entries the batch call reported as failed, or
 *         {@code null} when there was nothing to delete
 */
private List<String> delete(List<Message> messages) {
    if (messages == null || messages.isEmpty()) {
        return null;
    }

    DeleteMessageBatchRequest batch = new DeleteMessageBatchRequest().withQueueUrl(queueURL);
    List<DeleteMessageBatchRequestEntry> entries = batch.getEntries();
    // One request entry per message, keyed by the message id.
    for (Message message : messages) {
        DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry()
                .withId(message.getId())
                .withReceiptHandle(message.getReceipt());
        entries.add(entry);
    }

    DeleteMessageBatchResult result = client.deleteMessageBatch(batch);
    List<String> failures = result.getFailed()
            .stream()
            .map(failed -> failed.getId())
            .collect(Collectors.toList());
    logger.debug("failed to delete: {}",failures);
    return failures;
}
项目:awslocal
文件:DirectorySQS.java
/**
 * Deletes every entry of the batch from the queue addressed by the request,
 * one receipt handle at a time, and reports the outcome per entry.
 *
 * <p>An {@link IOException} raised while deleting an entry does not abort the
 * batch: the entry is recorded as failed (flagged as sender fault, carrying
 * the exception message) and processing continues with the next entry.
 *
 * @param deleteMessageBatchRequest request holding the queue URL and the entries to delete
 * @return result listing the successful and the failed entries
 */
@Override
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonClientException {
    DirectorySQSQueue queue = getQueueFromUrl(deleteMessageBatchRequest.getQueueUrl(), false);

    // Per-entry outcome, reported back to the caller.
    List<BatchResultErrorEntry> failed = new ArrayList<>();
    List<DeleteMessageBatchResultEntry> succeeded = new ArrayList<>();

    for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.getEntries()) {
        try {
            queue.delete(entry.getReceiptHandle());
            DeleteMessageBatchResultEntry ok = new DeleteMessageBatchResultEntry().withId(entry.getId());
            succeeded.add(ok);
        } catch (IOException e) {
            failed.add(new BatchResultErrorEntry()
                    .withSenderFault(true)
                    .withId(entry.getId())
                    .withMessage(e.getMessage()));
        }
    }

    DeleteMessageBatchResult result = new DeleteMessageBatchResult();
    return result.withFailed(failed).withSuccessful(succeeded);
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Performs the "DeleteMessages" action on the underlying resource.
 *
 * @param request   the batch delete request passed to the action
 * @param extractor result capture handed through to the action; may be {@code null}
 * @return the action's data cast to {@link DeleteMessageBatchResult}, or
 *         {@code null} when the action produced no result
 */
@Override
public DeleteMessageBatchResult deleteMessages(DeleteMessageBatchRequest request,
        ResultCapture<DeleteMessageBatchResult> extractor) {
    ActionResult actionResult = resource.performAction("DeleteMessages", request, extractor);
    return actionResult == null ? null : (DeleteMessageBatchResult) actionResult.getData();
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Performs the DeleteMessages action with a freshly created, empty request.
 *
 * @param extractor result capture handed through to the request-based overload
 * @return whatever the request-based overload returns
 */
@Override
public DeleteMessageBatchResult deleteMessages(ResultCapture<DeleteMessageBatchResult> extractor) {
    return deleteMessages(new DeleteMessageBatchRequest(), extractor);
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Performs the DeleteMessages action for the given entries without a result capture.
 *
 * @param entries the batch entries to delete
 * @return whatever the (entries, extractor) overload returns
 */
@Override
public DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry> entries) {
    // A typed null keeps overload resolution on the (entries, extractor) variant.
    ResultCapture<DeleteMessageBatchResult> noCapture = null;
    return deleteMessages(entries, noCapture);
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Wraps the given entries in a new request and performs the DeleteMessages action.
 *
 * @param entries   the batch entries to delete
 * @param extractor result capture handed through to the request-based overload
 * @return whatever the request-based overload returns
 */
@Override
public DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry> entries,
        ResultCapture<DeleteMessageBatchResult> extractor) {
    return deleteMessages(new DeleteMessageBatchRequest().withEntries(entries), extractor);
}
项目:unitstack
文件:MockSqsTest.java
/**
 * Sends three messages as one batch, receives them, deletes them again as one
 * batch and verifies that neither the message queue nor the invisibility
 * queues of the mock still hold anything afterwards.
 */
@Test
public void testBulkSendDelete_shouldWork() {
    // create queue
    CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue"));

    // send batch
    SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one")
            .withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}");
    SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two")
            .withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}");
    SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three")
            .withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}");

    // verify send batch result
    SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl())
            .withEntries(ImmutableList.of(firstRequest, secondRequest, thirdRequest)));
    assertNotNull("verify that batch send returned ok", sendResult);
    assertTrue("no request failed", sendResult.getFailed().isEmpty());
    assertEquals("verify successfull message count", 3, sendResult.getSuccessful().size());
    SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get();
    assertEquals("verify correct message MD5", getAwsMessageMD5("{\"XOXO\":234}"), firstResultEntry.getMD5OfMessageBody());
    assertNotNull("verify message id exists", firstResultEntry.getMessageId());

    ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4));

    // delete batch
    // NOTE(review): all three entries use the receipt handle of the first received
    // message; presumably the mock hands out one handle per receive call - confirm.
    List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>();
    deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
    deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
    deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
    DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests));

    // verify delete batch result
    assertNotNull("verify that batch delete returned ok", deleteBatchResult);
    assertTrue("no request failed", deleteBatchResult.getFailed().isEmpty());
    // The expected count must be passed explicitly; without it the message string
    // itself is compared against the size and the assertion can never pass.
    assertEquals("verify successfull message count", 3, deleteBatchResult.getSuccessful().size());
    assertTrue("queue must be empty after removal", getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty());
    for (Message message : receivedMessagesResult.getMessages()) {
        assertTrue("invisibility-queue must be empty after removal", getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty());
    }

    // cleanup
    getQueues().remove("tea-earl-grey-queue");
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
/**
 * Starts an asynchronous batch delete on the wrapped SQS client and exposes
 * its outcome as an {@code Observable}.
 *
 * @param request the batch delete request to send
 * @return an observable created from the asynchronous call's result
 */
public Observable<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request) {
    Observable<DeleteMessageBatchResult> deletion = Observable.from(sqsClient.deleteMessageBatchAsync(request));
    return deletion;
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
/**
 * Starts an asynchronous batch delete for the given queue and entries on the
 * wrapped SQS client and exposes its outcome as an {@code Observable}.
 *
 * @param queueUrl URL of the queue to delete from
 * @param entries  the batch entries to delete
 * @return an observable created from the asynchronous call's result
 */
public Observable<DeleteMessageBatchResult> deleteMessageBatchAsync(String queueUrl, List<DeleteMessageBatchRequestEntry> entries) {
    Observable<DeleteMessageBatchResult> deletion = Observable.from(sqsClient.deleteMessageBatchAsync(queueUrl, entries));
    return deletion;
}
项目:zipkin-aws
文件:SQSSpanProcessor.java
/**
 * Deletes the given entries from this processor's queue in one batch call.
 *
 * @param entries the batch entries to delete
 * @return the result returned by the SQS client
 */
private DeleteMessageBatchResult delete(List<DeleteMessageBatchRequestEntry> entries) {
    DeleteMessageBatchResult result = client.deleteMessageBatch(queueUrl, entries);
    return result;
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Performs the DeleteMessages action for the given request without a result capture.
 *
 * @param request the batch delete request
 * @return whatever the (request, extractor) overload returns
 */
@Override
public DeleteMessageBatchResult deleteMessages(DeleteMessageBatchRequest request) {
    return deleteMessages(request, (ResultCapture<DeleteMessageBatchResult>) null);
}
项目:aws-sdk-java-resources
文件:QueueImpl.java
/**
 * Performs the DeleteMessages action with neither a request nor a result capture.
 *
 * @return whatever the extractor-only overload returns
 */
@Override
public DeleteMessageBatchResult deleteMessages() {
    // A typed null selects the ResultCapture overload among the one-argument variants.
    ResultCapture<DeleteMessageBatchResult> noCapture = null;
    return deleteMessages(noCapture);
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
 * <p>
 * Deletes up to ten messages from the specified queue. This is a batch
 * version of DeleteMessage. The result of the delete action on each message
 * is reported individually in the response. Message payloads stored in
 * Amazon S3 are deleted as well when necessary.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, you should check for batch errors
 * even when the call returns an HTTP status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters. These lists are
 * specified using the param.n notation. Values of n are integers starting
 * from 1. For example, a parameter list with two elements looks like this:
 * </p>
 * <p>
 * {@code &Attribute.1=this}
 * </p>
 * <p>
 * {@code &Attribute.2=that}
 * </p>
 *
 * @param deleteMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            DeleteMessageBatch service method on AmazonSQS.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If the request is {@code null}, or if any internal errors are
 *             encountered inside the client while attempting to make the
 *             request or handle the response. For example if a network
 *             connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) {

    if (deleteMessageBatchRequest == null) {
        String errorMessage = "deleteMessageBatchRequest cannot be null.";
        LOG.error(errorMessage);
        throw new AmazonClientException(errorMessage);
    }

    deleteMessageBatchRequest.getRequestClientOptions()
            .appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER);

    // Receipt handles are only rewritten when large-payload support is enabled.
    if (clientConfiguration.isLargePayloadSupportEnabled()) {
        for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.getEntries()) {
            String receiptHandle = entry.getReceiptHandle();
            if (isS3ReceiptHandle(receiptHandle)) {
                // Remove the S3 payload first, then hand the original handle on.
                deleteMessagePayloadFromS3(receiptHandle);
                entry.setReceiptHandle(getOrigReceiptHandle(receiptHandle));
            } else {
                entry.setReceiptHandle(receiptHandle);
            }
        }
    }

    return super.deleteMessageBatch(deleteMessageBatchRequest);
}
项目:amazon-sqs-java-messaging-lib
文件:AmazonSQSMessagingClientWrapper.java
/**
 * Calls <code>deleteMessageBatch</code> on the wrapped client after
 * preparing the request, and converts any
 * <code>AmazonClientException</code> through <code>handleException</code>.
 * This is used to acknowledge multiple messages on client_acknowledge mode,
 * so that they can be deleted from SQS queue.
 *
 * @param deleteMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            deleteMessageBatch service method on AmazonSQS. This is the
 *            batch version of deleteMessage. Max batch size is 10.
 * @return The response from the deleteMessageBatch service method, as
 *         returned by AmazonSQS
 * @throws JMSException
 *             the exception produced by <code>handleException</code> when
 *             the client call fails
 */
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException {
    DeleteMessageBatchResult result;
    try {
        prepareRequest(deleteMessageBatchRequest);
        result = amazonSQSClient.deleteMessageBatch(deleteMessageBatchRequest);
    } catch (AmazonClientException e) {
        throw handleException(e, "deleteMessageBatch");
    }
    return result;
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientBase.java
/**
 * <p>
 * Deletes up to ten messages from the specified queue. This is a batch
 * version of DeleteMessage. The result of the delete action on each message
 * is reported individually in the response.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, you should check for batch errors
 * even when the call returns an HTTP status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters. These lists are
 * specified using the param.n notation. Values of n are integers starting
 * from 1. For example, a parameter list with two elements looks like this:
 * </p>
 * <p>
 * {@code &Attribute.1=this}
 * </p>
 * <p>
 * {@code &Attribute.2=that}
 * </p>
 *
 * @param deleteMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            DeleteMessageBatch service method on AmazonSQS.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest)
        throws AmazonServiceException,AmazonClientException {
    // Pure delegation to the wrapped client.
    DeleteMessageBatchResult delegated = amazonSqsToBeExtended.deleteMessageBatch(deleteMessageBatchRequest);
    return delegated;
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientBase.java
/**
 * <p>
 * Deletes up to ten messages from the specified queue. This is a batch
 * version of DeleteMessage. The result of the delete action on each message
 * is reported individually in the response.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, you should check for batch errors
 * even when the call returns an HTTP status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters. These lists are
 * specified using the param.n notation. Values of n are integers starting
 * from 1. For example, a parameter list with two elements looks like this:
 * </p>
 * <p>
 * {@code &Attribute.1=this}
 * </p>
 * <p>
 * {@code &Attribute.2=that}
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of receipt handles for the messages to be deleted.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(String queueUrl,List<DeleteMessageBatchRequestEntry> entries)
        throws AmazonServiceException,AmazonClientException {
    // Pure delegation to the wrapped client.
    DeleteMessageBatchResult delegated = amazonSqsToBeExtended.deleteMessageBatch(queueUrl, entries);
    return delegated;
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
* <p>
* Deletes up to ten messages from the specified queue. This is a batch
* version of DeleteMessage. The result of the delete action on each message
* is reported individually in the response. Also deletes the message
* payloads from Amazon S3 when necessary.
* </p>
* <p>
* <b>IMPORTANT:</b> Because the batch request can result in a combination
* of successful and unsuccessful actions,a parameter list with two elements looks like this:
* </p>
* <p>
* <code>&Attribute.1=this</code>
* </p>
* <p>
* <code>&Attribute.2=that</code>
* </p>
*
* @param queueUrl
* The URL of the Amazon SQS queue to take action on.
* @param entries
* A list of receipt handles for the messages to be deleted.
*
* @return The response from the DeleteMessageBatch service method,List<DeleteMessageBatchRequestEntry> entries) {
DeleteMessageBatchRequest deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueUrl,entries);
return deleteMessageBatch(deleteMessageBatchRequest);
}
项目:aws-sdk-java-resources
文件:Queue.java
/**
 * Performs the <code>DeleteMessages</code> action.
 *
 * <p>
 * The following request parameters will be populated from the data of this
 * <code>Queue</code> resource, and any conflicting parameter value set in
 * the request will be overridden:
 * <ul>
 * <li>
 * <b><code>QueueUrl</code></b>
 * - mapped from the <code>Url</code> identifier.
 * </li>
 * </ul>
 *
 * @param request the batch delete request; its <code>QueueUrl</code> is
 *            overridden as described above
 * @return The response of the low-level client operation associated with
 *         this resource action.
 * @see DeleteMessageBatchRequest
 */
DeleteMessageBatchResult deleteMessages(DeleteMessageBatchRequest request);
项目:aws-sdk-java-resources
文件:Queue.java
/**
 * Performs the <code>DeleteMessages</code> action and uses a ResultCapture
 * to retrieve the low-level client response.
 *
 * <p>
 * The following request parameters will be populated from the data of this
 * <code>Queue</code> resource, and any conflicting parameter value set in
 * the request will be overridden:
 * <ul>
 * <li>
 * <b><code>QueueUrl</code></b>
 * - mapped from the <code>Url</code> identifier.
 * </li>
 * </ul>
 *
 * @param request the batch delete request; its <code>QueueUrl</code> is
 *            overridden as described above
 * @param extractor the ResultCapture used to retrieve the low-level client
 *            response
 * @return The response of the low-level client operation associated with
 *         this resource action.
 * @see DeleteMessageBatchRequest
 */
DeleteMessageBatchResult deleteMessages(DeleteMessageBatchRequest request,ResultCapture<DeleteMessageBatchResult> extractor);
项目:aws-sdk-java-resources
文件:Queue.java
/**
 * Convenience form of the <code>DeleteMessages</code> action that takes no
 * arguments.
 *
 * @return the result of the <code>DeleteMessages</code> action
 * @see #deleteMessages(DeleteMessageBatchRequest)
 */
DeleteMessageBatchResult deleteMessages();
项目:aws-sdk-java-resources
文件:Queue.java
/**
 * Convenience form of the <code>DeleteMessages</code> action that takes only
 * a ResultCapture.
 *
 * @param extractor the ResultCapture passed along with the action
 * @return the result of the <code>DeleteMessages</code> action
 * @see #deleteMessages(DeleteMessageBatchRequest,ResultCapture)
 */
DeleteMessageBatchResult deleteMessages(
ResultCapture<DeleteMessageBatchResult> extractor);
项目:aws-sdk-java-resources
文件:Queue.java
/**
 * Convenience form of the <code>DeleteMessages</code> action that takes the
 * batch entries directly instead of a request object.
 *
 * @param entries the batch entries to delete
 * @return the result of the <code>DeleteMessages</code> action
 * @see #deleteMessages(DeleteMessageBatchRequest)
 */
DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry>
entries);
项目:aws-sdk-java-resources
文件:Queue.java
/**
 * Convenience form of the <code>DeleteMessages</code> action that takes the
 * batch entries directly, together with a ResultCapture.
 *
 * @param entries the batch entries to delete
 * @param extractor the ResultCapture passed along with the action
 * @return the result of the <code>DeleteMessages</code> action
 * @see #deleteMessages(DeleteMessageBatchRequest,ResultCapture)
 */
DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry>
entries,ResultCapture<DeleteMessageBatchResult> extractor);