如何执行pyspark / databricks中的作业列表?

问题描述

环境:

  • Databricks 6.4(包括Apache Spark 2.4.5,Scala 2.11)
  • 具有自动缩放和终止功能的高并发服务器

配置:

spark.dynamicAllocation.enabled true
spark.scheduler.mode FAIR
spark.databricks.delta.preview.enabled true
spark.shuffle.service.enabled true
spark.databricks.service.server.enabled true

我尝试过的事情 How to run multiple jobs in one Sparkcontext from separate threads in PySpark?

  • 我没有得到任何错误,也没有得到任何计数

我要实现的目标: 我想提交一份作业列表(python方法/函数),并且我希望数据块并行运行这些作业。

下面是我玩的示例代码。我想传递一个函数名称的列表,该函数名称是在并行运行的作业1,作业2和作业3中同时执行的。

import datetime 
from pyspark.sql.types import StructType,IntegerType,DateType,StringType,StructField
current_time = datetime.datetime.Now()
current_time = str(current_time)
print(current_time)

localPath = '/mnt'

def job1(inputText):
  path = localPath + '/Table{0}/'.format(inputText)
  print(path)
  mySchema = StructType([StructField("textValue1",StringType())])

  rddDF = sc.parallelize((
  {"textValue": current_time },\
  { "textValue": current_time  },\
  { "textValue": current_time}))
  new_df1 = sqlContext.createDataFrame(rddDF,mySchema1)
  new_df1 = new_df1.fillna(current_time +'1' )
  new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job1('1')

def job2(inputText):
  path = localPath + '/Table{0}/'.format(inputText)
  print(path)
  mySchema = StructType([StructField("textValue1",mySchema1)
  new_df1 = new_df1.fillna(current_time +'1' )
  new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job2('2')

def job3(inputText):
  path = localPath + '/Table{0}/'.format(inputText)
  print(path)
  mySchema = StructType([StructField("textValue1",mySchema1)
  new_df1 = new_df1.fillna(current_time +'1' )
  new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job3('3')

listofJobs = ['job1','job2','job3']
print(listofJobs)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)