php - AMPHP - 排队的任务多于池中可用的工作人员

标签 php parallel-processing php-7 php-7.4 amphp

我有一个项目,需要将大量 .tif 图像转换为 PDF 文档。文件数量达到数百万。

为了加快进程,我使用 Amphp。由于使用 Imagemagick 转换图像的过程会占用一些 CPU 功率,因此我想限制并行运行的转换器进程的最大数量。

我的第一种方法有效,但如果我对文件进行排队而不是向一定数量的工作人员提供 x 文件数组,则可以改进。

这是我当前的代码,我尝试在其中复制 the example .

<?php
require dirname(__DIR__) . '/vendor/autoload.php';

$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.', $import_file);
    $ext = strtolower(end($fileParts));
    if($ext === 'xml') {
        $filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
        $tasks[] = new ConvertPdfTask([$filePath], $constants);
    }
    $i++;
}
if(!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $coroutines = [];
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
        foreach ($tasks as $index => $task) {
            $coroutines[] = Amp\call(function() use ($pool, $task) {
                return yield $pool->enqueue($task);
            });
        }
        $results = yield Amp\Promise\all($coroutines);

        return yield $pool->shutdown();
    });
}

我的问题是,一旦我排队的任务数量超过 THREAD_COUNT 数量,我就会收到以下 PHP 警告:警告:池中的工作线程意外退出,代码为 -1 并且没有创建 PDF。

只要我保持在最大池大小以下,一切都很好。

我在 Windows 10 上使用 PHP 7.4.9 和 amphp/parallel 1.4.0。

最佳答案

经过一些更多的实验,我找到了一个解决方案,这似乎可行。 感觉有点“hacky”,所以如果有人有更好的想法,请分享。我以为池会自动建立一个队列,然后由最大数量的工作人员处理,但事实似乎并非如此。

我现在将从 Amp\call 获取的协程保存在两个单独的数组中。一种保存所有协程,另一种保存当前循环的所有协程。

$coroutine = Amp\call(function () use ($pool, $task) {
    return yield $pool->enqueue($task);
});
$loopRoutines[] = $coroutine;
$allCoroutines[] = $coroutine;

将项目放入队列后,我检查是否已达到配置的最大线程数。如果池中有最大数量的工作线程且没有空闲工作线程,我会在当前循环协程上调用 Amp\Promise\first 函数来等待新的空闲空闲工作线程。

由于该函数会在我下次到达那里时立即返回(因为完成的协程仍然是我的当前循环数组),因此我清除了该数组。

if ($pool->getWorkerCount() >= (THREAD_COUNT) && $pool->getIdleWorkerCount() === 0) {
    yield Amp\Promise\first($loopRoutines);
    $loopRoutines = [];
}

在 foreach 之后,我在所有协程数组上调用 Amp\Promise\all,因此脚本会等待所有工作线程完成。

这是我更改的代码:

<?php
require dirname(__DIR__) . '/vendor/autoload.php';

$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.', $import_file);
    $ext = strtolower(end($fileParts));
    if($ext === 'xml') {
        $filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
        $tasks[] = new ConvertPdfTask([$filePath], $constants);
    }
    $i++;
}
if(!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $allCoroutines = [];
        $loopRoutines = [];
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
        foreach ($tasks as $index => $task) {
            $coroutine = Amp\call(function () use ($pool, $task) {
                return yield $pool->enqueue($task);
            });
            $loopRoutines[] = $coroutine;
            $allCoroutines[] = $coroutine;
            if ($pool->getWorkerCount() >= THREAD_COUNT && $pool->getIdleWorkerCount() === 0) {
                yield Amp\Promise\first($loopRoutines);
                $loopRoutines = [];
            }
        }
        yield Amp\Promise\all($allCoroutines);

        return yield $pool->shutdown();
    });
}

关于php - AMPHP - 排队的任务多于池中可用的工作人员,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64023451/

相关文章:

php - 无法将本地 WordPress 连接到远程 MySQL 服务器

php - WordPress 不使用插件的热门帖子

php - geoip zip 查询

concurrency - 与withPool并行

c# - 在 parallel.foreach 范围之外递增计数值

python - 我对 Spark 中并行操作的理解是否正确?

php - 如何使用 PHP7 使 mssql_connect 工作?

php - 减去 php 数组

php - 在 ubuntu 上安装 php70-gd

php - 既然所有错误都是异常,PHP7 上的 set_error_handler() 会发生什么?