Apache Kafka的mongoSourceConnector

问题描述

我正在使用Apache Kafka和mongoSourceConnector来接收数据库更改(例如文档插入),并希望将其发送到kafka。在我的文档字段中,我将发送一个主题密钥,该密钥将把认的kafka主题重写为文档的字段主题值。

为此,我尝试了smt ExtractTopic $ Value(io.confluent.connect.transforms.ExtractTopic)

我的配置如下:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=<my_connection_uri>
database=<my_db>
collection=messages

#topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[{"$match": { "$or": [{"operationType": "insert"},{"operationType": "update"}]}}]
batch.size=0
change.stream.full.document=updateLookup
publish.full.document.only=true
collation=

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

# key.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter

transforms=ValueFieldExample
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=topic

kafkacat返回时不带smt(暂时在此处搜索主题

{"_id": {"$oid": "5f719f1d559f00326a405b22"},"part_id": "","topic": "my_custom_topic","payload": "some payload data","__v": 0}

但是如果正在使用smt转换(ExtractTopic),则会抛出该错误

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field extraction],found: java.lang.String

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)