将流文件属性/内容读取到处理器属性

问题描述

我想根据通过的最后一个文件内容设置处理器的属性

示例:在创建流文件期间,我使用处理器 GenerateFlowFile自定义文本 ${Now()} 作为当前时间戳实例化流文件

我想要一个处理器(哪种类型与我无关)将流文件内容(时间戳)读取到处理器的自定义属性 property_name。之后,我希望能够通过 REST-API 潜在地查询处理器并从处理器读取该属性

最初我认为我可以用 ExtractText 处理器来做到这一点,但它根据正则表达式提取文本并将其写回流文件,而我想将该信息保存在处理器中,直到下一个文件到达。

解决方法

你不能通过 NiFi 做到这一点。当处理器运行时,您无法更新其配置。

也许您可以在 UpdateAttribute 上使用状态变量?

有状态的使用

通过为“Store State”选择“store state local”选项 属性 UpdateAttribute 不仅会存储评估的属性 作为 FlowFile 的属性,但也作为有状态变量 以递归方式引用。这使处理器能够 计算传入 FlowFiles 的总和或计数等内容。一种 动态属性可以作为有状态变量引用,如下所示:

动态属性键:theCount 值: ${getStateValue("theCount"):plus(1)} 这个例子将保持一个计数 通过处理器的 FlowFiles 总数。 要在 State 之上使用逻辑,只需使用 更新属性。所有操作都将作为有状态属性存储为 以及被添加到 FlowFiles。使用“高级用法”它是 可以跟踪诸如流量最大值之类的事情 远的。这将通过有一个条件来完成 "${getStateValue("maxValue"):lt(${value})}" 和一个动作 属性:“maxValue”,值:“${value}”。 “状态变量 Initial Value”属性用于初始化有状态变量 如果有状态运行,则需要设置。一些逻辑规则将 需要非常高的初始值,例如使用高级规则 确定最小值。如果有状态属性引用其他 有状态属性然后是其他有状态属性的值 后面会有迭代。例如,尝试计算 传入流的平均值需要求和和计数。我摔倒 三个属性设置在同一个 UpdateAttribute 中(如下所示)然后 平均值将始终不包括计数的最新值 和总和:

Count 键:theCount 值:${getStateValue("theCount"):plus(1)} Sum> key : theSum value : ${getStateValue("theSum"):plus(${flowfileValue})} 平均键 : theAverage 值 : ${getStateValue("theSum"):divide(getStateValue("theCount"))} 相反, 因为 average 只依赖于 theCount 和 theSum 属性(它们是 也添加到 FlowFile 中)应该有以下无状态 UpdateAttribute 正确计算平均值。在事件 处理器无法获取开始时的状态 onTrigger,FlowFile 将被推回原始 关系和处理器将屈服。如果处理器能够 获取 onTrigger 开始时的状态,但无法设置 将属性添加到 FlowFile 后的状态,FlowFile 将是 转移到“设置状态失败”。这通常是由于状态不 是最新版本(另一个线程已取代 状态与另一个版本)。在大多数用例中,这种关系 应该循环回处理器,因为唯一受影响的属性 将被覆盖。注意:目前唯一的“有状态”选项是 在本地存储状态。这样做是因为当前的实现 集群状态依赖于 Zookeeper 而 Zookeeper 不是设计出来的 对于具有状态的负载/吞吐量类型的 UpdateAttribute 将 要求。将来,如果/当多个不同的集群状态 选项被添加,UpdateAttribute 将被更新。

,

感谢@Ivan,我能够创建一个完整的工作解决方案 - 以供将来参考:

  1. 使用例如实例化流文件GenerateFlowFile 处理器并添加自定义属性“myproperty”和值 ${now()}(注意:您可以将此属性添加到任何处理器中的流文件,不必是 GenerateFlowFile处理器)

  2. 有一个 UpdateAttribute 处理器,其选项(在处理器属性下)Store State 设置为 Store state locally

  3. UpdateAttribute 处理器中添加一个名为 readable_property 的自定义属性,并将其设置为值 ${'myproperty'}

处理器的状态现在包含最后一个流文件的值(例如,带有属性被添加到流文件的时间戳)。

增加的奖金:

  1. 通过 REST-API 和 URI /nifi-api/processors/{id}/state 上的 GET 获取有状态处理器的值(以及通过 (!) 的最后一个流文件的值)

返回的 JSON 包含以下几行:

{
"key":"readable_property","value":"Wed Apr 14 11:13:40 CEST 2021","clusterNodeId":"some-id-0d8eb6052","clusterNodeAddress":"some-host:port-number"
}

然后您只需解析 JSON 以获取值。

,

您应该使用 UpdateAttribute 处理器。 您可以阅读几种方法 - f.e. Update attributes based on content in NiFi