Linux - IO多路复用之epoll

发布时间 2023-05-08 11:38:58作者: [BORUTO]

1. epoll概述

epoll 全称 eventpoll,是 linux 内核实现 IO 多路转接 / 复用(IO multiplexing)的一个实现。IO 多路转接的意思是在一个操作里同时监听多个输入输出源,在其中一个或多个输入输出源可用的时候返回,然后对其的进行读写操作。

epoll 是 select 和 poll 的升级版,相较于这两个前辈,epoll 改进了工作方式,因此它更加高效。

  • 对于待检测集合select和poll是基于线性方式处理的,epoll是基于红黑树来管理待检测集合的。
  • select和poll每次都会线性扫描整个待检测集合,集合越大速度越慢,epoll使用的是回调机制,效率高,处理效率也不会随着检测集合的变大而下降
  • select和poll工作过程中存在内核/用户空间数据的频繁拷贝问题,在epoll中内核和用户区使用的是共享内存(基于mmap内存映射区实现),省去了不必要的内存拷贝。
  • 程序猿需要对select和poll返回的集合进行判断才能知道哪些文件描述符是就绪的,通过epoll可以直接得到已就绪的文件描述符集合,无需再次检测
  • 使用 epoll 没有最大文件描述符的限制,仅受系统中进程能打开的最大文件数目限制

当多路复用的文件数量庞大、IO 流量频繁的时候,一般不太适合使用 select () 和 poll (),这种情况下 select () 和 poll () 表现较差,推荐使用 epoll ()。

 

2. 操作函数

在 epoll 中一共提供是三个 API 函数,分别处理不同的操作,函数原型如下:

#include <sys/epoll.h>

// 创建epoll实例,通过一棵红黑树管理待检测集合
int epoll_create(int size);

// 管理红黑树上的文件描述符(添加、修改、删除)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

// 检测epoll树中是否有就绪的文件描述符
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

select/poll 低效的原因之一是将 “添加 / 维护待检测任务” 和 “阻塞进程 / 线程” 两个步骤合二为一。每次调用 select 都需要这两步操作,然而大多数应用场景中,需要监视的 socket 个数相对固定,并不需要每次都修改。epoll 将这两个操作分开,先用 epoll_ctl() 维护等待队列,再调用 epoll_wait() 阻塞进程(解耦)。通过下图的对比显而易见,epoll 的效率得到了提升。

epoll_create() 函数的作用是创建一个红黑树模型的实例,用于管理待检测的文件描述符的集合。

int epoll_create(int size);

函数参数 size:在 Linux 内核 2.6.8 版本以后,这个参数是被忽略的,只需要指定一个大于 0 的数值就可以了。
函数返回值:
失败:返回 - 1
成功:返回一个有效的文件描述符,通过这个文件描述符就可以访问创建的 epoll 实例了

epoll_ctl() 函数的作用是管理红黑树实例上的节点,可以进行添加、删除、修改操作。

// 联合体, 多个变量共用同一块内存        
typedef union epoll_data {
 	void        *ptr;
	int          fd;	// 通常情况下使用这个成员, 和epoll_ctl的第三个参数相同即可
	uint32_t     u32;
	uint64_t     u64;
} epoll_data_t;

struct epoll_event {
	uint32_t     events;      /* Epoll events */
	epoll_data_t data;        /* User data variable */
};
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

函数参数:
epfd:epoll_create () 函数的返回值,通过这个参数找到 epoll 实例
op:这是一个枚举值,控制通过该函数执行什么操作
EPOLL_CTL_ADD:往 epoll 模型中添加新的节点
EPOLL_CTL_MOD:修改 epoll 模型中已经存在的节点
EPOLL_CTL_DEL:删除 epoll 模型中的指定的节点
fd:文件描述符,即要添加 / 修改 / 删除的文件描述符
event:epoll 事件,用来修饰第三个参数对应的文件描述符的,指定检测这个文件描述符的什么事件
events:委托 epoll 检测的事件
EPOLLIN:读事件,接收数据,检测读缓冲区,如果有数据该文件描述符就绪
EPOLLOUT:写事件,发送数据,检测写缓冲区,如果可写该文件描述符就绪
EPOLLERR:异常事件
data:用户数据变量,这是一个联合体类型,通常情况下使用里边的 fd 成员,用于存储待检测的文件描述符的值,在调用 epoll_wait() 函数的时候这个值会被传出。
函数返回值:
失败:返回 - 1
成功:返回 0

epoll_wait() 函数的作用是检测创建的 epoll 实例中有没有就绪的文件描述符。

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

函数参数:
epfd:epoll_create () 函数的返回值,通过这个参数找到 epoll 实例
events:传出参数,这是一个结构体数组的地址,里边存储了已就绪的文件描述符的信息
maxevents:修饰第二个参数,结构体数组的容量(元素个数)
timeout:如果检测的 epoll 实例中没有已就绪的文件描述符,该函数阻塞的时长,单位 ms 毫秒
0:函数不阻塞,不管 epoll 实例中有没有就绪的文件描述符,函数被调用后都直接返回
大于 0:如果 epoll 实例中没有已就绪的文件描述符,函数阻塞对应的毫秒数再返回
-1:函数一直阻塞,直到 epoll 实例中有已就绪的文件描述符之后才解除阻塞
函数返回值:
成功:
等于 0:函数是阻塞被强制解除了,没有检测到满足条件的文件描述符
大于 0:检测到的已就绪的文件描述符的总个数
失败:返回 - 1

 

3. epoll 的使用

3.1 操作步骤

在服务器端使用 epoll 进行 IO 多路转接的操作步骤如下:

1. 创建监听的套接字

int lfd = socket(AF_INET, SOCK_STREAM, 0);

2. 设置端口复用(可选)

int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

3. 使用本地的IP与端口和监听的套接字进行绑定

int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

4. 给监听的套接字设置监听

listen(lfd, 128);

5. 创建epoll实例对象

int epfd = epoll_create(100);

6. 将用于监听的套接字添加到epoll实例中

struct epoll_event ev;
ev.events = EPOLLIN;    // 检测lfd读读缓冲区是否有数据
ev.data.fd = lfd;
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);

7. 检测添加到epoll实例中的文件描述符是否已就绪,并将这些已就绪的文件描述符进行处理

int num = epoll_wait(epfd, evs, size, -1);
  • 如果是监听的文件描述符,和新客户端建立连接,将得到的文件描述符添加到epoll实例中
int cfd = accept(curfd, NULL, NULL);
ev.events = EPOLLIN;
ev.data.fd = cfd;

// 新得到的文件描述符添加到epoll模型中, 下一轮循环的时候就可以被检测了
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
  • 如果是通信的文件描述符,和对应的客户端通信,如果连接已断开,将该文件描述符从epoll实例中删除
int len = recv(curfd, buf, sizeof(buf), 0);
if(len == 0)
{
    // 将这个文件描述符从epoll模型中删除
    epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
    close(curfd);
}
else if(len > 0)
{
    send(curfd, buf, len, 0);
}

8. 重复第 7 步的操作

 

3.2 示例代码

服务器代码:

#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>

// server
int main(int argc, const char* argv[])
{
    // 创建监听的套接字
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    if(lfd == -1)
    {
        perror("socket error");
        exit(1);
    }

    // 绑定
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(9999);
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);  // 本地多有的IP
    
    // 设置端口复用
    int opt = 1;
    setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    // 绑定端口
    int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    if(ret == -1)
    {
        perror("bind error");
        exit(1);
    }

    // 监听
    ret = listen(lfd, 64);
    if(ret == -1)
    {
        perror("listen error");
        exit(1);
    }

    // 现在只有监听的文件描述符
    // 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的epoll
    // 创建一个epoll模型
    int epfd = epoll_create(100);
    if(epfd == -1)
    {
        perror("epoll_create");
        exit(0);
    }

    // 往epoll实例中添加需要检测的节点, 现在只有监听的文件描述符
    struct epoll_event ev;
    ev.events = EPOLLIN;    // 检测lfd读读缓冲区是否有数据
    ev.data.fd = lfd;
    ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
    if(ret == -1)
    {
        perror("epoll_ctl");
        exit(0);
    }

    struct epoll_event evs[1024];
    int size = sizeof(evs) / sizeof(struct epoll_event);
    // 持续检测
    while(1)
    {
        // 调用一次, 检测一次
        int num = epoll_wait(epfd, evs, size, -1);
        for(int i=0; i<num; ++i)
        {
            // 取出当前的文件描述符
            int curfd = evs[i].data.fd;
            // 判断这个文件描述符是不是用于监听的
            if(curfd == lfd)
            {
                // 建立新的连接
                int cfd = accept(curfd, NULL, NULL);
                // 新得到的文件描述符添加到epoll模型中, 下一轮循环的时候就可以被检测了
                ev.events = EPOLLIN;    // 读缓冲区是否有数据
                ev.data.fd = cfd;
                ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
                if(ret == -1)
                {
                    perror("epoll_ctl-accept");
                    exit(0);
                }
            }
            else
            {
                // 处理通信的文件描述符
                // 接收数据
                char buf[1024];
                memset(buf, 0, sizeof(buf));
                int len = recv(curfd, buf, sizeof(buf), 0);
                if(len == 0)
                {
                    printf("客户端已经断开了连接\n");
                    // 将这个文件描述符从epoll模型中删除
                    epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
                    close(curfd);
                }
                else if(len > 0)
                {
                    printf("客户端say: %s\n", buf);
                    send(curfd, buf, len, 0);
                }
                else
                {
                    perror("recv");
                    exit(0);
                } 
            }
        }
    }

    return 0;
}

 

客户端代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

int main()
{
    // 1. 创建用于通信的套接字
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd == -1)
    {
        perror("socket");
        exit(0);
    }

    // 2. 连接服务器
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;  // ipv4
    addr.sin_port = htons(9999);   // 服务器监听的端口, 字节序应该是网络字节序
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr);
    int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr));
    if(ret == -1)
    {
        perror("connect");
        exit(0);
    }

    // 通信
    while(1)
    {
        // 读数据
        char recvBuf[1024];
        // 写数据
        // sprintf(recvBuf, "data: %d\n", i++);
        fgets(recvBuf, sizeof(recvBuf), stdin);
        write(fd, recvBuf, strlen(recvBuf)+1);
        // 如果客户端没有发送数据, 默认阻塞
        read(fd, recvBuf, sizeof(recvBuf));
        printf("recv buf: %s\n", recvBuf);
        sleep(1);
    }
    // 释放资源
    close(fd); 
    return 0;
}

 

当在服务器端循环调用 epoll_wait() 的时候,就会得到一个就绪列表,并通过该函数的第二个参数传出:

struct epoll_event evs[1024];
int num = epoll_wait(epfd, evs, size, -1);

每当 epoll_wait() 函数返回一次,在 evs 中最多可以存储 size 个已就绪的文件描述符信息,但是在这个数组中实际存储的有效元素个数为 num 个,如果在这个 epoll 实例的红黑树中已就绪的文件描述符很多,并且 evs 数组无法将这些信息全部传出,那么这些信息会在下一次 epoll_wait() 函数返回的时候被传出。

通过 evs 数组被传递出的每一个有效元素里边都包含了已就绪的文件描述符的相关信息,这些信息并不是凭空得来的,这取决于我们在往 epoll 实例中添加节点的时候,往节点中初始化了哪些数据:

struct epoll_event ev;
// 节点初始化
ev.events = EPOLLIN;    
ev.data.fd = lfd;	// 使用了联合体中 fd 成员
// 添加待检测节点到epoll实例中
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);

在添加节点的时候,需要对这个 struct epoll_event 类型的节点进行初始化,当这个节点对应的文件描述符变为已就绪状态,这些被传入的初始化信息就会被原样传出,这个对应关系必须要搞清楚。

 

 

4. epoll实现多个客户端连接服务器

4.1 服务器

//服务器端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>        
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <signal.h>
#include <sys/epoll.h>
 
//最多允许的客户端数量
#define NUM 100
 
int serverSocket;
 
void hand(int val){
	close(serverSocket);
	printf("bye bye!\n");
	exit(0);
}
int main(int argc,char* argv[]){
	if(argc != 3) printf("请输入ip地址和端口号!\n"),exit(0);
	printf("ip: %s     port:%d\n",argv[1],atoi(argv[2]));
 
	signal(SIGINT,hand);
 
	//1. 创建socket 参数一: 协议类型(版本) 参数二: 通信媒介 参数三: 保护方式
	serverSocket = socket(AF_INET,SOCK_STREAM,0);
	if(-1 == serverSocket) printf("创建socket失败:%m\n"),exit(-1);
	printf("创建socket成功!\n");
 
	//2. 创建服务器协议地址簇
	struct sockaddr_in sAddr = { 0 };
	sAddr.sin_family = AF_INET;        //协议类型 和socket函数第一个参数一致
	sAddr.sin_addr.s_addr = inet_addr(argv[1]);  //将字符串转整数
	sAddr.sin_port = htons(atoi(argv[2]));    //将字符串转整数,再将小端转换成大端
 
	//3. 绑定服务器协议地址簇
	int r = bind(serverSocket,(struct sockaddr*)&sAddr,sizeof sAddr);
	if(-1 == r) printf("绑定失败:%m\n"),close(serverSocket),exit(-2);
	printf("绑定成功!\n");
 
	//4. 监听  
	r = listen(serverSocket,10);   //数量
	if(-1 == r) printf("监听失败:%m\n"),close(serverSocket),exit(-3);
	printf("监听成功!\n");
 
	//监视serverSocket和ClientSocket
	struct sockaddr_in cAddr = {0};
	int len = sizeof(cAddr);
	int cfd;        //临时保存一个客户端fd
	char buff[1024];
	int fdNum = 0;
 
	//创建epoll 参数可缺省(只需设置最大描述符号即可)
	int epfd = epoll_create(NUM);
	if(-1 == epfd){
		printf("创建epoll失败:%m\n");
		close(serverSocket);
		exit(-1);
	}
	printf("创建epoll成功!\n");
 
	//注册描述符号事件
	struct epoll_event ev;  //注册时使用
	struct epoll_event events[NUM]; //等待处理事件时使用
 
	ev.events = EPOLLIN;
	ev.data.fd = serverSocket;
 
	r = epoll_ctl(epfd,EPOLL_CTL_ADD,serverSocket,&ev);
	if(-1 == r){
		printf("注册serverSocket事件失败:%m\n");
		close(epfd);
		close(serverSocket);
		exit(-2);
	}
	printf("注册serverSocket事件成功!\n");
	//等待,挨个处理事件
	//epoll_wait 成功返回有动静的描述符号数量
	int curCfd;  //用于保存当前发数据的客户端epoll_wait的返回值
	int nfds;    //用于保存epoll_wait的返回值
	while(1){
		nfds = epoll_wait(epfd,events,NUM,1000);  //延时1000ms
		if(nfds < 0){
			printf("epoll_wait失败:%m\n");
			close(epfd);
			close(serverSocket);
			exit(-3);
		}else if(0 == nfds){
			//没有动静
			continue;
		}else{
			//有动静
			printf("有动静发生----\n");
			for(int i = 0;i < nfds; i++){
				if(serverSocket == events[i].data.fd){
					cfd = accept(serverSocket,(struct sockaddr*)&cAddr,&len);
					if(-1 == cfd){
						printf("服务器崩溃:%m\n");
						close(epfd);
						close(serverSocket);
						exit(-4);
					}
					printf("有客户端%d连接上服务器了:%s\n",cfd,
						inet_ntoa(cAddr.sin_addr));
 
					//注册客户端描述符号事件
					ev.events = EPOLLIN;
					ev.data.fd = cfd;
 
					r = epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev);
					if(-1 == r){
						printf("注册clientSocket失败:%m\n");
						close(epfd);
						close(serverSocket);
						exit(-5);
					}
					printf("注册clientSocket事件成功!\n");
				}else if(events[i].events && EPOLLIN){
					//有客户端发数据过来
					curCfd = events[i].data.fd;
					r = recv(curCfd,buff,1023,0);
					if(r > 0){
						buff[r] = 0;
						printf("%d >> %s\n",curCfd,buff);
					}
				}
			}
		}
	}
 
	return 0;
}

 

4.2 客户端

 (注: 与poll代码相同)

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>        
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <signal.h>
 
int clientSocket;
void hand(int val){
	//5. 关闭连接
	close(clientSocket);
	printf("bye bye!\n");
	exit(0);
}
int main(int argc,char* argv[]){
	if(argc != 3) printf("请输入ip地址和端口号!\n"),exit(0);
	printf("ip: %s     port:%d\n",argv[1],atoi(argv[2]));
 
	signal(SIGINT,hand);
 
	//1. 创建socket 参数一: 协议类型(版本) 参数二: 通信媒介 参数三: 保护方式
	clientSocket = socket(AF_INET,SOCK_STREAM,0);
	if(-1 == clientSocket) printf("创建socket失败:%m\n"),exit(-1);
	printf("创建socket成功!\n");
 
	//2. 创建服务器协议地址簇
	struct sockaddr_in cAddr = { 0 };
	cAddr.sin_family = AF_INET;
	cAddr.sin_addr.s_addr = inet_addr(argv[1]);  //将字符串转整数
	cAddr.sin_port = htons(atoi(argv[2]));    //将字符串转整数,再将小端转换成大端
 
	//3.连接服务器
	int r = connect(clientSocket,(struct sockaddr*)&cAddr,sizeof cAddr);
	if(-1 == r) printf("连接服务器失败:%m\n"),close(clientSocket),exit(-2);
	printf("连接服务器成功!\n");
 
	//4. 通信
	char buff[256] = {0};
	while(1){
		printf("你想要发送:");
		scanf("%s",buff);
		send(clientSocket,buff,strlen(buff),0);
	}
 
	return 0;
}

运行结果:

 

 

作者: 苏丙榅

链接: https://subingwen.cn/linux/epoll

来源: 爱编程的大丙

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。