Problem description
I am trying to read a file from Blob storage, load it into pandas, and write it back to Blob storage.
I have an Azure Machine Learning pipeline with a PythonScriptStep that takes 2 PipelineParameters, which are DataPaths, as shown below.
from azureml.core import Datastore, Experiment
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import Pipeline, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep

datastore = Datastore(ws, "SampleStore")
in_raw_path_default = 'somefolder/raw/alerts/2020/08/03/default_in.csv'
in_cleaned_path_default = 'somefolder/cleaned/alerts/2020/08/03/default_out.csv'

# Each DataPath default is wrapped in a PipelineParameter and bound to the
# compute as a mount
in_raw_datapath = DataPath(datastore=datastore, path_on_datastore=in_raw_path_default)
in_raw_path_pipelineparam = PipelineParameter(name="inrawpath", default_value=in_raw_datapath)
raw_datapath_input = (in_raw_path_pipelineparam, DataPathComputeBinding(mode='mount'))

in_cleaned_datapath = DataPath(datastore=datastore, path_on_datastore=in_cleaned_path_default)
in_cleaned_path_pipelineparam = PipelineParameter(name="incleanedpath", default_value=in_cleaned_datapath)
cleaned_datapath_input = (in_cleaned_path_pipelineparam, DataPathComputeBinding(mode='mount'))

source_directory = script_folder + '/pipeline_Steps'
dataprep_step = PythonScriptStep(
    script_name="SimpleTest.py",
    arguments=["--input_data", raw_datapath_input, "--cleaned_data", cleaned_datapath_input],
    inputs=[raw_datapath_input, cleaned_datapath_input],
    compute_target=default_compute,
    source_directory=source_directory,
    runconfig=run_config,
    allow_reuse=True)

pipeline_test = Pipeline(workspace=ws, steps=[dataprep_step])

# Non-default paths passed as DataPath pipeline parameters at submission time
test_raw_path = DataPath(datastore=datastore, path_on_datastore='samplefolder/raw/alerts/2017/05/31/test.csv')
test_cleaned_path = DataPath(datastore=datastore, path_on_datastore='samplefolder/cleaned/alerts/2020/09/03')

pipeline_run_msalerts = Experiment(ws, 'SampleExperiment').submit(
    pipeline_test,
    pipeline_parameters={"inrawpath": test_raw_path, "incleanedpath": test_cleaned_path})
Here is the script used (SimpleTest.py):
import os
import sys
import argparse
import pathlib
import azureml.core
import pandas as pd
parser = argparse.ArgumentParser("datapreponly")
parser.add_argument("--input_data",type=str)
parser.add_argument("--cleaned_data",type=str)
args = parser.parse_args()
print("Argument 1: %s" % args.input_data)
print("Argument 2: %s" % args.cleaned_data)
# Read the mounted input CSV, skipping malformed rows
# (error_bad_lines is deprecated in newer pandas versions)
testDf = pd.read_csv(args.input_data, error_bad_lines=False)
print('Total Data Shape ' + str(testDf.shape))

# Write the result back to the mounted output folder
if args.cleaned_data is not None:
    output_path = args.cleaned_data
    os.makedirs(output_path, exist_ok=True)
    outdatapath = output_path + '/alert.csv'
    testDf.to_csv(outdatapath, index=False)
Triggering this Azure ML pipeline from Azure Data Factory:
The above code works fine when the pipeline is executed from the Azure ML workspace / pipeline SDK. I am now trying to trigger the same Azure ML pipeline from an Azure Data Factory activity (Azure Machine Learning Execute Pipeline).
I attempted a debug run by passing the 2 input paths as plain strings:

rawdatapath = "samplefolder/raw/alerts/2017/05/31/test.csv"
cleaneddatapath = "samplefolder/raw/cleaned/2020/09/03/"

The resulting driver log shows:
Current directory: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/workspaceblobstore/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade
Preparing to call script [ SimpleTest.py ]
with arguments:
['--input_data','/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv','--cleaned_data','/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv']
After variable expansion, calling script [ SimpleTest.py ] with arguments:
['--input_data','/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv']
Script type = None
Argument 1: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv
Argument 2: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv
.......................
FileNotFoundError: [Errno 2] No such file or directory: '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv'
This shows that the default paths are used instead of the pipeline parameters (the "No such file or directory" error itself is beside the point; what matters is that the default paths were taken rather than the pipeline parameters). I suspect this happens because the pipeline parameter is passed as a string rather than as a DataPath.
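To make that suspicion concrete, here is a minimal sketch (reusing the names defined above) of the two submission paths. The DataPath form is what works from the SDK; the string form approximates what ADF sends today and, per the behavior observed above, falls back to the defaults:

from azureml.core import Experiment

# Submitting DataPath objects from the SDK: the parameters are rebound and
# the step sees the test paths.
Experiment(ws, 'SampleExperiment').submit(
    pipeline_test,
    pipeline_parameters={"inrawpath": test_raw_path,
                         "incleanedpath": test_cleaned_path})

# Submitting plain strings (all ADF can pass today): the DataPath parameters
# are not rebound, and the step falls back to the defaults, matching the
# driver log above.
Experiment(ws, 'SampleExperiment').submit(
    pipeline_test,
    pipeline_parameters={"inrawpath": "samplefolder/raw/alerts/2017/05/31/test.csv",
                         "incleanedpath": "samplefolder/cleaned/alerts/2020/09/03"})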
The final question: how can a DataPath be passed from Azure Data Factory to the Azure ML Execute Pipeline activity?
Thanks.
Solution
Got an answer from Microsoft (refer to the thread here). The Azure Data Factory product team confirmed that the "DataPath" data type for parameters is not supported in Azure Data Factory (ADF) today. However, a feature request has been filed for it and is being worked on; the feature is expected to ship as part of the November release.
The input parameters appear to be defined as strings; try changing them to the Object data type. According to {{3}}, it expects an object-shaped {"Key": "value"} parameter.
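For illustration, here is a minimal sketch of that object-shaped parameter payload when a published pipeline is invoked over REST; rest_endpoint is an assumed variable holding the published pipeline's endpoint, and the values are the test paths from the question:

import requests
from azureml.core.authentication import InteractiveLoginAuthentication

# rest_endpoint is assumed to point at a published version of pipeline_test
auth_header = InteractiveLoginAuthentication().get_authentication_header()

response = requests.post(
    rest_endpoint,
    headers=auth_header,
    json={
        "ExperimentName": "SampleExperiment",
        # Pipeline parameters travel as a flat {"Key": "value"} object
        "ParameterAssignments": {
            "inrawpath": "samplefolder/raw/alerts/2017/05/31/test.csv",
            "incleanedpath": "samplefolder/cleaned/alerts/2020/09/03"}})
print(response.json().get("Id"))  # id of the triggered pipeline run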
This notebook demonstrates the use of DataPath and PipelineParameter in AML pipelines. You will learn how to parameterize strings and DataPaths and submit them to an AML pipeline via PipelineParameters. You can also parameterize the input dataset; here is a sample notebook that shows how to do that.
Currently, ParallelRunStep accepts a Dataset as its data input. You can add one more step before the ParallelRunStep that creates a Dataset object pointing to the new data and passes it on to the ParallelRunStep. An example here uses multiple steps (see the sketch below):
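As a hedged sketch of that multi-step pattern (the script names, environment, and sizing values below are assumptions, not taken from the original example):

from azureml.core import Environment
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep, PythonScriptStep

# Step 1: write the new data to an intermediate location; the output can be
# consumed by later steps as a file dataset.
new_data = OutputFileDatasetConfig(name="new_data",
                                   destination=(datastore, "somefolder/prepared"))
create_dataset_step = PythonScriptStep(
    script_name="create_dataset.py",       # hypothetical preparation script
    arguments=["--output_dir", new_data],
    compute_target=default_compute,
    source_directory=source_directory,
    runconfig=run_config)

# Step 2: feed that dataset into the ParallelRunStep.
parallel_run_config = ParallelRunConfig(
    source_directory=source_directory,
    entry_script="batch_process.py",       # hypothetical per-batch script
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    append_row_file_name="alerts_output.csv",  # custom output file name
    environment=Environment.get(ws, "AzureML-Minimal"),  # assumed curated env
    compute_target=default_compute,
    node_count=1)

batch_step = ParallelRunStep(
    name="batch-process",
    parallel_run_config=parallel_run_config,
    inputs=[new_data.as_input(name="prepared").as_mount()],
    output=PipelineData(name="scored", datastore=datastore),
    allow_reuse=False)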
For output, if you use the append_row output action, you can configure a custom output file name via append_row_file_name (as in the sketch above). The output is stored in the default blob store. To move it to a different datastore, the suggestion is to use a DataTransferStep after the ParallelRunStep.
Please follow this example for the data transfer step:
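A minimal sketch of such a transfer, assuming an Azure Data Factory compute named "adf-compute" is already attached to the workspace and a destination datastore "OtherStore" is registered (both names are assumptions):

from azureml.core import Datastore
from azureml.core.compute import DataFactoryCompute
from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import DataTransferStep

# DataTransferStep runs on an attached Azure Data Factory compute
data_factory = DataFactoryCompute(ws, "adf-compute")

# Source: where the ParallelRunStep wrote its appended output
src = DataReference(datastore=datastore,
                    data_reference_name="src",
                    path_on_datastore="parallel_run_output")

# Destination: a different registered datastore
dest_store = Datastore(ws, "OtherStore")
dest = DataReference(datastore=dest_store,
                     data_reference_name="dest",
                     path_on_datastore="alerts/scored")

transfer_step = DataTransferStep(
    name="copy-output",
    source_data_reference=src,
    destination_data_reference=dest,
    compute_target=data_factory)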