问题描述
Jackson无法反序列化。请找到堆栈跟踪,SQS配置和侦听器
org.springframework.cloud.aws.messaging.listener.QueueMessageHandler.processHandlerMethodException(QueueMessageHandler.java:248) t stack_trace JsonParseException: Unrecognized token 's3': was expecting (JSON String,Number,Array,Object or token 'null','true'
或'false') 在[来源:(String)“ s3://abc-invoices45-invoices/www-abc-at/2020/09/07/504000101-3436547667.pdf”;行:1,列:3] 在com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) 在com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722) 在com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2867) 在com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1913) 在com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:772) 在com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4340) 在com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4189) 在com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3205) 在org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:230)处 ... MessageConversionException:无法读取JSON:无法识别的令牌's3':预期(JSON字符串,数字,数组,对象或令牌) 'null','true'或'false') 在[来源:(String)“ s3://abc-invoices/www-abc-at/2020/09/07/504000101-3436547667.pdf”; 行:1,列:3];嵌套异常为 com.fasterxml.jackson.core.JsonParseException:无法识别的令牌 's3':预期(JSON字符串,数字,数组,对象或令牌 'null','true'或'false') 在[来源:(String)“ s3://abc-invoices/www-k24-at/2020/09/07/504000101-3436547667.pdf”; 行:1,列:3]
这是我的SQS配置
import com.amazonaws.auth.AWsstaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncclientBuilder;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.abc.properties.LocalPropertyService;
import java.util.Collections;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
@Configuration
@Import(SqsConfiguration.class)
public class InvoiceSqsConfig {
@Bean(name = "amazonSQSAsyncclient")
public AmazonSQSAsync getAmazonSQSAsyncclient(LocalPropertyService propertyService) {
System.setProperty("invoice.queue.name",propertyService.getString("sqs.invoice.queueName"));
return AmazonSQSAsyncclientBuilder.standard()
.withRegion(
propertyService.getString("sqs.aws.region"))
.withCredentials(
new AWsstaticCredentialsProvider(new BasicAWSCredentials(
propertyService.getString("sqs.invoice.accessKey"),propertyService.getString("sqs.invoice.secretKey"))))
.build();
}
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setStrictContentTypeMatch(false);
messageConverter.getobjectMapper().registerModule(new JavaTimeModule());
factory.setMessageConverters(Collections.singletonList(messageConverter));
return factory;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSqs);
factory.setMaxnumberOfMessages(10);
factory.setWaitTimeOut(20);
return factory;
}
}
这是消息侦听器
import static org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy.ON_SUCCESS;
import com.abc.order.invoice.exception.InvalidInvoiceMessageException;
import com.abc.order.invoice.exception.OrderNotFoundException;
import com.abc.order.order.OrderService;
import com.abc.order.order.model.Order;
import lombok.NonNull;
import lombok.requiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@requiredArgsConstructor
public class InvoiceUpdateListener {
@NonNull
private final OrderService orderService;
@SqsListener(value = "${invoice.queue.name}",deletionPolicy = ON_SUCCESS)
@SneakyThrows
public void receiveInvoice(@NotificationMessage EnvelopedMessage envelopedMessage) {
log.debug("Received message from the invoice queue : {} ",envelopedMessage.getMessage());
String message = envelopedMessage.getMessage();
if (StringUtils.isBlank(message)) {
throw new InvalidInvoiceMessageException("Received invalid message from the invoice queue");
}
String orderNumber = extractOrderNumber(message);
Order order = orderService.getorderByOrderNumber(orderNumber);
if (order == null) {
throw new OrderNotFoundException(
"Could not find the order with order number : " + orderNumber);
}
order.setInvoiceUrl(message);
log.debug("Saving invoice url for the orderNumber : {} ",orderNumber);
orderService.save(order);
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@JsonInclude(Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnkNown = true)
public class EnvelopedMessage {
@JsonProperty("Type")
private String type;
@JsonProperty("MessageId")
private String messageId;
@JsonProperty("TopicArn")
private String topicArn;
@JsonProperty("Subject")
private String subject;
@JsonProperty("Message")
private String message;
@JsonProperty("Timestamp")
private zoneddatetime createdAt;
@JsonProperty("Signatureversion")
private String signatureversion;
@JsonProperty("Signature")
private String signature;
@JsonProperty("SigningCertURL")
private String certUrl;
@JsonProperty("UnsubscribeURL")
private String unsubscribeUrl;
}
这是我从制作人那里收到的消息
{
"Type" : "Notification","MessageId" : "d77fa67e-6aa6-5b87-a707-f1ae5ba5922f","TopicArn" : "arn:aws:sns:eu-central-1:726569450381:invoices-from-core","Subject" : "File uploaded: invoices/www-abc-at/2020/09/07/504000101-3436547667.pdf","Message" : "s3://k24-invoices/www-abc-at/2020/09/07/504000101-3436547667.pdf","Timestamp" : "2020-09-07T12:59:47.192Z","Signatureversion" : "1","Signature" : "dummysignature","SigningCertURL" : "dummy url","UnsubscribeURL" : "dummy url"
}
解决方法
您的错误表明问题出在邮件的Message
键周围。
您共享的消息实际上看起来像comes from SNS,这是一种常见的模式,其中生产者将消息发布到SNS主题中,SNS主题将其消息发布到SQS队列中,而消费者将消息发布到SQS队列中端被触发/轮询消息队列。
在您的情况下,发生的事情是您的消费者希望Message
是一个类似于"{\"some_key\": \"some_value\"}"
的JSON字符串,因此在读取封装消息中的Message
键的值后,尝试将其解析为实际的字典/对象。
您应该指示您的代码将此值视为实际字符串,并避免进行转换,或者将消息包含在JSON格式中。
,通过对消息侦听器进行以下更改来完成这项工作
public void receiveInvoice(@NotificationMessage String message) {
//
}
并删除此行messageConverter.setSerializedPayloadClass(String.class);从InvoiceSqsConfig.java的Bean中获取
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setStrictContentTypeMatch(false);
messageConverter.getObjectMapper().registerModule(new JavaTimeModule());
factory.setMessageConverters(Collections.singletonList(messageConverter));
return factory;
}