EventLoop模块 --- 事件循环模块

目录

1 设计思想

eventfd

创建eventfd

2 实现

3 联合调试

4 整合定时器模块

5 联合超时模块调试?


1 设计思想

EventLoop 模块是和线程一一绑定的,每一个EventLoop模块内部都管理了一个Poller对象进行事件监控,同时管理着多个Connection对象,每一个连接所要完成的操作最终都会在他所绑定的EventLoop对象中进行。 EventLoop 模块是对事件进行整体管理和操作的,它内部包含的Channel和Poller分别负责事件管理和事件监控,但是最终执行各个事件都是要在EventLoop中执行,以此来保证线程安全。

eventfd

但是,在EventLoop内部的 Connection 有事件到来时,EventLoop 模块怎么知道呢?内部的Connection 有事件到来之后,我们要有办法能够通知 EventLoop 模块 。

而事件通知我们以前学过信号,但是在我们的主从Reactor 服务器中,是无法使用信号来通知具体的某一个EventLoop线程的,因为信号是针对进程的,但是事件具体分配给进程内的哪一个线程去执行我们是不确定的

所以,我们需要一个新的事件通知机制。 最简单的就是文件描述符,我们只需要给每一个 EventLoop线程专门分配一个 用于事件通知的文件描述符,每一次EventLoop内部的Connection有事件到来时,我们就可以调用EventLoop提供的方法向该文件描述符中写入数据,那么我们就可以通过EventLoop的读事件就绪,来唤醒EventLoop线程进行事件的处理了。

而 eventfd 就是一个专门用于事件通知的文件描述符,他在内核中相当于管理的数据结构中,存在一个计数器,这个计数器中的数字就是事件通知的次数。 每次我们向该文件描述符中写入一个数值,就是会将该计数器的数值加一

而我们可以使用 read 读取事件通知次数,读取到的数据就是事件的通知次数,读取完之后,计数就清零了。

那么eventfd 怎么使用呢?

他需要包含头文件?

创建eventfd

使用 eventfd 接口EventLoop模块 --- 事件循环模块

第一个参数就是设置计数器的初始值,而第二个参数 flags 是用于设置该fd的一些属性,他有以下选项:

EventLoop模块 --- 事件循环模块

EFD_CLOEXEC 就是禁止被子进程拷贝

EFD_NONBLOCK 设置该描述符为非阻塞

EFD_SEMAPHORE 其实就是用这个文件描述符实现信号量类似的功能,我们不是用这个属性。

未来我们使用的时候其实就是设置? EFD_CLOEXEC | EFD_NONBLOCK 这两个属性。

而他的返回值自然就是创建好的事件通知文件描述符,也就是操作句柄。

未来我们在对 eventfd 进行读写的时候,必须是一个 八字节的数据 。因为它本身就是一个计数器的功能,未来我们写入的数值会累加到内部的计数器上,同时读取的时候也是吧内部计数器的数值读取出去并清零。

正常的读写或者IO操作就是调用 read 和 write 接口,就是要注意读和写都使用一个八字节数据来进行。

EventLoop模块 --- 事件循环模块

未来我们需要一个任务队列,为什么呢?

要知道,虽然我们将每一个线程都绑定了一个 EventLoop 模块,一个EventLoop模块除了对Coonection的就绪的IO事件做管理,对连接就绪事件的管理,由于我们只在对应的EventLoop线程的Poller中进行监控,那么注定只会在对应的EventLoop线程中进行,不会有线程安全的问题,我们还需要对连接本身做管理未来我们对连接本身做管理的时候,虽然会调用绑定的 EventLoop 模块的函数,但是调用连接管理函数的时候不一定是在绑定的 EventLoop 模块绑定的线程中执行的,那么这些操作在执行的时候,需要进行一次判断,也就是判断当前调度的线程是不是对应的连接绑定大的EventLoop所绑定的线程,其实判断也很简单,用线程ID判断就行了。

所以未来我们的EventLoop中需要提供一个接口,提供给其他的模块进行连接的管理,而这些管理操作需要封装成一个任务,如果调用的时候不是在绑定的线程中,就需要将任务压入到对应EventLoop的任务队列中,等待绑定的EventLoop线程进行处理。

这样一来,对一个具体的连接的IO事件处理以及他的管理操作都在一个线程内部执行了,那么就必然是串行的,不会出现线程安全问题。但是连接无法和线程绑定,所以我们的策略就是让连接和一个EventLoop进行绑定,而EventLoop再想办法和线程进行绑定,来达成我们的目的。

那么一个 EventLoop 模块的操作的流程就是: 首先对所管理的描述符的事件进行监控,有事件就绪之后对就绪的事件进行处理,然后再处理任务队列中对连接的管理操作。 这是一个循环的过程。

但是任务队列本身就不是线程安全的了,因为未来可能会有多个线程对这个任务队列进行添加任务的操作,所以我们需要为EventLoop中的任务队列配一把互斥锁,用于实现对任务队列的互斥操作。

简而言之,就是在EventLoop内部对线程进行操作是线程安全的,但是在其他模块中通过调用EventLoop内的接口进行操作,就不一定是线程安全的了,所以需要进行判断。

那么EventLoop模块要如何设计呢?

首先我们需要记录一个线程id ,记录EventLoop所绑定的线程的唯一标识。

其实我们需要一个任务队列和一个互斥锁,保证任务队列的安全。

需要一个Poller对象,用来完成事件的监控。

同时需要一个 eventfd ,用于事件的通知。?

为什么需要这个 eventfd 呢? 我们说过,EventLoop 的执行流程是 监控文件描述符的事件,而这个监控我们肯定是需要阻塞的监控的,那么就会出现一种情况,我们的连接一直没有IO事件就绪,但是我们的任务队列中却已经放了很多任务了,这些对连接的管理操作就会一直得不到执行,那么未来就会出问题。 所以不仅是IO事件到来的时候需要结束这个阻塞状态,我们的任务队列中有新任务的时候也需要结束这个阻塞状态, 那么就需要一个事件的通知机制,也就是EventLoop所管理的Poller中也需要监控一个 eventfd 的读事件,每次我们向任务队列中放入任务的时候,我们需要向eventfd中写入一个数据,那么Poller 得Poll 操作就会返回,我们的 EventLoop 就能及时去执行这些任务队列的任务。

同时,我们也需要为eventfd进行事件的管理,为其创建一个 Channel 对象

那么现在的问题就是,任务队列如何设计?

首先,任务队列在压入任务的时候要加锁,这是毋庸置疑的。而我们在取出任务的时候,其实也是需要加锁的,因为我们无法保证 EventLoop线程在取任务的时候不会有其他线程进行压入任务的操作,而导致的线程安全问题。那么取任务或者执行任务的时候,如何进行加锁呢? 难道把执行任务队列中的任务的代码放在临界区内吗? 这样做会导致我们执行任务的过程中,别的线程也无法进行压入任务的操作,会阻塞在加锁上,效率很低,尤其是当里面有一些耗时的任务的时候。而其实我们只是在取出任务的时候需要加锁,将任务从任务队列中取出之后,这些任务对象其实就已经是在线程的私有栈中了,这时候是不会由线程安全的问题了,并不需要执行的时候也加锁。 那么我们就有一种思路:我们在取出任务的时候,进行加锁,加锁之后,直接将任务队列中的所有任务全部置换到我们的线程的私有栈中,然后清空任务队列,再解锁,之后再进行任务的执行。

所以我们的任务队列可以设计成一个 vector ,而每一次执行任务队列的时候,加锁直接使用vector的swap操作,将任务转移到线程私有栈中。

那么EventLoop的成员就是这几个:

class EventLoop
{
private:
using Task = std::function;
    std::thread::id _thread_id;         //绑定的线程的id
    std::vector _queue;           //任务队列
    std::mutex _mutex;                  //保证任务队列的安全
    Poller _poller;                     //用于事件监控
    int _eventfd;                       //用于事件通知

后续EventLoop需要的接口:

RunInLoop: 进行对连接的操作,需要将操作封装成一个 Task ,然后传给 RunInLoop 来执行,RunInLoop 中会判断当前线程是否是EventLoop绑定的线程来决定这个操作是立即执行还是压入任务队列。

PushTask: 将任务压入任务队列

IsInLoop:用来判断当前线程是否是当前EventLoop绑定的线程

UpdateEvent:更新/添加/修改事件监控

RemoveEvent:移除事件监控

Start:启动EventLoop模块,也就是开启EventLoop的死循环流程

RunAllTask : 用于执行任务队列的所有任务

目前的接口就这些,后续有需要的话还会再添加接口。


class EventLoop
{
private:
using Task = std::function;
    std::thread::id _thread_id;         //绑定的线程的id
    std::vector _queue;           //任务队列
    std::mutex _mutex;                  //保证任务队列的安全
    Poller _poller;                     //用于事件监控
    int _eventfd;                       //用于事件通知
    Channel* _eventfd_channel;          //管理eventfd的事件
private:
    void RunAllTask();
    bool IsInLoop()const;
    bool PushTask();
public:                             //对外提供的功能接口
    void UpdateEvent();
    void RemoveEvent();
    void RunInLoop();
    void Start();
    void WakeUp();                    //进行事件通知,也就是向eventfd中写入数据
};

2 实现

首先实现构造函数

构造函数的时候,最重要的就是绑定一个线程以及创建一个 eventfd,并初始化_event_channel,同时启动eventfd的读事件监听,设置回调方法,将其放入poller的事件监听中

    static int CreateEventfd()          //用于创建一个 eventfd
    {
        int fd = eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);
        if(fd==-1)
        {
            ERROR_LOG("create eventfd failed");
            abort();
        }
        return fd;
    }
    void EventReadCallBack()const
    {
        uint64_t cnt = 0;
        int ret = read(_eventfd,&cnt,sizeof cnt);
        if(ret<0)
        {
            if(errno ==EAGAIN ||errno==EWOULDBLOCK ||errno ==EINTR) return;
            ERROR_LOG("read eventfd failed");
        }
        return;
    }
public:                             //对外提供的功能接口
    EventLoop():_thread_id(std::this_thread::get_id()),_eventfd(CreateEventfd()),_eventfd_channel(new Channel(_eventfd))
    {
        _eventfd_channel->SetReadCallBack(std::bind(&EventLoop::EventReadCallBack,this));   //设置读回调函数
        _eventfd_channel->EnableRead();                     //启动读事件监听
    }

RunAllTask需要先加锁,取出所有任务,然后然后在临界区外执行任务。

    void RunAllTask()
    {
        std::vector tasks;
        {
            std::unique_lock lock(_mutex); //加锁
            tasks.swap(_queue);                        //取任务
        }
        for(auto&f:tasks)
        f();                                           //执行任务
    }

IsInLoop接口只需要判断当前线程id是否和EventLoop绑定的线程id相等。

    bool IsInLoop()const {return std::this_thread::get_id() == _thread_id ;} 

PushTask需要加锁然后push_back,之后就调用WakeUp唤醒EventLoop线程就行了。

    void PushTask(const Task& f)
    {
        {
            std::unique_lock lock(_mutex);
            _queue.push_back(f);
        }
        WakeUp();
    }

接下来就是对内部的连接所监控的事件的操作,对事件的操作其实并不会涉及到线程安全的问题,所以我们可以直接执行。

    void UpdateEvent(Channel* channel)
    {
        _poller.UpdateEvents(channel);
    }
    void RemoveEvent(Channel* channel)
    {
        _poller.Remove(channel);
    }

然后就是RunInLoop函数,无非就是判断加执行或者压入任务队列

    void RunInLoop(const Task& f)
    {
        if(IsInLoop()) f();     //如果是绑定的线程就直接执行
        else PushTask(f);
    }

然后就是Start函数,也就是EventLoop的主流程

    void Start()
    {
        while(1)
        {
            //1 监听事件
            std::vector actives;
            int ret = _poller.Poll(&actives);    
            //2 执行IO回调
            for(auto channel:actives) channel->HandlerEvents();
            //3 执行任务队列的任务
            RunAllTask();
        }
    }

最后就是唤醒EventLoop线程的函数WakeUp,很简单,向eventfd中写入一个1就行了。

    void WakeUp()                  //唤醒EventLoop线程
    {
        uint64_t val = 1;
        int ret = write(_eventfd,&val,sizeof val);
        if(ret < 0)
        {
            if(errno == EAGAIN ||errno == EWOULDBLOCK ||errno==EINTR) return;
            ERROR_LOG("WakeUp failed");
            abort();
        }
        return;
    }

最后就是析构函数,释放eventfd 的channel就行了。 不过由于我们这是一个服务器,这个析构函数写不写都无所谓,因为当EventLoop对象释放的时候,说明服务器进程也结束了,这时候会自动释放资源。

    ~EventLoop(){delete _eventfd_channel;}

3 联合调试

我们需要将 Channel 模块以及 Poller 模块和EventLoop 做一个整合。

首先我们要修改一些Channel模块中的函数,将EventLoop的一些操作加进去。

首先需要在 Channel 中加上一个_loop 成员,绑定一个EventLoop 。由于EventLoop的定义在Channel后面,所以我们需要在Channel前对EventLoop进行声明。

其次Channel的构造函数需要传入一个 EventLoop的指针。

然后就是我们Channel 中的Update和Remove函数中,需要调用 _loop 提供的接口。

这两个函数由于调用了EventLoop的函数,所以我们需要将其定义写到EventLoop后面

void Channel::UpdateEvents()  //op 就是未来传递给 epoll_ctl 的op参数
{
    //后续调用EventLoop提供的接口
    _loop->UpdateEvent(this);
}
//移除监控
void Channel::Remove()
{
    DisableAll();
    //调用_loop提供的Remove接口
    _loop->RemoveEvent(this);
}

那么我们在日志打印中再加一个内容,打印线程id,这个信息后续会用到。

    fprintf(stdout,"%p %s [%s:%d] " format"\n",(void*)pthread_self(),str,__FILE__,__LINE__,##__VA_ARGS__);\

然后就是调试了,调试要怎么进行呢??

我们需要写一个通信套接字的读回调函数,未来设置进通信套接字的Channel中,当然,这个工作最终不是我们做的,后续是Connection来完成。后续EventLoop管理的也是Connection。

void ReadHandler(int fd)
{
    char buf[1024] = {0};
    int ret = read(fd,buf,sizeof buf -1);
    if(ret<0)    
    { 
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("read failed");
    }
    buf[ret]=0;
    NORMAL_LOG("read a message : %s",buf);
    ret = write(fd,buf,ret);
    if(ret<0)
    {
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("write failed");
    }
    return ;
}

其次,我们目前需要手动为监听套接字创建一个Channel对象以及为其设置读回调方法

void Acceptor(int lstfd)
{
    int newfd = accept(lstfd,nullptr,nullptr);
    if(newfd == -1) 
    {
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("acceptor failed");
    }
    //获取到新连接之后,把新连接绑定到 EventLoop 中 ,并启动事件监听
    Channel* channel = new Channel(newfd,&loop);
    channel->SetReadCallBack(std::bind(ReadHandler,newfd));
    channel->EnableRead();
}

同时,写到这里,我发现了一个重大错误,就是前面我们实现Poller的时候,没有实现构造函数。

这里我补一下:

    static int CreateEpfd()
    {
#define EPOLL_SIZE 1024     //这个值大于 0 就行,无需关心
        int fd = epoll_create(EPOLL_SIZE);
    }

    Poller():_epfd(CreateEpfd()){}

而client 的逻辑还是用之前的测试代码,我们只需要修改 server 的逻辑。

int main(){
    Socket sock;
    sock.CreatServerSocket(8080);
    int lstfd = sock.Fd();
    Channel lstchannel(lstfd,&loop);
    lstchannel.SetReadCallBack(std::bind(Acceptor,lstfd));
    lstchannel.EnableRead();
    loop.Start();
    return 0;
}

那么我们的测试的结果就是:

EventLoop模块 --- 事件循环模块

EventLoop模块 --- 事件循环模块

4 整合定时器模块

EventLoop模块的总体框架其实已经差不多了,那么我们可以把定时器或者说超时管理模块加进去。

首先,超时管理模块也是与 EventLoop 模块一一对应的,他负责管理他所在 EventLoop 模块所管理的连接的超时释放机制。

而超时模块我们前面已经设计了一个简单的时间轮了,我们可以修改时间轮的代码来设计一个超时模块。

原始的时间轮代码:

class TimerTask
{
    using task = std::function;  //无参的回调,如果需要参数,有上层进行使用std::bind() 进行参数的绑定
    using releasetask = std::function;
private:
    uint64_t _id;
    uint64_t _delay;
    task _cb;
    releasetask _release;
    bool _is_canceled; //表示是否被取消
public:
    TimerTask(uint64_t id,uint64_t delay,task cb,releasetask rcb):_id(id),_delay(delay),_cb(cb),_release(rcb),_is_canceled(false){
        std::cout<<"构造,id:"<;
private:
    std::vector>> _wheel;
    std::unordered_map> _tasks;
    int _timer_idx;  
#define MAXTIME 60
public:
    TimerWheel():_wheel(MAXTIME),_timer_idx(0){}  //我们默认时间轮的最大刻度就是 60
    //添加定时任务
    bool AddTimerTask(uint64_t id , uint64_t delay,task cb)
    {
        assert(_tasks.find(id) == _tasks.end());                   //确保 id 合法
        std::shared_ptr pt(new TimerTask(id,delay,cb,std::bind(&TimerWheel::RealeaseTask ,this,std::placeholders::_1))); //构建任务对象
        std::weak_ptr wpt(pt);                          //weak_ptr
        int pos = (_timer_idx + delay) % MAXTIME;                  //计算到期时间
        _wheel[pos].push_back(pt);                                 //定时任务放入时间轮
        _tasks[id] = wpt;                                          //添加到map中管理
        return true;
    }
    //刷新/延迟定时任务
    bool RefreshTimerTask(uint64_t id)
    {
        std::unordered_map>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return false;                          //id 不合法直接返回false
        std::shared_ptr pt = it->second.lock();          //构造新的shared_ptr
        int pos = (_timer_idx + pt->GetDelay()) % MAXTIME;          //找到新的位置
        _wheel[pos].push_back(pt);
        return true;
    }
    //删除map的映射
    bool RealeaseTask(uint64_t id)
    {
        std::unordered_map>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return false;
        _tasks.erase(it);
        return true;
    }
    //取消定时任务
    bool CancelTimerTask(uint64_t id)
    {
        std::unordered_map>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return false;
        (it->second).lock()->CancelTask();
        return true;
    }
    //移动秒针
    void RunTick()
    {
        _timer_idx ++;
        _timer_idx %= MAXTIME;
        _wheel[_timer_idx].clear();
    }
};

定时任务类以及人物的添加修改的策略我们不需要修改,主要修改的其实就两方面,第一就是我们需要将时间轮和timerfd 结合起来,让时间轮中的 RunTick 函数每秒钟自动执行一次,其实就是在我们的时间轮中增加一个 timerfd 成员,设置他的超时时间为一秒,那么每隔一秒内核就会像timerfd中写入一个 1 。只需要为 timerfd 创建一个Channel对象,然后设置其读回调函数为RunTick函数,然后启动读事件回调,那么就完成了。

那么按照上面的思路,TimerWheel 模块中其实也还需要一个 _loop 对象的指针,因为我们需要通过_loop来构造Channel。

    EventLoop* _loop;
    Channel* _timerfd_channel;  

构造函数修改:构造函数中我们需要创建号 timerfd 以及为其设置读回调方法,回调方法中其实就是读取出 timerfd 的内容以及调用一次 RunTick

    static int CreateTimerfd()
    {
        int timerfd = timerfd_create(CLOCK_MONOTONIC,TFD_CLOEXEC|TFD_NONBLOCK);
        assert(timerfd!=-1);
        struct itimerspec timeout;
        //第一次超时时间间隔
        timeout.it_value.tv_sec = 2;     // 第一次超时为 3 s
        timeout.it_value.tv_nsec = 0;
        //第二次以及之后的超时时间间隔
        timeout.it_interval.tv_sec = 1;  // 往后每隔 1s 超时一次
        timeout.it_interval.tv_nsec = 0;
        int ret = timerfd_settime(timerfd,0,&timeout,NULL);  //设置定时通知
        assert(ret!=-1);  //返回值为-1表示设置失败,但是一般是不会失败的,可以不用关心
        return timerfd;
    } 

    TimerWheel(EventLoop* loop):_wheel(MAXTIME),_timer_idx(0),_loop(loop),_timerfd(CreateTimerfd()),_timerfd_channel(new Channel(_timerfd,_loop))
    {
        _timerfd_channel->SetReadCallBack(std::bind(&TimerWheel::OnTime,this));
        _timerfd_channel->EnableRead();
    }
    ~TimerWheel(){delete _timerfd_channel;}
    void OnTime()
    {
        TimerRead();
        RunTick();
    }
    void TimerRead()
    {
        uint64_t val = 0;
        int ret = read(_timerfd,&val,sizeof val);
        if(ret<0)
        {
            if(errno == EAGAIN ||errno == EWOULDBLOCK ||errno == EINTR) return;
            ERROR_LOG("timerfd read failed");
            abort();
        }
        return;
    }

那么其实超时模块的设计就已经完成了。

然后我们接着修改 EventLoop 模块。首先EventLoop模块内需要一个TimerWheel对象,用于设置超时事件。,初始化的时候传入 EventLoop对象的指针就行了。

private:
using Task = std::function;
    std::thread::id _thread_id;         //绑定的线程的id
    std::vector _queue;           //任务队列
    std::mutex _mutex;                  //保证任务队列的安全
    Poller _poller;                     //用于事件监控
    int _eventfd;                       //用于事件通知
    Channel* _eventfd_channel;          //管理eventfd的事件
    TimerWheel _timer_wheel;            //超时管理模块
    EventLoop():_thread_id(std::this_thread::get_id()),_eventfd(CreateEventfd()),_eventfd_channel(new Channel(_eventfd,this)),_timer_wheel(this)
    {
        _eventfd_channel->SetReadCallBack(std::bind(&EventLoop::EventReadCallBack,this));   //设置读回调函数
        _eventfd_channel->EnableRead();                     //启动读事件监听
    }

然后就是提供三个接口,用于添加,刷新和取消定时任务。

    void AddTimerTask(uint64_t id , uint64_t delay,Task f)
    {
        _timer_wheel.AddTimerTask(id,delay,f);
    }
    void RefreshTimerTask(uint64_t id)
    {
        _timer_wheel.RefreshTimerTask(id);
    }
    void CancelTimerTask(uint64_t id)
    {
        _timer_wheel.CancelTimerTask(id); 
    }

但是仅仅是这样还不行,因为我们之前说了,对连接的管理操作其实并不是线程安全的,最终执行AddTimerTask 和RefreshTimerTask 和CancelTimerTask 这三个函数的并不一定是他所绑定的EventLoop模块,所以我们需要调用他关联的EventLoop模块的RunInLoop来完成超时任务的添加,刷新,取消等。那么我们的接口就不能这样设置了,而是需要再封装一层。

我们需要将实际进行添加,刷新,取消定时任务的操作封装成一个任务,未来交给 RunInLoop来执行。

    void AddTimerTaskInLoop(uint64_t id , uint64_t delay,task cb)
    {
        assert(_tasks.find(id) == _tasks.end());                   //确保 id 合法
        std::shared_ptr pt(new TimerTask(id,delay,cb,std::bind(&TimerWheel::RealeaseTask ,this,std::placeholders::_1))); //构建任务对象
        std::weak_ptr wpt(pt);                          //weak_ptr
        int pos = (_timer_idx + delay) % MAXTIME;                  //计算到期时间
        _wheel[pos].push_back(pt);                                 //定时任务放入时间轮
        _tasks[id] = wpt;                                          //添加到map中管理
    }
    //刷新/延迟定时任务
    void RefreshTimerTaskInLoop(uint64_t id)
    {
        std::unordered_map>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return ;                          //id 不合法直接返回false
        std::shared_ptr pt = it->second.lock();          //构造新的shared_ptr
        int pos = (_timer_idx + pt->GetDelay()) % MAXTIME;          //找到新的位置
        _wheel[pos].push_back(pt);
    }
    //取消定时任务
    void CancelTimerTaskInLoop(uint64_t id)
    {
        std::unordered_map>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return ;
        (it->second).lock()->CancelTask();
    }

//添加定时任务
bool TimerWheel::AddTimerTask(uint64_t id , uint64_t delay,task cb)
{
    _loop->RunInLoop(std::bind(&TimerWheel::AddTimerTaskInLoop,this,id,delay,cb));
}
//刷新/延迟定时任务
bool TimerWheel::RefreshTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::RefreshTimerTaskInLoop,this,id));
}
bool TimerWheel::CancelTimerTask(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimerWheel::CancelTimerTaskInLoop,this,id));
}

这样一来我们的超时模块就设计完了。

?

5 联合超时模块调试?

现在我们需要测试一下超时模块。?

现在的问题是,超时任务怎么设置?其实在我们这里,超时任务就是一个关闭连接以及释放channel资源的任务,也就是一个关闭连接任务。

那么我们可以这样设置:

void CloseHandler(Channel*channel)  //简单点就移除事件监控并且关闭连接
{
    channel->Remove();
    close(channel->Fd());
    delete channel;
    DEBUG_LOG("释放了一个Channel:%p",channel);
}

同时,读任务还是用之前的。

Acceptor函数也做一些小的修改,把回调函数设置的完整一点。

int taskid = 0;
void Acceptor(int lstfd)
{
    int newfd = accept(lstfd,nullptr,nullptr);
    if(newfd == -1) 
    {
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("acceptor failed");
    }
    //获取到新连接之后,把新连接绑定到 EventLoop 中 ,并启动事件监听
    Channel* channel = new Channel(newfd,&loop);
    DEBUG_LOG("收到一个连接:%p",channel);
    channel->SetReadCallBack(std::bind(ReadHandler,newfd));
    channel->SetEventCallBack(std::bind(EventHandler,taskid));
    channel->SetCloseCallBack(std::bind(CloseHandler,channel));
    channel->SetErrorCallBack(std::bind(CloseHandler,channel));
    channel->EnableRead();
    //添加超时任务
    int delay = 10;
    loop.AddTimerTask(taskid++,delay,std::bind(CloseHandler,channel));
}

那么任意事件回调我们就设置一个刷新定时任务的回调

//任意事件回调
void EventHandler(int id)  //但是怎么找到任务的id呢?
{
    loop.RefreshTimerTask(id);    
}

这样就差不多了,我们主要测试一下超时模块能否正常工作。

我们的client则是前五秒发送五个消息,然后就死循环休眠,也就是模拟非活跃的情况。

EventLoop模块 --- 事件循环模块

EventLoop模块 --- 事件循环模块

目前我们也没看出来什么问题。

目前来说,我们的EventLoop模块和他所关联的的三个子模块 :Channel,Poller,TimerQueue模块还没有出现大的问题,后续出现问题我们再修正。

再附上一张项目概述时的EventLoop模块的关联图。

EventLoop模块 --- 事件循环模块

上一篇:Vue的性能优化常用的有哪些?
下一篇:Go 交叉编译