MSK 触发器的 AWS Lambda 事件负载

问题描述

我正在尝试反序列化值以恢复 Kafka 消息值。但我找不到合适的解串器。我收到以下值:

AAAAAAKWBQACDkdMIE5hbWUgc3QuIEhlcm9pdiBQcmFjaRpiLjI4LCBhcHAuMTUwDktoYXJraXYES0gKODU0MzQSMDk4NDMyMzIzAK61xYSfXgIKMTE6MzICDFRSMTIzMghLeWl2

但是当我尝试通过以下方式获取反序列化消息的字符串表示时:

        var bytes = Convert.FromBase64String("AAAAAAKWBQACDkdMIE5hbWUgc3QuIEhlcm9pdiBQcmFjaRpiLjI4LCBhcHAuMTUwDktoYXJraXYES0gKODU0MzQSMDk4NDMyMzIzAK61xYSfXgIKMTE6MzICDFRSMTIzMghLeWl2");
        var jsonBack = Encoding.UTF8.GetString(bytes);

而且它只能部分反序列化它。带有额外的符号。我找不到任何关于它如何序列化的文档,也看不到它的结构。有没有人在使用带有 MSK 触发器的 AWS Lambda for .Net 时看到同样的情况

下面我给出了一个来自 aws 文档的消息示例。但是没有任何关于保存在值中的信息(消息的键和值以及形式是什么)

MSK Event Payload

解决方法

我创建了一个用于此的类。当我开始使用它时,我遇到了类似的问题,因为 AWS 只给出了节点 JS 的示例。所以从那里我创建了下面的类,效果很好。尝试一下。使用此类作为接收到 lambda 函数的有效负载类型。因此,您的处理程序签名如下所示。请注意,您需要对下面“KafkaMessage”类中的“body”属性进行 base64decode。

public async Task FunctionHandler(MSKEvent evnt,ILambdaContext context)
{
//do some magic.
}

/// <summary>
/// Represents an MSK event. MSK event is the event when lambda is triggered via a kafka topic.
/// </summary>
public class MSKEvent
{
    /// <summary>
    /// The source of the event.
    /// </summary>
    public string EventSource { get; set; }

    /// <summary>
    /// The AWS arn of the event source.
    /// </summary>
    public string EventSourceArn { get; set; }

    /// <summary>
    /// The collection of records.
    /// </summary>
    public Dictionary<string,IEnumerable<KafkaMessage>> Records { get; set; }
}

/// <summary>
/// Represents a kafka message.
/// </summary>
public class KafkaMessage
{
    /// <summary>
    /// The kafka topic name this message belongs to.
    /// </summary>
    public string Topic { get; set; }

    /// <summary>
    /// The kafka partition this message belongs to.
    /// </summary>
    public int Partition { get; set; }

    /// <summary>
    /// The offset of this message.
    /// </summary>
    public int Offset { get; set; }

    /// <summary>
    /// The created timestamp in unix ms of this message.
    /// </summary>
    public long Timestamp { get; set; }

    /// <summary>
    /// The timestamp type.
    /// </summary>
    public string TimestampType { get; set; }

    /// <summary>
    /// Base64 encoded message body. Base64 decode this field to obtain the actual message.
    /// </summary>
    [JsonProperty("value")]
    public string Body { get; set; }
}