项目:sqs-utils
文件:VisibilityTimeoutExtenderTest.java
@Test
public void testRun() {
// given
// when
uut.run();
// then
ArgumentCaptor<ChangeMessageVisibilityRequest> captor = ArgumentCaptor.forClass(
ChangeMessageVisibilityRequest.class);
verify(sqsClient).changeMessageVisibility(captor.capture());
ChangeMessageVisibilityRequest request = captor.getValue();
assertEquals("rhd",request.getReceiptHandle());
assertEquals("queue",request.getQueueUrl());
assertEquals(600,request.getVisibilityTimeout().intValue());
}
项目:soundwave
文件:SqsTriggeredJobExecutor.java
public void changeMessageVisibility(Message msg,int value) {
logger.info("Change visibility to {} seconds",value);
if (value > 36000) {
value = 36000;
}
ChangeMessageVisibilityRequest
request =
new ChangeMessageVisibilityRequest()
.withQueueUrl(this.queueUrl)
.withReceiptHandle(msg.getReceiptHandle()).withVisibilityTimeout(value);
this.getClient().changeMessageVisibility(request);
}
项目:sqs-utils
文件:VisibilityTimeoutExtender.java
VisibilityTimeoutExtender(@NonNull AmazonSQS sqsClient,@NonNull Duration newVisibilityTimeout,@NonNull Message<?> message,@NonNull String queueUrl) {
this.sqsClient = sqsClient;
request = new ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle(
message.getHeaders().get("ReceiptHandle",String.class)).withVisibilityTimeout(
timeoutInSeconds(newVisibilityTimeout));
}
项目:Camel
文件:SqsExtendMessageVisibilityTest.java
@Test
public void longReceiveExtendsMessageVisibility() throws Exception {
this.mock.expectedMessageCount(1);
this.mock.whenAnyExchangeReceived(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// Simulate message that takes a while to receive.
Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT.
}
});
Message message = new Message();
message.setBody("Message 1");
message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
message.setReceiptHandle(RECEIPT_HANDLE);
this.clientMock.messages.add(message);
assertMockEndpointsSatisfied(); // Wait for message to arrive.
assertTrue("Expected at least one changeMessageVisibility request.",this.clientMock.changeMessageVisibilityRequests.size() >= 1);
for (ChangeMessageVisibilityRequest req : this.clientMock.changeMessageVisibilityRequests) {
assertEquals("https://queue.amazonaws.com/541925086079/MyQueue",req.getQueueUrl());
assertEquals(RECEIPT_HANDLE,req.getReceiptHandle());
Integer expectedTimeout = new Integer(6); // Should be 1.5 x TIMEOUT as takes into account the delay period
assertEquals(expectedTimeout,req.getVisibilityTimeout());
}
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
* Simplified method form for invoking the ChangeMessageVisibility
* operation.
*
* @see #changeMessageVisibility(ChangeMessageVisibilityRequest)
*/
public ChangeMessageVisibilityResult changeMessageVisibility(String queueUrl,String receiptHandle,Integer visibilityTimeout) {
ChangeMessageVisibilityRequest changeMessageVisibilityRequest =
new ChangeMessageVisibilityRequest(queueUrl,receiptHandle,visibilityTimeout);
return changeMessageVisibility(changeMessageVisibilityRequest);
}
项目:aws-sdk-java-resources
文件:MessageImpl.java
@Override
public void changeVisibility(Integer visibilityTimeout,ResultCapture<Void>
extractor) {
ChangeMessageVisibilityRequest request = new
ChangeMessageVisibilityRequest()
.withVisibilityTimeout(visibilityTimeout);
changeVisibility(request,extractor);
}
项目:spring-cloud-aws
文件:QueueMessageVisibility.java
@Override
public Future<?> extend(int seconds) {
return this.amazonSqsAsync.changeMessageVisibilityAsync(new ChangeMessageVisibilityRequest()
.withQueueUrl(this.queueUrl)
.withReceiptHandle(this.receiptHandle)
.withVisibilityTimeout(seconds));
}
项目:amazon-sqs-java-messaging-lib
文件:AmazonSQSMessagingClientWrapperTest.java
@Test
public void testChangeMessageVisibility() throws JMSException {
ChangeMessageVisibilityRequest changeMessageVisibilityRequest = new ChangeMessageVisibilityRequest();
wrapper.changeMessageVisibility(changeMessageVisibilityRequest);
verify(amazonSQSClient).changeMessageVisibility(changeMessageVisibilityRequest);
}
项目:amazon-sqs-java-messaging-lib
文件:AmazonSQSMessagingClientWrapperTest.java
@Test(expected = JMSException.class)
public void testChangeMessageVisibilityThrowAmazonClientException() throws JMSException {
ChangeMessageVisibilityRequest changeMessageVisibilityRequest = new ChangeMessageVisibilityRequest();
doThrow(new AmazonClientException("ace"))
.when(amazonSQSClient).changeMessageVisibility(eq(changeMessageVisibilityRequest));
wrapper.changeMessageVisibility(changeMessageVisibilityRequest);
}
项目:amazon-sqs-java-messaging-lib
文件:AmazonSQSMessagingClientWrapperTest.java
@Test(expected = JMSException.class)
public void testChangeMessageVisibilityThrowAmazonServiceException() throws JMSException {
ChangeMessageVisibilityRequest changeMessageVisibilityRequest = new ChangeMessageVisibilityRequest();
doThrow(new AmazonServiceException("ase"))
.when(amazonSQSClient).changeMessageVisibility(eq(changeMessageVisibilityRequest));
wrapper.changeMessageVisibility(changeMessageVisibilityRequest);
}
项目:awslocal
文件:DirectorySQS.java
@Override
public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws AmazonClientException {
try {
DirectorySQSQueue queue = getQueueFromUrl(changeMessageVisibilityRequest.getQueueUrl(),false);
queue.changeVisibility(changeMessageVisibilityRequest.getReceiptHandle(),changeMessageVisibilityRequest.getVisibilityTimeout());
return new ChangeMessageVisibilityResult();
} catch (IOException e) {
throw new AmazonServiceException("error",e);
}
}
项目:awslocal
文件:TestSQSClient.java
public void willReceiveMessageAfterTimeout()
throws InterruptedException {
final String queueUrl = someNewQueue();
final String messageBody = someMessageBody();
final SendMessageResult sendResult = _amazonSQS.sendMessage(new SendMessageRequest(queueUrl,messageBody));
Assert.assertNotNull(sendResult.getMD5OfMessageBody());
verifyReceiveEmail(sendResult.getMessageId(),queueUrl,1);
sleep(1);
verifyReceiveEmail(sendResult.getMessageId(),2);
sleep(1);
verifyReceiveNone(queueUrl);
sleep(1);
final String receiptHandle = verifyReceiveEmail(sendResult.getMessageId(),1);
_amazonSQS.changeMessageVisibility(new ChangeMessageVisibilityRequest(queueUrl,3));
sleep(2);
verifyReceiveNone(queueUrl);
sleep(1);
verifyReceiveEmail(sendResult.getMessageId(),null);
}
项目:queue-slayer
文件:AmazonSQSPrioritizedMessageProvider.java
@Override
public void setVisibilityTimeout(Message message,Integer visibilityTimeoutSeconds) {
if (message instanceof OriginatingMessage) {
OriginatingMessage originatingMessage = (OriginatingMessage) message;
sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest(originatingMessage.getOriginatingQueueUrl(),message.getReceipt(),visibilityTimeoutSeconds));
} else {
throw new RuntimeException("Unsupported message type: " + message.getBody());
}
}
项目:unitstack
文件:MockSqsTest.java
@Test
public void testSendChangeVisibilityReceiveDeleteMessage_shouldSendChangeVisibilityReceiveAndDeleteMessage() {
// create queue
CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue"));
// send message
String messageBody = "{\"life-universe-everything\":42}";
SendMessageResult sendResult = sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody)
.withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl()));
assertNotNull("message sending returned ok",sendResult);
assertNotNull("verify body MD5 exists",sendResult.getMD5OfMessageBody());
assertNotNull("verify message id exists",sendResult.getMessageId());
// receive message
ReceiveMessageResult messageResult = sqs.receiveMessage(new ReceiveMessageRequest()
.withMaxNumberOfMessages(3).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(10)
.withWaitTimeSeconds(0));
assertNotNull("verify received message returned ok",messageResult);
assertEquals("verify correct receive count",1,messageResult.getMessages().size());
Message firstMessage = messageResult.getMessages().get(0);
assertEquals("verify correct body returned",messageBody,firstMessage.getBody());
assertEquals("verify correct message MD5",getAwsMessageMD5(messageBody),firstMessage.getMD5OfBody());
assertNotNull("verify message id exists",firstMessage.getMessageId());
assertNotNull("verify receipt handle exists",firstMessage.getReceiptHandle());
// extend visibility timeout
ChangeMessageVisibilityResult visibilityResult = sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest()
.withQueueUrl(createdQueue.getQueueUrl()).withReceiptHandle(firstMessage.getReceiptHandle()).withVisibilityTimeout(40));
assertNotNull("changing visibility returned ok",visibilityResult);
// verify if message is invisible
ReceiveMessageResult emptyResult = sqs.receiveMessage(new ReceiveMessageRequest()
.withMaxNumberOfMessages(1).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(20)
.withWaitTimeSeconds(0));
assertTrue("at visibility timeout the message should not be available.",emptyResult.getMessages().isEmpty());
// delete message from queue
DeleteMessageResult deleteResult = sqs.deleteMessage(new DeleteMessageRequest()
.withQueueUrl(createdQueue.getQueueUrl()).withReceiptHandle(firstMessage.getReceiptHandle()));
assertNotNull("verify deletion returned ok",deleteResult);
assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty());
assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(firstMessage.getReceiptHandle()).isEmpty());
// cleanup
getQueues().remove("tea-earl-grey-queue");
}
项目:unitstack
文件:MockSqsTest.java
@Test
public void testChangeMessageVisibility_withEmptyRequestParams_shouldWork() {
assertNotNull(sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest()));
}
项目:conductor
文件:SQSObservableQueue.java
@Override
public void setUnackTimeout(Message message,long unackTimeout) {
int unackTimeoutInSeconds = (int) (unackTimeout / 1000);
ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(queueURL,unackTimeoutInSeconds);
client.changeMessageVisibility(request);
}
项目:reactive-sqs-client
文件:ReactiveSqsClient.java
public Observable<ChangeMessageVisibilityResult> changeMessageVisibilityAsync(ChangeMessageVisibilityRequest request) {
return Observable.from(sqsClient.changeMessageVisibilityAsync(request));
}
项目:Camel
文件:AmazonSQSClientMock.java
@Override
public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws AmazonServiceException,AmazonClientException {
this.changeMessageVisibilityRequests.add(changeMessageVisibilityRequest);
return new ChangeMessageVisibilityResult();
}
项目:aws-sdk-java-resources
文件:MessageImpl.java
@Override
public void changeVisibility(ChangeMessageVisibilityRequest request) {
changeVisibility(request,null);
}
项目:aws-sdk-java-resources
文件:MessageImpl.java
@Override
public void changeVisibility(ChangeMessageVisibilityRequest request,ResultCapture<Void> extractor) {
resource.performAction("ChangeVisibility",request,extractor);
}
项目:spring-cloud-aws
文件:SimpleMessageListenerContainerTest.java
@Test
public void receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility() throws Exception {
// Arrange
CountDownLatch countDownLatch = new CountDownLatch(1);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {
@Override
protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) {
countDownLatch.countDown();
super.executeMessage(stringMessage);
}
};
AmazonSQSAsync sqs = mock(AmazonSQSAsync.class);
container.setAmazonSqs(sqs);
QueueMessageHandler messageHandler = new QueueMessageHandler();
container.setMessageHandler(messageHandler);
StaticApplicationContext applicationContext = new StaticApplicationContext();
applicationContext.registerSingleton("testListener",TestMessageListenerWithVisibilityProlong.class);
mockGetQueueUrl(sqs,"testQueue","http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com");
mockGetQueueAttributesWithEmptyResult(sqs,"http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com");
messageHandler.setApplicationContext(applicationContext);
messageHandler.afterPropertiesSet();
container.afterPropertiesSet();
mockReceiveMessage(sqs,"http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com","messageContent","ReceiptHandle");
// Act
container.start();
// Assert
countDownLatch.await(1L,TimeUnit.SECONDS);
verify(sqs,never()).changeMessageVisibilityAsync(any(ChangeMessageVisibilityRequest.class));
TestMessageListenerWithVisibilityProlong testMessageListenerWithVisibilityProlong = applicationContext.getBean(TestMessageListenerWithVisibilityProlong.class);
testMessageListenerWithVisibilityProlong.getCountDownLatch().await(1L,TimeUnit.SECONDS);
testMessageListenerWithVisibilityProlong.extend(5);
verify(sqs,times(1)).changeMessageVisibilityAsync(eq(new ChangeMessageVisibilityRequest("http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com","ReceiptHandle",5)));
container.stop();
}
项目:queue-slayer
文件:AmazonSQSMessageProvider.java
@Override
public void setVisibilityTimeout(Message message,Integer visibilityTimeoutSeconds) {
sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest(queueUrl,visibilityTimeoutSeconds));
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
* <p>
* Changes the visibility timeout of a specified message in a queue to a new
* value. The maximum allowed timeout value you can set the value to is 12
* hours. This means you can't extend the timeout of a message in an
* existing queue to more than a total visibility timeout of 12 hours. (For
* more information visibility timeout,see <a href=
* "http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html"
* > Visibility Timeout </a> in the <i>Amazon SQS Developer Guide</i> .)
* </p>
* <p>
* For example,let's say you have a message and its default message
* visibility timeout is 30 minutes. You could call
* <code>ChangeMessageVisiblity</code> with a value of two hours and the
* effective timeout would be two hours and 30 minutes. When that time comes
* near you could again extend the time out by calling
* ChangeMessageVisiblity,but this time the maximum allowed timeout would
* be 9 hours and 30 minutes.
* </p>
* <p>
* <b>NOTE:</b> There is a 120,000 limit for the number of inflight messages
* per queue. Messages are inflight after they have been received from the
* queue by a consuming component,but have not yet been deleted from the
* queue. If you reach the 120,000 limit,you will receive an OverLimit
* error message from Amazon SQS. To help avoid reaching the limit,you
* should delete the messages from the queue after they have been processed.
* You can also increase the number of queues you use to process the
* messages.
* </p>
* <p>
* <b>IMPORTANT:</b>If you attempt to set the VisibilityTimeout to an amount
* more than the maximum time left,Amazon SQS returns an error. It will not
* automatically recalculate and increase the timeout to the maximum time
* remaining.
* </p>
* <p>
* <b>IMPORTANT:</b>Unlike with a queue,when you change the visibility
* timeout for a specific message,that timeout value is applied immediately
* but is not saved in memory for that message. If you don't delete a
* message after it is received,the visibility timeout for the message the
* next time it is received reverts to the original timeout value,not the
* value you set with the ChangeMessageVisibility action.
* </p>
*
* @param changeMessageVisibilityRequest
* Container for the necessary parameters to execute the
* ChangeMessageVisibility service method on AmazonSQS.
*
*
* @throws ReceiptHandleIsInvalidException
* @throws MessageNotInflightException
*
* @throws AmazonClientException
* If any internal errors are encountered inside the client
* while attempting to make the request or handle the response.
* For example if a network connection is not available.
* @throws AmazonServiceException
* If an error response is returned by AmazonSQS indicating
* either a problem with the data in the request,or a server
* side issue.
*/
public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest)
throws AmazonServiceException,AmazonClientException {
if (isS3ReceiptHandle(changeMessageVisibilityRequest.getReceiptHandle())) {
changeMessageVisibilityRequest.setReceiptHandle(
getOrigReceiptHandle(changeMessageVisibilityRequest.getReceiptHandle()));
}
return amazonSqsToBeExtended.changeMessageVisibility(changeMessageVisibilityRequest);
}
项目:amazon-sqs-java-messaging-lib
文件:AmazonSQSMessagingClientWrapper.java
/**
* Calls <code>changeMessageVisibility</code> and wraps <code>AmazonClientException</code>. This is
* used to for negative acknowledge of a single message,so that messages can be received again without any delay.
*
* @param changeMessageVisibilityRequest
* Container for the necessary parameters to execute the
* changeMessageVisibility service method on AmazonSQS.
* @throws JMSException
*/
public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException {
try {
prepareRequest(changeMessageVisibilityRequest);
amazonSQSClient.changeMessageVisibility(changeMessageVisibilityRequest);
} catch (AmazonClientException e) {
throw handleException(e,"changeMessageVisibility");
}
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientBase.java
/**
* <p>
* Changes the visibility timeout of a specified message in a queue to a new
* value. The maximum allowed timeout value you can set the value to is 12
* hours. This means you can't extend the timeout of a message in an
* existing queue to more than a total visibility timeout of 12 hours. (For
* more information visibility timeout,not the
* value you set with the ChangeMessageVisibility action.
* </p>
*
* @param changeMessageVisibilityRequest
* Container for the necessary parameters to execute the
* ChangeMessageVisibility service method on AmazonSQS.
*
*
* @throws ReceiptHandleIsInvalidException
* @throws MessageNotInflightException
*
* @throws AmazonClientException
* If any internal errors are encountered inside the client
* while attempting to make the request or handle the response.
* For example if a network connection is not available.
* @throws AmazonServiceException
* If an error response is returned by AmazonSQS indicating
* either a problem with the data in the request,AmazonClientException {
return amazonSqsToBeExtended.changeMessageVisibility(changeMessageVisibilityRequest);
}
项目:aws-sdk-java-resources
文件:Message.java
/**
* Performs the <code>ChangeVisibility</code> action.
*
* <p>
* The following request parameters will be populated from the data of this
* <code>Message</code> resource,and any conflicting parameter value set in
* the request will be overridden:
* <ul>
* <li>
* <b><code>QueueUrl</code></b>
* - mapped from the <code>QueueUrl</code> identifier.
* </li>
* <li>
* <b><code>ReceiptHandle</code></b>
* - mapped from the <code>ReceiptHandle</code> identifier.
* </li>
* </ul>
*
* <p>
*
* @see ChangeMessageVisibilityRequest
*/
void changeVisibility(ChangeMessageVisibilityRequest request);
项目:aws-sdk-java-resources
文件:Message.java
/**
* Performs the <code>ChangeVisibility</code> action and use a ResultCapture
* to retrieve the low-level client response.
*
* <p>
* The following request parameters will be populated from the data of this
* <code>Message</code> resource,and any conflicting parameter value set in
* the request will be overridden:
* <ul>
* <li>
* <b><code>QueueUrl</code></b>
* - mapped from the <code>QueueUrl</code> identifier.
* </li>
* <li>
* <b><code>ReceiptHandle</code></b>
* - mapped from the <code>ReceiptHandle</code> identifier.
* </li>
* </ul>
*
* <p>
*
* @see ChangeMessageVisibilityRequest
*/
void changeVisibility(ChangeMessageVisibilityRequest request,ResultCapture<Void> extractor);