PHP:线程代理共享一个公共(public)对象

标签 php object pthreads shared synchronize

我正在非线程 PHP 中运行一个 CGI 程序,这是一个关于人工生命和进化的小练习。生物体有一个基因组和一个解释器,这使它们能够执行某些操作,包括在共享的世界地图上移动和相互交互。目前,我通过使用多个 PHP 进程通过 MySQL 数据库进行交互来维护线程的外观很差,但我想重写代码,以便它使用 pthreads 在单个线程中连续运行,而不一定使用数据库(尽管我' d 可能想保留它用于报告)。

我一直在 github 上浏览提问和回答的问题以及代码示例,但没有设法找到任何东西——据我所知——可以解决我想要的问题。由于我并不是一个天才的 OOP 编码员,而且我对编写线程代码(尤其是 PHP)是全新的,所以我的问题会相当广泛。

我试图通过编写代码来展示我正在尝试做的事情来缩小我的问题范围,但它可能仍然太宽泛。对于如何进一步缩小范围的任何建议,我将不胜感激。

我对下面代码的问题:

  • 如何让 Organism 作用于共享的 World 对象,以便将 World 对象的变化传达给所有线程,避免冲突并保持一致性?
  • 鉴于种群规模最终是可变的,是否有一种方法可以引用世界对象的有机体部分(即 $world->organisms), 世界是否能够创造新的生物体,如以下(错误)代码所示?
  • 考虑到我最终想要创建一个由数百个 Organisms 组成的种群,您是否有任何关于限制事件线程数量(即限制内存/CPU 使用)同时仍保持一致性的建议?

下面的代码(当然不会运行,但是)说明了我正在寻求实现的目标:

/*
 * Shared object containing a map of the world and 
 * methods for getting/setting coordinates
 */
class World extends Thread
{
    public $map;
    public $sx;
    public $sy;
    public $organisms;

    // set all map coords to 0
    public function __construct($sx, $sy, $p)
    {
        $map = array();
        for( $i = 0; $i < $sx; $i++ )
        {
            $map[$i] = array();
            for( $j = 0; $j < $sy; $j++ )
            {
                $map[$i][$j] = 0;
            }
        }
        $this->map = $map;
        $this->sx = $sx;
        $this->sy = $sy;

        // create and start organism threads
        $this->organisms = array();
        for( $i = 0; $i < $p; $i++ )
        {
            // this won't work because threaded objects created
            // within a thread scope that have no external references
            // are destroyed in the constructor
            $this->makeOrganism($i+1, $i+1, $i+1);
        }
    }

    // produces a new organism, adds to world population
    public function makeOrganism($x, $y, $value)
    {
        if( $x < 1 || $x > $this->sx ) return false;
        if( $y < 1 || $y > $this->sy ) return false;
        if( $this->getMap($x, $y) != 0 ) return false;

        echo "creating new organism $value\n";
        $organisms = $this->organisms;
        // doesn't work because the world data is decoupled in the new thread
        $organisms[] = new Organism($this, $x, $y, $value);
        $this->organisms = $organisms;

        return true;
    }

    // assumes valid coords
    public function setMap($x, $y, $value)
    {    
        return $this->map[$x-1][$y-1] = $value;
    }

    // assumes valid coords
    public function getMap($x, $y)
    {
        return $this->map[$x-1][$y-1];
    }

    public function getSX()
    {
        return $this->sx;
    }

    public function getSY()
    {
        return $this->sy;
    }

    public function run() 
    {
        for( $i = 0; $i < count($this->organisms); $i++ )
        {
            echo "starting organism ", $this->value, "\n";
            $this->organisms[$i]->start();
        }
    }
}

/*
 * Autonomously running agent accessing shared World map
 */
class Organism extends Thread
{
    public $value;
    public $world;
    public $x;
    public $y;

    public function __construct(World $world, $x, $y, $value)
    {
        $this->world = $world;
        $this->value = $value;
        $this->x = $x;
        $this->y = $y;
        // assume coordinates are empty
        $this->world->setMap($x, $y, $value);
    }

    // try to move organism by $dx, $dy
    public function move($dx, $dy)
    {
        $x = $this->x + $dx;
        $y = $this->y + $dy;
        if( $x < 1 || $x > $this->world->getSX() ) return false;
        if( $y < 1 || $y > $this->world->getSY() ) return false;
        if( $this->world->getMap($x, $y) != 0 ) return false;

        $this->world->setMap($x, $y, $this->value);
        $this->world->setMap($this->x, $this->y, 0);
        $this->x = $x;
        $this->y = $y;
        return true;
    }

    public function getValue()
    {
        return $this->value;
    }

    public function run()
    {
        // infinite loop; organisms move randomly about until they die
        while( true )
        {
            echo "running organism ", $this->getValue(), "\n";
            // this should operate on the shared object World, 
            // maintaining consistency and avoiding conflicts between threads
            $dx = rand(-1, 1);
            $dy = rand(-1, 1);
            $this->move($dx, $dy);

            // occasionally add an organism to the population by cloning this one
            if( rand(0, 100) > 95 )
            {
                $this->world->makeOrganism($this->x+1, $this->y+1, $this->value+100);
            }

            // wait random interval, organisms are
            // not expected to move all at the same time
            $this->wait(1000 + rand(500, 1500));
        }
    }
}

// initialize shared object
$world = new World(50, 50, 50);
$world->start();
$world->join();

最佳答案

我将回答 pthreads v3,PHP7。

请不要在新项目中使用 pthreads v2,v3 要好得多。

How do I get an Organism to act on a shared World object so that the changes in the World object are communicated to all threads, avoiding conflicts and maintaining coherence?

下面的代码创建了一堆操作共享对象的线程:

<?php
class Test extends Thread {

    public function __construct(Threaded $shared) {
        $this->shared = $shared;
    }

    public function run() {
        $this->shared[] = $this->getThreadId();
    }
}

$shared = new Threaded();
$tests = [];
for ($i = 0; $i<20; $i++) {
    $tests[$i] = 
        new Test($shared);
    $tests[$i]->start();
}

foreach ($tests as $test)
    $test->join();

var_dump($shared);
?>

将产生类似于:

object(Threaded)#1 (20) {
  [0]=>
  int(140322714146560)
  [1]=>
  int(140322703144704)
  [2]=>
  int(140322621355776)
  [3]=>
  int(140322612963072)
  [4]=>
  int(140322604570368)
  [5]=>
  int(140322596177664)
  [6]=>
  int(140322587784960)
  [7]=>
  int(140322579392256)
  [8]=>
  int(140322570999552)
  [9]=>
  int(140322487138048)
  [10]=>
  int(140322478745344)
  [11]=>
  int(140322470352640)
  [12]=>
  int(140322461959936)
  [13]=>
  int(140322453567232)
  [14]=>
  int(140322445174528)
  [15]=>
  int(140322436781824)
  [16]=>
  int(140322428389120)
  [17]=>
  int(140322419996416)
  [18]=>
  int(140322411603712)
  [19]=>
  int(140322403211008)
}

元素个数一致并非偶然:

$this->shared[] = $this->getThreadId();

执行此操作时,pthreads 会为您提供安全

任何引用了 Threaded $shared 对象的 Thread 都可以操作它。

一致性和安全性是两个不同的东西,考虑下面的代码:

<?php
class Test extends Thread {

    public function __construct(Threaded $shared) {
        $this->shared = $shared;
    }

    public function run() {
        if (!isset($this->shared[0])) {
            $this->shared[0] = $this->getThreadId();
        }
    }

    private $shared;
}

$shared = new Threaded();
$tests  = [];

for ($i = 0; $i < 16; $i++) {
    $tests[$i] = 
        new Test($shared);
    $tests[$i]->start();
}

foreach ($tests as $test)
    $test->join();
?>

您可能希望只有一个 Thread 走这条路:

$this->shared[0] = $this->getThreadId();

但是,这并不能保证。由于在调用 isset 和上面的语句之间没有锁定,许多线程可以自由并发地通过路径。

Test的run方法替换成下面的代码可以保证一致性:

    public function run() {
        $this->shared->synchronized(function(){
            if (!isset($this->shared[0])) {
                $this->shared[0] = $this->getThreadId();
            }
        });
    }

Given that the population size is ultimately variable, is there a way to make the references to the Organisms part of the World object (ie. $world->organisms), and have World be able to create new Organisms, as shown in below (faulty) code?

这听起来好像违反了 SOLID 的基本原则。不管怎样,追逐这都不是什么好事。

听起来 Organisms 应该属于主进程,因此需要在那里构建并传递到 ThreadWorker 或其他任何地方。

它们需要属于主进程,因为它们可能会在多个 Thread 中结束。

Given that I ultimately want to create a population of hundreds of Organisms, do you have any pointers on limiting the number of active threads (ie. limiting memory/cpu usage) while still maintaining consistency?

使用 pthreads 提供的 Pool 实现。

下面是一些代码:

<?php
class Organisms extends Volatile {}

class World extends Threaded {

    public function __construct(Organisms $organisms, Volatile $grid) {
        $this->organisms = $organisms;
        $this->grid = $grid;
    }

    public function populate($organism, int $x, int $y) : bool {
        $reference = $this->getGridReference($x, $y);

        return $this->grid->synchronized(function() use($organism, $reference) {
            if (isset($this->grid[$reference]))
                return false;
            return (bool) $this->grid[$reference] = $organism;
        });
    }

    private function getGridReference(int $x, int $y) {
        return sprintf("%dx%d", $x, $y);
    }

    public function getOrganisms() { return $this->organisms; }

    private $organisms;
}

class Organism extends Threaded {

    public function __construct(World $world) {
        $this->world = $world;
    }

    public function setPosition(int $x, int $y) {
        $this->x = $x;
        $this->y = $y;
    }

    public function getWorld() { return $this->world; }

    private $world;
    private $x = -1;
    private $y = -1;
}

class OrganismPopulateTask extends Threaded {

    public function __construct(World $world, Organism $organism, int $x, int $y) {
        $this->world = $world;
        $this->organism = $organism;
        $this->x = $x;
        $this->y = $y;
    }

    public function run() {
        if ($this->world->populate(
            (object) $this->organism, $this->x, $this->y)) {
                $this->organism->setPosition($this->x, $this->y);
        }
    }

    private $world;
    private $organism;
    private $x;
    private $y;
}

$organisms = new Organisms();
$grid = new Volatile();
$world = new World($organisms, $grid);
$pool = new Pool(16);

$organisms[] = new Organism($world);
$organisms[] = new Organism($world);

$pool
    ->submit(new OrganismPopulateTask($world, $organisms[0], 10, 10));
$pool   
    ->submit(new OrganismPopulateTask($world, $organisms[1], 10, 10));

$pool->shutdown();

var_dump($world);
?>

将产生:

object(World)#3 (2) {
  ["organisms"]=>
  object(Organisms)#1 (2) {
    [0]=>
    object(Organism)#5 (3) {
      ["world"]=>
      *RECURSION*
      ["x"]=>
      int(10)
      ["y"]=>
      int(10)
    }
    [1]=>
    object(Organism)#6 (3) {
      ["world"]=>
      *RECURSION*
      ["x"]=>
      int(-1)
      ["y"]=>
      int(-1)
    }
  }
  ["grid"]=>
  object(Volatile)#2 (1) {
    ["10x10"]=>
    object(Organism)#5 (3) {
      ["world"]=>
      *RECURSION*
      ["x"]=>
      int(10)
      ["y"]=>
      int(10)
    }
  }
}

注意:这使用了 v3.1.2 中包含的功能

其中大部分应该是不言自明的,显式转换是为了避免因尝试连接到已消失的对象而导致的异常。

要注意的主要事情是每个“ Action ”都被视为一个“任务”并提交给Pool

关于PHP:线程代理共享一个公共(public)对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33856305/

相关文章:

javascript - 如何判断给定对象是数组还是对象?

Java 泛型、对象和通配符的区别和说明

c - Posix 消息队列接收/发送/打开不起作用?

c - 不停止 gdb 中的所有线程

php - preg_replace '/[^0-9]/' 或 '/\D/' 或 '/\d/' 之间的区别?

php - 如何在 Doxygen 中禁用斜杠命令语法

php - echo (new DateTime())->getTimestamp(); ...不支持匿名对象?

php - 写入大于 4096 的图像文件

java - 如果在父类(super class)中定义了方法,如何根据调用它的对象更改结果

c++ - 如何正确地将参数传递给 pthread