项目:Camel
文件:SqsEndpoint.java
/**
 * Creates an exchange whose IN message mirrors the given SQS message.
 * <p>
 * The body is the SQS message body. Headers start as a copy of the SQS attribute map, are
 * extended with the SQS-specific header constants, and finally receive every message
 * attribute (translated to a plain value) that the header filter strategy lets through.
 *
 * @param pattern the exchange pattern passed on to the parent factory method
 * @param msg the SQS message to copy from
 * @return the populated exchange
 */
private Exchange createExchange(ExchangePattern pattern, com.amazonaws.services.sqs.model.Message msg) {
    final Exchange exchange = super.createExchange(pattern);
    final Message in = exchange.getIn();

    in.setBody(msg.getBody());
    in.setHeaders(new HashMap<String, Object>(msg.getAttributes()));

    in.setHeader(SqsConstants.MESSAGE_ID, msg.getMessageId());
    in.setHeader(SqsConstants.MD5_OF_BODY, msg.getMD5OfBody());
    in.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle());
    in.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes());
    in.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes());

    // The header filter strategy has to be consulted for the individual attributes.
    final HeaderFilterStrategy filter = getHeaderFilterStrategy();

    // Expose each SQS message attribute as a plain Camel header so that the SQS
    // MessageAttributeValue class does not leak to the client.
    for (Entry<String, MessageAttributeValue> attribute : msg.getMessageAttributes().entrySet()) {
        final String name = attribute.getKey();
        final Object translated = translateValue(attribute.getValue());
        if (filter.applyFilterToExternalHeaders(name, translated, exchange)) {
            continue;
        }
        in.setHeader(name, translated);
    }
    return exchange;
}
项目:herd
文件:SqsDaoImpl.java
/**
 * Sends a text message to the named queue, converting each message header into a
 * "String"-typed SQS message attribute. When no headers are supplied the attribute map
 * handed to the operations layer is {@code null}.
 */
@Override
public SendMessageResult sendMessage(AwsParamsDto awsParamsDto, String queueName, String messageText, List<MessageHeader> messageHeaders)
{
    Map<String, MessageAttributeValue> attributesByKey = null;

    if (CollectionUtils.isNotEmpty(messageHeaders))
    {
        attributesByKey = new HashMap<>();
        for (MessageHeader header : messageHeaders)
        {
            String key = header.getKey();
            MessageAttributeValue attributeValue = new MessageAttributeValue();
            attributeValue.setDataType("String");
            attributeValue.setStringValue(header.getValue());
            attributesByKey.put(key, attributeValue);
        }
    }

    return sqsOperations.sendMessage(queueName, messageText, attributesByKey, awsClientFactory.getAmazonSQSClient(awsParamsDto));
}
项目:herd
文件:MockSqsOperationsImpl.java
/**
 * Mock send: fails for two reserved queue names and otherwise returns a canned result
 * carrying {@code AbstractDaoTest.MESSAGE_ID}.
 */
@Override
public SendMessageResult sendMessage(String queueName, Map<String, MessageAttributeValue> messageAttributes, AmazonSQS amazonSQS)
{
    if (queueName.equals(MockAwsOperationsHelper.AMAZON_THROTTLING_EXCEPTION))
    {
        // Reserved queue name that simulates AWS throttling.
        AmazonServiceException throttled = new AmazonServiceException("test throttling exception");
        throttled.setErrorCode("ThrottlingException");
        throw throttled;
    }
    else if (queueName.equals(MOCK_SQS_QUEUE_NOT_FOUND_NAME))
    {
        // Reserved queue name that simulates a missing queue.
        String notFound = String.format("AWS SQS queue with \"%s\" name not found.", queueName);
        throw new IllegalStateException(notFound);
    }

    // Normal case: nothing is stored because the unit tests never read published messages back.
    SendMessageResult result = new SendMessageResult();
    return result.withMessageId(AbstractDaoTest.MESSAGE_ID);
}
项目:dropwizard-sqs-bundle
文件:SqsSenderTest.java
/**
 * Sending a String body with attributes must produce a request carrying the queue URL,
 * the body and exactly those attributes.
 */
@Test
public void shouldSendMessageWithCorrectAttributes() {
    //GIVEN
    String body = "Sample text message";

    MessageAttributeValue stringAttribute = new MessageAttributeValue()
            .withDataType("String")
            .withStringValue("value1");
    MessageAttributeValue numberAttribute = new MessageAttributeValue()
            .withDataType("Number")
            .withStringValue("230.000000000000000001");

    Map<String, MessageAttributeValue> attributes = new HashMap<>();
    attributes.put("attribute1", stringAttribute);
    attributes.put("attribute2", numberAttribute);

    //WHEN
    sender.send(body, attributes);

    //THEN
    SendMessageRequest expected = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody(body)
            .withMessageAttributes(attributes);
    verify(sqs).sendMessage(expected);
}
项目:dropwizard-sqs-bundle
文件:SqsSenderTest.java
@Test
public void shouldSendObjectMessageWithCorrectAttributes() throws JsonProcessingException {
//GIVEN
DummyObject bodyObject = new DummyObject();
Map<String,new MessageAttributeValue()
.withDataType("Number")
.withStringValue("230.000000000000000001"));
//WHEN
sender.send(bodyObject,attributes);
//THEN
SendMessageRequest expected = new SendMessageRequest();
expected.withQueueUrl(queueUrl)
.withMessageBody(objectMapper.writeValueAsString(bodyObject))
.withMessageAttributes(attributes);
verify(sqs).sendMessage(expected);
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
 * Sums the size of all message attributes: for each entry the byte size of its name, its
 * data type (if set), its string value (if set) and its binary value (if set).
 *
 * @param msgAttributes the attributes to measure; must not be null
 * @return the accumulated size
 */
private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttributes) {
    int totalMsgAttributesSize = 0;
    for (Entry<String, MessageAttributeValue> entry : msgAttributes.entrySet()) {
        totalMsgAttributesSize += getStringSizeInBytes(entry.getKey());

        MessageAttributeValue entryVal = entry.getValue();
        String dataType = entryVal.getDataType();
        if (dataType != null) {
            totalMsgAttributesSize += getStringSizeInBytes(dataType);
        }

        String stringVal = entryVal.getStringValue();
        if (stringVal != null) {
            totalMsgAttributesSize += getStringSizeInBytes(stringVal);
        }

        ByteBuffer binaryVal = entryVal.getBinaryValue();
        if (binaryVal != null) {
            // remaining() counts the bytes between position and limit. array().length would
            // measure the whole backing array (wrong for slices and partially filled buffers)
            // and throws for read-only or direct buffers, which have no accessible array.
            totalMsgAttributesSize += binaryVal.remaining();
        }
    }
    return totalMsgAttributesSize;
}
项目:micro-genie
文件:SqsProducer.java
/**
 * Groups the given messages into batch entries keyed by destination queue.
 *
 * @param messages the messages to categorize
 * @return batch entries per queue name, belonging to one or more queues
 */
private Map<String, List<SendMessageBatchRequestEntry>> createBatchesForQueues(final List<Message> messages) {
    final Map<String, List<SendMessageBatchRequestEntry>> batchesByQueue =
            new HashMap<String, List<SendMessageBatchRequestEntry>>();

    for (final Message current : messages) {
        final Map<String, MessageAttributeValue> attrs = this.toMessageAttrs(current);

        final SendMessageBatchRequestEntry batchEntry = new SendMessageBatchRequestEntry();
        batchEntry.withId(current.getId());
        batchEntry.withMessageAttributes(attrs);
        batchEntry.withMessageBody(current.getBody());

        // Lazily create the per-queue bucket the first time a queue is seen.
        final String queue = current.getQueue();
        List<SendMessageBatchRequestEntry> bucket = batchesByQueue.get(queue);
        if (bucket == null) {
            bucket = new ArrayList<SendMessageBatchRequestEntry>();
            batchesByQueue.put(queue, bucket);
        }
        bucket.add(batchEntry);
    }
    return batchesByQueue;
}
项目:spring-cloud-aws
文件:QueueMessageChannel.java
/**
 * Builds the SQS send request for a message: the payload (via {@code String.valueOf})
 * becomes the body, the group id, deduplication id and delay headers are copied when
 * present, and the remaining headers are attached as message attributes when there are any.
 */
private SendMessageRequest prepareSendMessageRequest(Message<?> message) {
    SendMessageRequest request = new SendMessageRequest(this.queueUrl, String.valueOf(message.getPayload()));
    MessageHeaders headers = message.getHeaders();

    if (headers.containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER)) {
        String groupId = headers.get(SqsMessageHeaders.SQS_GROUP_ID_HEADER, String.class);
        request.setMessageGroupId(groupId);
    }

    if (headers.containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER)) {
        String deduplicationId = headers.get(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, String.class);
        request.setMessageDeduplicationId(deduplicationId);
    }

    if (headers.containsKey(SqsMessageHeaders.SQS_DELAY_HEADER)) {
        request.setDelaySeconds(headers.get(SqsMessageHeaders.SQS_DELAY_HEADER, Integer.class));
    }

    Map<String, MessageAttributeValue> attributes = getMessageAttributes(message);
    if (attributes.isEmpty()) {
        return request;
    }
    request.withMessageAttributes(attributes);
    return request;
}
项目:spring-cloud-aws
文件:QueueMessageUtils.java
/**
 * Converts the SQS message attributes into message header values.
 * <p>
 * The content-type attribute becomes a {@code MimeType}, the id attribute a {@code UUID},
 * String attributes are copied as-is, attributes whose data type starts with the Number
 * type are converted through {@code getNumberValue}, and Binary attributes are copied as
 * their binary value. Attributes of any other data type are ignored.
 */
private static Map<String, Object> getMessageAttributesAsMessageHeaders(com.amazonaws.services.sqs.model.Message message) {
    Map<String, Object> headers = new HashMap<>();

    for (Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
        String name = entry.getKey();

        if (MessageHeaders.CONTENT_TYPE.equals(name)) {
            headers.put(MessageHeaders.CONTENT_TYPE, MimeType.valueOf(entry.getValue().getStringValue()));
            continue;
        }
        if (MessageHeaders.ID.equals(name)) {
            headers.put(MessageHeaders.ID, UUID.fromString(entry.getValue().getStringValue()));
            continue;
        }

        MessageAttributeValue attribute = entry.getValue();
        String dataType = attribute.getDataType();

        if (MessageAttributeDataTypes.STRING.equals(dataType)) {
            headers.put(name, attribute.getStringValue());
        } else if (dataType.startsWith(MessageAttributeDataTypes.NUMBER)) {
            Object number = getNumberValue(attribute);
            if (number != null) {
                headers.put(name, number);
            }
        } else if (MessageAttributeDataTypes.BINARY.equals(dataType)) {
            headers.put(name, attribute.getBinaryValue());
        }
    }
    return headers;
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
/**
 * A content-type message attribute on the received SQS message must show up as a
 * {@code MimeType} content-type header.
 */
@Test
public void receiveMessage_withMimeTypeMessageAttribute_shouldCopyToHeaders() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8"));

    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    MessageAttributeValue contentType = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.STRING)
            .withStringValue(mimeType.toString());
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType));
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> receivedMessage = messageChannel.receive();

    // Assert
    assertEquals(mimeType, receivedMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE));
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
/**
 * A String-typed message attribute on the received SQS message must show up as a header
 * with the same name and value.
 */
@Test
public void receiveMessage_withStringMessageHeader_shouldBeReceivedAsQueueMessageAttribute() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    String headerValue = "Header value";
    String headerName = "MyHeader";

    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    MessageAttributeValue stringAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.STRING)
            .withStringValue(headerValue);
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(Collections.singletonMap(headerName, stringAttribute));
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> receivedMessage = messageChannel.receive();

    // Assert
    assertEquals(headerValue, receivedMessage.getHeaders().get(headerName));
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
/**
 * A Number attribute whose target class is a {@code Number} subtype that cannot be parsed
 * from a String (here {@code AtomicInteger}) must make {@code receive()} fail with an
 * {@code IllegalArgumentException}.
 */
@Test
public void receiveMessage_withIncompatibleNumericMessageHeader_shouldThrowAnException() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    this.expectedException.expect(IllegalArgumentException.class);
    this.expectedException.expectMessage("Cannot convert String [17] to target class [java.util.concurrent.atomic.AtomicInteger]");

    AtomicInteger atomicInteger = new AtomicInteger(17);
    MessageAttributeValue unsupportedNumber = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.NUMBER + ".java.util.concurrent.atomic.AtomicInteger")
            .withStringValue(String.valueOf(atomicInteger));
    HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    messageAttributes.put("atomicInteger", unsupportedNumber);

    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(messageAttributes);
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    messageChannel.receive();
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
/**
 * A Number attribute whose target class cannot be found must make {@code receive()} fail
 * with a {@code MessagingException} describing the value and data type.
 */
@Test
public void receiveMessage_withMissingNumericMessageHeaderTargetClass_shouldThrowAnException() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    this.expectedException.expect(MessagingException.class);
    this.expectedException.expectMessage("Message attribute with value '12' and data type 'Number.class.not.Found' could not be converted" +
            " into a Number because target class was not found.");

    MessageAttributeValue unknownClassNumber = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.NUMBER + ".class.not.Found")
            .withStringValue("12");
    HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    messageAttributes.put("classNotFound", unknownClassNumber);

    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(messageAttributes);
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    messageChannel.receive();
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
/**
 * A Binary-typed message attribute on the received SQS message must show up as a header
 * holding the same {@code ByteBuffer}.
 */
@Test
public void receiveMessage_withBinaryMessageHeader_shouldBeReceivedAsByteBufferMessageAttribute() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes());
    String headerName = "MyHeader";
    when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue").
            withWaitTimeSeconds(0).
            withMaxNumberOfMessages(1).
            withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES).
            withMessageAttributeNames("All"))).
            thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello").
                    withMessageAttributes(Collections.singletonMap(headerName, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.BINARY).withBinaryValue(headerValue)))));
    // NOTE(review): the tail of this test was truncated in this copy (the channel was built
    // from an undeclared receivedMessage). The queue URL, the receive() call and the
    // assertion are restored following the other receiveMessage_* tests; confirm upstream.
    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");
    // Act
    Message<?> receivedMessage = messageChannel.receive();
    // Assert
    assertEquals(headerValue, receivedMessage.getHeaders().get(headerName));
}
项目:spring-cloud-aws
文件:QueueMessageChannelTest.java
/**
 * An id message attribute sent as a String must be exposed as a {@code UUID} id header
 * equal to the original UUID.
 */
@Test
public void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    UUID uuid = UUID.randomUUID();

    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    MessageAttributeValue idAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.STRING)
            .withStringValue(uuid.toString());
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(Collections.singletonMap(MessageHeaders.ID, idAttribute));
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> receivedMessage = messageChannel.receive();

    // Assert
    Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID);
    assertTrue(UUID.class.isInstance(idMessageHeader));
    assertEquals(uuid, idMessageHeader);
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducer.java
/**
* Adds the reply-to queue name and url attributes during send as part of the send message
* request,if necessary
*/
private void addReplyToQueueReservedAttributes(Map<String,SQSMessage message) throws JMSException {
Destination replyTo = message.getJMSReplyTo();
if (replyTo instanceof SQSQueueDestination) {
SQSQueueDestination replyToQueue = (SQSQueueDestination)replyTo;
/**
* This will override the existing attributes if exists. Everything that
* has prefix JMS_ is reserved for JMS Provider,but if the user sets that
* attribute,it will be overwritten.
*/
addStringAttribute(messageAttributes,SQSMessage.JMS_SQS_REPLY_TO_QUEUE_NAME,replyToQueue.getQueueName());
addStringAttribute(messageAttributes,SQSMessage.JMS_SQS_REPLY_TO_QUEUE_URL,replyToQueue.getQueueUrl());
}
}
项目:Camel
文件:SqsEndpoint.java
/**
 * Unwraps an SQS message attribute value into a plain object: the string value when
 * present, otherwise the binary value when present, otherwise {@code null}.
 */
private Object translateValue(MessageAttributeValue mav) {
    if (mav.getStringValue() != null) {
        return mav.getStringValue();
    }
    if (mav.getBinaryValue() != null) {
        return mav.getBinaryValue();
    }
    return null;
}
项目:herd
文件:SqsOperationsImpl.java
/**
 * Sends a message to the queue with the given name, resolving the queue URL through the
 * supplied client first.
 *
 * @param queueName the name of the target queue
 * @param messageText the message body
 * @param messageAttributes the message attributes to attach
 * @param amazonSQS the client used to resolve the queue URL and send the message
 * @return the send result returned by the client
 * @throws IllegalStateException if the queue does not exist
 */
@Override
public SendMessageResult sendMessage(String queueName, String messageText, Map<String, MessageAttributeValue> messageAttributes, AmazonSQS amazonSQS)
{
    try
    {
        return amazonSQS.sendMessage(new SendMessageRequest().withQueueUrl(amazonSQS.getQueueUrl(queueName).getQueueUrl()).withMessageBody(messageText)
            .withMessageAttributes(messageAttributes));
    }
    catch (QueueDoesNotExistException e)
    {
        // Translate the SDK exception while preserving it as the cause.
        throw new IllegalStateException(String.format("AWS SQS queue with \"%s\" name not found.", queueName), e);
    }
}
项目:dropwizard-sqs-bundle
文件:SqsSender.java
public void send(Object object,MessageAttributeValue> attributes) {
final String json;
try {
json = objectMapper.writeValueAsString(object);
send(json,attributes);
} catch (JsonProcessingException e) {
LOGGER.error("Could not send message to SQS,cause is " + e.getMessage(),e);
}
}
项目:dropwizard-sqs-bundle
文件:SqsSender.java
public void send(String body,MessageAttributeValue> attributes) {
SendMessageRequest sendMessageRequest = new SendMessageRequest();
sendMessageRequest.withQueueUrl(queueUrl);
sendMessageRequest.withMessageBody(body);
for (Map.Entry<String,MessageAttributeValue> entry : attributes.entrySet()) {
sendMessageRequest.addMessageAttributesEntry(entry.getKey(),entry.getValue());
}
sqs.sendMessage(sendMessageRequest);
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
/**
 * Moves the body of a batch entry to S3: validates the attributes, flags the entry with
 * a "Number" attribute holding the original body size, stores the body under a random
 * key, and replaces the body with the JSON form of the S3 pointer.
 *
 * @param batchEntry the entry to rewrite in place
 * @return the same entry, now carrying the S3 pointer as its body
 */
private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEntry batchEntry) {
    checkMessageAttributes(batchEntry.getMessageAttributes());

    final String s3Key = UUID.randomUUID().toString();

    // Original payload and its size in bytes.
    final String payload = batchEntry.getMessageBody();
    final Long payloadSize = getStringSizeInBytes(payload);

    // Flag attribute carrying the original payload size.
    final MessageAttributeValue sizeAttribute = new MessageAttributeValue();
    sizeAttribute.setDataType("Number");
    sizeAttribute.setStringValue(payloadSize.toString());
    batchEntry.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, sizeAttribute);

    // Upload the payload.
    storeTextInS3(s3Key, payload, payloadSize);
    LOG.info("S3 object created,Bucket name: " + clientConfiguration.getS3BucketName() + ",Object key: " + s3Key
            + ".");

    // The message body becomes the JSON-encoded S3 pointer (bucket name and key).
    final MessageS3Pointer pointer = new MessageS3Pointer(clientConfiguration.getS3BucketName(), s3Key);
    batchEntry.setMessageBody(getJSONFromS3Pointer(pointer));

    return batchEntry;
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClient.java
private SendMessageRequest storeMessageInS3(SendMessageRequest sendMessageRequest) {
checkMessageAttributes(sendMessageRequest.getMessageAttributes());
String s3Key = UUID.randomUUID().toString();
// Read the content of the message from message body
String messageContentStr = sendMessageRequest.getMessageBody();
Long messageContentSize = getStringSizeInBytes(messageContentStr);
// Add a new message attribute as a flag
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setDataType("Number");
messageAttributeValue.setStringValue(messageContentSize.toString());
sendMessageRequest.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME,messageContentSize);
LOG.info("S3 object created,s3Key);
String s3PointerStr = getJSONFromS3Pointer(s3Pointer);
// Storing S3 pointer in the message body.
sendMessageRequest.setMessageBody(s3PointerStr);
return sendMessageRequest;
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientTest.java
/**
 * A message below the SQS size limit must reach the backend without any message
 * attributes added.
 */
@Test
public void testWhenSmallMessageIsSentThenNoAttributeIsAdded() {
    String messageBody = generateStringWithLength(LESS_THAN_SQS_SIZE_LIMIT);
    extendedSqsWithDefaultConfig.sendMessage(new SendMessageRequest(SQS_QUEUE_URL, messageBody));

    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(mockSqsBackend).sendMessage(requestCaptor.capture());

    SendMessageRequest forwarded = requestCaptor.getValue();
    Map<String, MessageAttributeValue> attributes = forwarded.getMessageAttributes();
    Assert.assertTrue(attributes.isEmpty());
}
项目:amazon-sqs-java-extended-client-lib
文件:AmazonSQSExtendedClientTest.java
@Test
public void testWhenLargeMessgaeIsSentThenAttributeWithPayloadSizeIsAdded() {
int messageLength = MORE_THAN_SQS_SIZE_LIMIT;
String messageBody = generateStringWithLength(messageLength);
SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL,MessageAttributeValue> attributes = sendMessageRequestCaptor.getValue().getMessageAttributes();
Assert.assertEquals("Number",attributes.get(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME).getDataType());
Assert.assertEquals(messageLength,(int)Integer.valueOf(attributes.get(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME).getStringValue()));
}
项目:micro-genie
文件:SqsProducer.java
/**
 * Converts the headers of a {@link Message} into SQS {@link MessageAttributeValue} map
 * entries, one string-valued attribute per header.
 *
 * @param message the message whose headers are converted
 * @return the message attributes, or {@code null} when the message is null or has no headers
 */
private Map<String, MessageAttributeValue> toMessageAttrs(Message message) {
    final boolean hasHeaders = message != null
            && message.getHeaders() != null
            && message.getHeaders().size() > 0;
    if (!hasHeaders) {
        return null;
    }

    final Map<String, MessageAttributeValue> converted = Maps.newHashMap();
    for (Entry<String, String> header : message.getHeaders().entrySet()) {
        final MessageAttributeValue attributeValue = new MessageAttributeValue().withStringValue(header.getValue());
        converted.put(header.getKey(), attributeValue);
    }
    return converted;
}
项目:aws-sdk-java-resources
文件:SQSResourcesIntegrationTest.java
/**
 * Tests sending of a message with message attributes. Asserts that the
 * message received has the attributes. Also changes the visibility of the
 * message and tries to retrieve messages again. Finally deletes the
 * message from the queue.
 */
@Test
@Ignore
public void testSendReceiveMessageAttributes() throws InterruptedException {
    SendMessageResult sendMessageResult = queue
            .sendMessage(new SendMessageRequest().withMessageBody(
                    TEST_MESSAGE_ATTRIBUTES).withMessageAttributes(
                    ImmutableMapParameter.of(
                            "testAttribute", new MessageAttributeValue().withDataType(
                                    "String").withStringValue(
                                    "testAttributeValue"))));
    List<Message> messages = waitForMessagesFromQueue(new ReceiveMessageRequest()
            .withMessageAttributeNames("testAttribute"));
    assertNotNull(messages);
    assertEquals(1, messages.size());
    Message message = messages.get(0);
    assertMessage(TEST_MESSAGE_ATTRIBUTES, sendMessageResult.getMessageId(), sendMessageResult.getMD5OfMessageBody(), message);
    Map<String, MessageAttributeValue> messageAttributes = message
            .getMessageAttributes();
    assertNotNull(messageAttributes);
    assertTrue(messageAttributes.containsKey("testAttribute"));
    // Expected value first, so a failure reports expected/actual the right way round.
    assertEquals("testAttributeValue", messageAttributes.get("testAttribute").getStringValue());
    message.changeVisibility(10);
    messages = waitForMessagesFromQueue(null);
    message.delete();
}
项目:spring-cloud-aws
文件:QueueMessageChannel.java
/**
 * Converts the message headers into SQS message attributes.
 * <p>
 * Headers rejected by {@code isSkipHeader} are ignored. Content-type and id headers get a
 * dedicated conversion; other headers are converted when their value is a String, a
 * Number or a ByteBuffer. Any header that cannot be converted is logged with a warning
 * and left out, so the returned map never contains a {@code null} attribute value.
 *
 * @param message the message whose headers are converted
 * @return the message attributes, possibly empty
 */
private Map<String, MessageAttributeValue> getMessageAttributes(Message<?> message) {
    HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    for (Map.Entry<String, Object> messageHeader : message.getHeaders().entrySet()) {
        String messageHeaderName = messageHeader.getKey();
        Object messageHeaderValue = messageHeader.getValue();
        if (isSkipHeader(messageHeaderName)) {
            continue;
        }

        MessageAttributeValue attribute = null;
        if (MessageHeaders.CONTENT_TYPE.equals(messageHeaderName) && messageHeaderValue != null) {
            // Returns null for a content type that is neither a MimeType nor a String.
            attribute = getContentTypeMessageAttribute(messageHeaderValue);
        } else if (MessageHeaders.ID.equals(messageHeaderName) && messageHeaderValue != null) {
            attribute = getStringMessageAttribute(messageHeaderValue.toString());
        } else if (messageHeaderValue instanceof String) {
            attribute = getStringMessageAttribute((String) messageHeaderValue);
        } else if (messageHeaderValue instanceof Number) {
            attribute = getNumberMessageAttribute(messageHeaderValue);
        } else if (messageHeaderValue instanceof ByteBuffer) {
            attribute = getBinaryMessageAttribute((ByteBuffer) messageHeaderValue);
        }

        if (attribute != null) {
            messageAttributes.put(messageHeaderName, attribute);
        } else {
            // Never store a null attribute value; report the header as unsupported instead.
            this.logger.warn(String.format("Message header with name '%s' and type '%s' cannot be sent as" +
                    " message attribute because it is not supported by SQS.", messageHeaderName, messageHeaderValue != null ? messageHeaderValue.getClass().getName() : ""));
        }
    }
    return messageAttributes;
}
项目:spring-cloud-aws
文件:QueueMessageChannel.java
/**
 * Builds a String-typed message attribute from a content-type header value.
 *
 * @param messageHeaderValue the header value, expected to be a {@code MimeType} or a String
 * @return the attribute holding the value's string form, or {@code null} for any other type
 */
private MessageAttributeValue getContentTypeMessageAttribute(Object messageHeaderValue) {
    boolean supported = messageHeaderValue instanceof MimeType || messageHeaderValue instanceof String;
    if (!supported) {
        return null;
    }
    // For a String, toString() yields the value itself, so both types share one path.
    String contentType = messageHeaderValue.toString();
    return new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(contentType);
}
项目:spring-cloud-aws
文件:QueueMessageUtils.java
/**
 * Converts a Number-typed message attribute into a {@code Number} instance.
 * <p>
 * When the data type carries a suffix after the Number type and a separator character,
 * the suffix is treated as the fully qualified name of the target {@code Number} class.
 * A data type that is exactly the Number type has no such suffix; its value is parsed as
 * a {@code java.math.BigDecimal} instead of failing on the missing suffix.
 *
 * @param value the attribute to convert; its data type must start with the Number type
 * @return the parsed number
 * @throws MessagingException if the target class named in the data type cannot be found
 */
private static Object getNumberValue(MessageAttributeValue value) {
    String dataType = value.getDataType();
    if (MessageAttributeDataTypes.NUMBER.equals(dataType)) {
        // No class suffix to strip: substring(NUMBER.length() + 1) would be out of bounds here.
        return NumberUtils.parseNumber(value.getStringValue(), java.math.BigDecimal.class);
    }

    String numberType = dataType.substring(MessageAttributeDataTypes.NUMBER.length() + 1);
    try {
        Class<? extends Number> numberTypeClass = Class.forName(numberType).asSubclass(Number.class);
        return NumberUtils.parseNumber(value.getStringValue(), numberTypeClass);
    } catch (ClassNotFoundException e) {
        throw new MessagingException(String.format("Message attribute with value '%s' and data type '%s' could not be converted " +
                "into a Number because target class was not found.", value.getStringValue(), value.getDataType()), e);
    }
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducer.java
/**
 * Converts the JMS properties of a message into SQS message attributes.
 * <p>
 * Not verified on the client side, but SQS attribute names must consist of
 * letters or digits on the basic multilingual plane plus '_', '-' and '.'.
 * No component of an attribute name may be empty, so a name may neither start
 * nor end with '.', and it may not contain "..".
 *
 * @param message the message whose properties are converted
 * @return one attribute per property, excluding the delivery count and deduplication id
 * @throws JMSException if a property cannot be read from the message
 */
Map<String, MessageAttributeValue> propertyToMessageAttribute(SQSMessage message)
        throws JMSException {
    Map<String, MessageAttributeValue> converted = new HashMap<String, MessageAttributeValue>();

    for (Enumeration<String> names = message.getPropertyNames(); names.hasMoreElements();) {
        String name = names.nextElement();

        // The delivery count is generated from the SQS attribute "ApproximateReceiveCount".
        boolean isDeliveryCount = name.equals(SQSMessagingClientConstants.JMSX_DELIVERY_COUNT);
        // The deduplication id is passed as the DeduplicationId argument of the SendMessage
        // call and mapped back to this JMS property on receive.
        boolean isDeduplicationId = !isDeliveryCount
                && name.equals(SQSMessagingClientConstants.JMS_SQS_DEDUPLICATION_ID);
        if (isDeliveryCount || isDeduplicationId) {
            continue;
        }

        // JMSXGroupID and JMSXGroupSeq are always stored as message properties so they are
        // not lost between send and receive, even though SQS Classic does not respect those
        // values when returning messages and SQS FIFO has a different understanding of
        // message groups.
        JMSMessagePropertyValue property = message.getJMSMessagePropertyValue(name);

        MessageAttributeValue attribute = new MessageAttributeValue();
        attribute.setDataType(property.getType());
        attribute.setStringValue(property.getStringMessageAttributeValue());
        converted.put(name, attribute);
    }
    return converted;
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducer.java
/**
* Convenience method for adding a single string attribute.
*/
private void addStringAttribute(Map<String,String key,String value) {
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setDataType(SQSMessagingClientConstants.STRING);
messageAttributeValue.setStringValue(value);
messageAttributes.put(key,messageAttributeValue);
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerTest.java
/**
 * Test propertyToMessageAttribute with empty messages of different types: a message
 * without properties must yield no message attributes.
 */
@Test
public void testPropertyToMessageAttributeWithEmpty() throws JMSException {
    /*
     * Test Empty text message default attribute
     */
    SQSMessage sqsText = new SQSTextMessage();
    Map<String, MessageAttributeValue> messageAttributeText = producer.propertyToMessageAttribute(sqsText);
    assertEquals(0, messageAttributeText.size());
    /*
     * Test Empty object message default attribute
     */
    SQSMessage sqsObject = new SQSObjectMessage();
    Map<String, MessageAttributeValue> messageAttributeObject = producer.propertyToMessageAttribute(sqsObject);
    assertEquals(0, messageAttributeObject.size());
    /*
     * Test Empty byte message default attribute
     */
    SQSMessage sqsByte = new SQSBytesMessage();
    Map<String, MessageAttributeValue> messageAttributeByte = producer.propertyToMessageAttribute(sqsByte);
    // Assert on the bytes-message result, not on the object-message result again.
    assertEquals(0, messageAttributeByte.size());
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerTest.java
/**
 * Test sendInternal input with SQSTextMessage: the message is sent, its text is changed,
 * and it is sent again; both sends must reach the client and update the message ids.
 */
@Test
public void testSendInternalSQSTextMessage() throws JMSException {
    final String firstBody = "MyText1";
    final String secondBody = "MyText2";

    final SQSTextMessage textMessage = spy(new SQSTextMessage(firstBody));
    final Map<String, MessageAttributeValue> expectedAttributes = createMessageAttribute("text");

    // The first send is answered with MESSAGE_ID_1, the second with MESSAGE_ID_2.
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    producer.sendInternal(destination, textMessage);

    // Re-send the same message object with a new text.
    textMessage.setText(secondBody);
    producer.sendInternal(destination, textMessage);

    final List<String> expectedBodies = Arrays.asList(firstBody, secondBody);
    verify(amazonSQSClient, times(2))
            .sendMessage(argThat(new sendMessageRequestMatcher(QUEUE_URL, expectedBodies, expectedAttributes)));

    verify(textMessage, times(2)).setJMSDestination(destination);
    verify(textMessage).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(textMessage).setJMSMessageID("ID:" + MESSAGE_ID_2);
    verify(textMessage).setSQSMessageId(MESSAGE_ID_1);
    verify(textMessage).setSQSMessageId(MESSAGE_ID_2);
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerTest.java
/**
 * Test sendInternal input with an SQSTextMessage that was built from a received
 * (non JMS) SQS message.
 */
@Test
public void testSendInternalSQSTextMessageFromReceivedMessage() throws JMSException {
    /*
     * Set up non JMS sqs message
     */
    Map<String, MessageAttributeValue> mapMessageAttributes = new HashMap<String, MessageAttributeValue>();
    MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
    messageAttributeValue.setStringValue(SQSMessage.TEXT_MESSAGE_TYPE);
    messageAttributeValue.setDataType(SQSMessagingClientConstants.STRING);
    mapMessageAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, messageAttributeValue);
    Map<String, String> mapAttributes = new HashMap<String, String>();
    mapAttributes.put(SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "1");
    com.amazonaws.services.sqs.model.Message message =
            new com.amazonaws.services.sqs.model.Message()
                    .withMessageAttributes(mapMessageAttributes)
                    .withAttributes(mapAttributes)
                    .withBody("MessageBody");
    SQSTextMessage msg = spy(new SQSTextMessage(acknowledger, QUEUE_URL, message));
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));
    producer.sendInternal(destination, msg);
    List<String> messagesBody = Arrays.asList("MessageBody");
    // Pass the expected bodies to the matcher; the list was declared for this purpose but
    // was missing from the constructor call.
    verify(amazonSQSClient).sendMessage(argThat(new sendMessageRequestMatcher(QUEUE_URL, messagesBody, mapMessageAttributes)));
    verify(msg).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(msg).setSQSMessageId(MESSAGE_ID_1);
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerTest.java
/**
 * Test sendInternal input with SQSObjectMessage: the message is sent, its payload is
 * replaced, and it is sent again; each send must carry the body current at that time.
 */
@Test
public void testSendInternalSQSObjectMessage() throws JMSException {
    final HashSet<String> firstPayload = new HashSet<String>();
    firstPayload.add("data1");
    final HashSet<String> secondPayload = new HashSet<String>();
    secondPayload.add("data2");

    final SQSObjectMessage objectMessage = spy(new SQSObjectMessage(firstPayload));
    final String firstBody = objectMessage.getMessageBody();
    final Map<String, MessageAttributeValue> messageAttributes = createMessageAttribute("object");

    // The first send is answered with MESSAGE_ID_1, the second with MESSAGE_ID_2.
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    producer.sendInternal(destination, objectMessage);

    // Re-send the same message object with a new payload.
    objectMessage.clearBody();
    objectMessage.setObject(secondPayload);
    final String secondBody = objectMessage.getMessageBody();
    producer.sendInternal(destination, objectMessage);

    final ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(amazonSQSClient, times(2)).sendMessage(requestCaptor.capture());
    assertEquals(firstBody, requestCaptor.getAllValues().get(0).getMessageBody());
    assertEquals(secondBody, requestCaptor.getAllValues().get(1).getMessageBody());

    verify(objectMessage, times(2)).setJMSDestination(destination);
    verify(objectMessage).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(objectMessage).setJMSMessageID("ID:" + MESSAGE_ID_2);
    verify(objectMessage).setSQSMessageId(MESSAGE_ID_1);
    verify(objectMessage).setSQSMessageId(MESSAGE_ID_2);
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerTest.java
/**
* Test sendInternal input with SQSByteMessage
*/
@Test
public void testSendInternalSQSByteMessage() throws JMSException {
SQSBytesMessage msg = spy(new SQSBytesMessage());
msg.writeByte((byte)0);
msg.reset();
Map<String,MessageAttributeValue> messageAttributes = createMessageAttribute("byte");
String messageId = "MessageId";
when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));
producer.sendInternal(destination,msg);
/*
* Re send the message
*/
msg.clearBody();
msg.writeInt(42);
producer.sendInternal(destination,msg);
List<String> messagesBody = Arrays.asList("AA==","AAAAKg==");
verify(amazonSQSClient,messageAttributes)));
verify(msg,times(2)).setJMSDestination(destination);
verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_2);
verify(msg).setSQSMessageId(MESSAGE_ID_1);
verify(msg).setSQSMessageId(MESSAGE_ID_2);
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerTest.java
/**
 * Test sendInternal input with a message built from a received (non JMS) SQS
 * message whose body is a Base64-encoded byte array.
 * <p>
 * The final assertions expect one send request for QUEUE_URL carrying the
 * Base64 body, and expect the id MESSAGE_ID_1 to be written back to the message.
 * <p>
 * NOTE(review): several statements in this method appear truncated (see the
 * inline notes). The method name says "ByteMessage" while the object under test
 * is an SQSObjectMessage - confirm which is intended.
 */
@Test
public void testSendInternalSQSByteMessageFromReceivedMessage() throws JMSException,IOException {
/*
* Set up non JMS sqs message
*/
// NOTE(review): truncated statement - the declaration of mapMessageAttributes
// (used below) is missing its variable name and initializer.
Map<String,MessageAttributeValue>();
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setStringValue(SQSMessage.BYTE_MESSAGE_TYPE);
messageAttributeValue.setDataType(SQSMessagingClientConstants.STRING);
// NOTE(review): looks like two statements fused together - messageAttributeValue
// is never stored, and mapAttributes (used below) is never declared. Presumably
// the "1" belonged to a put on mapAttributes; confirm against the original file.
mapMessageAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE,"1");
byte[] byteArray = new byte[] { 1,'a',65 };
// The SQS message body is the Base64 text form of the byte array.
String messageBody = Base64.encodeAsString(byteArray);
com.amazonaws.services.sqs.model.Message message =
new com.amazonaws.services.sqs.model.Message()
.withMessageAttributes(mapMessageAttributes)
.withAttributes(mapAttributes)
.withBody(messageBody);
// NOTE(review): the sibling text-message test passes QUEUE_URL as a middle
// constructor argument - confirm whether it was dropped here.
SQSObjectMessage msg = spy(new SQSObjectMessage(acknowledger,message));
// NOTE(review): truncated statement - the declaration of messageAttributes (used
// in the verify below), any stubbing of amazonSQSClient.sendMessage, and the
// start of a call ending in "msg)" appear to be missing.
Map<String,msg);
// Expect exactly one send to QUEUE_URL with the Base64 body and messageAttributes.
verify(amazonSQSClient).sendMessage(argThat(new sendMessageRequestMatcher(QUEUE_URL,Arrays.asList(messageBody),messageAttributes)));
// Expect the destination and the returned id to be written back to the message.
verify(msg).setJMSDestination(destination);
verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
verify(msg).setSQSMessageId(MESSAGE_ID_1);
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerTest.java
private Map<String,MessageAttributeValue> createMessageAttribute(String type) {
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setDataType("String");
messageAttributeValue.setStringValue(type);
Map<String,MessageAttributeValue>();
messageAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE,messageAttributeValue);
return messageAttributes;
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerFifoTest.java
/**
 * Test propertyToMessageAttribute with empty messages of different type
 * <p>
 * NOTE(review): the statements that convert each message and assert on the
 * result appear truncated (see the inline notes), so the expected sizes cannot
 * be read from this method as it stands.
 */
@Test
public void testPropertyToMessageAttributeWithEmpty() throws JMSException {
/*
* Test Empty text message default attribute
*/
SQSMessage sqsText = new SQSTextMessage();
// NOTE(review): truncated statement - a Map declaration runs straight into the
// tail of a call ending in "messageAttributeObject.size())". sqsText is never
// used and messageAttributeObject is never declared in the visible code;
// presumably the text-message assertion and an object-message case were lost.
Map<String,messageAttributeObject.size());
/*
* Test Empty byte message default attribute
*/
SQSMessage sqsByte = new SQSBytesMessage();
// NOTE(review): truncated statement, same pattern as above - the declaration of
// messageAttributeByte and the start of the surrounding call are missing.
Map<String,messageAttributeByte.size());
}
项目:amazon-sqs-java-messaging-lib
文件:SQSMessageProducerFifoTest.java
/**
 * Test sendInternal input with SQSTextMessage
 * <p>
 * FIFO variant: the received SQS message carries group id, deduplication id and
 * sequence number attributes. The stubbed client returns MESSAGE_ID and
 * SEQ_NUMBER_2, and the final assertions expect both to be written back to the
 * message.
 * <p>
 * NOTE(review): several statements in this method appear truncated (see the
 * inline notes).
 */
@Test
public void testSendInternalSQSTextMessageFromReceivedMessage() throws JMSException {
/*
* Set up non JMS sqs message
*/
// NOTE(review): truncated statement - the declarations of mapMessageAttributes
// and mapAttributes (both used below) are missing. Presumably the "1" belonged
// to a put on mapAttributes; confirm against the original file.
Map<String,"1");
// FIFO-specific attributes on the received message.
mapAttributes.put(SQSMessagingClientConstants.MESSAGE_GROUP_ID,GROUP_ID);
mapAttributes.put(SQSMessagingClientConstants.MESSAGE_DEDUPLICATION_ID,DEDUP_ID);
mapAttributes.put(SQSMessagingClientConstants.SEQUENCE_NUMBER,SEQ_NUMBER);
com.amazonaws.services.sqs.model.Message message =
new com.amazonaws.services.sqs.model.Message()
.withMessageAttributes(mapMessageAttributes)
.withAttributes(mapAttributes)
.withBody("MessageBody");
// NOTE(review): the non-FIFO test of the same name passes QUEUE_URL as a middle
// constructor argument - confirm whether it was dropped here.
SQSTextMessage msg = spy(new SQSTextMessage(acknowledger,message));
// The stubbed send returns a new message id and a new sequence number.
when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID).withSequenceNumber(SEQ_NUMBER_2));
// NOTE(review): truncated statement with unbalanced parentheses - this looks like
// the sendInternal call fused with the tail of a verify on the send request
// (body, message type, group id, deduplication id). The missing middle is not
// recoverable from here.
producer.sendInternal(destination,"MessageBody",SQSMessage.TEXT_MESSAGE_TYPE,GROUP_ID,DEDUP_ID)));
// Expect the destination, the returned id and the returned sequence number to be
// written back to the message.
verify(msg).setJMSDestination(destination);
verify(msg).setJMSMessageID("ID:" + MESSAGE_ID);
verify(msg).setSQSMessageId(MESSAGE_ID);
verify(msg).setSequenceNumber(SEQ_NUMBER_2);
}