Java - 收集消费者?

问题描述

有没有一种功能性的方法可以先收集流的元素,然后立即将集合传递给消费者?换句话说,一种在将所有流元素作为集合而不是一个一个地应用终止操作之前等待流结束的构造?

例如,您能否将以下内容实现为单行:

Stream<Event> stream = // a stream of events
List<Event> list = stream.collect(Collectors.toList());
doProcessEvents(list);

作为一种解决方法,我可以(ab)将 Collectors.collectingAndThen()Function 一起使用来实现我正在寻找的结果:

Function<List<Event>,Void> processingFunction = (list -> {
    doProcessEvents(list);
    return null;
});

Stream<Event> stream = // a stream of events
stream.collect(Collectors.collectingAndThen(Collectors.toList(),processingFunction);

我考虑过的其他替代方案(但不起作用)是,如果 collectingAndThen() 方法一个 Consumer 作为第二个参数,例如Collectors.collectingAndThen(Collector downstream,Consumer consumer) 或者 Consumer 接口有一个 finish() 方法,该方法在流中的最后一个元素被消费后执行

class EventConsumer implements Consumer<Event> {
    private final List<Event> list = new LinkedList<>();

    @Override
    public void accept(Event ev) {
        events.add(ev);
    }

    public void finish() {
        doProcessEvents(events);
    }
}

// usage
Stream<Event> stream = // a stream of events
stream.forEach(new EventConsumer());

这种方法的问题是事件会保存在内部列表中,但不会调用 finish() 方法。它只需稍作修改即可工作,但仍然没有单线:

Stream<Event> stream = // a stream of events
EventConsumer consumer = new EventConsumer()
stream.forEach(consumer);
consumer.finish();

解决方法

有没有一种函数式方法可以先收集流的元素,然后立即将集合传递给消费者?

直接调用Consumer.accept(T t)方法:

Stream<Event> stream = ...
Consumer<List<Event>> consumer = ...

consumer.accept(stream.collect(Collectors.toList()));
,

最后,我决定采用不同的方法。我没有试图将收集者和消费者硬塞在一起,而是创建了一个界面

interface StreamConsumer<T> {
   void consume(Stream<T> stream);
}

并将代码重构为


void processEvents(StreamConsumer<Events> streamConsumer,Stream<Event> events) {
    streamConsumer.consume(events);
}

这样做的目的是什么?现在,我可以实现不同类型的消费者,一些依赖于 List<Event>,另一些可以消费 Stream<Event>,例如

class ListEventConsumer implements StreamConsumer<Event> {

    private ListProcessor<Event> listProcessor;

    ListEventConsumer(ListProcessor<Event> listProcessor) {
       this.listProcessor = listProcessor;
    }

    @Override
    void consume(Stream<Event> events) {
        List<Event> list = events.collect(Collectors.toList());
        listProcess.process(list);
    }
}

以及标准的 Java Consumer 实现,例如

class FunctionalEventConsumer implements StreamConsumer<Event> {

    private Consumer<Event> consumer;

    FunctionalEventConsumer(Consumer<Event> consumer) {
        this.consumer = consumer;
    }

    @Override
    void consume(Stream<Event> events) {
        events.forEach(consumer);
    }
}

现在,我有一种方法可以将 Event 作为 Stream<Event> 使用,并首先在 ListEventConsumer 类中收集它们,该类将它们委托给 ListProcessor<Event>。使用这种方法,委托事件处理的类不需要知道使用的是哪种StreamConsumer<Event>

// either a ListEventConsumer or a FunctionalConsumer
StreamConsumer<Event> streamConsumer = // ...
Stream<Event> events = // ...

processEvents(streamConsumer,events);