如何将自定义标头从 spring-cloud-streamaws kinesis binder发送到“传统”spring 集成消费者

问题描述

我有两个应用程序 - 第一个应用程序使用 spring-cloud-stream/function 和 AWS Kinesis Binder 生成消息,第二个应用程序基于 spring 集成构建以使用消息。两者之间的通信不是问题 - 我可以从“流”发送消息并在“集成”中轻松处理。

当我想发送自定义标头时,就会出现问题。标头作为使用“新”格式(开头有 0xff 等)的嵌入标头到达消费者 - 请参阅 spring-cloud-stream 中的 AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable。

但是,KinesisMessageDrivenChannelAdapter (spring-integration-aws) 似乎不理解“新”嵌入的标头形式。它使用无法“解码”消息的 EmbeddedJsonHeadersMessageMapper(请参阅#toMessage)。由于嵌入的标头(0xff 等)中包含附加信息,它会抛出 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String,Number,Array,Object or token 'null','true' or 'false')

我需要通过线路发送标头(标头用于在另一侧路由),因此不能选择“关闭”生产者的标头。我看不到使用“旧”嵌入标头的方法

我想在生产者端使用 spring-cloud-stream/function - 太棒了。我希望我可以重做消费者,但是...

我可以编写自己的可理解新格式的嵌入式标头映射器(使用 EmbeddedHeaderUtils),并将其连接到 KinesisMessageDrivenChannelAdapter。

鉴于 spring-cloud-stream 和 spring-integration 之间的密切关系,我一定是做错了什么。 Spring Integration 是否有理解新嵌入表单的 OutboundMessageMapper?

或者有没有办法强制 spring cloud stream 使用不同的嵌入策略?

我可以在生产者方面使用 Spring Integration。 (悲伤的脸)。

有什么想法吗?提前致谢。

解决方法

了解新格式

这不是一个“新”格式,它是Spring Cloud Stream创建的一种格式,最初是为了Kafka,在0.11中才增加了header支持。

我可以编写自己的嵌入式标头映射器来理解新格式(使用 EmbeddedHeaderUtils),并将其连接到 KinesisMessageDrivenChannelAdapter

我建议您这样做,并考虑将 contributing it to the core Spring Integration ProjectEmbeddedJsonHeadersMessageMapper 一起使用,以便它可以用于所有不支持本机标头的技术。