问题描述
我正在尝试获取有关我的 pyarrow 表中两列中值的不同组合是什么的信息。
我目前正在做的是:
import pandas as pd
import pyarrow as pa
my_table = pa.Table.from_pandas(
pd.DataFrame(
{
'col1':['a','a','b','b'],'col2':[1,1,2,3],'col3':[1,3,4,5,6,7,8]
}
)
)
a = [i.to_numpy().astype('str') for i in my_table.select(['col1','col2']).columns]
unique = np.unique(np.array(a),axis = 1)
返回预期结果:
unique
>array([['a',['1','2','1','3']],dtype='<U21')
但是对于较大的表来说这很慢,我希望有更快的方法吗?
或者,我真正想知道的是,当我尝试编写分区数据集时,如何提前知道它将写入哪些目录(即哪些分区在我的表中有一些数据)
编辑:
它可以更快地转换为 Pandas 而不是多个 numpy 数组然后使用 drop_duplicates()
:
my_table.select(['col1','col2']).to_pandas().drop_duplicates()
解决方法
对直接编码结构的支持由 https://issues.apache.org/jira/browse/ARROW-3978
跟踪与此同时,这里有一种解决方法,它在计算上类似于 Pandas 的独特功能,但通过使用 pyarrow
自己的计算内核避免了转换为 Pandas 的成本。
import pyarrow as pa
import pyarrow.compute as pc
def _dictionary_and_indices(column):
assert isinstance(column,pa.ChunkedArray)
if not isinstance(column.type,pa.DictionaryType):
column = pc.dictionary_encode(column,null_encoding_behavior='encode')
dictionary = column.chunk(0).dictionary
indices = pa.chunked_array([c.indices for c in column.chunks])
if indices.null_count != 0:
# We need nulls to be in the dictionary so that indices can be
# meaningfully multiplied,so we must round trip through decoded
column = pc.take(dictionary,indices)
return _dictionary_and_indices(column)
return dictionary,indices
def unique(table):
"produce a table containing only the unique rows from the input"
if table.num_columns == 0:
return None
table = table.unify_dictionaries()
dictionaries = []
fused_indices = None
for c in table.columns:
dictionary,indices = _dictionary_and_indices(c)
if fused_indices is None:
fused_indices = indices
else:
# pack column's indices into fused_indices
fused_indices = pc.add(
pc.multiply(fused_indices,len(dictionary)),indices)
dictionaries.append(dictionary)
uniques = []
# pc.unique can now be invoked on the single array of fused_indices
fused_indices = pc.unique(fused_indices)
for dictionary in reversed(dictionaries):
# unpack the column's indices from fused_indices
quotient = pc.divide(fused_indices,len(dictionary))
remainder = pc.subtract(fused_indices,pc.multiply(quotient,len(dictionary)))
# decode this column's uniques
uniques.insert(0,pc.take(dictionary,remainder))
fused_indices = quotient
return pa.Table.from_arrays(uniques,names=table.column_names)
if __name__ == '__main__':
my_table = pa.table({
'col1': ['a','a','b','b'],'col2': [1,1,2,3],'col3': [1,3,4,5,6,7,8],})
assert unique(my_table.select(['col1','col2'])).equals(pa.table({
'col1': ['a',}))