使用 Apache spark 使用 JanusGraph 执行 OLAP 查询时,Gremlin 控制台和 Spark UI 没有响应

问题描述

我在 Janusgraph(v0.5.3) 上有一个图,其中包含大约 200 万个顶点和 2000 万条边。我正在制作一个 OLAP 查询,它是 lowest_common_ancestor 配方的修改版本(下面添加查询)。

查询耗时太长(超过 1 小时),我看到 Managed memory leak detected; 警告,然后 Spark Web UI 没有响应 不再(不能再调试了)。

我还看到了 Lost executor driver on localhost: Executor heartbeat timed out 警告。但是即使在 1 小时后查询也没有退出。作业开始 30 分钟后,我看到这些警告。一世 希望 spark 和 hadoop 能让查询更快,但这似乎 非常慢。我无法分析查询或查看 Spark Web UI 以了解进度。

注意:我已经安装了 hadoop(3.2.2) 并使用了 Apache Spark(2.4.0)。我假设 spark 附带 janusgraph 发行版,我不记得安装了。但是 JanusGraph 文档说 v0.5.3 是 compatible with spark 2.2.x ),不确定是否是 spark 兼容性问题?

以下是我如何使用 bin/gremlin.sh 控制台运行查询

graph = GraphFactory.open('conf/hadoop-graph/read-cql.properties')
g = graph.traversal().withComputer(SparkGraphComputer)

// OLAP query
input = [2437272,4956336]
g.V().has(id,within(input)).
           aggregate('input').hasId(input.head()). 
           repeat(__.in('has_word')).emit().as('x').
           select('input').unfold().has(id,within(input.tail())).
           repeat(__.in('has_word')).emit(where(eq('x'))).
           group().
             by(select('x')).
             by(path().count(local).fold()).
           unfold().filter(select(values).count(local).is(input.tail().size())).
           order().
             by(select(values).unfold().sum()).
           select(keys).limit(5).elementMap()

我假设 hadoop 配置良好

user@xyz-WS:~/Downloads/janusgraph-full-0.5.3$ bin/gremlin.sh 

         \,/
         (o o)
-----oOOo-(3)-oOOo-----
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
plugin activated: janusgraph.imports
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
00:56:59 WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.tinkergraph
gremlin> 
gremlin> hdfs
==>storage[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1845984070_1,ugi=rrmerugu (auth:SIMPLE)]]]

conf/hadoop-graph/read-cql.properties 如下所述

#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat

gremlin.hadoop.jarsIndistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true
gremlin.spark.persistStorageLevel=disK_ONLY #MEMORY_AND_disK

#
# JanusGraph Cassandra InputFormat configuration
#
# These properties defines the connection properties which were used while write data to JanusGraph.
janusgraphmr.ioformat.conf.storage.backend=cql
# This specifies the hostname & port for Cassandra data store.
janusgraphmr.ioformat.conf.storage.hostname=127.0.0.1
janusgraphmr.ioformat.conf.storage.port=9042
# This specifies the keyspace where data is stored.
janusgraphmr.ioformat.conf.storage.cql.keyspace=janusgraph
# This defines the indexing backend configuration used while writing data to JanusGraph.
janusgraphmr.ioformat.conf.index.search.backend=elasticsearch
janusgraphmr.ioformat.conf.index.search.hostname=127.0.0.1
# Use the appropriate properties for the backend when using a different storage backend (HBase) or indexing backend (Solr).

#
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.widerows=true

#
# SparkGraphComputer Configuration
#
spark.master=local[8]
spark.executor.memory=12g
spark.driver.memory=12g
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator


spark.executor.memoryOverhead=1g
spark.driver.memoryOverhead=1g
spark.network.timeout=600s
spark.executor.heartbeatInterval=119s

spark.io.compression.codec=snappy

gremlin 控制台输出

rrmerugu@Code-WS:~/Downloads/janusgraph-full-0.5.3$ bin/gremlin.sh 

         \,/
         (o o)
-----oOOo-(3)-oOOo-----
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
plugin activated: janusgraph.imports
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
07:57:25 WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.tinkergraph
gremlin> graph = GraphFactory.open('conf/hadoop-graph/read-cql.properties')
==>hadoopgraph[cqlinputformat->nulloutputformat]
gremlin> g = graph.traversal().withComputer(SparkGraphComputer)
==>graphtraversalsource[hadoopgraph[cqlinputformat->nulloutputformat],sparkgraphcomputer]
gremlin> input = [2437272,4956336]
==>2437272
==>4956336
gremlin> g.V().has(id,within(input)).
......1>            aggregate('input').hasId(input.head()). 
......2>            repeat(__.in('has_word')).emit().as('x').
......3>            select('input').unfold().has(id,within(input.tail())).
......4>            repeat(__.in('has_word')).emit(where(eq('x'))).
......5>            group().
......6>              by(select('x')).
......7>              by(path().count(local).fold()).
......8>            unfold().filter(select(values).count(local).is(input.tail().size())).
......9>            order().
.....10>              by(select(values).unfold().sum()).
.....11>            select(keys).limit(5).elementMap()
07:58:05 WARN  org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer  - class org.apache.hadoop.mapreduce.lib.output.NullOutputFormat does not implement PersistResultGraphAware and thus,persistence options are unkNown -- assuming all options are possible
07:58:06 WARN  org.apache.spark.util.Utils  - Your hostname,Code-WS resolves to a loopback address: 127.0.1.1; using 192.168.0.10 instead (on interface enp7s0)
07:58:06 WARN  org.apache.spark.util.Utils  - Set SPARK_LOCAL_IP if you need to bind to another address
08:01:59 WARN  org.apache.spark.executor.Executor  - Managed memory leak detected; size = 40472352 bytes,TID = 1633
08:02:03 WARN  org.apache.spark.executor.Executor  - Managed memory leak detected; size = 41050016 bytes,TID = 1683
08:11:26 WARN  org.apache.spark.rpc.netty.NettyRpcEnv  - Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 192.168.0.10:43701 in 119 seconds
08:11:29 WARN  org.apache.spark.executor.Executor  - Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [119 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:835)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:864)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:864)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [119 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
08:21:13 WARN  org.apache.spark.rpc.netty.NettyRpcEnv  - Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 192.168.0.10:43701 in 119 seconds
08:21:37 WARN  org.apache.spark.executor.Executor  - Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [119 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:835)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:864)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:864)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [119 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
08:32:07 WARN  org.apache.spark.executor.Executor  - Issue communicating with driver in heartbeater
08:35:18 WARN  org.apache.spark.HeartbeatReceiver  - Removing executor driver with no recent heartbeats: 614252 ms exceeds timeout 600000 ms
Exception in thread "dispatcher-event-loop-7" java.lang.OutOfMemoryError: Java heap space
08:44:53 ERROR org.apache.spark.util.Utils  - Uncaught exception in thread driver-heartbeater
08:57:17 WARN  org.spark_project.jetty.io.ManagedSelector  - 
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.Utils  - uncaught error in thread Spark Context Cleaner,stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.Utils  - throw uncaught Fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.executor.Executor  - Exception in task 91.0 in stage 16.0 (TID 1890)
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler  - Uncaught exception in thread Thread[Executor task launch worker for task 1890,5,main]
java.lang.OutOfMemoryError: Java heap space
08:58:56 WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 91.0 in stage 16.0 (TID 1890,localhost,executor driver): java.lang.OutOfMemoryError: Java heap space

08:58:56 ERROR org.apache.spark.scheduler.TaskSetManager  - Task 91 in stage 16.0 Failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 91 in stage 16.0 Failed 1 times,most recent failure: Lost task 91.0 in stage 16.0 (TID 1890,executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
Type ':help' or ':h' for help.

系统规格:我在配备 32GB RAM 和 1TB SSD 的 Ubuntu 6 核 i7 处理器上执行此操作。

我想找到的答案

  1. 我是否遗漏了什么,有什么关于为什么这个查询花费太长时间并出现错误提示?任何建议表示赞赏。
  2. 是否有更好的方法来分析此查询以及为什么它花费了太长时间;一旦发生内存泄漏错误,控制台和 Spark Web UI 就不会响应,而且我无法对此进行调试。

更新

用程序退出的OOM错误更新日志

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)