问题描述
我对 Python 真的很陌生,希望有人能帮我解决这个问题。我有一个函数(实际上是一个 AWS Lamdba 函数),我需要在它创建后传递一个变量值。我这样做是为了在创建细分时在 Pinpoint 中创建营销活动。
import os
import time
import boto3
from botocore.exceptions import ClientError
from datetime import datetime,timedelta
AWS_REGION = os.environ['region']
projectId = os.environ['projectId']
importRoleArn = os.environ['importRoleArn']
def lambda_handler(event,context):
print("Received event: " + str(event))
for record in event['Records']:
# Assign some variables to make it easier to work with the data in the
# event recordi
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
folder = os.path.split(key)[0]
folder_path = os.path.join(bucket,folder)
full_path = os.path.join(bucket,key)
s3_url = "s3://" + folder_path
# print(full_path);
# Check to see if all file parts have been processed.
if all_files_processed(bucket,folder,full_path):
# If you haven't recently run an import job that uses a file stored in
# the specified S3 bucket,then create a new import job. This prevents
# the creation of duplicate segments.
if not (check_import_jobs(bucket,s3_url,full_path)):
segmentID = create_import_job(s3_url,full_path)
create_campaign(segmentID)
else:
print("Import job found with URL s3://"
+ os.path.join(bucket,folder) + ". Aborting.")
else:
print("Parts haven't finished processing yet.")
# Determine if all of the file parts have been processed.
def all_files_processed(bucket,full_path):
# Use the "__ofN" part of the file name to determine how many files there
# should be.
number_of_parts = int((full_path.split("__of")[1]).split("_processed")[0])
# figure out how many keys contain the prefix for the current batch of
# folders (basically,how many files are in the appropriate "folder").
client = boto3.client('s3')
objs = client.list_objects_v2(Bucket=bucket,Prefix=folder)
file_count = objs['KeyCount']
ready_for_import = False
if file_count == number_of_parts:
ready_for_import = True
return ready_for_import
# Check Amazon Pinpoint to see if any import jobs have been created by using
# the same S3 folder.
def check_import_jobs(bucket,full_path):
url_list = []
print(s3_url);
# Retrieve a list of import jobs for the current project ID.
client = boto3.client('pinpoint')
try:
client_response = client.get_import_jobs(
ApplicationId=projectId
)
except ClientError as e:
print(e.response['Error']['Message'])
else:
segment_response = client_response['ImportJobsResponse']['Item']
#print(segment_response);
# Parse responses. Add all S3Url values to a list.
for item in segment_response:
#print(item);
s3_url_existing = full_path
url_list.append(s3_url_existing)
#print(url_list);
# Search for the current S3 URL in the list.
if s3_url in url_list:
found = True
else:
found = False
print(found);
return found
# Create the import job in Amazon Pinpoint.
def create_import_job(s3_url,full_path):
client = boto3.client('pinpoint')
segment_name = s3_url.split('/')[4]
try:
response = client.create_import_job(
ApplicationId=projectId,ImportJobRequest={
'Definesegment': True,'Format': 'CSV','RegisterEndpoints': True,'RoleArn': importRoleArn,'S3Url': s3_url,'SegmentName': segment_name
}
)
except ClientError as e:
print(e.response['Error']['Message'])
else:
print("Import job " + response['ImportJobResponse']['Id'] + " "
+ response['ImportJobResponse']['JobStatus'] + ".")
print("Segment ID: "
+ response['ImportJobResponse']['DeFinition']['SegmentId'])
print("Application ID: " + projectId)
return response['ImportJobResponse']['DeFinition']['SegmentId']
def create_campaign(segmentID):
client = boto3.client('pinpoint')
Now = datetime.Now()
dt_string = Now.isoformat()
print(type(segmentID))
try:
response = client.create_campaign(
ApplicationId=projectId,WriteCampaignRequest={
'Description': 'Test SMS Campaign 2','MessageConfiguration': {
'EmailMessage': {
'Body': 'This is a test 2','FromAddress': 'xxx@xxx.com','HtmlBody': '<p>Test 2</p>','Title': 'This is a test 2'
},'SMSMessage': {
'Body': 'Thanks for your visit to {{Attributes.Provider_Name}} on {{Attributes.Clinical_Date_of_Service}}','MessageType': 'PROMOTIONAL','SenderId': 'XXX'
}
},'Schedule': {
'Frequency': 'ONCE','IsLocalTime': True,'StartTime': dt_string,'Timezone': 'UTC'
},'Name': 'Test Email Campaign 6','SegmentId': segmentID
}
)
except ClientError as e:
print(e.response['Error']['Message'])
else:
print('Campaign Created')
这个问题出现在我想将 SegmentID 发送到的 create_campaign 中。我最终收到以下错误...
“未找到 SegmentId 中指定的段”
我可以将 segmentID 打印到控制台没问题,只是让它传递给函数是我遇到的障碍。提前致谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)