How do I parse a Step Functions executionId into a SageMaker batch transform job name?

Problem

I created a Step Functions state machine whose definition (step-function.json) is deployed with Terraform, using the syntax from this page: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTransformJob.html

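The first time I execute this state machine, it creates a SageMaker batch transform job named example-jobname. However, I need to execute the state machine every day, and every later run fails with the error "error": "SageMaker.ResourceInUseException","cause": "Job name must be unique within an AWS account and region, and a job with this name already exists"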

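The cause is that the job name is hard-coded as example-jobname, so any execution after the first one fails because the job name must be unique. I would like to know how I can append a string to the job name (something like the ExecutionId at the end of the job name). Here is what I have tried: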

  1. I added "executionId.$": "States.Format('somestring {}',$$.Execution.Id)" to the Parameters section of the JSON file, but when I executed the task I got the error "error": "States.Runtime","cause": "An error occurred while executing the state 'SageMaker CreateTransformJob' (entered at the event id #2). The Parameters '{\"BatchStrategy\":\"SingleRecord\",..............\"executionId\":\"somestring arn:aws:states:us-east-1:xxxxx:execution:xxxxx-state-machine:xxxxxxxx72950\"}' could not be used to start the Task: [The field \"executionId\" is not supported by Step Functions]"}

  2. I changed the job name in the JSON file to "TransformJobName": "example-jobname-States.Format('somestring {}',$$.Execution.Id)", and when I executed the state machine it gave me the error "error": "SageMaker.AmazonSageMakerException","cause": "2 validation errors detected: Value 'example-jobname-States.Format('somestring {}',$$.Execution.Id)' at 'transformJobName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}; Value 'example-jobname-States.Format('somestring {}',$$.Execution.Id)' at 'transformJobName' failed to satisfy constraint: Member must have length less than or equal to 63"
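
The second error can be reproduced locally. The following is a minimal Python sketch, not part of the original post, that checks a candidate name against the pattern and length limit quoted in the validation error. The helper name is_valid_job_name is hypothetical.

```python
import re

# Pattern and length limit copied from the SageMaker validation error quoted above.
JOB_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")
MAX_JOB_NAME_LENGTH = 63


def is_valid_job_name(name: str) -> bool:
    """Return True if the whole name matches the pattern and fits the length limit."""
    if len(name) > MAX_JOB_NAME_LENGTH:
        return False
    return JOB_NAME_PATTERN.fullmatch(name) is not None


if __name__ == "__main__":
    # The literal string from attempt 2: the intrinsic function was never evaluated.
    literal = "example-jobname-States.Format('somestring {}',$$.Execution.Id)"
    print(is_valid_job_name("example-jobname"))  # True
    print(is_valid_job_name(literal))            # False
```

The unevaluated States.Format text contains dots, quotes, and parentheses, so it cannot match the pattern. An execution ARN would also be rejected, because it contains colons.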

I have really run out of ideas. Can anyone help? Many thanks.

Solution

According to the documentation, parameters should be passed in the following format:

        "Parameters": {
            "ModelName.$": "$$.Execution.Name",....
        },

If you look closely, your definition is missing the .$ suffix on the key name, so your Step Functions definition should look like this:

Either

      "TransformJobName.$": "$$.Execution.Id",

or

      "TransformJobName.$": "States.Format('mytransformjob{}',$$.Execution.Id)"

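To illustrate why the .$ suffix matters, here is a toy Python model of how a Parameters block is resolved. It is only a sketch under simplifying assumptions: it handles context object paths that start with $$. and ignores input paths and intrinsic functions such as States.Format. The function name resolve_parameters and the sample execution name are hypothetical.

```python
def resolve_parameters(parameters: dict, context: dict) -> dict:
    """Toy model: keys ending in '.$' are looked up in the context object, others stay literal."""
    resolved = {}
    for key, value in parameters.items():
        if isinstance(value, dict):
            # Recurse into nested objects such as TransformInput.
            resolved[key] = resolve_parameters(value, context)
        elif key.endswith(".$") and isinstance(value, str) and value.startswith("$$."):
            # Walk the context object along the dotted path after the '$$.' prefix.
            node = context
            for part in value[3:].split("."):
                node = node[part]
            # The '.$' suffix is dropped from the key in the resolved output.
            resolved[key[:-2]] = node
        else:
            # Without the '.$' suffix the value is passed through as a plain string.
            resolved[key] = value
    return resolved


if __name__ == "__main__":
    context = {"Execution": {"Name": "run-1"}}
    print(resolve_parameters({"TransformJobName": "$$.Execution.Name"}, context))
    print(resolve_parameters({"TransformJobName.$": "$$.Execution.Name"}, context))
```

In this model the first call returns the path text unchanged, which mirrors what happened in attempt 2 of the question, while the second call returns the execution name.
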
Full state machine definition:

    {
        "Comment": "Defines the statemachine.",
        "StartAt": "Generate Random String",
        "States": {
            "Generate Random String": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:eu-central-1:1234567890:function:randomstring",
                "ResultPath": "$.executionid",
                "Parameters": {
                    "executionId.$": "$$.Execution.Id"
                },
                "Next": "SageMaker CreateTransformJob"
            },
            "SageMaker CreateTransformJob": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sagemaker:createTransformJob.sync",
                "Parameters": {
                    "BatchStrategy": "SingleRecord",
                    "DataProcessing": {
                        "InputFilter": "$",
                        "JoinSource": "Input",
                        "OutputFilter": "xxx"
                    },
                    "Environment": {
                        "SAGEMAKER_MODEL_SERVER_TIMEOUT": "300"
                    },
                    "MaxConcurrentTransforms": 100,
                    "MaxPayloadInMB": 1,
                    "ModelName": "${model_name}",
                    "TransformInput": {
                        "DataSource": {
                            "S3DataSource": {
                                "S3DataType": "S3Prefix",
                                "S3Uri": "${s3_input_path}"
                            }
                        },
                        "ContentType": "application/jsonlines",
                        "CompressionType": "Gzip",
                        "SplitType": "Line"
                    },
                    "TransformJobName.$": "$.executionid",
                    "TransformOutput": {
                        "S3OutputPath": "${s3_output_path}",
                        "Accept": "application/jsonlines",
                        "AssembleWith": "Line"
                    },
                    "TransformResources": {
                        "InstanceType": "xxx",
                        "InstanceCount": 1
                    }
                },
                "End": true
            }
        }
    }

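In the definition above, the Lambda can be a function that parses the execution ID ARN passed in through the Parameters section and returns its last segment. The full ARN cannot be used directly, because it contains colons that the job name pattern does not allow: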

    def lambda_handler(event, context):
        # The execution ID is an ARN; its last colon-separated segment is the execution name
        return event.get('executionId').split(':')[-1]
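
A slightly more defensive variant is sketched below. It is an illustration rather than the original answer's code: it assumes the execution name may contain characters outside the job name pattern (such as underscores) and that a prefix is wanted. The helper job_name_from_execution_arn is hypothetical, and the prefix mytransformjob is taken from the States.Format example above.

```python
import re

MAX_JOB_NAME_LENGTH = 63  # limit quoted in the SageMaker validation error


def job_name_from_execution_arn(execution_arn: str, prefix: str = "mytransformjob") -> str:
    """Build a job name from the last ARN segment, keeping only letters, digits and hyphens."""
    execution_name = execution_arn.split(":")[-1]
    # Replace characters the job name pattern does not allow with hyphens.
    cleaned = re.sub(r"[^a-zA-Z0-9-]", "-", execution_name).strip("-")
    # Truncate to the limit, then strip again in case the cut left a trailing hyphen.
    return f"{prefix}-{cleaned}"[:MAX_JOB_NAME_LENGTH].rstrip("-")


def lambda_handler(event, context):
    # 'executionId' is the key set in the Parameters of the "Generate Random String" state.
    return job_name_from_execution_arn(event["executionId"])
```

Truncation can in principle make two long execution names collide, so a short prefix leaves more room for the unique part.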

Or, if you do not want to pass the execution ID, it can simply return a random string:

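    import random
    import string

    def lambda_handler(event, context):
        # Return 10 random characters drawn from uppercase letters and digits
        return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))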

You can generate any kind of random string, or anything else, in the Lambda and pass it on as the transform job name.
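
Since the state machine in the question runs daily, one possible variation is a name that combines the date with a short random suffix. This is only a sketch; the prefix mytransformjob and the name layout are assumptions, not something the original post prescribes.

```python
import secrets
from datetime import datetime, timezone


def lambda_handler(event, context):
    """Return a name of the form 'mytransformjob-YYYYMMDD-xxxxxxxx' (layout is an assumption)."""
    day = datetime.now(timezone.utc).strftime("%Y%m%d")
    suffix = secrets.token_hex(4)  # 8 hexadecimal characters
    return f"mytransformjob-{day}-{suffix}"
```

The date makes the jobs easy to find in the console, and the random suffix keeps names distinct if the state machine is run more than once on the same day.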