poll与select相比,poll的编写较为简单。因为poll它有一个结构体,去管理关心事件,不需要像select还得自己去写第三方数组。其次poll它没有数量限制,select是数量限制的,但是数量大了必然会影响效率,poll和select在底层都是采用轮询检测的方式去查看要关心的fd的事件是否就绪,像epoll就不会采用这种轮询方式,这个后面讲。
总结:poll作为多路转接的一种方式,它是比select进步了的。
函数参数:
- fds是一个结构体数组,每一个结构体都有它的fd,关心的事件,内核反馈事件。看一下这个结构体:
struct pollfd {
int fd; /* file descriptor /
short events; / requested events /
short revents; / returned events */
};
- events,就是用户告诉内核,它关心fd的事件。
- revents,就是内核告诉用户,你关心的事件怎么样。
- 其实这个events,revents还是位图,事件是宏定义好了的,可以看一下事件:
比如:我要设置结构体关心读事件,可以使得events = POLLIN
,如果还想要关心写事件,那么就是events |= POLLOUT
;怎么判断事件就绪呢?很简单,revents & POLLIN
,为真就表示读事件就绪了。还是一些位操作。
- nfds 代表fds数组的长度.
typedef unsigned long int nfds_t;
注意它的类型 nfds_t 其实就是一个 无符号long int。所谓fds数组长度,可以理解成fds数组中元素个数,可不敢理解成数组大小啊。- timeout是表示poll函数的超时时间, 单位是毫秒(ms),这是一个时间线,在timeout之前是阻塞等待,超过timeout就是非阻塞等待(直接返回)。
比如:你设置为 -1 ,那么就是永远的非阻塞式等待;设置为0,那就是永远的非阻塞式等待;或者你给个合理的时间线,控制一下,都可以。
函数的返回值:
- 返回值小于0, 表示出错;
- 返回值等于0, 表示poll函数等待超时;
- 返回值大于0, 表示poll由于监听的文件描述符就绪而返回.
了解函数接口后,就简单的写一个测试程序:
#include
#include
#include
int main()
{struct pollfd rfds;rfds.fd = 0;rfds.events = POLLIN;rfds.revents = 0;while (true){int n = poll(&rfds, 1, -1);switch (n){case -1:std::cerr << "poll errno" << std::endl;break;case 0:std::cout << "time out" << std::endl;default:std::cout << "有事件就绪" << std::endl;if (rfds.revents & POLLIN){std::cout << "读事件就绪" << std::endl;char buffer[1024] = {0};ssize_t s = read(0, buffer, sizeof(buffer) - 1);if (s > 0){std::cout << "Say:" << buffer << std::endl;}}break;}}return 0;
}
可以看到poll的第三个参数我设置的是 -1 ,阻塞式等待。你当然可以改为0,看看现象,肯定是一刻不停的告诉你,time out
。
看一下运行结果:
我就怕小白事后问我,为啥你代码运行时,卡着不动。我告诉你哈,上面的代码检测的文件描述符是0,fd = 0是标准输入流,说人话 就是你 敲键盘 输入的东西,你不敲键盘输入,可不 它就 卡着不动,而且还是阻塞式等待 你输入。
- poll的代码简单,select要操作位图,位图还得用专门的函数操作,并且还得自己写第三方数组。poll是用结构体数组,这个结构体就是pollfd,而且结构体里可以有fd信息,关心事件信息,内核反馈信息。非常不错。
- poll没有数量限制。
-缺点:
- poll和select一样,底层采用轮询方式检测就绪fd,所以数量一旦大了,就会效率降低
- poll 也要经常的从用户态到内核态,切换
epoll是对poll的又一进步,epoll它最关键的就是在底层不是用简单的轮询方式检测fd,并且对要监测的fd在底层通过红黑树进行高效的管理。这块在工作原理中详谈。
先看select、poll的缺点:
总结就是:epoll它克服了这些缺点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法,epoll就是你告诉我要管理哪些事件,然后你就等我通知你 就可以了。
这个函数就是创建一个epoll模型,它的返回值是一个fd,也就是底层epoll的fd,后续的操作都是在这个epoll模型中,所以它的fd尤为重要。
创建成功返回fd,创建失败返回-1,并设置errno。
至于它的参数 size ,表示的就是epoll模型可以监测的fd的最大值。自从linux2.6.8之后,size参数是被忽略的。
注意
: epoll模型创建成功后,它的返回值是一个fd,所以用完后,要记得close(fd)。
这个函数就是用于管理epoll模型的:
函数参数:
- epfd,epoll模型的fd
- op,就是怎么操作,它可以有三个取值,也就是三个宏:
EPOLL_CTL_ADD :注册新的fd到epfd中;
EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
EPOLL_CTL_DEL :从epfd中删除一个fd;- fd,需要监听的fd
- event,这个是事件,告诉epoll模型要监听什么事。
先来看看这个结构体:
struct epoll_event
{
uint32_t events; /* Epoll events /
epoll_data_t data; / User data variable */
} __EPOLL_PACKED;
可以看到,events就是事件,它也是宏:EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里.
可以看到epoll_event里面还有一个date,它也是一个结构体,最关键的就是它里面要保存fd。
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
这个函数就是用来取出内核反馈事件的:
函数参数:
- epfd:epoll模型的fd
- events:这个结构体组合就是用来存放,内核对监测fd的反馈情况的。
- maxevents:这个参数要小于等于epoll_creat(size)的size,给成size就行。
- timeout:和以往的IO模型一样,这是设置超时时间
返回值:返回值比较有意思,
- 成功:返回对应I/O上已准备好的文件描述符数目,这个写代码的时候能感受到用处
- 失败: 返回 -1,表明函数调用失败
- 超时:返回值等于0
了解epoll的工作原理,其实就是了解上面的三个函数接口,在底层都干了什么:
/* * This structure is stored inside the "private_data" member of the file * structure and rapresent the main data sructure for the eventpoll * interface. */
struct eventpoll { /* Protect the this structure access,可用于中断上下文 */ spinlock_t lock; /* * This mutex is used to ensure that files are not removed * while epoll is using them. This is held during the event * collection loop, the file cleanup path, the epoll file exit * code and the ctl operations.用户进程上下文中 */ struct mutex mtx; /* Wait queue used by sys_epoll_wait() */ wait_queue_head_t wq; /* Wait queue used by file->poll() */ wait_queue_head_t poll_wait; /* List of ready file descriptors */ struct list_head rdllist; /* RB tree root used to store monitored fd structs */ struct rb_root rbr; /* * This is a single linked list that chains all the "struct epitem" that * happened while transfering ready events to userspace w/out * holding ->lock. */ struct epitem *ovflist; /* The user that created the eventpoll descriptor */ struct user_struct *user;
};
这个结构体维护了epoll模型的基本属性,其中最关键的两个就是:
struct rb_root rbr
和struct list_head rdllist
,这俩结构体大家很熟悉,第一个就是红黑树,第二个就是双向链表。
红黑树是用于管理添加进来的事件的,也就是说你创建一个epoll模型,它是用红黑树来保存你向epoll模型中添加的事件。这样好处就是重复添加的事件就可以通过红黑树而高效的识别出来,而且红黑树查找的效率比轮询监测这种好多了。并且用户态调用 epoll_ctl()来操作 epoll 的监视文件时,需要增、删、改、查等动作有着比较高的效率。
双向链表是用来保存已经准备就绪事件,以后查询就绪事件,就直接把双向链表的东西拷贝到上层,就是把就绪事件拿到了,并且就绪了n个事件,这个n就是链表的长度,也很方便。
但是有个疑问:谁向双向链表中填入就绪事件呢?是内核直接填嘛?不是,有回调机制。
回调机制:所有添加到epoll模型中的事件都会与设备(网卡)驱动程序建立回调方法,也就是说,当响应的事件发生时会调用这个回调方法.这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
回调方法:并不是所有的文件都可以用epoll的,它必须有相应的回调方法:file_operations->poll
,可以看一下:
struct file_operations {ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);__poll_t (*poll) (struct file *, struct poll_table_struct *);int (*open) (struct inode *, struct file *);int (*fsync) (struct file *, loff_t, loff_t, int datasync);// 等
};
看到了把,这个回调方法是和底层硬件定制好了的,它有很多方法。也就是说,事件就绪好了,会通过回调机制(调用ep_poll_callback),去找对应的回调方法,然后再把就绪事件拷贝到双链表中,并且唤醒epoll模型,从而上层调用epoll_wait()可以拷贝双链表的事件。
大的逻辑已经有了,来看图解:
这是epoll模型的大体样子,帮助理解用的。
struct epitem{ struct rb_node rbn;//红黑树节点 struct list_head rdllink;//双向链表节点 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所属的eventpoll对象 struct epoll_event event; //期待发生的事件类型
}
注意这个epitem是事件在epoll模型中的管理的基本单位,把这个epitem要挂在红黑树里。红黑树的节点是一个键值对,键(key)就是fd,值(value)就是epitem。挂在红黑树上就完成了添加事件。
(2) 删除事件同理,就是红黑树上删除节点呗。
(3)修改事件,就是红黑树的节点的建不变(fd不变),修改它的epitem。
那么等待文件描述符的事情,其实就是等待epoll模型中双向链表中是否有数据,有阻塞等,非阻塞等,设置timeout(三种方式)。但是我们得把基本等待就绪的过程,了解一下,这个就看图解吧:
假如有事件就绪了:
那么epoll_wait()被唤醒了,跑到内核去拷贝head_list中的事件,注意这个函数的返回值是已经准备就绪的事件的个数,就是链表的元素个数,这个效率是O(1)。
head_list中的元素就是事件,事件就是epitem结构体,在强调一遍。
工作模式有两种:
数据
变化
举个例子:
(1)你的快递一共有三个,假如是李四是快递员:
李四:你好,来取下快递。然后你去取快递,你只能拿走两件,并且你拿走了。李四发现你的快递还有遗留,立马通知你:你好,来取下快递。然后你又去取走了快递,这次你取走了最后一个快递。李四发现你的快递没存留,那么也不会通知你了。
(2)你的快递还是有三个,假如王五是快递员:
王五:你好,取一下快递。然后你去取快递,你只能拿走两件,并且你拿走了。但是你还剩下一件,王五不会管,他告诉过你 要来取快递了,取没取完 那是你的事。直到有一天,你又来了一件快递,王五发现你的快递来了,王五:你好,来取快递。你又来取快递,这次你长记性了,快一次就取完吧,不一次取完你都不通知我一下。王五说:只有你的快递从无到有,从少变多时,我才会通知你,换句话说,这有快递数发生变化,我才通知你。
其实李四对应的就是LT工作模式,王五对应的就是ET工作模式。非常有趣昂。
但是要使用ET模式,有一个非常重要的点:fd必须设置为非阻塞模式。
为什么ET模式下,fd必须是非阻塞呢?
当epoll工作在ET模式下时,对于读操作,如果read一次没有读尽buffer中的数据,那么下次将得不到读就绪的通知,造成buffer中已有的数据无机会读出,除非有新的数据再次到达。对吧,这是好理解的。如果到socket编程中,这个问题会造成类似死锁
的情况。
比如:
那么现在就有问题了,epoll_wait()在ET模式下,只有当缓冲区的数据发生变化,才会返回。但是客户端并没有再次发送数据,导致epoll_wait()不返回,一直等待。服务端,它只读取了1k,不完整报文,所有就等epoll_wait()返回,告诉它继续向下拿数据,但问题现在是epoll_wait()不返回。真的无语,现在的情况是,只有当客户端再次发送数据,epoll_wait()才能返回;但是客户端要等服务端的响应,才能继续发送数据。就是这种你等我,我等它,它等你的死循环。
怎么解决这种问题呢?毫无疑问,你必须循环的yici性把10k数据拿走,哪怕你一次拿1k,你循环的拿上10次,必须保证都拿完。这种保证的前提就是 fd是非阻塞的,它可以轮询的拿,而不是等待的拿。
对比LT和ET:
LT是 epoll 的默认行为。 使用 ET 能够减少 epoll 触发的次数, 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完,相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些.
但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的.
另一方面, ET 的代码复杂程度更高了.
到最后写一个epoll的简易代码,帮助理解。当然它是LT工作模式,如果想要挑战ET模式,那就下点狠功夫,之后我会写一个Reactor的小项目,其中就是利用epoll的ET模式,感兴趣的可以等后续。现在主要是先能把epoll的代码用起来,至于挑战,后面再说。
#include
#include
#include
#include
#include using namespace std;void useage()
{cout << "Please use"<< "./epoll_server"<< "+"<< "端口号" << endl;
}int main(int argv, char *argc[])
{启动服务器if (argv != 2){useage();exit(1);}使服务器进入listen状态int listen_fd = socket(AF_INET, SOCK_STREAM, 0);if (listen_fd < 0){cerr << "listen failed" << endl;exit(2);}struct sockaddr_in my_sock;my_sock.sin_family = AF_INET;my_sock.sin_addr.s_addr = INADDR_ANY;my_sock.sin_port = htons(atoi(argc[1]));if (bind(listen_fd, (struct sockaddr *)&my_sock, sizeof(my_sock)) < 0){cerr << "bind errno" << endl;exit(3);}if (listen(listen_fd, 5) < 0){cerr << "listen errno" << endl;exit(4);}创建epoll模型,获取epfdint epfd = epoll_create(128); 添加事件,这个就是先得添加listen_fd事件struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = listen_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev); 事件循环struct epoll_event revs[128]; // 这是用于接收返回fd情况的事件集合while (true){int n = epoll_wait(epfd, revs, 128, -1);switch (n){case -1:cerr << "epoll errno" << endl;break;case 0:cout << "time out" << endl;break;default:// 走到这里说明有事件就绪了cout << "有事件就绪" << endl;for (int i = 0; i < n; i++){int sock = revs[i].data.fd;if (revs[i].events & EPOLLIN){cout << "文件描述符" << sock << "有读事件就绪" << endl;if (sock == listen_fd){cout << "*************************************" << endl;cout << "有连接事件就绪" << endl; 在这里要处理连接事件struct sockaddr_in sockaddr;socklen_t j = sizeof(sockaddr);int fd = accept(listen_fd, (struct sockaddr *)&sockaddr, &j);if (fd < 0){cerr << "accept errno" << endl;}cout << "连接成功" << endl; 连接完成后,要把fd以及它关心的事件,放到epoll模型中struct epoll_event _ev;_ev.data.fd = fd;_ev.events = EPOLLIN;if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &_ev) == 0){cout << "成功将" << fd << "托管给epoll" << endl;}else{cerr << "epoll_ctl failed" << endl;}cout << "*************************************" << endl;}else{ 走到这里说明是其他的普通fd的读事件就绪了char buffer[1024] = {0};ssize_t s = read(sock, buffer, sizeof(buffer) - 1);if (s > 0){cout << sock << ":"<< "正常数据读取" << endl;buffer[s] = 0;cout << "client:" << buffer << endl;cout << "*************************************" << endl;}else if (s == 0){ 说明对端连接关闭cout << sock << ":"<< "关闭连接" << endl;close(sock);// 记得管理epoll模型epoll_ctl(epfd, EPOLL_CTL_DEL, sock, NULL);std::cout << "sock: " << sock << "delete from epoll success" << std::endl;cout << "*************************************" << endl;}else{// 读取失败std::cout << "recv error" << std::endl;close(sock);epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);std::cout << "sock: " << sock << "delete from epoll success" << std::endl;cout << "*************************************" << endl;}}}}break;}}close(epfd);close(listen_fd);return 0;
}
来看现象:
(1)服务器启动,并且客户端1连接:
(2)客户端2也去连接:
客户端使用telnet连接,
服务端现象:
(3)以上说明,连接的epoll管理没有问题,现在客户端1,发送消息:
(4)客户端2发送消息:
(5)以上说明接收数据epoll管理没有问题,那么客户端1退出:
以上测试都没问题,感兴趣的小伙伴,可以下去自己试一试。
下一篇:csv文件、mysql表