c++ - C++低延迟线程异步缓冲流(打算记录)– Boost

标签 c++ multithreading boost asynchronous low-latency

问题:

下面的3个while循环包含已被注释掉的代码。我搜索(“TAG1”,“TAG2”和“TAG3”)以便于识别。我只是希望while循环在经过测试的条件变为真之前等待,然后再继续进行,同时尽可能地减少CPU资源。我首先尝试使用Boost条件变量,但是有一个竞争条件。将线程置于“x”微秒的 sleep 状态效率低下,因为无法精确计时唤醒时间。最后,boost::this_thread::yield()似乎没有任何作用。可能是因为我在双核系统上只有2个 Activity 线程。具体来说,如何使下面的三个标记区域更有效地运行,同时尽可能减少不必要的阻塞。

背景

客观的:

我有一个记录大量数据的应用程序。分析之后,我发现在日志记录操作(将文本或二进制文件记录到本地硬盘上的文件)上消耗了很多时间。我的目标是通过将非线程直接写调用替换为对线程缓冲流记录器的调用来减少logData调用的延迟。

探索的选项:

  • 可以将2005年以前的慢速硬盘升级到SSD。成本并不高昂...但是涉及很多工作...将需要升级200多台计算机...
  • Boost ASIO ...我不需要所有的proactor/网络开销,而是希望找到更简单,更轻便的东西。

  • 设计:
  • 生产者和使用者线程模式,应用程序将数据写入缓冲区,然后后台线程随后将其写入磁盘。因此,最终目标是让应用程序层调用的writeMessage函数尽可能快地返回,而稍后某个时间将数据正确/完全以FIFO顺序正确/完整地记录到日志文件中。
  • 只有一个应用程序线程,只有一个写程序线程。
  • 基于环形缓冲区。做出此决定的原因是,尽可能少地使用锁,理想情况下...如果我错了,请更正我...我认为我不需要任何锁。
  • Buffer是一个静态分配的字符数组,但出于性能原因,如果需要/可以将其移动到堆中。
  • 缓冲区具有一个开始指针,该指针指向应写入文件的下一个字符。缓冲区的末尾指针指向要写入文件的最后一个字符之后的数组索引。结束指针从不通过开始指针。如果传入的消息大于缓冲区,则编写器将等待直到清空缓冲区,然后将新消息直接写到文件中,而不会在缓冲区中放入超大消息(一旦清空缓冲区,工作线程不会写任何东西,所以没有争用)。
  • 编写器(工作线程)仅更新环形缓冲区的开始指针。
  • 主线程(应用程序线程)仅更新环形缓冲区的结束指针,并且再次,它仅在有可用空间时才将新数据插入缓冲区...否则,它要么等待缓冲区中的空间可用,要么直接写为如上所述。
  • 工作线程不断检查是否有要写入的数据(由缓冲区开始指针!=缓冲区结束指针表示)。如果没有要写入的数据,则理想情况下,一旦应用程序线程向缓冲区中插入了某些内容(并更改了缓冲区的结束指针,使其不再指向与开始相同的索引),工作线程便应进入休眠状态并唤醒指针)。我下面的内容涉及while循环不断检查该条件。这是等待缓冲区的一种非常糟糕/低效的方法。

  • 结果:
  • 在我的2009年代带SSD的双核笔记本电脑上,我看到线程/缓冲基准测试与直接写入的总写入时间约为1:6(0.609秒与0.095秒),但是变化很大。通常,缓冲写入基准实际上比直接写入慢。我认为,这种可变性是由于执行不良的情况导致的:WAITING空间释放缓冲区中的空间,等待缓冲区为空以及使工作线程等待工作可用。我已经测量了一些while循环消耗了10000个以上的周期,并且我怀疑那些周期实际上是在争夺其他线程(工作人员或应用程序)完成正在等待的计算所需的硬件资源。
  • 输出似乎已 checkout 。启用TEST模式并在压力测试中使用10小缓冲区作为压力测试,我扩散了数百MB的输出,发现它等于输入。

  • 使用当前版本的Boost(1.55)编译

    header
        #ifndef BufferedLogStream_h
        #define BufferedLogStream_h
    
        #include <stdio.h>
        #include <iostream>
        #include <iostream>
        #include <cstdlib>
        #include "boost\chrono\chrono.hpp"
        #include "boost\thread\thread.hpp"
        #include "boost\thread\locks.hpp"
        #include "boost\thread\mutex.hpp"
        #include "boost\thread\condition_variable.hpp"
        #include <time.h>
    
        using namespace std;
    
        #define BENCHMARK_STR_SIZE 128
        #define NUM_BENCHMARK_WRITES 524288
        #define TEST 0
        #define BENCHMARK 1
        #define WORKER_LOOP_WAIT_MICROSEC 20
        #define MAIN_LOOP_WAIT_MICROSEC 10
    
        #if(TEST)
        #define BUFFER_SIZE 10 
        #else 
        #define BUFFER_SIZE 33554432 //4 MB
        #endif
    
        class BufferedLogStream {
            public:
                BufferedLogStream();
                void openFile(char* filename);
                void flush();
                void close();
                inline void writeMessage(const char* message, unsigned int length);
                void writeMessage(string message);
                bool operator() () { return start != end; }
    
            private:
                void threadedWriter();
                inline bool hasSomethingToWrite();
                inline unsigned int getFreeSpaceInBuffer();
                void appendStringToBuffer(const char* message, unsigned int length);
    
                FILE* fp;
                char* start;
                char* end;
                char* endofringbuffer;
                char ringbuffer[BUFFER_SIZE];
                bool workerthreadkeepalive;
                boost::mutex mtx;
                boost::condition_variable waitforempty;
                boost::mutex workmtx;
                boost::condition_variable waitforwork;
    
                #if(TEST)
                struct testbuffer {
                    int length;
                    char message[BUFFER_SIZE * 2];
                };
    
                public:
                    void test();
    
                private:
                    void getNextRandomTest(testbuffer &tb);
                    FILE* datatowrite;
                #endif
    
            #if(BENCHMARK)
                public:
                    void runBenchmark();
    
                private:
                    void initBenchmarkString();
                    void runDirectWriteBaseline();
                    void runBufferedWriteBenchmark();
    
                    char benchmarkstr[BENCHMARK_STR_SIZE];
            #endif
        };
    
        #if(TEST)
        int main() {
            BufferedLogStream* bl = new BufferedLogStream();
            bl->openFile("replicated.txt");
            bl->test();
            bl->close();
            cout << "Done" << endl;
            cin.get();
            return 0;
        }
        #endif
    
        #if(BENCHMARK)
        int main() {
            BufferedLogStream* bl = new BufferedLogStream();
            bl->runBenchmark();
            cout << "Done" << endl;
            cin.get();
            return 0;
        }
        #endif //for benchmark
    
        #endif
    

    实现
        #include "BufferedLogStream.h"
    
        BufferedLogStream::BufferedLogStream() {
            fp = NULL;
            start = ringbuffer;
            end = ringbuffer;
            endofringbuffer = ringbuffer + BUFFER_SIZE;
            workerthreadkeepalive = true;
        }
    
        void BufferedLogStream::openFile(char* filename) {
            if(fp) close();
            workerthreadkeepalive = true;
            boost::thread t2(&BufferedLogStream::threadedWriter, this);
            fp = fopen(filename, "w+b");
        }
    
        void BufferedLogStream::flush() {
            fflush(fp); 
        }
    
        void BufferedLogStream::close() {
            workerthreadkeepalive = false;
            if(!fp) return;
            while(hasSomethingToWrite()) {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            flush();        
            fclose(fp);             
            fp = NULL;          
        }
    
        void BufferedLogStream::threadedWriter() {
            while(true) {
                if(start != end) {
                    char* currentend = end;
                    if(start < currentend) {
                        fwrite(start, 1, currentend - start, fp);
                    }
                    else if(start > currentend) {
                        if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp); 
                        fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
                    }
                    start = currentend;
                    waitforempty.notify_one();
                }
                else { //start == end...no work to do
                    if(!workerthreadkeepalive) return;
                    boost::unique_lock<boost::mutex> u(workmtx);
                    waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
                }
            }
        }
    
        bool BufferedLogStream::hasSomethingToWrite() {
            return start != end;
        }
    
        void BufferedLogStream::writeMessage(string message) {
            writeMessage(message.c_str(), message.length());
        }
    
        unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
            if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
            if(end == start) return BUFFER_SIZE-1;
            return start - end - 1; //case where start > end
        }
    
        void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
            if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
                memcpy(end, message, length);
                end += length;
            }
            else {
                int lengthtoendofbuffer = endofringbuffer - end;
                if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
                int remainderlength =  length - lengthtoendofbuffer;
                memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
                end = ringbuffer + remainderlength;
            }
        }
    
        void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
            if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
                while(hasSomethingToWrite()); {
                    boost::unique_lock<boost::mutex> u(mtx);
                    waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
                }
                fwrite(message, 1, length, fp);
            }
            else {
                //wait until there is enough free space to insert new string
                while(getFreeSpaceInBuffer() < length) {
                    boost::unique_lock<boost::mutex> u(mtx);
                    waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
                }
                appendStringToBuffer(message, length);
            }
            waitforwork.notify_one();
        }
    
        #if(TEST)
            void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
                tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
                for(int i = 0; i < tb.length; i++) {
                    tb.message[i] = rand() % 26 + 65;
                }
                tb.message[tb.length] = '\n';
                tb.length++;
                tb.message[tb.length] = '\0';
            }
    
            void BufferedLogStream::test() {
                cout << "Buffer size is: " << BUFFER_SIZE << endl;
                testbuffer tb;
                datatowrite = fopen("orig.txt", "w+b");
                for(unsigned int i = 0; i < 7000000; i++) {
                    if(i % 1000000 == 0) cout << i << endl;
                    getNextRandomTest(tb);
                    writeMessage(tb.message, tb.length);
                    fwrite(tb.message, 1, tb.length, datatowrite);
                }       
                fflush(datatowrite);
                fclose(datatowrite);
            }
        #endif
    
        #if(BENCHMARK) 
            void BufferedLogStream::initBenchmarkString() {
                for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
                    benchmarkstr[i] = rand() % 26 + 65;
                }
                benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
            }
    
            void BufferedLogStream::runDirectWriteBaseline() {
                clock_t starttime = clock();
                fp = fopen("BenchMarkBaseline.txt", "w+b");
                for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                    fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
                }   
                fflush(fp);
                fclose(fp);
                clock_t elapsedtime = clock() - starttime;
                cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
            }
    
            void BufferedLogStream::runBufferedWriteBenchmark() {
                clock_t starttime = clock();
                openFile("BufferedBenchmark.txt");
                cout << "Opend file" << endl;
                for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                    writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
                }   
                cout << "Wrote" << endl;
                close();
                cout << "Close" << endl;
                clock_t elapsedtime = clock() - starttime;
                cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
            }
    
            void BufferedLogStream::runBenchmark() {
                cout << "Buffer size is: " << BUFFER_SIZE << endl;
                initBenchmarkString();
                runDirectWriteBaseline();
                runBufferedWriteBenchmark();
            }
        #endif
    

    更新:2013年11月25日

    我更新了下面的代码,使用boost::condition_variables,特别是Evgeny Panasyuk推荐的wait_for()方法。这样避免了不必要地一遍又一遍地检查相同的条件。我目前看到的是缓冲版本的运行时间大约是非缓冲/直接写入版本的1/6。这不是理想的情况,因为这两种情况都受硬盘(在我的情况下是2010年时代的SSD)的限制。我计划在硬盘不会成为瓶颈的环境中使用以下代码,并且大多数情况下(即使不是所有时间)缓冲区都应具有可用空间来容纳writeMessage请求。这使我想到了下一个问题。我应该把缓冲区多大?我不介意分配32 MB或64 MB以确保它永远不会满。该代码将在可以节省大量代码的系统上运行。凭直觉,我觉得静态分配一个32 MB的字符数组是个坏主意。是吗?无论如何,我希望当我为预期的应用程序运行以下代码时,logData()调用的等待时间将大大减少,这将显着减少总体处理时间。

    如果有人发现任何使以下代码更好(更快,更健壮,更精简等)的方法,请告诉我。我感谢您的反馈。 Lazin,您的方法将比我在下面发布的方法更快或更有效吗?我有点喜欢只有一个缓冲区并使其足够大以至于几乎永远不会填满的想法。然后,我不必担心从不同的缓冲区读取数据。 Evgeny Panasyuk,我喜欢尽可能使用现有代码的方法,尤其是当它是现有的Boost库时。但是,我也看不到spcs_queue比下面的效率如何。我宁愿处理一个较大的缓冲区,也不愿处理许多较小的缓冲区,并且不得不担心将输入流拆分为输入,然后将其拼接回输出。您的方法将使我可以将格式从主线程转移到工作线程上。那是一个切肉刀的方法。但是我不确定是否会节省大量时间并实现全部利益,我将不得不修改我不拥有的代码。

    //结束更新

    最佳答案

    一般解决方案。

    我认为您必须查看Naggle algorithm。对于一个生产者和一个消费者,这看起来像这样:

  • 在开始缓冲区为空时,工作线程处于空闲状态并等待事件。
  • 生产者将数据写入缓冲区并通知工作线程。
  • 工作线程醒来并开始写操作。
  • 生产者尝试编写另一条消息,但是工作进程使用了​​缓冲区,因此生产者分配了另一个缓冲区并向其中写入消息。
  • 生产者尝试写入另一条消息,但I/O仍在进行中,因此生产者将消息写入先前分配的缓冲区。
  • 工作线程已完成将缓冲区写入文件的操作,并发现还有一个包含数据的缓冲区,因此它抓取并开始写入。
  • 生产者使用第一个缓冲区来写入所有连续的消息,直到进行第​​二个写入操作为止。

  • 这种模式将有助于实现低延迟要求,单个消息将立即写入磁盘,但是大批量将写入大量事件以 boost 吞吐量。

    如果您的日志消息具有级别-您可以稍微改进此架构。所有错误消息都具有较高的优先级(级别),并且必须立即保存在磁盘上(因为它们很少见,但非常有值(value)),但是调试和跟踪消息的优先级较低,可以缓存以节省带宽(因为它们非常频繁,但是值(value)不高)作为错误和信息消息)。因此,当您编写error消息时,必须等到工作线程完成写消息(以及所有位于同一缓冲区中的消息)后再继续操作,但是调试和跟踪消息只能写到缓冲区中。

    线程。

    为每个应用程序线程生成工作线程是很昂贵的。您必须为每个日志文件使用单个编写器线程。写缓冲区必须在线程之间共享。每个缓冲区必须具有两个指针-commit_pointerprepare_pointer。缓冲区开头和commit_pointer之间的所有缓冲区空间均可用于工作线程。 commit_pointerprepare_pointer之间的缓冲区空间当前由应用程序线程更新。不变的:commit_pointer <= prepare_pointer

    写入操作可以分两步执行。
  • 准备写。此操作在缓冲区中保留空间。
  • 生产者计算len(message)并原子更新prepare_pointer
  • 旧的prepare_pointer值和len由使用者保存;
  • 提交写入。
  • 生产者在保留的缓冲区空间(旧的prepare_pointer值)的开头写入消息。
  • 生产者忙于等待,直到commit_pointer等于其保存在局部变量中的旧prepare_pointer值为止。
  • 通过自动执行commit_pointer = commit_pointer + len来进行生产者提交写入操作。

  • 为了防止错误共享,可以将len(message)舍入为缓存行大小,并且所有多余的空间都可以用空格填充。
    // pseudocode
    void write(const char* message) {
        int len = strlen(message);  // TODO: round to cache line size
        const char* old_prepare_ptr;
        // Prepare step
        while(1) 
        {
            old_prepare_ptr = prepare_ptr;
            if (
                CAS(&prepare_ptr, 
                     old_prepare_ptr, 
                     prepare_ptr + len) == old_prepare_ptr
                )
                break;
            // retry if another thread perform prepare op.
        }
        // Write message
        memcpy((void*)old_prepare_ptr, (void*)message, len);
        // Commit step
        while(1)
        {
            const char* old_commit_ptr = commit_ptr;
            if (
                 CAS(&commit_ptr, 
                      old_commit_ptr, 
                      old_commit_ptr + len) == old_commit_ptr
                )
                break;
            // retry if another thread commits
        }
        notify_worker_thread();
    }
    

    关于c++ - C++低延迟线程异步缓冲流(打算记录)– Boost,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20186859/

    相关文章:

    c++ - 处理二叉搜索树中重复键的性能规范

    c++ - 根据点之间的距离排序?

    java - 资源为空时锁定线程

    c++ - boost day_clock::local_day() 难题

    同步 LinkedList 中的 java.util.NoSuchElementException

    android - 如何解决NDK库调用卡住UI线程的问题

    c++ - boost ASIO async_write "Vector iterator not dereferencable"

    c++ - 列表框上下文菜单

    c++ - 通过推理返回值的通用长度元组

    c++ - std::condition_variable 和 std::condition_variable_any 有什么区别?