如何使用RxJ限制Angular中的并发API请求

问题描述

我有一个Angular 7应用程序,可让您多次上传文件。对于选择的每个文件,它都会向服务器发出请求。这不是很好,因为对于我的后端服务器而言,打开数百个并发调用似乎是一个潜在的问题。

我想做的是限制我的应用程序可以发出的并发请求的数量我有一个通用的API类,除了文件上传之外,我还想用它来在应用程序范围内进行限制,而不是需要文件上传组件本身来管理它。

RxJx有时会让我感到困惑,但是我很确定这是可能的。

class ApiService {

    get(path: string,params: any = {}): Observable<any> {
        return this.http.get(path`,{ params: params });
    }
    
    uploadFile(path: string,body: any = {}): Observable<any> {
        ...code for preparing file here...
        return this.http.post(path,body);
    }
    
}

class FileUploader {

    // called many times-- once for each file
    uploadFile(file) {
        this.apiService.uploadFile(path,body: file).subscribe(response => {
             // use response here
        })
    }
}

我想像的是,在api类中,我可以添加到使用最大并发性或诸如此类的队列中并等待进行调用,直到有空间为止,而不是立即在fileUpload或get函数中执行http调用。 。但是由于我已经立即在File Uploader类中进行了订阅,因此我不确定该怎么做。

解决方法

您可以使用SubjectmergeMap运算符

interface FileUpload {
   path: string;
   body: file;
}

export class UploadService {
  private readonly CONCURRENT_UPLOADS = 2;
  private uploadQ = new Subject<FileUpload>();

  constructor(private api: ApiService) {
    this.uploadQ.asObservable().pipe(
      mergeMap(fu => this.api.uploadFile(fu.path,fu.body)).pipe(
        // must catch error here otherwise the subscriber will fail
        // and will stop serving the Q
        catchError(err => {
          console.error('Caught error ',err);
          return of(err);
        })),this.CONCURRENT_UPLOADS),).subscribe((res: WhateverResultYouGet) => {
         // process result  
      },err => {
        // something went wrong
      });
  }

  // this is your original signature of the method but where do you get path,actually?
  /**
  * Push the file to upload Q
  */
  uploadFile(file) {
    this.uploadQ.next({path,body: file});
  }
}

您无需立即启动上传,而只是将上传推到队列中。队列由构造函数中的预订服务,使用mergeMap运算符,您可以在其中实际指定并发性。