muduo网络库学习(四)事件驱动循环EventLoop
muduo的设计采用高并发服务器框架中的one loop per thread模式,即一个线程一个事件循环。
这里的loop,其实就是muduo中的EventLoop,所以到目前为止,不管是Poller,Channel还是TimerQueue都仅仅是单线程下的任务,因为这些都依赖于EventLoop。这每一个EventLoop,其实也就是一个Reactor模型。
而多线程体现在EventLoop的上层,即在EventLoop上层有一个线程池,线程池中每一个线程运行一个EventLoop,也就是Reactor + 线程池的设计模式
梳理一下
- 每个muduo网络库有一个事件驱动循环线程池EventLoopThreadPool
- 每个线程池中有多个事件驱动线程EventLoopThread
- 每个线程运行一个EventLoop事件循环
- 每个EventLoop事件循环包含一个io复用Poller,一个计时器队列TimerQueue
- 每个Poller监听多个Channel,TimerQueue其实也是一个Channel
- 每个Channel对应一个fd,在Channel被激活后调用回调函数
- 每个回调函数是在EventLoop所在线程执行
- 所有激活的Channel回调结束后EventLoop继续让Poller监听
所以调用回调函数的过程中是同步的,如果回调函数执行时间很长,那么这个EventLoop所在线程就会等待很久之后才会再次调用poll。
整个muduo网络库实际上是由Reactor + 线程池实现的,线程池中每一个线程都是一个Reactor模型。在处理大并发的服务器任务上有很大优势。
简化的关系图如下,EventLoop只涉及Poller,Channel(简单涉及TcpConnection)和TimerQueue。
- 白色三角,继承
- 黑色菱形,聚合
一个事件驱动循环EventLoop其实就是一个Reactor模型,是一个单线程任务。主要包含io复用函数Poller,定时器队列TimerQueue以及激活队列。其他的就是一些辅助变量
typedef std::vector ChannelList;bool looping_; /* atomic */std::atomic<bool> quit_;bool eventHandling_; /* atomic */bool callingPendingFunctors_; /* atomic */int64_t iteration_;/* 创建时保存当前事件循环所在线程,用于之后运行时判断使用EventLoop的线程是否是EventLoop所属的线程 */const pid_t threadId_;/* poll返回的时间,用于计算从激活到调用回调函数的延迟 */Timestamp pollReturnTime_;/* io多路复用 */std::unique_ptr poller_;/* 定时器队列 */std::unique_ptr timerQueue_;/* 唤醒当前线程的描述符 */int wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client./* * 用于唤醒当前线程,因为当前线程主要阻塞在poll函数上* 所以唤醒的方法就是手动激活这个wakeupChannel_,即写入几个字节让Channel变为可读* 注: 这个Channel也注册到Poller中*/std::unique_ptr wakeupChannel_;boost::any context_;// scratch variables/* * 激活队列,poll函数在返回前将所有激活的Channel添加到激活队列中* 在当前事件循环中的所有Channel在Poller中*/ChannelList activeChannels_;/* 当前执行回调函数的Channel */Channel* currentActiveChannel_;/* * queueInLoop添加函数时给pendingFunctors_上锁,防止多个线程同时添加* * mutable,突破const限制,在被const声明的函数仍然可以更改这个变量*/mutable MutexLock mutex_;/* * 等待在当前线程调用的回调函数,* 原因是本来属于当前线程的回调函数会被其他线程调用时,应该把这个回调函数添加到它属于的线程中* 等待它属于的线程被唤醒后调用,以满足线程安全性* * TcpServer::removeConnection是个例子* 当关闭一个TcpConnection时,需要调用TcpServer::removeConnection,但是这个函数属于TcpServer,* 然而TcpServer和TcpConnection不属于同一个线程,这就容易将TcpServer暴露给其他线程,* 万一其他线程析构了TcpServer怎么办(线程不安全)* 所以会调用EventLoop::runInLoop,如果要调用的函数属于当前线程,直接调用* 否则,就添加到这个队列中,等待当前线程被唤醒*/std::vector pendingFunctors_; // @GuardedBy mutex_
最后一个变量std::vector比较不好理解,它是一个任务容器,存放的是将要执行的回调函数。
准备这么一个容器的原因在于
- 某个对象(通常是
Channel或者TcpConnection)可能被另一个线程使用(这个线程不是这个对象所在线程),此时这个对象就等于暴露给其他线程了。这是非常不安全的,万一这个线程不小心析构了这个对象,而这个对象所属的那个线程正要访问这个对象(例如调用这个对象的接口),这个线程就会崩溃,因为它访问了一个本不存在的对象(已经被析构)。 - 为了解决这个问题,就需要尽量将对这个对象的操作移到它所属的那个线程执行(这里是调用这个对象的接口)以满足线程安全性。又因为每个对象都有它所属的事件驱动循环
EventLoop,这个EventLoop通常阻塞在poll上。可以保证的是EventLoop阻塞的线程就是它所属的那个线程,所以调用poll的线程就是这个对象所属的线程。这就 可以让poll返回后再执行想要调用的函数,但是需要手动唤醒poll,否则一直阻塞在那里会耽误函数的执行。
runInLoop和queueInLoop函数执行的就是上述操作
/** 1.如果事件循环不属于当前这个线程,就不能直接调用回调函数,应该回到自己所在线程调用* 2.此时需要先添加到自己的队列中存起来,然后唤醒自己所在线程的io复用函数(poll)* 3.唤醒方法是采用eventfd,这个eventfd只有8字节的缓冲区,向eventfd中写入数据另poll返回* 4.返回后会调用在队列中的函数,见EventLoop* * 举例说明什么时候会出现事件驱动循环不属于当前线程的情况* 1.客户端close连接,服务器端某个Channel被激活,原因为EPOLLHUP* 2.Channel调用回调函数,即TcpConnection的handleClose* 3.handleClose调用TcpServer为它提供的回调函数removeConnection* 4.此时执行的是TcpServer的removeConnection函数,* 解释 * 1.因为TcpServer所在线程和TcpConnection所在的不是同一个线程* 2.这就导致将TcpServer暴露给了TcpConnection所在线程* 3.因为TcpServer需要将这个关闭的TcpConnection从tcp map中删除* 就需要调用自己的另一个函数removeConnectionInLoop* 4.为了实现线程安全性,也就是为了让removeConnectionInLoop在TcpServer自己所在线程执行* 需要先把这个函数添加到队列中存起来,等到回到自己的线程在执行* 5.runInLoop中的queueInLoop就是将这个函数存起来* 6.而此时调用runInLoop的仍然是TcpConnection所在线程* 7.因为自始至终,removeConnection这个函数都还没有结束* * 如果调用runInLoop所在线程和事件驱动循环线程是一个线程,那么直接调用回调函数就行了* * 在TcpServer所在线程中,EventLoop明明阻塞在poll上,这里为什么可以对它进行修改* 1.线程相当于一个人可以同时做两件事情,一个EventLoop同时调用两个函数就很正常了* 2.其实函数调用都是通过函数地址调用的,既然EventLoop可读,就一定直到内部函数的地址,自然可以调用* 3.而更改成员函数,通过地址访问,进而修改,也是可以的*/
void EventLoop::runInLoop(Functor cb)
{if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));}
}
当然了,如果这个对象所属线程和当前线程相同,就没有线程安全性的问题,直接调用即可。否则,就需要添加到pendingFunctors_中,这正是queueInLoop的功效
/** 由runInLoop调用,也可直接调用,作用* 1.将相应的回调函数存在事件驱动循环的队列中,等待回到自己线程再调用它* 2.激活自己线程的事件驱动循环*/
void EventLoop::queueInLoop(Functor cb)
{{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
}
此处需要上锁保护pendingFunctors_以防止多个线程同时向它添加函数。这里的锁体现了RAII方法,大括号是语句块,把里面的变量作为临时变量处理
因为EventLoop通常阻塞在poll上,所以添加到pendingFunctors_后需要手动唤醒它,不然它一直阻塞在poll,会耽误了函数的执行。唤醒的方法是使用eventfd
#include
int eventfd(unsigned int initval, int flags);
函数用于创建一个eventfd文件描述符,这个描述符可用于进程/线程间的等待/唤醒。原因是内核只为eventfd维护一个uint64_t类型的计数器,大小应该在64位。
参数initval是这个计数器的初值
flags是一些标志,可以是下面几个的或运算结果
- EFD_NONBLOCK,非阻塞
- EFD_CLOEXEC,设置close-on-exec属性,调用exec时会自动close
- …
eventfd也可以使用write/read等io函数进行读写,区别是
write每次只能写入8字节大小的数据,内核会将这8字节大小的数值加到计数器上
read一次性读取这个计数器的值,并把缓冲区初始化为0。如果调用read时这个计数器值就是0,那么非阻塞时会返回EAGAIN,阻塞时会等待计数器的值变为非0
可以把这个eventfd添加到poll中,在需要唤醒时写入8字节数据,此时poll返回,执行回调函数,然后执行在pendingFunctors_中的函数。
loop函数是EventLoop的事件驱动循环,所有的Reactor模型的loop函数都差不多。执行的就是poll和回调函数的回调,以及pendingFunctors_中函数的调用
/* * 事件驱动主循环* * 1.每个TcpServer对应一个事件驱动循环线程池* 2.每个事件驱动循环线程池对应多个事件驱动循环线程* 3.每个事件驱动循环线程对应一个事件驱动主循环* 4.每个事件驱动主循环对应一个io多路复用函数* 5.每个io多路复用函数监听多个Channel* 6.每个Channel对应一个fd,也就对应一个TcpConnection或者监听套接字* 7.在poll返回后处理激活队列中Channel的过程是同步的,也就是一个一个调用回调函数* 8.调用回调函数的线程和事件驱动主循环所在线程是同一个,也就是同步执行回调函数* 9.线程池用在事件驱动循环上层,也就是事件驱动循环是线程池中的一个线程*/
void EventLoop::loop()
{assert(!looping_);assertInLoopThread();looping_ = true;quit_ = false; // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){/* 清空激活队列 */activeChannels_.clear();/* epoll_wait返回后会将所有就绪的Channel添加到激活队列activeChannel中 */pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priorityeventHandling_ = true;/* 执行所有在激活队列中的Channel的回调函数 */for (Channel* channel : activeChannels_){currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_);}currentActiveChannel_ = NULL;eventHandling_ = false;/* 执行pendingFunctors_中的所有函数 */doPendingFunctors();}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false;
}
Reactor模式的loop函数大多一个样子
- 调用poll监听事件,返回之前将激活的Channel添加到激活队列中
- 处理激活队列中的Channel,调用回调函数
muduo中多了处理pendingFunctors_中的函数,在自己的线程调用自己的函数是安全的
Channel的回调函数就是根据被激活原因调用不同的回调函数,这些回调函数是在TcpConnection创建之初被设置的。
简单说一下Channel和TcpConnection的关系
- 每个TcpConnection对象代表一个tcp连接,所以TcpConnection中需要保存用于服务器/客户端通信的套接字,这个套接字就记录在Channel中
- TcpConnection在创建之初会为Channel设置回调函数,如果套接字可读/可写/错误/关闭等就会执行TcpConnection中的函数
- TcpConnection在确定连接已经建立后会向Poller注册自己的Channel
/* TcpConnection.cc */channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));
Channel的handleEvent如下
tie_是TcpConnection的弱引用,在调用TcpConnection的函数之前判断它是否还存在,如果被析构了,那么提升的shared_ptr会是null
具体可以参考 muduo网络库学习(二)对套接字和监听事件的封装Channel
/** 根据fd激活事件的不同,调用不同的fd的回调函数*/
void Channel::handleEvent(Timestamp receiveTime)
{/* * RAII,对象管理资源* weak_ptr使用lock提升成shared_ptr,此时引用计数加一* 函数返回,栈空间对象销毁,提升的shared_ptr guard销毁,引用计数减一*/std::shared_ptr<void> guard;if (tied_){guard = tie_.lock();if (guard){handleEventWithGuard(receiveTime);}}else{handleEventWithGuard(receiveTime);}
}/** 根据被激活事件的不同,调用不同的回调函数*/
void Channel::handleEventWithGuard(Timestamp receiveTime)
{eventHandling_ = true;LOG_TRACE << reventsToString();if ((revents_ & POLLHUP) && !(revents_ & POLLIN)){if (logHup_){LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";}if (closeCallback_) closeCallback_();}if (revents_ & POLLNVAL){LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";}if (revents_ & (POLLERR | POLLNVAL)){if (errorCallback_) errorCallback_();}if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)){if (readCallback_) readCallback_(receiveTime);}if (revents_ & POLLOUT){if (writeCallback_) writeCallback_();}eventHandling_ = false;
}
EventLoop没有特别处理定时器任务,原因是定时器任务TimerQueue也被转换成一个文件描述符添加到Poller中,所以时间一到timerfd变为可读,poll就会返回,就会调用回调函数。EventLoop只提供了runAt/runAfter/runEveny三个接口用于设置定时任务。这些在 muduo网络库学习(三)定时器TimerQueue的设计中有提及
/* * 定时器功能,由用户调用runAt并提供当事件到了执行的回调函数* 时间在Timestamp设置,绝对时间,单位是微秒*/
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{/* std::move,移动语义,避免拷贝 */return timerQueue_->addTimer(std::move(cb), time, 0.0);
}/** 如上,单位是微秒,相对时间*/
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, std::move(cb));
}/** 每隔多少微秒调用一次*/
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval);
}
几个C++方面的知识点
- std::move,移动语义,避免拷贝
- RAII,以锁为例,构造时上锁,析构时解锁(函数返回时局部变量析构)
- 花括号语句块
- std::unique_ptr,智能指针,不允许拷贝和赋值,独一无二
- std::shared_ptr,智能指针,可以拷贝赋值,存在引用计数
- std::weak_ptr,弱引用,不增加引用计数,必要时可通过lock函数提升为shared_ptr
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
