问题描述
我在 kafka 主题中注册了一个 AVRO 架构,并且正在尝试向其发送数据。该架构具有嵌套记录,我不确定如何使用 confluent_kafka python 正确地向其发送数据。
示例架构: *忽略架构中的任何拼写错误(真实的非常大,只是一个示例)
{
"namespace": "company__name","name": "our_data","type": "record","fields": [
{
"name": "datatype1","type": ["null",{
"type": "record","name": "datatype1_1","fields": [
{"name": "site","type": "string"},{"name": "units","type": "string"}
]
}]
"default": null
}
{
"name": "datatype2","name": "datatype2_1","type": "string"}
]
}]
"default": null
}
]
}
我正在尝试使用 confluent_kafka python 版本将数据发送到此模式。当我之前完成此操作时,记录没有嵌套,我将使用典型的字典 key: value
对并将其序列化。如何发送嵌套数据以使用架构。
到目前为止我尝试过的...
message = {'datatype1':
{'site': 'sitename','units': 'm'
}
}
还有……
message = {'datatype1':
{'datatype1_1':
{'site': 'sitename','units': 'm'
}
}
}
此版本产生了架构的 kafka 错误。
解决方法
如果使用命名空间,则不必担心命名冲突,并且可以正确构建可选记录: 例如,两者
{
"meta": {
"instanceID": "something"
}
}
和
{}
是以下的有效实例:
{
"doc": "Survey","name": "Survey","type": "record","fields": [
{
"name": "meta","type": [
"null",{
"name": "meta","fields": [
{
"name": "instanceID","type": [
"null","string"
],"namespace": "Survey.meta"
}
],"namespace": "Survey"
}
],"namespace": "Survey"
}
]
}