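Problem description
I am trying to use the Google PubSub source connector to get data from my Google Cloud project into Kafka. I do receive the data, but the message arrives as bytes. I followed the reference here and, as described there, switched the converters to the JSON converter.
Here is the relevant part of my connector configuration: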
name=cpsSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha
This is what I get in Kafka:
{
"schema":{
"type":"struct","fields":[
{
"type":"bytes","optional":false,"field":"message"
},{
"type":"string","field":"subFolder"
},{
"type":"string","field":"deviceid"
},{
"type":"string","field":"deviceRegistryLocation"
},{
"type":"string","field":"projectId"
},{
"type":"string","field":"deviceNumId"
},{
"type":"string","field":"deviceRegistryId"
}
],"optional":false
},"payload":{
"message":"eyJzZW5zb3JfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMtMzAyIiwgInRfY2Vsc2l1cyI6IDEwLCAicmVnaXN0cnlfaWQiOiAiYmFsZW5hLXJlZ2lzdHJ5IiwgInByZXNzdXJlIjogMTAsICJ0aW1lc3RhbXAiOiAxNTk4NDM2NTk3LjQxNTEwNDYsICJkZXZpY2VfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMiLCAic3RyaW5nX2JhdHRlcnkiOiAiYmF0dGVyeV9ub3JtYWwiLCAic3RyaW5nX2luZmxhdGUiOiAidGlyZV9vdmVyX2luZmxhdGVkIn0=","subFolder":"","deviceid":"deviceid","deviceRegistryLocation":"region_value","projectId":"projectid","deviceNumId":"device_num_value","deviceRegistryId":"registryid"
}
}
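Even with the converters set in the connector configuration, I still get the message as bytes. Is there anything else I need to do to convert it to JSON?
Solution
The Cloud Pub/Sub Kafka connector does not inspect or transform the data in the messages it receives; it simply passes the data field through as bytes, which is the type of that field in PubsubMessage. The JsonConverter then serializes that bytes field as a base64-encoded string, which is why "message" looks the way it does in the output above. There is currently no way to make the connector itself read the message contents and convert them to JSON.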
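Since the connector will not do the conversion, one option is to decode the field on the consuming side. The following is only a minimal sketch, assuming the record value has the schema/payload envelope shown in the question and that the publisher sent UTF-8 encoded JSON as the message body. The function name decode_pubsub_payload is made up for illustration and is not part of the connector or of any Kafka client.

```python
import base64
import json


def decode_pubsub_payload(record_value):
    """Return (body, attributes) from one Kafka record value.

    record_value is the record value as str or bytes, in the
    {"schema": ..., "payload": ...} envelope written by JsonConverter.
    """
    envelope = json.loads(record_value)
    payload = envelope["payload"]

    # JsonConverter writes a Connect "bytes" field as a base64 string.
    raw = base64.b64decode(payload["message"])

    # Assumption: the Pub/Sub message body is UTF-8 encoded JSON.
    body = json.loads(raw.decode("utf-8"))

    # The remaining payload fields are the Pub/Sub message attributes.
    attributes = {k: v for k, v in payload.items() if k != "message"}
    return body, attributes
```

You would call this on the value of each record returned by whichever Kafka consumer client you use. If the message body is not JSON, skip the second json.loads call and work with the raw bytes instead.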