问题描述
我试图弄清楚如何使用NiFi解决以下问题。
场景:
我的NiFi流正在接收JSON消息,它们是输入流文件(来自Kafka主题),对于每个输入JSON,我将执行以下操作
- 我想基于输入json中的属性值扫描HBase表。这将是行前缀扫描,预计将返回 结果最多6条记录
- 对于结果中收到的每个行键,我想在同一张表中进行更新。
- 一旦我对收到的所有行键进行了更新,我想传递原始流程文件(从卡夫卡收到的) 进入后续流程组以继续我的处理。
我已经使用ScanHBase处理器从HBase过滤记录,但是我面临的问题是-ScanHBase处理器没有丰富选项 输入json,但相反,它将以不同的格式返回原始流文件或结果。 我知道可能是在HBaseScan之后充实JSON没有意义,因为预期Hbase扫描会返回记录列表,并且可能是 将输入的JSON和结果一起添加是没有意义的。
我可以使用的可能选项是-使用带有合并策略和相关属性的MergeContent处理器,但是此后MergeContent的结果有点难以使用 进行处理。
还有其他方法可以解决此问题吗?
我的流程如下所示
合并内容配置
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)