Skip to content

使用aiop实现事件等待模式

ruki edited this page Aug 7, 2014 · 1 revision

事件等待模式,也就所谓的reactor模式,简单的讲就是通过监听等待io的事件,然后将等待就绪可以处理的事件,分发给对应的io处理程序进行处理,这种模式本身并不算是异步io,但是TBOX的asio库,在类unix系统上,也是基于此模型实现的,所以在讲解真正的异步io之前,首先稍微介绍介绍,对asio的底层机制有个大体了解。

在类unix系统,例如linux的epoll,mac的kqueue,以及select、poll, dev/poll等等,都是可以用来实现reactor

虽然功能大体相同,但是效率上来讲, epoll和kqueue 更加的高效,因为他们没有像select, poll那样通过轮询来实现

单从接口设计上来说,kqueue的设计效率更高些,因为它可以一次批量处理多个事件,所以跟系统内核间的交互比较少。但是具体epoll和kqueue那个效率更高,就不好说了。

TBOX下的asio底层实现,大体分为两种:

  1. 基于reactor模型,通过对epoll,kqueue、poll、select等api的分装实现的aiop接口(也就是本章所要讲的) 在一个独立的线程内,进行事件监听,来分发处理各种异步io事件,也就实现了proactor的模式。
  2. 直接系统原生支持的proactor模型,例如windows的iocp,并在其基础上做一些封装来实现。

这样,对于上层应用来说,用的都是同一套asio的异步回调接口,不需要做什么的改变,但是底层根据不同平台其实现机制,已大不相同,windows上用了iocp,linux和android用了epoll,mac上用了kqueue,ios上用了poll。。

言归正传,本章所要将的aiop接口,就是对reactor的各种poll接口的上层封装,对于一些不需要将就高性能的应用直接使用aiop更加的简洁,易维护。 而基于回调的proactor模式,等下一章再详细讲解。

首先我先描述下几个对象类型:

aiop: 事件等待对象池
aioo: 等待对象,关联和维护:socket句柄、用户私有数据
aioe: 事件对象,一个aioo对象一次可以等待多个不同类型的aioe事件对象,例如send, recv, acpt, conn, ...
code: 事件代码

接着我们看下直接使用aiop的简单服务器代码,(这里为了看起来简洁点,资源管理和释放上我就先省略了):

tb_int_t main(tb_int_t argc, tb_char_t** argv)
{
    // 初始化一个tcp的套接字,用于监听
    tb_socket_ref_t listen_sock = tb_socket_init(TB_SOCKET_TYPE_TCP);
    tb_assert_and_check_return_val(sock, 0);
    
    // 初始化aiop轮询池,对象规模16个socket,可混用,如果传0,则使用默认值
    tb_aiop_ref_t aiop = tb_aiop_init(16);
    tb_assert_and_check_return_val(aiop, 0);
    
    // 绑定ip和端口,这里ip没做绑定,传null,监听端口为9090
    if (!tb_socket_bind(listen_sock, tb_null, 9090)) return 0;
    
    // 监听socket
    if (!tb_socket_listen(listen_sock, 5)) return 0;
    
    // 将这个监听套接字添加到aiop中,并附上accept等待事件
    if (!tb_aiop_addo(aiop, listen_sock, TB_AIOE_CODE_ACPT, tb_null)) return 0;
    
    // 初始化一个aioe的事件对象列表,用于获取等待返回的事件
    tb_aioe_t list[16];
    
    // 开启循环
    while (1)
    {
        /* 等待事件到来,用到跟epoll,select类似
         *
         * 16: 最大需要等待的事件数量
         * -1: 等待超时值,这里为永久等待
         *
         * objn为返回的有效事件数, 如果失败返回:-1, 超时返回:0
         */
        tb_long_t objn = tb_aiop_wait(aiop, list, 16, -1);
        tb_assert_and_check_break(objn >= 0);
        
        // 枚举等到的事件列表
        tb_size_t i = 0;
        for (i = 0; i < objn; i++)
        {
            // 获取事件对应aioo对象句柄,aioo对socket句柄,事件类型、关联的私有数据进行了同一维护
            tb_aioo_ref_t aioo = list[i].aioo;
            
            // 获取aioe事件关联的私有数据指针
            // 也可以通过tb_aioo_priv(aioo)获取
            tb_cpointer_t priv = list[i].priv;
            
            // 获取aioo对应的socket句柄
            tb_socket_ref_t sock = tb_aioo_sock(aioo);
            
            // 有accept事件?
            if (list[i].code & TB_AIOE_CODE_ACPT)
            {
                // 接收对方的连接,返回对应的客户端socket
                tb_socket_ref_t client_sock = tb_socket_accept(sock, tb_null, tb_null);
                tb_assert_and_check_break(client_sock);
                
                /* 将客户端的socket也添加到aiop池中,并等待它的recv事件
                 *
                 * 这里的最后一个参数,可以传一个私有的数据指针,并和sock进行关联
                 * 用于方便维护每个连接对应的会话数据,这里随便传了个字符串
                 *
                 * 返回的aioo对象可以保存下来,并在之后可以灵活修改需要等待的事件
                 *
                 * 注:这里的client_sock需要在自己的应用内,自己做释放,aiop不会去自动释放
                 * 因为这个实在外围代码自己创建的,这里的例子仅仅为了省事,就直接忽略了。
                 */
                tb_aioo_ref_t aioo = tb_aiop_addo(aiop, client_sock, TB_AIOE_CODE_RECV, "private data");
                tb_assert_and_check_break(aioo);
            }
            // 有recv事件?
            else if (list[i].code & TB_AIOE_CODE_RECV)
            {
                // 非阻塞接收一段数据
                tb_byte_t data[8192];
                tb_long_t real = tb_socket_recv(sock, data, sizeof(data));
                // 接收完指定数据后,省略部分代码
                // ...
                // 将socket改为等待发送
                if (!tb_aiop_sete(aiop, aioo, TB_AIOE_CODE_SEND, tb_null)) break;
                
                // 尝试发送数据,也许没发完
                tb_socket_send(sock, "hello", sizeof("hello"));
            }
            // 有send事件?
            else if (list[i].code & TB_AIOE_CODE_SEND)
            {
                // 继续发送上回没发完的数据
                // tb_socket_send(..);
                
                // 删除对应aioo事件对象,取消监听这个sock的事件
                tb_aiop_delo(aiop, aioo);
            }
            // 有连接事件?
            else if (list[i].code & TB_AIOE_CODE_CONN)
            {
                // 此处不会进入,仅仅在tb_socket_connect之后,并且注册了aioo,才会有此事件
                // ...
            }
            // 错误代码处理 
            else 
            {
                tb_trace_e("unknown code: %lu",list[i].code);
                break;
            }
        }
    }
    return 0;
}

这里的代码,仅仅描述了下aiop的接口调用流程,没有实际的服务器业务逻辑,仅作参考,请不要照搬复制的使用。

代码里提到的aioo对象,还有个单独的等待接口,用于直接等待单个socket对象,一般在basic_stream的wait里面使用:

/*! 等待单个socket句柄的事件
 *
 * @param socket    socket句柄 
 * @param code      aioe等待事件代码
 * @param timeout   等待超时时间,永久等待传:-1
 *
 * @return          > 0: 等到的事件, 0: 超时, -1: 失败
 */
tb_long_t           tb_aioo_wait(tb_socket_ref_t socket, tb_size_t code, tb_long_t timeout);

aiop的接口并不多,我这里不多做描述了,就简单的列举下吧:

/*! 初始化aiop事件等待池
 *
 * @param maxn      等待对象的规模数量
 *
 * @return          aiop池
 */
tb_aiop_ref_t       tb_aiop_init(tb_size_t maxn);

/*! 退出aiop
 *
 * @param aiop      aiop池
 */
tb_void_t           tb_aiop_exit(tb_aiop_ref_t aiop);

/*! 清除所有aiop中的aioo等待对象
 *
 * @param aiop      aiop池
 */
tb_void_t           tb_aiop_cler(tb_aiop_ref_t aiop);

/*! 强制退出aiop的等待,tb_aiop_wait会返回:-1,并且退出循环,无法再次等待
 *
 * @param aiop      aiop池
 */
tb_void_t           tb_aiop_kill(tb_aiop_ref_t aiop);

/*! 退出aiop的等待,tb_aiop_wait会返回:0,但不退出等待循环,还可继续等待
 *
 * @param aiop      aiop池
 */
tb_void_t           tb_aiop_spak(tb_aiop_ref_t aiop);

/*! 添加一个socket等待对象aioo,并且关联等待的事件代码code,以及私有数据priv
 *
 * @param aiop      aiop池
 * @param socket    socket句柄
 * @param code      等待的事件代码
 * @param priv      关联的用户私有数据
 *
 * @return          aioo对象
 */
tb_aioo_ref_t       tb_aiop_addo(tb_aiop_ref_t aiop, tb_socket_ref_t socket, tb_size_t code, tb_cpointer_t priv);

/*! 删除一个aioo等待对象,永远不再等待它
 *
 * @param aiop      aiop池
 * @param aioo      aioo对象句柄
 *
 */
tb_void_t           tb_aiop_delo(tb_aiop_ref_t aiop, tb_aioo_ref_t aioo);

/*! 投递一个aioe等待事件对象
 *
 * @param aiop      aiop池
 * @param aioe      aioe事件对象
 *
 * @return          投递成功返回:tb_true, 失败返回:tb_false
 */
tb_bool_t           tb_aiop_post(tb_aiop_ref_t aiop, tb_aioe_t const* aioe);

/*! 设置和修改aioo对象的等待事件
 *
 * @param aiop      aiop池
 * @param aioo      aioo对象
 * @param code      等待事件代码
 * @param priv      用户私有数据
 *
 * @return          成功返回:tb_true, 失败返回:tb_false
 */
tb_bool_t           tb_aiop_sete(tb_aiop_ref_t aiop, tb_aioo_ref_t aioo, tb_size_t code, tb_cpointer_t priv);

/*! 等待一定量的aioe事件
 *
 * @param aiop      aiop池
 * @param list      aioe事件列表,用于保存成功返回的事件对象
 * @param maxn      指定等待多少的事件
 * @param timeout   等待超时值,永久等待:-1
 *
 * @return          > 0: 实际等到的aioe事件对象数, 0: 超时, -1: 失败
 */
tb_long_t           tb_aiop_wait(tb_aiop_ref_t aiop, tb_aioe_t* list, tb_size_t maxn, tb_long_t timeout);

虽然aiop的reactor模式处理并发io已经相当方便了,但是由于系统底层对其的实现的程度不一,像windows上只能通过select来实现,因此对于一些高性能并发io的处理上,还是有些力不从心的,为了更好的利用不同系统提供的io处理特性,达到更好的并发性,又要保证上层调用接口的简单、统一,实现跨平台、高度可移植性。

这个时候用proactor模式进行更上层的接口封装,完全采用异步回调通知模式,是一种较好地解决方案。

Clone this wiki locally