import os import sys #import datetime import pyspark.sql.functions as F from pyspark.sql.types import * from pyspark.sql import SparkSession #不启动broadcastJoin 、conf spark.speculation=true spark = SparkSession \ .builder \ .appName("app_test.py") \ .enableHiveSupport() \ .config("spark.dynamicAllocation.maxExecutors", "400") \ .config("spark.sql.autobroadcastJoinThreshold",-1) \ .config("spark.yarn.executor.memoryOverhead", 3702) \ .config("spark.sql.adaptive.enabled", "true") \ .config("spark.sql.adaptive.repartition.enabled", "true") \ .config("spark.log.level", "ERROR") \ .config("spark.speculation", "true") \ .config("spark.sql.hive.convertmetastoreOrc", "true")\ .getorCreate() spark.sql("set hive.exec.dynamic.partition=true") spark.sql("set hive.exec.orc.split.strategy=ETL") spark.sql("set hive.exec.dynamic.partition.mode=nonstrict") from datetime import datetime, timedelta def get_date(dt,time_delta=0): try: result=dt+timedelta(days=-time_delta) except: try: dt = datetime.strptime(dt, "%Y-%m-%d") # 字符串转化为date形式 except: dt = datetime.strptime(dt, '%Y%m%d') # 字符串转化为date形式 result = dt + timedelta(days=-time_delta) return str(result.strftime('%Y-%m-%d')) def insert_tab(df,tab,spark): col_target = spark.sql("""select * from {tab} limit 1""".format(tab=tab)).columns col=df.columns not_in_col=[i for i in col_target if i not in col] for i in not_in_col: df = df.withColumn(i, F.lit(None)) df2=df.select(col_target) df2.repartition('dt','data_type').write.insertInto(tab, overwrite=True) def search_dt(partitions_list,dt): ''' 如果想要取的分区dt在partition_list中,则返回dt,否则返回dt之前最近的一个分区 :param partition_list: 分区List :param dt: 想要取的分区 :return: 函数最终确定的分区dt,字符串格式 ''' dt=get_date(dt,0) if 'ACTIVE' in partitions_list: partitions_list.remove('ACTIVE') if dt in partitions_list: return dt dt_date=datetime.strptime(dt, '%Y-%m-%d').date() partition_list_lag=[(datetime.strptime(p_dt, '%Y-%m-%d').date()-dt_date).days for p_dt in partitions_list] try: reuslt=max(list(filter(lambda x:x<0,partition_list_lag))) except: reuslt=min(list(filter(lambda x:x>0,partition_list_lag))) return datetime.strftime(dt_date+timedelta(reuslt),'%Y-%m-%d') def get_nearest_dt(table_name,dt,spark): #检查是否有dt分区,如果没有,取最近分区 partitions = spark.sql("show partitions %s"%table_name).collect() partitions_list = [] for i in range(len(partitions)): dt_tmp = partitions[i]['partition'] partitions_list.append(dt_tmp[3:]) dt_result=search_dt(partitions_list,dt) return dt_result