无法使用 nifi-flink 连接器将 apache flink 连接到 NIFI 源

问题描述

我想使用 nifi 将文件从本地系统传输到 flink。我已经在 nifi 中配置了带有 GetFile 处理器的管道和名称为“Data For Flink”的输出端口。在 Flink 端,我使用带有 flink 的 flink-connector-nifi_2.11。下面是我在 Flink 中的 nifi 客户端代码

         SitetoSiteClientConfig clientConfig = new SitetoSiteClient.Builder()
             //   .url("https://localhost:9443/nifi")
                .url("http://localhost:8080/nifi")
                .portName("Data For Flink")
                .transportProtocol(SitetoSiteTransportProtocol.HTTP)
                .requestBatchCount(5)
                .buildConfig();

        SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
        DataStream<NiFiDataPacket> streamSource = streamExecEnv.addSource(nifiSource);
        DataStream<String> data = streamSource.map(new MapFunction<NiFiDataPacket,String>() {
            @Override
            public String map(NiFiDataPacket line) throws Exception {
                String row = new String(line.getContent(),Charset.defaultCharset());

                return row;
            }
        });
        data.print();
        streamExecEnv.execute(); 

当我运行上面的代码时,我没有得到任何输出以及任何错误。我的 Nifi 管道是否正确?我在这里遗漏了什么吗?

我还需要配置安全的 nifi 实例或任何站点站点属性来将数据从 Nifi 传输到 Flink 吗?

我也试过了,但是我遇到了 ca 证书的问题。我已经使用 tls-toolkit 生成了 PKCS12 证书。但是当我尝试将其上传到 java truststore 时,我收到错误消息 - 需要 x509 证书。

编辑 1:我在 IntelliJ IDEA 上运行上述代码,并在类路径上添加了 Flink 依赖项。

我是 NiFi 和 Flink 的新手。感谢您的帮助!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)