Kafka 将值作为字符串发送:我如何反序列化它并使用 Python 将其转换为 JSON 对象

问题描述

我正在尝试使用来自 lambda 函数的 MSK(托管 Amazon Kafka 服务)消息 - MSK 是我的 lambda 的触发器。

制作人看起来像这样:

        Schema::create('cart_products',function (Blueprint $table) {
            $table->id();
            $table->unsignedBigInteger('user_id')->nullable();
            $table->unsignedBigInteger('product_id');
            $table->unsignedBigInteger('option_id')->nullable();
            $table->integer('quantity');
        });

在 lambda 函数中,我收到以下信息:

data = {'time': 1611215510000000000,'tags': {'tag1': 'tagvalue'},'fields': {'value': 12345}}
self.producer = KafkaProducer(
            security_protocol=self.security_protocol,bootstrap_servers=self.kafka_servers,value_serializer=lambda x: dumps(x).encode('utf-8'))
self.producer.send(kafka_topic,value=data)

我想将值字符串转换为 JSON 对象。我怎么能做到?我已经尝试了很多版本,我认为应该工作的版本会引发异常 ({ "eventSource":"aws:kafka","eventSourceArn":"<arn....>","bootstrapServers":"<serverlist...>","records":{ "topic-0":[ { "topic":"topic","partition":0,"offset":0,"timestamp":1611138328871,"timestampType":"CREATE_TIME","value":"eyJ0aW1lIjogMTYxMTEzODI4MDAwMDAwMDAwMCwgInRhZ3MiOiB7InN0YXR1cyI6ICJHb29kIn0sICJmaWVsZHMiOiB7InZhbHVlX251bSI6IDAuMCwgInZhbHVlIjogZmFsc2V9fQ==" },{ "topic":"topic","offset":1,"timestamp":1611138330033,"value":"eyJ0aW1lIjogMTYxMTEzODI4MDAwMDAwMDAwMCwgInRhZ3MiOiB7InN0YXR1cyI6ICJHb29kIn0sICJmaWVsZHMiOiB7InZhbHVlIjogMTQxMzUuMH19" } ] } } )

Exception: Expecting value: line 1 column 1 (char 0)

解决方法

值字符串似乎是 base64 编码的,因此您需要找到一种方法来解码它们。然后你可以加载它们。

使用https://www.base64decode.org/解码的第一个字符串:

{"time": 1611138280000000000,"tags": {"status": "Good"},"fields": {"value_num": 0.0,"value": false}}