问题描述
示例:在创建流文件期间,我使用处理器 GenerateFlowFile
和自定义文本 ${Now()}
作为当前时间戳实例化流文件。
我想要一个处理器(哪种类型与我无关)将流文件的内容(时间戳)读取到处理器的自定义属性 property_name
。之后,我希望能够通过 REST-API 潜在地查询处理器并从处理器读取该属性。
最初我认为我可以用 ExtractText
处理器来做到这一点,但它根据正则表达式提取文本并将其写回流文件,而我想将该信息保存在处理器中,直到下一个流文件到达。
解决方法
你不能通过 NiFi 做到这一点。当处理器运行时,您无法更新其配置。
也许您可以在 UpdateAttribute 上使用状态变量?
有状态的使用
通过为“Store State”选择“store state local”选项 属性 UpdateAttribute 不仅会存储评估的属性 作为 FlowFile 的属性,但也作为有状态变量 以递归方式引用。这使处理器能够 计算传入 FlowFiles 的总和或计数等内容。一种 动态属性可以作为有状态变量引用,如下所示:
动态属性键:theCount 值:
${getStateValue("theCount"):plus(1)}
这个例子将保持一个计数
通过处理器的 FlowFiles 总数。
要在 State 之上使用逻辑,只需使用
更新属性。所有操作都将作为有状态属性存储为
以及被添加到 FlowFiles。使用“高级用法”它是
可以跟踪诸如流量最大值之类的事情
远的。这将通过有一个条件来完成
"${getStateValue("maxValue"):lt(${value})}"
和一个动作
属性:“maxValue”,值:“${value}”。 “状态变量
Initial Value”属性用于初始化有状态变量
如果有状态运行,则需要设置。一些逻辑规则将
需要非常高的初始值,例如使用高级规则
确定最小值。如果有状态属性引用其他
有状态属性然后是其他有状态属性的值
后面会有迭代。例如,尝试计算
传入流的平均值需要求和和计数。我摔倒
三个属性设置在同一个 UpdateAttribute 中(如下所示)然后
平均值将始终不包括计数的最新值
和总和:
Count 键:theCount 值:${getStateValue("theCount"):plus(1)} Sum> key : theSum value : ${getStateValue("theSum"):plus(${flowfileValue})}
平均键 : theAverage 值 :
${getStateValue("theSum"):divide(getStateValue("theCount"))}
相反,
因为 average 只依赖于 theCount 和 theSum 属性(它们是
也添加到 FlowFile 中)应该有以下无状态
UpdateAttribute 正确计算平均值。在事件
处理器无法获取开始时的状态
onTrigger,FlowFile 将被推回原始
关系和处理器将屈服。如果处理器能够
获取 onTrigger 开始时的状态,但无法设置
将属性添加到 FlowFile 后的状态,FlowFile 将是
转移到“设置状态失败”。这通常是由于状态不
是最新版本(另一个线程已取代
状态与另一个版本)。在大多数用例中,这种关系
应该循环回处理器,因为唯一受影响的属性
将被覆盖。注意:目前唯一的“有状态”选项是
在本地存储状态。这样做是因为当前的实现
集群状态依赖于 Zookeeper 而 Zookeeper 不是设计出来的
对于具有状态的负载/吞吐量类型的 UpdateAttribute 将
要求。将来,如果/当多个不同的集群状态
选项被添加,UpdateAttribute 将被更新。
感谢@Ivan,我能够创建一个完整的工作解决方案 - 以供将来参考:
-
使用例如实例化流文件
GenerateFlowFile
处理器并添加自定义属性“myproperty”和值${now()}
(注意:您可以将此属性添加到任何处理器中的流文件,不必是GenerateFlowFile
处理器) -
有一个
UpdateAttribute
处理器,其选项(在处理器属性下)Store State 设置为Store state locally
。 -
在
UpdateAttribute
处理器中添加一个名为 readable_property 的自定义属性,并将其设置为值${'myproperty'}
。
处理器的状态现在包含最后一个流文件的值(例如,带有属性被添加到流文件的时间戳)。
增加的奖金:
- 通过 REST-API 和 URI
/nifi-api/processors/{id}/state
上的 GET 获取有状态处理器的值(以及通过 (!) 的最后一个流文件的值)
返回的 JSON 包含以下几行:
{
"key":"readable_property","value":"Wed Apr 14 11:13:40 CEST 2021","clusterNodeId":"some-id-0d8eb6052","clusterNodeAddress":"some-host:port-number"
}
然后您只需解析 JSON 以获取值。
,您应该使用 UpdateAttribute 处理器。 您可以阅读几种方法 - f.e. Update attributes based on content in NiFi