问题描述
就我而言,我在S3中有几个文件,还有一个自定义函数,可以读取每个文件并使用所有线程对其进行处理。为了简化示例,我只生成一个数据帧df
,并假定我的函数是tsfresh.extract_features
,它使用多处理。
生成数据
import pandas as pd
from tsfresh import extract_features
from tsfresh.examples.robot_execution_failures import download_robot_execution_failures,\
load_robot_execution_failures
download_robot_execution_failures()
ts,y = load_robot_execution_failures()
df = []
for i in range(5):
tts = ts.copy()
tts["id"] += 88 * i
df.append(tts)
df = pd.concat(df,ignore_index=True)
功能
def fun(df,n_jobs):
extracted_features = extract_features(df,column_id="id",column_sort="time",n_jobs=n_jobs)
集群
import dask
from dask.distributed import Client,progress
from dask import compute,delayed
from dask_cloudprovider import FargateCluster
my_vpc = # your vpc
my_subnets = # your subnets
cpu = 2
ram = 4
cluster = FargateCluster(n_workers=1,image='rpanai/feats-worker:2020-08-24',vpc=my_vpc,subnets=my_subnets,worker_cpu=int(cpu * 1024),worker_mem=int(ram * 1024),cloudwatch_logs_group="my_log_group",task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],scheduler_timeout='20 minutes'
)
cluster.adapt(minimum=1,maximum=4)
client = Client(cluster)
client
使用所有工作线程(FAIL)
to_process = [delayed(fun)(df,cpu) for i in range(10)]
out = compute(to_process)
AssertionError: daemonic processes are not allowed to have children
仅使用一个线程(确定)
在这种情况下,它可以正常工作,但是我在浪费资源。
to_process = [delayed(fun)(df,0) for i in range(10)]
out = compute(to_process)
问题
我知道,对于这个特定的功能,我最终可以使用多线程和其他一些技巧来编写自定义distributor,但是我想分配一份工作,在每位工作者上我都可以利用所有资源,而不必太担心了。
更新
该函数只是一个示例,实际上在提取实际特征之前以及将其保存到S3
之后,它进行了某种清理。
def fun(filename,bucket_name,filename_out,n_jobs):
#
df pd.read_parquet(f"s3://{bucket_name}/{filename}")
# do some cleaning
extracted_features = extract_features(df,n_jobs=n_jobs)
extract_features.to_parquet(f"s3://{bucket_name}/{filename_out}")
解决方法
我可以帮助您回答有关tsfresh
的特定问题,但是如果tsfresh
只是一个简单的玩具示例,可能就不是您想要的。
对于tsfresh
,通常不会混合tsfresh
和dask的多重处理,而让dask完成所有处理。这意味着,您从一个dask.DataFrame
开始(在测试用例中,您可以将pandas数据框转换为简单的数据框-对于您的读取用例,可以直接从S3
{{3}读取) },然后将特征提取分布在dask数据帧中(特征提取的好处是,它在每个时间序列上均独立工作。因此,我们可以为每个时间序列生成一个作业)。
tsfresh
(0.16.0)的当前版本具有一个小的帮助程序功能,可以为您完成此操作:docu。
在下一版本中,甚至可以直接在dask数据帧上运行extract_features
。
我不确定这是否有助于解决您的一般性问题。在我看来,您(在大多数情况下)不想混合dask的分布函数和“本地”多核计算,而只是让dask处理所有事情。因为如果您位于集群群集中,则您甚至可能都不知道每台计算机上将拥有多少个内核(或者每个作业可能只有一个内核)。
这意味着,如果您的工作可以被分配N次,并且每个工作都可以开始M个子工作,那么您只需给“ N x M”个工作进行梳理,然后找出其余的工作(包括数据位置)。