问题描述
我有一个dynamo DB表,该表将用于存储失败的请求,稍后,另一个lambda将要读取请求并对其进行重新处理。
目前,我正在使用打字稿CDK创建这样的表
const myTable = new dynamodb.Table(this,"my-table",{
tableName: "my-table-name",partitionKey: { name: "file_id",type: dynamodb.AttributeType.STRING },});
我正在使用python lambda这样将数据发送到表中
dynamodb = boto3.resource("dynamodb",region_name=region)
my_table = dynamodb.Table("my-table-name")
failedRecord = {
"file_id": str(file_id),"processed": "false","payload": str(payload),}
my_table.put_item(Item=failedRecord)
现在我想从另一个lambda中处理表中所有带有处理=假的条目,我想读取它们,对它们做些什么,然后更新它们的处理=真。
我是否需要在此处添加二级索引才能有效。一个很好的例子。
谢谢
解决方法
考虑创建一个仅包含 个未处理项目的全局二级索引。您可以通过添加/删除GSI主键来从GSI添加/删除项。例如,考虑以下表结构:
请注意,只有file_id
3和4定义了GSIPK。 GSI在逻辑上看起来像这样:
DynamoDB仅将项目投影到该项目上存在GSIPK的索引中。您的lambda可以从GSI中读取,进行一些工作,将processed
属性设置为true
并删除GSIPK
值。这样可以有效地从二级索引中删除该项目。
对DynamoDB进行的update
调用看起来像这样:
const params = {
TableName: YOUR_TABLE_NAME_HERE,Key: {
PK: FILE_ID_HERE
},UpdateExpression: "SET #processed = :true REMOVE #gsipk",ExpressionAttributeNames: {
"#processed": "processed","#gsi1pk": "GSIPK",},ExpressionAttributeValues: {
":true": true
}
};
ddbClient.update(params);
,
假设您的filenote_id
已经是唯一的(应该假定您已将其设置为“分区键”),并且使用共享的记录格式和表模式为GSI而不添加排序键也不会任何区别。
您可以考虑使用的另一种方法是为有问题的表启用DynamoDB Stream并将其设置为trigger of the second Lambda Function。
使用这种方法,您实际上将捕获表上的所有活动,并且按照逻辑,您可以过滤掉所有非INSERT
的事件,并按照自己的进度处理您感兴趣的事件。
这样,您可以避免完全查询表。