问题描述
我想递归读取Azure Data Lake中的所有文件并处理文件内容。 下面是我想出的代码,如果这是一个好主意或可以做得更好,希望获得您的反馈。
不胜感激!
目前,我有一些我无法解释的副作用。可能与此有关。
- 我担心背压。目标是暂停扫描,请参阅filequeue.put()。
- 不确定是否处理了所有级别的异常。
- 要输出找到了多少个文件的总计数器。 autoConnect()似乎有点过分。
- 如何控制并行度?它正在使用多个线程,不是吗?
这是代码。 filequeue是Java BlockingQueue,也是轮询过程从中获取下一个数据的目标。必须是轮询。队列通常包含
BlockingQueue<Tuple2<byte[],PathItem>> filequeue = new ArrayBlockingQueue<>(100);
filenamescanner = asyncfsclient
.listPaths(options) // read the directory recursive
.filter(item -> {
return !item.isDirectory(); // only interested in files,not dirs
})
.subscribe(item -> {
DataLakeFileAsyncClient c = asyncfsclient.getFileAsyncClient(item.getName());
ByteArrayOutputStream out = new ByteArrayOutputStream();
c.read().doOnNext(piece -> {
try {
out.write(piece.array());
} catch (IOException ex) {
System.out.print(ex.getMessage());
}
}).doOnComplete(() -> {
filequeue.put(Tuples.of(out.toByteArray(),item));
})
.subscribe(null,e -> System.out.println("ERROR reading file: " + e));
},e -> System.out.println("ERROR listing dir: " + e));
只要队列具有条目或filenamescanner.isDisposed()== false,轮询过程就会读取数据。
如您所见,我目前缺乏经验,这是我想到的最好的代码。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)