Problem description
I have been trying to load data from SAP HANA into HDFS, using "AEDAT" as the incrementing column. "AEDAT" has the date format "YYYYMMDD", but the Kafka connector throws the following error:
[2020-10-05 16:37:13,587] ERROR WorkerSourceTask{id=incremental-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
java.lang.IllegalArgumentException: The Incrementing column is not found in the table or is not of correct type
at com.sap.kafka.connect.source.querier.IncrColTableQuerier.getIncrementingColumn(IncrColTableQuerier.scala:106)
at com.sap.kafka.connect.source.querier.IncrColTableQuerier.<init>(IncrColTableQuerier.scala:21)
at com.sap.kafka.connect.source.GenericSourceTask.$anonfun$start$8(GenericSourceTask.scala:121)
at com.sap.kafka.connect.source.GenericSourceTask.$anonfun$start$8$adapted(GenericSourceTask.scala:99)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.sap.kafka.connect.source.GenericSourceTask.start(GenericSourceTask.scala:99)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:215)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Sample row (the last column is AEDAT):
('300','4500000000','00010','450000000000010','','20200102')
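Before adjusting the connector, it may help to confirm what type AEDAT actually has on the HANA side: in an ABAP system, EKPO.AEDAT is typically a DATS field that HANA stores as NVARCHAR(8) rather than an integer or timestamp, which would line up with the "not of correct type" error. The following is a minimal standalone sketch (not part of the connector), assuming the SAP HANA JDBC driver (ngdbc.jar) is on the classpath and reusing the host and credentials from hana-source.properties; the object name CheckAedatType is just illustrative:

import java.sql.DriverManager

object CheckAedatType {
  def main(args: Array[String]): Unit = {
    // Connection details taken from hana-source.properties (placeholders here).
    val conn = DriverManager.getConnection("jdbc:sap://xx.xx.xx.xx:30041/", "user", "pass")
    try {
      val stmt = conn.prepareStatement(
        "SELECT COLUMN_NAME, DATA_TYPE_NAME FROM SYS.TABLE_COLUMNS " +
        "WHERE SCHEMA_NAME = 'SAPHANADB' AND TABLE_NAME = 'EKPO' AND COLUMN_NAME = 'AEDAT'")
      val rs = stmt.executeQuery()
      // Prints e.g. "AEDAT -> NVARCHAR" if the column exists; no output means the column name is wrong.
      while (rs.next()) {
        println(s"${rs.getString("COLUMN_NAME")} -> ${rs.getString("DATA_TYPE_NAME")}")
      }
    } finally {
      conn.close()
    }
  }
}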
hana-source.properties
name=incremental-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=saptohdfs
connection.url=jdbc:sap://xx.xx.xx.xx:30041/
connection.user=user
connection.password=pass
saptohdfs.table.name="SAPHANADB"."EKPO"
mode=incrementing
saptohdfs.incrementing.column.name="AEDAT"
hdfs-sink.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
hdfs.url=hdfs://ip-10-1-1-131.ap-south-1.compute.internal:8020/warehouse/tablespace/external/hive
tasks.max=1
topics=saptohdfs
flush.size=3
hive.integration=true
hive.metastore.uris=thrift://ip-10-1-1-131.ap-south-1.compute.internal:9083
schema.compatibility=BACKWARD
format.class=io.confluent.connect.hdfs.avro.AvroFormat
#value.converter=io.confluent.connect.avro.AvroConverter
https://github.com/SAP/kafka-connect-sap/issues/41
Solution
This error is thrown when the column name does not match or the data type is not correct. Please double-check that "AEDAT" is the correct column name and that its type is a timestamp. It would also help if you could share the schema.
Also, in the example, incrementing.column.name does not use double quotes (not sure whether that is an issue).
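For reference, a minimal sketch of that property without the quotes (whether the quoted or unquoted form actually matters is not confirmed here):

saptohdfs.incrementing.column.name=AEDAT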