Unable to connect IIDR CDC to Kafka

Problem description

When I try to connect the IIDR replication engine for Kafka to a Kafka cluster through ZooKeeper, the following error occurs:

kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://broker:29092","PLAINTEXT_HOST://localhost:9092"],"jmx_port":9101,"host":"broker","timestamp":"1598174513950","port":29092,"version":4}
at kafka.cluster.Broker$.createBroker(Broker.scala:101)
at kafka.utils.ZkUtils.getBrokerInfo(ZkUtils.scala:787)
at kafka.utils.ZkUtils$$anonfun$getAllBrokersInCluster$2.apply(ZkUtils.scala:162)
at kafka.utils.ZkUtils$$anonfun$getAllBrokersInCluster$2.apply(ZkUtils.scala:162)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.utils.ZkUtils.getAllBrokersInCluster(ZkUtils.scala:162)
at com.datamirror.ts.target.publication.KafkaTargetPublisherProxy.getBootstrapBrokers(KafkaTargetPublisherProxy.java:829)
at com.datamirror.ts.target.publication.KafkaTargetPublisherProxy.loadKafkaServicesInfo(KafkaTargetPublisherProxy.java:417)
at com.datamirror.ts.target.publication.KafkaTargetPublisherProxy.handleStartReplicateMessage(KafkaTargetPublisherProxy.java:139)
at com.datamirror.ts.enginemsg.MessageDispatcher.dispatchSwitch(MessageDispatcher.java:547)
at com.datamirror.ts.enginemsg.MessageDispatcher.dispatch(MessageDispatcher.java:142)
at com.datamirror.ts.engine.ReplicationSession.dispatchDynamicMessage(ReplicationSession.java:2816)
at com.datamirror.ts.engine.ReplicationSession.dispatchDataMessage(ReplicationSession.java:2910)
at com.datamirror.ts.target.publication.TargetDataChannelJob.moderateForTheTarget(TargetDataChannelJob.java:178)
at com.datamirror.ts.target.publication.TargetDataChannelJob.execute(TargetDataChannelJob.java:74)
at com.datamirror.ts.engine.component.PipelineThread.runThread(PipelineThread.java:217)
at com.datamirror.ts.util.TsThread.run(TsThread.java:130)
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.protocol.SecurityProtocol.PLAINTEXT_HOST
at java.lang.Enum.valueOf(Enum.java:249)
at org.apache.kafka.common.protocol.SecurityProtocol.valueOf(SecurityProtocol.java:28)
at org.apache.kafka.common.protocol.SecurityProtocol.forName(SecurityProtocol.java:89)
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:90)
at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:89)
at scala.collection.immutable.List.map(List.scala:277)
at kafka.cluster.Broker$.createBroker(Broker.scala:89)

There seems to be a compatibility problem between IIDR and my Kafka cluster. Any advice on how to get past this would be appreciated.

System setup
- IIDR v11.4 Kafka engine setup
- Confluent Kafka quick-start Docker setup: https://github.com/confluentinc/cp-all-in-one (cd cp-all-in-one)
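A note on reading the trace: the `Caused by` line shows the endpoint prefix `PLAINTEXT_HOST` being looked up as a `SecurityProtocol` enum constant. In the broker registration it is a listener name that maps to `PLAINTEXT`, not a protocol name itself. The Python sketch below illustrates that mismatch using the JSON from the error message. It is an illustration only, not the Kafka or IIDR code; the function name and the set of protocol names listed in it are assumptions made for the example.

```python
import json

# Security protocol names assumed for this sketch (the enum named in the trace).
SECURITY_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}

# Broker registration copied from the error message above.
BROKER_INFO = json.loads("""
{"listener_security_protocol_map": {"PLAINTEXT": "PLAINTEXT", "PLAINTEXT_HOST": "PLAINTEXT"},
 "endpoints": ["PLAINTEXT://broker:29092", "PLAINTEXT_HOST://localhost:9092"],
 "jmx_port": 9101, "host": "broker", "timestamp": "1598174513950",
 "port": 29092, "version": 4}
""")


def unparseable_listeners(broker_info):
    """Return endpoint prefixes that are not themselves security protocol names.

    Hypothetical helper: it mimics a parser that treats the part before
    '://' as a protocol name and ignores listener_security_protocol_map.
    """
    names = [endpoint.split("://", 1)[0] for endpoint in broker_info["endpoints"]]
    return [name for name in names if name not in SECURITY_PROTOCOLS]


# The listener that such a parser cannot resolve.
print(unparseable_listeners(BROKER_INFO))
```

Any listener reported here would trip a parser that expects the prefix to be a protocol name, which matches the `No enum constant` message in the trace.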

Solution

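The current recommendation for Kafka and IIDR is to use bootstrap.servers and list the brokers in the kafkaproducer.properties and kafkaconsumer.properties files. If you need to pursue the ZooKeeper configuration, contact L2 support, but we strongly recommend using the bootstrap.servers parameter, in line with the Apache Kafka recommendation.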
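As a rough sketch of what that setting could look like for the cluster in the question, the Python snippet below builds a `bootstrap.servers` line from the endpoints reported in the error message. The helper name is hypothetical, and which listener to pick is an assumption: it depends on where the IIDR engine runs relative to the Docker network (`broker:29092` is the address advertised on the `PLAINTEXT` listener, `localhost:9092` on `PLAINTEXT_HOST`).

```python
def bootstrap_servers_line(endpoints, listener):
    """Build a bootstrap.servers property line from one listener's endpoints.

    Hypothetical helper for illustration; 'endpoints' uses the
    'LISTENER://host:port' form shown in the broker registration.
    """
    addresses = []
    for endpoint in endpoints:
        name, _, address = endpoint.partition("://")
        if name == listener:
            addresses.append(address)
    if not addresses:
        raise ValueError(f"no endpoint found for listener {listener!r}")
    return "bootstrap.servers=" + ",".join(addresses)


# Endpoints copied from the error message in the question.
endpoints = ["PLAINTEXT://broker:29092", "PLAINTEXT_HOST://localhost:9092"]

# Assumption: the IIDR engine can resolve the hostname 'broker'.
print(bootstrap_servers_line(endpoints, "PLAINTEXT"))
```

The resulting line would go into both kafkaproducer.properties and kafkaconsumer.properties. With more than one broker, the addresses are comma-separated on the same line.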