multithreading - 为什么我在多线程中解析Google Protocol Buffer 的消息时很慢?

标签 multithreading protocol-buffers

我尝试从调用 SerializeToString 生成的二进制文件中解析许多 Google Protocol Buffer 消息。我首先通过调用 new 函数将所有字节加载到堆内存中。我还有两个数组来存储堆内存中消息的字节开始地址和消息的字节数。 然后我开始通过调用 ParseFromString 来解析消息。我想通过使用多线程来加快过程。 在每个线程中,我传递地址数组和字节计数数组的开始索引和结束索引。

在父进程中。主要代码为:

struct ParsePara
{
    char* str_buffer;
    size_t* buffer_offset;
    size_t* binary_string_length_array;
    size_t start_idx;
    size_t end_idx;
    Flight_Ticket_Info* ticket_info_buffer_array;
};

//Flight_Ticket_Info is class of message
//offset_size is the count of message
ticket_array = new Flight_Ticket_Info[offset_size];
const int max_thread_count = 6;
pthread_t pthread_id_vec[max_thread_count];

CTimer thread_cost;
thread_cost.start();

vector<ParsePara*> para_vec;
const size_t each_count = ceil(float(offset_size) / max_thread_count);
for (size_t k = 0;k < max_thread_count;k++)
{   
    size_t start_idx = each_count * k;
    size_t end_idx = each_count * (k+1);

    if (start_idx >= offset_size)
        break;

    if (end_idx >= offset_size)
        end_idx = offset_size;

    ParsePara* cand_para_ptr = new ParsePara();

    if (!cand_para_ptr)
    {   
        _ERROR_EXIT(0,"[Malloc memory fail.]");
    }   

    cand_para_ptr->str_buffer = m_valdata;//heap memory for storing Bytes of message
    cand_para_ptr->buffer_offset = offset_array;//begin address of each message
    cand_para_ptr->start_idx = start_idx;
    cand_para_ptr->end_idx = end_idx;
    cand_para_ptr->ticket_info_buffer_array = ticket_array;//array to store message
    cand_para_ptr->binary_string_length_array = binary_length_array;//Bytes count of each message

    para_vec.push_back(cand_para_ptr);
}   

for(size_t k = 0 ;k < para_vec.size();k++)
{
    int ret = pthread_create(&pthread_id_vec[k],NULL,parserFlightTicketForMultiThread,para_vec[k]);

    if (0 != ret)
    {
        _ERROR_EXIT(0,"[Error] [create thread fail]");
    }
}

for (size_t k = 0;k < para_vec.size();k++)
{
    pthread_join(pthread_id_vec[k],NULL);
}

每个线程中的线程函数是:

    void* parserFlightTicketForMultiThread(void* void_para_ptr)
{
    ParsePara* para_ptr = (ParsePara*) void_para_ptr;

    parserFlightTicketForMany(para_ptr->str_buffer,para_ptr->ticket_info_buffer_array,para_ptr->buffer_offset,
            para_ptr->start_idx,para_ptr->end_idx,para_ptr->binary_string_length_array);
}

void parserFlightTicketForMany(const char* str_buffer,Flight_Ticket_Info* ticket_info_buffer_array,
        size_t* buffer_offset,const size_t start_idx,const size_t end_idx,size_t* binary_string_length_array)
{
    printf("start_idx:%d,end_idx:%d\n",start_idx,end_idx);
    for (size_t k = start_idx;k < end_idx;k++)
    {
        if (k % 100000 == 0)
            cout << k << endl;

        size_t cand_offset = buffer_offset[k];
        size_t binary_length = binary_string_length_array[k];
    ticket_info_buffer_array[k].ParseFromString(string(&str_buffer[cand_offset],binary_length-1));
    }
    printf("done %ld %ld\n",start_idx,end_idx);
}

但是多线程的成本比一个线程要多。 一个线程的成本是:40455623ms 我的电脑是8核六线程的成本是:131586865ms

有人可以帮助我吗?谢谢!

最佳答案

一些可能的问题 - 您必须进行实验以确定哪些问题:

  • Protobuf 解析速度通常受到内存带宽而不是 CPU 时间的限制,尤其是在输入数据集较大的情况下。在这种情况下,更多线程将无济于事,因为所有核心都共享主内存的带宽。事实上,多个核心争夺内存带宽可能会使整体运行速度变慢。请注意,最大的内存消耗不是输入字节,而是解析数据对象 - 即解析的输出 - 它比输入字节大很多倍编码数据。要改善此问题,请考虑编写解析循环,以便它在解析后立即完全处理每条消息,然后再继续处理文本消息。这样,您就不需要分配 k 个 protobuf 对象,而只需为每个线程分配一个 protobuf 对象,并重复使用同一个对象进行解析。这样,对象将(可能)保留在核心的私有(private) L1 缓存中,并避免消耗内存带宽;仅输入字节将通过主总线读取。
  • 如何将数据加载到 RAM 中?你是read()进入一个大数组还是mmap()?在后一种情况下,数据是从磁盘延迟读取的——直到您真正尝试解析它时才会发生。即使在 read() 情况下,数据也可能已被换出,从而产生类似的效果。无论哪种方式,您的线程现在不仅要争夺内存带宽,还要争夺磁盘带宽,这当然要慢得多。让六个线程读取大文件的不同部分总体上肯定会比让一个线程读取整个文件慢,因为操作系统针对顺序访问进行了优化。
  • Protobuf 在解析期间分配内存。许多内存分配器在分配新内存时会锁定。由于所有线程都在紧密循环中分配大量对象,因此它们将争夺此锁。确保您使用的是线程友好的内存分配器,例如 Google 的 tcmalloc。请注意,在解析-消费循环中重复重用相同的 protobuf 对象而不是分配大量不同的对象也会有很大帮助,因为 protobuf 对象会自动为子对象重用内存。
  • 您的代码中可能存在错误,并且在多线程时它可能根本不符合您的预期。例如,错误可能导致所有线程处理相同的数据,而不是不同的数据,并且它们选择的数据可能恰好更大。确保您测试的代码结果在运行单线程与多线程时完全相同。

简而言之,如果您希望多个核心使您的代码更快,您不仅要考虑每个核心在做什么,还要考虑每个核心进出哪些数据,以及核心需要进行多少通信对彼此。理想情况下,您希望每个核心都独立运行,而不与任何人或任何事物交谈;然后你就得到了最大的并行度。当然,这通常是不可能的,但越接近越好。

顺便说一句,为您随机优化:

ParseFromString(string(&str_buffer[cand_offset],binary_length-1))

将其替换为:

ParseFromArray(&str_buffer[cand_offset],binary_length-1)

std::string 处创建会生成数据副本,这会浪费时间(和内存带宽)。 (但这并不能解释为什么线程很慢。)

关于multithreading - 为什么我在多线程中解析Google Protocol Buffer 的消息时很慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31787934/

相关文章:

c - 多线程应用程序中的错误处理

java - 如何在通过ExecutorService生成的线程名称中添加前缀

multithreading - 测试高并发工作线程系统的正确方法是什么?

Java : Proto parser library

java - Netty + Protocol Buffers Java <-> C 通信问题

android - 从 Android sqlite 数据库中检索大 blob

c# - 单元测试在调试版本中通过但在发布版本中失败

c - pthread_cancel 返回 EINPROGRESS

grails - 在Grails中用新版本替换全局库

java - 高效地将 Java 列表转换为 Matlab 矩阵