问题描述
我正在尝试使用Spark结构化流从Kafka读取数据。 (CDH 6.3.2(带有一个节点的概念证明版本)带有Spark 2.4.0,Kafka 2.2.1,pyspark 2.4.0) 基本上,我在Kafka中有一个主题,其中我正在使用python发布一些测试数据:
from time import sleep
from numpy.random import choice,randint
from kafka import KafkaProducer
import json
def get_random_value():
new_dict={}
branch_list=['MSK','SPB','KSM']
currency_list=['RUB','USD','EUR']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(0,100)
return new_dict
if name == 'main':
producer = KafkaProducer(bootstrap_servers=['myserver:9092'],value_serializer=lambda x: json.dumps(x).encode('utf-8'))
my_topic = 'test'
while True:
for e in range(10):
data = get_random_value()
future = producer.send(topic = my_topic,value=data)
sleep(5)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('firstStream').getorCreate()
df = spark.readStream.format('kafka')\
.option('kafka.bootstrap.servers','myserver:9092')\
.option('subscribe','test')\
.option('startingOffsets','earliest')\
.load()\
.selectExpr('cast(value as string)')\
.writeStream\
.format('console')\
.start()\
.awaitTermination()
据我了解,我应该能够在控制台中看到输出,从中我提交我的Spark应用程序,但是毕竟没有数据,除了我订阅了一个主题:
火花提交字符串:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 /root/myfolder/testStream.py
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)