问题描述
用例:
有一个模块,用于在同步模式下侦听事件。在使用EmitterProccessor
的同一模块中,事件被转换为Flux
并成为无限事件流。现在有一个上游模块可以预订这些事件流。这里的问题是如何动态地将这些流合并为一个流,然后订阅单个流。一个简单的例子是,我们说有N个传感器,我们可以动态注册这些传感器,并在将它们合并为一个流之后,将其作为单个数据流中的数据流开始监听。这是编写的模拟此行为的代码示例。
创建回调并开始监听事件
public interface CallBack {
void callBack(int name);
void done();
}
@Slf4j
@requiredArgsConstructor
public class CallBackService {
private CallBack callBack;
private final Function<Integer,Integer> func;
public void register(CallBack intf) {
this.callBack = intf;
}
public void startServer() {
log.info("Callback started..");
IntStream.range(0,10).forEach(i -> {
callBack.callBack(func.apply(i));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printstacktrace();
}
});
log.info("Callback finished..");
callBack.done();
}
}
使用事件处理程序将事件转换为流
@Slf4j
public class EmitterService implements CallBack {
private EmitterProcessor<Integer> emitterProcessor;
public EmitterService(){
emitterProcessor = EmitterProcessor.create();
}
public EmitterProcessor<Integer> getemmitor() {
return emitterProcessor;
}
@Override
public void callBack(int name) {
log.info("callbakc {} invoked",name);
//fluxSink.next(name);
emitterProcessor.onNext(name);
}
public void done() {
//fluxSink.complete();
emitterProcessor.onComplete();
}
}
public class WrapperService {
EmitterService service1;
ExecutorService service2;
public Flux<Integer> startService(Function<Integer,Integer> func) {
CallBackService service = new CallBackService(func);
service1 = new EmitterService();
service.register(service1);
service2 = Executors.newSingleThreadExecutor();
service2.submit(service::startServer);
return service1.getemmitor();
}
public void shutDown() {
service1.getemmitor().onComplete();
service2.shutdown();
}
}
订阅活动
@Slf4j
public class MainService {
public static void main(String[] args) throws InterruptedException {
TopicProcessor<Integer> stealer = TopicProcessor.<Integer>builder().share(true).build();
CountDownLatch latch = new CountDownLatch(20);
WrapperService n1 =new WrapperService();
WrapperService n2 =new WrapperService();
// n1.startService(i->i).mergeWith(n2.startService(i->i*2)).subscribe(stealer);
n1.startService(i->i).subscribe(stealer);
n2.startService(i->i*2).subscribe(stealer);
stealer.subscribeOn(Schedulers.boundedElastic())
.subscribe(x->{
log.info("Stole=>{}",x);
latch.countDown();
log.info("Latch count=>{}",latch.getCount());
});
latch.await();
n1.shutDown();
n2.shutDown();
stealer.shutdown();
}
}
尝试使用TopicProccessor
失败。在上面的代码中,第一个来源发生订阅,第二个来源没有订阅。但是,如果使用n1.startService(i->i).mergeWith(n2.startService(i->i*2)).subscribe(stealer);
订阅有效,但是在这种情况下没有动态行为。每次需要更换用户。非常感谢您的帮助。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)