问题描述
我有几个parquet
文件(数据帧),我将它们作为一个简单的数据帧图和示例加载。
然后,我根据数据框中的原始数据执行一些计算,并将新列追加到我的dask数据框中。
最后,我要计算其中的所有列的mean()
和std()
,并得到一个ValueError
,我不确定它来自哪里或做错了什么
import pandas as pd
import numpy as np
import tensorflow as tf
import os
from os.path import join
import dask
import dask.dataframe as dd
import dask.array as da
# read in the data
data_pq = dd.read_parquet(join(path_to_data,'filter_width_*_DNN_train.parquet'),chunksize='4GB')
print('Convert to single precission and sample')
data_pq = data_pq.astype(np.float32).sample(frac=0.1)
# ## compute the additional quantites (tensors)
# compute tensors R,S mag(U) etc.
mag_U = da.sqrt(data_pq['U_bar'].values**2 + data_pq['V_bar'].values**2 +data_pq['W_bar'].values**2)
mag_grad_c = da.sqrt(data_pq['grad_c_x_LES'].values**2 + data_pq['grad_c_y_LES'].values**2 +data_pq['grad_c_z_LES'].values**2)
sum_U = data_pq['U_bar'].values + data_pq['V_bar']+data_pq['W_bar'].values
sum_c = da.absolute(data_pq['grad_c_x_LES'].values) + da.absolute(data_pq['grad_c_y_LES'].values) +da.absolute(data_pq['grad_c_z_LES'].values)
grad_U = da.sqrt(data_pq['grad_U_x_LES'].values**2 + data_pq['grad_U_y_LES'].values**2 +data_pq['grad_U_z_LES'].values**2)
grad_V = da.sqrt(data_pq['grad_V_x_LES'].values**2 + data_pq['grad_V_y_LES'].values**2 +data_pq['grad_V_z_LES'].values**2)
grad_W = da.sqrt(data_pq['grad_W_x_LES'].values**2 + data_pq['grad_W_y_LES'].values**2 +data_pq['grad_W_z_LES'].values**2)
mag_grad_U = da.sqrt(grad_U**2 + grad_V**2 +grad_W**2)
sum_grad_U = da.absolute(grad_U) + da.absolute(grad_V) +da.absolute(grad_W)
print('Computing gradient_tensor')
gradient_tensor = da.array([
[data_pq['grad_U_x_LES'],data_pq['grad_V_x_LES'],data_pq['grad_W_x_LES']],[data_pq['grad_U_y_LES'],data_pq['grad_V_y_LES'],data_pq['grad_W_y_LES']],[data_pq['grad_U_z_LES'],data_pq['grad_V_z_LES'],data_pq['grad_W_z_LES']]
])
print('Computing S and R')
# symetric strain
Strain = 0.5*(gradient_tensor + da.transpose(gradient_tensor,(1,2)))
#anti symetric strain
Anti = 0.5*(gradient_tensor - da.transpose(gradient_tensor,2)))
print('Computing lambdas')
lambda_1 = da.trace(Strain**2)
lambda_2 = da.trace(Anti**2)
lambda_3 = da.trace(Strain**3)
lambda_4 = da.trace(Anti**2 * Strain)
lambda_5 = da.trace(Anti**2 * Strain**2)
# Add to the dask dataframe
data_pq['mag_grad_c'] = mag_grad_c
data_pq['mag_U'] = mag_U
data_pq['sum_c'] = sum_c
data_pq['sum_U'] = sum_U
data_pq['sum_grad_U'] = sum_grad_U
data_pq['mag_grad_U'] = mag_grad_U
# REPARTITON
data_pq = data_pq.repartition(npartitions=lambda_1.npartitions)
data_pq['lambda_1'] = lambda_1
data_pq['lambda_2'] = lambda_2
data_pq['lambda_3'] = lambda_3
data_pq['lambda_4'] = lambda_4
data_pq['lambda_5'] = lambda_5
print('Done with feature computation')
# reindex and compute mean and std
data_pq = data_pq.reset_index().drop('index',axis=1)
# compute the mean and std
data_mean,data_std = dask.compute(data_pq.mean(),data_pq.std())
不确定其来源。它说索引不匹配。 这是我收到的错误消息:
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) ~/Python/Data_driven_models/dask_processing/dask_parquet.py in <module>
119 data_pq = data_pq.reset_index().drop('index',axis=1)
120
--> 121 data_mean,data_pq.std())
122
~/.local/lib/python3.6/site-packages/dask/base.py in compute(*args,**kwargs)
450 postcomputes.append(x.__dask_postcompute__())
451
--> 452 results = schedule(dsk,keys,**kwargs)
453 return repack([f(r,*a) for r,(f,a) in zip(results,postcomputes)])
454
~/.local/lib/python3.6/site-packages/dask/threaded.py in get(dsk,result,cache,num_workers,pool,**kwargs)
82 get_id=_thread_get_id,83 pack_exception=pack_exception,---> 84 **kwargs
85 )
86
~/.local/lib/python3.6/site-packages/dask/local.py in get_async(apply_async,dsk,get_id,rerun_exceptions_locally,pack_exception,raise_exception,callbacks,dumps,loads,**kwargs)
484 _execute_task(task,data) # Re-execute locally
485 else:
--> 486 raise_exception(exc,tb)
487 res,worker_id = loads(res_info)
488 state["cache"][key] = res
~/.local/lib/python3.6/site-packages/dask/local.py in reraise(exc,tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
317
318
~/.local/lib/python3.6/site-packages/dask/local.py in execute_task(key,task_info,pack_exception)
220 try:
221 task,data = loads(task_info)
--> 222 result = _execute_task(task,data)
223 id = get_id()
224 result = dumps((result,id))
~/.local/lib/python3.6/site-packages/dask/core.py in
_execute_task(arg,dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a,cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.local/lib/python3.6/site-packages/pandas/core/series.py in
__init__(self,data,index,dtype,name,copy,fastpath)
312 if len(index) != len(data):
313 raise ValueError(
--> 314 f"Length of passed values is {len(data)},"
315 f"index implies {len(index)}."
316 )
ValueError: Length of passed values is 3728270,index implies 2135992.
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)