每当将新数据加载到s3存储桶中时,如何触发python脚本?

问题描述

我正在尝试从s3存储桶中提取数据,该存储桶将在第二秒获得新记录。数据以每小时250+ G的速度传入。我正在创建一个Python脚本,该脚本将连续运行以在之前实时收集新的数据负载

这是s3存储桶键的结构:

o_key=7111/year=2020/month=8/day=11/hour=16/minute=46/second=9/ee9.jsonl.gz
o_key=7111/year=2020/month=8/day=11/hour=16/minute=40/second=1/ee99999.jsonl.gz

我正在使用Boto3尝试尝试此操作,这是我到目前为止所拥有的:

s3_resource = boto3.resource('s3',aws_access_key_id=ACCESS_KEY,aws_secret_access_key=SECRET_KEY,verify=False)
s3_bucket = s3_resource.Bucket(BUCKET_NAME)
files = s3_bucket.objects.filter()
files = [obj.key for obj in sorted(files,key=lambda x: x.last_modified,reverse=True)]
for x in files:
    print(x)

这将输出该存储桶中的所有键,并按last_modified数据进行排序。但是,是否有一种方法可以暂停脚本,直到加载新数据,然后再处理该数据等等?加载新数据时可能会有20秒的延迟,这是在形成逻辑时给我带来麻烦的另一件事。任何想法或建议都会有所帮助。

 s3_resource = boto3.resource('s3',verify=False)
 s3_bucket = s3_resource.Bucket(BUCKET_NAME)

 files = s3_bucket.objects.filter()
 while list(files): #check if the key exists
         if len(objs) > 0 and objs[0].key == key:
                   print("Exists!")
         else:
               time.sleep(.1) #sleep until the next key is there
               continue 

这是我尝试过的另一种方法,但效果不佳。我试图在没有下一个数据的时候入睡,然后在加载新数据后对其进行处理。

解决方法

Amazon S3通知功能使您可以在存储桶中发生某些事件时接收通知。要启用通知,您必须首先添加一个通知配置,该配置标识您希望Amazon S3发布的事件以及您希望Amazon S3发送通知的目的地。 将此配置存储在与存储桶关联的通知子资源中。 -通常在Lambda中...

https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html

希望这对您有帮助
r0ck