数据帧 spark redis 连接器最终修改了一个值

问题描述

glue 作业中,我打算将数据帧的(最后 15 条记录)部分写入 redis 缓存,其余部分(如果它不是来自缓存(上次运行)到 s3)。我使用 spark-redis 连接器将数据帧直接写入 redis。我需要最后 15 条记录用于我之前进行的某些计算。

当我从计算中接收到数据帧时,数据帧中的数据看起来是正确的,如下所示:

+----------+-------------------+----+-----+---+----+------+------------+-----------+----+------------+------+------------+-------------------+-------------------+--------------------+
|monitor_id|         event_time|year|month|day|hour|minute|energy_total|consumption|rank|data_to_drop|demand|time_zone_id|      event_time_tz|  event_time_tz_str|              run_id|
+----------+-------------------+----+-----+---+----+------+------------+-----------+----+------------+------+------------+-------------------+-------------------+--------------------+
|     10121|2020-11-16 08:33:00|2020|   11| 16|  08|    33|   1012.4236|        0.0| 1.0|        true|   0.0|         UTC|2020-11-16 08:33:00|2020-11-16 08:33:00|jr_bf2c8baa452309...|

修改数据框(除了将其保存到 redis,请参阅下面的函数)我得到以下值:

redis_dataframe id = 421
minute_redis_df id = 555
+----------+-------------------+----+-----+---+----+------+------------+-----------+----+------------+------+------------+-------------------+-------------------+--------------------+
|monitor_id|         event_time|year|month|day|hour|minute|energy_total|consumption|rank|data_to_drop|demand|time_zone_id|      event_time_tz|  event_time_tz_str|              run_id|
+----------+-------------------+----+-----+---+----+------+------------+-----------+----+------------+------+------------+-------------------+-------------------+--------------------+
|     10121|2020-11-16 08:33:00|2020|   11| 16|  08|    33|   1012.4236|        0.0| 1.0|       false|   0.0|         UTC|2020-11-16 08:33:00|2020-11-16 08:33:00|jr_bf2c8baa452309...|

请注意,'data_to_drop' 列已从 true 更改为 false!在 save_data_to_s3 函数中,我试图只保存那些错误的(意思是不是来自缓存),所以现在最终得到了欺骗。

为了解决,我尝试了各种方法

  • 在写入缓存之前将数据写入 s3 似乎可以解决问题!但我想在继续之前了解我做错了什么。
  • 添加大量调试语句(例如 dataframe.where(x=y).show()/rdd.id())似乎也神奇地“解决”了这个问题,这向我表明这是一个赛车状况;但为什么只有这一个值会改变?我们在从缓存中获取数据时添加值 .withColumn("data_to_drop",lit(True)) 或从我们的数据湖中添加 lit(False) 并进一步保持不变。
  • 添加persist()/cache() 语句似乎没什么用。
  • 正如您在下面的代码中看到的,我使用 select(*) 将数据帧复制到另一个数据帧,这通过 rdd.id 不同来确认。即使这不起作用,也不应该将标志从 true 更改为 false?
  • 我试图通过查看 spark-ui 来调试问题,但我不得不承认我不知道要查找什么。

Python 代码段:

cleaned_df = clean_silver_data(timezone_df)
cleaned_df.persist()
save_last_to_redis(cleaned_df)
save_data_to_s3(cleaned_df)

def save_last_to_redis(redis_dataframe):
    # Now filter number of records to push to redis..
    # We don't want more than REdis_LAST_RECORD_LIMIT_IN_MINUTES
    log_info("Store last minutes into redis cache")
    # create a window on monitor_id and event time with the the last records first
    log_info("redis_dataframe id = {}".format(redis_dataframe.rdd.id()))
    # copy original dataframe to new one so we can filter without altering original
    minute_redis_df = redis_dataframe.select("*")
    minute_redis_df.cache()
    log_info("minute_redis_df id = {}".format(minute_redis_df.rdd.id()))
    redis_window = Window.partitionBy(minute_redis_df['monitor_id']).orderBy(minute_redis_df['event_time'].desc())
    # rank the records then filter where its smaller than our limit and then drop the column
    log_info("Ranking bottom 15 and filtering")
    minute_redis_df = minute_redis_df.select('*',rank().over(redis_window).alias('cache_rank')) \
        .filter(col('cache_rank') <= REdis_LAST_RECORD_LIMIT_IN_MINUTES) \
        .withColumn("redis_key",concat(col("monitor_id"),lit(":"),col("event_time"))) \
        .drop("cache_rank","data_to_drop")
    if DEBUG_OUTPUT:
        log_info("Last 15 records data destined for Redis:")
        minute_redis_df.printSchema()
        minute_redis_df.where(DEBUG_CONDITION).show(DEBUG_RECORD_COUNT)
    # Push to redis
    minute_redis_df.write.format("org.apache.spark.sql.redis") \
        .option("table",REdis_KEY_PREFIX_MINUTE) \
        .option("key.column","redis_key") \
        .mode("overwrite") \
        .save()
    # Clear redis to keep just last 15 minutes for each monitor
    log_info("Finished writing to redis")

我使用以下 AWS glue 配置:

  • 类型:火花
  • 胶水版本:2.0
  • Python 版本:3
  • 最大容量:10
  • 工人类型:G.1X
  • 库:spark-redis_2.11-2.5.0-SNAPSHOT-jar-with-dependencies.jar
  • 作业书签:启用

这应该转换为使用:Spark 2.4.3 + Python 3.7

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)