Problem description
I am running Spark Streaming code in the VS Code editor, using memory as the data sink with output mode complete.
Any help would be greatly appreciated.
import sys
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import *
sparkSession = SparkSession.builder.master('local').appName('Max_Stock in complete mode').getOrCreate()
sparkSession.sparkContext.setLogLevel('ERROR')
# Each StructField needs a name, a type, and a nullable flag; the price
# columns are read as doubles and Date/Name as strings.
schema = StructType([
    StructField('Date', StringType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Adjusted Close', DoubleType(), True),
    StructField('Volume', DoubleType(), True),
    StructField('Name', StringType(), True)
])
input_stream = """path"""
stockPricesDf = sparkSession.readStream.option('header','true').schema(schema).csv(input_stream)
print(' ')
print('Is the stream ready?')
print(stockPricesDf.isStreaming)
print(' ')
print('Schema of the input stream')
stockPricesDf.printSchema()  # printSchema() prints directly and returns None
upDaysDf = stockPricesDf.select("Name","Date","High","Open","Close").where("Open > Close")
upDays_max = upDaysDf.groupBy('Name').max('High')
query = upDays_max.writeStream.outputMode('complete').format('memory').queryName('datatable') \
    .option('truncate', 'false') \
    .option('numRows', 5) \
    .start()
# awaitTermination() blocks forever, so the in-memory table must be queried
# before it; processAllAvailable() waits only until the currently available
# input has been processed. Also, sql() must be called on the session
# instance, not on the SparkSession class.
query.processAllAvailable()
sparkSession.sql("select * from datatable").show(5)
query.awaitTermination()
Solution
No effective solution to this problem has been found yet; we are still looking into it.