avro模式问题:TypeError:无法散列的类型:'dict'

问题描述

我需要为以下数据编写一个Avro模式。曝光是一个由3个数字组成的数组。

{
"Response": {
    "status": "","responseDetail": {
        "request_id": "Z618978.R","exposure": [
            [
                372,20000000.0,31567227140.238808
            ]
            [
                373,480000000.0,96567227140.238808
            ]
            [
                374,23300000.0,251567627149.238808
            ]
        ],"product": "ABC",}
}
}

所以我想出了类似以下的模式:

{
"name": "Response","type":{
    "name": "algoResponseType","type": "record","fields":
    [
            {"name": "status","type": ["null","string"]},{
            "name": "responseDetail","type": {
                    "name": "responseDetailType","fields":
                    [
                            {"name": "request_id","type": "string"},{
                            "name": "exposure","type": {
                                    "type": "array","items":
                                    {
                                    "name": "single_exposure","type": {
                                            "type": "array","items": "string"
                                    }
                                    }
                            }
                            },{"name": "product","string"]}
                    ]
            }
            }
    ]
   }
}

当我尝试注册架构时。我收到以下错误。 TypeError:无法散列的类型:'dict',这意味着我使用列表作为字典键。

Traceback (most recent call last):
  File "sa_publisher_main4test.py",line 28,in <module>
    schema_registry_client)
  File "/usr/local/lib64/python3.6/site-packages/confluent_kafka/schema_registry/avro.py",line 175,in __init__
    parsed_schema = parse_schema(schema_dict)
  File "fastavro/_schema.pyx",line 71,in fastavro._schema.parse_schema
  File "fastavro/_schema.pyx",line 204,in fastavro._schema._parse_schema
TypeError: unhashable type: 'dict'

任何人都可以帮助指出导致错误的原因吗?

解决方法

您收到的错误是因为Schema Registry不接受您的架构。您最重要的元素必须是带有“响应”字段的记录。

我更改了数组项的类型,此模式应该可以工作,因为在您的消息中您使用的是float而不是string。

{
    "type": "record","name": "yourMessage","fields": [
        {
            "name": "Response","type": {
                "name": "AlgoResponseType","type": "record","fields": [
                    {
                        "name": "status","type": [
                            "null","string"
                        ]
                    },{
                        "name": "responseDetail","type": {
                            "name": "ResponseDetailType","fields": [
                                {
                                    "name": "request_id","type": "string"
                                },{
                                    "name": "exposure","type": {
                                        "type": "array","items": {
                                            "type": "array","items": "float"
                                        }
                                    }
                                },{
                                    "name": "product","type": [
                                        "null","string"
                                    ]
                                }
                            ]
                        }
                    }
                ]
            }
        }
    ]
}

您的消息不正确,因为数组元素之间必须有逗号。

{
    "Response": {
        "status": "","responseDetail": {
            "request_id": "Z618978.R","exposure": [
                [
                    372,20000000.0,31567227140.238808
                ],[
                    373,480000000.0,96567227140.238808
                ],[
                    374,23300000.0,251567627149.238808
                ]
            ],"product": "ABC",}
    }
}

在使用fastavro时,我建议运行此代码来检查您的消息是否是模式示例。

from fastavro.validation import validate
import json

with open('schema.avsc','r') as schema_file:
    schema = json.loads(schema_file.read())

message = {
    "Response": {
        "status": "",}
    }
}

try:
    validate(message,schema)
    print('Message is matching schema')
except Exception as ex:
    print(ex)
,

有几个问题。

首先,在架构的最顶层,您具有以下内容:

{
  "name": "Response","type": {...}
}

但这是不对的。顶层应该是一种记录类型,其字段名为Response。所以它应该像这样:

{
  "name": "Response","fields": [
    {
      "name": "Response","type": {...}
    }
  ]
}

第二个问题是,对于数组数组,您当前具有以下条件:

{
   "name":"exposure","type":{
      "type":"array","items":{
        "name":"single_exposure","type":{
          "type":"array","items":"string"
        }
     }
   }
}

但是它应该看起来像这样:

{
   "name":"exposure","items":{
        "type":"array","items":"string"
     }
   }
}

修复这些错误后,将可以解析该模式,但是您的数据包含一个浮点数数组,并且您的模式表示它应该是一个字符串数组。因此,要么需要将架构更改为浮点型,要么数据必须是字符串。

作为参考,这是一个示例脚本,可在解决这些问题后起作用:

import fastavro

s = {
   "name":"Response","type":"record","fields":[
      {
         "name":"Response","type": {
            "name":"algoResponseType","fields":[
               {
                  "name":"status","type":[
                     "null","string"
                  ]
               },{
                  "name":"responseDetail","type":{
                     "name":"responseDetailType","fields":[
                        {
                           "name":"request_id","type":"string"
                        },{
                           "name":"exposure","type":{
                              "type":"array","items":{
                                "type":"array","items":"string"
                             }
                           }
                        },{
                           "name":"product","type":[
                              "null","string"
                           ]
                        }
                     ]
                  }
               }
            ]
         }
      }
   ]
}

data = {
   "Response":{
      "status":"","responseDetail":{
         "request_id":"Z618978.R","exposure":[
            [
               "372","20000000.0","31567227140.238808"
            ],[
               "373","480000000.0","96567227140.238808"
            ],[
               "374","23300000.0","251567627149.238808"
            ]
         ],"product":"ABC"
      }
   }
}

parsed_schema = fastavro.parse_schema(s)
fastavro.validate(data,parsed_schema)