通过python代码在jupyter笔记本中创建EMR步骤功能

问题描述

现在,我将许多小的单个火花镶木文件从EMR传输到S3。我目前这样做的方式是通过在集群Steps UI中创建一个AWS建议的step函数,下面是一个示例

JAR location :command-runner.jar
Main class :None
Arguments :/usr/bin/s3-dist-cp --src=/refoutput --dest=s3://***-us-east-1/bens-flattened-step/refs
Action on failure:Continue

实际上,我需要根据要上传文件来更改--dest参数的某些部分。我想做的是创建相同的step函数,除了使用我可以放入Jupyter笔记本中的python代码之外,而不是使用UI来执行。这可能吗? 另外,当有许多小文件时,spark.parquet.write('s3 path')导致S3挂起,因此这不是可行的解决方案。

解决方法

在Notebook中,您可以使用boto3列出群集,并使用Cluster-ID,可以提交启动步骤。

首先。要安装boto3,您可以将pip软件包安装为

sc.install_pypi_package("boto3")
#You can check it by using sc.list_packages()

使用boto3,您可以列出群集ID

import boto3

boto3 = boto3.session.Session(region_name='us-east-2')
emr = boto3.client('emr')

page_iterator = emr.get_paginator('list_clusters').paginate(
    ClusterStates=['RUNNING','WAITING']
)

for page in page_iterator:
    for item in page['Clusters']:
        print(item['Id'])

或更简单

response = emr.list_clusters(
    ClusterStates=[
        'RUNNING','WAITING',],)
print(response)

您也可以过滤集群,例如使用list_cluster

的日期参数
CreatedAfter=datetime(2019,11,11),CreatedBefore=datetime(2020,1,1)

手头没有cluster-id,您可以使用add_job_flow_steps

提交步骤

例如,一个步骤可以是

newsteps =[
                 {
            'Name': 'AWS S3 Copy','ActionOnFailure': 'CONTINUE','HadoopJarStep': {
                'Jar': 'command-runner.jar','Args':["aws","s3","cp","s3://XXX/","/home/hadoop/copy/","--recursive"],}
              }
              ]

添加具有您检索到的集群ID的步骤(例如j-xxxxxxxxxxx)

step_response = emr.add_job_flow_steps(JobFlowId="j-xxxxxxxxxxxxxxx",Steps=newsteps)

step_ids = step_response['StepIds']

print("Step IDs:",step_ids)