问题描述
我试图将数据写入CSV文件并将文件存储在Azure Data Lake Gen2上,并遇到作业中止错误消息。同样的代码以前也可以正常工作。
错误消息:
org.apache.spark.SparkException: Job aborted.
代码:
import requests
response = requests.get('https://myapiurl.com/v1/data',auth=('user','password'))
data = response.json()
from pyspark.sql import *
df=spark.createDataFrame([Row(**i) for i in data])
df.write.format(source).mode("overwrite").save(path) #error line
解决方法
我总结以下解决方案
如果要访问Azure数据块中的Azure数据湖gen2,则有两种选择。
-
安装Azure数据湖gen2作为Azure databricks的文件系统。完成之后,您可以使用路径
/mnt/<>
读写文件。而且我们只需要运行一次代码即可。a。创建服务主体,并将Storage Blob数据贡献者分配给Data Lake Storage Gen2存储帐户范围内的sp
az login az ad sp create-for-rbac -n "MyApp" --role "Storage Blob Data Contributor" \ --scopes /subscriptions/<subscription>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>
b。代码
configs = {"fs.azure.account.auth.type": "OAuth","fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider","fs.azure.account.oauth2.client.id": "<appId>","fs.azure.account.oauth2.client.secret": "<clientSecret>","fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token","fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/folder1",mount_point = "/mnt/flightdata",extra_configs = configs)
-
使用存储帐户访问密钥直接访问。
我们可以将代码
spark.conf.set( "fs.azure.account.key.<storage-account-name>.dfs.core.windows.net","<storage-account-access-key-name>")
添加到脚本中。然后,我们可以使用路径abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/
读写文件。例如
from pyspark.sql.types import StringType spark.conf.set( "fs.azure.account.key.testadls05.dfs.core.windows.net","<account access key>") df = spark.createDataFrame(["10","11","13"],StringType()).toDF("age") df.show() df.coalesce(1).write.format('csv').option('header',True).mode('overwrite').save('abfss://test@testadls05.dfs.core.windows.net/result_csv')
有关更多详细信息,请参阅here