MS Azure数据湖扫描目录并使用异步反应性API读取文件

问题描述

我想递归读取Azure Data Lake中的所有文件并处理文件内容。 下面是我想出的代码,如果这是一个好主意或可以做得更好,希望获得您的反馈。

不胜感激!

目前,我有一些我无法解释的副作用。可能与此有关。

  1. 我担心背压。目标是暂停扫描,请参阅filequeue.put()。
  2. 不确定是否处理了所有级别的异常。
  3. 要输出找到了多少个文件的总计数器。 autoConnect()似乎有点过分。
  4. 如何控制并行度?它正在使用多个线程,不是吗?

这是代码。 filequeue是Java BlockingQueue,也是轮询过程从中获取下一个数据的目标。必须是轮询。队列通常包含

    BlockingQueue<Tuple2<byte[],PathItem>> filequeue = new ArrayBlockingQueue<>(100);

    filenamescanner = asyncfsclient
        .listPaths(options) // read the directory recursive
        .filter(item -> {
            return !item.isDirectory(); // only interested in files,not dirs
        })
        .subscribe(item -> {
            DataLakeFileAsyncClient c = asyncfsclient.getFileAsyncClient(item.getName());
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            c.read().doOnNext(piece -> {
                 try {
                     out.write(piece.array());
                 } catch (IOException ex) {
                     System.out.print(ex.getMessage());
                 }
            }).doOnComplete(() -> {
                filequeue.put(Tuples.of(out.toByteArray(),item));
            })
            .subscribe(null,e -> System.out.println("ERROR reading file: " + e));
        },e -> System.out.println("ERROR listing dir: " + e));

只要队列具有条目或filenamescanner.isDisposed()== false,轮询过程就会读取数据。

如您所见,我目前缺乏经验,这是我想到的最好的代码。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...