如何使用尽可能少的内存来运行内存密集型火花流作业

问题描述

我正在运行一项作业,该作业使用一个值将两个 kafka 主题合并为一个主题。我使用的环境只允许我为每个作业分配少于 10g 的内存,我尝试加入的数据每个主题大约有 500k 条记录。

我对 Spark 还很陌生,所以我想知道是否有办法最大限度地减少内存消耗

代码

val df_person: DataFrame = PERSONinformatION_df
      .select(from_json(expr("cast(value as string) as actualValue"),schemaPERSONinformatION).as("s")).select("s.*").withColumn("comsume_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS")))).as("dfperson")

val df_candidate: DataFrame = CANDIDATEinformatION_df
      .select(from_json(expr("cast(value as string) as actualValue"),schemaCANDIDATEinformatION).as("s")).select("s.*").withColumn("comsume_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS")))).as("dfcandidate")

加入主题

val joined_df : DataFrame = df_candidate.join(df_person,col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner").withColumn("join_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS"))))

重构数据

val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID".as("candidateID"),$"FULLNAME",$"PERSONALID",$"join_date",$"dfcandidate.PERSONID".as("personID"),$"dfcandidate.comsume_date".as("candidate_comsume_time"),$"dfperson.comsume_date".as("person_comsume_time"))).cast("String").as("value"))

将它们写入主题

string2json.writeStream.format("kafka")
      .option("kafka.bootstrap.servers","xxx:9092")
      .option("topic","mergedinfo")
      .option("checkpointLocation","/tmp/producer/checkpoints")
      .option("failOnDataLoss",false)
      .start()
      .awaitTermination()

运行命令:

spark-submit --class taqasi_spark.App --master yarn ./spark_poc-test_memory.jar --executor-memory 10g --driver-memory 10g --executor-memory 10g  --deploy-mode cluster

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)