如何基于计算元素属性有效地将单个输入通量分成多个输出通量?

问题描述

我们提供了一些包含所有事件的Flux<Event>代码。客户群 然后为这些事件的子集请求Flux<Event>

代码的作用类似于:

// Note: Kotlin code,but this question is not Kotlin-specific

/**
 * All incoming events
 */
private val allEvents: Flux<Event> = ...

/**
 * Returns an flux of the events with the matching key.
 */
fun eventsForKey(key: String): Flux<Event> {
    return allEvents.filter { event ->
        event.key == key
    }
}

所以我们有allEvents,其中包含所有传入事件,并且 eventsForKey函数调用(可能多次)以创建一个Flux<Event>个具有指定键的事件。有潜在的 这些经过过滤的Flux个实例同时存在很多。

我担心的是,这实际上是在进行线性搜索, “子通量”将每个事件传递给。也就是说,如果存在 n 次通量 实例在给定时刻还活着,并且单个事件到来,事件将 针对所有 n 过滤谓词进行测试。

我想要的是可以让我指定输入通量和键的东西 函数,然后(重复)获得任何给定键的输出通量 值。每个子通量的行为就像上面过滤的一样,但是 而不是为每个事件执行 n 谓词检查,而是每个事件 进行一次关键计算并为传出文件进行单个字典查找 助焊剂与现有子通量不匹配的事件应被丢弃, 就像使用过滤器一样。

我找到了Flux.groupBy(也是this related question的公认答案),但是:

  1. 它的返回类型是笨拙的Flux<GroupedFlux<K,T>>

    • 我不希望当一个子通量出现时 第一个事件出现。我需要能够获得给定键上的通量 需求,这可能在与该键匹配的任何事件发生之前 到了。

    • 我也不想与没有下游消费者的群体打交道 要求。与关键下游消费者不匹配的事件有 要求的应该被过滤掉。

  2. 其文档说明:

    请注意,groupBy在组基数较低时效果最佳,因此选择 您的keyMapper函数相应。

    我不确定“基数较低”是否意味着每个“组”都需要 很小,或者如果组的数量需要很小。 (而且我不知道是什么 在这种情况下,“小”表示。)我正在专门尝试处理 Sub-Flux实例数量可能很大的情况。

Reactor是否提供一种有效地多路分解这样的通量的方法

解决方法

您的问题对我来说听起来很有趣,我正在研究这个问题。这种解决方案可能并不完美;但我只是想分享!


您的要求听起来像您需要一些状态谓词来过滤事件,然后再进行子融合,以避免每个订阅者自己进行过滤! 在这种情况下,我们需要在某个地方维护一个列表/集合,以保存允许事件的列表。 [在我的示例中,我假设我有一个字符串流,并且第一个字符是事件。根据您在问题中包含的其他答案]

// map for char and the corresponding flux
private static final Map<Character,Flux<String>> CHAR_FLUX = new HashMap<>();

// allowed chars. empty initially
private static final List<Character> ALLOWED_CHARS = new ArrayList<>();

// stateful predicate
private static final Predicate<Character> IS_ALLOWED = c -> {
    System.out.println("IS_ALLOWED check : " + c);
    return ALLOWED_CHARS.contains(c);
};


Flux<GroupedFlux<Character,String>> groupedFluxFlux = Flux.just("a1","b1","c1","a2","b2","c2","a3","b3","c3","a4","b4","c4","a1","c4")
        .delayElements(Duration.ofMillis(1000))
        .filter(s -> IS_ALLOWED.test(s.charAt(0)))  // check if it is allowed
        .groupBy(s -> s.charAt(0))                  // group by starts only for the allowed keys
        .cache();

groupBy返回单播处理器,该广播只能由一个订户使用。对于您的情况,如果您期望同一密钥有多个订阅者,那么我们需要此映射。否则不需要。

您的eventsForKey方法将其添加到列表/集合后,将从地图中返回键值。

// here the filter is just 1 filter for 1 subscriber. does not filter for every event
ALLOWED_CHARS.add('a');
return CHAR_FLUX.computeIfAbsent('a',k -> Flux.defer(() -> groupedFluxFlux.filter(gf -> gf.key() == 'a').flatMap(Function.identity())).cache());

假设:

您的事件集(基数)有限。否则,列表/地图可能会增长,并且groupedFlux的效果也可能不会很好。

,

要正确执行此操作,可能比我本人熟悉对核心反应堆框架有更好的了解,但似乎您希望一个Subscriber驱动一个Publishers和多个HashMap }。装饰好的Subscriber在概念上应该很容易:

class DeMuxedSubscriber<T> implements Subscriber<T> {
    Map<T,SimplePublisher<T>> mapPublishers = new HashMap<>();

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T s) {
        if ( mapPublishers.get(s) != null) 
            mapPublishers.get(s).subscriber.onNext(s);
    }
    @Override
    public void onError(Throwable t) {
        mapPublishers.values().forEach(sp->sp.subscriber.onError(t));
    }

    @Override
    public void onComplete() {
        mapPublishers.values().forEach(sp->sp.subscriber.onComplete());
    }

    public Publisher<T> getPublisher(T s) {
        mapPublishers.putIfAbsent(s,new SimplePublisher<T>());
        return mapPublishers.get(s);
    }
};

在某个地方可能有一个类可以很好地充当发布者,但这足以说明问题:

class SimplePublisher<T> implements Publisher<T> {
    Subscriber<? super T> subscriber;
    
    @Override
    public void subscribe(Subscriber<? super T> s) {
        subscriber = s;
    }

}

然后您可以举一个简单的示例来使用它。这一切似乎有点尴尬,这里显示的示例DeMuxedSubscriber忽略了背压,但是,详细信息,

Flux<String> wordFlux = Flux.generate(() -> new Integer(0),(i,sink) -> {
    if (i >= 100)
        sink.complete();
    i = i + 1;
    sink.next(Integer.toString(largestPrimeFactor(i)));
    return i;
});
DeMuxedSubscriber<String> deMuxedSubscriber = new DeMuxedSubscriber<>();  
Flux.from(deMuxedSubscriber.getPublisher("3")).subscribe(System.out::println);
Flux.from(deMuxedSubscriber.getPublisher("5")).subscribe(System.out::println);
wordFlux.subscribe(deMuxedSubscriber);