来自嵌套数组的 JOLT 连接值 (Apache NiFi)

问题描述

我有一个 JSON:

{
  "reports": [
    {
      "columnHeader": {
        "metricHeader": {
          "metricHeaderEntries": [
            {
              "name": "ga:sessions","type": "INTEGER"
            },{
              "name": "ga:bounces",{
              "name": "ga:sessionDuration","type": "TIME"
            },{
              "name": "ga:pageviews","type": "INTEGER"
            }
          ]
        }
      },"data": {
        "rows": [
          {
            "metrics": [
              {
                "values": [
                  "25","18","1269.0","27"
                ]
              }
            ]
          }
        ],"totals": [
          {
            "values": [
              "25","27"
            ]
          }
        ],"rowCount": 1,"minimums": [
          {
            "values": [
              "25","maximums": [
          {
            "values": [
              "25","isDataGolden": true
      }
    }
  ]
}
它们的

metricHeaderEntriesvalues 是分开的。值在 data.totals 数组中(顺序正确保存)。 我想修改 JSON 并获得以下结构(或与此类似,我只需要对 metric.name = metric.value):

{
  "metrics": [
            {
              "name": "ga:sessions","value": "25"
            },"type": "18"
            },"type": "1269.0"
            },"type": "27"
            }
          ],"isDataGolden": true      
}

JOLT 可以吗? 之前我只将 shift 规范用于一些非常简单的任务。以下规格:

[
  {
    "operation": "shift","spec": {
      "reports": {
        "*": {
          "columnHeader": {
            "metricHeader": {
              "metricHeaderEntries": {
                "*": {
                  "name": "@(1,name)"
                }
              }
            }
          },"isDataGolden": "isDataGolden"
        }
      }
    }
  }
]

返回:

{
  "ga:sessions" : "ga:sessions","ga:bounces" : "ga:bounces","ga:sessionDuration" : "ga:sessionDuration","ga:pageviews" : "ga:pageviews"
}

“几乎”。当然不是我想要的。我需要一个数组 metrics 与字段 namevalue 如上所述。但我不知道如何从 data.totals 获取这些值并将它们放入指标。并且isDataGolden也消失了。我阅读了一些关于 modify-overwrite-beta内容,我可以将它用于我的案例吗?

解决方法

你可以使用executegroovyscript

import groovy.json.*

def ff=session.get()
if(!ff)return

//read flow file content and parse it
def body = ff.read().withReader("UTF-8"){reader-> 
    new JsonSlurper().parse(reader) 
}

def rep0=body.reports[0]

def result = [ 
    metrics : rep0.columnHeader.metricHeader.metricHeaderEntries.indexed().collect{i,m->
            [ 
                name : m.name,value: rep0.data.totals[0].values[i]
            ]
        },isDataGolden : rep0.data.isDataGolden 
]

//write new flow file content
ff.write("UTF-8"){writer-> 
    new JsonBuilder(result).writeTo(writer) 
}
//transfer
REL_SUCCESS << ff