问题描述
我有两个应用程序 - 第一个应用程序使用 spring-cloud-stream/function 和 AWS Kinesis Binder 生成消息,第二个应用程序基于 spring 集成构建以使用消息。两者之间的通信不是问题 - 我可以从“流”发送消息并在“集成”中轻松处理。
当我想发送自定义标头时,就会出现问题。标头作为使用“新”格式(开头有 0xff 等)的嵌入标头到达消费者 - 请参阅 spring-cloud-stream 中的 AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable。
但是,KinesisMessageDrivenChannelAdapter (spring-integration-aws) 似乎不理解“新”嵌入的标头形式。它使用无法“解码”消息的 EmbeddedJsonHeadersMessageMapper(请参阅#toMessage)。由于嵌入的标头(0xff 等)中包含附加信息,它会抛出 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String,Number,Array,Object or token 'null','true' or 'false')
。
我需要通过线路发送标头(标头用于在另一侧路由),因此不能选择“关闭”生产者的标头。我看不到使用“旧”嵌入标头的方法。
我想在生产者端使用 spring-cloud-stream/function - 太棒了。我希望我可以重做消费者,但是...
我可以编写自己的可理解新格式的嵌入式标头映射器(使用 EmbeddedHeaderUtils),并将其连接到 KinesisMessageDrivenChannelAdapter。
鉴于 spring-cloud-stream 和 spring-integration 之间的密切关系,我一定是做错了什么。 Spring Integration 是否有理解新嵌入表单的 OutboundMessageMapper?
或者有没有办法强制 spring cloud stream 使用不同的嵌入策略?
我可以在生产者方面使用 Spring Integration。 (悲伤的脸)。
有什么想法吗?提前致谢。
解决方法
了解新格式
这不是一个“新”格式,它是Spring Cloud Stream创建的一种格式,最初是为了Kafka,在0.11中才增加了header支持。
我可以编写自己的嵌入式标头映射器来理解新格式(使用 EmbeddedHeaderUtils
),并将其连接到 KinesisMessageDrivenChannelAdapter
。
我建议您这样做,并考虑将 contributing it to the core Spring Integration Project 与 EmbeddedJsonHeadersMessageMapper
一起使用,以便它可以用于所有不支持本机标头的技术。