How do I write data from Apache Flink to Azure Blob Storage?

Problem description

I am trying to write data to Azure Blob Storage from the IntelliJ IDEA IDE using Flink's StreamingFileSink, but I am getting the following errors:

Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
Caused by: java.lang.ClassNotFoundException: org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/fs/azure/common/hadoop/HadoopRecoverableWriter
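
The ClassNotFoundException / NoClassDefFoundError for HadoopRecoverableWriter typically means the shaded classes from flink-azure-fs-hadoop are not visible to Flink at runtime. For a standalone Flink 1.11 distribution, the documented way to enable the Azure filesystem is to copy the jar from opt/ into its own subdirectory under plugins/ (a sketch; the $FLINK_HOME variable and the exact jar file name are assumptions based on the 1.11.2 version quoted below):

```shell
# Sketch: enable the Azure filesystem plugin in a Flink 1.11 distribution.
# Assumes $FLINK_HOME points at the distribution root and that the jar
# ships in its opt/ directory.
mkdir -p "$FLINK_HOME/plugins/azure-fs-hadoop"
cp "$FLINK_HOME/opt/flink-azure-fs-hadoop-1.11.2.jar" \
   "$FLINK_HOME/plugins/azure-fs-hadoop/"
```

Note that this applies to jobs submitted to a Flink cluster; when running directly inside the IDE the plugin jar must instead be reachable from the application classpath.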

Here is my code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class BlobSample {

    public static void main(String[] args) throws Exception {

        //System.setProperty("hadoop.home.dir","/");

        Configuration cfg = new Configuration();
        cfg.setString("fs.azure.account.key.azurekey.blob.core.windows.net","azure_blob_key");
        //cfg.setBoolean("recursive.file.enumeration",true);
        FileSystem.initialize(cfg, null);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("hello");

        DataStream<String> output = input.flatMap(new FlatMapFunction<String,String>() {
            @Override
            public void flatMap(String value,Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });


       // output.writeAsText("wasbs://[email protected]/output");

        String outputPath = "wasbs://[email protected]/rawsignals";

        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath),new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(100)
                                .build())
                .build();


        output.addSink(sink);

        env.execute("BlobStorage");

    }
}

I also tried using writeAsText, but it did not work either. I added HADOOP_HOME to my environment variables, and I added this dependency to build.gradle: compile group: 'org.apache.flink', name: 'flink-azure-fs-hadoop', version: '1.11.2'. I also added the Azure account key to flink-conf.yaml, but it still does not work. Please help me resolve this issue.
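
For reference, the build.gradle dependency mentioned above would look like this in Gradle's Groovy DSL (a sketch; `compile` is the configuration name used before Gradle 7, and `implementation` is its modern replacement):

```groovy
// build.gradle (Groovy DSL) — the Azure filesystem dependency as
// described in the question; version 1.11.2 should match the Flink
// version of the rest of the project.
dependencies {
    compile group: 'org.apache.flink', name: 'flink-azure-fs-hadoop', version: '1.11.2'
}
```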

Solution

No working solution for this problem has been found yet.
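
Not a confirmed fix, but one direction worth noting for IDE runs: since Flink 1.10, filesystem implementations such as flink-azure-fs-hadoop are discovered through Flink's plugin mechanism, and calling FileSystem.initialize(cfg, null) as in the question skips plugin discovery entirely. Flink 1.11 provides PluginUtils.createPluginManagerFromRootFolder(...) for constructing a plugin manager from the configured plugin directory. The sketch below is untested here (it needs flink-core and flink-azure-fs-hadoop on the runtime classpath), and the account name "azurekey" and key value are placeholders taken from the question:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;

// Sketch: initialize Flink's FileSystem abstraction with a plugin manager
// instead of null, so that filesystem plugins can be discovered before the
// job graph is built.
public class AzureFsInit {
    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        // Placeholder account name/key from the question.
        cfg.setString(
            "fs.azure.account.key.azurekey.blob.core.windows.net",
            "azure_blob_key");
        FileSystem.initialize(
            cfg, PluginUtils.createPluginManagerFromRootFolder(cfg));
    }
}
```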
