Nestjs Schedule 依次执行队列

问题描述

我有两种从 json/csv 导入产品和库存的方法。我已经实现了用于排队作业的 nestJS Bull 模块。两个导入进程都在异步运行并且工作正常。但现在我想先完全处理产品导入队列,然后只处理库存导入队列。

jobs.module.ts

BullModule.registerQueueAsync({
  name: 'default',inject: [ConfigService],useFactory: async (
    configService: ConfigService,): Promise<BullModuleOptions> => ({
    redis: {
      ...
    },defaultJobOptions: {
      attempts: 1,},}),})

jobs.service.ts

constructor(@InjectQueue('default') private defaultQueue: Queue) { }

@Cron(CronExpression.EVERY_30_MINUTES)
async queueProductUpdateJob() {
    await this.defaultQueue.add('productUpdate');
}

@Cron(CronExpression.EVERY_30_MINUTES)
async queueInventoryUpdateJob() {
    await this.defaultQueue.add('inventoryUpdate');
}

jobs.processor.ts

@Process('productUpdate')
async productUpdate(job: Job<unkNown>){
  console.log('data: ',JSON.stringify(job.data));
  await this.updateProductService.importProducts(job.data);
  return {};
}

@Process('inventoryUpdate')
async inventoryUpdate(job: Job<unkNown>){
  console.log('data: ',JSON.stringify(job.data));
  await this.updateInventoryService.importInventory(job.data);
  return {};
}

我们如何才能先完全处理 queueProductUpdateJob 作业,然后才处理 queueInventoryUpdateJob

解决方法

我想如果你像这样改变jobs.service.ts就可以了:(因为他们都是每30分钟一次玉米)

jobs.service.ts

constructor(@InjectQueue('default') private defaultQueue: Queue) { }

@Cron(CronExpression.EVERY_30_MINUTES)
async queueProductInventoryUpdateJob() {
    await this.defaultQueue.add('productUpdate');
    await this.defaultQueue.add('inventoryUpdate');
}

或者你可以使用“import { queue } from 'async';” (如果想告诉我解释更多) 它将排成队列并依次执行任务