从 cron 运行时 PHP pthreads 失败

标签 php linux cron pthreads

好的,让我们从慢开始......

我有一个 pthreads 脚本正在运行并为我工作,当我通过 ssh 从命令行手动运行它时,经过测试和工作的时间为 100%。脚本如下,主线程进程代码调整为模拟随机进程运行时。

class ProcessingPool extends Worker {
    public function run(){}
}
class LongRunningProcess extends Threaded implements Collectable {
    public function __construct($id,$data) {
        $this->id = $id;
        $this->data = $data;
    }

    public function run() {
        $data = $this->data;
        $this->garbage = true;

        $this->result = 'START TIME:'.time().PHP_EOL;

        // Here is our actual logic which will be handled within a single thread (obviously simulated here instead of the real functionality)
        sleep(rand(1,100));

        $this->result .= 'ID:'.$this->id.' RESULT: '.print_r($this->data,true).PHP_EOL;
        $this->result .= 'END TIME:'.time().PHP_EOL;

        $this->finished = time();
    }
    public function __destruct () {
        $Finished = 'EXITED WITHOUT FINISHING';
        if($this->finished > 0) {
            $Finished = 'FINISHED';
        }

        if ($this->id === null) {
            print_r("nullified thread $Finished!");
        } else {
            print_r("Thread w/ ID {$this->id} $Finished!");
        }
    }

    public function isGarbage() : bool { return $this->garbage; }

    public function getData() {
        return $this->data;
    }
    public function getResult() {
        return $this->result;
    }

    protected $id;
    protected $data;
    protected $result;
    private $garbage = false;
    private $finished = 0;
}

$LoopDelay = 500000; // microseconds
$MinimumRunTime = 300; // seconds (5 minutes)

// So we setup our pthreads pool which will hold our collection of threads
$pool = new Pool(4, ProcessingPool::class, []);

$Count = 0;

$StillCollecting = true;
$CountCollection = 0;
do {

    // Grab all items from the conversion_queue which have not been processed
    $result = $DB->prepare("SELECT * FROM `processing_queue` WHERE `processed` = 0 ORDER BY `queue_id` ASC");
    $result->execute();
    $rows = $result->fetchAll(PDO::FETCH_ASSOC);

    if(!empty($rows)) {

        // for each of the rows returned from the queue, and allow the workers to run and return
        foreach($rows as $id => $row) {
            $update = $DB->prepare("UPDATE `processing_queue` SET `processed` = 1 WHERE `queue_id` = ?");
            $update->execute([$row['queue_id']]);

            $pool->submit(new LongRunningProcess($row['fqueue_id'],$row));

            $Count++;
        }
    } else {
        // 0 Rows To Add To Pool From The Queue, Do Nothing...
    }


    // Before we allow the loop to move on to the next part, lets try and collect anything that finished
    $pool->collect(function ($Processed) use(&$CountCollection) {
        global $DB;

        $data = $Processed->getData();
        $result = $Processed->getResult();


        $update = $DB->prepare("UPDATE `processing_queue` SET `processed` = 2 WHERE `queue_id` = ?");
        $update->execute([$data['queue_id']]);

        $CountCollection++;

        return $Processed->isGarbage();
    });
    print_r('Collecting Loop...'.$CountCollection.'/'.$Count);


    // If we have collected the same total amount as we have processed then we can consider ourselves done collecting everything that has been added to the database during the time this script started and was running
    if($CountCollection == $Count) {
        $StillCollecting = false;
        print_r('Done Collecting Everything...');
    }

    // If we have not reached the full MinimumRunTime that this cron should run for, then lets continue to loop
    $EndTime = microtime(true);
    $TimeElapsed = ($EndTime - $StartTime);
    if(($TimeElapsed/($LoopDelay/1000000)) < ($MinimumRunTime/($LoopDelay/1000000))) {
        $StillCollecting = true;
        print_r('Ended To Early, Lets Force Another Loop...');
    }

    usleep($LoopDelay);

} while($StillCollecting);

$pool->shutdown();

所以虽然上面的脚本会通过命令行运行(已经调整为基本的例子,上面的例子中已经模拟了详细的处理代码) , 从 cron 运行时,以下命令会给出不同的结果 每 5 分钟设置一次...

/opt/php7zts/bin/php -q /home/account/cron-entry.php file=every-5-minutes/processing-queue.php

上面的脚本,当使用上面的命令行调用时,会在脚本运行时一遍又一遍地循环,并从数据库队列中收集任何新的项目,并将它们插入到池中,这允许 4 个进程同时运行运行和完成的时间,然后收集并在另一个循环发生之前更新队列,从数据库中提取任何新项目。该脚本将一直运行,直到我们在脚本执行期间处理并收集了队列中的所有进程。如果脚本没有运行完整的 5 分钟预期时间段,循环将被迫继续检查队列,如果脚本运行超过 5 分钟标记,它允许任何当前线程完成并在关闭前收集。请注意,上面的代码还包括一个基于代码的“flock”功能,该功能使此空闲循环的 future cron 并在锁定解除后退出或启动,确保队列和线程不会相互碰撞。同样,所有这些都可以通过 SSH 从命令行运行。

一旦我执行上述命令,并将其放入 cron 中每 5 分钟运行一次,本质上是给我一个永无止境的循环,同时保持内存,我得到不同的结果...

结果描述如下... 脚本开始,检查 flock,如果锁不存在,则继续,创建锁,并运行上面的脚本。这些项目从数据库中的队列中取出,并插入到池中,池按预期一次触发 4 个线程。但是意想不到的结果是 run() 命令没有似乎被执行了,而是运行了 __destruct 函数,并且一个“Thread w/ID 2 FINISHED!”消息类型返回到输出。这反过来意味着事物的收集端不收集任何东西,并且启动脚本(cron 脚本本身 /home/account/cron-entry.php file=every-5-minutes/processing-queue.php ) 在将所有内容放入池中并销毁后结束。这过早地“完成”了 cron 作业,因为除了循环并从队列中拉出任何新内容之外别无他法,因为当队列中处理 == 1 时它们被视为“正在处理”。

问题最终变成了......我如何让 cron 的脚本知道生成的线程并运行()它们而不在它们可以做任何事情之前关闭池?

(注意...如果您复制/粘贴提供的脚本,请注意我没有在删除详细逻辑后对其进行测试,因此可能需要一些简单的修复...请不要吹毛求疵地说代码,因为这里的关键是,如果脚本是从命令行执行的,pthreads 可以工作,但是当脚本是从 CRON 执行时,pthreads 无法正常运行。如果你打算用非建设性的批评来评论,请用你的手指去做别的事!)

乔·沃特金斯!我需要你的才华!提前致谢!

最佳答案

毕竟,问题似乎与用户权限有关。我在 cpanel 中设置了这个特定的 cron,当手动运行命令时,我以 root 身份登录。

在 roots crontab 中设置此命令后,我能够让它成功运行池中的线程。我现在唯一的问题是有些线程永远不会完成,有时我无法关闭池。但这是一个不同的问题,所以我会在别处打开另一个问题。

对于遇到此问题的人,请确保您知道 cron 的所有者是谁,因为它对 php 的 pthreads 很重要。

关于从 cron 运行时 PHP pthreads 失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40323446/

相关文章:

php - JWT "Signature verification failed"与 PHP

javascript - AJAX 成功无法将数据填充到表格 HTML

php - 蛋糕PHP : How to save Arabic words into mysql table?

Linux 查找带字符串的文件并打印上次访问时间

linux - 具有 LAN 的 VPN 客户端上的 Iptables 设置

web-applications - 如果应用程序停止, quartz 调度程序将获取下一个调度程序时间

php - 验证美国电话号码

php - 使用 phpseclib Interactive shell 显示超过 17 行

ubuntu - 在 Ubuntu crontab 中运行 JAR 文件失败

linux - crontab 在重新启动时不启动命令