如何在Loopback 4中实现Rabbit MQ或Bull JS

问题描述

我正在尝试实施后台服务,以减少API调用负载后台任务将运行,以将文件上传到S3并使用nodemailer发送电子邮件

解决方法

按照回送4的设计模式,您可以创建服务提供程序并将其注入您的服务或控制器中。 这是Bull.js的一个非常基本的示例:

import Bull,{ Queue,Job } from "bull";
import {Provider,service} from "@loopback/core";
import {get} from "@loopback/rest";

export function audioProcessor(job: Job) {
  console.log(
    `Processing audio file: ${job.data.filename}`,`Audio bitrate: ${job.data.bitrate}`
  );
}

export class AudioQueueProvider implements Provider<Queue> {
  async value() {
    const queue = new Bull("AudioQueue",{
      redis: { port: 6379,host: "127.0.0.1" }
    });

    queue.process(audioProcessor);
    
    return queue;
  }
}

export class AudioController {
  constructor(
    @service(AudioQueueProvider) public queue: Queue;
  ) {}

  @get('/process-audio')
  async addToQueue(): Promise<string> {
    await this.queue.add(
      {
        filename: 'process_me.wav',bitrate: 320,}
    );
    return 'Audio file added to the AudioQueue for processing';
  }
}

RabbitMQ实现应类似(未经测试):

import { Provider,service } from "@loopback/core";
import {get} from "@loopback/rest";
import amqp from "amqplib/callback_api";

export function audioProcessor(msg: any) {
  console.log(
    `Processing audio file: ${msg.content.filename}`,`Audio bitrate: ${msg.content.bitrate}`
  );
}

export class AudioQueueProvider implements Provider<any> {
  async value() {
    const CONN_URL = "amqp://localhost";
    let ch = null;
    const channelName = 'AudioQueue';
    
    amqp.connect(CONN_URL,function (err,conn) {
      conn.createChannel(function (err,channel) {
        ch = channel;
      });
    });

    ch.assertQueue(channelName,{
      durable: false
    });

    ch.consume(channelName,audioProcessor,{
      noAck: true
    });
    
    return ch;
  }
}

export class AudioController {
  constructor(
    @service(AudioQueueProvider) public channel: any;
  ) {}

  @get('/process-audio')
  async addToQueue(): Promise<string> {
    await this.channel.sendToQueue(
      'AudioQueue',{
        filename: 'process_me.wav',}
    );
    return 'Audio file added to the AudioQueue for processing';
  }
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...