Problem description
I am using a CloudWatch Logs subscription to send CloudTrail logs from one account to another. The receiving account has a Kinesis data stream that receives the logs from the CloudWatch subscription and invokes the standard Lambda function provided by AWS to parse the logs and store them in an S3 bucket in the log-receiver account. The log files written to the S3 bucket look like this:
{"eventVersion":"1.08","userIdentity":{"type":"AssumedRole","principalId":"AA:i-096379450e69ed082","arn":"arn:aws:sts::34502sdsdsd:assumed-role/RDSAccessRole/i-096379450e69ed082","accountId":"34502sdsdsd","accessKeyId":"ASIAVAVKXAXXXXXXXC","sessionContext":{"sessionIssuer":{"type":"Role","principalId":"AROAVAVKXAKddddD","arn":"arn:aws:iam::3450291sdsdsd:role/RDSAccessRole","accountId":"345029asasas","userName":"RDSAccessRole"},"webIdFederationData":{},"attributes":{"mfaAuthenticated":"false","creationDate":"2021-04-27T04:38:52Z"},"ec2RoleDelivery":"2.0"}},"eventTime":"2021-04-27T07:24:20Z","eventSource":"ssm.amazonaws.com","eventName":"ListInstanceAssociations","awsRegion":"us-east-1","sourceIPAddress":"188.208.227.188","userAgent":"aws-sdk-go/1.25.41 (go1.13.15; linux; amd64) amazon-ssm-agent/","requestParameters":{"instanceId":"i-096379450e69ed082","maxResults":20},"responseElements":null,"requestID":"a5c63b9d-aaed-4a3c-9b7d-a4f7c6b774ab","eventID":"70de51df-c6df-4a57-8c1e-0ffdeb5ac29d","readOnly":true,"resources":[{"accountId":"34502914asasas","ARN":"arn:aws:ec2:us-east-1:3450291asasas:instance/i-096379450e69ed082"}],"eventType":"AwsApiCall","managementEvent":true,"eventCategory":"Management","recipientAccountId":"345029149342"}
{"eventVersion":"1.08","principalId":"AROAVAVKXAKPKZ25XXXX:AmazonMWAA-airflow","arn":"arn:aws:sts::3450291asasas:assumed-role/dev-1xdcfd/AmazonMWAA-airflow","accountId":"34502asasas","accessKeyId":"ASIAVAVKXAXXXXXXX","principalId":"AROAVAVKXAKPKZXXXXX","arn":"arn:aws:iam::345029asasas:role/service-role/AmazonMWAA-dlp-dev-1xdcfd","accountId":"3450291asasas","userName":"dlp-dev-1xdcfd"},"creationDate":"2021-04-27T07:04:08Z"}},"invokedBy":"airflow.amazonaws.com"},"eventTime":"2021-04-27T07:23:46Z","eventSource":"logs.amazonaws.com","eventName":"CreateLogStream","sourceIPAddress":"airflow.amazonaws.com","userAgent":"airflow.amazonaws.com","errorCode":"ResourceAlreadyExistsException","errorMessage":"The specified log stream already exists","requestParameters":{"logStreamName":"scheduler.py.log","logGroupName":"dlp-dev-DAGProcessing"},"requestID":"40b48ef9-fc4b-4d1a-8fd1-4f2584aff1e9","eventID":"ef608d43-4765-4a3a-9c92-14ef35104697","readOnly":false,"apiVersion":"20140328","recipientAccountId":"3450291asasas"}
The problem with log lines like these is that Athena cannot parse them, so I cannot query the logs with Athena.
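As far as I know, Athena's JSON SerDe needs every line of the file to be a single well-formed JSON object, so a small check along the following lines can show which lines in a delivered file it would reject (the bucket and key names here are placeholders, and the object is assumed to be plain, uncompressed text):

import json
import boto3

s3 = boto3.client('s3')

# Placeholder bucket/key; substitute an actual log object written by the Lambda function.
obj = s3.get_object(Bucket='my-cloudtrail-log-bucket', Key='2021/04/27/delivered-log-file')
body = obj['Body'].read().decode('utf-8')

for lineno, line in enumerate(body.splitlines(), start=1):
    if not line.strip():
        continue
    try:
        json.loads(line)
    except json.JSONDecodeError as err:
        # Any line that fails here is a line Athena's JSON SerDe will not be able to read either.
        print('Line %d is not a standalone JSON object: %s' % (lineno, err))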
I tried modifying the blueprint Lambda function so that it saves the log file as a standard JSON document, which Athena would then be able to parse easily.
For example:
{'Records': ['{"eventVersion":"1.08","principalId":"AROAVAVKXAKPBRW2S3TAF:i-096379450e69ed082","arn":"arn:aws:sts::345029149342:assumed-role/RightslineRDSAccessRole/i-096379450e69ed082","accountId":"345029149342","accessKeyId":"ASIAVAVKXAKPBL653UOC","principalId":"AROAVAVKXAKPXXXXXXX","arn":"arn:aws:iam::34502asasas:role/RDSAccessRole","resources":[{"accountId":"3450291asasas","ARN":"arn:aws:ec2:us-east-1:34502asasas:instance/i-096379450e69ed082"}],"recipientAccountId":"345029asasas"}]}
import base64
import json
import gzip
from io import BytesIO
import boto3
def transformlogEvent(log_event):
    # Return the raw CloudWatch Logs message followed by a newline.
    return log_event['message'] + '\n'

def processRecords(records):
    for r in records:
        # Each incoming record is a base64-encoded, gzip-compressed CloudWatch Logs payload.
        data = base64.b64decode(r['data'])
        striodata = BytesIO(data)
        with gzip.GzipFile(fileobj=striodata, mode='r') as f:
            data = json.loads(f.read())
        recId = r['recordId']
        if data['messageType'] == 'CONTROL_MESSAGE':
            # CloudWatch Logs sends control messages to verify the subscription; drop them.
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            result = {}
            events = []
            for e in data['logEvents']:
                events.append(e['message'])
            result['Records'] = events
            print(result)
            # Firehose expects the transformed payload back as a base64-encoded string,
            # so serialize the dict and size-check the encoded payload
            # (len(result) would only count dict keys, not bytes).
            encodedData = base64.b64encode(json.dumps(result).encode('utf-8')).decode('utf-8')
            if len(encodedData) <= 6000000:
                yield {
                    'data': encodedData,
                    'result': 'Ok',
                    'recordId': recId
                }
            else:
                yield {
                    'result': 'ProcessingFailed',
                    'recordId': recId
                }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }
def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    FailedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out;
    # adding a check for a valid response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        FailedRecords = records
        errMsg = str(e)
    # if there are no FailedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not FailedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue
            codes.append(res['ErrorCode'])
            FailedRecords.append(records[idx])
        errMsg = 'Individual error codes: ' + ','.join(codes)
    if len(FailedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, FailedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
def putRecordsToKinesisstream(streamName, records, client, attemptsMade, maxAttempts):
    FailedRecords = []
    codes = []
    errMsg = ''
    # if put_records throws for whatever reason, response['xx'] will error out;
    # adding a check for a valid response will prevent this
    response = None
    try:
        response = client.put_records(StreamName=streamName, Records=records)
    except Exception as e:
        FailedRecords = records
        errMsg = str(e)
    # if there are no FailedRecords (put_records succeeded), iterate over the response to gather results
    if not FailedRecords and response and response['FailedRecordCount'] > 0:
        for idx, res in enumerate(response['Records']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue
            codes.append(res['ErrorCode'])
            FailedRecords.append(records[idx])
        errMsg = 'Individual error codes: ' + ','.join(codes)
    if len(FailedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
            putRecordsToKinesisstream(streamName, FailedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
def createReingestionRecord(isSas, originalRecord):
    if isSas:
        return {'data': base64.b64decode(originalRecord['data']),
                'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
    else:
        return {'data': base64.b64decode(originalRecord['data'])}

def getReingestionRecord(isSas, reIngestionRecord):
    if isSas:
        return {'Data': reIngestionRecord['data'],
                'PartitionKey': reIngestionRecord['partitionKey']}
    else:
        return {'Data': reIngestionRecord['data']}
def lambda_handler(event, context):
    print(event)
    isSas = 'sourceKinesisstreamArn' in event
    streamARN = event['sourceKinesisstreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
    putRecordBatches = []
    recordsToReingest = []
    totalRecordsToBeReingested = 0
    for idx, rec in enumerate(records):
        if rec['result'] != 'Ok':
            continue
        projectedSize += len(rec['data']) + len(rec['recordId'])
        # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        if projectedSize > 6000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del records[idx]['data']
        # split out the record batches into multiple groups, 500 records at max per group
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []
    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)
    # iterate and call putRecordBatch for each group
    recordsReingestedSoFar = 0
    if len(putRecordBatches) > 0:
        client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
        for recordBatch in putRecordBatches:
            if isSas:
                putRecordsToKinesisstream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            else:
                putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
    else:
        print('No records to be reingested')
    return {"records": records}
My end goal is to store the result in JSON format on S3 so that it can be queried easily with Athena.
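For reference, the query flow I am aiming for looks roughly like this, assuming a table named cloudtrail_logs has already been created over the S3 prefix and exposes the CloudTrail fields as columns (the database, table and output location here are hypothetical):

import time
import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Hypothetical database/table created over the S3 prefix the Lambda writes to.
query = athena.start_query_execution(
    QueryString="SELECT eventname, eventtime, sourceipaddress FROM cloudtrail_logs LIMIT 10",
    QueryExecutionContext={'Database': 'logs_db'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'}
)
execution_id = query['QueryExecutionId']

# Poll until the query finishes, then fetch the rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=execution_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

if state == 'SUCCEEDED':
    for row in athena.get_query_results(QueryExecutionId=execution_id)['ResultSet']['Rows']:
        print([col.get('VarCharValue') for col in row['Data']])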
The line where the transformation happens is:
elif data['messageType'] == 'DATA_MESSAGE':
Any help with this would be greatly appreciated.