c++ - 使用线程池并行化函数会使其变慢: why?

标签 c++ threadpool

我正在运行一个数据库,而不是在 RocksDB 上运行。我有一个 find 函数,它在参数中接受查询,迭代数据库中的所有文档,并返回与查询匹配的文档。我想并行化这个函数,以便将工作分散到多个线程上。

为了实现这一点,我尝试使用 ThreadPool :我将循环代码移至 lambda 中,并为每个文档向线程池添加了一个任务。循环结束后,每个结果都由主线程处理。

当前版本(单线程):

void
EmbeDB::find(const bson_t& query,
             DocumentPtrCallback callback,
             int32_t limit,
             const bson_t* projection)
{
    int32_t count = 0;
    bson_error_t error;
    uint32_t num_query_keys = bson_count_keys(&query);
    mongoc_matcher_t* matcher = num_query_keys != 0
        ? mongoc_matcher_new(&query, &error)
        : nullptr;

    if (num_query_keys != 0 && matcher == nullptr)
    {
        callback(&error, nullptr);
        return;
    }

    bson_t document;
    rocksdb::Iterator* it = _db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next())
    {
        const char* bson_data = (const char*)it->value().data();
        int bson_length = it->value().size();
        std::vector<char> decrypted_data;
        if (encryptionEnabled())
        {
            decrypted_data.resize(bson_length);
            bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
            bson_data = decrypted_data.data();
        }
        bson_init_static(&document, (const uint8_t*)bson_data, bson_length);

        if (num_query_keys == 0 || mongoc_matcher_match(matcher, &document))
        {
            ++count;

            if (projection != nullptr)
            {
                bson_error_t error;
                bson_t projected;
                bson_init(&projected);

                mongoc_matcher_projection_execute_noop(
                    &document,
                    projection,
                    &projected,
                    &error,
                    NULL
                );

                callback(nullptr, &projected);
            }
            else
            {
                callback(nullptr, &document);
            }

            if (limit >= 0 && count >= limit)
            {
                break;
            }
        }
    }
    delete it;

    if (matcher)
    {
        mongoc_matcher_destroy(matcher);
    }
}

新版本(多线程):

void
EmbeDB::find(const bson_t& query,
             DocumentPtrCallback callback,
             int32_t limit,
             const bson_t* projection)
{
    int32_t count = 0;
    bool limit_reached = limit == 0;
    bson_error_t error;
    uint32_t num_query_keys = bson_count_keys(&query);
    mongoc_matcher_t* matcher = num_query_keys != 0
        ? mongoc_matcher_new(&query, &error)
        : nullptr;

    if (num_query_keys != 0 && matcher == nullptr)
    {
        callback(&error, nullptr);
        return;
    }

    auto process_document = [this, projection, num_query_keys, matcher](const char* bson_data, int bson_length) -> bson_t*
    {
        std::vector<char> decrypted_data;
        if (encryptionEnabled())
        {
            decrypted_data.resize(bson_length);
            bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
            bson_data = decrypted_data.data();
        }

        bson_t* document = new bson_t();

        bson_init_static(document, (const uint8_t*)bson_data, bson_length);

        if (num_query_keys == 0 || mongoc_matcher_match(matcher, document))
        {
            if (projection != nullptr)
            {
                bson_error_t error;
                bson_t* projected = new bson_t();
                bson_init(projected);

                mongoc_matcher_projection_execute_noop(
                    document,
                    projection,
                    projected,
                    &error,
                    NULL
                );

                delete document;

                return projected;
            }
            else
            {
                return document;
            }
        }
        else
        {
            delete document;

            return nullptr;
        }

    };

    const int WORKER_COUNT = std::max(1u, std::thread::hardware_concurrency());

    ThreadPool pool(WORKER_COUNT);
    std::vector<std::future<bson_t*>> futures;

    bson_t document;
    rocksdb::Iterator* db_it = _db->NewIterator(rocksdb::ReadOptions());
    for (db_it->SeekToFirst(); db_it->Valid(); db_it->Next())
    {
        const char* bson_data = (const char*)db_it->value().data();
        int bson_length = db_it->value().size();

        futures.push_back(pool.enqueue(process_document, bson_data, bson_length));
    }
    delete db_it;

    for (auto it = futures.begin(); it != futures.end(); ++it)
    {
        bson_t* result = it->get();

        if (result)
        {
            count += 1;

            if (limit < 0 || count < limit)
            {
                callback(nullptr, result);
            }

            delete result;
        }
    }

    if (matcher)
    {
        mongoc_matcher_destroy(matcher);
    }
}

  • 通过简单的文档和查询,单线程版本在我的机器上0.5秒处理100万个文档。
  • 使用相同的文档和查询,多线程版本可以在 3.3 秒内处理 100 万个文档。

令人惊讶的是,多线程版本要慢得多。此外,我测量了执行时间,75% 的时间花在 for 循环上。所以基本上 futures.push_back(pool.enqueue(process_document, bson_data, bson_length)); 行占用了 75% 的时间。

我做了以下事情:

  • 我检查了 WORKER_COUNT 的值,在我的机器上为 6。
  • 我尝试添加 futures.reserve(1000000),认为 vector 重新分配可能有问题,但它没有改变任何内容。
  • 我尝试删除动态内存分配 (bson_t* document = new bson_t();),但它并没有显着改变结果。

所以我的问题是:我是否做错了什么,导致多线程版本比单线程版本慢?

我目前的理解是,线程池的同步操作(当任务入队和出队时)只是消耗了大部分时间,解决方案是更改数据结构。想法?

最佳答案

并行化有开销。

在单线程版本中处理每个文档大约需要 500 纳秒。将工作委托(delegate)给线程池需要进行大量的簿记工作(既要委托(delegate)工作,又要在之后进行同步),并且所有这些簿记工作很可能需要每个作业超过 500 纳秒。

假设您的代码正确,则每个作业的簿记大约需要 2800 纳秒。为了从并行化中获得显着的加速,您需要将工作分成更大的 block 。

我建议尝试一次批量处理 1000 个文档。每个 future 不再只对应 1 个文档,而是对应 1000 个文档。

其他优化

如果可能,请避免不必要的复制。如果某些内容被大量复制,请查看是否可以通过引用而不是通过值来捕获它。

关于c++ - 使用线程池并行化函数会使其变慢: why?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55807718/

相关文章:

c++ - 为什么要使用两个运算符来删除动态分配的内存?

c++ - 使用 RotatedRect 跟踪手部旋转

java - 有没有办法取消并重用ExecutorService?

c++ - 我应该提到纯虚函数中的调用约定吗?

c++ - 您可以将 Ada 泛型函数导出到 C++ 吗?

c++ - 2d map 上 2 点之间的航点生成器

c# - 为什么对级联 ManualResetEvent 的多次等待会使执行时间增加三倍?

java - 两个ExecutorServices可以共享一个线程池吗?

java - 返回第一个结束任务的Java中的任务执行器

node.js - Node 实际创建了多少个线程?