Uploading bulk CSV data to an existing DynamoDB table

Problem description

I am trying to migrate data from a CSV file into an existing AWS DynamoDB table, as part of an AWS Amplify web app.

I followed this CloudFormation tutorial, using the template below.

I am only able to create a new DynamoDB table; I cannot point the stack at an existing table and add data to it.

Question: Is there a way to modify the template so that I can supply an existing table name under "DynamoDBTableName" in the "Specify stack details" step of the wizard, and have the CSV data added to that table? If not, is there an alternative process?

{
    "AWSTemplateFormatVersion": "2010-09-09",
    "Metadata": {},
    "Parameters": {
        "BucketName": {
            "Description": "Name of the S3 bucket you will deploy the CSV file to",
            "Type": "String",
            "ConstraintDescription": "must be a valid bucket name."
        },
        "FileName": {
            "Description": "Name of the S3 file (including suffix)",
            "Type": "String",
            "ConstraintDescription": "Valid S3 file name."
        },
        "DynamoDBTableName": {
            "Description": "Name of the DynamoDB table you will use",
            "Type": "String",
            "ConstraintDescription": "must be a valid DynamoDB name."
        }
    },
    "Resources": {
        "DynamoDBTable": {
            "Type": "AWS::DynamoDB::Table",
            "Properties": {
                "TableName": {"Ref": "DynamoDBTableName"},
                "BillingMode": "PAY_PER_REQUEST",
                "AttributeDefinitions": [
                    {"AttributeName": "id", "AttributeType": "S"}
                ],
                "KeySchema": [
                    {"AttributeName": "id", "KeyType": "HASH"}
                ],
                "Tags": [
                    {"Key": "Name", "Value": {"Ref": "DynamoDBTableName"}}
                ]
            }
        },
        "LambdaRole": {
            "Type": "AWS::IAM::Role",
            "Properties": {
                "AssumeRolePolicyDocument": {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Principal": {"Service": ["lambda.amazonaws.com", "s3.amazonaws.com"]},
                            "Action": ["sts:AssumeRole"]
                        }
                    ]
                },
                "Path": "/",
                "ManagedPolicyArns": [
                    "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
                    "arn:aws:iam::aws:policy/AWSLambdaInvocation-DynamoDB",
                    "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                ],
                "Policies": [
                    {
                        "PolicyName": "policyname",
                        "PolicyDocument": {
                            "Version": "2012-10-17",
                            "Statement": [
                                {
                                    "Effect": "Allow",
                                    "Resource": "*",
                                    "Action": ["dynamodb:PutItem", "dynamodb:BatchWriteItem"]
                                }
                            ]
                        }
                    }
                ]
            }
        },
        "CsvToDDBLambdaFunction": {
            "Type": "AWS::Lambda::Function",
            "Properties": {
                "Handler": "index.lambda_handler",
                "Role": {"Fn::GetAtt": ["LambdaRole", "Arn"]},
                "Code": {
                    "ZipFile": {
                        "Fn::Join": [
                            "\n",
                            [
                                "import json",
                                "import boto3",
                                "import os",
                                "import csv",
                                "import codecs",
                                "import sys",
                                "",
                                "s3 = boto3.resource('s3')",
                                "dynamodb = boto3.resource('dynamodb')",
                                "bucket = os.environ['bucket']",
                                "key = os.environ['key']",
                                "tableName = os.environ['table']",
                                "",
                                "def lambda_handler(event, context):",
                                "   #get() does not store in memory",
                                "   try:",
                                "       obj = s3.Object(bucket, key).get()['Body']",
                                "   except:",
                                "       print(\"S3 Object could not be opened. Check environment variable.\")",
                                "   try:",
                                "       table = dynamodb.Table(tableName)",
                                "   except:",
                                "       print(\"Error loading DynamoDB table. Check if table was created correctly and environment variable.\")",
                                "   batch_size = 100",
                                "   batch = []",
                                "   #DictReader is a generator; not stored in memory",
                                "   for row in csv.DictReader(codecs.getreader('utf-8-sig')(obj)):",
                                "      if len(batch) >= batch_size:",
                                "         write_to_dynamo(batch)",
                                "         batch.clear()",
                                "      batch.append(row)",
                                "   if batch:",
                                "      write_to_dynamo(batch)",
                                "   return {",
                                "      'statusCode': 200,",
                                "      'body': json.dumps('Uploaded to DynamoDB Table')",
                                "   }",
                                "",
                                "def write_to_dynamo(rows):",
                                "   try:",
                                "      table = dynamodb.Table(tableName)",
                                "   except:",
                                "      print(\"Error loading DynamoDB table. Check if table was created correctly and environment variable.\")",
                                "   try:",
                                "      with table.batch_writer() as batch:",
                                "         for i in range(len(rows)):",
                                "            batch.put_item(Item=rows[i])",
                                "   except:",
                                "      print(\"Error executing batch_writer\")"
                            ]
                        ]
                    }
                },
                "Runtime": "python3.7",
                "Timeout": 900,
                "MemorySize": 3008,
                "Environment": {
                    "Variables": {
                        "bucket": {"Ref": "BucketName"},
                        "key": {"Ref": "FileName"},
                        "table": {"Ref": "DynamoDBTableName"}
                    }
                }
            }
        },
        "S3Bucket": {
            "DependsOn": ["CsvToDDBLambdaFunction", "BucketPermission"],
            "Type": "AWS::S3::Bucket",
            "Properties": {
                "BucketName": {"Ref": "BucketName"},
                "AccessControl": "BucketOwnerFullControl",
                "NotificationConfiguration": {
                    "LambdaConfigurations": [
                        {
                            "Event": "s3:ObjectCreated:*",
                            "Function": {"Fn::GetAtt": ["CsvToDDBLambdaFunction", "Arn"]}
                        }
                    ]
                }
            }
        },
        "BucketPermission": {
            "Type": "AWS::Lambda::Permission",
            "Properties": {
                "Action": "lambda:InvokeFunction",
                "FunctionName": {"Ref": "CsvToDDBLambdaFunction"},
                "Principal": "s3.amazonaws.com",
                "SourceAccount": {"Ref": "AWS::AccountId"}
            }
        }
    },
    "Outputs": {}
}
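
For reference, the stack can also be created from a script instead of the console wizard. Below is a minimal boto3 sketch, assuming the template is saved locally as template.json; the stack name and parameter values are placeholders, and CAPABILITY_IAM must be acknowledged because the template creates an IAM role:

import boto3

cloudformation = boto3.client("cloudformation")

# Placeholder values: substitute your own stack, bucket, file, and table names.
with open("template.json") as f:
    template_body = f.read()

cloudformation.create_stack(
    StackName="csv-to-dynamodb",
    TemplateBody=template_body,
    Parameters=[
        {"ParameterKey": "BucketName", "ParameterValue": "my-csv-upload-bucket"},
        {"ParameterKey": "FileName", "ParameterValue": "data.csv"},
        {"ParameterKey": "DynamoDBTableName", "ParameterValue": "my-existing-table"},
    ],
    # Required acknowledgement because the template creates the LambdaRole IAM role.
    Capabilities=["CAPABILITY_IAM"],
)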

An answer

Dennis's answer is one solution, but you can also remove the "DynamoDBTable" resource from the "Resources" section of the JSON file. The Lambda's "table" environment variable still references the "DynamoDBTableName" parameter, so the CSV data is written to the existing table whose name you supply.
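
If you would rather not edit the template by hand, that change can be scripted. A minimal sketch (the file names are hypothetical; JSON has no comment syntax, so the resource is deleted rather than commented out):

import json

# Load the tutorial template and drop the table resource so the stack
# no longer tries to create a new table.
with open("template.json") as f:
    template = json.load(f)

template["Resources"].pop("DynamoDBTable", None)

# The Lambda's "table" environment variable still references the
# DynamoDBTableName parameter, so rows are written to whatever existing
# table name you enter in "Specify stack details".
with open("template-existing-table.json", "w") as f:
    json.dump(template, f, indent=4)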

Solution

You can use AWS Database Migration Service (DMS) to migrate a CSV file from Amazon S3 to Amazon DynamoDB. Take a look at this step-by-step walkthrough.
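
If DMS feels heavyweight for a one-off import, the batch_writer approach from the Lambda in the question can also be run as a standalone script against the existing table. A minimal sketch, with placeholder bucket, key, and table names:

import codecs
import csv

import boto3

s3 = boto3.resource("s3")
dynamodb = boto3.resource("dynamodb")

# Placeholders: substitute your own bucket, CSV key, and existing table name.
obj = s3.Object("my-csv-upload-bucket", "data.csv").get()["Body"]
table = dynamodb.Table("my-existing-table")

# batch_writer() buffers the puts and flushes them as BatchWriteItem calls,
# turning each CSV row into one item (the table's key attribute, e.g. "id",
# must appear as a CSV column).
with table.batch_writer() as batch:
    for row in csv.DictReader(codecs.getreader("utf-8-sig")(obj)):
        batch.put_item(Item=row)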