问题描述
我有一些ETL,可使用clickhouse-driver将数据保存到clickhouse。
保存功能如下:
def insert_data(data: Iterable[Dict],table: str,client: Client = None):
columns = get_table_cols(table)
client = client or get_ch_client(0)
query = f"insert into {table} ({','.join(columns)}) values"
data = map(lambda row: {key: row[key] for key in columns},data)
client.execute(query,data)
def save_data(data: DataFrame,client: Client):
mapper = get_mapper(my_table_map)
data = map(lambda x: {col_new: getattr(x,col_old)
for col_old,col_new in map_dataframe_to_ch.items()},data.collect())
data = map(mapper,data)
insert_data(data,'my_table_name',client)
get_mapper
返回如下所示的映射函数:
def map_row(row: Dict[str,Any]) -> Dict[str,Any]:
nonlocal map_
return {key: map_[key](val) for key,val in row.items()}
所以基本上到最后我有了一些嵌套的生成器,它们生成字典。并确认这一点,如果我将print(next(data))
放在client.execute
之前,我将得到我期望的结果。这是一个隐藏了敏感信息的示例:
{'account_currency': '***','instrument': '***','operation': 'open','event_time': datetime.datetime(2020,7,19,11,49),'country': 'CN','region': 'Asia and Pacific','registration_source': '***','account_type': '***','platform': '***','server_key': '***'}
表架构如下:
"account_currency": "String","instrument": "String","operation": "String","event_time": "DateTime","country": "String","region": "String","registration_source": "String","account_type": "String","platform": "String","server_key": "String"
但是无论出于什么原因,我都会收到此错误:
File "src/etl/usd_volume/prepare_users.py",line 356,in <module>
main()
File "src/etl/usd_volume/prepare_users.py",line 348,in main
save_data(data,client)
File "src/etl/usd_volume/prepare_users.py",line 302,in save_data
insert_data(data,'report_Traded_volume_usd',client)
File "/root/data/src/common/clickhouse_helper.py",line 14,in insert_data
client.execute(query,data)
File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py",line 224,in execute
columnar=columnar
File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py",line 341,in process_ordinary_query
query = self.substitute_params(query,params)
File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py",line 422,in substitute_params
raise ValueError('Parameters are expected in dict form')
根据文档:
:param params:SELECT查询的替换参数和数据 插入查询。 INSERT的数据可以为
list
,tuple
或:data:~types.GeneratorType
。 默认值为None
(无参数或数据)。
很明显,我的数据符合这些要求。
但是在源代码中只有此检查:
def substitute_params(self,query,params):
if not isinstance(params,dict):
raise ValueError('Parameters are expected in dict form')
escaped = escape_params(params)
return query % escaped
我并没有真正找到他们将其检查为发电机的地方。 Clickhouse驱动程序版本为0.1.4
在此问题上的任何帮助,将不胜感激。
解决方法
好的,对源代码的进一步研究揭示了根本原因。
在substitute_params
类的process_ordinary_query
方法内调用引发错误Client
的函数。基本上,除INSERT之外的任何查询都将调用此方法。
通过execute
方法的这一部分检查查询的符号是INSERT还是其他任何符号:
is_insert = isinstance(params,(list,tuple,types.GeneratorType))
if is_insert:
rv = self.process_insert_query(
query,params,external_tables=external_tables,query_id=query_id,types_check=types_check,columnar=columnar
)
else:
rv = self.process_ordinary_query(
query,params=params,with_column_types=with_column_types,columnar=columnar
)
关键是isinstance(params,types.GeneratorType))
types.GeneratorType
的定义如下:
def _g():
yield 1
GeneratorType = type(_g())
这导致了什么:
>>>GeneratorType
<class 'generator'>
很显然,对于我的数据map
:
>>>type(map(...))
<class 'map'>
>>>isinstance(map(...),GeneratorType)
False
因此,避免此问题的最简单解决方案是将data
转换为具有生成器理解的生成器。这样就完全解决了问题。
>>>data = (i for i in data)
>>>isinstance(data,GeneratorType)
True
或者,如果您要专门执行INSERT查询,则可以直接调用process_insert_query
,这将消除将数据转换为生成器的需求。
我认为这是由clickhouse驱动程序检查的类型有点模棱两可,但这就是我们所拥有的。