多路复用 select poll epoll,多路复用技术的原理
墨初 知识笔记 139阅读
简介
主要接口 poll
update
poller是I/O多路复用接口抽象虚基类,对I/O多路复用API的封装,muduo提供了EPollPoller和PollPoller派生类(epoll和poll),所以不支持select.
newDefaultPoller()默认选择epoll

poll 是 Poller 的核心功能:调用派生类实现的 poll 或 epoll_wait,阻塞等待 I/O 事件发生,
通过派生类的实现来填充EventLoop的activeChannelList_
newDefaultPoller 是工厂函数,用于创建一个 Poller 实例
在EpollPoller中每个实例对应一个epollfd

update(updateChannel):更新 I/O 多路复用的状态,例如 epoll_ctl 的 ADD、MOD、DEL 操作
主要成员:loop(即 ownerLoop_),指向控制当前 Poller 的 EventLoop 的指针
其余成员由派生类实现
#ifndef MUDUO_NET_POLLER_H#define MUDUO_NET_POLLER_H#include <map>#include <vector>#include muduo/base/Timestamp.h#include muduo/net/EventLoop.hnamespace muduo{namespace net{class Channel;////// Base class for IO Multiplexing////// This class doesnt own the Channel objects.class Poller : noncopyable{ public: typedef std::vector<Channel*> ChannelList; Poller(EventLoop* loop); virtual ~Poller(); /// Polls the I/O events. /// Must be called in the loop thread. virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) 0; /// Changes the interested I/O events. /// Must be called in the loop thread. virtual void updateChannel(Channel* channel) 0; /// Remove the channel, when it destructs. /// Must be called in the loop thread. virtual void removeChannel(Channel* channel) 0; //判断是否存在 virtual bool hasChannel(Channel* channel) const; //创建一个poller,默认是epoll static Poller* newDefaultPoller(EventLoop* loop); void assertInLoopThread() const { ownerLoop_->assertInLoopThread(); } protected: typedef std::map<int, Channel*> ChannelMap; ChannelMap channels_; private: EventLoop* ownerLoop_;};} // namespace net} // namespace muduo#endif // MUDUO_NET_POLLER_H
poller.cc #include muduo/net/Poller.h#include muduo/net/Channel.husing namespace muduo;using namespace muduo::net;Poller::Poller(EventLoop* loop) : ownerLoop_(loop){}Poller::~Poller() default;bool Poller::hasChannel(Channel* channel) const{ assertInLoopThread(); ChannelMap::const_iterator it channels_.find(channel->fd()); return it ! channels_.end() && it->second channel;}
EPollPoller.h #ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H#define MUDUO_NET_POLLER_EPOLLPOLLER_H#include muduo/net/Poller.h#include <vector>struct epoll_event;namespace muduo{namespace net{////// IO Multiplexing with epoll(4).///class EPollPoller : public Poller{ public: EPollPoller(EventLoop* loop); ~EPollPoller() override; Timestamp poll(int timeoutMs, ChannelList* activeChannels) override; void updateChannel(Channel* channel) override; void removeChannel(Channel* channel) override; private: static const int kInitEventListSize 16; static const char* operationToString(int op); void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; void update(int operation, Channel* channel); typedef std::vector<struct epoll_event> EventList; int epollfd_; EventList events_;};} // namespace net} // namespace muduo#endif // MUDUO_NET_POLLER_EPOLLPOLLER_H
EPollPoller.cc // Copyright 2010, Shuo Chen. All rights reserved.// Use of this source code is governed by a BSD-style license// that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include muduo/net/poller/EPollPoller.h #include muduo/base/Logging.h#include muduo/net/Channel.h #include <assert.h>#include <errno.h>#include <poll.h>#include <sys/epoll.h>#include <unistd.h> using namespace muduo;using namespace muduo::net; /*struct epoll_event{ uint32_t events; //Epoll events epoll_data_t data; //User data variable} __attribute__ ((__packed__)); typedef union epoll_data{ void *ptr; int fd; uint32_t u32; uint64_t u64;} epoll_data_t;*/ // On Linux, the constants of poll(2) and epoll(4)// are expected to be the same.static_assert(EPOLLIN POLLIN, epoll uses same flag values as poll);static_assert(EPOLLPRI POLLPRI, epoll uses same flag values as poll);static_assert(EPOLLOUT POLLOUT, epoll uses same flag values as poll);static_assert(EPOLLRDHUP POLLRDHUP, epoll uses same flag values as poll);static_assert(EPOLLERR POLLERR, epoll uses same flag values as poll);static_assert(EPOLLHUP POLLHUP, epoll uses same flag values as poll); namespace{const int kNew -1; //channel尚未添加到poller中const int kAdded 1; //已经添加了const int kDeleted 2; //之前监听过了后来移除了监听} //当flag EPOLL_CLOEXEC创建的epfd会设置FD_CLOEXEC//FD_CLOEXEC表示当程序执行exec函数时本fd将被系统自动关闭,表示不传递给exec创建的新进程EPollPoller::EPollPoller(EventLoop* loop) : Poller(loop),//创建epollfd使用带1的版本//如果参数为0,则与epoll_create版本相同,设置为O_CLOEXEC,查看open函数的这个参数解释,//子进程fork并调用exec时会关闭这个fd epollfd_(::epoll_create1(EPOLL_CLOEXEC)), events_(kInitEventListSize) //vector这样用时初始化kInitEventListSize个大小空间,默认16{ if (epollfd_ < 0) //在构造函数中判断<0就abort() { LOG_SYSFATAL << EPollPoller::EPollPoller; }} EPollPoller::~EPollPoller(){ ::close(epollfd_);} Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)//ChannelList是一个存放channel的vector{ LOG_TRACE << fd total count << channels_.size(); int numEvents ::epoll_wait(epollfd_, 
&*events_.begin(), //events_已初始化,是存放epoll_event的vector static_cast<int>(events_.size()), //监控套接字的数目 timeoutMs); int savedErrno errno; Timestamp now(Timestamp::now()); if (numEvents > 0) { LOG_TRACE << numEvents << events happened; fillActiveChannels(numEvents, activeChannels); if (implicit_cast<size_t>(numEvents) events_.size()) //如果返回的事件数目等于当前事件数组大小就分配2倍空间 { events_.resize(events_.size()*2); } } else if (numEvents 0) { LOG_TRACE << nothing happened; } else { // error happens, log uncommon ones if (savedErrno ! EINTR) { errno savedErrno; LOG_SYSERR << EPollPoller::poll(); } } return now;} //把返回到的这么多个事件添加到activeChannelsvoid EPollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const { assert(implicit_cast<size_t>(numEvents) < events_.size()); for (int i 0; i < numEvents; i) //确定它的大小小于events_的大小因为events_是预留的事件vector { Channel* channel static_cast<Channel*>(events_[i].data.ptr);#ifndef NDEBUG int fd channel->fd(); //debug时做一下检测 ChannelMap::const_iterator it channels_.find(fd); assert(it ! channels_.end()); assert(it->second channel);#endif channel->set_revents(events_[i].events); //把已发生的事件传给channel,写到通道当中 activeChannels->push_back(channel); //并且push_back进activeChannels }} //这个函数被调用是因为channel->enablereading()被调用再调用channel->update()再event_loop->updateChannel()再->epoll或poll的updateChannel被调用//void EPollPoller::updateChannel(Channel* channel){ Poller::assertInLoopThread(); //在IO线程 const int index channel->index(); //初始状态index是-1 LOG_INFO << fd << channel->fd() << events << channel->events() << index << index; // 当是新的或是之前监听过后来移除了监听 // 两者的区别在于,新的channel 之前没有在epoll 中保存 // 而 del 的之前在 channels_ 中保存了但是没有被放入epoll_ctl中监听 if (index kNew || index kDeleted) //index是在poll中是下标在epoll中是三种状态上面有三个常量 { // a new one, add with EPOLL_CTL_ADD int fd channel->fd(); if (index kNew) { assert(channels_.find(fd) channels_.end()); //channels_是一个Map channels_[fd] channel; } else // index kDeleted { assert(channels_.find(fd) ! 
channels_.end()); assert(channels_[fd] channel); } channel->set_index(kAdded); update(EPOLL_CTL_ADD, channel); //注册事件 } else { // update existing one with EPOLL_CTL_MOD/DEL int fd channel->fd(); (void)fd; assert(channels_.find(fd) ! channels_.end()); assert(channels_[fd] channel); assert(index kAdded); // 既然已经添加了那么可能的修改就是修改监听的时间或者不在监听 // 因此这里先判断是否是没有监听的事件了如果是那么直接移除、 if (channel->isNoneEvent()) //判断无事件 { update(EPOLL_CTL_DEL, channel); //删除事件 channel->set_index(kDeleted); //删除后被设置为kDeleted } else { update(EPOLL_CTL_MOD, channel); //修改已注册的监听事件 } }} void EPollPoller::removeChannel(Channel* channel){ Poller::assertInLoopThread(); //判断是否在IO线程 int fd channel->fd(); LOG_TRACE << fd << fd; assert(channels_.find(fd) ! channels_.end()); assert(channels_[fd] channel); assert(channel->isNoneEvent()); int index channel->index(); assert(index kAdded || index kDeleted); size_t n channels_.erase(fd); //删除 (void)n; assert(n 1); if (index kAdded) { update(EPOLL_CTL_DEL, channel); } channel->set_index(kNew);} void EPollPoller::update(int operation, Channel* channel){ printf(-------%s,line.%d-------\n,__FUNCTION__,__LINE__); struct epoll_event event; //存放数据的结构体 memZero(&event, sizeof event); event.events channel->events(); //注册的事件 event.data.ptr channel; int fd channel->fd(); LOG_INFO << epoll_ctl op << operationToString(operation) << fd << fd << event { << channel->eventsToString() << }; if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)//epoll_ctl失败返回-1 { if (operation EPOLL_CTL_DEL) { LOG_SYSERR << epoll_ctl op << operationToString(operation) << fd << fd; } else { LOG_SYSFATAL << epoll_ctl op << operationToString(operation) << fd << fd; } }} const char* EPollPoller::operationToString(int op){ switch (op) { case EPOLL_CTL_ADD: return ADD; case EPOLL_CTL_DEL: return DEL; case EPOLL_CTL_MOD: return MOD; default: assert(false && ERROR op); return Unknown Operation; }}
标签: