Airflow: Error - object of type 'int' has no len()

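Problem description

Requirement: fetch a large amount of data from Snowflake and insert it into an Oracle table.

Problem: Error - object of type 'int' has no len()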

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 978, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/airflow/dags/plugins/bi_plugin.py", line 5437, in execute
    orcl.bulk_insert_rows(table=self.oracle_table,rows=result)
  File "/usr/local/lib/python3.6/site-packages/airflow/hooks/oracle_hook.py", line 210, in bulk_insert_rows
    values=','.join(':%s' % i for i in range(1,len(values_base) + 1)),
TypeError: object of type 'int' has no len()

Code

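# Methods of the custom operator in bi_plugin.py (the class statement is not shown in the post).
# Import paths assume Airflow 1.10.x, which matches the paths in the traceback.
import logging
from contextlib import closing

from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.hooks.oracle_hook import OracleHook

def __init__(self, snowflake_conn_id, oracle_conn_id, sql, oracle_table, target_fields, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.snowflake_conn_id = snowflake_conn_id
    self.oracle_conn_id = oracle_conn_id
    self.sql = sql
    self.oracle_table = oracle_table
    self.target_fields = target_fields

def get_records(self, hook, sql):
    # Stream the result set in chunks of 10000, but yield one row at a time.
    with closing(hook.get_conn()) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute(sql)
            while True:
                results = cur.fetchmany(size=10000)
                if not results:
                    break
                for result in results:
                    yield result

def execute(self, context):
    snflk_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
    orcl = OracleHook(oracle_conn_id=self.oracle_conn_id)
    logging.info("Inserting rows into Oracle")
    for result in self.get_records(snflk_hook, self.sql):
        # Each call receives a one-row list, so the hook inserts a single row per call.
        orcl.bulk_insert_rows(table=self.oracle_table, rows=[result], target_fields=self.target_fields)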

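Edit: the solution works with Mike's suggestion.

But it is very slow. How can I do bulk inserts in batches of 10,000 rows, or otherwise make it faster? Would loading from a file into the table be faster? Right now it is inserting one row at a time: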

[2020-09-28 12:54:25,346] {logging_mixin.py:112} INFO - [2020-09-28 12:54:25,346] {oracle_hook.py:229} INFO - [table_name] inserted 1 rows

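Solution

My guess is that OracleHook.bulk_insert_rows(rows=result) is simply not being passed the right type for the rows parameter. The error states clearly that whatever value reaches len(values_base) in oracle_hook.py has no len().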

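This suggests that it expects a collection type. You may want to check what type result is at the point of the call. You may find that you need to wrap it in a collection, like this: orcl.bulk_insert_rows(table=self.oracle_table, rows=[result])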
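
To make the failure mode concrete, here is a minimal sketch that reproduces the same TypeError without Airflow. The function fake_bulk_insert_rows is a hypothetical stand-in that only mimics the len() call visible in the traceback; it is not the real OracleHook code, and the sample row is made up.

```python
def fake_bulk_insert_rows(rows):
    """Hypothetical stand-in: build bind placeholders for each element of rows."""
    placeholders = []
    for row in rows:
        # Like the hook line in the traceback, this calls len() on each element.
        placeholders.append(",".join(":%s" % i for i in range(1, len(row) + 1)))
    return placeholders


single_row = (1, "alice", 3.5)  # made-up record, shaped like one cursor row

# Wrapped in a list: each element is a full row, so len(row) works.
wrapped = fake_bulk_insert_rows([single_row])

# Passed bare: iterating the row yields its cells, and len(1) raises TypeError.
try:
    fake_bulk_insert_rows(single_row)
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```

If the first column of your query is numeric, this would explain why the message names 'int' specifically.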

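My guess is that bulk_insert_rows() expects to be given a list of lists, that is, a list of rows. At the moment you are passing only a single list.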
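
On the follow-up about speed: wrapping each row as [result] fixes the error but still hands the hook one row per call, which matches the "inserted 1 rows" log line. One option, sketched below, is to yield each fetchmany() chunk as a whole and pass that chunk as rows. The name get_record_batches is hypothetical, and sqlite3 is used only as a stand-in source so the sketch runs without Snowflake or Oracle.

```python
import sqlite3
from contextlib import closing


def get_record_batches(conn, sql, batch_size=10000):
    """Yield lists of rows, with at most batch_size rows per list."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        while True:
            batch = cur.fetchmany(batch_size)
            if not batch:
                break
            # Yield the whole chunk instead of looping over it row by row.
            yield batch


# Stand-in source database with 25 made-up rows.
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE t (id INTEGER, name TEXT)")
source.executemany("INSERT INTO t VALUES (?, ?)", [(i, "n%d" % i) for i in range(25)])

batch_sizes = [
    len(batch)
    for batch in get_record_batches(source, "SELECT id, name FROM t ORDER BY id", batch_size=10)
]
source.close()
```

In the operator, the loop in execute would then look roughly like: for batch in the batch generator, call orcl.bulk_insert_rows(table=self.oracle_table, rows=batch, target_fields=self.target_fields). As far as I know the hook also accepts a commit_every argument that controls how many rows go into each commit, so that may be worth tuning as well. I have not measured this against loading from a file, so treat it as a starting point.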