Puppeteer-Cluster不使用nodeJS worker

问题描述

我在nodeJS的多工作程序模式下使用puppeteer-cluster,由于某种原因,只有一个工作程序正在打开我定义的并发浏览器的数量。另一个被忽略。我在做什么错了?

基本上,我为每个工作程序启动一个具有2个浏览器并发性的集群,因此我希望它打开工作程序数(如我定义的cpus数)*每个工作程序2个浏览器,但实际上仅打开了两个浏览器。

例如: 假设我有8个内核,所以我要启动8个工作程序,每个工作程序将启动两个puppeteer浏览器,总共16个。相反,在无头调试时:true模式,我只能打开两个浏览器。

单位: “ puppeteer”:“ ^ 5.2.1”, “ puppeteer-cluster”:“ ^ 0.22.0”

import {Cluster} from 'puppeteer-cluster';
import {ReportTimeouts} from "../../config/reports.consts";
import {isDebug} from "../../utils/env.utils";
import {IPuppeteerClusterService} from "../../interfaces/services.interfaces";

export default class PuppeteerClusterService implements IPuppeteerClusterService {

    private cluster;

    public getCluster() {
        if (!this.cluster) {
            throw new Error(`PuppeteerClusterService.getCluster: init didn't run`);
        }
        return this.cluster;
    }

    public async init() {
        const args = [
            '--no-first-run','--no-zygote','--no-sandBox','--disable-extensions','--disable-setuid-sandBox','--disable-dev-shm-usage','--ignore-certificate-errors',"--proxy-server='direct://'",'--proxy-bypass-list=*','--lang=en-US,en'];
        const debugMode = isDebug;
        const headless = !debugMode;
        const concurrency = Cluster.CONCURRENCY_broWSER;
        const maxConcurrency = 2;
        const cluster = await Cluster.launch({
            concurrency,maxConcurrency,puppeteerOptions: {
                headless,dumpio: debugMode,handleSIGTERM: true,handleSIGINT: true,args
            },monitor: false,// turn this on to get cpu / memory usages
            timeout: ReportTimeouts.PuppeteerClusterTimeout,});
        console.log(`PuppeteerClusterService.init: initialized puppeteer cluster with concurrency type ${concurrency} and max concurrency of ${maxConcurrency}`);
        console.log('PuppeteerClusterService.init: running headless?: ',headless);
        this.cluster = cluster;
    }
}
import './src/services/monitoring/tracer';
import {config} from 'dotenv';
import * as process from 'process';
import * as http from 'http';
import * as cluster from 'cluster';
import * as os from 'os';
import App from './app';
import initORM from './src/config/sequelize_config.handler';
import routes from './src/routes';
import {DEFAULT_PORTS,REQUEST_TIMEOUT_MINUTES} from './src/config/networking.consts';
import {IServices} from "./src/interfaces/services.interfaces";
import SystemSetting from "./src/models/system_setting.model";
import Services from "./src/services";
import {ISystemSetting} from "./src/interfaces/models/system_setting.interface";
import ActiveReportSendingScheduler from "./src/logic/scheduled_tasks/active_report_sending.scheduler";
import appConfig from './src/config';
import LoggerService from './src/services/logger.service';
import DayTaggingReportSendingScheduler from './src/logic/scheduled_tasks/day_tagging_report_sending.scheduler';
import { MessageConsumingManager } from './src/logic/messaging/message_consuming.manager';
import { isDebug } from './src/utils/env.utils';

config()
const env = process.env.NODE_ENV?.toLocaleLowerCase() || 'dev';
monitorServer(env,process);

const ports = {
    http: process.env.HTTP_PORT ? Number(process.env.HTTP_PORT) : DEFAULT_PORTS.http,};

const workers = [];

function setupOrm() {
    const logger = LoggerService;

    console.info(`setupOrm: app initiating on env ${env}`);
    initORM({logger});
    console.info(`setupOrm: sequelize ORM initiated`);
    console.info('setupOrm: loading static tables into memory');
    console.info(`setupOrm: env params are ${JSON.stringify(process.env)}`);
}

const setupWorkerProcesses = (services:IServices) => {
    const numCores = os.cpus().length;
    services.logger.info('setupWorkerProcesses: master cluster setting up ' + numCores + ' workers');

    // iterate on number of cores need to be utilized by an application
    // current example will utilize all of them
    for(let i = 0; i < numCores; i++) {
        // creating workers and pushing reference in an array
        // these references can be used to receive messages from workers
        workers.push(cluster.fork());

        // to receive messages from worker process
        workers[i].on('message',function(message) {
            services.logger.info(message);
        });
    }

    // process is clustered on a core and process id is assigned
    cluster.on('online',function(worker) {
        services.logger.info('setupWorkerProcesses: worker ' + worker.process.pid + ' is listening');
    });

    // if any of the worker process dies then start a new one by simply forking another one
    cluster.on('exit',function(worker,code,signal) {
        services.logger.info('setupWorkerProcesses: worker ' + worker.process.pid + ' died with code: ' + code + ',and signal: ' + signal);
        services.logger.info('setupWorkerProcesses: starting a new worker');
        cluster.fork();
        workers.push(cluster.fork());
        // to receive messages from worker process
        workers[workers.length-1].on('message',function(message) {
            services.logger.info(message);
        });
    });
};

async function setApp(services:IServices,systemSettings: ISystemSetting[]) {
  const app = await App.init({routes,services,systemSettings,env});
  services.logger.info(`setApp: app initiated on env ${env}`);
  services.logger.info(`setApp: app initiated with config ${JSON.stringify(appConfig)}`);
  services.logger.info(`setApp: routes: ${Object.keys(routes).join(' | ')}`);
  services.logger.info(`setApp: ports: ${JSON.stringify(ports)}`);
  services.logger.info(`setApp: database connection gained`);

  const server = http.createServer(app)
    .listen(
      ports.http,async () => {
        services.logger.info(`setApp: HTTP Server successfully started at port ${ports.http}`);
      }
    );
    server.keepAliveTimeout = REQUEST_TIMEOUT_MINUTES * 60 * 1000; // Time (in ms) server will wait and keep the connection open after last response.
    server.headersTimeout = (REQUEST_TIMEOUT_MINUTES * 60 * 1000) + 1000; // https://github.com/nodejs/node/issues/27363#issuecomment-603489130
}

function setSchedulers(services:IServices,systemSettings: ISystemSetting[]) {
    const schedulers = [
        new ActiveReportSendingScheduler(services,systemSettings),new DayTaggingReportSendingScheduler(services,];
    schedulers.forEach(s=>s.init());
    services.logger.info(`setSchedulers: schedulers initiated on env ${env}`);
}

function setMessageConsumers(services:IServices,systemSettings: ISystemSetting[]){
    new MessageConsumingManager(services,systemSettings).initialize();
    services.logger.info(`setMessageConsumers: message consumers initiated on env ${env}`);
}
/**
 * Setup server either with clustering or without it
 * @param isClusterrequired
 */
const setupServer = async () => {
    console.info(`setupServer: initating app in multiprocess mode`);
    setupOrm();

    const systemSettings = await SystemSetting.findAll().then((settings) => settings.filter((s:SystemSetting) => !s.env || s.env.includes(env) || env.includes(s.env)));
    const services: IServices = new Services(systemSettings,process.env);
    await services.init();
    services.logger.info(`setupServer: initiated app in multi process mode`);

    if (cluster.isMaster) {
        setupWorkerProcesses(services);
        setSchedulers(services,systemSettings);
        setMessageConsumers(services,systemSettings);
    } else {
        await setApp(services,systemSettings);
    }

};

function monitorServer(env:string,proc: NodeJS.Process){
    if (env !== 'production'){
        console.warn(`monitorServer - not production so no monitoring.`);
        return;
    }

    if (!process.env.NEW_RELIC_KEY){
        console.warn(`monitorServer - NEW_RELIC_KEY not provided,not loading.`);
        return;
    }
    if (!process.env.APP_NAME){
        console.warn(`monitorServer - APP_NAME not provided,not loading.`);
        return;
    }

    const newRelic = require('newrelic');
    console.info(`monitorServer - newrelic loaded: ${typeof newRelic === 'object'}`);
}

setupServer();

import {
    IAnalyticsService,ICacheService,ICycleTaggingService,IEmailSendingService,IFileUploader,ILogger,IMonitoringService,IPowerBIAuthService,IPowerBIService,IPuppeteerClusterService,IReportMonitoringServiceFactory,IServices,ISiteService,IUserService
} from "../interfaces/services.interfaces";
import {ISystemSetting} from "../interfaces/models/system_setting.interface";
import CacheService from "./cache.service";
import LoggerService from './logger.service';
import {S3FileUploader} from "./persistence/s3_file.uploader";
import {EmailSendingService} from "./sendouts/email_sending.service";
import {AdminUserService} from "./external_models/user.service";
import {ModerationService} from "./sendouts/moderation.service";
import {PowerBIAuthService} from "./power_bi/power_bi_auth.service";
import {PowerBIService} from "./power_bi/power_bi.service";
import {AdminSiteService} from "./external_models/site.service";
import {CycleTaggingService} from "./cycle_tagging.service";
import MonitoringService from "./monitoring/monitoring.service";
import ReportMonitoringServiceFactory from "./monitoring/report_monitoring.service.factory";
import {AutoSendoutCalculatorFactory} from "../logic/sendout/auto_sendout_calculator.factory";
import PuppeteerClusterService from "./screenshots/puppeteer_cluster.service";
import {RedisFactory} from "./redis.factory";
import {MutexFactory} from "./mutex.factory";
import {IMutexFactory} from "../interfaces/general.interfaces";
import {AnalyticsService} from "./analytics/analytics.service";
import {DummyAnalyticsService} from "./analytics/dummy.service";
import {IAutoSendoutCalculatorFactory} from "../interfaces/sendouts.interface";

export default class Services implements IServices {
  puppeteerClusterService: IPuppeteerClusterService;

  constructor(systemSettings: ISystemSetting[],processEnv: Record<string,string | undefined>){
    this.puppeteerClusterService = new PuppeteerClusterService();
  }

  public async init(): Promise<void> {
      await this.puppeteerClusterService.init();
  }

}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)