如何在Nifi上读取sql表?

问题描述

我正在尝试在Nifi上创建基本流程

  1. sql读取表
  2. 在python上处理它
  3. sql写回另一个

这很简单。

Nifi flow

但是,当我尝试在python上读取数据时遇到了问题

据我了解,我需要使用sys.stdin / out。 它只能按以下方式进行读取和写入。

import sys
import pandas as pd

file = pd.read_csv(sys.stdin)
file.to_csv(sys.stdout,index=False)

下面您可以找到处理器属性,但是我认为这不是问题。

QueryDatabaseTableRecord:

QueryDatabaseTableRecord

ExecuteStreamCommand:

enter image description here

PutDatabaseRecord:

enter image description here

错误消息:

enter image description here

解决方法

如果您运行的是1.12.0或更高版本,则有一种更简单的方法:ScriptedTransformRecord。就像ExecuteScript一样,只是它可以按记录运行。这是一个简单的Groovy脚本的样子:

def fullName = record.getValue("FullName")
def nameParts = fullName.split(/[\s]{1,}/)
record.setValue("FirstName",nameParts[0])
record.setValue("LastName:",nameParts[1])
record

这是一个新的处理器,因此除了捆绑的(非常好)文档外,还没有那么多文档。因此,目前样本可能很少。如果您想使用并遇到问题,请随时加入nifi用户的邮件列表,并寻求更多详细的帮助。