问题描述
在 glue 作业中,我打算将数据帧的(最后 15 条记录)部分写入 redis 缓存,其余部分(如果它不是来自缓存(上次运行)到 s3)。我使用 spark-redis 连接器将数据帧直接写入 redis。我需要最后 15 条记录用于我之前进行的某些计算。
当我从计算中接收到数据帧时,数据帧中的数据看起来是正确的,如下所示:
+----------+-------------------+----+-----+---+----+------+------------+-----------+----+------------+------+------------+-------------------+-------------------+--------------------+
|monitor_id| event_time|year|month|day|hour|minute|energy_total|consumption|rank|data_to_drop|demand|time_zone_id| event_time_tz| event_time_tz_str| run_id|
+----------+-------------------+----+-----+---+----+------+------------+-----------+----+------------+------+------------+-------------------+-------------------+--------------------+
| 10121|2020-11-16 08:33:00|2020| 11| 16| 08| 33| 1012.4236| 0.0| 1.0| true| 0.0| UTC|2020-11-16 08:33:00|2020-11-16 08:33:00|jr_bf2c8baa452309...|
不修改数据框(除了将其保存到 redis,请参阅下面的函数)我得到以下值:
redis_dataframe id = 421
minute_redis_df id = 555
+----------+-------------------+----+-----+---+----+------+------------+-----------+----+------------+------+------------+-------------------+-------------------+--------------------+
|monitor_id| event_time|year|month|day|hour|minute|energy_total|consumption|rank|data_to_drop|demand|time_zone_id| event_time_tz| event_time_tz_str| run_id|
+----------+-------------------+----+-----+---+----+------+------------+-----------+----+------------+------+------------+-------------------+-------------------+--------------------+
| 10121|2020-11-16 08:33:00|2020| 11| 16| 08| 33| 1012.4236| 0.0| 1.0| false| 0.0| UTC|2020-11-16 08:33:00|2020-11-16 08:33:00|jr_bf2c8baa452309...|
请注意,'data_to_drop' 列已从 true 更改为 false!在 save_data_to_s3 函数中,我试图只保存那些错误的(意思是不是来自缓存),所以现在最终得到了欺骗。
- 在写入缓存之前将数据写入 s3 似乎可以解决问题!但我想在继续之前了解我做错了什么。
- 添加大量调试语句(例如 dataframe.where(x=y).show()/rdd.id())似乎也神奇地“解决”了这个问题,这向我表明这是一个赛车状况;但为什么只有这一个值会改变?我们在从缓存中获取数据时添加值 .withColumn("data_to_drop",lit(True)) 或从我们的数据湖中添加 lit(False) 并进一步保持不变。
- 添加persist()/cache() 语句似乎没什么用。
- 正如您在下面的代码中看到的,我使用 select(*) 将数据帧复制到另一个数据帧,这通过 rdd.id 不同来确认。即使这不起作用,也不应该将标志从 true 更改为 false?
- 我试图通过查看 spark-ui 来调试问题,但我不得不承认我不知道要查找什么。
Python 代码段:
cleaned_df = clean_silver_data(timezone_df)
cleaned_df.persist()
save_last_to_redis(cleaned_df)
save_data_to_s3(cleaned_df)
def save_last_to_redis(redis_dataframe):
# Now filter number of records to push to redis..
# We don't want more than REdis_LAST_RECORD_LIMIT_IN_MINUTES
log_info("Store last minutes into redis cache")
# create a window on monitor_id and event time with the the last records first
log_info("redis_dataframe id = {}".format(redis_dataframe.rdd.id()))
# copy original dataframe to new one so we can filter without altering original
minute_redis_df = redis_dataframe.select("*")
minute_redis_df.cache()
log_info("minute_redis_df id = {}".format(minute_redis_df.rdd.id()))
redis_window = Window.partitionBy(minute_redis_df['monitor_id']).orderBy(minute_redis_df['event_time'].desc())
# rank the records then filter where its smaller than our limit and then drop the column
log_info("Ranking bottom 15 and filtering")
minute_redis_df = minute_redis_df.select('*',rank().over(redis_window).alias('cache_rank')) \
.filter(col('cache_rank') <= REdis_LAST_RECORD_LIMIT_IN_MINUTES) \
.withColumn("redis_key",concat(col("monitor_id"),lit(":"),col("event_time"))) \
.drop("cache_rank","data_to_drop")
if DEBUG_OUTPUT:
log_info("Last 15 records data destined for Redis:")
minute_redis_df.printSchema()
minute_redis_df.where(DEBUG_CONDITION).show(DEBUG_RECORD_COUNT)
# Push to redis
minute_redis_df.write.format("org.apache.spark.sql.redis") \
.option("table",REdis_KEY_PREFIX_MINUTE) \
.option("key.column","redis_key") \
.mode("overwrite") \
.save()
# Clear redis to keep just last 15 minutes for each monitor
log_info("Finished writing to redis")
我使用以下 AWS glue 配置:
- 类型:火花
- 胶水版本:2.0
- Python 版本:3
- 最大容量:10
- 工人类型:G.1X
- 库:spark-redis_2.11-2.5.0-SNAPSHOT-jar-with-dependencies.jar
- 作业书签:启用
这应该转换为使用:Spark 2.4.3 + Python 3.7
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)