Spark Structured Streaming with pyspark outputs NULL to Hive

Problem description

I'm having trouble porting from DStreams to Structured Streaming. I have set up code that writes data from a Bitcoin price API to Hive.

The data looks like this

[{"time": 1599859680,"open": "10328.0","high": "10330.8","low": "10328.0","close": "10330.8","vwap": "10330.8","volume": "0.00321565","count": 1},{"time": 1599859740,"open": "10330.8","high": "10331.9","low": "10330.8","close": "10331.0","vwap": "10331.5","volume": "0.92199459","count": 12},{"time": 1599859800,"open": "10331.0","high": "10331.0","low": "10331.0","vwap": "0.0","volume": "0.00000000","count": 0}]
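
As a quick check on the shape of this payload, the sketch below loads two of the records above with Python's standard `json` module and reports the top-level type, the value type of each field, and any keys a record lacks. The variable names (`payload`, `records`, `field_types`, `missing`) are illustrative and not part of the original code.

```python
import json

# Two records copied from the sample above; the second one has no "close" key.
payload = (
    '[{"time": 1599859680,"open": "10328.0","high": "10330.8","low": "10328.0",'
    '"close": "10330.8","vwap": "10330.8","volume": "0.00321565","count": 1},'
    '{"time": 1599859800,"open": "10331.0","high": "10331.0","low": "10331.0",'
    '"vwap": "0.0","volume": "0.00000000","count": 0}]'
)

records = json.loads(payload)

# The top level is a JSON array of objects, not a single object.
top_level_type = type(records).__name__

# Map each field of the first record to the Python type of its value.
field_types = {name: type(value).__name__ for name, value in records[0].items()}

# Keys present in the first record but absent from each record.
missing = [sorted(set(records[0]) - set(record)) for record in records]

print(top_level_type)
print(field_types)
print(missing)
```

Two things stand out: the prices and volume arrive as quoted strings rather than numbers, and `time` is an integer (which looks like epoch seconds) rather than a formatted timestamp.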

My setup consists of the following

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import *

bootstrapServers = "localhost:9099"
topics = "kraken"
subscribeType = "subscribe"

spark = SparkSession\
    .builder\
    .appName("StructuredBitcoin")\
    .config("spark.sql.warehouse.dir","/user/hive/warehouse")\
    .config("hive.metastore.uris","thrift://localhost:9083")\
    .enableHiveSupport()\
    .getOrCreate()

data = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers",bootstrapServers)\
    .option(subscribeType,topics)\
    .load()

I define the schema as follows

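schema = StructType([
    StructField("time",TimestampType()),
    StructField("open",IntegerType()),
    # The data types of the next five fields are missing from the source text;
    # FloatType() is an assumption, not necessarily what the asker wrote.
    StructField("high",FloatType()),
    StructField("low",FloatType()),
    StructField("close",FloatType()),
    StructField("vwap",FloatType()),
    StructField("volume",FloatType()),
    StructField("count",IntegerType())
    ])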

Then I select the data (I think this is where I'm going wrong)

df = data.select(from_json(col("value").cast("string"),schema)\
    .alias("bitcoin")).selectExpr("bitcoin.*")

and write to Hive

def write(df,epoch_id):
    df.show()
    df.write.mode("append").saveAsTable("bitcoin.data")

After that, I start the stream and wait for termination

query = df.writeStream.foreachBatch(write).start()

query.awaitTermination()

The output is just a bunch of NULL values, which leads me to believe the data is not being read correctly.

NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL

Where am I going wrong?

Thanks for the help

Solution

No effective solution to this problem has been found yet.
