问题描述
我正在 Kinesis 数据流中接收 cloudtrail 日志。我正在调用流处理 lambda 函数,如 here 所述。然后将返回到流的最终结果存储到 S3 存储桶中。截至目前,处理失败并在 S3 存储桶中创建以下错误文件:
{"attemptsMade":4,"arrivalTimestamp":1619677225356,"errorCode":"Lambda.FunctionError","errorMessage":"Check your function and make sure the output is in required format. In addition to that,make sure the processed records contain valid result status of Dropped,Ok,or ProcessingFailed","attemptEndingTimestamp":1619677302684,
import base64
import gzip
import json
import logging
# Setup logging configuration
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def unpack_kinesis_stream_records(event):
# decode and decompress each base64 encoded data element
return [gzip.decompress(base64.b64decode(k["data"])).decode('utf-8') for k in event["records"]]
def decode_raw_cloud_trail_events(cloudTrailEventDataList):
#Convert Raw Event Data List
eventList = [json.loads(e) for e in cloudTrailEventDataList]
#Filter out-non DATA_MESSAGES
filteredEvents = [e for e in eventList if e["messageType"] == 'DATA_MESSAGE']
#Covert each indidual log Event Message
events = []
for f in filteredEvents:
for e in f["logEvents"]:
events.append(json.loads(e["message"]))
logger.info("{0} Event Logs Decoded".format(len(events)))
return events
def handle_request(event,context):
#Log Raw Kinesis Stream Records
#logger.debug(json.dumps(event,indent=4))
# Unpack Kinesis Stream Records
kinesisData = unpack_kinesis_stream_records(event)
#[logger.debug(k) for k in kinesisData]
# Decode and filter events
events = decode_raw_cloud_trail_events(kinesisData)
####### INTEGRATION CODE GOES HERE #########
return f"Successfully processed {len(events)} records."
def lambda_handler(event,context):
return handle_request(event,context)
谁能帮我理解这里的问题。
解决方法
我相信您使用的是“kinesis firehose”服务而不是“kinesis data stream”。您使用的代码用于直接从 kinesis 数据流中读取并处理 cloudtrail 事件。
kinesis firehose 数据转换 lambda 函数不同。 Firehose 将接收到的 cloudtrail 事件发送到 lambda 函数。 Lambda 处理/转换事件并将这些事件发送回 firehose,以便 firehose 可以将它们传送到目标 S3 存储桶。
您的 lambda 函数应该以与 firehose 期望的完全相同的格式返回记录,并且每个记录都应该具有 [Dropped、Ok 或 ProcessingFailed] 状态之一。您可以在 aws doc
中阅读更多信息