问题描述
我正在与3个kafka代理一起进行多节点Confluent kafka设置,其中存在用于Ksql和架构注册表的单独节点,并在单独的节点中运行连接器。这些经纪人之间的所有映射都是成功的。
我正在尝试使用融合的postgresql接收器连接器从ksql流主题“ twitter-csv_avro”将AVRO格式数据加载到postgresql表中。这是void showInSnackBar(String value) {
FocusScope.of(context).requestFocus(new FocusNode());
_scaffoldKey.currentState?.removeCurrentSnackBar();
_scaffoldKey.currentState.showSnackBar(new SnackBar(
content: new Text(
value,textAlign: TextAlign.center,style: TextStyle(
color: Colors.white,fontSize: 16.0,),backgroundColor: Colors.blue,duration: Duration(seconds: 3),));
}
文件
postgres-sink.properties
这是# key.converter=org.apache.kafka.connect.storage.StringConverter
# key.converter.schema.registry.url=http://host:8081
# value.converter=io.confluent.connect.avro.AvroConverter
# value.converter.schema.registry.url=http://host:8081
# schema.registry.url=http://host:8081
#Postgres sink connector id name which must be unique for each connector
name=twitter-streaming-postgres-sink
# Name of the connector class to be run
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# Max number of tasks to spawn for this connector instance
tasks.max=1
# Kafka topic name which is input data to postgresql table
topics=TWITTER_CSV_AVRO
# Postgresql Database connection details
connection.url=jdbc:postgresql://host:5432/d2insights?user=postgres&password=*******
key.converter.schemas.enable=false
value.converter.schemas.enable=true
auto.create=false
auto.evolve=true
key.ignore=true
# Postgresql Table name
table.name.format=twitter
# Primary key configuration that
# pk.mode=none
#record_key=??
文件
connect-avro-standalone.properties
当我使用bootstrap.servers=192.168.39.17:9092,192.168.39.18:9092,192.168.39.19:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://192.168.39.27:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.39.27:8081
value.converter.enhanced.avro.schema.support=true
offset.storage.file.filename=/tmp/connect.offsets
rest.port=9080
plugin.path=/usr/share/java/kafka-connect-jdbc
版本时,我使用的是confluent-5.2.1
和/usr/share/java/kafka-connect-jdbc
版本数据库下的默认postgresql-42.2.10.jar。
这是我用来运行连接器的命令
postgresql-10.12
尽管存在postgresql jar,但我遇到了以下异常
connect-standalone /etc/schema-registry/connect-avro-standalone-postgres.properties /etc/kafka/connect-postgresql-sink1.properties
有人可以建议或告诉我我在这里做错了什么吗
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)