0%

I/O复用


I/O复用使得程序能同时监听多个文件描述符,使用I/O复用的情况:

  • 客户端程序要同时处理多个socket。
  • 客户端程序要同时处理用户输入和网络连接。
  • TCP服务器要同时处理监听socket和连接socket。这是I/O复用使用最多的场合。
  • 服务器要同时处理TCP请求和UDP请求。
  • 服务器要同时监听多个端口,或者处理多种服务。

I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的。并且当多个文件描述符同时就绪时,如果不采取额外的措施,程序就只能按顺序依次处理其中的每一个文件描述符,这使得服务器程序看起来像是串行工作的。如果要实现并发,只能使用多进程或多线程等编程手段。

Linux下实现I/O复用的系统调用主要有select、poll和epoll

select系统调用

select系统调用:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。

select API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include<sys/select.h> 
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
/*
参数:
nfds:指定被监听的文件描述符总数。通常被设置为select监听的所有文件描述符中的最大值加1。
readfds:指向可读事件对应的文件描述符集合
writefds:指向可写事件对应的文件描述符集合
exceptfds:指向异常事件对应的文件描述符集合
timeout:设置select函数的超时时间,指针类型可以内核将修改它以告诉应用程序select等待了多久。
如果给timeout变量的tv_sec成员和tv_usec成员都传递0,则select将立即返回。
如果给timeout传递NULL,则select将一直阻塞,直到某个文件描述符就绪。
返回值:
成功时返回就绪(可读、可写和异常)文件描述符的总数;
如果在超时时间内没有任何文件描述符就绪,select将返回0;
select失败时返回-1并设置errno;
如果在select等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。
*/

// timeval结构体
struct timeval {
long tv_sec;/*秒数*/
long tv_usec;/*微秒数*/
};

文件描述符就绪条件

socket可读的情况:

  • socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读该socket,并且读操作返回的字节数大于0。
  • socket通信的对方关闭连接。此时对该socket的读操作将返回0。
  • 监听socket上有新的连接请求。
  • socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。

socket可写的情况:

  • socket内核发送缓存区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
  • socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
  • socket使用非阻塞connect连接成功或者失败(超时)之后。
  • socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。

socket异常情况:

  • socket上接收到带外数据。

处理带外数据

socket上接收到普通数据和带外数据都将使select返回,但socket处于不同的就绪状态:前者处于可读状态,后者处于异常状态。

同时接收普通数据和带外数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// 9-1use_select.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
printf( "ip is %s and port is %d\n", ip, port );

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 )
{
printf( "errno is: %d\n", errno );
close( listenfd );
}

char remote_addr[INET_ADDRSTRLEN];
printf( "connected with ip: %s and port: %d\n", inet_ntop( AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN ), ntohs( client_address.sin_port ) );

char buf[1024];
fd_set read_fds;
fd_set exception_fds;

FD_ZERO( &read_fds );
FD_ZERO( &exception_fds );

int nReuseAddr = 1;
setsockopt( connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof( nReuseAddr ) );
while( 1 )
{
// 每次调用select前都要重新在read_fds和exception_fds中设置文件描述符connfd,
// 因为事件发生之后,文件描述符集合将被内核修改
memset( buf, '\0', sizeof( buf ) );
FD_SET( connfd, &read_fds );
FD_SET( connfd, &exception_fds );

ret = select( connfd + 1, &read_fds, NULL, &exception_fds, NULL );
printf( "select one\n" );
if ( ret < 0 )
{
printf( "selection failure\n" );
break;
}

if ( FD_ISSET( connfd, &read_fds ) )
{ // 对于可读事件,采用普通的recv函数读取数据
ret = recv( connfd, buf, sizeof( buf )-1, 0 );
if( ret <= 0 )
{
break;
}
printf( "get %d bytes of normal data: %s\n", ret, buf );
}
else if( FD_ISSET( connfd, &exception_fds ) )
{ // 对于异常事件,采用带MSG_OOB标志的recv函数读取带外数据
ret = recv( connfd, buf, sizeof( buf )-1, MSG_OOB );
if( ret <= 0 )
{
break;
}
printf( "get %d bytes of oob data: %s\n", ret, buf );
}

}

close( connfd );
close( listenfd );

return 0;
}

poll系统调用

poll系统调用:在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include<poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
/*
参数:
fds:pollfd结构类型的数组,它指定所有感兴趣的文件描述符上发生的可读、可写和异常等事件。
nfds:指定被监听事件集合fds的大小,无符号长整数unsigned long int
timeout:指定poll的超时值,单位是毫秒;
timeout为-1,poll调用将永远阻塞,直到某个事件发生;
timeout为0,poll调用将立即返回。
返回值:
返回值的含义与select相同。
*/

// pollfd结构体的定义
struct pollfd {
int fd;/*文件描述符*/
short events;/*注册的事件,告诉poll监听fd上的哪些事件,一系列事件的按位或*/
short revents;/*实际发生的事件,由内核填充*/
};

poll支持的事件类型:

image-20220728115950782

epoll系列系统调用

内核事件表

epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll有很大差异。

  • epoll使用一组函数来完成任务,而不是单个函数。
  • epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。
  • 需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。

创建epoll一个额外的文件描述符:

1
2
3
4
5
6
7
8
#include<sys/epoll.h> 
int epoll_create(int size);
/*
参数:
size:现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。
返回值:
返回一个文件描述符,将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。
*/

操作epoll的内核事件表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include<sys/epoll.h> 
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
/*
参数:
epfd:epoll_create返回的文件描述符,指定要访问的内核事件表
fd:要操作的文件描述符
op:指定操作类型。
EPOLL_CTL_ADD,往事件表中注册fd上的事件。
EPOLL_CTL_MOD,修改fd上的注册事件。
EPOLL_CTL_DEL,删除fd上的注册事件。
event:指定事件,epoll_event结构指针类型
返回值:
成功时返回0,失败则返回-1并设置errno。
*/

// epoll_event的定义
struct epoll_event {
__uint32_t events;/*epoll事件*/
epoll_data_t data;/*用户数据*/
};

epoll_wait函数

epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
#include<sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
/*
参数:
epfd:epoll_create返回的文件描述符,指定要访问的内核事件表
events:指定事件,epoll_event结构指针类型
maxevents:指定最多监听多少个事件,它必须大于0。
timeout:指定超时值,单位是毫秒;
timeout为-1,调用将永远阻塞,直到某个事件发生;
timeout为0,调用将立即返回。
返回值:
成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno。
*/

epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的第二个参数events指向的数组中。这个数组只用于输出epoll_wait检测到的就绪事件,而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件描述符的效率。

poll和epoll在使用上的差别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*如何索引poll返回的就绪文件描述符*/ 
int ret = poll(fds, MAX_EVENT_NUMBER, -1);
/*必须遍历所有已注册文件描述符并找到其中的就绪者(当然,可以利用ret来稍做优化)*/
for(int i = 0; i < MAX_EVENT_NUMBER; ++i) {
if(fds[i].revents&POLLIN)/*判断第i个文件描述符是否就绪*/
{
int sockfd=fds[i].fd;
/*处理sockfd*/
}
}

/*如何索引epoll返回的就绪文件描述符*/
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
/*仅遍历就绪的ret个文件描述符*/
for(int i = 0; i < ret; i++) {
int sockfd = events[i].data.fd;
/*sockfd肯定就绪,直接处理*/
}

LT和ET模式

epoll对文件描述符的操作有两种模式:

  • LT(Level Trigger,电平触发)模式。epoll的默认的工作模式,epoll相当于一个效率较高的poll。
  • ET(Edge Trigger,边沿触发)模式。epoll的高效工作模式,当epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。

对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。

对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。

ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高。

LT和ET在工作方式上的差异:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// 9-3mtlt.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

// 将文件描述符设置成非阻塞的
int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}

// 将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中
// 参数 enable_et指定是否对fd启用ET模式
void addfd( int epollfd, int fd, bool enable_et )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if( enable_et )
{
event.events |= EPOLLET;
}
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}

// LT模式的工作流程
void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
char buf[ BUFFER_SIZE ];
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
addfd( epollfd, connfd, false ); // 调用addfd函数,对connfd禁用ET模式
}
else if ( events[i].events & EPOLLIN )
{ // 只要socket读缓存中还有未读出的数据,这段代码就被触发
printf( "event trigger once\n" );
memset( buf, '\0', BUFFER_SIZE );
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret <= 0 )
{
close( sockfd );
continue;
}
printf( "get %d bytes of content: %s\n", ret, buf );
}
else
{
printf( "something else happened \n" );
}
}
}

// ET模式的工作流程
void et( epoll_event* events, int number, int epollfd, int listenfd )
{
char buf[ BUFFER_SIZE ];
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
addfd( epollfd, connfd, true ); // 对connfd开启ET模式
}
else if ( events[i].events & EPOLLIN )
{ // 这段代码不会被重复触发,所以循环读取数据,以确保把socket读缓存中的所有数据读出
printf( "event trigger once\n" );
while( 1 )
{
memset( buf, '\0', BUFFER_SIZE );
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret < 0 )
{ // 对于非阻塞IO,下面的条件成立表示数据已经全部读取完毕。
// 此后,epoll就能再次触发sockfd上的EPOLLIN事件,以驱动下一次读操作
if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
{
printf( "read later\n" );
break;
}
close( sockfd );
break;
}
else if( ret == 0 )
{
close( sockfd );
}
else
{
printf( "get %d bytes of content: %s\n", ret, buf );
}
}
}
else
{
printf( "something else happened \n" );
}
}
}

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );
addfd( epollfd, listenfd, true );

while( 1 )
{
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ret < 0 )
{
printf( "epoll failure\n" );
break;
}

lt( events, ret, epollfd, listenfd ); // 使用LT模式
//et( events, ret, epollfd, listenfd ); // 使用ET模式
}

close( listenfd );
return 0;
}

服务器程序运行该代码,然后telnet到这个服务器程序上并一次传输超过10字节(BUFFER_SIZE的大小)的数据,会发现ET模式下事件被触发的次数要比LT模式下少很多。

注意:每个使用ET模式的文件描述符都应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作将会因为没有后续的事件而一直处于阻塞状态(饥渴状态)。

EPOLLONESHOT事件

并发程序中,一个线程(或进程),在读取完某个socket上的数据数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。

EPOLLONESHOT事件:使得一个socket连接在任一时刻都只被一个线程处理。

对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。

注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的 EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。

EPOLLONESHOT事件的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// 9-4oneshot.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

// 存放内核事件表fd和socket连接fd
struct fds
{
int epollfd;
int sockfd;
};

// 将socket连接fd设置为非阻塞
int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}

// 将fd上的EPOLLIN和EPOLLET事件注册到epollfd指示的epoll内核事件表中
// 参数oneshot指定是否注册fd上的EPOLLONESHOT事件
void addfd( int epollfd, int fd, bool oneshot )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
if( oneshot )
{
event.events |= EPOLLONESHOT;
}
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}

// 重置fd上的事件。这样操作之后,尽管fd上的EPOLLONESHOT事件被注册,
// 但是操作系统仍然会触发fd上的EPOLLIN事件,且只触发一次
void reset_oneshot( int epollfd, int fd )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}

// 工作线程
void* worker( void* arg )
{
int sockfd = ( (fds*)arg )->sockfd;
int epollfd = ( (fds*)arg )->epollfd;
printf( "start new thread to receive data on fd: %d\n", sockfd );
char buf[ BUFFER_SIZE ];
memset( buf, '\0', BUFFER_SIZE );

while( 1 )
{
// 循环读取sockfd上的数据,直到遇到EAGAIN错误
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret == 0 )
{
close( sockfd );
printf( "foreiner closed the connection\n" );
break;
}
else if( ret < 0 )
{
if( errno == EAGAIN )
{
reset_oneshot( epollfd, sockfd );
printf( "read later\n" );
break;
}
}
else
{
printf( "get content: %s\n", buf );
sleep( 5 ); // 休眠5s。模拟数据处理过程
}
}
printf( "end thread receiving data on fd: %d\n", sockfd );
}

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );
// 注意,监听socket listenfd上是不能注册EPOLLONESHOT事件的,否则应用程序只能处理一个客户连接
// 因为后续的客户连接请求将不再触发listenfd上的EPOLLIN事件
addfd( epollfd, listenfd, false );

while( 1 )
{
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ret < 0 )
{
printf( "epoll failure\n" );
break;
}

for ( int i = 0; i < ret; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
// 对每个非监听文件描述符都注册POLLONESHOT事件
addfd( epollfd, connfd, true );
}
else if ( events[i].events & EPOLLIN )
{
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
// 新启动一个工作线程为sockfd服务
pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
}
else
{
printf( "something else happened \n" );
}
}
}

close( listenfd );
return 0;
}

工作线程函数worker中,如果一个工作线程处理完某个socket上的一次请求(用休眠5s来模拟这个过程)之后,又接收到该socket上新的客户请求,则该线程将继续为这个socket服务。并且因为该 socket上注册了EPOLLONESHOT事件,其他线程没有机会接触这个socket,如果工作线程等待5s后仍然没收到该socket上的下一批客户数据,则它将放弃为该socket服务。同时,它调用reset_oneshot函数来重置该socket上的注册事件,这将使epoll有机会再次检测到该socket上的EPOLLIN事件,进而使得其他线程有机会为该socket服务。

尽管一个socket在不同时间可能被不同的线程处理,但同一时刻肯定只有一个线程在为它服务。这就保证了连接的完整性,从而避免了很多可能的竞态条件。

三组I/O复用函数的比较

select、poll、epoll三组I/O复用系统调用。

这三组系统调用都能同时监听多个文件描述符,等待由timeout参数指定的超时时间,直到一个或者多个文件描述符上有事件发生时返回,返回值是就绪的文件描述符的数量。返回0表示没有事件发生。

select特点:select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符集合,因此select需要提供3个这种类型的参数来分别传入和输出可读、可写及异常等事件。因此,select不能处理更多类型的事件,另一方面由于内核对fd_set集合的在线修改,应用程序下次调用select前不得不重置这3个fd_set集合。

poll特点:把文件描述符和事件都定义其中,任何事件都被统一处 理,从而使得编程接口简洁得多。并且内核每次修改的是pollfd结构体的revents成员,而events成员保持不变,因此下次调用poll时应用程序无须重置pollfd类型的事件集参数。

select和poll的时间复杂度:由于每次select和poll调用都返回整个用户注册的事件集合(其中包括就绪的和未就绪的),所以应用程序索引就绪文件描述符的时间复杂度为O(n)。

epoll特点:在内核中维护一个事件表,并提供了一个独立的系统调用epoll_ctl来控制往其中添加、删除、修改事件。这样,每次epoll_wait调用都直接从该内核事件表中取得用户注册的事件,而无须反复从用户空间读入这些事件

epoll的时间复杂度:epoll_wait系统调用的events参数仅用来返回就绪的事件,这使得应用程序索引就绪文件描述符的时间复杂度达到O(1)。

最大文件描述符数量:poll和epoll_wait分别用nfds和maxevents参数指定最多监听多少个文件描述符和事件。这两个数值都能达到系统允许打开的最大文件描述符数目,即65535
(cat/proc/sys/fs/file-max)。select允许监听的最大文件描述符数量通常有限制。虽然用户可以修改这个限制,但这可能导致不可预期的后果。

工作模式:select和poll都只能工作在相对低效的LT模式,而epoll则可以工作在ET高效模式。并且epoll还支持EPOLLONESHOT事件。该事件能进一步减少可读、可写和异常等事件被触发的次数。

实现原理:select和poll采用的都是轮询的方式,即每次调用都要扫描整个注册文件描述符集合,并将其中就绪的文件描述符返回给用户程序,因此它们检测就绪事件的算法的时间复杂度是O(n)。
epoll_wait则不同,它采用的是回调的方式。内核检测到就绪的文件描 述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪事件队列。内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。因此epoll_wait无须轮询整个文件描述符集合来检测哪些事件已经就绪,其算法时间复杂度是O(1)。

epoll_wait适用于连接数量多,但活动连接较少的情况。

select、poll和epoll的区别:

系统调用 select poll epoll
事件集合 用户通过3个参数来分别传入感兴趣的可读、可写及异常等事件,内核通过对这些参数的在线修改来反馈其中的就绪事件。这使得用户每次调用select都要重置这3个参数。 统一处理所有事件类型,因此只需要一个事件集参数。用户通过pollfd.events传入感兴趣的事件,内核通过修改pollfd.revents反馈其中就绪的事件。 内核通过一个事件表直接管理用户感兴趣的所有事件。因此每次调用epoll_wait时,无须反复传入用户感兴趣的事件。epoll_wait系统调用的参数events仅用来反馈就绪的事件。
应用程序索引就绪文件描述符的时间复杂度 O(n) O(n) O(1)
最大支持文件描述符数 一般有最大值限制 65535 65535
工作模式 LT LT ET
内核实现和工作效率 采用轮询方式来检测就绪事件,算法时间复杂度为O(n) 采用轮询方式来检测就绪事件,算法时间复杂度为O(n) 采用回调方式来检测就绪事件,算法时间复杂度为O(1)