Problem description
I'm having trouble porting from DStreams to Structured Streaming. I've set up code to feed data from a Bitcoin price API into Hive.
The data looks like this:
[{"time": 1599859680,"open": "10328.0","high": "10330.8","low": "10328.0","close": "10330.8","vwap": "10330.8","volume": "0.00321565","count": 1},{"time": 1599859740,"open": "10330.8","high": "10331.9","low": "10330.8","close": "10331.0","vwap": "10331.5","volume": "0.92199459","count": 12},{"time": 1599859800,"open": "10331.0","high": "10331.0","low": "10331.0","vwap": "0.0","volume": "0.00000000","count": 0}]
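A quick sanity check on this payload with Python's standard `json` module (illustrative only, not part of the streaming job) shows two things that matter for the schema defined below: the top level is a JSON array, and the numeric-looking price fields arrive as strings:

```python
import json

# Abbreviated sample of the payload shown above.
payload = '[{"time": 1599859680, "open": "10328.0", "count": 1}]'

records = json.loads(payload)
print(type(records).__name__)             # list -> the payload is an array, not one object
print(type(records[0]["open"]).__name__)  # str  -> prices are JSON strings
print(type(records[0]["time"]).__name__)  # int  -> only time/count are real JSON numbers
```

Note also that the third record in the sample omits `close` entirely, so any parser has to tolerate missing fields.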
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import *
bootstrapServers = "localhost:9099"
topics = "kraken"
subscribeType = "subscribe"
spark = SparkSession\
.builder\
.appName("StructuredBitcoin")\
.config("spark.sql.warehouse.dir","/user/hive/warehouse")\
.config("hive.metastore.uris","thrift://localhost:9083")\
.enableHiveSupport()\
.getOrCreate()
data = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers",bootstrapServers)\
.option(subscribeType,topics)\
.load()
I define the schema as follows:
schema = StructType([
StructField("time",TimestampType()),
StructField("open",IntegerType()),
StructField("high",IntegerType()),
StructField("low",IntegerType()),
StructField("close",IntegerType()),
StructField("vwap",IntegerType()),
StructField("volume",IntegerType()),
StructField("count",IntegerType())
])
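For context on the all-NULL output further down: Spark's `from_json` yields NULL for a field (or the whole row) when the JSON value does not match the declared type, and here prices such as `"10328.0"` are JSON strings while the schema declares integers; the array top level compounds the mismatch. A rough pure-Python analogy of that strict-typing behaviour (a sketch, not Spark's actual parser; `parse_as_int` is a hypothetical helper):

```python
# Hypothetical helper mimicking a strict schema reader: a field only
# survives if its JSON type matches the declared type; otherwise NULL.
def parse_as_int(value):
    return value if isinstance(value, int) else None

print(parse_as_int("10328.0"))  # None -> why an IntegerType column comes back NULL
print(parse_as_int(1))          # 1    -> a genuine JSON number would survive
```

In Spark itself, a common fix along these lines is to declare the string-valued fields as `StringType` (or wrap the struct in `ArrayType` and `explode` the parsed column), then `.cast("double")` the price columns afterwards.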
Then I select the data (I think this is where I'm going wrong):
df = data.select(from_json(col("value").cast("string"),schema)\
.alias("bitcoin")).selectExpr("bitcoin.*")
And write to Hive:
def write(df,epoch_id):
    df.show()
    df.write.mode("append").saveAsTable("bitcoin.data")
After that, I start the stream and await termination:
query = df.writeStream.foreachBatch(write).start()
query.awaitTermination()
The output is just a bunch of NULL values, which leads me to believe the data isn't being read correctly:
NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL
Where am I going wrong?
Thanks for any help.