AMPHP-排队的任务数量超过池中可用工人的数量

问题描述

我有一个项目,正在其中将大量.tif图像转换为PDF文档。文件数达到数百万。

为了加快过程,我正在使用AmPHP。由于使用Imagemagick转换图像的过程占用了一些cpu功率,因此我想限制并行运行的转换器过程的最大数量

我的第一种方法行得通,但如果我将文件排入队列,而不是给一定数量的工作人员提供x个文件的数组,则可以改善这种情况。

这是我当前的代码,我尝试在其中复制the example

<?PHP
require dirname(__DIR__) . '/vendor/autoload.PHP';

$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.',$import_file);
    $ext = strtolower(end($fileParts));
    if($ext === 'xml') {
        $filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEParaTOR.$import_file;
        $tasks[] = new ConvertPdfTask([$filePath],$constants);
    }
    $i++;
}
if(!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $coroutines = [];
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
        foreach ($tasks as $index => $task) {
            $coroutines[] = Amp\call(function() use ($pool,$task) {
                return yield $pool->enqueue($task);
            });
        }
        $results = yield Amp\Promise\all($coroutines);

        return yield $pool->shutdown();
    });
}

我的问题是,一旦入队数量超过THREAD_COUNT个任务,就会收到以下PHP警告:Warning: Worker in pool exited unexpectedly with code -1并且没有创建PDF。

只要我保持在最大池大小以下,一切都很好。

我在Windows 10和amPHP / parallel 1.4.0上使用PHP 7.4.9。

解决方法

经过更多的实验后,我找到了解决方案,这似乎可行。 感觉有点“ hacky”,所以如果有人有更好的主意,请分享。我以为该池会自动建立一个队列,然后由最大数量的工作人员处理,似乎并非如此。

现在,我将从Amp\call获得的协程保存在两个单独的数组中。一种包含所有协程,另一种包含所有当前循环。

$coroutine = Amp\call(function () use ($pool,$task) {
    return yield $pool->enqueue($task);
});
$loopRoutines[] = $coroutine;
$allCoroutines[] = $coroutine;

将一个项目加入队列后,我检查是否已经达到配置线程的最大数量。如果池中有最大数量的工作程序,而没有空闲工作程序,则我在当前循环协程中调用Amp\Promise\first函数,以等待新的空闲空闲工作程序。

由于该函数将在我下次到达那里时立即返回(因为完成的协程仍在我的当前循环数组中),因此我清除了该数组。

if ($pool->getWorkerCount() >= (THREAD_COUNT) && $pool->getIdleWorkerCount() === 0) {
    yield Amp\Promise\first($loopRoutines);
    $loopRoutines = [];
}

在foreach之后,我在所有协程数组上调用Amp\Promise\all,因此脚本等待所有工作程序完成。

这是我更改的代码:

<?php
require dirname(__DIR__) . '/vendor/autoload.php';

$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.',$import_file);
    $ext = strtolower(end($fileParts));
    if($ext === 'xml') {
        $filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
        $tasks[] = new ConvertPdfTask([$filePath],$constants);
    }
    $i++;
}
if(!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $allCoroutines = [];
        $loopRoutines = [];
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
        foreach ($tasks as $index => $task) {
            $coroutine = Amp\call(function () use ($pool,$task) {
                return yield $pool->enqueue($task);
            });
            $loopRoutines[] = $coroutine;
            $allCoroutines[] = $coroutine;
            if ($pool->getWorkerCount() >= THREAD_COUNT && $pool->getIdleWorkerCount() === 0) {
                yield Amp\Promise\first($loopRoutines);
                $loopRoutines = [];
            }
        }
        yield Amp\Promise\all($allCoroutines);

        return yield $pool->shutdown();
    });
}