问题描述
现在,我将许多小的单个火花镶木文件从EMR传输到S3。我目前这样做的方式是通过在集群Steps UI中创建一个AWS建议的step函数,下面是一个示例
JAR location :command-runner.jar
Main class :None
Arguments :/usr/bin/s3-dist-cp --src=/refoutput --dest=s3://***-us-east-1/bens-flattened-step/refs
Action on failure:Continue
实际上,我需要根据要上传的文件来更改--dest参数的某些部分。我想做的是创建相同的step函数,除了使用我可以放入Jupyter笔记本中的python代码之外,而不是使用UI来执行。这可能吗? 另外,当有许多小文件时,spark.parquet.write('s3 path')导致S3挂起,因此这不是可行的解决方案。
解决方法
在Notebook中,您可以使用boto3列出群集,并使用Cluster-ID,可以提交启动步骤。
首先。要安装boto3,您可以将pip软件包安装为
sc.install_pypi_package("boto3")
#You can check it by using sc.list_packages()
使用boto3,您可以列出群集ID
import boto3
boto3 = boto3.session.Session(region_name='us-east-2')
emr = boto3.client('emr')
page_iterator = emr.get_paginator('list_clusters').paginate(
ClusterStates=['RUNNING','WAITING']
)
for page in page_iterator:
for item in page['Clusters']:
print(item['Id'])
或更简单
response = emr.list_clusters(
ClusterStates=[
'RUNNING','WAITING',],)
print(response)
您也可以过滤集群,例如使用list_cluster
CreatedAfter=datetime(2019,11,11),CreatedBefore=datetime(2020,1,1)
手头没有cluster-id,您可以使用add_job_flow_steps
例如,一个步骤可以是
newsteps =[
{
'Name': 'AWS S3 Copy','ActionOnFailure': 'CONTINUE','HadoopJarStep': {
'Jar': 'command-runner.jar','Args':["aws","s3","cp","s3://XXX/","/home/hadoop/copy/","--recursive"],}
}
]
添加具有您检索到的集群ID的步骤(例如j-xxxxxxxxxxx)
step_response = emr.add_job_flow_steps(JobFlowId="j-xxxxxxxxxxxxxxx",Steps=newsteps)
step_ids = step_response['StepIds']
print("Step IDs:",step_ids)