c++ - 从线程池工作线程使用时 GetQueuedCompletionStatus 的奇怪行为

标签 c++ windows winapi threadpool

我一直在测试将 IO 完成端口与线程池中的工作线程相结合,并偶然发现了一个我无法解释的行为。特别是,虽然下面的代码:

  int data;
  for (int i = 0; i < NUM; ++i)
      PostQueuedCompletionStatus(cp, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data));

  {
      std::thread t([&] ()
      {
            LPOVERLAPPED aux;
            DWORD        cmd;
            ULONG_PTR    key;

            for (int i = 0; i < NUM; ++i)
            {
              if (!GetQueuedCompletionStatus(cp, &cmd, &key, &aux, 0))
                break;
              ++count;
            }
      });

      t.join();
   }

工作得很好并接收 NUM 状态通知(NUM 是一个大数字,100000 或更多),使用线程池工作对象的类似代码读取每个工作项的一个状态通知并在阅读后重新发布工作项,阅读数百个状态通知后失败。具有以下全局变量(请不要介意名称):

HANDLE cport;
PTP_POOL pool;
TP_CALLBACK_ENVIRON env;
PTP_WORK work;
std::size_t num_calls;
std::mutex mutex;
std::condition_variable cv; 
bool job_done;

和回调函数:

static VOID CALLBACK callback(PTP_CALLBACK_INSTANCE instance_, PVOID pv_, PTP_WORK work_)
{
  LPOVERLAPPED aux;
  DWORD        cmd;
  ULONG_PTR    key;

  if (GetQueuedCompletionStatus(cport, &cmd, &key, &aux, 0))
  {
    ++num_calls;
    SubmitThreadpoolWork(work);
  }
  else
  {
    std::unique_lock<std::mutex> l(mutex);
    std::cout << "No work after " << num_calls << " calls.\n";
    job_done = true;
    cv.notify_one();
  }
}

以下代码:

{
  job_done = false;
  std::unique_lock<std::mutex> l(mutex);

  num_calls = 0;
  cport = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);

  pool = CreateThreadpool(nullptr);
  InitializeThreadpoolEnvironment(&env);
  SetThreadpoolCallbackPool(&env, pool);

  work = CreateThreadpoolWork(callback, nullptr, &env);

  for (int i = 0; i < NUM; ++i)
      PostQueuedCompletionStatus(cport, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data));

  SubmitThreadpoolWork(work);
  cv.wait_for(l, std::chrono::milliseconds(10000), [] { return job_done; } );
}

会在 250 次左右调用 GetQueuedCompletionStatus 后报告“No more work after ...”,尽管 NUM 设置为 1000000。更奇怪的是,将等待时间从 0 设置为 10 毫秒会增加成功调用了数十万次,偶尔会阅读所有 1000000 条通知。我不太明白,因为所有状态通知都是在第一次提交工作对象之前发布的。

有没有可能是完成端口和线程池的组合真的有问题,还是我的代码有问题?请不要深入探讨我为什么要这样做 - 我正在调查各种可能性并偶然发现了这一点。在我看来,它应该可以工作,但不知道出了什么问题。谢谢。

最佳答案

我试过运行这段代码,问题似乎是提供给 CreateIoCompletionPortNumberOfConcurrentThreads 参数。传递 1 意味着执行 callback 的第一个池线程与 io 完成端口相关联,但由于线程池可能会使用不同的线程 GetQueuedCompletionStatus 执行 callback 将发生这种情况时失败。 From documentation :

The most important property of an I/O completion port to consider carefully is the concurrency value. The concurrency value of a completion port is specified when it is created with CreateIoCompletionPort via the NumberOfConcurrentThreads parameter. This value limits the number of runnable threads associated with the completion port. When the total number of runnable threads associated with the completion port reaches the concurrency value, the system blocks the execution of any subsequent threads associated with that completion port until the number of runnable threads drops below the concurrency value.

Although any number of threads can call GetQueuedCompletionStatus for a specified I/O completion port, when a specified thread calls GetQueuedCompletionStatus the first time, it becomes associated with the specified I/O completion port until one of three things occurs: The thread exits, specifies a different I/O completion port, or closes the I/O completion port. In other words, a single thread can be associated with, at most, one I/O completion port.

因此,要将 io 完成与线程池一起使用,您需要将并发线程数设置为线程池的大小(您可以使用 SetThreadpoolThreadMaximum 进行设置)。

::DWORD const threads_count{1};

cport = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, threads_count);
...
pool = ::CreateThreadpool(nullptr);
::SetThreadpoolThreadMaximum(pool, threads_count);

关于c++ - 从线程池工作线程使用时 GetQueuedCompletionStatus 的奇怪行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45563497/

相关文章:

python - 使用 GUI 运行 Python 脚本时如何摆脱 "Command Line"窗口?

windows - 使用此命令时Hadoop错误hadoop fs -mkdir/in

c# - 如何使用 C# 列出可用的视频模式?

c++ - WM_GETFONT 返回类型 LRESULT 而不是预期的 HFONT

c++ - 派生类(构造函数有参数)和基类(构造函数没有参数)之间没有可行的转换

c++ - 访问数组中所有结构的成员

c++ - 正则表达式+编译 boost ?

c++ - 当 C++ 中的数组大小未知时,如何在运行时将对字符串数组的引用作为函数参数传递?

c++ - 如何使用小于 8 GB 的内存编译带有模板的余弦表?

c++ - 线程调用成员函数