Problem description
#Spark #Python
Goal:
Read the log file location, extract the CSV text table from each log, and print the table data as JSON (table columns = columns retrieved from the CSV + serial_number + timestamp).
Current pseudo-code:
df_context = sparkSession.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "some url with id={}&password={}".format(redshift_user, redshift_pass)) \
    .option("query", query) \
    .option("tempdir", s3_redshift_temp_dir) \
    .option("forward_spark_s3_credentials", True)
df = df_context.load()
df then looks like:
+-------------+-------------------+--------------------+
|serial_number| test_date| s3_path|
+-------------+-------------------+--------------------+
| A0123456|2019-07-10 04:11:52|s3://test-bucket-...|
| A0123456|2019-07-24 23:48:03|s3://test-bucket-...|
| A0123456|2019-07-22 20:56:57|s3://test-bucket-...|
| A0123456|2019-07-22 20:56:57|s3://test-bucket-...|
| A0123456|2019-07-22 20:58:36|s3://test-bucket-...|
+-------------+-------------------+--------------------+
Since we cannot pass the Spark context to the worker nodes, boto3 is used to read the text file and process the text to extract the CSV table structure. The proprietary code that retrieves the table from the log is not shared here.
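For reference, a minimal sketch of what such a read_s3_file helper could look like, assuming boto3 is available on the workers; extract_csv_table is a hypothetical placeholder for the proprietary log-parsing code:

import boto3
from pyspark.sql.types import StringType

def read_s3_file(s3_path):
    # Runs on the executors, so it only needs boto3, never the SparkContext.
    bucket, key = s3_path.replace('s3://', '').split('/', 1)
    obj = boto3.client('s3').get_object(Bucket=bucket, Key=key)
    log_text = obj['Body'].read().decode('utf-8')
    # extract_csv_table is a placeholder for the proprietary code that pulls the
    # CSV table (header line + data lines) out of the raw log text.
    return extract_csv_table(log_text)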
read_s3_file_udf = spark.udf.register("read_s3_file", read_s3_file, StringType())
df_with_string_csv = df.withColumn('table_csv_data', read_s3_file_udf(df.s3_path))
df_with_string_csv now contains, for example:
+-------------+-------------------+--------------------+----------------------+
|serial_number| test_date| s3_path| table_csv_data |
+-------------+-------------------+--------------------+----------------------+
| 1050D1B0|2019-05-07 15:41:11|s3://test-bucket-...|col1,col2,col3,col4...|
| 1050D1B0|2019-05-07 15:41:11|s3://test-bucket-...|col1,col4...|
| 1050D1BE|2019-05-08 09:26:55|s3://test-bucket-...|col1,col4...|
| A0123456|2019-07-25 06:54:28|s3://test-bucket-...|col1,col4...|
| A0123456|2019-07-22 21:07:21|s3://test-bucket-...|col1,col4...|
| A0123456|2019-07-25 00:19:52|s3://test-bucket-...|col1,col4...|
| A0123456|2019-07-24 22:24:40|s3://test-bucket-...|col1,col4...|
| A0123456|2019-09-12 22:15:19|s3://test-bucket-...|col1,col4...|
| A0123456|2019-07-22 21:27:56|s3://test-bucket-...|col1,col4...|
+-------------+-------------------+--------------------+----------------------+
A sample table_csv_data value contains:
timestamp,partition,offset,key,value
1625218801350,97,33009,2CKXTKAT_20210701193302_6400_UCMP,458969040
1625218801349,41,33018,3FGW9S6T_20210701193210_6400_UCMP,17569160
I am trying to arrive at the final dataframe below, please help:
+-------------+-------------------+--------------------+-----------------+-----------+-----------------------------------+--------------+
|serial_number| test_date| timestamp| partition | offset | key | value |
+-------------+-------------------+--------------------+-----------------+-----------+-----------------------------------+--------------+
| 1050D1B0|2019-05-07 15:41:11| 1625218801350 | 97 | 33009 | 2CKXTKAT_20210701193302_6400_UCMP | 458969040 |
| 1050D1B0|2019-05-07 15:41:11| 1625218801349 | 41 | 33018 | 3FGW9S6T_20210701193210_6400_UCMP | 17569160 |
..
..
..
+-------------+-------------------+--------------------+-----------------+-----------+-----------------------------------+--------------+
Solution
For Spark 2.4.0+ you only need some combination of split, explode, and array_except.
Use repartition to optimize, since explode can create a lot of rows; a short sketch of that follows the sample output below.
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import split, explode, col, array_except, array, trim

spark = SparkSession.builder.getOrCreate()

df = (df
    # split the raw CSV text into individual lines
    .withColumn('table_csv_data', split(col('table_csv_data'), '\n'))
    # drop the header line (the first element of the array)
    .withColumn('table_csv_data', array_except(col('table_csv_data'), array([col('table_csv_data')[0]])))
    # one output row per remaining data line
    .withColumn('table_csv_data', explode(col('table_csv_data')))
    # split each line into its comma-separated fields
    .withColumn('table_csv_data', split(col('table_csv_data'), ','))
    # lift the fields out into named columns
    .withColumn('timestamp', trim(col('table_csv_data')[0]))
    .withColumn('partition', trim(col('table_csv_data')[1]))
    .withColumn('offset', trim(col('table_csv_data')[2]))
    .withColumn('key', trim(col('table_csv_data')[3]))
    .withColumn('value', trim(col('table_csv_data')[4]))
    .drop('table_csv_data'))
df.show(truncate=False)
+-------------+-------------------+-----------------+-------------+---------+------+---------------------------------+---------+
|serial_number|test_date |s3_path |timestamp |partition|offset|key |value |
+-------------+-------------------+-----------------+-------------+---------+------+---------------------------------+---------+
|1050D1B0 |2019-05-07 15:41:11|s3://test-bucket-|1625218801350|97 |33009 |2CKXTKAT_20210701193302_6400_UCMP|458969040|
|1050D1B0 |2019-05-07 15:41:11|s3://test-bucket-|1625218801349|41 |33018 |3FGW9S6T_20210701193210_6400_UCMP|17569160 |
|1050D1B0 |2019-05-07 15:41:11|s3://test-bucket-|1625218801350|97 |33009 |2CKXTKAT_20210701193302_6400_UCMP|458969040|
|1050D1B0 |2019-05-07 15:41:11|s3://test-bucket-|1625218801349|41 |33018 |3FGW9S6T_20210701193210_6400_UCMP|17569160 |
+-------------+-------------------+-----------------+-------------+---------+------+---------------------------------+---------+
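Since the stated goal is JSON output and explode can multiply the row count, here is a minimal follow-up sketch; the partition count of 200 is an arbitrary assumption:

# Spread the exploded rows across the cluster before further work (200 is just an example value).
df = df.repartition(200)

# Print each row as a JSON object with the requested columns
# (CSV columns + serial_number + test_date); collect() is only sensible for small results.
for row_json in df.select('serial_number', 'test_date', 'timestamp', 'partition',
                          'offset', 'key', 'value').toJSON().collect():
    print(row_json)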