React Actions 使用 RxJS 进行并行异步调用

问题描述

我有一个现有操作,该操作使用 ID 列表作为查询字符串参数调用 api,并获取传递的 ID 的缩略图响应。在某些情况下,ID 的数量可以是 150-200 条记录。因此,我正在使用 RxJs 的 forkJoinsubscribe 方法使此 API 调用基于批处理并并行运行。我在实施方面面临以下 3 个问题。

  1. 我能够在下一个订阅方法中获得基于批处理的响应。但是,在此函数调用的操作以及响应并未被调用

  2. maxParallelQueries 似乎没有按预期工作。所有的 fetchThumbnailObservables() 都是一次性执行的,而不是设置为常量的 3 个批次/

  3. 如何处理 observable 的返回,以便 mergeMap 不给出语法错误Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction,index: number) => ObservableInput<any>'. Type 'void' is not assignable to type 'ObservableInput<any>'.ts(2345)

这是我当前的代码。任何帮助都会很好。

const getThumbnails: Epic<IAction,IAction,IStoreState> = (action$,state$) =>
    action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
        mergeMap( ({ payload }) => {
            
            const assetIds = Object.keys(payload);
            const Meta = payload;

            const batchSize = 10;
            const maxParallelQueries = 3;
            
            const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];

            for (let count = 0; count < assetIds.length; count += batchSize) {
                fetchThumbnailObservables.push(AssetDataService.fetchThumbnailData(assetIds.slice(count,count + batchSize)));
            }

             forkJoin(fetchThumbnailObservables)
            .pipe(mergeMap(fetchThumbnailObservable => fetchThumbnailObservable,maxParallelQueries)) // Issue 1 :  maxParallelQueries limit is not working
            .subscribe({ 
                complete: () => { },error: () => { },next: (response) => { 
                     // Issue 2 :  below action dont seems to be getting invoked
                    actions.AssetActions.receivedThumbnailsAction(response,Meta) 
                },});
    
            // Issue 3 :  How to handle return of observable 
            // Added below code for prevennting error raised by mergeMap( ({ payload }) => {
            return new Observable<any>();
        }),catchError((error) => of(actions.Common.errorOccured({ error })))
    );
    ```

解决方法

我不知道 Epics,但我想我可以给你一些关于如何解决这个问题的提示。

让我们从这样一个事实开始:您有一个 id 数组 assetIds,并且您希望以受控的并发级别 maxParallelQueries 为每个 id 进行 http 调用。

在这种情况下,from 函数和 mergeMap 运算符是您的朋友。

你需要做的是

// from function transform an array of values,in this case,to a stream of values
from(assetIds).pipe(
  mergeMap(id => {
    // do whatever you need to do and eventually return an Observable
    return AssetDataService.fetchThumbnailData(id)
  },maxParallelQueries) // the second parameter of mergeMap is the concurrency
)

通过这种方式,您应该能够调用 http 调用并保持您已建立的并发限制

,
  1. 我能够在下一个订阅方法中获得基于批处理的响应。但是,在此函数中调用的操作以及响应并未被调用。

你不能在 redux-observable 中强制调用一个动作。相反,您需要对它们进行排队。 actions.AssetActions.receivedThumbnailsAction(response,meta) 将返回形状为 {type: "MY_ACTION",payload: ...} 的普通对象,而不是分派动作。您宁愿返回包装在 mergeMap 内的 observable 中的对象,以便将下一个操作添加到队列中。 (见下面的解决方案)

  1. maxParallelQueries 似乎没有按预期工作。所有 fetchThumbnailObservables() 都立即执行,而不是按设置为常量的 3 个批次/

第一猜测:AssetDataService.fetchThumbnailData(assetIds.slice(count,count + batchSize)) 的返回类型是什么?如果它是 Promise 类型,那么 maxParallelQueries 无效,因为承诺是急切的并且在创建时立即开始,因此在 mergeMap 有机会控制请求之前。因此,请确保 fetchThumbnailData 返回一个可观察对象。如果您需要将 Promise 转换为 Observable 以确保 Promise 不会在 defer 内过早地触发,请使用 from 而不是 fetchThumbnailData

第二个猜测:forkJoin 正在触发所有请求,因此 mergeMap 甚至没有机会限制并行查询的数量。我很惊讶打字稿没有在这里警告您 mergeMap(fetchThumbnailObservable => fetchThumbnailObservable,...) 中的返回类型不正确。我下面的解决方案将 forJoin 替换为 from,并且应该启用您的 maxParallelQueries

  1. 如何处理 observable 的返回,以便 mergeMap 不给出语法错误。 '({ payload }: IAction) => void' 类型的参数不可分配给类型 '(value: IAction,index: number) => ObservableInput' 的参数。类型 'void' 不可分配给类型 'ObservableInput'.ts(2345)

这不是语法错误。在史诗中,您可以选择返回一个空的 observable return empty(),如果没有后续动作,或者通过返回 IAction 类型的 observable 来分派另一个动作。在这里,我(未经测试)尝试满足您的要求:

const getThumbnails: Epic<IAction,IAction,IStoreState> = (action$,state$) =>
  action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
    mergeMap( ({ payload }) => {
            
      const assetIds = Object.keys(payload);
      const meta = payload;

      const batchSize = 10;
      const maxParallelQueries = 3;
            
      const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];

      for (let count = 0; count < assetIds.length; count += batchSize) {
        fetchThumbnailObservables.push(
          AssetDataService.fetchThumbnailData(
             assetIds.slice(count,count + batchSize)
          )
        );
      }

      return from(fetchThumbnailObservables)
        .pipe(
          mergeMap(
            fetchThumbnailObservable => fetchThumbnailObservable,maxParallelQueries
          ),map((response) =>
            // dispatch action for each response
            actions.AssetActions.receivedThumbnailsAction(response,meta)
          ),// catch error early so epic continue on error
          catchError((error) => of(actions.Common.errorOccured({ error })))
        );
    })
  );

请注意,我将 catchError 移到更靠近请求的位置(在内部 forkJoin Observable 内),以便在发生错误时史诗不会终止。 Error Handling - redux observable 之所以如此,是因为当 catchError 处于活动状态时,它会用 catchError 返回的可观察对象替换失败的可观察对象。而且我假设您不想在出现错误时替换整个史诗?

还有一件事:redux-observable 为您处理订阅,因此无需在您的应用程序中的任何位置调用 .subscribe.unsubscribe。你只需要关心 observables 的生命周期。这里最重要的是要知道一个 observable 什么时候开始,当它完成时会发生什么,它什么时候完成或者当一个 observable 抛出时会发生什么。