Joining a streaming and a static DataFrame in PySpark with complete output mode

Problem description

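I have two DataFrames: one is read with Spark Structured Streaming, and the other is a static DataFrame that I created. I am trying to join them.

But whatever I try, I get this error:

"Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets"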

raw_s = spark.readStream\
          .format("kafka")\
          .option("kafka.bootstrap.servers","...")\
          .option("subscribe","06-02-2020,07-02-2020,08-02-2020,09-02-2020,10-02-2020,11-02-2020,12-02-2020,13-02-2020,14-02-2020,15-02-2020,16-02-2020,17-02-2020,18-02-2020,19-02-2020,20-02-2020,21-02-2020,22-02-2020,23-02-2020,24-02-2020,25-02-2020,26-02-2020,27-02-2020,28-02-2020,29-02-2020,01-03-2020,02-03-2020,03-03-2020,04-03-2020,05-03-2020,06-03-2020,07-03-2020,08-03-2020,09-03-2020,10-03-2020,11-03-2020,12-03-2020,13-03-2020,14-03-2020,15-03-2020,16-03-2020,17-03-2020,18-03-2020,19-03-2020,20-03-2020,21-03-2020,22-03-2020,23-03-2020,24-03-2020,25-03-2020,26-03-2020,27-03-2020,28-03-2020,29-03-2020,30-03-2020,31-03-2020,01-04-2020,02-04-2020,03-04-2020,04-04-2020,05-04-2020,06-04-2020,07-04-2020,08-04-2020,09-04-2020,10-04-2020,11-04-2020,12-04-2020,13-04-2020,14-04-2020,15-04-2020,16-04-2020,17-04-2020,18-04-2020,19-04-2020,20-04-2020,21-04-2020,22-04-2020,23-04-2020,24-04-2020,25-04-2020,26-04-2020,27-04-2020,28-04-2020,29-04-2020,30-04-2020,01-05-2020,02-05-2020,03-05-2020,04-05-2020,05-05-2020,06-05-2020,07-05-2020,08-05-2020,09-05-2020,10-05-2020,11-05-2020,12-05-2020,13-05-2020,14-05-2020,15-05-2020,16-05-2020,17-05-2020,18-05-2020,19-05-2020,20-05-2020,21-05-2020,22-05-2020,23-05-2020,24-05-2020,25-05-2020,26-05-2020,27-05-2020,28-05-2020,29-05-2020,30-05-2020,31-05-2020,01-06-2020,02-06-2020,03-06-2020,04-06-2020,05-06-2020,06-06-2020,07-06-2020,08-06-2020,09-06-2020")\
          .option("startingOffsets","earliest")\
          .load()

string_val = raw_s.selectExpr("CAST(value AS STRING)")

The static DataFrame is:
extra:pyspark.sql.dataframe.DataFrame
state:string
date_only:string
new_cases:double
deaths:double

The streaming one:
tweet_id:long
user_id:long
date:string
keywords:array
  element:string
location:map
  key:string
  value:string
date_tmp:timestamp
date_only:string
country:string
state:string
city:string

Here is my code:

import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime

# Define the schema of the data:
schema = StructType()\
          .add("tweet_id",LongType(),False)\
          .add("user_id",LongType(),False)\
          .add("date",StringType(),True)\
          .add("keywords",ArrayType(StringType(),True),True)\
          .add("location",MapType(StringType(),StringType(),True),True)

# Read each 'value' String as JSON:
json_df = string_val.select(F.from_json(F.col("value"),schema= schema).alias('json'))
# Flatten the nested object:
streaming_df = json_df.select("json.*")
streaming_df = streaming_df.withColumn("date_tmp",F.to_timestamp(F.col('date'),"EEE MMM dd HH:mm:ss ZZZZ yyyy"))
streaming_df = streaming_df.withColumn("date_only",F.from_unixtime(F.unix_timestamp(streaming_df.date_tmp),"MM-dd-yyyy"))
streaming_df = streaming_df.withColumn("country",streaming_df.location.country)
streaming_df = streaming_df.withColumn("state",streaming_df.location.state)
streaming_df = streaming_df.withColumn("city",streaming_df.location.city)
streaming_df = streaming_df.where(streaming_df.country == 'United States')
streaming_df = streaming_df.where(streaming_df.state.isNotNull())
join_df = streaming_df.join(extra,['date_only','state'],'inner')
display(join_df)

output:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

Solution

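From the manual, https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html:

Complete mode - The whole Result Table will be outputted to the sink after every trigger. This is supported (only) for aggregation queries.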

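Try specifying an output mode or adding an aggregation.

But since you have a JOIN, you will need to specify the mode. For JOINs:

Update and Complete modes are not yet supported for JOINs, as of Spark 2.4.5.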

Also, Complete mode does not drop old aggregation state, so it can lead to OOM errors.
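If the full result table really is what you want, one way to meet the aggregation requirement is to aggregate after the join. The following is only a rough sketch: it assumes the join_df from the question, and the helper names and the query name tweets_per_state are made up for illustration. The memory sink is used just to keep the example small, and the state-growth caveat still applies.

```python
def count_by_state(join_df):
    """Aggregate the joined stream; a streaming aggregation is what makes
    complete mode legal. Column names are taken from the question."""
    return join_df.groupBy("state", "date_only").count()


def start_complete_query(agg_df, query_name="tweets_per_state"):
    """Start a query in complete mode, writing to an in-memory table.
    query_name is a hypothetical name chosen for this sketch."""
    return (agg_df.writeStream
            .format("memory")
            .queryName(query_name)
            .outputMode("complete")
            .start())
```

It could be used as query = start_complete_query(count_by_state(join_df)), after which spark.sql("SELECT * FROM tweets_per_state") reads the current result table.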

You need to try the following:

  join_df.writeStream \
    .format("console")  \
    .outputMode("append") \
    .start() \
    .awaitTermination()

The same goes for display, which, if memory serves correctly, uses complete as its mode.
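For anyone who wants to reproduce this without a Kafka cluster, here is a minimal sketch of a stream-static inner join written in append mode. It is an illustration under assumptions, not the asker's pipeline: Spark's built-in rate source stands in for the Kafka stream, a two-row DataFrame stands in for extra, and the names parity, label, joined, and the helper functions are all invented here.

```python
def build_join(spark):
    """Join a synthetic stream with a tiny static DataFrame (both stand-ins)."""
    static_df = spark.createDataFrame(
        [(0, "even"), (1, "odd")], ["parity", "label"])
    stream_df = (spark.readStream
                 .format("rate")                 # test source emitting (timestamp, value)
                 .option("rowsPerSecond", 5)
                 .load()
                 .selectExpr("value", "value % 2 AS parity"))
    # No aggregation follows the join, so the query is started in append mode.
    return stream_df.join(static_df, ["parity"], "inner")


def start_append(df, query_name="joined"):
    """Start the query in append mode, writing to an in-memory table."""
    return (df.writeStream
            .format("memory")
            .queryName(query_name)
            .outputMode("append")
            .start())


def main():
    # Imported here so the helpers above can be loaded without Spark installed.
    from pyspark.sql import SparkSession
    spark = (SparkSession.builder
             .master("local[2]")
             .appName("stream-static-join")
             .getOrCreate())
    query = start_append(build_join(spark))
    query.awaitTermination(10)                   # let it run for about 10 seconds
    spark.sql("SELECT * FROM joined").show(5)
    query.stop()
    spark.stop()
```

Calling main() runs the whole thing locally. Changing "append" to "complete" in start_append should bring back the AnalysisException from the question, since the query has no streaming aggregation.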
