问题描述
我想使用 nifi 将文件从本地系统传输到 flink。我已经在 nifi 中配置了带有 GetFile 处理器的管道和名称为“Data For Flink”的输出端口。在 Flink 端,我使用带有 flink 的 flink-connector-nifi_2.11。下面是我在 Flink 中的 nifi 客户端代码。
SitetoSiteClientConfig clientConfig = new SitetoSiteClient.Builder()
// .url("https://localhost:9443/nifi")
.url("http://localhost:8080/nifi")
.portName("Data For Flink")
.transportProtocol(SitetoSiteTransportProtocol.HTTP)
.requestBatchCount(5)
.buildConfig();
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> streamSource = streamExecEnv.addSource(nifiSource);
DataStream<String> data = streamSource.map(new MapFunction<NiFiDataPacket,String>() {
@Override
public String map(NiFiDataPacket line) throws Exception {
String row = new String(line.getContent(),Charset.defaultCharset());
return row;
}
});
data.print();
streamExecEnv.execute();
当我运行上面的代码时,我没有得到任何输出以及任何错误。我的 Nifi 管道是否正确?我在这里遗漏了什么吗?
我还需要配置安全的 nifi 实例或任何站点到站点的属性来将数据从 Nifi 传输到 Flink 吗?
我也试过了,但是我遇到了 ca 证书的问题。我已经使用 tls-toolkit 生成了 PKCS12 证书。但是当我尝试将其上传到 java truststore 时,我收到错误消息 - 需要 x509 证书。
编辑 1:我在 IntelliJ IDEA 上运行上述代码,并在类路径上添加了 Flink 依赖项。
我是 NiFi 和 Flink 的新手。感谢您的帮助!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)