问题描述
环境:
配置:
spark.dynamicAllocation.enabled true
spark.scheduler.mode FAIR
spark.databricks.delta.preview.enabled true
spark.shuffle.service.enabled true
spark.databricks.service.server.enabled true
我尝试过的事情 How to run multiple jobs in one Sparkcontext from separate threads in PySpark?
- 我没有得到任何错误,也没有得到任何计数
我要实现的目标: 我想提交一份作业列表(python方法/函数),并且我希望数据块并行运行这些作业。
下面是我玩的示例代码。我想传递一个函数名称的列表,该函数名称是在并行运行的作业1,作业2和作业3中同时执行的。
import datetime
from pyspark.sql.types import StructType,IntegerType,DateType,StringType,StructField
current_time = datetime.datetime.Now()
current_time = str(current_time)
print(current_time)
localPath = '/mnt'
def job1(inputText):
path = localPath + '/Table{0}/'.format(inputText)
print(path)
mySchema = StructType([StructField("textValue1",StringType())])
rddDF = sc.parallelize((
{"textValue": current_time },\
{ "textValue": current_time },\
{ "textValue": current_time}))
new_df1 = sqlContext.createDataFrame(rddDF,mySchema1)
new_df1 = new_df1.fillna(current_time +'1' )
new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)
job1('1')
def job2(inputText):
path = localPath + '/Table{0}/'.format(inputText)
print(path)
mySchema = StructType([StructField("textValue1",mySchema1)
new_df1 = new_df1.fillna(current_time +'1' )
new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)
job2('2')
def job3(inputText):
path = localPath + '/Table{0}/'.format(inputText)
print(path)
mySchema = StructType([StructField("textValue1",mySchema1)
new_df1 = new_df1.fillna(current_time +'1' )
new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)
job3('3')
listofJobs = ['job1','job2','job3']
print(listofJobs)
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)