问题描述
我正在尝试使用来自 lambda 函数的 MSK(托管 Amazon Kafka 服务)消息 - MSK 是我的 lambda 的触发器。
制作人看起来像这样:
Schema::create('cart_products',function (Blueprint $table) {
$table->id();
$table->unsignedBigInteger('user_id')->nullable();
$table->unsignedBigInteger('product_id');
$table->unsignedBigInteger('option_id')->nullable();
$table->integer('quantity');
});
在 lambda 函数中,我收到以下信息:
data = {'time': 1611215510000000000,'tags': {'tag1': 'tagvalue'},'fields': {'value': 12345}}
self.producer = KafkaProducer(
security_protocol=self.security_protocol,bootstrap_servers=self.kafka_servers,value_serializer=lambda x: dumps(x).encode('utf-8'))
self.producer.send(kafka_topic,value=data)
我想将值字符串转换为 JSON 对象。我怎么能做到?我已经尝试了很多版本,我认为应该工作的版本会引发异常 ({
"eventSource":"aws:kafka","eventSourceArn":"<arn....>","bootstrapServers":"<serverlist...>","records":{
"topic-0":[
{
"topic":"topic","partition":0,"offset":0,"timestamp":1611138328871,"timestampType":"CREATE_TIME","value":"eyJ0aW1lIjogMTYxMTEzODI4MDAwMDAwMDAwMCwgInRhZ3MiOiB7InN0YXR1cyI6ICJHb29kIn0sICJmaWVsZHMiOiB7InZhbHVlX251bSI6IDAuMCwgInZhbHVlIjogZmFsc2V9fQ=="
},{
"topic":"topic","offset":1,"timestamp":1611138330033,"value":"eyJ0aW1lIjogMTYxMTEzODI4MDAwMDAwMDAwMCwgInRhZ3MiOiB7InN0YXR1cyI6ICJHb29kIn0sICJmaWVsZHMiOiB7InZhbHVlIjogMTQxMzUuMH19"
}
]
}
}
)
Exception: Expecting value: line 1 column 1 (char 0)
解决方法
值字符串似乎是 base64 编码的,因此您需要找到一种方法来解码它们。然后你可以加载它们。
使用https://www.base64decode.org/解码的第一个字符串:
{"time": 1611138280000000000,"tags": {"status": "Good"},"fields": {"value_num": 0.0,"value": false}}