欢迎来到飞鸟慕鱼博客,开始您的技术之旅!
当前位置: 首页知识笔记正文

多路复用 select poll epoll,多路复用技术的原理

墨初 知识笔记 139阅读
简介

poller是I/O多路复用接口抽象虚基类,对I/O多路复用API的封装,muduo提供了EPollPoller和PollPoller派生类(epoll和poll),所以不支持select.

newDefaultPoller()默认选择epoll

主要接口 poll

是Poller的核心功能使用派生类的poll或者epoll_wait来阻塞等待IO事件发生
通过派生类的实现来填充EventLoop的activeChannelList_

static createNewPoller

工厂函数创建一个Poller实例
在EpollPoller中每个实例对应一个epollfd

update

更新I/O多路复用的状态例如epoll_ctl的ADDMODDEL

主要成员 loop

控制当前Poller的EventLoop指针
其余成员由派生类实现

源码剖析 poller.h
#ifndef MUDUO_NET_POLLER_H#define MUDUO_NET_POLLER_H#include <map>#include <vector>#include muduo/base/Timestamp.h#include muduo/net/EventLoop.hnamespace muduo{namespace net{class Channel;////// Base class for IO Multiplexing////// This class doesnt own the Channel objects.class Poller : noncopyable{ public:  typedef std::vector<Channel*> ChannelList;  Poller(EventLoop* loop);  virtual ~Poller();  /// Polls the I/O events.  /// Must be called in the loop thread.  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels)  0;  /// Changes the interested I/O events.  /// Must be called in the loop thread.  virtual void updateChannel(Channel* channel)  0;  /// Remove the channel, when it destructs.  /// Must be called in the loop thread.  virtual void removeChannel(Channel* channel)  0;  //判断是否存在  virtual bool hasChannel(Channel* channel) const;  //创建一个poller,默认是epoll  static Poller* newDefaultPoller(EventLoop* loop);  void assertInLoopThread() const  {    ownerLoop_->assertInLoopThread();  } protected:  typedef std::map<int, Channel*> ChannelMap;  ChannelMap channels_; private:  EventLoop* ownerLoop_;};}  // namespace net}  // namespace muduo#endif  // MUDUO_NET_POLLER_H
poller.cc
#include muduo/net/Poller.h#include muduo/net/Channel.husing namespace muduo;using namespace muduo::net;Poller::Poller(EventLoop* loop)  : ownerLoop_(loop){}Poller::~Poller()  default;bool Poller::hasChannel(Channel* channel) const{  assertInLoopThread();  ChannelMap::const_iterator it  channels_.find(channel->fd());  return it ! channels_.end() && it->second  channel;}
EPollPoller.h
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H#define MUDUO_NET_POLLER_EPOLLPOLLER_H#include muduo/net/Poller.h#include <vector>struct epoll_event;namespace muduo{namespace net{////// IO Multiplexing with epoll(4).///class EPollPoller : public Poller{ public:  EPollPoller(EventLoop* loop);  ~EPollPoller() override;  Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;  void updateChannel(Channel* channel) override;  void removeChannel(Channel* channel) override; private:  static const int kInitEventListSize  16;  static const char* operationToString(int op);  void fillActiveChannels(int numEvents,                          ChannelList* activeChannels) const;  void update(int operation, Channel* channel);  typedef std::vector<struct epoll_event> EventList;  int epollfd_;  EventList events_;};}  // namespace net}  // namespace muduo#endif  // MUDUO_NET_POLLER_EPOLLPOLLER_H
EPollPoller.cc
// Copyright 2010, Shuo Chen.  All rights reserved.//  Use of this source code is governed by a BSD-style license// that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include muduo/net/poller/EPollPoller.h #include muduo/base/Logging.h#include muduo/net/Channel.h #include <assert.h>#include <errno.h>#include <poll.h>#include <sys/epoll.h>#include <unistd.h> using namespace muduo;using namespace muduo::net;  /*struct epoll_event{  uint32_t events;   //Epoll events  epoll_data_t data;    //User data variable} __attribute__ ((__packed__)); typedef union epoll_data{  void *ptr;  int fd;  uint32_t u32;  uint64_t u64;} epoll_data_t;*/  // On Linux, the constants of poll(2) and epoll(4)// are expected to be the same.static_assert(EPOLLIN  POLLIN,        epoll uses same flag values as poll);static_assert(EPOLLPRI  POLLPRI,      epoll uses same flag values as poll);static_assert(EPOLLOUT  POLLOUT,      epoll uses same flag values as poll);static_assert(EPOLLRDHUP  POLLRDHUP,  epoll uses same flag values as poll);static_assert(EPOLLERR  POLLERR,      epoll uses same flag values as poll);static_assert(EPOLLHUP  POLLHUP,      epoll uses same flag values as poll); namespace{const int kNew  -1; //channel尚未添加到poller中const int kAdded  1; //已经添加了const int kDeleted  2;   //之前监听过了后来移除了监听} //当flag  EPOLL_CLOEXEC创建的epfd会设置FD_CLOEXEC//FD_CLOEXEC表示当程序执行exec函数时本fd将被系统自动关闭,表示不传递给exec创建的新进程EPollPoller::EPollPoller(EventLoop* loop)  : Poller(loop),//创建epollfd使用带1的版本//如果参数为0,则与epoll_create版本相同,设置为O_CLOEXEC,查看open函数的这个参数解释,//子进程fork并调用exec时会关闭这个fd    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),          events_(kInitEventListSize)                    //vector这样用时初始化kInitEventListSize个大小空间,默认16{  if (epollfd_ < 0)                           //在构造函数中判断<0就abort()  {    LOG_SYSFATAL << EPollPoller::EPollPoller;  }} EPollPoller::~EPollPoller(){  ::close(epollfd_);} Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)//ChannelList是一个存放channel的vector{  LOG_TRACE << fd total count  << channels_.size();  int numEvents  ::epoll_wait(epollfd_,                               &*events_.begin(),  //events_已初始化,是存放epoll_event的vector                               static_cast<int>(events_.size()),    //监控套接字的数目                               timeoutMs);  int savedErrno  errno;  Timestamp now(Timestamp::now());  if (numEvents > 0)  {    LOG_TRACE << numEvents <<  events happened;    fillActiveChannels(numEvents, activeChannels);    if (implicit_cast<size_t>(numEvents)  events_.size()) //如果返回的事件数目等于当前事件数组大小就分配2倍空间    {      events_.resize(events_.size()*2);    }  }  else if (numEvents  0)  {    LOG_TRACE << nothing happened;  }  else  {    // error happens, log uncommon ones    if (savedErrno ! EINTR)    {      errno  savedErrno;      LOG_SYSERR << EPollPoller::poll();    }  }  return now;} //把返回到的这么多个事件添加到activeChannelsvoid EPollPoller::fillActiveChannels(int numEvents,                                     ChannelList* activeChannels) const   {  assert(implicit_cast<size_t>(numEvents) < events_.size());  for (int i  0; i < numEvents; i)                          //确定它的大小小于events_的大小因为events_是预留的事件vector  {    Channel* channel  static_cast<Channel*>(events_[i].data.ptr);#ifndef NDEBUG    int fd  channel->fd();                       //debug时做一下检测    ChannelMap::const_iterator it  channels_.find(fd);    assert(it ! channels_.end());    assert(it->second  channel);#endif    channel->set_revents(events_[i].events);        //把已发生的事件传给channel,写到通道当中    activeChannels->push_back(channel);             //并且push_back进activeChannels  }} //这个函数被调用是因为channel->enablereading()被调用再调用channel->update()再event_loop->updateChannel()再->epoll或poll的updateChannel被调用//void EPollPoller::updateChannel(Channel* channel){  Poller::assertInLoopThread();  //在IO线程  const int index  channel->index();  //初始状态index是-1  LOG_INFO << fd   << channel->fd()    <<  events   << channel->events() <<  index   << index;   // 当是新的或是之前监听过后来移除了监听  // 两者的区别在于,新的channel 之前没有在epoll 中保存  // 而 del 的之前在 channels_ 中保存了但是没有被放入epoll_ctl中监听  if (index  kNew || index  kDeleted)  //index是在poll中是下标在epoll中是三种状态上面有三个常量  {    // a new one, add with EPOLL_CTL_ADD    int fd  channel->fd();    if (index  kNew)    {      assert(channels_.find(fd)  channels_.end());  //channels_是一个Map      channels_[fd]  channel;    }    else // index  kDeleted    {      assert(channels_.find(fd) ! channels_.end());      assert(channels_[fd]  channel);    }     channel->set_index(kAdded);    update(EPOLL_CTL_ADD, channel);   //注册事件  }  else  {    // update existing one with EPOLL_CTL_MOD/DEL    int fd  channel->fd();    (void)fd;    assert(channels_.find(fd) ! channels_.end());    assert(channels_[fd]  channel);    assert(index  kAdded); // 既然已经添加了那么可能的修改就是修改监听的时间或者不在监听    // 因此这里先判断是否是没有监听的事件了如果是那么直接移除、    if (channel->isNoneEvent())   //判断无事件    {      update(EPOLL_CTL_DEL, channel);    //删除事件      channel->set_index(kDeleted);  //删除后被设置为kDeleted    }    else    {      update(EPOLL_CTL_MOD, channel);   //修改已注册的监听事件    }  }} void EPollPoller::removeChannel(Channel* channel){  Poller::assertInLoopThread();  //判断是否在IO线程  int fd  channel->fd();  LOG_TRACE << fd   << fd;  assert(channels_.find(fd) ! channels_.end());  assert(channels_[fd]  channel);  assert(channel->isNoneEvent());  int index  channel->index();  assert(index  kAdded || index  kDeleted);  size_t n  channels_.erase(fd);   //删除  (void)n;   assert(n  1);   if (index  kAdded)  {    update(EPOLL_CTL_DEL, channel);  }  channel->set_index(kNew);} void EPollPoller::update(int operation, Channel* channel){  printf(-------%s,line.%d-------\n,__FUNCTION__,__LINE__);  struct epoll_event event;    //存放数据的结构体  memZero(&event, sizeof event);  event.events  channel->events();  //注册的事件  event.data.ptr  channel;  int fd  channel->fd();  LOG_INFO << epoll_ctl op   << operationToString(operation)    <<  fd   << fd <<  event  {  << channel->eventsToString() <<  };  if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)//epoll_ctl失败返回-1  {    if (operation  EPOLL_CTL_DEL)    {      LOG_SYSERR << epoll_ctl op  << operationToString(operation) <<  fd  << fd;    }    else    {      LOG_SYSFATAL << epoll_ctl op  << operationToString(operation) <<  fd  << fd;    }  }} const char* EPollPoller::operationToString(int op){  switch (op)  {    case EPOLL_CTL_ADD:      return ADD;    case EPOLL_CTL_DEL:      return DEL;    case EPOLL_CTL_MOD:      return MOD;    default:      assert(false && ERROR op);      return Unknown Operation;  }}

标签:
声明:无特别说明,转载请标明本文来源!