How to retrieve Kafka message headers with the MongoDB Kafka Connect Sink Connector

Problem Description

How can the incoming headers of a Kafka message be retrieved with Kafka Connect, so that the MongoDB Sink Connector stores them in MongoDB as additional data fields?

I have a Kafka topic "PROJECT_EXAMPLE_TOPIC". As you can see below, I am already able to store the message timestamp, the incoming message data, and the creation/update dates of the Mongo document.

I assume there is some kind of field-extraction feature for this.

Example Kafka value:

  // incoming kafka value
  {
    "msgid" : "exampleId"
  }
  1. How do I get an original header, header_foo? (One possible approach is sketched after the second example below.)

  // expected example
  {
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgid" : "exampleId",
    "message_header_foo" : "header_foo_value"
  }


  2. How do I get all Kafka headers?
  // expected example
  {
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "message_headers" : {
        "header_001" : "header_001_value",
        "header_002" : "header_002_value",
        ...
        "header_x" : "header_x_value"
    }
  }
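
Kafka Connect's stock SMTs (including the InsertField transform used in the configuration below) operate on the record key, value, and metadata, but not on headers, so neither result can be produced with built-in transforms alone. For the first case, one option is a community SMT such as HeaderToField from the kafka-connect-transform-common project (jcustenborder). The snippet below is a minimal sketch under that assumption; the transform alias "HeaderFoo" is hypothetical, and the class name and header.mappings syntax should be verified against that project's documentation.

  // hypothetical transform config, assuming the community HeaderToField SMT;
  // mapping format is assumed to be <header name>:<type>:<target field name>
  {
    "transforms": "HeaderFoo",
    "transforms.HeaderFoo.type": "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
    "transforms.HeaderFoo.header.mappings": "header_foo:STRING:message_header_foo"
  }

For the second case (copying all headers into a nested message_headers document), there appears to be no off-the-shelf SMT; a custom Transformation that iterates over record.headers() and writes each header into the value would likely be needed.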


Here is my configuration:

{
    "name": "sink-mongo-PROJECT-EXAMPLE",
    "config": {
        "topics": "PROJECT_EXAMPLE_TOPIC",
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
        "key.converter.schemas.enable": "false",
        "key.converter.basic.auth.credentials.source": "USER_INFO",
        "key.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
        "value.converter.schemas.enable": "false",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
        "connection.uri": "PROJECT_referential_MONGO_URL",
        "database": "PROJECT_DB_NAME",
        "collection": "EXAMPLE",
        "max.num.retries": "3",
        "retries.defer.timeout": "5000",
        "key.projection.type": "none",
        "key.projection.list": "",
        "field.renamer.mapping": "[]",
        "field.renamer.regex": "[]",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
        "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
        "value.projection.list": "msgid",
        "value.projection.type": "whitelist",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
        "delete.on.null.values": "false",
        "max.batch.size": "0",
        "rate.limiting.timeout": "0",
        "rate.limiting.every.n": "0",
        "change.data.capture.handler": "",
        "errors.tolerance": "all",
        "errors.log.enable": true,
        "errors.log.include.messages": true,
        "transforms": "InsertSource,InsertTopic,InsertTimestamp",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "message_source",
        "transforms.InsertSource.static.value": "mongo_connector",
        "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertTopic.topic.field": "message_topic",
        "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertTimestamp.timestamp.field": "message_timestamp"
    }
}
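
Under the same assumption about the HeaderToField SMT, it would be appended to the existing transform chain of this configuration, roughly as follows (the alias HeaderFoo is hypothetical, as above):

  // hypothetical additions to the "config" block above
  "transforms": "InsertSource,InsertTopic,InsertTimestamp,HeaderFoo",
  "transforms.HeaderFoo.type": "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
  "transforms.HeaderFoo.header.mappings": "header_foo:STRING:message_header_foo"

The transform's jar would also have to be installed on the Connect worker's plugin.path before the connector can load it.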

Solution

No working solution has been found for this problem yet.
