我有一个消息处理逻辑如下:
boost::asio::io_service ioService;
boost::thread_group threadPool;
// Initialization code.
int noOfCores = boost::thread::hardware_concurrency();
for (int i = 0 ; i < noOfCores ; i ++)
{
threadPool.create_thread(boost::bind(&boost::asio::io_service::run, , &ioService));
}
稍后在套接字读取线程中,我接收消息并在绑定(bind)接收到的消息时发布处理函数。
ioService.post(boost::bind(MessageHandler, message));
上面一行保证了可以同时处理多条消息。但是,它也失去了有时可能需要的顺序处理。事实上,由于消息的处理时间可能千差万别,因此顺序会完全困惑。
例如,假设我有 A、B、C、D、E、F、G、H、I 和 J 类型的消息。我不关心它们的处理顺序,所以多线程处理是完美的。但是,我确实需要按顺序处理相同类型的消息。假设我按以下顺序收到消息。
C1,E1,F1,A1,B1,C2,B2,D1,D2,F2,A2,H1,H2,A3,E2,E3,F3
这里,字母是类型,数字是它们到达的顺序。不同的消息类型可以按任意顺序并行处理,应保持类型内的顺序。我希望在 A2 之前处理 A1,在 A3 之前处理 A2。每种消息类型都相同。
其中一种方法是,确保给定类型的消息始终进入同一线程。这不是很有效。如果我只有 5 种消息类型和 32 个 CPU 内核,我仍然会被限制为 5 个线程。是否可以强制某个项目由特定线程处理?否则我必须为每个线程维护一个队列,处理互斥锁定等。
有没有更好的方法?
最佳答案
您可以使用 strand对于每种类型的工作作为您的队列。这样您的工作就可以均匀地分布在 io_context 线程中,并且它为每种类型的工作提供顺序工作队列并且不需要任何互斥锁。
关于c++ - 多线程处理,同时保持部分顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55044469/