ElasticSearch+Pyspark 读取速度慢

问题描述

我想使用 Pyspark 从 Elasticsearch 获取一些查询结果,但我的阅读性能太慢了。我在我的 spark-class 中使用了 1 个工作节点和 1 个主节点,这是我创建的 SparkSession 代码

'''

spark = SparkSession\
        builder\
        appName\
        config("spark.executor.memory","1g")\
        config("spark.executor.cores","4")\
        config("spark.driver.cores","1")\
        config("spark.driver.memory","1g")\
        config("spark.cores.max","15")\
        config("spark.es.mapping.date.rich","false")\

'''

而我阅读的 DataFrame 代码

'''

df =  spark.read.format("org.elasticsearch.spark.sql")\
      .option("es.nodes","IP")
      .option("es.nodes.wan.only","true")
      .option("es.port","port")
      .option("es.scroll.size","10000")
      .option("es.scroll.keepalive","10m")
      .option("es.batch.size.entries","2000")
      .option("numPartitions","100")
      .load("myindex")

'''

当我使用此 numPartitions 选项提交数据帧时,当我运行 df.rdd.getNumPartitions() 时,我的数据帧分区号仍为“1”,但随后我可以使用 repartition(100) 重新运行。但我还是不明白为什么我的 numPartitions 选项一开始就不起作用。

因此,当我进行一些过滤和选择然后运行 ​​collect() 时,我的工作在第 0 阶段需要花费大量时间。

当我查看 localhost:4040 时,我可以清楚地看到我的工作有两个阶段,阶段 0 只在一个执行程序中运行并且需要很多时间,但阶段 2 可以执行并行作业。为什么我的第 0 阶段需要这么长时间,而我不能在那个问题上做任何并行任务?

感谢您的建议。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)