一些流在此命令完成之前终止!结构化流

问题描述

我正在尝试将流数据读取到来自Azure Eventhubs的Azure Databricks中。 这是我一直在使用的代码

connectionString = "Connection string"
ehConf = {
  'eventhubs.connectionString' : connectionString
}

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

它给了我一个错误

ERROR: Some streams terminated before this command Could finish!

我了解到,我们必须根据Databricks运行时间以及Spark版本来提供Azure Eventhub的Jar文件。 我的Spark版本是2.4.5,Databricks运行时是6.6,我使用的jar文件是azure-eventhubs-spark_2.12-2.3.17.jar(针对指定的此组合)

Azure Event Hub jar files recommendations

但是我仍然面临这个问题,因为“某些流在此命令完成之前就已终止!”。有人可以帮我吗?

谢谢

解决方法

在我开始处理此问题时:首先遇到与您遇到的相同的问题。

ERROR: Some streams terminated before this command could finish!

enter image description here

进行此更改后,可以与以下配置完美配合:

Databrick运行时: 6.6 (includes Apache Spark 2.4.5,Scala 2.11)

Azure EventHub库: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17

第一步:使用“库”安装库。

您可以尝试使用“安装库”选项安装“ com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17”。

enter image description here

第二步::更改与Azure事件中心相关的配置。

如果您使用的是ehConf = {'eventhubs.connectionString' : connectionString}以上版本的“ 2.3.15”,则会收到以下错误消息。

java.lang.IllegalArgumentException: Input byte array has wrong 4-byte ending unit

注意:与事件中心相关的所有配置都在事件中心配置字典中进行。配置字典必须包含事件中心连接字符串:

connectionString = "YOUR.CONNECTION.STRING"

ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString

For **2.3.15** version and above,the configuration dictionary requires that connection string be encrypted.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

enter image description here