问题描述
我正在处理具有时间、纬度和经度维度的数据数组。可以使用以下代码重现数据:
import numpy as np
from datetime import timedelta
import datetime
import xarray as xr
precipitation = 10 * np.random.rand(20,40,2880)
lon = range(20)
lat = range(40)
time = np.arange('2017-06-01','2017-07-31',timedelta(minutes=30),dtype='datetime64[ns]')
data =xr.DataArray(
data=precipitation,dims=["lon","lat","time"],coords=[lon,lat,time])
print (data)
我愿意为数据数组中的每个网格提取时间序列数据并将其存储在一个 csv 文件中。
这是我目前尝试过的:
stacked_data = data.stack(z=("lon","lat"))
for index in stacked_data.indexes["z"]:
data = stacked_data.sel(z=index,drop=True)
data_df=data.to_dataframe(name =str(index))
data_df.to_csv("time series"+str(index)+".txt")
此方法适用于此处提供的示例。然而,在处理从时间序列光栅文件创建的大数据数组时需要很长时间(“时间”很多年,数千个“经度”和数千个“纬度”)。
谢谢!
解决方法
如果您的工作流程很简单并且您在一台机器上处理,我建议使用 concurrent.futures
使用多个进程将数据转储到 csv:
from concurrent.futures import ProcessPoolExecutor
def dump_to_csv(x,ix):
x = x.to_dataframe(name=str(ix))
x.to_csv("temp/time series"+str(ix)+".txt")
return True
with ProcessPoolExecutor(max_workers=4) as pool:
res = [pool.submit(dump_to_csv,stacked_data.sel(z=ix,drop=True),ix) for ix in stacked_data.indexes["z"]]
当然,您可以根据您的要求/硬件调整并发进程的数量。您还可以实现一种更复杂的方法来检查文件是否已写入,就像在我的快速示例中一样,您会在 True
中返回 future
。
如果您的规模更大和/或涉及多台机器,您可以查看 dask.distributed
和 dask.delayed
。通过这种方式,您可以在工作人员之间传播数据,然后将其同时写入磁盘。