问题描述
主要问题: 每批处理数据批处理时,如何处理pyarrow中的模式更改?
长话短说
例如,我有以下数据
import paramiko
from time import sleep
host = "100.100.100.100"
port = 22
uname = "admin"
passw = "admin"
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host,port,"cli","cli")
sleep(0.25)
ch = ssh.invoke_shell()
ch.sendall(f"{uname}\n")
sleep(0.25)
ch.sendall(f"{passw}\n")
sleep(0.25)
print(ch.recv(9999999).decode())
我正在使用python 3.7并使用pandas 1.1.0。
| col_a | col_b |
-----------------
| 10 | 42 |
| 41 | 21 |
| 'foo' | 11 |
| 'bar' | 99 |
我需要使用PyArrow 1.0.1实现开始使用Apache Arrow。在我的应用程序中,我们每批工作。这意味着我们看到了部分数据,因此也看到了部分数据类型。
>>> import pandas as pd
>>> df = pd.read_csv('test.csv')
>>> df
col_a col_b
0 10 42
1 41 21
2 foo 11
3 bar 99
>>> df.dtypes
col_a object
col_b int64
dtype: object
>>>
我的目标是在每批工作批次的约束下,使用Apache Arrow的拼花格式保留整个数据帧。它要求我们正确填充架构。如何处理随批次变化的dtypes? 这是使用上述数据重现该问题的完整代码。
>>> dfi = pd.read_csv('test.csv',iterator=True,chunksize=2)
>>> dfi
<pandas.io.parsers.TextFileReader object at 0x7fabae915c50>
>>> dfg = next(dfi)
>>> dfg
col_a col_b
0 10 42
1 41 21
>>> sub_1 = next(dfi)
>>> sub_2 = next(dfi)
>>> sub_1
col_a col_b
2 foo 11
3 bar 99
>>> dfg2
col_a col_b
2 foo 11
3 bar 99
>>> sub_1.dtypes
col_a int64
col_b int64
dtype: object
>>> sub_2.dtypes
col_a object
col_b int64
dtype: object
>>>
我们收到以下异常
from pyarrow import RecordBatch,RecordBatchFileWriter,RecordBatchFileReader
import pandas as pd
pd.DataFrame([['10',42],['41',21],['foo',11],['bar',99]],columns=['col_a','col_b']).to_csv('test.csv')
dfi = pd.read_csv('test.csv',chunksize=2)
sub_1 = next(dfi)
sub_2 = next(dfi)
# No schema provided here. Pyarrow should infer the schema from data. The first column is identified as a col of int.
batch_to_write_1 = RecordBatch.from_pandas(sub_1)
schema = batch_to_write_1.schema
writer = RecordBatchFileWriter('test.parquet',schema)
writer.write(batch_to_write_1)
# We expect to keep the same schema but that is not true,the schema does not match sub_2 data. So the
# following line launch an exception.
batch_to_write_2 = RecordBatch.from_pandas(sub_2,schema)
# writer.write(batch_to_write_2) # This will fail bcs batch_to_write_2 is not defined
解决方法
此行为是有意的。可以尝试一些替代方法(我相信它们应该可以工作,但我尚未测试所有方法):
- 如果您知道最终的模式是在pyarrow中手动构建的,而不是依靠第一批记录中推断出的模式。
- 遍历所有数据并计算最终方案。然后使用新的架构重新处理数据。
- 检测架构更改和recast个先前的记录批次。
- 检测模式更改并启动一个新表(然后,每个模式将得到一个Parquet文件,并且需要另一个过程来统一这些模式)。
最后,如果可行,并且您正在尝试转换CSV数据,则可以考虑使用内置的Arrow CSV解析器。