动态合并无限反应堆流

问题描述

用例:

一个模块,用于在同步模式下侦听事件。在使用EmitterProccessor的同一模块中,事件被转换为Flux并成为无限事件流。现在有一个上游模块可以预订这些事件流。这里的问题是如何动态地将这些流合并为一个流,然后订阅单个流。一个简单的例子是,我们说有N个传感器,我们可以动态注册这些传感器,并在将它们合并为一个流之后,将其作为单个数据流中的数据流开始监听。这是编写的模拟此行为的代码示例。

创建回调并开始监听事件

public interface CallBack {

    void callBack(int name);

    void done();
}

@Slf4j
@requiredArgsConstructor
public class CallBackService {

    private CallBack callBack;
    private final Function<Integer,Integer> func;

    public void register(CallBack intf) {
        this.callBack = intf;
    }


    public void startServer() {
        log.info("Callback started..");
        IntStream.range(0,10).forEach(i -> {
            callBack.callBack(func.apply(i));
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printstacktrace();
            }
        });
        log.info("Callback finished..");
        callBack.done();
    }
}

使用事件处理程序将事件转换为流

@Slf4j
public class EmitterService implements CallBack {

    private EmitterProcessor<Integer> emitterProcessor;

    public EmitterService(){
        emitterProcessor = EmitterProcessor.create();
    }

    public EmitterProcessor<Integer> getemmitor() {
        return emitterProcessor;
    }


    @Override
    public void callBack(int name) {
        log.info("callbakc {} invoked",name);
        //fluxSink.next(name);
        emitterProcessor.onNext(name);
    }

    public void done() {
        //fluxSink.complete();
        emitterProcessor.onComplete();
    }
}


public class WrapperService {

    EmitterService service1;
    ExecutorService service2;

    public Flux<Integer> startService(Function<Integer,Integer> func) {
        CallBackService service = new CallBackService(func);
        service1 = new EmitterService();
        service.register(service1);
        service2 = Executors.newSingleThreadExecutor();
        service2.submit(service::startServer);
        return service1.getemmitor();
    }

    public void shutDown() {
        service1.getemmitor().onComplete();
        service2.shutdown();
    }
}

订阅活动

@Slf4j
public class MainService {


    public static void main(String[] args) throws InterruptedException {
        TopicProcessor<Integer> stealer = TopicProcessor.<Integer>builder().share(true).build();
        CountDownLatch latch = new CountDownLatch(20);
        WrapperService n1 =new WrapperService();
        WrapperService n2 =new WrapperService();
//        n1.startService(i->i).mergeWith(n2.startService(i->i*2)).subscribe(stealer);
        n1.startService(i->i).subscribe(stealer);
        n2.startService(i->i*2).subscribe(stealer);
        stealer.subscribeOn(Schedulers.boundedElastic())
                .subscribe(x->{
                    log.info("Stole=>{}",x);
                    latch.countDown();
                    log.info("Latch count=>{}",latch.getCount());
                });

        latch.await();
        n1.shutDown();
        n2.shutDown();
        stealer.shutdown();
    }


}

尝试使用TopicProccessor失败。在上面的代码中,第一个来源发生订阅,第二个来源没有订阅。但是,如果使用n1.startService(i->i).mergeWith(n2.startService(i->i*2)).subscribe(stealer);订阅有效,但是在这种情况下没有动态行为。每次需要更换用户。非常感谢您的帮助。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)