PHP Text Stream/Buffer 在脚本之间共享实时数据

标签 php file unix stream

存在本地文件访问问题的 PHP stream_socket_server/client。

我正在使用此脚本的修改版:php: How to save the client socket (not closed), so a further script may retrieve it to send an answer?但我无法让本地文件部分正常工作。

我想做的是通过使用文件作为中间人,本质上是在 PHP 进程/脚本之间流式传输数据,本质上是流式传输数据。

我在打开/添加到现有文件的现有脚本时遇到问题。

stream_socket_server 端,它将工作一次(文件不存在),但随后在任何后续尝试运行时抛出以下错误;

PHP Warning: stream_socket_server(): unable to connect to unix://./temp.sock (Unknown error)

似乎当 stream_socket_server 创建文件时,它使其成为只读文件,下面的代码片段中有详细信息;

rwxrwxr-x 1 xxx xxx    0 Jun 13 20:05 temp.sock

我曾尝试将权限调整为更宽容的设置,但没有成功。

在套接字客户端,我无法让它打开文件,无论文件是否存在。

$socket = stream_socket_server('unix://./temp.sock', $errno, $errstr);
$sock = stream_socket_client('unix:///./temp.sock', $errno, $errstr);

PHP Warning: stream_socket_server(): unable to connect to unix://./temp.sock (Unknown error) (server when file already exists)

PHP Warning: stream_socket_client(): unable to connect to unix://./temp.sock (Connection refused) (client)

最佳答案

让我先这样说:你确定你需要 unix 套接字吗?你确定proc_open()的管道不足以实现您的目标? proc_open() 比 unix 套接字更容易使用。继续,

警告: 不要相信 fread() 会一次读取所有数据,尤其是在发送大量数据(如兆字节)时,您需要某种方式来传达您的消息将有多大是的,这可以通过以消息长度 header 开头的所有消息来实现,例如小端 uint64 字符串,您可以使用

/**
 * convert a native php int to a little-endian uint64_t (binary) string
 *
 * @param int $i
 * @return string
 */
function to_little_uint64_t(int $i): string
{
    return pack('P', $i);
}

你可以用

解析它
/**
 * convert a (binary) string containing a little-endian uint64_t
 * to a native php int
 *
 * @param string $i
 * @return int
 */
function from_little_uint64_t(string $i): int
{
    $arr = unpack('Puint64_t', $i);
    return $arr['uint64_t'];
}

有时 fread() 不会在第一次调用时返回所有数据,您必须继续调用 fread() 并附加数据以获取完整消息,这是此类 fread() 循环的实现:

/**
 * read X bytes from $handle,
 * or throw an exception if that's not possible.
 *
 * @param mixed $handle
 * @param int $bytes
 * @throws \RuntimeException
 * @return string
 */
function fread_all($handle, int $bytes): string
{
    $ret = "";
    if ($bytes < 1) {
        // ...
        return $ret;
    }
    $bytes_remaining = $bytes;
    for (;;) {
        $read_now = fread($handle, $bytes_remaining);
        $read_now_bytes = (is_string($read_now) ? strlen($read_now) : 0);
        if ($read_now_bytes > 0) {
            $ret .= $read_now;
            if ($read_now_bytes === $bytes_remaining) {
                return $ret;
            }
            $bytes_remaining -= $read_now_bytes;
        } else {
            throw new \RuntimeException("could only read " . strlen($ret) . "/{$bytes} bytes!");
        }
    }
}

此外,当发送大量数据时,你也不能相信 fwrite(),有时你需要调用 fwrite,看看它写了多少字节,然后 substr() - 截掉实际的字节写入,然后在第二个 fwrite() 中发送其余部分,依此类推,这是一个 fwrite() 循环的实现,它会一直写入直到所有内容都被写入(或者如果不能写入所有内容则抛出异常):

/**
 * write the full string to $handle,
 * or throw a RuntimeException if that's not possible
 *
 * @param mixed $handle
 * @param string $data
 * @throws \RuntimeException
 */
function fwrite_all($handle, string $data): void
{
    $len = $original_len = strlen($data);
    $written_total = 0;
    while ($len > 0) {
        $written_now = fwrite($handle, $data);
        if ($written_now === $len) {
            return;
        }
        if ($written_now <= 0) {
            throw new \RuntimeException("could only write {$written_total}/{$original_len} bytes!");
        }
        $written_total += $written_now;
        $data = substr($data, $written_now);
        $len -= $written_now;
        assert($len > 0);
    }
}

.. 有了这个,你可以创建你的服务器

$server_errno = null;
$server_errstr = "";
$server_path = __FILE__ . ".socket";
$server = stream_socket_server("unix://" . $server_path, $server_errno, $server_errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
if (! $server || ! ! $server_errno) {
    throw new \RuntimeException("failed to create server {$server_path} - errno: {$server_errno} errstr: {$server_errstr}");
}
register_shutdown_function(function () use (&$server_path, &$server) {
    // cleanup
    fclose($server);
    unlink($server_path);
});
var_dump("listening on {$server_path}", $server);

现在,如果您只需要支持与 1 个客户端交谈,只需一条消息,一个就可以做到

echo "waiting for connection...";
$client = stream_socket_accept($server);
echo "connection!\n";
echo "reading message size header..";
stream_set_blocking($client, true);
// size header is a little-endian 64-bit (8-byte) unsigned integer
$size_header = fread_all($client, 8);
$size_header = from_little_uint64_t($size_header);
echo "got size header, message size: {$size_header}\n";
echo "reading message...";
$message = fread_all($client, $size_header);
echo "message recieved: ";
var_dump($message);
$reply = "did you know that the hex-encoded sha1-hash of your message is " . bin2hex(hash("sha1", $message, true)) . " ?";
echo "sending reply: {$reply}\n";
fwrite_all($client, to_little_uint64_t(strlen($reply)) . $reply);
echo "reply sent!\n";

客户端看起来像

$unix_socket_path = __DIR__ . "/unixserver.php.socket";
$conn_errno = 0;
$conn_errstr = "";
echo "connecting to unix socket..";
$conn = stream_socket_client("unix://" . $unix_socket_path, $conn_errno, $conn_errstr, (float) ($timeout ?? ini_get("default_socket_timeout")), STREAM_CLIENT_CONNECT);
if (! $conn || ! ! $conn_errno) {
    throw new \RuntimeException("unable to connect to unix socket path at {$unix_socket_path} - errno: {$conn_errno} errstr: {$conn_errstr}");
}
stream_set_blocking($conn, true);
echo "connected!\n";
$message = "Hello World";
echo "sending message: {$message}\n";
fwrite_all($conn, to_little_uint64_t(strlen($message)) . $message);
echo "message sent! waitinf for reply..";
$reply_length_header = fread_all($conn, 8);
$reply_length_header = from_little_uint64_t($reply_length_header);
echo "got reply header, length: {$reply_length_header}\n";
echo "reciving reply..";
$reply = fread_all($conn, $reply_length_header);
echo "recieved reply: ";
var_dump($reply);

现在运行我们得到的服务器:

hans@dev2020:~/projects/misc$ php unixserver.php 
string(59) "listening on /home/hans/projects/misc/unixserver.php.socket"
resource(5) of type (stream)
waiting for connection...

然后运行客户端,

hans@dev2020:~/projects/misc$ php unixclient.php 
connecting to unix socket..connected!
sending message: Hello World
message sent! waitinf for reply..got reply header, length: 105
reciving reply..recieved reply: string(105) "did you know that the hex-encoded sha1-hash of your message is 0a4d55a8d778e5022fab701977c5d840bbc486d0 ?"

现在回头看看我们的服务器,我们会看到:

hans@dev2020:~/projects/misc$ php unixserver.php 
string(59) "listening on /home/hans/projects/misc/unixserver.php.socket"
resource(5) of type (stream)
waiting for connection...connection!
reading message size header..got size header, message size: 11
reading message...message recieved: string(11) "Hello World"
sending reply: did you know that the hex-encoded sha1-hash of your message is 0a4d55a8d778e5022fab701977c5d840bbc486d0 ?
reply sent!

这一次只适用于 1 个客户端,只有一个回复/响应,但至少它正确使用 fread/fwrite 循环,并确保整个消息,无论它有多大,总是被发送/全部收到..

让我们做一些更有趣的事情:创建一个可以与无限数量的客户端异步对话的服务器

// clients key is the client-id, and the value is the client socket
$clients = [];

stream_set_blocking($server, false);
$check_for_client_activity = function () use (&$clients, &$server): void {
    $select_read_arr = $clients;
    $select_read_arr[] = $server;
    $select_except_arr = [];
    $empty_array = [];
    $activity_count = stream_select($select_read_arr, $empty_array, $empty_array, 0, 0);
    if ($activity_count < 1) {
        // no activity.
        return;
    }
    foreach ($select_read_arr as $sock) {
        if ($sock === $server) {
            echo "new connections! probably..";
            // stream_set_blocking() has no effect on stream_socket_accept,
            // and stream_socket_accept will still block when the socket is non-blocking,
            // unless timeout is 0, but if timeout is 0 and there is no waiting connections,
            // php will throw PHP Warning: stream_socket_accept(): accept failed: Connection timed
            // so it seems using @ to make php stfu is the easiest way here
            $peername = "";
            while ($new_connection = @stream_socket_accept($server, 0, $peername)) {
                socket_set_blocking($new_connection, true);
                $clients[] = $new_connection;
                echo "new client! id: " . array_key_last($clients) . " peername: {$peername}\n";
            }
        } else {

            $client_id = array_search($sock, $clients, true);
            assert(! ! $client_id);
            echo "new message from client id {$client_id}\n";
            try {
                $message_length_header = fread_all($sock, 8);
                $message_length_header = from_little_uint64_t($message_length_header);
                $message = fread_all($sock, $message_length_header);
                echo "message: ";
                var_dump($message);
            } catch (Throwable $ex) {
                echo "could not read the full message, probably means the client has been disconnected. removing client..\n";
                // removing client
                stream_socket_shutdown($sock, STREAM_SHUT_RDWR);
                fclose($sock);
                unset($clients[$client_id]);
            }
        }
    }
};
for (;;) {
    // pretend we're doing something else..
    sleep(1);
    echo "checking for client activity again!\n";
    $check_for_client_activity();
}

现在只需调用 $check_for_client_activity();只要方便,看看您是否收到任何客户的消息。如果您无事可做,想等到收到任何客户的消息,您可以这样做

$empty_array = [];
$select_read_arr=$clients;
$select_read_arr[]=$server;
$activity_count = stream_select($select_read_arr, $empty_array, $empty_array, null, null);

警告,由于 stream_select() 的最后两个参数为 null,stream_select 可以无限期阻塞,如果您没有获得任何新连接并且您的任何客户端都没有任何反应。 (您可以设置另一个超时时间,例如 1 秒或其他时间来设置超时时间。null 表示“永远等待”)

关于PHP Text Stream/Buffer 在脚本之间共享实时数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56590938/

相关文章:

php - 将记录移动到不同的表 MySQL

php - 为什么每当我刷新页面时,都会创建新的 CI session ?

linux - 如何从 MAKEFILE 中的字符串列表中提取具有特定模式的字符串?

linux - awk 解析输出并提取值

PHP 替换大括号之间的所有内容?

php - PDO 对象超出范围,尝试调用 Javascript 函数

git 将两个文件 merge 为一个并保留历史记录

c - 编写一个程序,用有限的文件定位步骤反转二进制文件中字节的顺序

linux - 通过终端删除大文件的第一个和最后一个字符

linux - 在 Shell 中动态重命名 XML 文件