问题描述
我学会了在 SPARK 3 上使用新的自动加载器流式传输方法,但我遇到了这个问题。 在这里,我正在尝试收听简单的 json 文件,但我的流从未启动。
from pyspark.sql.types import StructType, StringType, IntegerType

# --- Azure / Auto Loader configuration ---------------------------------------
# Connection string of the storage account that hosts the notification queue.
azdls_connection_string = "My connection string"
# Name of the pre-created Azure Storage queue Auto Loader should listen on.
queue_name = "queu-name"

# Schema of the incoming JSON files; both fields are non-nullable.
stream_schema = StructType() \
    .add("timestamp", StringType(), False) \
    .add("temperature", IntegerType(), False)

# Azure identifiers Auto Loader needs to create/attach the Event Grid
# subscription (service principal credentials + subscription/tenant).
ressource_group = ""          # NOTE: name kept as-is (sic, "resource_group")
cloudfiles_subid_telem = ""
cloudfiles_clientid_telem = ""
cloudfiles_clientsecret_telem = ""
tenantid = ""

# BUG FIX: `dls_name` was used below but never defined in the original
# snippet, which raises NameError before the stream is even constructed.
dls_name = "mydatalake"       # TODO: set to your ADLS Gen2 account name
conainer_name = "mydb"        # NOTE: name kept as-is (sic, "container_name")
abs_fs = "abfss://" + conainer_name + "@" + dls_name + ".dfs.core.windows.net"

read_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.useNotifications", True)
    .option("cloudFiles.format", "json")
    .option("cloudFiles.connectionString", azdls_connection_string)
    .option("cloudFiles.resourceGroup", ressource_group)
    .option("cloudFiles.subscriptionId", cloudfiles_subid_telem)
    .option("cloudFiles.tenantId", tenantid)
    .option("cloudFiles.clientId", cloudfiles_clientid_telem)
    .option("cloudFiles.clientSecret", cloudfiles_clientsecret_telem)
    .option("cloudFiles.region", "francecentral")
    # BUG FIX: `queue_name` was defined but never used. Passing it makes Auto
    # Loader reuse the already-created queue instead of provisioning a new
    # Event Grid subscription + queue (the step that fails in the stack trace).
    .option("cloudFiles.queueName", queue_name)
    .schema(stream_schema)
    .option("cloudFiles.includeExistingFiles", False)
    .load(abs_fs + "/input")
)

# Sink: write the stream as a Delta table, with a dedicated checkpoint path
# (must be unique per query — sharing it with another stream causes conflicts).
checkpoint_path = abs_fs + "/checkpoints"
out_path = abs_fs + "/out"
df = read_stream.writeStream.format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .start(out_path)
当我尝试开始流式传输时,出现错误。我的权限设置正确,因为我的 Azure 队列已成功创建。我在 Databricks 网站的 Auto Loader(自动加载器)文档中没有找到与此错误相关的任何信息。
这是我的错误:
java.io.IOException: Attempted read from closed stream.
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:165)
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.Reader.read(Reader.java:140)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2001)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1980)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1957)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1907)
at org.apache.commons.io.IOUtils.toString(IOUtils.java:778)
at org.apache.commons.io.IOUtils.toString(IOUtils.java:759)
at com.databricks.sql.aqs.EventGridClient.prettyResponse(EventGridClient.scala:428)
at com.databricks.sql.aqs.EventGridClient.com$databricks$sql$aqs$EventGridClient$$errorResponse(EventGridClient.scala:424)
at com.databricks.sql.aqs.EventGridClient$$anonfun$createEventSubscription$3.applyOrElse(EventGridClient.scala:235)
at com.databricks.sql.aqs.EventGridClient$$anonfun$createEventSubscription$3.applyOrElse(EventGridClient.scala:229)
at com.databricks.sql.aqs.EventGridClient.executeRequest(EventGridClient.scala:387)
at com.databricks.sql.aqs.EventGridClient.createEventSubscription(EventGridClient.scala:226)
at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.$anonfun$setupEventGridSubscription$1(AzureEventNotificationSetup.scala:135)
at scala.Option.getOrElse(Option.scala:189)
at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.setupEventGridSubscription(AzureEventNotificationSetup.scala:121)
at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.<init>(AzureEventNotificationSetup.scala:75)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.databricks.sql.fileNotification.autoIngest.EventNotificationSetup$.$anonfun$create$1(EventNotificationSetup.scala:66)
at com.databricks.sql.fileNotification.autoIngest.ResourceManagementUtils$.unwrapInvocationTargetException(ResourceManagementUtils.scala:42)
at com.databricks.sql.fileNotification.autoIngest.EventNotificationSetup$.create(EventNotificationSetup.scala:50)
at com.databricks.sql.fileNotification.autoIngest.CloudFilesSourceProvider.$anonfun$createSource$2(CloudFilesSourceProvider.scala:162)
at scala.Option.getOrElse(Option.scala:189)
at com.databricks.sql.fileNotification.autoIngest.CloudFilesSourceProvider.createSource(CloudFilesSourceProvider.scala:154)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:306)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:93)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:90)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:88)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:322)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:322)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:166)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:164)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:311)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:88)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:68)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:346)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:269)
提前致谢
解决方法
好的,我找到了问题所在:在我之前试验 AQS(Azure Queue Storage)连接器时遗留下来的流,与当前这个流产生了冲突(它占用了相同的通知资源)。清理掉之前的流之后,这个流就能正常启动了。