Problem description
I am running a job that joins two Kafka topics into a single topic on a key. The environment I use only lets me allocate less than 10 GB of memory per job, and the data I am trying to join amounts to roughly 500k records per topic.
I am still fairly new to Spark, so I would like to know whether there is a way to minimize the memory consumption.
Code:
val df_person: DataFrame = PERSONinformatION_df
  .select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONinformatION).as("s"))
  .select("s.*")
  .withColumn("comsume_date", lit(LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss.SS"))))
  .as("dfperson")

val df_candidate: DataFrame = CANDIDATEinformatION_df
  .select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEinformatION).as("s"))
  .select("s.*")
  .withColumn("comsume_date", lit(LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss.SS"))))
  .as("dfcandidate")
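The question does not show how PERSONinformatION_df and CANDIDATEinformatION_df are built. A typical Kafka streaming source for them might look like the sketch below; the topic name, the rate limit, and the session setup are assumptions, not from the question. The `maxOffsetsPerTrigger` option is worth noting for the memory concern, since it caps how many records each micro-batch pulls:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("taqasi_spark").getOrCreate()

// Hypothetical source definition; the topic name and the 50k rate limit
// are assumptions. maxOffsetsPerTrigger bounds the number of records per
// micro-batch, which in turn bounds the per-batch memory footprint.
val PERSONinformatION_df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxx:9092")
  .option("subscribe", "person_information") // assumed topic name
  .option("maxOffsetsPerTrigger", 50000L)    // at most 50k records per batch
  .option("startingOffsets", "earliest")
  .load()
```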
Joining the topics:
val joined_df: DataFrame = df_candidate
  .join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"), "inner")
  .withColumn("join_date", lit(LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss.SS"))))
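One thing worth knowing about memory here: in a stream-stream join, Spark must buffer past input as state on the executors, and without watermarks that state grows without bound, which is a common way to exceed a 10 GB limit. A sketch of bounding the state with watermarks and a time-range join condition follows; the event-time column `event_time` and the 10-minute thresholds are assumptions and would need to match the actual data:

```scala
import org.apache.spark.sql.functions.expr

// Watermark each side so Spark may drop buffered state older than the threshold.
// "event_time" is an assumed event-time column, not from the question's schema.
val personWm    = df_person.withWatermark("event_time", "10 minutes")
val candidateWm = df_candidate.withWatermark("event_time", "10 minutes")

// The time-range condition tells Spark how long a row can wait for a match,
// so rows outside the window are evicted instead of being kept forever.
val joined_df = candidateWm.as("dfcandidate").join(
  personWm.as("dfperson"),
  expr("""
    dfcandidate.PERSONID = dfperson.ID AND
    dfperson.event_time BETWEEN dfcandidate.event_time - interval 10 minutes
                            AND dfcandidate.event_time + interval 10 minutes
  """),
  "inner")
```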
Restructuring the data:
val string2json: DataFrame = joined_df.select(
  $"dfcandidate.ID".as("key"),
  to_json(struct(
    $"dfcandidate.ID".as("candidateID"),
    $"FULLNAME",
    $"PERSONALID",
    $"join_date",
    $"dfcandidate.PERSONID".as("personID"),
    $"dfcandidate.comsume_date".as("candidate_comsume_time"),
    $"dfperson.comsume_date".as("person_comsume_time")
  )).cast("String").as("value"))
Writing them to the output topic:
string2json.writeStream.format("kafka")
.option("kafka.bootstrap.servers","xxx:9092")
.option("topic","mergedinfo")
.option("checkpointLocation","/tmp/producer/checkpoints")
.option("failOnDataLoss",false)
.start()
.awaitTermination()
Run command:
spark-submit --class taqasi_spark.App --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g ./spark_poc-test_memory.jar
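Note that spark-submit only honors flags placed before the application jar; anything after the jar is forwarded to the application's main method as plain arguments, so memory flags placed there are silently ignored. A variant of the command with the flags in front and one memory-related setting is sketched below; the shuffle partition count is an assumed tuning value for ~500k records per side, not a requirement:

```shell
# Flags must precede the jar; everything after it becomes application arguments.
spark-submit \
  --class taqasi_spark.App \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 10g \
  --executor-memory 10g \
  --conf spark.sql.shuffle.partitions=64 \
  ./spark_poc-test_memory.jar
```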