Python clickhouse-driver:ValueError:参数应采用字典格式

问题描述

我有一些ETL,可使用clickhouse-driver将数据保存到clickhouse。

保存功能如下:

def insert_data(data: Iterable[Dict],table: str,client: Client = None):
    columns = get_table_cols(table)
    client = client or get_ch_client(0)
    query = f"insert into {table} ({','.join(columns)}) values"
    data = map(lambda row: {key: row[key] for key in columns},data)
    client.execute(query,data)

调用insert_data函数如下:

def save_data(data: DataFrame,client: Client):

    mapper = get_mapper(my_table_map)
    data = map(lambda x: {col_new: getattr(x,col_old)
                          for col_old,col_new in map_dataframe_to_ch.items()},data.collect())
    data = map(mapper,data)
    insert_data(data,'my_table_name',client)

get_mapper返回如下所示的映射函数

def map_row(row: Dict[str,Any]) -> Dict[str,Any]:
    nonlocal map_
    return {key: map_[key](val) for key,val in row.items()}

所以基本上到最后我有了一些嵌套的生成器,它们生成字典。并确认这一点,如果我将print(next(data))放在client.execute之前,我将得到我期望的结果。这是一个隐藏了敏感信息的示例:

{'account_currency': '***','instrument': '***','operation': 'open','event_time': datetime.datetime(2020,7,19,11,49),'country': 'CN','region': 'Asia and Pacific','registration_source': '***','account_type': '***','platform': '***','server_key': '***'}

表架构如下:

"account_currency": "String","instrument": "String","operation": "String","event_time": "DateTime","country": "String","region": "String","registration_source": "String","account_type": "String","platform": "String","server_key": "String"

但是无论出于什么原因,我都会收到此错误

  File "src/etl/usd_volume/prepare_users.py",line 356,in <module>
    main()
  File "src/etl/usd_volume/prepare_users.py",line 348,in main
    save_data(data,client)
  File "src/etl/usd_volume/prepare_users.py",line 302,in save_data
    insert_data(data,'report_Traded_volume_usd',client)
  File "/root/data/src/common/clickhouse_helper.py",line 14,in insert_data
    client.execute(query,data)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py",line 224,in execute
    columnar=columnar
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py",line 341,in process_ordinary_query
    query = self.substitute_params(query,params)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py",line 422,in substitute_params
    raise ValueError('Parameters are expected in dict form')

根据文档:

:param params:SELECT查询的替换参数和数据 插入查询。 INSERT的数据可以为listtuple 或:data:~types.GeneratorType认值为None(无参数或数据)。

很明显,我的数据符合这些要求。

但是在源代码中只有此检查:

def substitute_params(self,query,params):
    if not isinstance(params,dict):
        raise ValueError('Parameters are expected in dict form')

    escaped = escape_params(params)
    return query % escaped

我并没有真正找到他们将其检查为发电机的地方。 Clickhouse驱动程序版本为0.1.4

在此问题上的任何帮助,将不胜感激。

解决方法

好的,对源代码的进一步研究揭示了根本原因。

substitute_params类的process_ordinary_query方法内调用引发错误Client的函数。基本上,除INSERT之外的任何查询都将调用此方法。

通过execute方法的这一部分检查查询的符号是INSERT还是其他任何符号:

is_insert = isinstance(params,(list,tuple,types.GeneratorType))

if is_insert:
    rv = self.process_insert_query(
        query,params,external_tables=external_tables,query_id=query_id,types_check=types_check,columnar=columnar
    )
else:
    rv = self.process_ordinary_query(
        query,params=params,with_column_types=with_column_types,columnar=columnar
    )

关键是isinstance(params,types.GeneratorType))

types.GeneratorType的定义如下:

def _g():
    yield 1
GeneratorType = type(_g())

这导致了什么:

>>>GeneratorType
<class 'generator'>

很显然,对于我的数据map

>>>type(map(...))
<class 'map'>
>>>isinstance(map(...),GeneratorType)
False

因此,避免此问题的最简单解决方案是将data转换为具有生成器理解的生成器。这样就完全解决了问题。

>>>data = (i for i in data)
>>>isinstance(data,GeneratorType)
True

或者,如果您要专门执行INSERT查询,则可以直接调用process_insert_query,这将消除将数据转换为生成器的需求。

我认为这是由clickhouse驱动程序检查的类型有点模棱两可,但这就是我们所拥有的。