项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSender.java
/**
 * Renders all failed batch entries as one comma-separated string.
 * <p>
 * Each failure is rendered as {@code [<error>,{messageBody:"<body>"}]}, where {@code <error>} is the
 * error entry's {@code toString()} and {@code <body>} is the message body of the request entry returned by
 * {@code findRequestEventEntryById} for the error's id. When that lookup returns {@code null} the body is
 * rendered as the text {@code null}.
 *
 * @param batchRequestEntries The request entries that were sent in the batch
 * @param errors              The error entries reported for the batch; must not be {@code null}
 *
 * @return The combined error description; empty when {@code errors} is empty
 */
private String buildErrorMessage(List<SendMessageBatchRequestEntry> batchRequestEntries,List<BatchResultErrorEntry> errors) {
    StringBuilder description = new StringBuilder();
    // Empty before the first item, a comma before every following one.
    String separator = "";
    for (BatchResultErrorEntry error : errors) {
        SendMessageBatchRequestEntry originalEntry =
            findRequestEventEntryById(batchRequestEntries, error.getId());
        String messageBody = null;
        if (originalEntry != null) {
            messageBody = originalEntry.getMessageBody();
        }
        description.append(separator)
            .append("[")
            .append(error.toString())
            .append(",{messageBody:\"")
            .append(messageBody)
            .append("\"}]");
        separator = ",";
    }
    return description.toString();
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSenderTest.java
/**
 * Creates a mocked {@code SendMessageBatchResult} with a configurable split of successes and failures.
 * <p>
 * Entry ids are the 1-based positions in the batch: ids {@code 1..expectedSuccessCount} are reported as
 * successful, ids {@code expectedSuccessCount+1..batchSize} are reported as failed with code {@code "401"},
 * sender fault {@code true} and the message {@code "Invalid binary character"}.
 *
 * @param batchSize            Total number of entries in the simulated batch
 * @param expectedSuccessCount Number of leading entries to report as successful
 *
 * @return The mock whose {@code getSuccessful()} and {@code getFailed()} return the generated lists
 */
private SendMessageBatchResult mockBatchResult(int batchSize,int expectedSuccessCount) {
    List<SendMessageBatchResultEntry> succeeded = new ArrayList<SendMessageBatchResultEntry>();
    for (int index = 0; index < expectedSuccessCount; index++) {
        String entryId = String.valueOf(index + 1);
        succeeded.add(new SendMessageBatchResultEntry().withId(entryId));
    }

    List<BatchResultErrorEntry> failed = new ArrayList<BatchResultErrorEntry>();
    for (int index = expectedSuccessCount; index < batchSize; index++) {
        String entryId = String.valueOf(index + 1);
        BatchResultErrorEntry errorEntry = new BatchResultErrorEntry()
            .withId(entryId)
            .withCode("401")
            .withSenderFault(true)
            .withMessage("Invalid binary character");
        failed.add(errorEntry);
    }

    SendMessageBatchResult result = Mockito.mock(SendMessageBatchResult.class);
    when(result.getSuccessful()).thenReturn(succeeded);
    when(result.getFailed()).thenReturn(failed);
    return result;
}
项目:awslocal
文件:DirectorySQS.java
/**
 * Deletes every message named in the batch request, one entry at a time.
 * <p>
 * An entry whose delete raises an {@link IOException} is reported in the failed list (marked as sender
 * fault, carrying the exception message); every other entry is reported as successful. A failing entry
 * does not stop processing of the remaining entries.
 *
 * @param deleteMessageBatchRequest the batch request holding the queue URL and the entries to delete
 * @return a result listing the ids of the deleted entries and the error entries for the failed ones
 * @throws AmazonClientException declared by the interface; presumably raised by the queue lookup - confirm
 */
@Override
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonClientException {
    final DirectorySQSQueue targetQueue = getQueueFromUrl(deleteMessageBatchRequest.getQueueUrl(), false);

    // per-entry outcome, reported back to the caller
    final List<DeleteMessageBatchResultEntry> deleted = new ArrayList<>();
    final List<BatchResultErrorEntry> failures = new ArrayList<>();

    for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.getEntries()) {
        try {
            targetQueue.delete(entry.getReceiptHandle());
            DeleteMessageBatchResultEntry success = new DeleteMessageBatchResultEntry().withId(entry.getId());
            deleted.add(success);
        } catch (IOException e) {
            failures.add(new BatchResultErrorEntry()
                    .withSenderFault(true)
                    .withId(entry.getId())
                    .withMessage(e.getMessage()));
        }
    }

    return new DeleteMessageBatchResult()
            .withFailed(failures)
            .withSuccessful(deleted);
}
项目:awslocal
文件:DirectorySQS.java
/**
 * Changes the visibility timeout of every message named in the batch request, one entry at a time.
 * <p>
 * Any exception raised while processing an entry turns that entry into a failed result (marked as sender
 * fault, carrying the exception message); every other entry is reported as successful. A failing entry
 * does not stop processing of the remaining entries.
 *
 * @param changeMessageVisibilityBatchRequest the batch request holding the queue URL and the entries
 * @return a result listing the ids of the updated entries and the error entries for the failed ones
 * @throws AmazonClientException declared by the interface; presumably raised by the queue lookup - confirm
 */
@Override
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonClientException {
    final DirectorySQSQueue targetQueue =
            getQueueFromUrl(changeMessageVisibilityBatchRequest.getQueueUrl(), false);

    // per-entry outcome, reported back to the caller
    final List<ChangeMessageVisibilityBatchResultEntry> updated = new ArrayList<>();
    final List<BatchResultErrorEntry> failures = new ArrayList<>();

    for (ChangeMessageVisibilityBatchRequestEntry entry : changeMessageVisibilityBatchRequest.getEntries()) {
        try {
            targetQueue.changeVisibility(entry.getReceiptHandle(), entry.getVisibilityTimeout());
            ChangeMessageVisibilityBatchResultEntry success =
                    new ChangeMessageVisibilityBatchResultEntry().withId(entry.getId());
            updated.add(success);
        } catch (Exception e) {
            // deliberately broad: any failure on one entry is reported, not propagated
            failures.add(new BatchResultErrorEntry()
                    .withSenderFault(true)
                    .withId(entry.getId())
                    .withMessage(e.getMessage()));
        }
    }

    return new ChangeMessageVisibilityBatchResult()
            .withFailed(failures)
            .withSuccessful(updated);
}
项目:flume-ng-aws-sqs-sink
文件:BatchSQSMsgSender.java
/**
 * Inspects the outcome of an SQS send-message batch call and reacts to delivery failures.
 * <ul>
 * <li>No failed entries: nothing happens.</li>
 * <li>Every attempted entry failed: an {@link EventDeliveryException} carrying the combined error
 * description is thrown, so that the flume transaction fails and flume retries the whole batch.</li>
 * <li>Only some entries failed (partial batch failure): the combined error description is logged at
 * error level and the failed messages are skipped.</li>
 * </ul>
 * <p>
 * TODO: Add retry logic instead of letting flume drop the failed messages in case of partial batch failure
 *
 * @param batchRequest The SQS SendMessageBatchRequest
 * @param batchResult The SQS SendMessageBatchResult
 *
 * @throws EventDeliveryException In case all the messages in the batch failed to be delivered to SQS
 */
protected void handleResult(SendMessageBatchRequest batchRequest,SendMessageBatchResult batchResult)
    throws EventDeliveryException {
    List<SendMessageBatchRequestEntry> requestedEntries = batchRequest.getEntries();
    List<BatchResultErrorEntry> failedEntries = batchResult.getFailed();

    int failedCount = failedEntries == null ? 0 : failedEntries.size();
    if (failedCount <= 0) {
        // Everything was delivered; nothing to report.
        return;
    }

    String errorMessage = buildErrorMessage(requestedEntries, failedEntries);
    int attemptedCount = requestedEntries == null ? 0 : requestedEntries.size();

    if (attemptedCount != failedCount) {
        // Partial batch failure.
        // TODO: Add retry logic instead of letting flume drop the failed messages
        // For now just log the error message and let flume drop the failed messages.
        LOG.error(errorMessage);
        return;
    }

    // The batch was non-empty and every message in it failed: fail the whole batch. Throwing here
    // eventually causes the channel's transaction to roll back so that flume retries it.
    throw new EventDeliveryException(errorMessage);
}
项目:micro-genie
文件:SqsProducer.java
/**
 * Writes one error-level log line per failed batch entry, including its id, code, message and
 * sender-fault flag. A {@code null} list is treated as "no failures" and logs nothing.
 *
 * @param failed the failed entries reported for an SQS batch call; may be {@code null}
 */
private void logFailures(final List<BatchResultErrorEntry> failed) {
    if (failed == null) {
        return;
    }
    for (final BatchResultErrorEntry failure : failed) {
        LOGGER.error(
            "Failed to submit sqs batch message entry - Id: {} - Code: {} - Message: {},isSenders fault: {}",
            failure.getId(),
            failure.getCode(),
            failure.getMessage(),
            failure.getSenderFault());
    }
}
项目:izettle-toolbox
文件:QueueServiceSender.java
/**
 * Sends the given entries to the queue in batches of at most {@code MAX_BATCH_SIZE} entries.
 * <p>
 * For every batch the entries that SQS reports as failed are logged at error level together with the
 * body of the original message. Logging is best effort: any exception raised while preparing the log
 * output is itself logged and does not interrupt sending of the remaining batches.
 *
 * @param messages the batch request entries to send
 */
private void sendMessageBatch(Collection<SendMessageBatchRequestEntry> messages) {
    for (Collection<SendMessageBatchRequestEntry> batch : partition(messages, MAX_BATCH_SIZE)) {
        final SendMessageBatchResult sendMessageBatchResult =
            amazonSQS.sendMessageBatch(new SendMessageBatchRequest(queueUrl, new ArrayList<>(batch)));
        final List<BatchResultErrorEntry> failed = sendMessageBatchResult.getFailed();
        if (!failed.isEmpty()) {
            try {
                final Set<String> failedMessageIds =
                    failed.stream().map(BatchResultErrorEntry::getId).collect(Collectors.toSet());
                // Match on the entry's id. Testing the entry object itself against a Set<String>
                // never matches, which left this map empty and silently suppressed all error logs.
                final Map<String, SendMessageBatchRequestEntry> failedMessageIdToMessage =
                    batch.stream()
                        .filter(entry -> failedMessageIds.contains(entry.getId()))
                        .collect(Collectors.toMap(
                            SendMessageBatchRequestEntry::getId, Function.identity()
                        ));
                failed.forEach(failMessage -> {
                    final SendMessageBatchRequestEntry failedEntry =
                        failedMessageIdToMessage.get(failMessage.getId());
                    if (failedEntry != null) {
                        final String messageBody = failedEntry.getMessageBody();
                        LOG.error(
                            "Failed to send message,due to {},message content : {} ", failMessage, messageBody
                        );
                    }
                });
            } catch (Exception e) {
                // Best effort only: a problem while reporting failures must not abort the send loop.
                LOG.error("Failed to log failed to send messages", e);
            }
        }
    }
}
项目:awslocal
文件:DirectorySQS.java
/**
 * Sends every message in the batch request to the queue, one entry at a time.
 * <p>
 * An entry whose send raises an {@link IOException} is reported in the failed list (not marked as sender
 * fault, carrying the exception message); every other entry is reported as successful together with the
 * id and body MD5 of the message that was sent. A failing entry does not stop processing of the
 * remaining entries.
 *
 * @param sendMessageBatchRequest the batch request holding the queue URL and the entries to send
 * @return a result listing the successfully sent entries and the error entries for the failed ones
 * @throws AmazonClientException declared by the interface; presumably raised by the queue lookup - confirm
 */
@Override
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException {
    final DirectorySQSQueue targetQueue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(), false);

    // per-entry outcome, reported back to the caller
    final List<SendMessageBatchResultEntry> sent = new ArrayList<>();
    final List<BatchResultErrorEntry> failures = new ArrayList<>();

    // attempt to send each entry
    for (SendMessageBatchRequestEntry entry : sendMessageBatchRequest.getEntries()) {
        try {
            // an entry without a delay falls back to 0, the amazon spec default
            final int invisibilityDelay = Objects.firstNonNull(entry.getDelaySeconds(), 0);
            final Message sentMessage = targetQueue.send(entry.getMessageBody(), invisibilityDelay);
            final SendMessageBatchResultEntry success = new SendMessageBatchResultEntry()
                    .withId(entry.getId())
                    .withMessageId(sentMessage.getMessageId())
                    .withMD5OfMessageBody(sentMessage.getMD5OfBody());
            sent.add(success);
        } catch (IOException e) {
            failures.add(new BatchResultErrorEntry()
                    .withSenderFault(false)
                    .withId(entry.getId())
                    .withMessage(e.getMessage()));
        }
    }

    return new SendMessageBatchResult()
            .withFailed(failures)
            .withSuccessful(sent);
}