PyFlink + Debezium + Kafka throws NullPointerException

Problem description

I am trying to connect to Flink from Python. The data source is PostgreSQL CDC (Change Data Capture).

I am using:

  • postgresql==10.1
  • wal2json
  • kafka==2.12-2.1.0
  • flink==1.13.0
  • debezium==1.3.1.Final (the "source" block in the error below reports "version":"1.3.1.Final")

Kafka itself works fine: a console consumer receives the CDC messages. But Flink throws the error below. This annoying problem has been bothering me for a week.
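Since the console consumer already shows the CDC messages, one useful check is whether each record value carries Kafka Connect's top-level "schema"/"payload" envelope (produced when the connector runs with value.converter.schemas.enable=true), because Flink's debezium-json format must be told about that envelope explicitly. A small hypothetical helper, fed with a record value copied from the consumer:

```python
import json

def has_schema_envelope(raw_value: bytes) -> bool:
    """True if a Debezium record wraps its payload in the top-level
    "schema"/"payload" envelope (i.e. value.converter.schemas.enable=true)."""
    msg = json.loads(raw_value)
    return isinstance(msg, dict) and {"schema", "payload"} <= set(msg)

# Trimmed-down record shaped like the one quoted in the error below:
sample = b'{"schema": {"type": "struct"}, "payload": {"before": null, "after": {"id": 6}}}'
print(has_schema_envelope(sample))  # True
```

If this prints True for your topic's records, the table DDL and the connector's converter settings have to agree about the envelope.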

My code

DDL = """CREATE TABLE topic_test_slot (
        origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
        event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
        origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
        origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
        origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
        id BIGINT,
        name STRING,
        name1 STRING,
        name2 STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'wh_testing_boss_statistics1.public.test_slot',
        'properties.bootstrap.servers' = '192.168.2.13:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json'
    )"""

# Note: flink-json-1.13.0.jar was listed twice in the original; once is enough.
# Shipping both flink-sql-connector-kafka (the shaded fat jar) and
# flink-connector-kafka can also cause class conflicts; the fat jar alone
# is normally sufficient for Table API jobs.
jars = "file:///home/flink-sql-connector-kafka_2.12-1.13.0.jar;" \
       "file:///home/flink-json-1.13.0.jar;" \
       "file:///home/flink-python_2.12-1.13.0.jar;" \
       "file:///home/flink-sql-connector-postgres-cdc-1.3.0.jar;" \
       "file:///home/flink-connector-kafka_2.12-1.13.0.jar"
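Because a bad entry in pipeline.jars only surfaces later as a cryptic job failure, a quick sanity check on the list can save a debugging round-trip. This is a hypothetical helper, not part of Flink; it only strips the file:// prefix, which matches the URL style used above:

```python
from pathlib import Path

def missing_jars(jars: str) -> list:
    """Return the entries of a ';'-separated list of file:// jar URLs
    whose files do not exist on the local machine."""
    return [u for u in jars.split(";") if not Path(u[len("file://"):]).is_file()]

# Example: feeding the `jars` string above through missing_jars(jars)
# should return an empty list; anything returned would fail at submission.
```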


# create a blink stream TableEnvironment
from pyflink.table import EnvironmentSettings,StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()

table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_string("pipeline.jars", jars)
table_env.execute_sql(DDL)
table = table_env.from_path("topic_test_slot")
table_head = table.limit(1)
table_head.to_pandas()

The error report is as follows:

Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.filterOutRetractRows(ArrowUtils.java:735)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:673)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 15 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
    ... 17 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
    ... 17 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Corrupt Debezium JSON message
'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":true,"field":"id"},{"type":"string","field":"name"},{"type":"string","field":"name1"},"field":"name2"}],"name":"wh_testing_boss_statistics1.public.test_slot.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":true,"name":"wh_testing_boss_statistics1.public.test_slot.Value","field":"after"},"fields":[{"type":"string","optional":false,"field":"version"},"field":"connector"},{"type":"int64","field":"ts_ms"},"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},"field":"db"},"field":"schema"},"field":"table"},{"type":"int64","field":"txId"},"field":"lsn"},"field":"xmin"}],"name":"io.debezium.connector.postgresql.source","field":"source"},"field":"op"},"fields":[{"type":"string","field":"total_order"},"field":"data_collection_order"}],"field":"transaction"}],"name":"wh_testing_boss_statistics1.public.test_slot.Envelope"},"payload":{"before":null,"after":{"id":6,"name":"6","name1":"6","name2":null},"source":{"version":"1.3.1.Final","connector":"postgresql","name":"wh_testing_boss_statistics1","ts_ms":1624873853951,"snapshot":"false","db":"boss_statistics","schema":"public","table":"test_slot","txId":31192386,"lsn":46274339048152,"xmin":null},"op":"c","ts_ms":1624873855079,"transaction":null}}'.
    at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:173)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.NullPointerException
    at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:141)
    ... 7 more

I think the key cause is:

Caused by: java.lang.NullPointerException

But Google turns up no canonical answer for it, and most of the answers I found did not help.

Any advice is appreciated.

Solution

No confirmed fix has been found for this problem yet.
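One hypothesis that seems worth testing: the NullPointerException is thrown inside DebeziumJsonDeserializationSchema, while the record quoted in the error clearly carries a top-level "schema" field, i.e. Kafka Connect was started with value.converter.schemas.enable=true. Flink's debezium-json format defaults to 'debezium-json.schema-include' = 'false' and then expects the payload fields at the top level of each record. A sketch of the DDL with that option switched on (same table and topic as in the question; metadata columns omitted for brevity; this is an unverified guess, not a confirmed fix):

```python
# Hypothesis: tell debezium-json that every record is wrapped in a
# "schema"/"payload" envelope; without this the format looks for top-level
# fields that do not exist in the record.
DDL = """CREATE TABLE topic_test_slot (
        id BIGINT,
        name STRING,
        name1 STRING,
        name2 STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'wh_testing_boss_statistics1.public.test_slot',
        'properties.bootstrap.servers' = '192.168.2.13:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json',
        'debezium-json.schema-include' = 'true'
    )"""
```

The equivalent change on the other side would be setting value.converter.schemas.enable=false in the Kafka Connect worker, so the records match the format's default expectations.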
