Confluent RabbitMQ 源连接器 - 配置、许可证相关错误?

问题描述

我们的 Kafka 设置包括 AWS MSK 上的代理、AWS EKS pod 上的 Confluent Kafka Connect (confluentinc/cp-kafka-connect:5.5.1)。

我们正在尝试使用 Confluent RabbitMQ 源连接器(商业连接器的试用版)https://docs.confluent.io/5.5.1/connect/kafka-connect-rabbitmq/index.html 并出现以下错误

连接器配置 -

{
  "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector","confluent.topic.bootstrap.servers": "b-1.###.amazonaws.com:9092,b-2.###.amazonaws.com:9092,b-3.###.amazonaws.com:9092,b-4.###.amazonaws.com:9092","tasks.max": "1","rabbitmq.password": "user","rabbitmq.queue": "my_queue","rabbitmq.username": "pass","rabbitmq.virtual.host": "/","rabbitmq.port": "port","confluent.topic.replication.factor": "1","rabbitmq.host": "rabbit_host_ip","name": "Rabbit_Source_RT4","kafka.topic": "my_topic","value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}

获取连接器状态 -

{
  "name": "Rabbit_Source_RT4,"connector": {
    "state": "Failed","worker_id": "kfk-connect:8083","trace": "java.lang.NullPointerException\n\tat io.confluent.license.License.readFully(License.java:195)\n\tat io.confluent.license.License.loadPublicKey(License.java:187)\n\tat io.confluent.license.License.loadPublicKey(License.java:181)\n\tat io.confluent.license.LicenseManager.loadPublicKey(LicenseManager.java:553)\n\tat io.confluent.license.LicenseManager.registerOrValidateLicense(LicenseManager.java:331)\n\tat io.confluent.connect.utils.licensing.ConnectLicenseManager.registerOrValidateLicense(ConnectLicenseManager.java:257)\n\tat io.confluent.connect.rabbitmq.RabbitMQSourceConnector.doStart(RabbitMQSourceConnector.java:62)\n\tat io.confluent.connect.rabbitmq.RabbitMQSourceConnector.start(RabbitMQSourceConnector.java:56)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:259)\n\tat org.apache.kafka.connect.runtime.distributed.distributedHerder.startConnector(distributedHerder.java:1229)\n\tat org.apache.kafka.connect.runtime.distributed.distributedHerder.access$1300(distributedHerder.java:127)\n\tat org.apache.kafka.connect.runtime.distributed.distributedHerder$14.call(distributedHerder.java:1245)\n\tat org.apache.kafka.connect.runtime.distributed.distributedHerder$14.call(distributedHerder.java:1241)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
  },"tasks": [],"type": "source"
}

连接器状态失败且未创建任务。也尝试更新此配置,但每次都出现相同的错误

日志 -

[2021-01-07 15:21:17,884] INFO Kafka version: 5.5.1-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2021-01-07 15:21:17,884] INFO Kafka commitId: a0a0000zzz0a0000 (org.apache.kafka.common.utils.AppInfoParser)
[2021-01-07 15:21:17,884] INFO Kafka startTimeMs: 1610032877884 (org.apache.kafka.common.utils.AppInfoParser)
[2021-01-07 15:21:17,884] INFO [Producer clientId=Rabbit_Source_RT4-license-manager] Cluster ID: -aAaAzxcvA1a0weaaa11A (org.apache.kafka.clients.Metadata)
[2021-01-07 15:21:17,887] INFO [Consumer clientId=Rabbit_Source_RT4-license-manager,groupId=null] Cluster ID: -aAaAzxcvA1a0weaaa11A (org.apache.kafka.clients.Metadata)
[2021-01-07 15:21:17,890] INFO [Consumer clientId=Rabbit_Source_RT4-license-manager,groupId=null] Subscribed to partition(s): _confluent-command-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2021-01-07 15:21:17,groupId=null] Seeking to EARLIEST offset of partition _confluent-command-0 (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-01-07 15:21:17,899] INFO [Consumer clientId=Rabbit_Source_RT4-license-manager,groupId=null] Resetting offset for partition _confluent-command-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-01-07 15:21:17,900] INFO Finished reading KafkaBasedLog for topic _confluent-command (org.apache.kafka.connect.util.KafkaBasedLog)
[2021-01-07 15:21:17,900] INFO Started KafkaBasedLog for topic _confluent-command (org.apache.kafka.connect.util.KafkaBasedLog)
[2021-01-07 15:21:17,900] INFO Started License Store (io.confluent.license.LicenseStore)
[2021-01-07 15:21:17,901] INFO Validating Confluent License (io.confluent.connect.utils.licensing.ConnectLicenseManager)
[2021-01-07 15:21:17,906] INFO Closing License Store (io.confluent.license.LicenseStore)
[2021-01-07 15:21:17,906] INFO Stopping KafkaBasedLog for topic _confluent-command (org.apache.kafka.connect.util.KafkaBasedLog)
[2021-01-07 15:21:17,908] INFO [Producer clientId=Rabbit_Source_RT4-license-manager] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-01-07 15:21:17,910] INFO Stopped KafkaBasedLog for topic _confluent-command (org.apache.kafka.connect.util.KafkaBasedLog)
[2021-01-07 15:21:17,910] INFO Closed License Store (io.confluent.license.LicenseStore)
[2021-01-07 15:21:17,910] ERROR WorkerConnector{id=Rabbit_Source_RT4} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
java.lang.NullPointerException
        at io.confluent.license.License.readFully(License.java:195)
        at io.confluent.license.License.loadPublicKey(License.java:187)
        at io.confluent.license.License.loadPublicKey(License.java:181)
        at io.confluent.license.LicenseManager.loadPublicKey(LicenseManager.java:553)
        at io.confluent.license.LicenseManager.registerOrValidateLicense(LicenseManager.java:331)
        at io.confluent.connect.utils.licensing.ConnectLicenseManager.registerOrValidateLicense(ConnectLicenseManager.java:257)
        at io.confluent.connect.rabbitmq.RabbitMQSourceConnector.doStart(RabbitMQSourceConnector.java:62)
        at io.confluent.connect.rabbitmq.RabbitMQSourceConnector.start(RabbitMQSourceConnector.java:56)
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
        at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
        at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:259)
        at org.apache.kafka.connect.runtime.distributed.distributedHerder.startConnector(distributedHerder.java:1229)
        at org.apache.kafka.connect.runtime.distributed.distributedHerder.access$1300(distributedHerder.java:127)
        at org.apache.kafka.connect.runtime.distributed.distributedHerder$14.call(distributedHerder.java:1245)
        at org.apache.kafka.connect.runtime.distributed.distributedHerder$14.call(distributedHerder.java:1241)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2021-01-07 15:21:17,913] INFO Finished creating connector Rabbit_Source_RT4 (org.apache.kafka.connect.runtime.Worker)
[2021-01-07 15:21:17,913] INFO [Worker clientId=connect-1,groupId=compose-kfk-connect-group] Skipping reconfiguration of connector Rabbit_Source_RT4 since it is not running (org.apache.kafka.connect.runtime.distributed.distributedHerder)
[2021-01-07 15:21:17,groupId=compose-kfk-connect-group] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.distributedHerder)

GET /connector-plugins 请求的输出包含 -

{"class":"io.confluent.connect.rabbitmq.RabbitMQSourceConnector","type":"source","version":"0.0.0.0"},

还检查发现'_confluent-command'主题不包含任何消息。

  1. 是因为试用版已经结束,需要企业许可证还是由于配置错误
  2. 如何验证试用版的剩余时长(因为我们没有使用控制中心)?

提前致谢。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)