Python:将变量传递给另一个函数

问题描述

我对 Python 真的很陌生,希望有人能帮我解决这个问题。我有一个函数(实际上是一个 AWS Lamdba 函数),我需要在它创建后传递一个变量值。我这样做是为了在创建细分时在 Pinpoint 中创建营销活动。

import os
import time
import boto3
from botocore.exceptions import ClientError
from datetime import datetime,timedelta

AWS_REGION = os.environ['region']
projectId = os.environ['projectId']
importRoleArn = os.environ['importRoleArn']

def lambda_handler(event,context):
    print("Received event: " + str(event))
    for record in event['Records']:
        # Assign some variables to make it easier to work with the data in the
        # event recordi
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        folder =  os.path.split(key)[0]
        folder_path = os.path.join(bucket,folder)
        full_path = os.path.join(bucket,key)
        s3_url = "s3://" + folder_path
        # print(full_path);
        # Check to see if all file parts have been processed.
        if all_files_processed(bucket,folder,full_path):
            # If you haven't recently run an import job that uses a file stored in
            # the specified S3 bucket,then create a new import job. This prevents
            # the creation of duplicate segments.
            if not (check_import_jobs(bucket,s3_url,full_path)):
                segmentID = create_import_job(s3_url,full_path)
                create_campaign(segmentID)
            else:
                print("Import job found with URL s3://"
                        + os.path.join(bucket,folder) + ". Aborting.")
        else:
            print("Parts haven't finished processing yet.")

# Determine if all of the file parts have been processed.
def all_files_processed(bucket,full_path):
    # Use the "__ofN" part of the file name to determine how many files there
    # should be.
    number_of_parts = int((full_path.split("__of")[1]).split("_processed")[0])

    # figure out how many keys contain the prefix for the current batch of
    # folders (basically,how many files are in the appropriate "folder").
    client = boto3.client('s3')
    objs = client.list_objects_v2(Bucket=bucket,Prefix=folder)
    file_count = objs['KeyCount']

    ready_for_import = False

    if file_count == number_of_parts:
        ready_for_import = True

    return ready_for_import

# Check Amazon Pinpoint to see if any import jobs have been created by using
# the same S3 folder.
def check_import_jobs(bucket,full_path):
    url_list = []
    print(s3_url);
    # Retrieve a list of import jobs for the current project ID.
    client = boto3.client('pinpoint')
    try:
        client_response = client.get_import_jobs(
            ApplicationId=projectId
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
    else:
        segment_response = client_response['ImportJobsResponse']['Item']
        #print(segment_response);
        # Parse responses. Add all S3Url values to a list.
        for item in segment_response:
            #print(item);
            s3_url_existing = full_path
            url_list.append(s3_url_existing)
    #print(url_list);
    # Search for the current S3 URL in the list.
    if s3_url in url_list:
        found = True
    else:
        found = False
    print(found);
    return found

# Create the import job in Amazon Pinpoint.
def create_import_job(s3_url,full_path):
    client = boto3.client('pinpoint')

    segment_name = s3_url.split('/')[4]

    try:
        response = client.create_import_job(
            ApplicationId=projectId,ImportJobRequest={
                'Definesegment': True,'Format': 'CSV','RegisterEndpoints': True,'RoleArn': importRoleArn,'S3Url': s3_url,'SegmentName': segment_name
            }
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
    else:
        print("Import job " + response['ImportJobResponse']['Id'] + " "
                + response['ImportJobResponse']['JobStatus'] + ".")

        print("Segment ID: "
                + response['ImportJobResponse']['DeFinition']['SegmentId'])

        print("Application ID: " + projectId)
        return response['ImportJobResponse']['DeFinition']['SegmentId']
        
def create_campaign(segmentID):
    
    client = boto3.client('pinpoint')
    Now = datetime.Now()
    dt_string = Now.isoformat()
    print(type(segmentID))
    try:
        
        response = client.create_campaign(
            ApplicationId=projectId,WriteCampaignRequest={
                'Description': 'Test SMS Campaign 2','MessageConfiguration': {
                    'EmailMessage': {
                        'Body': 'This is a test 2','FromAddress': 'xxx@xxx.com','HtmlBody': '<p>Test 2</p>','Title': 'This is a test 2'
                    },'SMSMessage': {
                        'Body': 'Thanks for your visit to {{Attributes.Provider_Name}} on {{Attributes.Clinical_Date_of_Service}}','MessageType': 'PROMOTIONAL','SenderId': 'XXX'
                    }
                },'Schedule': {
                    'Frequency': 'ONCE','IsLocalTime': True,'StartTime': dt_string,'Timezone': 'UTC'
                },'Name': 'Test Email Campaign 6','SegmentId': segmentID
            }

        )
    except ClientError as e:
        print(e.response['Error']['Message'])
    else:
        print('Campaign Created')

这个问题出现在我想将 SegmentID 发送到的 create_campaign 中。我最终收到以下错误...

“未找到 SegmentId 中指定的段”

我可以将 segmentID 打印到控制台没问题,只是让它传递给函数是我遇到的障碍。提前致谢!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)