Apache箭头中的架构不一致

问题描述

主要问题: 每批处理数据批处理时,如何处理pyarrow中的模式更改?

长话短说

例如,我有以下数据

import paramiko
from time import sleep

host = "100.100.100.100"
port = 22
uname = "admin"
passw = "admin"

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host,port,"cli","cli")
sleep(0.25)
ch = ssh.invoke_shell()
ch.sendall(f"{uname}\n")
sleep(0.25)
ch.sendall(f"{passw}\n")
sleep(0.25)
print(ch.recv(9999999).decode())

我正在使用python 3.7并使用pandas 1.1.0。

| col_a | col_b |
-----------------
|  10   |  42   |
|  41   |  21   |
| 'foo' |  11   |
| 'bar' |  99   |

我需要使用PyArrow 1.0.1实现开始使用Apache Arrow。在我的应用程序中,我们每批工作。这意味着我们看到了部分数据,因此也看到了部分数据类型。

>>> import pandas as pd
>>> df = pd.read_csv('test.csv')
>>> df
  col_a  col_b
0    10     42
1    41     21
2   foo     11
3   bar     99
>>> df.dtypes
col_a    object
col_b     int64
dtype: object  
>>>

我的目标是在每批工作批次的约束下,使用Apache Arrow的拼花格式保留整个数据帧。它要求我们正确填充架构。如何处理随批次变化的dtypes? 这是使用上述数据重现该问题的完整代码

>>> dfi = pd.read_csv('test.csv',iterator=True,chunksize=2)
>>> dfi
<pandas.io.parsers.TextFileReader object at 0x7fabae915c50>
>>> dfg = next(dfi)
>>> dfg
   col_a  col_b
0     10     42
1     41     21
>>> sub_1 = next(dfi)
>>> sub_2 = next(dfi)
>>> sub_1
  col_a  col_b
2   foo     11
3   bar     99
>>> dfg2
  col_a  col_b
2   foo     11
3   bar     99
>>> sub_1.dtypes
col_a    int64
col_b    int64
dtype: object 
>>> sub_2.dtypes
col_a    object
col_b     int64
dtype: object  
>>>

我们收到以下异常

from pyarrow import RecordBatch,RecordBatchFileWriter,RecordBatchFileReader
import pandas as pd

pd.DataFrame([['10',42],['41',21],['foo',11],['bar',99]],columns=['col_a','col_b']).to_csv('test.csv')
dfi = pd.read_csv('test.csv',chunksize=2)
sub_1 = next(dfi)
sub_2 = next(dfi)

# No schema provided here. Pyarrow should infer the schema from data. The first column is identified as a col of int.
batch_to_write_1 = RecordBatch.from_pandas(sub_1)
schema = batch_to_write_1.schema
writer = RecordBatchFileWriter('test.parquet',schema)
writer.write(batch_to_write_1)

# We expect to keep the same schema but that is not true,the schema does not match sub_2 data. So the
# following line launch an exception.
batch_to_write_2 = RecordBatch.from_pandas(sub_2,schema)
# writer.write(batch_to_write_2)  # This will fail bcs batch_to_write_2 is not defined

解决方法

此行为是有意的。可以尝试一些替代方法(我相信它们应该可以工作,但我尚未测试所有方法):

  1. 如果您知道最终的模式是在pyarrow中手动构建的,而不是依靠第一批记录中推断出的模式。
  2. 遍历所有数据并计算最终方案。然后使用新的架构重新处理数据。
  3. 检测架构更改和recast个先前的记录批次。
  4. 检测模式更改并启动一个新表(然后,每个模式将得到一个Parquet文件,并且需要另一个过程来统一这些模式)。

最后,如果可行,并且您正在尝试转换CSV数据,则可以考虑使用内置的Arrow CSV解析器。