项目:unitstack
文件:MockSqsTest.java
@Test
public void testPurgeQueue_shouldRemoveAll() {
// create queue
CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue"));
// send messages
String messageBody = "{\"life-universe-everything\":42}";
sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody)
.withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl()));
String messageBody2 = "{\"dead-emptyness-nothing\":24}";
sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody2)
.withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl()));
// purge queues
PurgeQueueResult result = sqs.purgeQueue(new PurgeQueueRequest().withQueueUrl(createdQueue.getQueueUrl()));
assertNotNull("verify that purge queue returned ok",result);
// verify empty queue
ReceiveMessageResult messageResult = sqs.receiveMessage(new ReceiveMessageRequest()
.withMaxNumberOfMessages(9).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(10)
.withWaitTimeSeconds(0));
assertEquals("verify that queue is empty",messageResult.getMessages().size());
// cleanup
getQueues().remove("tea-earl-grey-queue");
}
项目:RekognitionS3Batch
文件:Scanner.java
private boolean processObjects(List<S3ObjectSummary> objects) {
Logger.Debug("Scanning next batch of %s ",objects.size());
objects
.parallelStream()
.filter(this::shouldEnqueue)
.forEach(object -> {
numSeen.incrementAndGet();
String path = object.getBucketName() + "/" + object.getKey();
Logger.Info("Posting: %s",path);
SendMessageRequest msg = new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody(path);
sqs.sendMessage(msg);
});
if (max > -1L && numSeen.incrementAndGet() > max) {
Logger.Info("Added max jobs,quitting");
return false;
}
return true;
}
项目:flume-ng-aws-sqs-sink
文件:BasicSQSMsgSender.java
@Override
public int send(Channel channel) throws EventDeliveryException {
int eventProcessedCounter = 0;
Event event = channel.take();
if (event == null || event.getBody().length == 0) {
// Don't bother with anything if the channel returns null event or an event with empty body
return eventProcessedCounter;
}
try {
this.amazonSQS.sendMessage(new SendMessageRequest(sqsUrl,new String(event.getBody(),"UTF-8").trim()));
// This event is processed successfully to increment the counter
eventProcessedCounter++;
}
catch (AmazonServiceException ase) {
throw new EventDeliveryException("Failure sending message request to Amazon SQS," +
"the request made it to SQS but was rejected for some reason.",ase);
}
catch (AmazonClientException ace) {
throw new EventDeliveryException("Failure sending message request to Amazon SQS.",ace);
}
catch (UnsupportedEncodingException e) {
throw new EventDeliveryException("Character set UTF-8 not supported.",e);
}
return eventProcessedCounter;
}
项目:flume-ng-aws-sqs-sink
文件:BasicSQSMsgSenderTest.java
@Test
public void testSend() throws Exception {
BasicSQSMsgSender msgSender =
new BasicSQSMsgSender("https://some-fake/url","us-east-1","someAwsAccessKey","someAwsSecretKey");
Channel mockChannel = mock(Channel.class);
Event mockEvent = mock(Event.class);
when(mockEvent.getBody()).thenReturn("This is a test event message".getBytes());
when(mockChannel.take()).thenReturn(mockEvent);
AmazonSQS mockSqs = mock(AmazonSQS.class);
when(mockSqs.sendMessage(any(SendMessageRequest.class))).thenReturn(new SendMessageResult());
msgSender.setAmazonSQS(mockSqs);
int eventCount = msgSender.send(mockChannel);
assertEquals(1,eventCount);
}
项目:flume-ng-aws-sqs-sink
文件:BasicSQSMsgSenderTest.java
@Test
public void testSendEventWithEmptyBody() throws Exception {
BasicSQSMsgSender msgSender =
new BasicSQSMsgSender("https://some-fake/url","someAwsSecretKey");
Channel mockChannel = mock(Channel.class);
Event mockEvent = mock(Event.class);
when(mockEvent.getBody()).thenReturn("".getBytes());
when(mockChannel.take()).thenReturn(mockEvent);
AmazonSQS mockSqs = mock(AmazonSQS.class);
when(mockSqs.sendMessage(any(SendMessageRequest.class))).thenReturn(new SendMessageResult());
msgSender.setAmazonSQS(mockSqs);
int eventCount = msgSender.send(mockChannel);
assertEquals(0,eventCount);
}
项目:flume-ng-aws-sqs-sink
文件:BasicSQSMsgSenderTest.java
@Test(expected = EventDeliveryException.class)
public void testSendFailureAmazonServiceException() throws Exception {
BasicSQSMsgSender msgSender =
new BasicSQSMsgSender("https://some-fake/url","someAwsSecretKey");
Channel mockChannel = mock(Channel.class);
Event mockEvent = mock(Event.class);
when(mockEvent.getBody()).thenReturn("This is a test event message".getBytes());
when(mockChannel.take()).thenReturn(mockEvent);
AmazonSQS mockSqs = mock(AmazonSQS.class);
when(mockSqs.sendMessage(any(SendMessageRequest.class)))
.thenThrow(new AmazonServiceException("Mock AmazonServiceException"));
msgSender.setAmazonSQS(mockSqs);
msgSender.send(mockChannel);
}
项目:flume-ng-aws-sqs-sink
文件:BasicSQSMsgSenderTest.java
@Test(expected = EventDeliveryException.class)
public void testSendFailureAmazonClientException() throws Exception {
BasicSQSMsgSender msgSender =
new BasicSQSMsgSender("https://some-fake/url","someAwsSecretKey");
Channel mockChannel = mock(Channel.class);
Event mockEvent = mock(Event.class);
when(mockEvent.getBody()).thenReturn("This is a test event message".getBytes());
when(mockChannel.take()).thenReturn(mockEvent);
AmazonSQS mockSqs = mock(AmazonSQS.class);
when(mockSqs.sendMessage(any(SendMessageRequest.class)))
.thenThrow(new AmazonClientException("Mock AmazonClientException"));
msgSender.setAmazonSQS(mockSqs);
msgSender.send(mockChannel);
}
项目:paradox-nakadi-consumer
文件:SQSErrorHandlerTest.java
@Test
public void testShouldGetFailedResponseAfterSendingTheEvent() {
final GetQueueUrlResult getQueueUrlResult = new GetQueueUrlResult();
getQueueUrlResult.setQueueUrl(randomAlphabetic(10));
final SendMessageResult sendMessageResult = new SendMessageResult();
final SdkHttpMetadata responseMetadata = mock(SdkHttpMetadata.class);
when(responseMetadata.getHttpStatusCode()).thenReturn(400);
sendMessageResult.setSdkHttpMetadata(responseMetadata);
when(amazonSQS.sendMessage(any(SendMessageRequest.class))).thenThrow(new RuntimeException("expected"));
assertThatThrownBy(() ->
sqsErrorHandler.onError(randomAlphabetic(10),new RuntimeException(),EventTypePartition.of(EventType.of(randomAlphabetic(10)),randomAlphabetic(1)),randomNumeric(10),randomAlphabetic(50)))
.isInstanceOf(RuntimeException.class).hasMessageContaining("expected");
}
项目:paradox-nakadi-consumer
文件:SQSErrorHandlerTest.java
@Test
public void testShouldSendEventToSQS() throws JsonProcessingException {
final SendMessageResult sendMessageResult = new SendMessageResult();
final SdkHttpMetadata responseMetadata = mock(SdkHttpMetadata.class);
when(responseMetadata.getHttpStatusCode()).thenReturn(200);
sendMessageResult.setSdkHttpMetadata(responseMetadata);
when(amazonSQS.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResult);
sqsErrorHandler.onError(randomAlphabetic(10),randomAlphabetic(50));
verify(objectMapper).writeValueAsString(anyString());
verify(amazonSQS).sendMessage(any(SendMessageRequest.class));
}
项目:Camel
文件:SqsProducer.java
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
SendMessageRequest request = new SendMessageRequest(getQueueUrl(),body);
request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(),exchange));
addDelay(request,exchange);
LOG.trace("Sending request [{}] from exchange [{}]...",request,exchange);
SendMessageResult result = getClient().sendMessage(request);
LOG.trace("Received result [{}]",result);
Message message = getMessageForResponse(exchange);
message.setHeader(SqsConstants.MESSAGE_ID,result.getMessageId());
message.setHeader(SqsConstants.MD5_OF_BODY,result.getMD5OfMessageBody());
}
项目:Camel
文件:SqsProducerTest.java
@Before
public void setup() throws Exception {
underTest = new SqsProducer(sqsEndpoint);
sendMessageResult = new SendMessageResult().withMD5OfMessageBody(MESSAGE_MD5).withMessageId(MESSAGE_ID);
sqsConfiguration = new SqsConfiguration();
HeaderFilterStrategy headerFilterStrategy = new SqsHeaderFilterStrategy();
sqsConfiguration.setDelaySeconds(Integer.valueOf(0));
when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient);
when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration);
when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResult);
when(exchange.getOut()).thenReturn(outMessage);
when(exchange.getIn()).thenReturn(inMessage);
when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly);
when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY);
when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL);
when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy);
}
项目:Camel
文件:SqsProducerTest.java
@Test
public void isAttributeMessageStringHeaderOnTheRequest() throws Exception {
Map<String,Object> headers = new HashMap<String,Object>();
headers.put(SAMPLE_MESSAGE_HEADER_NAME_1,SAMPLE_MESSAGE_HEADER_VALUE_1);
when(inMessage.getHeaders()).thenReturn(headers);
underTest.process(exchange);
ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
verify(amazonSQSClient).sendMessage(capture.capture());
assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1,capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
.getStringValue());
assertNull(capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
.getBinaryValue());
}
项目:Camel
文件:SqsProducerTest.java
@Test
public void isAttributeMessageByteBufferHeaderOnTheRequest() throws Exception {
Map<String,Object>();
headers.put(SAMPLE_MESSAGE_HEADER_NAME_2,SAMPLE_MESSAGE_HEADER_VALUE_2);
when(inMessage.getHeaders()).thenReturn(headers);
underTest.process(exchange);
ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
verify(amazonSQSClient).sendMessage(capture.capture());
assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2,capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
.getBinaryValue());
assertNull(capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
.getStringValue());
}
项目:Camel
文件:SqsProducerTest.java
@Test
public void isAllAttributeMessagesOnTheRequest() throws Exception {
Map<String,SAMPLE_MESSAGE_HEADER_VALUE_1);
headers.put(SAMPLE_MESSAGE_HEADER_NAME_2,SAMPLE_MESSAGE_HEADER_VALUE_2);
headers.put(SAMPLE_MESSAGE_HEADER_NAME_3,SAMPLE_MESSAGE_HEADER_VALUE_3);
headers.put(SAMPLE_MESSAGE_HEADER_NAME_4,SAMPLE_MESSAGE_HEADER_VALUE_4);
when(inMessage.getHeaders()).thenReturn(headers);
underTest.process(exchange);
ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
verify(amazonSQSClient).sendMessage(capture.capture());
assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1,capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
.getStringValue());
assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2,capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
.getBinaryValue());
assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_3,capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_3)
.getStringValue());
assertEquals(3,capture.getValue().getMessageAttributes().size());
}
项目:Camel
文件:AmazonSQSClientMock.java
@Override
public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest) throws AmazonServiceException,AmazonClientException {
Message message = new Message();
message.setBody(sendMessageRequest.getMessageBody());
message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5zC9+4QMqJZ0DJ3gVOmjI2Gh/oFnb0IeJqy5Zc8kH4JX7GVpfjcEDjaAPSeOkXQZRcaBqt"
+ "4lOtyfj0kcclVV/zS7aenhfhX5Ixfgz/rHhsJwtCPPvTAdgQFGYrqaHly+etJiawiNPVc=");
synchronized (messages) {
messages.add(message);
}
SendMessageResult result = new SendMessageResult();
result.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
result.setMD5OfMessageBody("6a1559560f67c5e7a7d5d838bf0272ee");
return result;
}
项目:dropwizard-sqs-bundle
文件:SqsSenderTest.java
@Test
public void shouldSendMessageWithCorrectAttributes() {
//GIVEN
String body = "Sample text message";
Map<String,MessageAttributeValue> attributes = new HashMap<>();
attributes.put("attribute1",new MessageAttributeValue()
.withDataType("String")
.withStringValue("value1"));
attributes.put("attribute2",new MessageAttributeValue()
.withDataType("Number")
.withStringValue("230.000000000000000001"));
//WHEN
sender.send(body,attributes);
//THEN
SendMessageRequest expected = new SendMessageRequest();
expected.withQueueUrl(queueUrl)
.withMessageBody(body)
.withMessageAttributes(attributes);
verify(sqs).sendMessage(expected);
}
项目:dropwizard-sqs-bundle
文件:SqsSenderTest.java
@Test
public void shouldSendObjectMessageWithCorrectAttributes() throws JsonProcessingException {
//GIVEN
DummyObject bodyObject = new DummyObject();
Map<String,new MessageAttributeValue()
.withDataType("Number")
.withStringValue("230.000000000000000001"));
//WHEN
sender.send(bodyObject,attributes);
//THEN
SendMessageRequest expected = new SendMessageRequest();
expected.withQueueUrl(queueUrl)
.withMessageBody(objectMapper.writeValueAsString(bodyObject))
.withMessageAttributes(attributes);
verify(sqs).sendMessage(expected);
}
项目:logback-sqs
文件:SqsAppender.java
@Override
public void write(byte[] bytes) throws IOException {
if (bytes == null || bytes.length == 0) {
return;
}
final String msg = new String(bytes);
if (bytes.length > maxMessageSizeInKB * 1024) {
addWarn(format("Logging event '%s' exceeds the maximum size of %dkB",msg,maxMessageSizeInKB));
return;
}
sqs.sendMessageAsync(new SendMessageRequest(queueUrl,msg),new AsyncHandler<SendMessageRequest,SendMessageResult>() {
public void onError(Exception exception) {
addWarn(format("Appender '%s' failed to send logging event '%s' to '%s'",getName(),queueUrl),exception);
}
public void onSuccess(SendMessageRequest request,SendMessageResult result) {
/** noop **/
}
});
}
项目:amazon-sqs-connector
文件:AmazonSQSOutputInteraction.java
@Override
public Properties send(Properties properties,Object message)
throws ConnectorException {
String access_key_id = properties.getProperty("AccessKeyId");
String secret_access_key = properties.getProperty("SecretAccessKey");
BasicAWSCredentials credentials = new BasicAWSCredentials(access_key_id,secret_access_key);
AmazonSQS sqs = new AmazonSQSClient(credentials);
//System.out.println(properties.getProperty("region"));
// Region selection
Region region = Region.getRegion(Regions.fromName(properties.getProperty("region")));
sqs.setRegion(region);
GetQueueUrlResult queueUrl = sqs.getQueueUrl(properties.getProperty("Queue"));
String messageStr = new String((byte[])message);
sqs.sendMessage(new SendMessageRequest(queueUrl.getQueueUrl(),messageStr));
return properties;
}
项目:aws-java-sdk-stubs
文件:AmazonSQSStubTest.java
@Test
public void sendAndReceiveMessage() {
final String queueName = "bizo";
final String messageBody = "hi everybody";
final CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
sqs.createQueue(createQueueRequest);
final GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest().withQueueName(queueName);
final GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(getQueueUrlRequest);
final String queueUrl = getQueueUrlResult.getQueueUrl();
final SendMessageRequest sendMessageRequest =
new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody);
sqs.sendMessage(sendMessageRequest);
final int maxNumberOfMessages = 10;
final ReceiveMessageRequest receiveMessageRequest =
new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(maxNumberOfMessages);
final ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest);
final List<Message> messages = receiveMessageResult.getMessages();
assertThat(messages.size(),equalTo(1));
assertThat(messages.get(0).getBody(),equalTo(messageBody));
}
项目:amazon-cloudengine
文件:WorkerThread.java
@SuppressWarnings("unchecked")
@Override
public void run() {
//Get queue url
GetQueueUrlResult urlResult = sqs.getQueueUrl(responseQName);
String QueueUrl = urlResult.getQueueUrl();
JSONObject result = new JSONObject();
try {
Thread.sleep(sleepLength);
result.put("task_id",task_id);
result.put("result","0");
sqs.sendMessage(new SendMessageRequest(QueueUrl,result.toString()));
//System.out.println(Thread.currentThread().getName()+" sleep done!");
} catch (Exception e) {
result.put("task_id","1");
sqs.sendMessage(new SendMessageRequest(QueueUrl,result.toString()));
}
}
项目:spring-cloud-aws
文件:QueueMessageChannel.java
private SendMessageRequest prepareSendMessageRequest(Message<?> message) {
SendMessageRequest sendMessageRequest = new SendMessageRequest(this.queueUrl,String.valueOf(message.getPayload()));
if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER)) {
sendMessageRequest.setMessageGroupId(message.getHeaders().get(SqsMessageHeaders.SQS_GROUP_ID_HEADER,String.class));
}
if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER)) {
sendMessageRequest.setMessageDeduplicationId(message.getHeaders().get(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER,String.class));
}
if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_DELAY_HEADER)) {
sendMessageRequest.setDelaySeconds(message.getHeaders().get(SqsMessageHeaders.SQS_DELAY_HEADER,Integer.class));
}
Map<String,MessageAttributeValue> messageAttributes = getMessageAttributes(message);
if (!messageAttributes.isEmpty()) {
sendMessageRequest.withMessageAttributes(messageAttributes);
}
return sendMessageRequest;
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_validTextMessage_returnsTrue() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
Message<String> stringMessage = MessageBuilder.withPayload("message content").build();
MessageChannel messageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
// Act
boolean sent = messageChannel.send(stringMessage);
// Assert
verify(amazonSqs,only()).sendMessage(any(SendMessageRequest.class));
assertEquals("message content",sendMessageRequestArgumentCaptor.getValue().getMessageBody());
assertTrue(sent);
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_serviceThrowsError_throwsMessagingException() throws Exception {
//Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
Message<String> stringMessage = MessageBuilder.withPayload("message content").build();
MessageChannel messageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
when(amazonSqs.sendMessage(new SendMessageRequest("http://testQueue","message content").withDelaySeconds(0)
.withMessageAttributes(isNotNull()))).
thenThrow(new AmazonServiceException("wanted error"));
//Assert
this.expectedException.expect(MessagingException.class);
this.expectedException.expectMessage("wanted error");
//Act
messageChannel.send(stringMessage);
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withMimeTypeAsStringHeader_shouldPassItAsMessageAttribute() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
String mimeTypeAsString = new MimeType("test","plain",Charset.forName("UTF-8")).toString();
Message<String> message = MessageBuilder.withPayload("Hello").setHeader(MessageHeaders.CONTENT_TYPE,mimeTypeAsString).build();
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
// Act
boolean sent = messageChannel.send(message);
// Assert
assertTrue(sent);
assertEquals(mimeTypeAsString,sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.CONTENT_TYPE).getStringValue());
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withMimeTypeHeader_shouldPassItAsMessageAttribute() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
MimeType mimeType = new MimeType("test",Charset.forName("UTF-8"));
Message<String> message = MessageBuilder.withPayload("Hello").setHeader(MessageHeaders.CONTENT_TYPE,mimeType).build();
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
// Act
boolean sent = messageChannel.send(message);
// Assert
assertTrue(sent);
assertEquals(mimeType.toString(),sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.CONTENT_TYPE).getStringValue());
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withStringMessageHeader_shouldBeSentAsQueueMessageAttribute() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
String headerValue = "Header value";
String headerName = "MyHeader";
Message<String> message = MessageBuilder.withPayload("Hello").setHeader(headerName,headerValue).build();
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
// Act
boolean sent = messageChannel.send(message);
// Assert
assertTrue(sent);
assertEquals(headerValue,sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getStringValue());
assertEquals(MessageAttributeDataTypes.STRING,sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getDataType());
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withBinaryMessageHeader_shouldBeSentAsBinaryMessageAttribute() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes());
String headerName = "MyHeader";
Message<String> message = MessageBuilder.withPayload("Hello").setHeader(headerName,sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getBinaryValue());
assertEquals(MessageAttributeDataTypes.BINARY,sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getDataType());
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withUuidAsId_shouldConvertUuidToString() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
Message<String> message = MessageBuilder.withPayload("Hello").build();
UUID uuid = (UUID) message.getHeaders().get(MessageHeaders.ID);
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
// Act
boolean sent = messageChannel.send(message);
// Assert
assertTrue(sent);
assertEquals(uuid.toString(),sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.ID).getStringValue());
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
@SuppressWarnings("unchecked")
public void sendMessage_withTimeout_sendsMessageAsyncAndReturnsTrueOnceFutureCompleted() throws Exception {
// Arrange
Future<SendMessageResult> future = mock(Future.class);
when(future.get(1000,TimeUnit.MILLISECONDS)).thenReturn(new SendMessageResult());
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class))).thenReturn(future);
QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
// Act
boolean result = queueMessageChannel.send(MessageBuilder.withPayload("Hello").build(),1000);
// Assert
assertTrue(result);
verify(amazonSqs,only()).sendMessageAsync(any(SendMessageRequest.class));
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
@SuppressWarnings("unchecked")
public void sendMessage_withSendMessageAsyncTakingMoreTimeThanSpecifiedTimeout_returnsFalse() throws Exception {
// Arrange
Future<SendMessageResult> future = mock(Future.class);
when(future.get(1000,TimeUnit.MILLISECONDS)).thenThrow(new TimeoutException());
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class))).thenReturn(future);
QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs,1000);
// Assert
assertFalse(result);
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
@SuppressWarnings("unchecked")
public void sendMessage_withExecutionExceptionWhileSendingAsyncMessage_throwMessageDeliveryException() throws Exception {
// Arrange
Future<SendMessageResult> future = mock(Future.class);
when(future.get(1000,TimeUnit.MILLISECONDS)).thenThrow(new ExecutionException(new Exception()));
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class))).thenReturn(future);
QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
// Assert
this.expectedException.expect(MessageDeliveryException.class);
// Act
queueMessageChannel.send(MessageBuilder.withPayload("Hello").build(),1000);
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withDelayHeader_shouldSetDelayOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
Message<String> message = MessageBuilder.withPayload("Hello").setHeader(SqsMessageHeaders.SQS_DELAY_HEADER,15).build();
// Act
queueMessageChannel.send(message);
// Assert
SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue();
assertEquals(new Integer(15),sendMessageRequest.getDelaySeconds());
assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_DELAY_HEADER));
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withoutDelayHeader_shouldNotSetDelayOnSendMessageRequestAndNotSetHeaderAsMessageAttribute() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
Message<String> message = MessageBuilder.withPayload("Hello").build();
// Act
queueMessageChannel.send(message);
// Assert
SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue();
assertNull(sendMessageRequest.getDelaySeconds());
assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_DELAY_HEADER));
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withGroupIdHeader_shouldSetGroupIdOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
Message<String> message = MessageBuilder.withPayload("Hello").setHeader(SqsMessageHeaders.SQS_GROUP_ID_HEADER,"id-5").build();
// Act
queueMessageChannel.send(message);
// Assert
SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue();
assertEquals("id-5",sendMessageRequest.getMessageGroupId());
assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER));
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
@Test
public void sendMessage_withDeduplicationIdHeader_shouldSetDeduplicationIdOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute() throws Exception {
// Arrange
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult());
QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs,"http://testQueue");
Message<String> message = MessageBuilder.withPayload("Hello").setHeader(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER,sendMessageRequest.getMessageDeduplicationId());
assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER));
}
项目:spring-cloud-aws
文件:QueueMessagingTemplateTest.java
@Test
public void instantiation_WithCustomJacksonConverterThatSupportsJava8Types_shouldConvertMessageToString() throws IOException {
// Arrange
AmazonSQSAsync amazonSqs = createAmazonSqs();
ObjectMapper objectMapper = Jackson2ObjectMapperBuilder.json().build();
MappingJackson2MessageConverter simpleMessageConverter = new MappingJackson2MessageConverter();
simpleMessageConverter.setSerializedPayloadClass(String.class);
simpleMessageConverter.setObjectMapper(objectMapper);
QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs,(ResourceIdResolver) null,simpleMessageConverter);
// Act
queueMessagingTemplate.convertAndSend("test",new TestPerson("Agim","Emruli",LocalDate.of(2017,1,1)));
// Assert
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
verify(amazonSqs).sendMessage(sendMessageRequestArgumentCaptor.capture());
TestPerson testPerson = objectMapper.readValue(sendMessageRequestArgumentCaptor.getValue().getMessageBody(),TestPerson.class);
assertEquals("Agim",testPerson.getFirstName());
assertEquals("Emruli",testPerson.getLastName());
assertEquals(LocalDate.of(2017,1),testPerson.getActiveSince());
}
项目:spring-cloud-aws
文件:QueueMessagingTemplateTest.java
@Test
public void instantiation_withDefaultMapping2JacksonConverter_shouldSupportJava8Types() throws IOException {
// Arrange
AmazonSQSAsync amazonSqs = createAmazonSqs();
ObjectMapper objectMapper = Jackson2ObjectMapperBuilder.json().build();
QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs);
// Act
queueMessagingTemplate.convertAndSend("test",testPerson.getActiveSince());
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerFifoTest.java
/**
* Test sendInternal input with SQSTextMessage
*/
@Test
public void testSendInternalSQSTextMessage() throws JMSException {
String messageBody = "MyText1";
SQSTextMessage msg = spy(new SQSTextMessage(messageBody));
msg.setStringProperty(SQSMessagingClientConstants.JMSX_GROUP_ID,GROUP_ID);
msg.setStringProperty(SQSMessagingClientConstants.JMS_SQS_DEDUPLICATION_ID,DEDUP_ID);
when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID).withSequenceNumber(SEQ_NUMBER));
producer.sendInternal(destination,msg);
verify(amazonSQSClient).sendMessage(argThat(new sendMessageRequestMatcher(QUEUE_URL,messageBody,SQSMessage.TEXT_MESSAGE_TYPE,GROUP_ID,DEDUP_ID)));
verify(msg).setJMSDestination(destination);
verify(msg).setJMSMessageID("ID:" + MESSAGE_ID);
verify(msg).setSQSMessageId(MESSAGE_ID);
verify(msg).setSequenceNumber(SEQ_NUMBER);
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerFifoTest.java
/**
* Test sendInternal input with SQSObjectMessage
*/
@Test
public void testSendInternalSQSObjectMessage() throws JMSException {
HashSet<String> set = new HashSet<String>();
set.add("data1");
SQSObjectMessage msg = spy(new SQSObjectMessage(set));
msg.setStringProperty(SQSMessagingClientConstants.JMSX_GROUP_ID,DEDUP_ID);
String msgBody = msg.getMessageBody();
when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID).withSequenceNumber(SEQ_NUMBER));
producer.sendInternal(destination,msgBody,SQSMessage.OBJECT_MESSAGE_TYPE,DEDUP_ID)));
verify(msg).setJMSDestination(destination);
verify(msg).setJMSMessageID("ID:" + MESSAGE_ID);
verify(msg).setSQSMessageId(MESSAGE_ID);
verify(msg).setSequenceNumber(SEQ_NUMBER);
}