需要为特定的 JSON 输出重新编写 Scala 代码

问题描述

我正在尝试通过样条曲线和 apacheatlas api 将 Databricks 笔记本沿袭注册到 Azure Purview。代码有两个版本:1) 是原始代码,它使用 databricks 运行时版本 6.4 并按预期工作,但我们需要在至少 7.5 版及更高版本的较新运行时版本中运行它,因此有 2) 秒为运行时版本 7.5 重构的代码版本。具体来说,新代码需要新的 JSON 包,但输出(见下文)与原始代码的预期输出(也如下所示)不匹配。需要重新编写代码才能正确执行,因为当前的新代码错误。谢谢

ORIGINAL OLD CODE:使用Databricks Runtime version6.4的原始代码如下

%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON

val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
  //override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
  //val test = dbutils.notebook.getContext.notebookPath
  val notebookinformationjson = dbutils.notebook.getContext.toJson
  val outerMap = JSON.parseFull(notebookinformationjson).getorElse(0).asInstanceOf[Map[String,String]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]

  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
  val notebookPath = extraContextMap("notebook_path").split("/")
  
  val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size-1)
  
  val notebookInfo = Map("notebookURL" -> notebookURL,"user" -> user,"name" -> name,"mounts" -> dbutils.fs.ls("/mnt").map(_.path),"timestamp" -> System.currentTimeMillis)
  val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
  
  override protected def userExtraMetadataProvider: UserExtraMetadataProvider = new UserExtraMetadataProvider {
    override def forExecEvent(event: ExecutionEvent,ctx: HarvestingContext): Map[String,Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan,Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation,Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation,Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation,Any] = Map("foo" -> "bar5")
  }
})

ORIGINAL EXPECTED OUTPUT:这是原始代码的预期 JSON 输出

{
  'id': '618c92e1-ae79-491e-b6fd-b5080dc7ef8d','operations': {
    'write': {
      'outputSource': 'dbfs:/mnt/test_data/parquet/Customers_new','append': False,'id': 0,'childIds': [
        1
      ],'params': {
        'path': 'dbfs:/mnt/test_data/parquet/Customers_new'
      },'extra': {
        'name': 'InsertIntoHadoopFsRelationCommand','destinationType': 'Parquet','foo': 'bar4'
      }
    },'reads': [
      {
        'inputSources': [
          'dbfs:/mnt/test_data/csv/Customers.csv'
        ],'id': 1,'schema': [
          'b6112e12-9b90-46db-b919-bcc9c6280759','9f0671fe-813e-4608-a870-adae8386c46e','8dfcf4df-211c-48b0-8dec-1b1486dd0db4'
        ],'params': {
          'delimiter': ',','inferschema': 'true','header': 'true','path': 'dbfs:/mnt/test_data/csv/Customers.csv'
        },'extra': {
          'name': 'LogicalRelation','sourceType': 'CSV','foo': 'bar3'
        }
      }
    ]
  },'systemInfo': {
    'name': 'spark','version': '2.4.5'
  },'agentInfo': {
    'name': 'spline','version': '0.5.3'
  },'extraInfo': {
    'appName': 'Databricks Shell','dataTypes': [
      {
        '_typeHint': 'dt.Simple','id': 'df02093b-d529-4c8d-b422-9ac468baa765','name': 'integer','nullable': True
      },{
        '_typeHint': 'dt.Simple','id': '88f773f8-982c-4d6c-bed3-1600a99c5943','name': 'string','nullable': True
      }
    ],'attributes': [
      {
        'id': 'b6112e12-9b90-46db-b919-bcc9c6280759','name': 'CustomerID','dataTypeId': 'df02093b-d529-4c8d-b422-9ac468baa765'
      },{
        'id': '9f0671fe-813e-4608-a870-adae8386c46e','name': 'FirstName','dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
      },{
        'id': '8dfcf4df-211c-48b0-8dec-1b1486dd0db4','name': 'LastName','dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
      }
    ],'notebookInfo': {
      'obj': {
        'name': 'initialize_spline_original','timestamp': 1623430575561,'notebookURL': 'adb-2323242424.azuredatabricks.net/','mounts': [
          'dbfs:/mnt/datalake/','dbfs:/mnt/landing_dde/','dbfs:/mnt/landing_newc/','dbfs:/mnt/landing_sourcedb/','dbfs:/mnt/testmount/','dbfs:/mnt/training/'
        ],'user': 'user@test.com'
      }
    }
  }
}

代码:这是使用 Databricks 运行时版本 7.5 和升级和重构的 JSON 包的新代码

    %scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import play.api.libs.json._
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper


val splineConf: Configuration = StandardSplineConfigurationStack(spark)


spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
  val notebookinformationjson = dbutils.notebook.getContext.toJson
  val mapper = new ObjectMapper() with ScalaObjectMapper 
  mapper.registerModule(DefaultScalaModule)
val outerMap = mapper.readValue[Map[String,Object]](notebookinformationjson)
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)

  val notebookInfo = Map("notebookURL" -> (notebookURL),"user" -> (user),"name" -> (name),"mounts" -> (dbutils.fs.ls("/mnt").map(_.path)),"timestamp" -> (System.currentTimeMillis))

 val mapper1 = new ObjectMapper()
 val notebookInfoJson = mapper1.writeValueAsstring(notebookInfo)

  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
    override def forExecEvent(event: ExecutionEvent,Any] = Map("foo" -> "bar5")
  })
})

输出:新代码的 JSON 对象输出如下所示:

{
  'id': '12812131038144','operations': {
    'write': {
      'outputSource': 'dbfs:/mnt/test_data/delta/customers_join_orders_new','params': {
        'path': 'dbfs:/mnt/test_data/delta/customers_join_orders_new'
      },'extra': {
        'destinationType': 'tahoe','foo': 'bar4','name': 'SaveIntoDataSourceCommand'
      }
    },'reads': [
      {
        'childIds': [
          
        ],'inputSources': [
          'dbfs:/mnt/test_data/csv/Orders.csv'
        ],'id': 4,'schema': [
          '2018','2019','2020','2021','2022','2023'
        ],'params': {
          'inferschema': 'true','delimiter': ','path': 'dbfs:/mnt/test_data/csv/Orders.csv'
        },'extra': {
          'sourceType': 'csv','foo': 'bar3','name': 'LogicalRelation'
        }
      },{
        'childIds': [
          
        ],'inputSources': [
          'dbfs:/mnt/test_data/csv/Customers.csv'
        ],'id': 6,'schema': [
          '2065','2066','2067'
        ],'name': 'LogicalRelation'
        }
      }
    ],'other': [
      {
        'id': 3,'childIds': [
          4
        ],'params': {
          'identifier': 'orders'
        },'extra': {
          'foo': 'bar5','name': 'SubqueryAlias'
        }
      },{
        'id': 5,'childIds': [
          6
        ],'params': {
          'identifier': 'customers'
        },{
        'id': 2,'childIds': [
          3,5
        ],'2023','2065','params': {
          'condition': {
            '_typeHint': 'expr.Binary','symbol': '=','dataTypeId': '05ffb715-9781-4019-aee3-21c77f80d2a1','children': [
              {
                '_typeHint': 'expr.AttrRef','refId': '2019'
              },{
                '_typeHint': 'expr.AttrRef','refId': '2065'
              }
            ]
          },'hint': '','joinType': 'INNER'
        },'name': 'Join'
        }
      },{
        'id': 1,'childIds': [
          2
        ],'params': {
          'projectList': [
            {
              '_typeHint': 'expr.AttrRef','refId': '2065'
            },{
              '_typeHint': 'expr.AttrRef','refId': '2020'
            },'refId': '2022'
            },'refId': '2023'
            }
          ]
        },'name': 'Project'
        }
      }
    ]
  },'version': '3.0.1'
  },'version': '0.6.0'
  },'id': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca','id': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be','name': 'double','id': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4','id': '05ffb715-9781-4019-aee3-21c77f80d2a1','name': 'boolean','notebookInfo': '{"traversableAgain":true,"empty":false}','attributes': [
      {
        'id': '2018','name': 'SalesOrderID','dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },{
        'id': '2019',{
        'id': '2020','name': 'OrderQty',{
        'id': '2021','name': 'ProductID',{
        'id': '2022','name': 'UnitPrice','dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
      },{
        'id': '2023','name': 'Linetotal',{
        'id': '2065',{
        'id': '2066','dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
      },{
        'id': '2067','dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
      }
    ]
  }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)