具有Redux-observable和RxJS的非严格序列

问题描述

我的应用程序有一个带有微调框的模态,只要发生长时间的阻止动作,该模态就会显示。 这些长时间阻止操作中有几种,每个操作都有一个标记其开始和结束的操作。

鉴于“动作流”,无论何时分派开始动作之一,我都希望分派showWaitingIndication动作,直到分派相应的结束动作,然后分派hideWaitingIndication。如果调度了另一个开始动作,然后在进行第一个阻止动作时调度了其相应的结束动作,则不应再次调用showWaitingIndicationhideWaitingIndication。动作仍处于活动状态时,也不应调度hideWaitingIndication

基本上,这个想法是,只要阻止动作处于活动状态,等待指示就不会隐藏。

例如

StartA-> dispatch(showWaitingIndication)->其他事件-> endA-> dispatch(hideWaitingIndication

StartA-> dispatch(showWaitingIndication)-> startB-> endB(不应称为hide)-> endA-> dispatch( hideWaitingIndication

StartA-> dispatch(showWaitingIndication)-> startB-> endA(不应称为hide!)-> endB-> dispatch({{ 1}})

我正在努力解决如何使用流实现这一点(我坚信这很适合解决此问题)。

到目前为止,我已经提出了类似的方法(无效)

hideWaitingIndication

这样做的正确方法是什么?

解决方法

这是一个非常有趣的问题!

我认为您可以尝试以下方法:

const showHideActionPairs = getShowHideActionPairs(); // { "startA": "endA","startB": "endB"}

actions$.pipe(
  windowWhen(() => actions$.pipe(filter(action => action.type === hideWaitingIndication))),mergeMap(
    window => window.pipe(
      mergeMap(
        action => someAsyncCall().pipe(
          mapTo(showHideActionPairs[action]),startWith(showHideActionPairs[action])
        )
      ),scan((acc,crtEndAction) => {
        // first time receiving this end action -> the beginning of the async call
        if (!(crtEndAction in acc)) {
          acc[crtEndAction] = true;

          return acc;
        }

        // if the `crtEndAction` exists,it means that the async call has finished
        const {[crtEndAction]: _,...rest} = acc;

        return rest;
      },Object.create(null)),filter(obj => Object.keys(obj).length === 0),mapTo(hideWaitingIndication),// a new window marks the beginning of the modal
      startWith(showWaitingIndication),)
  )
)

我的第一个想法是,我需要找到一种方法来表示一个事件链,这样该链开始于showWaitingIndication并以hideWaitingIndication。链的末尾实际上是由最后完成的异步调用(end{N})指示的。因此,我认为这对于windowWhen是一个很好的用例。

但是window是什么?窗口是nothing more than a Subject

/* ... */
const window = this.window = new Subject<T>();
this.destination.next(window);
/* ... */

windowWhen(() => closeNotifier)的工作方式是它将发送Subject(a window)作为next值(这就是我们拥有mergeMap(window => ...)的原因)并且它将通过它推送值(例如动作)。我们正在window.pipe(...)中访问这些值。当closeNotifier发出时,当前的windowcomplete并创建并传递一个新的window,以便随后的动作将通过它发送。值得注意的是,默认情况下会创建一个窗口when the stream is subscribed

constructor(protected destination: Subscriber<Observable<T>>,private closingSelector: () => Observable<any>) {
  super(destination);
  this.openWindow(); // !
}

假设我们正在当前窗口中收到第一个操作。

mergeMap(
  action => someAsyncCall().pipe(
    mapTo(showHideActionPairs[action]),startWith(showHideActionPairs[action])
  )
),

一旦操作被拦截,我们将发送其期望的最终值,以便可以将其存储在scan的累加器中。当该动作的异步调用完成时,它将再次发送该结束值,以便可以将其从累加器中删除。
这样,我们可以确定窗口的寿命,当累加器中没有最终值时,窗口将关闭。

发生这种情况

filter(obj => Object.keys(obj).length === 0),

我们确保通知所有动作均已完成其任务。

,

我接受了安德烈(Andrei)的回答,因为他要向我指出正确的方向,而他涉及windowWhenaccumulator的解决方案是解决此问题的正确思路。为了完整起见,我也基于他发布了自己的解决方案,因为我觉得这里的逻辑更明确(而且个人更容易在寻找解决方案时动脑筋):

let showHideActionPairs = getShowHideActionPairs();
const relevantActionsTypesArray = Object.keys(showHideActionPairs).concat(Object.values(showHideActionPairs));

actions$ => actions$.pipe(
        // close the "window" when a hide action is received
        windowWhen(() => actions$.pipe(ofType(waitingIndicationHideActionName),)),mergeMap(
            window => window.pipe(
                // filter to only look at start/end actions
                ofType.apply(null,relevantActionsTypesArray),scan((accumulator,action) => {
                    let waitingForEndAction  = "startAction" in accumulator;
                    // first time we see a start action
                    if (!waitingForEndAction && action.type in showHideActionPairs) {
                        accumulator.startAction = action.type;
                        accumulator.actionable = true;
                    // found the right end action
                    } else if (waitingForEndAction && action.type === showHideActionPairs[accumulator.startAction]) {
                        accumulator.endAction = action.type;
                        accumulator.actionable = true;
                    // any other case is not actionable (will not translate to to an action)
                    }  else {
                        accumulator.actionable = false;
                    }
                    return accumulator;
                },{}),// accumulator spits out stuff for every action but we only care about the actionables
                filter(obj => obj.actionable),map(obj => {
                    if (obj.endAction){
                        return waitingIndicationHideAction
                    } else if (obj.startAction) {
                        return waitingIndicationShowAction
                    }
                }),)
        )
    )

};