考虑以下代码:
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
namespace io = boost::asio;
class test {
public:
test(io::any_io_executor e) : exe{ std::move(e) } {}
io::awaitable<void> delay(size_t sec) {
io::steady_timer t{ exe };
t.expires_after(std::chrono::seconds{ sec });
co_await t.async_wait(io::use_awaitable);
}
io::awaitable<void> print_and_delay(int num) {
std::cout << num << '\n';
co_await delay(1);
}
io::awaitable<void> d_print() {
co_await print_and_delay(1);
co_await print_and_delay(2);
co_await print_and_delay(3);
}
void start() {
for (size_t i = 0; i != 3; ++i) {
io::co_spawn(exe, d_print(), io::detached);
}
}
protected:
io::any_io_executor exe;
};
int main() {
io::io_context ctx;
test t{ ctx.get_executor() };
t.start();
ctx.run();
return 0;
}
输出:
1
1
1
(delay 1 second)
2
2
2
(delay 1 second)
3
3
3
(delay 1 second)
现在我想让 d_print
的执行串行化:一次只有一个 d_print
协程运行。
预期输出:
1
2
3
1
2
3
1
2
3
每行延迟 1 秒。
我可以通过使用strand
来实现这一点吗?
我想要做的是在三个 print_and_delay
周围添加类似 synchorized(st) {
的内容。当执行器尝试运行以下协程时,阻止它们,直到正在运行的协程完成执行。
最佳答案
链已经保护通过它发布的代码免受并发执行。
该行为正确,并且添加假定的“synchronized”关键字也不会改变它。
您所追求的并不是针对并发/重叠执行的保护 - 因为它已经在工作了。相反,您希望避免在前一个协程停止之前发布新的协程 (d_print
) 实例。
解决方案
解决这个问题的经典方法是排队。
Similar requirements come up with e.g. full-duplex IO where outgoing messages have to be queued to avoid interleaving writes.
链对于避免对队列的不同步访问仍然有用:
<强> Live On Coliru
#include <boost/asio.hpp>
#include <chrono>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
class test {
public:
test(asio::any_io_executor e) : strand_{make_strand(std::move(e))} {}
static asio::awaitable<void> delay(size_t sec) {
co_await asio::steady_timer{co_await asio::this_coro::executor, std::chrono::seconds{sec}} //
.async_wait(asio::use_awaitable);
}
asio::awaitable<void> print_and_delay(int num) {
std::cout << num << '\n';
co_await delay(1);
}
asio::awaitable<void> d_print() {
co_await print_and_delay(1);
co_await print_and_delay(2);
co_await print_and_delay(3);
}
void start() {
post(strand_, std::bind(&test::do_start, this));
}
private:
asio::strand<asio::any_io_executor> strand_;
std::deque<asio::awaitable<void> > queue_;
asio::awaitable<void> drain_queue() { // runs on strand
while (!queue_.empty()) {
co_await std::move(queue_.front());
queue_.pop_front();
}
}
void do_start() { // runs on strand
bool not_running = queue_.empty();
for (size_t i = 0; i != 3; ++i)
queue_.push_back(d_print());
if (not_running)
co_spawn(strand_, std::bind(&test::drain_queue, this), asio::detached);
}
};
int main() {
asio::io_context ctx;
test t{ctx.get_executor()};
t.start();
ctx.run();
}
其他想法
您还可以使用实验性的 Asio Channels 功能,该功能可以简化线程安全并提供更大的灵活性。
关于c++ - Boost.ASIO 如何将链与 c++20 协程一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76351432/