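Python function error with a deep ls recursive generator

Problem description

I have the following function, which is supposed to recurse through all data lake assets. The line files = dbutils.fs.ls(deep_ls(root,max_depth=20)), which is meant to fetch the files recursively, gives me the following error: TypeError: <generator object deep_ls at 0x7fc512538e40> has the wrong type - (<class 'str'>,) is expected. Do you have any ideas or suggestions on how to fix this?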

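def deep_ls(path: str, max_depth=1, reverse=False, key=None, keep_hidden=False):
    """List files under path recursively, down to max_depth levels."""
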
    # Hidden files may be filtered out
    condition = None if keep_hidden else lambda x: x.name[0] not in ('_','.')

    # List all files in path and apply sorting rules
    li = sorted(filter(condition,dbutils.fs.ls(path)),reverse=reverse,key=key)

    # Return all files (not ending with '/')
    for x in li:
        if x.path[-1] != '/':
            yield x

    # If the max_depth has not been reached,start
    # listing files and folders in subdirectories
    if max_depth > 1:
        for x in li:
            if x.path[-1] != '/':
                continue
            for y in deep_ls(x.path,max_depth - 1,reverse,key,keep_hidden):
                yield y

    # If max_depth has been reached,
    # return the folders
    else:
        for x in li:
            if x.path[-1] == '/':
                yield x
            

def key(val):
    try:
        return int(list(filter(bool,val.path.split('/'))).pop().split('=').pop())
    except ValueError as e:
        return -1

Here is the full Databricks notebook code that tries to call the function:

# Example Implementation
# ----------------------

# Library Imports

import os
import requests
import json
import jmespath
import pandas as pd

from pprint import pprint
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient,AtlasEntity,AtlasProcess,TypeCategory
from pyapacheatlas.core.typedef import *

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Authentication

# Service Principal with "Purview Data Source Administrator" permissions on Purview
tenant_id = "enter-tenant-id"
client_id = "enter-client-id"
client_secret = "enter-client-secret"
resource_url = "https://purview.azure.net"
data_catalog_name = "demo-purview"
adls_sas_token = 'enter-sas-token-here'

# Retrieve authentication objects
azuread_access_token = azuread_auth(tenant_id,client_id,client_secret,resource_url)
purview_client = purview_auth(tenant_id,data_catalog_name)

# Asset details

# Asset parameters
storage_account = "adls"
container = "lake"

# The root level path we want to begin populating assets from
top_path = f"/azure_storage_account#{storage_account}.core.windows.net/azure_datalake_gen2_service#{storage_account}.dfs.core.windows.net/azure_datalake_gen2_filesystem#{container}"

# Retrieve full list of assets
assets_all = list(get_all_adls_assets(top_path,data_catalog_name,azuread_access_token,max_depth=20))


# Grab SAS token
#adls_sas_token = dbutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Configure Spark to access from DFS endpoint
root = 'https://%s@%s.dfs.core.windows.net/' % (container,storage_account)

spark.conf.set('fs.azure.sas.%s.%s.dfs.core.windows.net' % (container,storage_account),adls_sas_token)
print('Remote adls root path: ' + root)

# Get ADLS files recursively
files = dbutils.fs.ls(deep_ls(root,max_depth=20))
files_df = convertfiles2df(files) # Note this is a Pandas DataFrame

# Generate asset-aligned names
files_df['asset'] = files_df['name'].str.replace(r'\d+','{N}')

# Append schema row-wise from Purview
files_df['schema'] = files_df.apply(lambda row: get_adls_asset_schema(assets_all,row['asset'],purview_client),axis=1)

# display Asset DataFrame
display(files_df)

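Solution

The dbutils.fs.ls function accepts only one argument, a string containing a path, but you are passing a generator to it. You can iterate over the generator instead, like this: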

for file in deep_ls(root, max_depth=20):
  # deep_ls yields file entries, so pass the entry's path string
  res = dbutils.fs.ls(file.path)
  # do something with res ...
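
To see why the original call fails, here is a minimal sketch that runs outside Databricks. FakeFs, FileInfo, and the in-memory TREE are hypothetical stand-ins for dbutils.fs and its listing entries, not a real API, and this deep_ls is a simplified version without the sorting and hidden-file options. It only illustrates that a generator has to be iterated (or wrapped in list) rather than passed where a path string is expected.

```python
from collections import namedtuple

# Hypothetical stand-in for the entries returned by dbutils.fs.ls
FileInfo = namedtuple("FileInfo", ["path", "name", "size"])

# Hypothetical in-memory directory tree: folder path -> entries
TREE = {
    "/lake/": [FileInfo("/lake/a.csv", "a.csv", 10),
               FileInfo("/lake/raw/", "raw/", 0)],
    "/lake/raw/": [FileInfo("/lake/raw/b.csv", "b.csv", 20)],
}


class FakeFs:
    """Assumed mimic of dbutils.fs: ls() accepts only a path string."""

    def ls(self, path):
        if not isinstance(path, str):
            raise TypeError(f"{path!r} has the wrong type - str is expected")
        return TREE[path]


fs = FakeFs()


def deep_ls(path, max_depth=1):
    """Simplified recursive listing: yield files, then descend into folders."""
    entries = fs.ls(path)
    for entry in entries:
        if not entry.path.endswith("/"):
            yield entry
    for entry in entries:
        if entry.path.endswith("/"):
            if max_depth > 1:
                yield from deep_ls(entry.path, max_depth - 1)
            else:
                yield entry  # depth limit reached: return the folder itself


# Passing the generator where a string is expected reproduces the error
try:
    fs.ls(deep_ls("/lake/", max_depth=20))
    failed = False
except TypeError:
    failed = True

# Consuming the generator directly gives the recursive listing
files = list(deep_ls("/lake/", max_depth=20))
paths = [f.path for f in files]
print(failed, paths)
```

In the notebook from the question, the analogous change would probably be files = list(deep_ls(root, max_depth=20)), assuming convertfiles2df accepts a list of file entries.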