问题描述
我有一个Angular 7应用程序,可让您多次上传文件。对于选择的每个文件,它都会向服务器发出请求。这不是很好,因为对于我的后端服务器而言,打开数百个并发调用似乎是一个潜在的问题。
我想做的是限制我的应用程序可以发出的并发请求的数量。我有一个通用的API类,除了文件上传之外,我还想用它来在应用程序范围内进行限制,而不是需要文件上传组件本身来管理它。
RxJx有时会让我感到困惑,但是我很确定这是可能的。
class ApiService {
get(path: string,params: any = {}): Observable<any> {
return this.http.get(path`,{ params: params });
}
uploadFile(path: string,body: any = {}): Observable<any> {
...code for preparing file here...
return this.http.post(path,body);
}
}
class FileUploader {
// called many times-- once for each file
uploadFile(file) {
this.apiService.uploadFile(path,body: file).subscribe(response => {
// use response here
})
}
}
我想像的是,在api类中,我可以添加到使用最大并发性或诸如此类的队列中并等待进行调用,直到有空间为止,而不是立即在fileUpload或get函数中执行http调用。 。但是由于我已经立即在File Uploader类中进行了订阅,因此我不确定该怎么做。
解决方法
您可以使用Subject
和mergeMap
运算符
interface FileUpload {
path: string;
body: file;
}
export class UploadService {
private readonly CONCURRENT_UPLOADS = 2;
private uploadQ = new Subject<FileUpload>();
constructor(private api: ApiService) {
this.uploadQ.asObservable().pipe(
mergeMap(fu => this.api.uploadFile(fu.path,fu.body)).pipe(
// must catch error here otherwise the subscriber will fail
// and will stop serving the Q
catchError(err => {
console.error('Caught error ',err);
return of(err);
})),this.CONCURRENT_UPLOADS),).subscribe((res: WhateverResultYouGet) => {
// process result
},err => {
// something went wrong
});
}
// this is your original signature of the method but where do you get path,actually?
/**
* Push the file to upload Q
*/
uploadFile(file) {
this.uploadQ.next({path,body: file});
}
}
您无需立即启动上传,而只是将上传推到队列中。队列由构造函数中的预订服务,使用mergeMap
运算符,您可以在其中实际指定并发性。