消费者生产者设计

问题描述

我正在寻找有关在使用Pulsar / kafka的解决方案中如何解决以下问题的建议。

场景是:生产者正在发送消息(使用JSON),而消费者正在接收消息并将数据插入数据库表(在消息中指定)。

突然,生产者更改了消息中发送的数据的结构(可以说是因为数据库中的表结构具有新列)。

暂时,队列中包含具有旧数据结构的消息,现在开始接收具有新数据结构的消息。

我对消费者应该如何处理这种情况感到怀疑。 现在使用无效的具有旧结构的消息该怎么办,因为由于表结构已更改,因此无法将它们插入数据库表中。重试,然后永久失败(死字母Q?)。

此外,您通常选择将元数据与消息一起发送,还是通常以单独的主题或其他形式进行处理。

谢谢您的建议

解决方法

所描述的问题主要与外部系统有关,该外部系统需要在生产者端进行某种门控/预验证,以便了解如何使用数据以防止这种情况。不幸的是,这引入了紧密的耦合,因此,如果没有紧密的耦合,则必须显式编写使用者代码才能进行可靠的消息转换和异常处理,可能在每种消息中都包含某种版本号或显式架构,例如Confluent Schema Registry提供的(也许还有Pulsar的Schema Registry功能)