问题:
下面的3个while循环包含已被注释掉的代码。我搜索(“TAG1”,“TAG2”和“TAG3”)以便于识别。我只是希望while循环在经过测试的条件变为真之前等待,然后再继续进行,同时尽可能地减少CPU资源。我首先尝试使用Boost条件变量,但是有一个竞争条件。将线程置于“x”微秒的 sleep 状态效率低下,因为无法精确计时唤醒时间。最后,boost::this_thread::yield()似乎没有任何作用。可能是因为我在双核系统上只有2个 Activity 线程。具体来说,如何使下面的三个标记区域更有效地运行,同时尽可能减少不必要的阻塞。
背景
客观的:
我有一个记录大量数据的应用程序。分析之后,我发现在日志记录操作(将文本或二进制文件记录到本地硬盘上的文件)上消耗了很多时间。我的目标是通过将非线程直接写调用替换为对线程缓冲流记录器的调用来减少logData调用的延迟。
探索的选项:
设计:
结果:
使用当前版本的Boost(1.55)编译
header
#ifndef BufferedLogStream_h
#define BufferedLogStream_h
#include <stdio.h>
#include <iostream>
#include <iostream>
#include <cstdlib>
#include "boost\chrono\chrono.hpp"
#include "boost\thread\thread.hpp"
#include "boost\thread\locks.hpp"
#include "boost\thread\mutex.hpp"
#include "boost\thread\condition_variable.hpp"
#include <time.h>
using namespace std;
#define BENCHMARK_STR_SIZE 128
#define NUM_BENCHMARK_WRITES 524288
#define TEST 0
#define BENCHMARK 1
#define WORKER_LOOP_WAIT_MICROSEC 20
#define MAIN_LOOP_WAIT_MICROSEC 10
#if(TEST)
#define BUFFER_SIZE 10
#else
#define BUFFER_SIZE 33554432 //4 MB
#endif
class BufferedLogStream {
public:
BufferedLogStream();
void openFile(char* filename);
void flush();
void close();
inline void writeMessage(const char* message, unsigned int length);
void writeMessage(string message);
bool operator() () { return start != end; }
private:
void threadedWriter();
inline bool hasSomethingToWrite();
inline unsigned int getFreeSpaceInBuffer();
void appendStringToBuffer(const char* message, unsigned int length);
FILE* fp;
char* start;
char* end;
char* endofringbuffer;
char ringbuffer[BUFFER_SIZE];
bool workerthreadkeepalive;
boost::mutex mtx;
boost::condition_variable waitforempty;
boost::mutex workmtx;
boost::condition_variable waitforwork;
#if(TEST)
struct testbuffer {
int length;
char message[BUFFER_SIZE * 2];
};
public:
void test();
private:
void getNextRandomTest(testbuffer &tb);
FILE* datatowrite;
#endif
#if(BENCHMARK)
public:
void runBenchmark();
private:
void initBenchmarkString();
void runDirectWriteBaseline();
void runBufferedWriteBenchmark();
char benchmarkstr[BENCHMARK_STR_SIZE];
#endif
};
#if(TEST)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->openFile("replicated.txt");
bl->test();
bl->close();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif
#if(BENCHMARK)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->runBenchmark();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif //for benchmark
#endif
实现
#include "BufferedLogStream.h"
BufferedLogStream::BufferedLogStream() {
fp = NULL;
start = ringbuffer;
end = ringbuffer;
endofringbuffer = ringbuffer + BUFFER_SIZE;
workerthreadkeepalive = true;
}
void BufferedLogStream::openFile(char* filename) {
if(fp) close();
workerthreadkeepalive = true;
boost::thread t2(&BufferedLogStream::threadedWriter, this);
fp = fopen(filename, "w+b");
}
void BufferedLogStream::flush() {
fflush(fp);
}
void BufferedLogStream::close() {
workerthreadkeepalive = false;
if(!fp) return;
while(hasSomethingToWrite()) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
flush();
fclose(fp);
fp = NULL;
}
void BufferedLogStream::threadedWriter() {
while(true) {
if(start != end) {
char* currentend = end;
if(start < currentend) {
fwrite(start, 1, currentend - start, fp);
}
else if(start > currentend) {
if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp);
fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
}
start = currentend;
waitforempty.notify_one();
}
else { //start == end...no work to do
if(!workerthreadkeepalive) return;
boost::unique_lock<boost::mutex> u(workmtx);
waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
}
}
}
bool BufferedLogStream::hasSomethingToWrite() {
return start != end;
}
void BufferedLogStream::writeMessage(string message) {
writeMessage(message.c_str(), message.length());
}
unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
if(end == start) return BUFFER_SIZE-1;
return start - end - 1; //case where start > end
}
void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
memcpy(end, message, length);
end += length;
}
else {
int lengthtoendofbuffer = endofringbuffer - end;
if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
int remainderlength = length - lengthtoendofbuffer;
memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
end = ringbuffer + remainderlength;
}
}
void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
while(hasSomethingToWrite()); {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
fwrite(message, 1, length, fp);
}
else {
//wait until there is enough free space to insert new string
while(getFreeSpaceInBuffer() < length) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
appendStringToBuffer(message, length);
}
waitforwork.notify_one();
}
#if(TEST)
void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
for(int i = 0; i < tb.length; i++) {
tb.message[i] = rand() % 26 + 65;
}
tb.message[tb.length] = '\n';
tb.length++;
tb.message[tb.length] = '\0';
}
void BufferedLogStream::test() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
testbuffer tb;
datatowrite = fopen("orig.txt", "w+b");
for(unsigned int i = 0; i < 7000000; i++) {
if(i % 1000000 == 0) cout << i << endl;
getNextRandomTest(tb);
writeMessage(tb.message, tb.length);
fwrite(tb.message, 1, tb.length, datatowrite);
}
fflush(datatowrite);
fclose(datatowrite);
}
#endif
#if(BENCHMARK)
void BufferedLogStream::initBenchmarkString() {
for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
benchmarkstr[i] = rand() % 26 + 65;
}
benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
}
void BufferedLogStream::runDirectWriteBaseline() {
clock_t starttime = clock();
fp = fopen("BenchMarkBaseline.txt", "w+b");
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
}
fflush(fp);
fclose(fp);
clock_t elapsedtime = clock() - starttime;
cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBufferedWriteBenchmark() {
clock_t starttime = clock();
openFile("BufferedBenchmark.txt");
cout << "Opend file" << endl;
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
}
cout << "Wrote" << endl;
close();
cout << "Close" << endl;
clock_t elapsedtime = clock() - starttime;
cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBenchmark() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
initBenchmarkString();
runDirectWriteBaseline();
runBufferedWriteBenchmark();
}
#endif
更新:2013年11月25日
我更新了下面的代码,使用boost::condition_variables,特别是Evgeny Panasyuk推荐的wait_for()方法。这样避免了不必要地一遍又一遍地检查相同的条件。我目前看到的是缓冲版本的运行时间大约是非缓冲/直接写入版本的1/6。这不是理想的情况,因为这两种情况都受硬盘(在我的情况下是2010年时代的SSD)的限制。我计划在硬盘不会成为瓶颈的环境中使用以下代码,并且大多数情况下(即使不是所有时间)缓冲区都应具有可用空间来容纳writeMessage请求。这使我想到了下一个问题。我应该把缓冲区多大?我不介意分配32 MB或64 MB以确保它永远不会满。该代码将在可以节省大量代码的系统上运行。凭直觉,我觉得静态分配一个32 MB的字符数组是个坏主意。是吗?无论如何,我希望当我为预期的应用程序运行以下代码时,logData()调用的等待时间将大大减少,这将显着减少总体处理时间。
如果有人发现任何使以下代码更好(更快,更健壮,更精简等)的方法,请告诉我。我感谢您的反馈。 Lazin,您的方法将比我在下面发布的方法更快或更有效吗?我有点喜欢只有一个缓冲区并使其足够大以至于几乎永远不会填满的想法。然后,我不必担心从不同的缓冲区读取数据。 Evgeny Panasyuk,我喜欢尽可能使用现有代码的方法,尤其是当它是现有的Boost库时。但是,我也看不到spcs_queue比下面的效率如何。我宁愿处理一个较大的缓冲区,也不愿处理许多较小的缓冲区,并且不得不担心将输入流拆分为输入,然后将其拼接回输出。您的方法将使我可以将格式从主线程转移到工作线程上。那是一个切肉刀的方法。但是我不确定是否会节省大量时间并实现全部利益,我将不得不修改我不拥有的代码。
//结束更新
最佳答案
一般解决方案。
我认为您必须查看Naggle algorithm。对于一个生产者和一个消费者,这看起来像这样:
这种模式将有助于实现低延迟要求,单个消息将立即写入磁盘,但是大批量将写入大量事件以 boost 吞吐量。
如果您的日志消息具有级别-您可以稍微改进此架构。所有错误消息都具有较高的优先级(级别),并且必须立即保存在磁盘上(因为它们很少见,但非常有值(value)),但是调试和跟踪消息的优先级较低,可以缓存以节省带宽(因为它们非常频繁,但是值(value)不高)作为错误和信息消息)。因此,当您编写
error
消息时,必须等到工作线程完成写消息(以及所有位于同一缓冲区中的消息)后再继续操作,但是调试和跟踪消息只能写到缓冲区中。线程。
为每个应用程序线程生成工作线程是很昂贵的。您必须为每个日志文件使用单个编写器线程。写缓冲区必须在线程之间共享。每个缓冲区必须具有两个指针-
commit_pointer
和prepare_pointer
。缓冲区开头和commit_pointer
之间的所有缓冲区空间均可用于工作线程。 commit_pointer
和prepare_pointer
之间的缓冲区空间当前由应用程序线程更新。不变的:commit_pointer
<= prepare_pointer
。写入操作可以分两步执行。
prepare_pointer
; prepare_pointer
值和len由使用者保存; commit_pointer
等于其保存在局部变量中的旧prepare_pointer
值为止。 commit_pointer
= commit_pointer
+ len来进行生产者提交写入操作。 为了防止错误共享,可以将len(message)舍入为缓存行大小,并且所有多余的空间都可以用空格填充。
// pseudocode
void write(const char* message) {
int len = strlen(message); // TODO: round to cache line size
const char* old_prepare_ptr;
// Prepare step
while(1)
{
old_prepare_ptr = prepare_ptr;
if (
CAS(&prepare_ptr,
old_prepare_ptr,
prepare_ptr + len) == old_prepare_ptr
)
break;
// retry if another thread perform prepare op.
}
// Write message
memcpy((void*)old_prepare_ptr, (void*)message, len);
// Commit step
while(1)
{
const char* old_commit_ptr = commit_ptr;
if (
CAS(&commit_ptr,
old_commit_ptr,
old_commit_ptr + len) == old_commit_ptr
)
break;
// retry if another thread commits
}
notify_worker_thread();
}
关于c++ - C++低延迟线程异步缓冲流(打算记录)– Boost,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20186859/