项目:rxjava2-aws
文件:SqsMessageTest.java
/**
 * Checks that when {@code s3.deleteObject} throws during
 * {@code deleteMessage(Client.FROM_FACTORY)}, the exception reaches the caller and the
 * S3 and SQS mocks are nevertheless shut down (S3 first), with no other interactions.
 */
@Test
public void shutdownOfSqsAndS3FactoryCreatedClientsOccursWhenS3DeleteObjectFails() {
    final AmazonSQSClient sqsClient = Mockito.mock(AmazonSQSClient.class);
    final AmazonS3Client s3Client = Mockito.mock(AmazonS3Client.class);
    final String objectId = "123";

    // The same mocks are handed out both by the factories and as the "source" clients.
    final SqsMessage.Service service = new SqsMessage.Service(
            Optional.of(() -> s3Client),
            () -> sqsClient,
            Optional.of(s3Client),
            sqsClient,
            QUEUE,
            Optional.of(BUCKET));
    final SqsMessage message =
            new SqsMessage(RECEIPT_HANDLE, new byte[0], 1000, Optional.of(objectId), service);

    Mockito.when(sqsClient.deleteMessage(QUEUE, RECEIPT_HANDLE))
            .thenReturn(new DeleteMessageResult());
    Mockito.doThrow(new RuntimeException()).when(s3Client).deleteObject(BUCKET, objectId);

    boolean failedAsExpected = false;
    try {
        message.deleteMessage(Client.FROM_FACTORY);
    } catch (RuntimeException expected) {
        // The S3 failure is the scenario under test; only the clean-up calls matter below.
        failedAsExpected = true;
    }
    if (!failedAsExpected) {
        Assert.fail();
    }

    final InOrder callOrder = Mockito.inOrder(sqsClient, s3Client);
    callOrder.verify(s3Client, Mockito.times(1)).deleteObject(BUCKET, objectId);
    callOrder.verify(s3Client, Mockito.times(1)).shutdown();
    callOrder.verify(sqsClient, Mockito.times(1)).shutdown();
    Mockito.verifyNoMoreInteractions(sqsClient, s3Client);
}
项目:rxjava-aws
文件:SqsMessageTest.java
// Test named for deleting a message via Client.FROM_FACTORY when an S3 factory exists.
//
// NOTE(review): this snippet does not parse as written and looks truncated/fused:
//  - the `new SqsMessage(...)` call ends with an unbalanced `)` and is followed by
//    `.thenReturn(...)`, which suggests a `Mockito.when(sqs.deleteMessage(QUEUE,RECEIPT_HANDLE))`
//    stubbing was merged into the constructor call;
//  - `Mockito.inOrder(sqs,s3Id)` receives the String `s3Id` rather than a mock;
//  - the last `inorder.verify(s3,s3)` passes the mock where a verification mode would be expected.
// Restore the body from the upstream test before relying on it — TODO confirm against upstream.
@Test
public void deleteMessageFromFactoryWhenS3FactoryExists() {
AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class);
AmazonS3Client s3 = Mockito.mock(AmazonS3Client.class);
String s3Id = "123";
// NOTE(review): unbalanced parenthesis; constructor arguments appear to be missing.
SqsMessage m = new SqsMessage(RECEIPT_HANDLE,RECEIPT_HANDLE))
.thenReturn(new DeleteMessageResult());
m.deleteMessage(Client.FROM_FACTORY);
// NOTE(review): `s3Id` is a String, not a mock — presumably `s3` was intended; verify.
InOrder inorder = Mockito.inOrder(sqs,s3Id);
inorder.verify(sqs,Mockito.times(1)).deleteMessage(QUEUE,RECEIPT_HANDLE);
// NOTE(review): incomplete verification — no mode and no verified method call follow.
inorder.verify(s3,s3);
}
项目:rxjava-aws
文件:SqsMessageTest.java
// Test named for verifying client shutdown when the S3 deleteObject call fails.
//
// NOTE(review): as shown, the body only creates two mocks and constructs an SqsMessage; it never
// calls deleteMessage, stubs a failure, or verifies anything, and `sqs` and `s3Id` are unused.
// The snippet appears truncated (the constructor call looks cut down to two arguments) —
// restore the full body from the upstream test; TODO confirm.
@Test
public void shutdownOfSqsAndS3FactoryCreatedClientsOccursWhenS3DeleteObjectFails() {
AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class);
AmazonS3Client s3 = Mockito.mock(AmazonS3Client.class);
String s3Id = "123";
SqsMessage m = new SqsMessage(RECEIPT_HANDLE,s3);
}
项目:rxjava2-aws
文件:SqsMessageTest.java
// Test that calls deleteMessage(Client.FROM_FACTORY) and then verifies that the SQS mock was
// shut down and received no further interactions.
//
// NOTE(review): the snippet looks partially truncated:
//  - the `sqs` mock is never passed to the SqsMessage / SqsMessage.Service constructed below,
//    yet the test stubs and verifies calls on it;
//  - `inorder.verify(sqs,RECEIPT_HANDLE)` passes RECEIPT_HANDLE where a Mockito verification
//    mode (e.g. Mockito.times(1)) would be expected, and no verified method call follows it.
// Presumably constructor arguments and a `.deleteMessage(QUEUE,RECEIPT_HANDLE)` verification
// were lost — TODO confirm against the upstream test.
@Test
public void testClientFromFactory() {
AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class);
SqsMessage m = new SqsMessage(RECEIPT_HANDLE,Optional.empty(),new SqsMessage.Service(Optional.empty(),Optional.empty()));
// Stub the SQS delete call so it returns an empty result.
Mockito.when(sqs.deleteMessage(QUEUE,RECEIPT_HANDLE))
.thenReturn(new DeleteMessageResult());
m.deleteMessage(Client.FROM_FACTORY);
InOrder inorder = Mockito.inOrder(sqs);
// NOTE(review): malformed verification (see the note at the top of this test).
inorder.verify(sqs,RECEIPT_HANDLE);
inorder.verify(sqs,Mockito.times(1)).shutdown();
Mockito.verifyNoMoreInteractions(sqs);
}
项目:rxjava2-aws
文件:SqsMessageTest.java
// Test named for the case where deleting with the source client fails and the message falls
// back to a factory-created client.
//
// NOTE(review): this snippet does not parse as written and looks truncated/fused:
//  - the `new SqsMessage(...)` call ends with an unbalanced `)` followed by
//    `.thenThrow(...).thenReturn(...)`, which suggests a
//    `Mockito.when(sqs.deleteMessage(QUEUE,RECEIPT_HANDLE))` stubbing was merged into it;
//  - the `deleteMessage(QUEUE,Mockito.times(1)).shutdown()` verification appears to be two
//    verifications (deleteMessage twice, shutdown once) fused into one statement.
// Restore the body from the upstream test — TODO confirm.
@Test
public void testClientFromSourceFailsThenFailsOverToFromFactory() {
AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class);
// NOTE(review): unbalanced parenthesis; constructor arguments appear to be missing.
SqsMessage m = new SqsMessage(RECEIPT_HANDLE,RECEIPT_HANDLE)).thenThrow(new RuntimeException())
.thenReturn(new DeleteMessageResult());
m.deleteMessage();
InOrder inorder = Mockito.inOrder(sqs);
// NOTE(review): looks like two separate verify(...) statements were fused here.
inorder.verify(sqs,Mockito.times(2)).deleteMessage(QUEUE,Mockito.times(1)).shutdown();
Mockito.verifyNoMoreInteractions(sqs);
}
项目:rxjava2-aws
文件:SqsMessageTest.java
// Test named for deleting a message via the factory client when an S3 factory exists.
//
// NOTE(review): as shown, the body only creates two mocks and constructs an SqsMessage; it never
// calls deleteMessage or verifies anything, and `sqs` and `s3Id` are unused. The snippet appears
// truncated (the constructor call looks cut down to two arguments) — restore the full body
// from the upstream test; TODO confirm.
@Test
public void deleteMessageFromFactoryWhenS3FactoryExists() {
AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class);
AmazonS3Client s3 = Mockito.mock(AmazonS3Client.class);
String s3Id = "123";
SqsMessage m = new SqsMessage(RECEIPT_HANDLE,s3);
}
项目:paradox-nakadi-consumer
文件:SQSFailedEventSourceTest.java
/**
 * Commits a failed event while the SQS delete call is stubbed to return a result whose
 * HTTP metadata reports status 200, then checks that {@code deleteMessage} was invoked.
 */
@Test
public void testShouldCommitTheMessageSuccessfully() {
    // Result returned by the stubbed SQS client, carrying HTTP metadata with status 200.
    final SdkHttpMetadata okMetadata = mock(SdkHttpMetadata.class);
    when(okMetadata.getHttpStatusCode()).thenReturn(200);
    final DeleteMessageResult stubbedResult = new DeleteMessageResult();
    stubbedResult.setSdkHttpMetadata(okMetadata);
    when(amazonSQS.deleteMessage(anyString(), anyString())).thenReturn(stubbedResult);

    final SQSFailedEvent event = new SQSFailedEvent(new FailedEvent());
    sqsFailedEventSource.commit(event);

    verify(amazonSQS).deleteMessage(anyString(), anyString());
}
项目:rxjava-aws
文件:SqsMessageTest.java
// Test named for exercising deleteMessage with a factory-created client.
//
// NOTE(review): the snippet looks truncated/fused: the `new SqsMessage(...)` call takes
// `Mockito.times(1)` as an argument and is immediately followed by `.shutdown()`, which suggests
// that the constructor call, the stubbing, the deleteMessage call and an
// `inorder.verify(sqs,Mockito.times(1)).shutdown()` verification were collapsed into one
// statement. Restore the body from the upstream test — TODO confirm.
@Test
public void testClientFromFactory() {
AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class);
SqsMessage m = new SqsMessage(RECEIPT_HANDLE,Mockito.times(1)).shutdown();
Mockito.verifyNoMoreInteractions(sqs);
}
项目:rxjava-aws
文件:SqsMessageTest.java
// Test named for the case where deleting with the source client fails and the message falls
// back to a factory-created client.
//
// NOTE(review): the snippet looks truncated/fused: the `new SqsMessage(...)` call takes
// `Mockito.times(1)` as an argument and is immediately followed by `.shutdown()`, which suggests
// that the constructor call, the stubbing, the deleteMessage call and an
// `inorder.verify(sqs,Mockito.times(1)).shutdown()` verification were collapsed into one
// statement. Restore the body from the upstream test — TODO confirm.
@Test
public void testClientFromSourceFailsThenFailsOverToFromFactory() {
AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class);
SqsMessage m = new SqsMessage(RECEIPT_HANDLE,Mockito.times(1)).shutdown();
Mockito.verifyNoMoreInteractions(sqs);
}
项目:Camel
文件:AmazonSQSClientMock.java
/**
 * Mock implementation of message deletion: cancels the task registered in {@code inFlight}
 * under the request's receipt handle, if there is one, and returns an empty result.
 *
 * @param deleteMessageRequest request carrying the receipt handle of the message to delete
 * @return a new, empty {@link DeleteMessageResult}
 * @throws AmazonClientException declared for the overridden signature; this body does not
 *         throw it itself
 */
@Override
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException {
    String receiptHandle = deleteMessageRequest.getReceiptHandle();
    // One lookup instead of containsKey + get: avoids a second hash lookup and the
    // check-then-act gap in which the entry could disappear between the two calls.
    ScheduledFuture<?> inFlightTask = inFlight.get(receiptHandle);
    if (inFlightTask != null) {
        // true = interrupt the task if it is already running.
        inFlightTask.cancel(true);
    }
    return new DeleteMessageResult();
}
项目:awslocal
文件:DirectorySQS.java
/**
 * Deletes the message identified by the request's receipt handle from the queue resolved
 * from the request's queue URL.
 *
 * @param deleteMessageRequest request supplying the queue URL and the receipt handle
 * @return a new, empty {@link DeleteMessageResult}
 * @throws AmazonServiceException (a kind of client exception per the declared signature)
 *         wrapping any {@link IOException} raised while resolving the queue or deleting
 */
@Override
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException {
    try {
        final String queueUrl = deleteMessageRequest.getQueueUrl();
        final DirectorySQSQueue targetQueue = getQueueFromUrl(queueUrl, false);
        final String receiptHandle = deleteMessageRequest.getReceiptHandle();
        targetQueue.delete(receiptHandle);
    } catch (IOException ioFailure) {
        // Surface the failure through the SDK exception type, keeping the cause attached.
        throw new AmazonServiceException("error deleting message", ioFailure);
    }
    return new DeleteMessageResult();
}
项目:unitstack
文件:MockSqsTest.java
/**
 * End-to-end walk through the mock queue: create a queue, send one message, receive it,
 * extend its visibility timeout, confirm it is hidden from a second receive, delete it,
 * and finally check that both the message queue and the invisibility queue are empty.
 */
@Test
public void testSendChangeVisibilityReceiveDeleteMessage_shouldSendChangeVisibilityReceiveAndDeleteMessage() {
    final String queueName = "tea-earl-grey-queue";
    final String messageBody = "{\"life-universe-everything\":42}";

    // 1. Create the queue; its URL is reused by every request below.
    CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName(queueName);
    CreateQueueResult createdQueue = sqs.createQueue(createRequest);
    final String queueUrl = createdQueue.getQueueUrl();

    // 2. Send a single message without delay.
    SendMessageRequest sendRequest = new SendMessageRequest()
            .withDelaySeconds(0)
            .withMessageBody(messageBody)
            .withMessageGroupId("some-group-id-123")
            .withQueueUrl(queueUrl);
    SendMessageResult sendResult = sqs.sendMessage(sendRequest);
    assertNotNull("message sending returned ok", sendResult);
    assertNotNull("verify body MD5 exists", sendResult.getMD5OfMessageBody());
    assertNotNull("verify message id exists", sendResult.getMessageId());

    // 3. Receive it back and check body, MD5 and identifiers.
    ReceiveMessageRequest firstReceive = new ReceiveMessageRequest()
            .withMaxNumberOfMessages(3)
            .withQueueUrl(queueUrl)
            .withVisibilityTimeout(10)
            .withWaitTimeSeconds(0);
    ReceiveMessageResult messageResult = sqs.receiveMessage(firstReceive);
    assertNotNull("verify received message returned ok", messageResult);
    assertEquals("verify correct receive count", 1, messageResult.getMessages().size());
    Message firstMessage = messageResult.getMessages().get(0);
    assertEquals("verify correct body returned", messageBody, firstMessage.getBody());
    assertEquals("verify correct message MD5", getAwsMessageMD5(messageBody), firstMessage.getMD5OfBody());
    assertNotNull("verify message id exists", firstMessage.getMessageId());
    assertNotNull("verify receipt handle exists", firstMessage.getReceiptHandle());
    final String receiptHandle = firstMessage.getReceiptHandle();

    // 4. Extend the visibility timeout of the received message.
    ChangeMessageVisibilityRequest visibilityRequest = new ChangeMessageVisibilityRequest()
            .withQueueUrl(queueUrl)
            .withReceiptHandle(receiptHandle)
            .withVisibilityTimeout(40);
    ChangeMessageVisibilityResult visibilityResult = sqs.changeMessageVisibility(visibilityRequest);
    assertNotNull("changing visibility returned ok", visibilityResult);

    // 5. A second receive must not see the message while it is invisible.
    ReceiveMessageRequest secondReceive = new ReceiveMessageRequest()
            .withMaxNumberOfMessages(1)
            .withQueueUrl(queueUrl)
            .withVisibilityTimeout(20)
            .withWaitTimeSeconds(0);
    ReceiveMessageResult emptyResult = sqs.receiveMessage(secondReceive);
    assertTrue("at visibility timeout the message should not be available.", emptyResult.getMessages().isEmpty());

    // 6. Delete the message and check that nothing is left behind.
    DeleteMessageRequest deleteRequest = new DeleteMessageRequest()
            .withQueueUrl(queueUrl)
            .withReceiptHandle(receiptHandle);
    DeleteMessageResult deleteResult = sqs.deleteMessage(deleteRequest);
    assertNotNull("verify deletion returned ok", deleteResult);
    assertTrue("queue must be empty after removal", getQueues().get(queueName).getMessageQueue().isEmpty());
    assertTrue("invisibility-queue must be empty after removal", getQueues().get(queueName).getInvisibilityQueueFor(firstMessage.getReceiptHandle()).isEmpty());

    // 7. Clean up so the queue does not leak into other tests.
    getQueues().remove(queueName);
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
/**
 * Forwards the delete request to the wrapped client's {@code deleteMessageAsync} and wraps
 * the value it returns with {@code Observable.from}.
 *
 * @param request delete request, passed through unchanged
 * @return an {@link Observable} built from the wrapped client's pending result
 */
public Observable<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request) {
    final java.util.concurrent.Future<DeleteMessageResult> pendingResult =
            sqsClient.deleteMessageAsync(request);
    return Observable.from(pendingResult);
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
/**
 * Forwards the queue URL and receipt handle to the wrapped client's
 * {@code deleteMessageAsync} and wraps the value it returns with {@code Observable.from}.
 *
 * @param queueUrl      queue URL, passed through unchanged
 * @param receiptHandle receipt handle, passed through unchanged
 * @return an {@link Observable} built from the wrapped client's pending result
 */
public Observable<DeleteMessageResult> deleteMessageAsync(String queueUrl, String receiptHandle) {
    final java.util.concurrent.Future<DeleteMessageResult> pendingResult =
            sqsClient.deleteMessageAsync(queueUrl, receiptHandle);
    return Observable.from(pendingResult);
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
 * <p>
 * Deletes the specified message from the specified queue and deletes the
 * message payload from Amazon S3 when necessary. The message is identified
 * by its <code>receipt handle</code>, not by the <code>message ID</code>
 * returned when it was sent. Even if the message is locked by another
 * reader due to the visibility timeout setting, it is still deleted from
 * the queue. If you leave a message in the queue for longer than the
 * queue's configured retention period, Amazon SQS automatically deletes it.
 * </p>
 * <p>
 * <b>NOTE:</b> The receipt handle is associated with a specific instance of
 * receiving the message. If you receive a message more than once, the
 * receipt handle you get each time you receive the message is different.
 * When you request DeleteMessage, if you don't provide the most recently
 * received receipt handle for the message, the request will still succeed,
 * but the message might not be deleted.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> It is possible you will receive a message even after
 * you have deleted it. This might happen on rare occasions if one of the
 * servers storing a copy of the message is unavailable when you request to
 * delete the message. The copy remains on the server and might be returned
 * to you again on a subsequent receive request. You should create your
 * system to be idempotent so that receiving a particular message more than
 * once is not a problem.
 * </p>
 * <p>
 * Implementation notes: the request passed in is modified. The extended
 * client's user-agent header is appended to its client options and, when
 * large-payload support is enabled, its receipt handle is set again (to the
 * original receipt handle when the incoming one is an S3 receipt handle,
 * after the S3 payload has been deleted).
 * </p>
 *
 * @param deleteMessageRequest
 *            Container for the necessary parameters to execute the
 *            DeleteMessage service method on AmazonSQS. Must not be null.
 *
 * @return The response from the DeleteMessage service method, as returned
 *         by AmazonSQS.
 *
 * @throws ReceiptHandleIsInvalidException
 * @throws InvalidIdFormatException
 *
 * @throws AmazonClientException
 *             If <code>deleteMessageRequest</code> is null, or if any
 *             internal errors are encountered inside the client while
 *             attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) {
    if (deleteMessageRequest == null) {
        final String nullRequestMessage = "deleteMessageRequest cannot be null.";
        LOG.error(nullRequestMessage);
        throw new AmazonClientException(nullRequestMessage);
    }

    deleteMessageRequest.getRequestClientOptions().appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER);

    if (clientConfiguration.isLargePayloadSupportEnabled()) {
        final String incomingHandle = deleteMessageRequest.getReceiptHandle();
        String handleForSqs = incomingHandle;
        if (isS3ReceiptHandle(incomingHandle)) {
            // Remove the payload stored in S3 first, then recover the handle SQS understands.
            deleteMessagePayloadFromS3(incomingHandle);
            handleForSqs = getOrigReceiptHandle(incomingHandle);
        }
        deleteMessageRequest.setReceiptHandle(handleForSqs);
    }

    return super.deleteMessage(deleteMessageRequest);
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientBase.java
/**
 * <p>
 * Deletes the specified message from the specified queue by delegating to
 * the wrapped AmazonSQS client. The message is identified by its
 * <code>receipt handle</code>, not by the <code>message ID</code> returned
 * when it was sent. Even if the message is locked by another reader due to
 * the visibility timeout setting, it is still deleted from the queue. If
 * you leave a message in the queue for longer than the queue's configured
 * retention period, Amazon SQS automatically deletes it.
 * </p>
 * <p>
 * <b>NOTE:</b> The receipt handle is associated with a specific instance
 * of receiving the message. If you receive a message more than once, the
 * receipt handle you get each time you receive the message is different.
 * When you request DeleteMessage, if you don't provide the most recently
 * received receipt handle for the message, the request will still succeed,
 * but the message might not be deleted.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> It is possible you will receive a message even after
 * you have deleted it. This might happen on rare occasions if one of the
 * servers storing a copy of the message is unavailable when you request to
 * delete the message. The copy remains on the server and might be returned
 * to you again on a subsequent receive request. You should create your
 * system to be idempotent so that receiving a particular message more than
 * once is not a problem.
 * </p>
 *
 * @param deleteMessageRequest Container for the necessary parameters to
 *           execute the DeleteMessage service method on AmazonSQS.
 *
 * @return The response from the DeleteMessage service method, as returned
 *         by AmazonSQS.
 *
 * @throws ReceiptHandleIsInvalidException
 * @throws InvalidIdFormatException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client while
 *             attempting to make the request or handle the response. For example
 *             if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server side issue.
 */
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) {
    // Pure pass-through: the wrapped client does all the work.
    final DeleteMessageResult delegateResult = amazonSqsToBeExtended.deleteMessage(deleteMessageRequest);
    return delegateResult;
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientBase.java
/**
 * <p>
 * Deletes the specified message from the specified queue by delegating to
 * the wrapped AmazonSQS client. The message is identified by its
 * <code>receipt handle</code>, not by the <code>message ID</code> returned
 * when it was sent. Even if the message is locked by another reader due to
 * the visibility timeout setting, it is still deleted from the queue. If
 * you leave a message in the queue for longer than the queue's configured
 * retention period, Amazon SQS automatically deletes it.
 * </p>
 * <p>
 * <b>NOTE:</b> The receipt handle is associated with a specific instance of
 * receiving the message. If you receive a message more than once, the
 * receipt handle you get each time you receive the message is different.
 * When you request DeleteMessage, if you don't provide the most recently
 * received receipt handle for the message, the request will still succeed,
 * but the message might not be deleted.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> It is possible you will receive a message even after
 * you have deleted it. This might happen on rare occasions if one of the
 * servers storing a copy of the message is unavailable when you request to
 * delete the message. The copy remains on the server and might be returned
 * to you again on a subsequent receive request. You should create your
 * system to be idempotent so that receiving a particular message more than
 * once is not a problem.
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param receiptHandle
 *            The receipt handle associated with the message to delete.
 *
 * @return The response from the DeleteMessage service method, as returned
 *         by AmazonSQS.
 *
 * @throws ReceiptHandleIsInvalidException
 * @throws InvalidIdFormatException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle)
        throws AmazonServiceException, AmazonClientException {
    // Pure pass-through: the wrapped client does all the work.
    final DeleteMessageResult delegateResult = amazonSqsToBeExtended.deleteMessage(queueUrl, receiptHandle);
    return delegateResult;
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
* <p>
* Deletes the specified message from the specified queue and deletes the
* message payload from Amazon S3 when necessary. You specify the message by
* using the message's <code>receipt handle</code> and not the
* <code>message ID</code> you received when you sent the message. Even if
* the message is locked by another reader due to the visibility timeout
* setting,* but the message might not be deleted.
* </p>
* <p>
* <b>IMPORTANT:</b> It is possible you will receive a message even after
* you have deleted it. This might happen on rare occasions if one of the
* servers storing a copy of the message is unavailable when you request to
* delete the message. The copy remains on the server and might be returned
* to you again on a subsequent receive request. You should create your
* system to be idempotent so that receiving a particular message more than
* once is not a problem.
* </p>
*
* @param queueUrl
* The URL of the Amazon SQS queue to take action on.
* @param receiptHandle
* The receipt handle associated with the message to delete.
*
* @return The response from the DeleteMessage service method,String receiptHandle) {
DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueUrl,receiptHandle);
return deleteMessage(deleteMessageRequest);
}