问题描述
我正在使用结构化流来读取数据并将其保存到 cassandra。使用了 spark-cassandra-connector:2.0.10 驱动程序。该应用程序在开始时运行良好,但每隔 20 小时,它就会异常暂停:
java.io.IOException: Exception during execution of SELECT "is_delete","region_code","gateway_id","purpose_code","address_code" FROM "hk_ods"."frm_firealarm_gateway" WHERE token("gateway_id") > ? AND token("gateway_id") <= ? ALLOW FILTERING: [node1-fzy/192.168.1.191:9042] Connection has been closed
at com.datastax.spark.connector.rdd.CassandratableScanRDD.com$datastax$spark$connector$rdd$CassandratableScanRDD$$fetchTokenRange(CassandratableScanRDD.scala:350)
at com.datastax.spark.connector.rdd.CassandratableScanRDD$$anonfun$17.apply(CassandratableScanRDD.scala:367)
at com.datastax.spark.connector.rdd.CassandratableScanRDD$$anonfun$17.apply(CassandratableScanRDD.scala:367)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(UnkNown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(UnkNown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(UnkNown Source)
at java.lang.Thread.run(UnkNown Source)
Caused by: com.datastax.driver.core.exceptions.TransportException: [node1-fzy/192.168.1.191:9042] Connection has been closed
at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38)
at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
at sun.reflect.GeneratedMethodAccessor46.invoke(UnkNown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(UnkNown Source)
at java.lang.reflect.Method.invoke(UnkNown Source)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
at com.sun.proxy.$Proxy13.execute(UnkNown Source)
at com.datastax.spark.connector.cql.DefaultScanner.scan(Scanner.scala:34)
at com.datastax.spark.connector.rdd.CassandratableScanRDD.com$datastax$spark$connector$rdd$CassandratableScanRDD$$fetchTokenRange(CassandratableScanRDD.scala:342)
... 22 more
Caused by: com.datastax.driver.core.exceptions.TransportException: [node1-fzy/192.168.1.191:9042] Connection has been closed
at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1210)
at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1195)
at com.datastax.driver.core.Connection.defunct(Connection.java:445)
at com.datastax.driver.core.Connection$dispatcher.exceptionCaught(Connection.java:1128)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:844)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at io.netty.handler.codec.MessagetoMessageDecoder.channelRead(MessagetoMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at io.netty.handler.codec.BytetoMessageDecoder.fireChannelRead(BytetoMessageDecoder.java:293)
at io.netty.handler.codec.BytetoMessageDecoder.channelRead(BytetoMessageDecoder.java:267)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
显示有一个onde关闭了连接,但是每个节点都没有关闭就运行。 如果重新启动,它可能会再运行 18 到 20 个小时,然后发生相同的异常。 任何人都可以帮忙吗?非常感谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)