设置Kafka连接器管道时发生错误 环境概述:

问题描述

设置Kafka连接器管道时出错

环境概述:

Docker容器

  1. docker run --name some-MysqL -e MysqL_ROOT_PASSWORD = mypw -d MysqL:latest
  2. docker run -d --name Elasticsearch -p 9200:9200 -p 9300:9300 -e“ discovery.type = single-node” Elasticsearch :7.9。 2

操作系统

  1. 开启WSL2-Windows-10版本1909(操作系统内部版本18363.1139)

Kafka版本

  1. confluent-6.0.0

你好 我正在阅读Neha Narkhede,Gwen Shapira和Todd Palino撰写的《卡夫卡:权威指南》 我已经到达连接器示例:从MysqL到Elasticsearch 章节(第146页), 按照说明创建从MysqL源到 Elasticsearch 接收器的管道。 我已经从说明中转移了一些东西

  • 我使用mvn软件包而不是mvn build创建了MysqL Elasticsearch 连接器
  • 我将所述连接器放在名为C:\ Users \ ROY \ confluent-6.0.0 \ share \ kafka的文件夹中 与我下载的其他一些连接器相比更加坚固。我将connect-distributed.properties中的plugin.path设置为: plugin.path = C://用户//ROY//confluent-6.0.0//share//kafka,/mnt/c/Users/ROY/confluent-6.0.0/share/kafka
  • 我正在使用 MysqL Elasticsearch 作为docker容器。

MysqL连接器可以正常工作并将数据读入主题, 但是当我尝试创建 Elasticsearch 连接器时,出现以下错误

...
(io.confluent.connect._Elasticsearch_._Elasticsearch_SinkConnectorConfig:354)
[2020-10-16 12:22:27,170] ERROR WorkerSinkTask{id=elastic-login-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.NoClassDefFoundError: io/searchBox/action/Action
    at io.confluent.connect._Elasticsearch_._Elasticsearch_SinkTask.start(_Elasticsearch_SinkTask.java:74)
    at io.confluent.connect._Elasticsearch_._Elasticsearch_SinkTask.start(_Elasticsearch_SinkTask.java:48)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.classNotFoundException: io.searchBox.action.Action
    at java.base/java.net.urlclassloader.findClass(urlclassloader.java:471)
    at java.base/java.lang.classLoader.loadClass(ClassLoader.java:589)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.base/java.lang.classLoader.loadClass(ClassLoader.java:522)
    ... 11 more
...

有人可以协助解决问题并给出错误的解释吗?

谢谢

罗伊

解决方法

好,我发现了问题。在弹性搜索连接器源代码上运行“ mvn软件包”后,我应该将整个目录复制到Kafka的/ share目录中,而不是只使用一个.jar文件。