Kafka Mqtt源连接器无法接收从MQTT发布的数据,并失败,并显示“ java.lang.NullPointerException”

问题描述

我是Kafka的新手,正在研究Kafka Mqtt源连接器。我正在用2个不同的示例来实现Mqtt源连接器。

请检查以下详细信息。

参考文献1:https://johanvandevenne.github.io/kafka-connect-mqtt/

Kafka Mqtt源连接器

 http://localhost:8083/connectors \
 -H 'Content-Type: application/json' \
 -d '{ "name": "mqtt-source-connector","config":
   {
     "connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector","mqtt.topic":"temperature","kafka.topic":"mqtt.","mqtt.clientID":"my_client_id","mqtt.broker":"tcp://127.0.0.1:1883","key.converter":"org.apache.kafka.connect.storage.StringConverter","key.converter.schemas.enable":false,"value.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter.schemas.enable":false
   }
}'

Ref2:https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example/blob/master/live-demo-kafka-connect-iot-mqtt-connector.adoc

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
    "name" : "mqtt-source","config" : {
    "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector","tasks.max" : "1","mqtt.server.uri" : "tcp://127.0.0.1:1883","mqtt.topics" : "temperature","kafka.topic" : "mqtt.","confluent.topic.bootstrap.servers": "localhost:9092","confluent.topic.replication.factor": "1","confluent.license":""
    }
}'

当我执行上述示例并检查连接器状态时,它显示出我的运行方式:

curl http:// localhost:8083 / connectors / mqtt-source / status | python -m json.tool

%总接收百分比%Xferd平均速度时间时间时间 当前 Dload上传总剩余左行驶速度100185100185 0 0 10277 0-:-:--:-:- -:-:-10277 { “连接器”:{ “ state”:“ RUNNING”, “ worker_id”:“ 127.0.1.1:8083” }, “ name”:“ mqtt-source”, “任务”: [ { “ id”:0, “ state”:“ RUNNING”, “ worker_id”:“ 127.0.1.1:8083” } ], “ type”:“源”}

但是,当我从Mqtt主题发布数据时,在kafka消费者主题中却没有收到。

消费者:kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mqtt。 --from-beginning

Mqtt发布主题:mosquitto_pub -h 127.0.0.1 -p 1883 -t温度 -q 2 -m“ 99999,2.10#”

当我从Mqtt发布数据时,两个Kafka Mqtt连接器都给我以下错误

{ “连接器”:{ “ state”:“ RUNNING”, “ worker_id”:“ 127.0.1.1:8083” }, “ name”:“ mqtt-source”, “任务”: [ { “ id”:0, “ state”:“ Failed”, “ trace”:“ java.lang.NullPointerException \ n \ tat org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)\ n org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:226)\ n \ tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)\ n \ tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\ n \ tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\ n \ tat java.base / java.util.concurrent.Executors $ RunnableAdapter.call(Executors.java:515)\ n \ tat java.base / java.util.concurrent.FutureTask.run(FutureTask.java:264)\ n \ tat java.base / java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\ n \ tat java.base / java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:628)\ n \ tat java.base / java.lang.Thread.run(Thread.java:834)\ n“, “ worker_id”:“ 127.0.1.1:8083” } ], “ type”:“源”}

连接器属性是否存在问题?还是其他的东西??

预先感谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)