从Kafka主题读取流时,Spark结构化流是否存在一些超时问题?

问题描述

我实现了一个火花作业,以结构化流中的foreachbatch从kafka主题中读取流。

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","mykafka.broker.io:6667")
  .option("subscribe","test-topic")
  .option("kafka.security.protocol","SASL_SSL")
  .option("kafka.ssl.truststore.location","/home/hadoop/cacerts")
  .option("kafka.ssl.truststore.password",tspass)
  .option("kafka.ssl.truststore.type","JKS")
  .option("kafka.sasl.kerberos.service.name","kafka")
  .option("kafka.sasl.mechanism","GSSAPI")
  .option("groupIdPrefix","MY_GROUP_ID")
  .load()

val streamservice = df.selectExpr("CAST(value AS STRING)")
  .select(from_json(col("value"),schema).as("data"))
  .select("data.*")


var stream_df = streamservice
  .selectExpr("cast(id as string) id","cast(x as int) x")

val monitoring_stream = stream_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame,batchId: Long) =>
    if(!batchDF.isEmpty) { }
  }
  .start()
  .awaitTermination()

我有以下问题。

  1. 如果kafka主题很长时间没有数据,stream_df.writeStream是否将自动终止?对此有一些超时控制吗?

  2. 如果从kafka代理中删除了kafka主题,stream_df.writeStream是否将终止?

我希望在上述两种情况下,spark作业可以继续监视kafka主题而不会终止。 kafka连接器和/或stream_df.writerstream是否需要一些特殊设置?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)