嵌套记录的融合 Kafka 生产者消息格式

问题描述

我在 kafka 主题中注册了一个 AVRO 架构,并且正在尝试向其发送数据。该架构具有嵌套记录,我不确定如何使用 confluent_kafka python 正确地向其发送数据。

示例架构: *忽略架构中的任何拼写错误(真实的非常大,只是一个示例)

 {
 "namespace": "company__name","name": "our_data","type": "record","fields": [
           {
            "name": "datatype1","type": ["null",{
                 "type": "record","name": "datatype1_1","fields": [ 
                     {"name": "site","type": "string"},{"name": "units","type": "string"}
                  ]
             }]
             "default": null
            }
            {
            "name": "datatype2","name": "datatype2_1","type": "string"}
                  ]
             }]
             "default": null
            }
           ]
          }

我正在尝试使用 confluent_kafka python 版本将数据发送到此模式。当我之前完成此操作时,记录没有嵌套,我将使用典型的字典 key: value 对并将其序列化。如何发送嵌套数据以使用架构。

到目前为止我尝试过的...

message = {'datatype1': 
            {'site': 'sitename','units': 'm'
            }
           }

此版本不会导致任何 kafka 错误,但所有列都显示为空

还有……

message = {'datatype1': 
            {'datatype1_1':
              {'site': 'sitename','units': 'm'
              }
            }
           }

此版本产生了架构的 kafka 错误。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)