问题描述
我正在尝试将流数据读取到来自Azure Eventhubs的Azure Databricks中。 这是我一直在使用的代码:
connectionString = "Connection string"
ehConf = {
'eventhubs.connectionString' : connectionString
}
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
query = df \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
ERROR: Some streams terminated before this command Could finish!
我了解到,我们必须根据Databricks运行时间以及Spark版本来提供Azure Eventhub的Jar文件。 我的Spark版本是2.4.5,Databricks运行时是6.6,我使用的jar文件是azure-eventhubs-spark_2.12-2.3.17.jar(针对指定的此组合)
但是我仍然面临这个问题,因为“某些流在此命令完成之前就已终止!”。有人可以帮我吗?
谢谢
解决方法
在我开始处理此问题时:首先遇到与您遇到的相同的问题。
ERROR: Some streams terminated before this command could finish!
进行此更改后,可以与以下配置完美配合:
Databrick运行时: 6.6 (includes Apache Spark 2.4.5,Scala 2.11)
Azure EventHub库: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
第一步:使用“库”安装库。
您可以尝试使用“安装库”选项安装“ com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
”。
第二步::更改与Azure事件中心相关的配置。
如果您使用的是ehConf = {'eventhubs.connectionString' : connectionString}
以上版本的“ 2.3.15
”,则会收到以下错误消息。
java.lang.IllegalArgumentException: Input byte array has wrong 4-byte ending unit
注意:与事件中心相关的所有配置都在事件中心配置字典中进行。配置字典必须包含事件中心连接字符串:
connectionString = "YOUR.CONNECTION.STRING"
ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString
For **2.3.15** version and above,the configuration dictionary requires that connection string be encrypted.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)