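Problem description
Requirement: fetch a large volume of data from Snowflake and insert it into an Oracle table.
Problem: the task fails with the error "object of type 'int' has no len()".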
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 978, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/airflow/dags/plugins/bi_plugin.py", line 5437, in execute
    orcl.bulk_insert_rows(table=self.oracle_table,rows=result)
  File "/usr/local/lib/python3.6/site-packages/airflow/hooks/oracle_hook.py", line 210, in bulk_insert_rows
    values=','.join(':%s' % i for i in range(1,len(values_base) + 1)),
TypeError: object of type 'int' has no len()
Code:
    # Methods of the custom operator (the class definition is not shown in the post).
    import logging
    from contextlib import closing

    def __init__(self, snowflake_conn_id, oracle_conn_id, sql, oracle_table, target_fields, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.snowflake_conn_id = snowflake_conn_id
        self.oracle_conn_id = oracle_conn_id
        self.sql = sql
        self.oracle_table = oracle_table
        self.target_fields = target_fields

    def get_records(self, hook, sql):
        # Stream the query result in chunks of 10000, yielding one row at a time.
        with closing(hook.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                cur.execute(sql)
                while True:
                    results = cur.fetchmany(size=10000)
                    if not results:
                        break
                    for result in results:
                        yield result

    def execute(self, context):
        snflk_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        orcl = OracleHook(oracle_conn_id=self.oracle_conn_id)
        logging.info("Inserting rows into Oracle")
        # Each call below receives a list holding a single row.
        for result in self.get_records(snflk_hook, self.sql):
            orcl.bulk_insert_rows(table=self.oracle_table, rows=[result], target_fields=self.target_fields)
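Edit: the solution works with Mike's suggestion.
However, it is very slow. How can I do bulk inserts of 10000 rows at a time, or something faster? Would loading from a file into the table be quicker? At the moment it inserts every row individually: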
[2020-09-28 12:54:25,346] {logging_mixin.py:112} INFO - [2020-09-28 12:54:25,346] {oracle_hook.py:229} INFO - [table_name] inserted 1 rows
Solution
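My guess is that OracleHook.bulk_insert_rows(rows=result) is simply not being passed the right type for the rows parameter. The error states clearly that whatever value reaches len(values_base) in oracle_hook.py has no len(). That suggests a collection type is expected. You may want to check what type result is at the point of the call. You may find that you need to wrap it, as in orcl.bulk_insert_rows(table=self.oracle_table,rows=[result]).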
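To make the failure concrete, here is a minimal sketch that mimics only the placeholder-building expression visible in the traceback. The function name build_placeholders is hypothetical, and the fallback to rows[0] when target_fields is not given is an assumption about how the hook derives values_base, not a copy of its source.

```python
def build_placeholders(rows, target_fields=None):
    """Hypothetical stand-in for the expression on the failing traceback line."""
    # Assumption: without target_fields, the column count is taken from the first row.
    values_base = target_fields if target_fields else rows[0]
    return ','.join(':%s' % i for i in range(1, len(values_base) + 1))


single_row = (1, 'a', 'b')  # one record, shaped like a row from a cursor

# Wrapped in a list, rows[0] is the whole record, so len() works.
print(build_placeholders([single_row]))  # prints :1,:2,:3

# Passed bare, rows[0] is the first column value (an int), so len() fails.
try:
    build_placeholders(single_row)
except TypeError as exc:
    print(exc)  # prints object of type 'int' has no len()
```

Under that assumption, passing target_fields would hide the error but still leave rows in the wrong shape, so wrapping the record in a list is the safer change.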
My guess is that bulk_insert_rows() expects to be given a list of lists. At the moment you are passing only a single list.
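On the follow-up about speed: the "inserted 1 rows" log line is consistent with every call receiving rows=[result], a list holding a single record. One possible approach, sketched here under assumptions, is to yield each fetchmany() batch whole and hand that list of rows to bulk_insert_rows() in a single call. The names iter_batches and FakeCursor are hypothetical; FakeCursor only stands in for a DB-API cursor so the sketch runs without a database.

```python
def iter_batches(cursor, batch_size=10000):
    """Yield lists of rows from a DB-API cursor, one fetchmany() batch at a time."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        yield batch


class FakeCursor:
    """Hypothetical stand-in for a DB-API cursor, for demonstration only."""

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, size):
        # Hand back up to `size` rows and keep the remainder for the next call.
        batch, self._rows = self._rows[:size], self._rows[size:]
        return batch


cursor = FakeCursor([(i, 'name%d' % i) for i in range(25)])
batch_sizes = [len(batch) for batch in iter_batches(cursor, batch_size=10)]
print(batch_sizes)  # prints [10, 10, 5]
```

In the operator, the loop would then call orcl.bulk_insert_rows(table=self.oracle_table, rows=batch, target_fields=self.target_fields) once per batch instead of once per record. Whether that is fast enough for the data volume in question would need to be measured.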