c++ - 在 Visual Studio 2012 中使用嵌套的 std::async 和 std::condition_variable

标签 c++ c++11 visual-studio-2012 condition-variable stdasync

我有 Worker 类和一个 Handler 类来为作业创建一个抽象层。我想使用 std::async 来混合一些异步,但我的 Visual Studio 2012(更新 1)出现了一些奇怪的行为。

我的类层次结构如下:

  • Worker 是一个抽象类,InitWork 作为纯虚方法。
  • BasicWorker : Worker 只是使用 printf 进行一些输出。
  • GroupWorker : Worker 是其他 worker 的集合。
  • Handler 捕获 Worker 做一些工作。

然后我调用几个 std::async 方法,我在其中创建工作程序和处理程序,我在嵌套的 std::async 调用中调用处理程序,并且我等待 worker 的初始化(std::condition_variable),然后我停止处理程序。

最后,我等待所有 std::future 完成。

代码如下:

#include <stdio.h>
#include <future>
#include <array>
#include <atomic>
#include <vector>

struct Worker
{
    virtual ~Worker() { }
    virtual void Init() = 0;
    virtual void Work() = 0;
};

struct BasicWorker : public Worker
{
    virtual ~BasicWorker() { }
    virtual void Init()
    {
        printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
    }

    virtual void Work()
    {
        printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
    }
};

struct GroupWorker : public Worker
{
    GroupWorker()
    {
        workers.push_back(std::make_shared<BasicWorker>());
    }

    virtual ~GroupWorker() { }

    virtual void Init()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Init();
        }
        initEvent.notify_all();
    }

    virtual void Work()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Work();
        }
    }

    void WaitForInit()
    {
        //std::unique_lock<std::mutex> initLock(initMutex);
        //initEvent.wait(initLock);
    }
private:
    std::mutex initMutex;
    std::condition_variable initEvent;
    std::vector<std::shared_ptr<Worker>> workers;
};

struct Handler
{
    static const int Stopped = -1;
    static const int Ready = 0;
    static const int Running = 1;

    Handler(const std::shared_ptr<Worker>& worker) :
        worker(worker)
    { }

    void Start(int count)
    {
        int readyValue = Ready;
        if(working.compare_exchange_strong(readyValue, Running))
        {
            worker->Init();

            for(int i = 0; i < count && working == Running; ++i)
            {
                worker->Work();
            }
        }
    }

    void Stop()
    {
        working = Stopped;
    }
private:
    std::atomic<int> working;
    std::shared_ptr<Worker> worker;
};

std::future<void> Start(int jobIndex, int runCount)
{
    //printf("Start: %d\n", jobIndex);
    return std::async(std::launch::async, [=]()
    {
        printf("Async: %d\n", jobIndex);
        auto worker = std::make_shared<GroupWorker>();
        auto handler = std::make_shared<Handler>(worker);

        auto result = std::async(std::launch:async, [=]()
        {
            printf("Nested async: %d\n", jobIndex);
            handler->Start(runCount);
        });

        worker->WaitForInit();
        handler->Stop();

        result.get();
    });
}

int main()
{
    const int JobCount = 300;
    const int RunCount =  5;
    std::array<std::future<void>, JobCount> jobs;

    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i] = Start(i, RunCount);
    }

    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i].get();
    }
}

我的问题是:

  • 如果我取消注释 WaitForInit@GroupWorker 函数中的行,那么我的嵌套异步函数调用不会进行,直到完成所有第一级异步函数调用
  • 在等待 std::condition_variable 时,如果我增加作业数量,创建新线程的速度会呈指数级下降。对于我低于 100 个工作的试验,存在一些异步性,但高于 300 个时,它完全按顺序创建工作。
  • 然后,如果我在 Start 方法中取消对 printf 行的注释,所有嵌套的异步都非常有效

所以,

  • 我在使用 std::condition_variable 时做错了什么?
  • 为什么为大约 100 个线程创建作业会变慢? (这个问题是可选的,似乎是操作系统的问题,可以用智能线程池概念解决)
  • printf 与这些有什么关系? (我尝试在竞争条件的情况下删除所有 printf 调用,我在代码中放置了一个断点但没有帮助。这与 std::cout 的情况相同也是)

编辑: 我添加了启动策略(如 Jonathan Wakely 所建议的那样)以确保线程的创建。但这也无济于事。我目前正在创建一个 std::thread 并调用 thread::join 函数在第一级异步中等待。

最佳答案

注意可以调用 printf,但不要假设 std::thread::id 可以转换为 int。你可以像这样让它更便携:

inline long tol(std::thread::id id)
{
  std::ostringstream ss;
  ss << id;
  return stol(ss.str());
}

(这仍然假设 std::thread::id 的字符串值可以转换为 long,这不是必需的,但比假设更有可能到 int)

的隐式转换

What am I doing wrong in usage of std::condition_variable?

您没有正在等待的“条件”,也没有同步来确保对 notify_all 的调用发生在对 wait 的调用之前。你应该有一个成员变量,上面写着“this worker has been init'd”,它由 Init 设置,并且只在条件变量不正确时等待条件变量(该标志应该是原子的或由一个互斥量,以防止数据竞争)。

Why creating jobs gets slower for like 100s of threads? (this question is optional, seems like a problem of OS and can be fixed with a smart thread-pool concept)

因为有数百个线程,对共享资源有很多争用,操作系统调度程序也有很大压力,所以实现可能决定开始返回延迟函数(即好像 std::async 是用 std::launch::deferred) 而不是异步调用的。您的代码假定 async 不会返回延迟函数,因为如果异步 worker 及其嵌套的异步 worker 都作为延迟函数运行,程序可能会死锁,因为外部函数会阻塞等待嵌套函数调用Init 但嵌套函数永远不会运行,直到 out 函数调用 result.get()。您的程序不可移植,只能在 Windows 上运行,因为(如果我理解正确的话)MSVC async 使用工作窃取线程池,如果线程可用,它将运行延迟函数。 这不是标准所要求的。如果你想强制每个工作线程都有一个新线程,请使用 std::launch::async 策略。

What has printf to do with any of this? (I tried deleting all printf calls in the case of a race condition and I put a breakpoint in the code but no help. It is the same case with std::cout too)

它会造成轻微的延迟,并且可能会在线程之间造成某种形式的不可靠排序,因为它们现在正在争夺,并且可能是在争夺单个全局资源。 printf 施加的延迟可能足以让一个线程完成,这会将其资源释放到线程池并允许另一个异步工作程序运行。

关于c++ - 在 Visual Studio 2012 中使用嵌套的 std::async 和 std::condition_variable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13797368/

相关文章:

c++ - std::unique_ptr<T> 不完整类型错误

c++ - DirectX 11 Effects11.lib Effects11d.lib 链接器错误

visual-studio - 如何从 Visual Studio 2012 中的 CommandBarButton 获取 EnvDTE.Command

asp.net - IIS Express 似乎没有发现对 applicationhost.config 的更改

c++ - 编译器警告 : lambda return type cannot be deduced

c++ - 从巨大的 txt 中读取二维矩阵

C++ new 运算符和错误检查

c++ - 为什么代码中会发生溢出(-2147483648)?

c++ - 实现线程间同步屏障的最佳方法是什么

c++ - Visual Studio 2010 不会自动链接来自依赖项的项目中的静态库,因为它应该是