在数据融合中应用Rank或分区的row_num函数

问题描述

我想在Data Fusion中对我的数据实现rank或partitioned row_num函数,但找不到任何插件

有什么办法吗?

我想实现以下内容

enter image description here

假设我有上述数据,现在我想根据AccountNumber对数据进行分组,并将最新记录发送到一个接收器中,再将其发送给其他接收器。 因此,根据以上数据,

Sink1应该具有

enter image description here

Sink2

enter image description here

我打算通过按AccountNumber应用rank或row_number分区,并按功能类似按Record_date desc进行排序,然后将rank = 1或row_num = 1的记录发送到一个接收器,然后将其发送给其他接收器。

解决方法

解决问题的一种好方法是使用Spark plugin。 为了将其添加到您的Datafusion实例中,请转到 HUB->插件->搜索Spark->部署插件。然后,您可以在 Analytics 标签上找到它。

为了给您一个如何使用它的示例,我在下面创建了管道:

enter image description here

该管道基本上是:

  1. 从GCS读取文件。
  2. 在数据中执行排名功能
  3. 在不同分支中用rank = 1和rank> 1过滤数据
  4. 将数据保存在不同的位置

现在,让我们深入了解每个组件:

1- GCS :这是一个简单的GCS来源。此示例使用的文件具有如下所示的数据

enter image description here

2- Spark_rank :这是一个带有以下代码的Spark插件。该代码基本上使用您的数据创建了一个临时视图,并且它们应用查询对行进行排名。之后,您的数据返回到管道。在下面您还可以查看此步骤的输入和输出数据。请注意,由于输出被传递到两个分支,因此输出是重复的。

      def transform(df: DataFrame,context: SparkExecutionPluginContext) : DataFrame = {
          df.createTempView("source")
          df.sparkSession.sql("SELECT AccountNumber,Address,Record_date,RANK() OVER (PARTITION BY accountNumber ORDER BY record_date DESC) as rank FROM source")
    }

enter image description here

3- Spark2 Spark3 :像下面的步骤一样,此步骤使用Spark插件转换数据。使用以下代码,Spark2仅获取等级= 1的数据

    def transform(df: DataFrame,context: SparkExecutionPluginContext) : DataFrame = {
      df.createTempView("source_0")
      df.sparkSession.sql("SELECT AccountNumber,Record_date FROM 
    source_0 WHERE rank = 1")
    }

Spark3使用以下代码获取等级> 1的数据:

    def transform(df: DataFrame,context: SparkExecutionPluginContext) : DataFrame = {
      df.createTempView("source_1")
      df.sparkSession.sql("SELECT accountNumber,address,record_date FROM source_1 WHERE rank > 1")
    }

4- GCS2 GCS3 :最后,在此步骤中,您的数据再次保存到GCS中。