问题描述
我正在尝试使用flink StreamingFileSink将数据从Intellij IDE写入到Azure Blob存储中,但是我遇到了错误
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
Caused by: java.lang.classNotFoundException: org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/fs/azure/common/hadoop/HadoopRecoverableWriter
下面是我的代码
public class BlobSample {
public static void main(String[] args) throws Exception {
//System.setProperty("hadoop.home.dir","/");
Configuration cfg = new Configuration();
cfg.setString("fs.azure.account.key.azurekey.blob.core.windows.net","azure_blob_key");
//cfg.setBoolean("recursive.file.enumeration",true);
Filesystem.initialize(cfg,null);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = env.fromElements("hello");
DataStream<String> output = input.flatMap(new FlatMapFunction<String,String>() {
@Override
public void flatMap(String value,Collector<String> out) throws Exception {
out.collect(value.concat(" world"));
}
});
// output.writeAsText("wasbs://[email protected]/output");
String outputPath = "wasbs://[email protected]/rawsignals";
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath),new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(100)
.build())
.build();
output.addSink(sink);
env.execute("BlobStorage");
}
}
我也尝试使用writeAsText,但它也无法正常工作,我在环境变量中添加了HADOOP_HOME并将此依赖项添加到build.gradle中,还编译了组:'org.apache.flink',名称:'flink-azure-fs- hadoop”,版本:“ 1.11.2”。我向flink-conf.yaml添加了azure密钥,但仍然无法正常工作。请帮助我解决问题。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)