如何在PySpark中使用foreach来编写kafka主题?

问题描述

我正在尝试通过foreach插入在每一行上创建的精致日志,并希望将其存储到Kafka主题中,如下所示-

def refine(df):
    log = df.value
    event_logs = json.dumps(get_event_logs(log)) #A function to refine the row/log
    pdf = pd.DataFrame({"value": event_logs},index=[0])

    spark = SparkSession.builder.appName("myAPP").getorCreate() 
    df = spark.createDataFrame(pdf)

    query = df.selectExpr("CAST(value AS STRING)") \
       .write \
       .format("kafka") \
       .option("kafka.bootstrap.servers","localhost:9092") \
       .option("topic","intest") \
       .save()

我正在使用以下代码进行调用

query = streaming_df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")  \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .foreach(refine)\
    .start()
query.awaitTermination()

但是refine函数在某种程度上无法获得我在提交代码时发送的Kafka软件包。我相信the子手无法访问通过以下命令发送的Kafka软件包-

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...

因为提交代码时,我收到以下错误消息,

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".; 

所以,我的问题是如何将数据存储到foreach内的Kafka中?另外,我想知道在foreach内创建另一个会话是否是个好主意;我不得不在foreach内重新声明会话,因为主驱动程序的退出会话无法在foreach中用于某些与可序列化有关的问题。

P.S:如果我尝试将其沉入...format("console")内的控制台(foreach)中,则它将正常工作。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)