我有 Worker
类和一个 Handler
类来为作业创建一个抽象层。我想使用 std::async
来混合一些异步,但我的 Visual Studio 2012(更新 1)出现了一些奇怪的行为。
我的类层次结构如下:
Worker
是一个抽象类,Init
和Work
作为纯虚方法。BasicWorker : Worker
只是使用printf
进行一些输出。GroupWorker : Worker
是其他 worker 的集合。Handler
捕获Worker
做一些工作。
然后我调用几个 std::async
方法,我在其中创建工作程序和处理程序,我在嵌套的 std::async
调用中调用处理程序,并且我等待 worker 的初始化(std::condition_variable
),然后我停止处理程序。
最后,我等待所有 std::future
完成。
代码如下:
#include <stdio.h>
#include <future>
#include <array>
#include <atomic>
#include <vector>
struct Worker
{
virtual ~Worker() { }
virtual void Init() = 0;
virtual void Work() = 0;
};
struct BasicWorker : public Worker
{
virtual ~BasicWorker() { }
virtual void Init()
{
printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
}
virtual void Work()
{
printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
}
};
struct GroupWorker : public Worker
{
GroupWorker()
{
workers.push_back(std::make_shared<BasicWorker>());
}
virtual ~GroupWorker() { }
virtual void Init()
{
for(int i = 0; i < workers.size(); ++i)
{
workers[i]->Init();
}
initEvent.notify_all();
}
virtual void Work()
{
for(int i = 0; i < workers.size(); ++i)
{
workers[i]->Work();
}
}
void WaitForInit()
{
//std::unique_lock<std::mutex> initLock(initMutex);
//initEvent.wait(initLock);
}
private:
std::mutex initMutex;
std::condition_variable initEvent;
std::vector<std::shared_ptr<Worker>> workers;
};
struct Handler
{
static const int Stopped = -1;
static const int Ready = 0;
static const int Running = 1;
Handler(const std::shared_ptr<Worker>& worker) :
worker(worker)
{ }
void Start(int count)
{
int readyValue = Ready;
if(working.compare_exchange_strong(readyValue, Running))
{
worker->Init();
for(int i = 0; i < count && working == Running; ++i)
{
worker->Work();
}
}
}
void Stop()
{
working = Stopped;
}
private:
std::atomic<int> working;
std::shared_ptr<Worker> worker;
};
std::future<void> Start(int jobIndex, int runCount)
{
//printf("Start: %d\n", jobIndex);
return std::async(std::launch::async, [=]()
{
printf("Async: %d\n", jobIndex);
auto worker = std::make_shared<GroupWorker>();
auto handler = std::make_shared<Handler>(worker);
auto result = std::async(std::launch:async, [=]()
{
printf("Nested async: %d\n", jobIndex);
handler->Start(runCount);
});
worker->WaitForInit();
handler->Stop();
result.get();
});
}
int main()
{
const int JobCount = 300;
const int RunCount = 5;
std::array<std::future<void>, JobCount> jobs;
for(int i = 0; i < JobCount; ++i)
{
jobs[i] = Start(i, RunCount);
}
for(int i = 0; i < JobCount; ++i)
{
jobs[i].get();
}
}
我的问题是:
- 如果我取消注释
WaitForInit@GroupWorker
函数中的行,那么我的嵌套异步函数调用不会进行,直到完成所有第一级异步函数调用 - 在等待
std::condition_variable
时,如果我增加作业数量,创建新线程的速度会呈指数级下降。对于我低于 100 个工作的试验,存在一些异步性,但高于 300 个时,它完全按顺序创建工作。 - 然后,如果我在
Start
方法中取消对printf
行的注释,所有嵌套的异步都非常有效
所以,
- 我在使用
std::condition_variable
时做错了什么? - 为什么为大约 100 个线程创建作业会变慢? (这个问题是可选的,似乎是操作系统的问题,可以用智能线程池概念解决)
printf
与这些有什么关系? (我尝试在竞争条件的情况下删除所有printf
调用,我在代码中放置了一个断点但没有帮助。这与std::cout
的情况相同也是)
编辑:
我添加了启动策略(如 Jonathan Wakely 所建议的那样)以确保线程的创建。但这也无济于事。我目前正在创建一个 std::thread
并调用 thread::join
函数在第一级异步中等待。
最佳答案
注意可以调用 printf
,但不要假设 std::thread::id
可以转换为 int
。你可以像这样让它更便携:
inline long tol(std::thread::id id)
{
std::ostringstream ss;
ss << id;
return stol(ss.str());
}
(这仍然假设 std::thread::id
的字符串值可以转换为 long
,这不是必需的,但比假设更有可能到 int
)
What am I doing wrong in usage of std::condition_variable?
您没有正在等待的“条件”,也没有同步来确保对 notify_all
的调用发生在对 wait
的调用之前。你应该有一个成员变量,上面写着“this worker has been init'd”,它由 Init
设置,并且只在条件变量不正确时等待条件变量(该标志应该是原子的或由一个互斥量,以防止数据竞争)。
Why creating jobs gets slower for like 100s of threads? (this question is optional, seems like a problem of OS and can be fixed with a smart thread-pool concept)
因为有数百个线程,对共享资源有很多争用,操作系统调度程序也有很大压力,所以实现可能决定开始返回延迟函数(即好像 std::async
是用 std::launch::deferred
) 而不是异步调用的。您的代码假定 async
不会返回延迟函数,因为如果异步 worker 及其嵌套的异步 worker 都作为延迟函数运行,程序可能会死锁,因为外部函数会阻塞等待嵌套函数调用Init
但嵌套函数永远不会运行,直到 out 函数调用 result.get()
。您的程序不可移植,只能在 Windows 上运行,因为(如果我理解正确的话)MSVC async
使用工作窃取线程池,如果线程可用,它将运行延迟函数。 这不是标准所要求的。如果你想强制每个工作线程都有一个新线程,请使用 std::launch::async
策略。
What has printf to do with any of this? (I tried deleting all printf calls in the case of a race condition and I put a breakpoint in the code but no help. It is the same case with std::cout too)
它会造成轻微的延迟,并且可能会在线程之间造成某种形式的不可靠排序,因为它们现在正在争夺,并且可能是在争夺单个全局资源。 printf
施加的延迟可能足以让一个线程完成,这会将其资源释放到线程池并允许另一个异步工作程序运行。
关于c++ - 在 Visual Studio 2012 中使用嵌套的 std::async 和 std::condition_variable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13797368/