异步日志实现,异步import
终极管理员 知识笔记 76阅读
AsyncLogging是muduo的日志程序如果直接让文件写日志可能会发生阻塞muduo前端设计了2个BufferPtr分别是currentBuffer_和nextBuffer_还有一个存放BufferPtr的vector(buffers_)。
多个前端线程往currentBuffer_写数据currentBuffer_写满了将其放入buffers_通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
后端也有2个BufferPtr分别为newBuffer1和newBuffer2还有一个BufferVector(buffersToWrite)。后端线程在收到前端通知之后利用buffersToWrite和buffers_进行交换并且用newBuffer1和newBuffer2归还给前端的currentBuffer_和nextBuffer_然后把日志写入文件。

muduo日志文件只提供写入本地文件
后端线程写入条件:

// Use of this source code is governed by a BSD-style license// that can be found in the License file.//// Author: Shuo Chen (chenshuo at chenshuo dot com)#ifndef MUDUO_BASE_ASYNCLOGGING_H#define MUDUO_BASE_ASYNCLOGGING_H#include muduo/base/BlockingQueue.h#include muduo/base/BoundedBlockingQueue.h#include muduo/base/CountDownLatch.h#include muduo/base/Mutex.h#include muduo/base/Thread.h#include muduo/base/LogStream.h#include <atomic>#include <vector>namespace muduo{class AsyncLogging : noncopyable{ public: AsyncLogging(const string& basename, off_t rollSize, int flushInterval 3); ~AsyncLogging() { if (running_) { stop(); } } void append(const char* logline, int len); void start() { running_ true; thread_.start(); latch_.wait();//保证后端线程已经开始运行 } void stop() NO_THREAD_SAFETY_ANALYSIS { running_ false; cond_.notify(); thread_.join(); } private: void threadFunc(); typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; typedef std::vector<std::unique_ptr<Buffer>> BufferVector; typedef BufferVector::value_type BufferPtr; //存储超时时间变量 const int flushInterval_; //后端线程启动标志 std::atomic<bool> running_; //日志文件名 const string basename_; //预留的日志大小 const off_t rollSize_; //调用后端写入线程 muduo::Thread thread_; //该变量保证后端日志写入线程已经运行 muduo::CountDownLatch latch_; //条件变量与互斥锁,用来保证前端线程与后端线程的线程同步 muduo::MutexLock mutex_; muduo::Condition cond_ GUARDED_BY(mutex_); //前端线程当前写入缓冲区 BufferPtr currentBuffer_ GUARDED_BY(mutex_); //前端线程下一个备用缓冲区 BufferPtr nextBuffer_ GUARDED_BY(mutex_); //带写入文件已填满的缓冲区队列 BufferVector buffers_ GUARDED_BY(mutex_);};} // namespace muduo#endif // MUDUO_BASE_ASYNCLOGGING_H
AsyncLogging.cc 前端线程
前端在生成一条日志消息的时候会调用AsyncLogging::append()。在这个函数中如果当前缓冲currentBuff_剩余的空间足够大则会直接把日志消息拷贝追加到当前缓冲中这是最常见的情况。
这里拷贝一条日志消息并不会带来多大开销。前后端代码的其余部分都没有拷贝而是简单的指针交换。否则说明当前缓冲已经写满就把它送入移入buffers并试图把预备好的另一块缓冲nextBuffer_移用move为当前缓冲然后追加日志消息并通知唤醒后端开始写入日志数据。
以上两种情况在临界区之内都没有耗时的操作运行时间为常数。
如果前端写入速度太快一下子把两块缓冲都用完了那么只好分配一块新的buffer作为当前缓冲这是极少发生的情况
后端线程
首先准备好两块空闲的buffer以备在临界区内交换。
在临界区内等待条件触发这里的条件有两个其一是超时其二是前端写满了一个或多个buffer。
注意这里是非常规的conditionvariable用法它没有使用while循环而且等待时间有上限。当“条件”满足时先将当前缓冲currentBuffe_移入buffers_并立刻将空闲的newBuffer1移为当前缓冲。
注意这整段代码位于临界区之内因此不会有任何race condition。接下来将buffer_与buffersToWrite交换后面的代码可以在临界区之外安全地访问buffersToWrite将其中的日志数据写入文件。
临界区里最后干的一件事情是用newBuffer2替换nextBuffer这样前端始终有一个预备buffer可供调配。nextBuffer_可以减少前端临界区分配内存的概率缩短前端临界区长度。注意到后端临界区内也没有耗时的操作运行时间为常数。会buffersToWrite内的buffer重新填充newBuffer1和newBuffer2这样下一次执行的时候还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲。
最后这四个缓冲在程序启动的时候会全部填充为0这样可以避免程序热身时page fault引发性能不稳定。
// Use of this source code is governed by a BSD-style license// that can be found in the License file.//// Author: Shuo Chen (chenshuo at chenshuo dot com) #include muduo/base/AsyncLogging.h#include muduo/base/LogFile.h#include muduo/base/Timestamp.h #include <stdio.h> using namespace muduo; //异步日志AsyncLogging::AsyncLogging(const string& basename, off_t rollSize, int flushInterval) : flushInterval_(flushInterval),//后端线程超时时间变量,默认为3s running_(false),//后端线程启动标志 basename_(basename),//设置输出日志文件名 rollSize_(rollSize),// 预留的日志大小 thread_(std::bind(&AsyncLogging::threadFunc, this), Logging),// 执行该异步日志记录器的线程 latch_(1),//该变量作用是保证后端线程已进入 mutex_(), cond_(mutex_), currentBuffer_(new Buffer), //当前缓冲区 nextBuffer_(new Buffer),//预备缓冲区 buffers_()//缓冲区队列{ currentBuffer_->bzero(); //清空 nextBuffer_->bzero();//清空 buffers_.reserve(16);//设置缓冲区队列大小为16 } //所有LOG_*最终都会调用append函数void AsyncLogging::append(const char* logline, int len){ muduo::MutexLockGuard lock(mutex_); if (currentBuffer_->avail() > len) // 如果当前buffer还有空间就添加到当前日志 { currentBuffer_->append(logline, len);//调用vector的append } else { //将使用完后的buffer添加到 buffer vector后 buffers_.push_back(std::move(currentBuffer_)); if (nextBuffer_) // 重新设置当前buffer { currentBuffer_ std::move(nextBuffer_); } else { currentBuffer_.reset(new Buffer); // Rarely happens //如果前端写入速度太快了一下子把两块缓冲都用完了那么只好分配一块新的buffer,作当前缓冲这是极少发生的情况 } currentBuffer_->append(logline, len); // 通知日志线程有数据可写 cond_.notify(); }} void AsyncLogging::threadFunc() // 线程调用的函数主要用于周期性的flush数据到日志文件中{ assert(running_ true); latch_.countDown(); LogFile output(basename_, rollSize_, false);//打开日志文件 BufferPtr newBuffer1(new Buffer); //这两个是后台线程的buffer BufferPtr newBuffer2(new Buffer); newBuffer1->bzero();//clear newBuffer2->bzero();//clear BufferVector buffersToWrite;//用来和前台线程的buffers_进行swap buffersToWrite.reserve(16);//预留空间 while (running_) { assert(newBuffer1 && newBuffer1->length() 0); assert(newBuffer2 && newBuffer2->length() 0); assert(buffersToWrite.empty()); { muduo::MutexLockGuard lock(mutex_); //如果buffer为空那么表示没有数据需要写入文件那么就等待指定的时间默认三秒 if (buffers_.empty()) // unusual usage! { cond_.waitForSeconds(flushInterval_); } //无论cond是因何而醒来都要将currentBuffer_放到buffers_中。 //如果是因为时间到而醒那么currentBuffer_还没满此时也要将之写入LogFile中。 //如果已经有一个前台buffer满了那么在前台线程中就已经把一个前台buffer放到buffers_中 //了。此时还是需要把currentBuffer_放到buffers_中注意前后放置是不同的buffer //因为在前台线程中currentBuffer_已经被换成nextBuffer_指向的buffer了 buffers_.push_back(std::move(currentBuffer_)); //currentBuffer_是当前缓冲区 /*---归还一个buffer---*/ // 将新的buffer转成当前缓冲区 currentBuffer_ std::move(newBuffer1); //使用新的未使用的 buffersToWrite 交换 buffers_将buffers_中的数据在异步线程中写入LogFile中 buffersToWrite.swap(buffers_);//内部指针交换而非复制 if (!nextBuffer_) { nextBuffer_ std::move(newBuffer2);/*-----假如需要归还第二个----*/ } } assert(!buffersToWrite.empty()); // 如果将要写入文件的buffer列表中buffer的个数大于25那么将多余数据删除 // 消息堆积 //前端陷入死循环拼命发送日志消息超过后端的处理能力 //这是典型的生产速度超过消费速度会造成数据在内存中的堆积 //严重时引发性能问题(可用内存不足), //或程序崩溃(分配内存失败) if (buffersToWrite.size() > 25) { char buf[256]; snprintf(buf, sizeof buf, Dropped log messages at %s, %zd larger buffers\n, Timestamp::now().toFormattedString().c_str(), buffersToWrite.size()-2); fputs(buf, stderr); output.append(buf, static_cast<int>(strlen(buf))); // 丢掉多余日志以腾出内存仅保留两块缓冲区 buffersToWrite.erase(buffersToWrite.begin()2, buffersToWrite.end()); } // 将buffersToWrite的数据写入到日志中 for (const auto& buffer : buffersToWrite) { // FIXME: use unbuffered stdio FILE ? or use ::writev ? output.append(buffer->data(), buffer->length()); } // 重新调整buffersToWrite的大小 if (buffersToWrite.size() > 2) { // drop non-bzero-ed buffers, avoid trashing buffersToWrite.resize(2); } if (!newBuffer1) { assert(!buffersToWrite.empty()); // 从buffersToWrite中弹出一个作为newBuffer1 newBuffer1 std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer1->reset();// 清理newBuffer1 } //前台buffer是由newBuffer1 2 归还的。现在把buffersToWrite的buffer归还给后台buffer if (!newBuffer2) { assert(!buffersToWrite.empty()); newBuffer2 std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer2->reset(); } buffersToWrite.clear(); output.flush();//刷新日志文件(写入) } output.flush();}
如果日志消息堆积怎么办 万一前端陷入死循环拼命发送日志消息超过后端的处理输出能力会导致什么后果对于同步日志来说这不是问题因为阻塞IO自然就限制了前端的写入速度起到了节流阀的作用。但是对于异步日志来说这就是典型的生产速度高于消费速度问题会造成数据在内存中堆积严重时引发性能问题可用内存不足或程序崩溃分配内存失败。
muduo日志库处理日志堆积的方法很简单直接丢掉多余的日志buffer以腾出内存见这样可以防止日志库本身引起程序故障是一种自我保护措施。