定期删除EMR群集日志

问题描述

我有一个EMR群集,该群集成功运行了几天的Spark Streaming作业。但是几天后,群集因步骤失败而终止。 我检查了日志,上面写着

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f8cb0854000,12288,0) Failed; error='Cannot allocate memory' (errno=12)
Command exiting with ret '1'

对于此错误,我检查并发现,对于JRE,内存不足。
我发现集群创建EMR步骤日志并存储在路径 / mnt / var / logs / hadoop / steps / step_id / 中,而在集群创建过程中,我给出了 logUri 路径由于该原因,日志被复制到s3位置。 所以我的猜测是,由于这些日志,导致步骤失败。

有人可以建议我如何定期从群集中删除这些emr步骤日志,以便群集不会耗尽内存吗?

解决方法

您可以使用以下boto3代码(我肯定也可以使用Java的AWS开发工具包在Java中完成此操作)来删除日志,要定期删除,您可以使用以下选项

  1. 使用Airflow之类的工作流计划程序,请参见下面的示例
  2. 将其用作lambda函数并安排其定期运行(容易得多)
  3. 在本地使用cron jon(不太可行)

删除日志的功能(输入即将到期的thresholdbucket nameprefix,可以是"logs/sparksteps/j-"

def clean_s3(buck,match_prefix,exp_threshold):

    s3_client = boto3.client('s3')
    key_names = []
    file_timestamp = []
    file_size = []
    kwargs = {"Bucket": buck,"Prefix": match_prefix}
    while True:
        result = s3_client.list_objects_v2(**kwargs)
        for obj in result["Contents"]:

            if "." in obj["Key"]:
                key_names.append(obj["Key"])
                file_timestamp.append(obj["LastModified"].timestamp())
                file_size.append(obj["Size"])
        try:
            kwargs["ContinuationToken"] = result["NextContinuationToken"]
        except KeyError:
            break

    key_info = {
        "key_path": key_names,"timestamp": file_timestamp,"size": file_size
    }
    #print(f'All Keys in {buck} with {prefix} Prefix found!')
    s3_file = key_info
    for i,fs in enumerate(s3_file["timestamp"]):
            #file_expired = is_expired(fs)
            #print(fs)
            if fs < exp_threshold: #if True is recieved
                    print("Deleting %s" % {s3_file["key_path"][i]})
                    s3_client.delete_object(Bucket=buck,Key=s3_file["key_path"][i])

您可以按以下方式计算需要通过的到期阈值(以秒为单位)

date_now = time.time()
days = 7 # 7 days
total_time = 86400*days 
exp_threshold = date_now-total_time

现在,对于选项1,您可以像下面这样制作气流操作器

  s3_cleanup = PythonOperator(
        task_id='s3cleanup',python_callable=clean_s3,op_kwargs={
            'buck': '<you bucket>','match_prefix': "logs/sparksteps/j-",'exp_threshold':exp_threshold,},dag=dag)

或者,使用方法2,您可以使用AWS lamda对其进行计划,请参见guide for schedling with lambda here