问题描述
我指的是以下链接 https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader 它使用火花流处理Azure数据砖中的增量文件。我想知道带有Data Lake stroage Gen2的HD Insight集群是否支持增量文件。我在HD洞察力火花群集中尝试了该示例,我得到了以下错误
示例代码:
input_df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format","json") \
.option("cloudFiles.connectionString",connection_string) \
.option("cloudFiles.resourceGroup",resource_group) \
.option("cloudFiles.subscriptionId",subscription_id) \
.option("cloudFiles.tenantId",tenant_id) \
.option("cloudFiles.clientId",client_id) \
.option("cloudFiles.clientSecret",client_secret) \
.option("cloudFiles.includeExistingFiles","true") \
.schema(schema) \
.load(input_folder)
Traceback (most recent call last):
File "<stdin>",line 12,in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/streaming.py",line 398,in load
return self._df(self._jreader.load(path))
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",line 1257,in __call__
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py",line 63,in deco
return f(*a,**kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",line 328,in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.load.
: java.lang.classNotFoundException: Failed to find data source: cloudFiles. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.classNotFoundException: cloudFiles.DefaultSource
at java.net.urlclassloader.findClass(urlclassloader.java:382)
at java.lang.classLoader.loadClass(ClassLoader.java:419)
at java.lang.classLoader.loadClass(ClassLoader.java:352)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
。
解决方法
不幸的是,Azure HDInsight不支持将Auto Loader用于新文件检测。
什么是自动加载程序?
自动加载器– Databricks 的新功能,允许从各种数据源向Delta Lake增量摄取数据。 Auto Loader是Apache Spark的优化云文件源,可在新数据到达时从云存储连续有效地加载数据。合作伙伴集成的数据摄取网络使您可以将数百个数据源中的数据直接摄取到Delta Lake中。
在后台(在Azure Databricks中),运行Auto Loader将自动设置 Azure事件网格和队列存储服务。通过这些服务,自动加载器使用Azure存储中的队列轻松查找新文件,将它们传递给Spark,从而在流传输或批处理作业中以低延迟和低成本加载数据。 Auto Loader会记录要处理的文件,以确保对传入数据进行一次准确的处理。
当新数据文件到达云存储时,Auto Loader会逐步有效地对其进行处理,而无需进行任何其他设置。 Auto Loader提供了一个新的结构化流源,称为cloudFiles。给定云文件存储上的输入目录路径,cloudFiles源会在新文件到达时自动处理它们,并可以选择处理该目录中的现有文件。
有关详细信息,请参见Load files from Azure Blob storage or Azure Data Lake Storage Gen2 using Auto Loader。