c++ - Reading and writing a child's stdio simultaneously with boost.process

Tags c++ windows boost boost-asio boost-process

I'm trying to write to and read from a child's stdio using boost.process with something like this:

boost::asio::io_service writeService, readService;
bp::async_pipe in{writeService};
bp::async_pipe out{readService};

bp::child process(CompressCmd.c_str(), bp::std_in < in, bp::std_out > out);
Buffer src;
src.reserve(4 * 1024 * 1024);
integer_type read = 0;
//std::atomic_int64_t totalWrite{0};
integer_type totalWrite = 0;
while (callback(CallbackActions::NeedMoreInput, src, read)) {
    in.async_write_some(
        boost::asio::buffer(src.data(), read),
        [](const boost::system::error_code &e, std::size_t) { });
    // written data is not important, that's why using same buffer
    out.async_read_some(boost::asio::buffer(src.data(), src.capacity()),
                        [&](const boost::system::error_code &e,
                           std::size_t byte_transferred) { totalWrite += byte_transferred; });
}
writeService.run();
in.close();
readService.run();

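All read and write operations are reported as successful, but the value of totalWrite is completely wrong: e.g. it reports 29356032, while the real value should be around 50000000.
I noticed that the program terminates halfway through.
Calling process.wait() after readService.run() freezes the child.
Using an atomic int produces the same behavior.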

For now I only need to know how much data was actually written, which is why I'm reusing the same buffer.

Best answer

  1. This pattern:

    while (callback(CallbackActions::NeedMoreInput, src, read)) {
        in.async_write_some(...);
        out.async_read_some(...);
    }
    

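    is most likely misguided: async operations always return immediately, so you just keep queuing more async operations without ever giving them a chance to run.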

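  2. Equally misguided is that you have separate services for the two pipes, but you run them strictly one after the other, so no read operation runs until writeService has finished.

  3. The atomic type is misguided, because nothing is accessed from multiple threads.

  4. What are you trying to do? You reserve a large buffer, but never put any data into it (reserve != resize). So the best you can hope for is that nothing gets written.

    More ironically still, you are reading into the exact same buffer, at the exact same position. However, that is immediately Undefined Behaviour¹, because you pass it src.capacity() when you know that src.size()==0.

    Even without that error, how could you "simultaneously" read from and write to the exact same bytes in memory and still know what the expected result is?

  5. You are not passing your own io_service to Boost.Process.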
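To make point 4 concrete, here is a minimal standard-library sketch. It assumes the question's Buffer behaves like std::vector<char> (its definition isn't shown), and the helper names reserved_only and resized are hypothetical: reserve() only allocates capacity and leaves size() at 0, while resize() actually creates the elements.

```cpp
#include <cstddef>
#include <vector>

// Assumption: the question's Buffer is roughly a std::vector<char>.
using Buffer = std::vector<char>;

// reserve(): allocates capacity only. size() stays 0, so there are no
// elements yet and nothing valid to send or to read back.
Buffer reserved_only(std::size_t n) {
    Buffer b;
    b.reserve(n);
    return b;  // moved/elided, so the allocated capacity travels with it
}

// resize(): creates n value-initialized (zero) elements, so the range
// [data(), data() + size()) now holds n valid bytes.
Buffer resized(std::size_t n) {
    Buffer b;
    b.resize(n);
    return b;
}
```

Only the range [data(), data() + size()) holds elements, so that is the range one would wrap, e.g. boost::asio::buffer(b.data(), b.size()), rather than using capacity().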

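Working Demo

Here is a working sample. Of course, I had to guess what exactly you are trying to do.

I chose to make the program send its own source code (main.cpp) to stdin and read stdout iteratively, recording the number of bytes received in total_received. It then prints the exit code and that total.

As a makeshift compressor I used '/usr/bin/xxd', because it is available and its output can even be usefully printed to std::cout for debugging.

Live On Coliru // trouble on Coliru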

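#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <array>
#include <iostream>
#include <string>
#include <vector>
std::vector<char> read_file(std::string const&); // defined at the bottom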

namespace bp = boost::process;
using boost::system::error_code;

using Loop = boost::function<void()>;
using Buffer = std::array<char, 4*1024>;

int main() {
    boost::asio::io_service svc;

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);

    auto data = read_file("main.cpp");

    Loop read_loop, write_loop;

    Buffer recv_buffer;
    std::size_t total_received = 0;
    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
        out.async_read_some(boost::asio::buffer(recv_buffer),
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
                total_received += transferred; 
                if (!ec)
                    read_loop(); // continue reading
            });
    };

    boost::asio::async_write(in, boost::asio::buffer(data),
        [&](error_code ec, size_t transferred) {
            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
            in.close(ec);
            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
        }); // async

    read_loop(); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}

#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
    std::ifstream ifs(fname);
    return {std::istreambuf_iterator<char>(ifs), {}};
}

Prints

WriteLoop: Success done, 1787 bytes
WriteLoop: closed pipe (Success)
ReadLoop: Success got 4096 bytes
ReadLoop: Success got 3515 bytes
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=7611

Explanation, Simplification

Note that we did all of the writing without a loop. That's because boost::asio::async_write is a composed operation (it hides the loop).
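As a rough illustration of the loop that a composed write hides, here is a synchronous, standard-library-only sketch. ChunkySink and write_all are hypothetical names, not Boost.Asio types; the sink accepts at most a fixed number of bytes per write_some call, much like a pipe may accept only part of a buffer. Boost.Asio's real async_write is asynchronous and also stops on errors, which this sketch omits.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a stream whose write_some may accept only part
// of the data per call (pipes behave like this). Not a Boost.Asio type.
// Assumes max_per_call > 0, otherwise write_all below would never finish.
struct ChunkySink {
    explicit ChunkySink(std::size_t max) : max_per_call(max) {}

    std::size_t write_some(const char* data, std::size_t len) {
        std::size_t n = std::min(len, max_per_call);
        received.append(data, n);
        ++calls;
        return n;  // number of bytes actually accepted
    }

    std::size_t max_per_call;
    std::string received;
    std::size_t calls = 0;
};

// Synchronous sketch of what a composed write does: keep calling write_some
// on the remaining bytes until everything has been accepted.
// Error handling is deliberately omitted.
template <typename Stream>
std::size_t write_all(Stream& s, const char* data, std::size_t len) {
    std::size_t total = 0;
    while (total < len)
        total += s.write_some(data + total, len - total);
    return total;
}
```

For example, writing the 20-byte string "hello, child process" into ChunkySink sink(4) takes five write_some calls, which is exactly the bookkeeping the single async_write call above spares you.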

Likewise, if you can "afford" to keep all of the received data in memory, you can simplify by using boost::asio::streambuf and a similar composed operation:

Live On Coliru // trouble on Coliru

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
#include <string>
#include <vector>
std::vector<char> read_file(std::string const&); // defined at the bottom

namespace bp = boost::process;
using boost::system::error_code;

int main() {
    boost::asio::io_service svc;

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);

    auto data = read_file("main.cpp");

    boost::asio::streambuf recv_buffer;
    boost::asio::async_read(out, recv_buffer,
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
            });

    boost::asio::async_write(in, boost::asio::buffer(data),
        [&](error_code ec, size_t transferred) {
            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
            in.close(ec);
            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
        }); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << recv_buffer.size() << "\n";
}

#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
    std::ifstream ifs(fname);
    return {std::istreambuf_iterator<char>(ifs), {}};
}

Conversely, if you cannot afford to have all the data in memory before sending, you can create a loop that sends the input block-wise.
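The block-wise idea boils down to the read()/gcount() pattern that write_loop uses in the sample below. Here it is in isolation as a standard-library sketch; block_sizes and block_sizes_of are hypothetical helpers that only record the size of each block instead of sending it with async_write.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Splits an input stream into blocks of at most block_size bytes using
// read()/gcount(). Each block would be one async_write in the real code;
// here we only record its size.
std::vector<std::size_t> block_sizes(std::istream& is, std::size_t block_size) {
    std::vector<char> block(block_size);
    std::vector<std::size_t> sizes;
    while (is.good()) {
        is.read(block.data(), static_cast<std::streamsize>(block.size()));
        std::size_t got = static_cast<std::size_t>(is.gcount());
        if (got > 0)  // skip the empty final read when the length is an exact multiple
            sizes.push_back(got);
    }
    return sizes;
}

// Convenience wrapper for trying it out on an in-memory string.
std::vector<std::size_t> block_sizes_of(const std::string& text, std::size_t block_size) {
    std::istringstream is(text);
    return block_sizes(is, block_size);
}
```

With 1134 bytes of input and 500-byte blocks this yields 500, 500 and 134: the last read hits end-of-file, gcount() reports the partial block, and good() turns false, which is the same condition write_loop uses to decide to close stdin.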

Two Asynchronous Loops, With Delays

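Let's do that, and make it more interesting by waiting one second before writing each block. What you'd expect to see is reads and writes alternating because of the delay: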
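Why the delay makes the two loops interleave can be shown with a toy model. The sketch below is not Boost.Asio: ToyLoop is a hypothetical single-threaded event loop with virtual time, and simulate is a hypothetical stand-in in which the pipe is just a counter of unread blocks. As with io_service, handlers only run inside run(); the write loop re-arms itself one time unit later (the send delay), and while it waits, the pending read gets to complete. It deliberately ignores the child's own output buffering, which is a likely reason the real log shows reads arriving in bursts rather than strictly alternating.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Toy single-threaded event loop with virtual time (NOT Boost.Asio).
// Handlers run only inside run(), ordered by due time, FIFO among equal times.
class ToyLoop {
public:
    void post_at(int time, std::function<void()> fn) {
        queue_.push(Item{time, seq_++, std::move(fn)});
    }
    int now() const { return now_; }
    void run() {
        while (!queue_.empty()) {
            Item item = queue_.top();  // copy out, pop, then invoke
            queue_.pop();
            now_ = item.time;
            item.fn();
        }
    }

private:
    struct Item {
        int time;
        unsigned long seq;
        std::function<void()> fn;
    };
    struct Later {  // turns priority_queue into a min-heap on (time, seq)
        bool operator()(const Item& a, const Item& b) const {
            return a.time != b.time ? a.time > b.time : a.seq > b.seq;
        }
    };
    std::priority_queue<Item, std::vector<Item>, Later> queue_;
    unsigned long seq_ = 0;
    int now_ = 0;
};

// Hypothetical simulation: the write loop sends one block, then schedules
// itself 1 time unit later; the read loop drains available blocks and
// otherwise parks as a pending read until the writer produces data or EOF.
std::vector<std::string> simulate(int blocks) {
    ToyLoop loop;
    std::vector<std::string> log;
    int in_pipe = 0, written = 0;
    bool closed = false, reader_waiting = false;
    std::function<void()> read_loop, write_loop;

    read_loop = [&] {
        if (in_pipe > 0) {
            --in_pipe;
            log.push_back("read");
            loop.post_at(loop.now(), read_loop);  // continue reading
        } else if (closed) {
            log.push_back("eof");                 // stop, like boost::asio::error::eof
        } else {
            reader_waiting = true;                // pending read, nothing to deliver yet
        }
    };
    write_loop = [&] {
        if (written == blocks) {
            closed = true;
            log.push_back("close");
        } else {
            ++written;
            ++in_pipe;
            log.push_back("write");
            loop.post_at(loop.now() + 1, write_loop);  // the send delay
        }
        if (reader_waiting) {  // new data (or EOF) completes the pending read
            reader_waiting = false;
            loop.post_at(loop.now(), read_loop);
        }
    };

    loop.post_at(0, read_loop);
    loop.post_at(0, write_loop);
    loop.run();
    return log;
}
```

simulate(2) produces write, read, write, read, close, eof: each write is followed by the read it unblocks, because the writer is parked on its delay in the meantime.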

Live On Coliru // yes, it runs on Coliru

#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <boost/function.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <array>
#include <chrono>
#include <fstream>
#include <iostream>
#include <string>

namespace bp = boost::process;
using boost::system::error_code;
using namespace std::chrono_literals;

using Loop = boost::function<void()>;
using Buffer = std::array<char, 500>;

int main() {
    boost::asio::io_service svc;
    auto on_exit = [](int code, std::error_code ec) {
            std::cout << "Exited " << code << " (" << ec.message() << ")\n";
        };

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc, bp::on_exit(on_exit));

    Loop read_loop, write_loop;

    Buffer recv_buffer;
    std::size_t total_received = 0;
    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
        out.async_read_some(boost::asio::buffer(recv_buffer),
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
                total_received += transferred; 
                if (!ec)
                    read_loop(); // continue reading
            });
    };

    std::ifstream ifs("main.cpp");
    std::size_t total_written = 0;
    Buffer send_buffer;
    boost::asio::high_resolution_timer send_delay(svc);
    write_loop = [&write_loop, &in, &ifs, &send_buffer, &total_written, &send_delay] {
        if (!ifs.good())
        {
            error_code ec;
            in.close(ec);
            std::cout << "WriteLoop: closed stdin (" << ec.message() << ")\n";
            return;
        }
        ifs.read(send_buffer.data(), send_buffer.size());

        boost::asio::async_write(in, boost::asio::buffer(send_buffer.data(), ifs.gcount()),
            [&](error_code ec, size_t transferred) {
                std::cout << "WriteLoop: " << ec.message() << " sent " << transferred << " bytes\n";
                total_written += transferred; 
                if (!ec) {
                    send_delay.expires_from_now(1s);
                    send_delay.async_wait([&write_loop](error_code ec) {
                        std::cout << "WriteLoop: send delay " << ec.message() << "\n";
                        if (!ec) write_loop(); // continue writing
                    });
                }
            });
    };

    read_loop(); // async
    write_loop(); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}

Prints

WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 134 bytes
WriteLoop: send delay Success
WriteLoop: closed stdin (Success)
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 22 bytes
Exited 0 (Success)
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=11214

¹ Perhaps it's merely unspecified; I don't feel like working out the difference right now.

Regarding "c++ - Reading and writing a child's stdio simultaneously with boost.process", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48678012/

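Related articles:

c++ - Iterator error C2440

java - Finding items via the item-name lookup API

python - How to copy files from a Windows system to any other remote server from a Python script?

boost - What is the difference between the mt-gd and mt-s libraries?

c++ - boost .MultiIndex : searching elements using multiple fields

c++ - String reversal program throws an exception

c++ - Safe static destructors when multiple threads call exit()

windows - GetFinalPathNameByHandle fails for device handles

c++ - String iterators incompatible when reading each line

c++ - Trouble working out multiple exclusive combinations in C++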