linux 4 网络基础 POLL EPOLL epoll堆 线程池 udp 本地套接字

发布时间 2023-07-31 18:07:53作者: snowa

Linux 5day

1.poll监听

poll相对与sellect的优缺点

优点:

没有文件描述符1024的限制

请求和返回是分离的

如:select read集合 返回read集合

缺点和select一样:

每次都需要将需要监听的文件描述符从应用层拷贝到内核

每次都需要将数组中的元素遍历一遍才知道那个变化了

大量并发,少量活跃效率低

poll原理

\#include <poll.h>

 int poll(struct pollfd *fds, nfds_t nfds, int timeout);

功能: 监听多个文件描述符的属性变化

参数:

   fds : 监听的数组的首元素地址

  nfds: 数组有效元素的最大下标+1

   timeout : 超时时间 -1是永久监听 >=0 限时等待

数组元素:

struct pollfd

​       struct pollfd {

​        int  fd;     /* file descriptor */ 需要监听的文件描述符

​        short events;   /* requested events */需要监听文件描述符什么事件 EPOLLIN 读事件  EPOLLOUT写事件

​        short revents;  /* returned events */ 返回监听到的事件  EPOLLIN 读事件  EPOLLOUT写事

​      };
代码
/* server.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <errno.h>
#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 6666
#define OPEN_MAX 1024

int main(int argc, char* argv[])
{
	int i, j, maxi, listenfd, connfd, sockfd;
	int nready;
	ssize_t n;
	char buf[MAXLINE], str[INET_ADDRSTRLEN];
	socklen_t clilen;
	struct pollfd client[OPEN_MAX];
	struct sockaddr_in cliaddr, servaddr;

	listenfd = Socket(AF_INET, SOCK_STREAM, 0);

	bzero(&servaddr, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port = htons(SERV_PORT);

	Bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));

	Listen(listenfd, 20);
	/// 套接字创建 绑定 监听

	client[0].fd = listenfd;
	client[0].events = POLLIN; 					/* listenfd监听普通读事件 */
	//初始化数组 给fd赋-1
	for (i = 1; i < OPEN_MAX; i++)
		client[i].fd = -1; 							/* 用-1初始化client[]里剩下元素 */
	maxi = 0; 										/* client[]数组有效元素中最大元素下标 */

	for (; ; ) {
		nready = poll(client, maxi + 1, -1); 			/* 阻塞 */
		
		//新客户端链接
		if (client[0].revents & POLLIN) { 		/* 有客户端链接请求 */
			clilen = sizeof(cliaddr);
			connfd = Accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
			printf("received from %s at PORT %d\n",
				inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
				ntohs(cliaddr.sin_port));
			for (i = 1; i < OPEN_MAX; i++) {
				if (client[i].fd < 0) {
					client[i].fd = connfd; 	/* 找到client[]中空闲的位置,存放accept返回的connfd */
					break;
				}
			}

			if (i == OPEN_MAX)
				perr_exit("too many clients");

			client[i].events = POLLIN; 		/* 设置刚刚返回的connfd,监控读事件 */
			if (i > maxi)
				maxi = i; 						/* 更新client[]中最大元素下标 */
			if (--nready <= 0)
				continue; 						/* 没有更多就绪事件时,继续回到poll阻塞 */
		}
		//检测cfd
		for (i = 1; i <= maxi; i++) { 			/* 检测client[] */
			if ((sockfd = client[i].fd) < 0)
				continue;
			if (client[i].revents & POLLIN) {
				if ((n = Read(sockfd, buf, MAXLINE)) < 0) {
					if (errno == ECONNRESET) { /* 当收到 RST标志时 */
						/* connection reset by client */
						printf("client[%d] aborted connection\n", i);
						Close(sockfd);
						client[i].fd = -1;
					}
					else {
						perr_exit("read error");
					}
				}
				else if (n == 0) {
					/* connection closed by client */
					printf("client[%d] closed connection\n", i);
					Close(sockfd);
					client[i].fd = -1;
				}
				else {
					for (j = 0; j < n; j++)
						buf[j] = toupper(buf[j]);
					Writen(sockfd, buf, n);
				}
				if (--nready <= 0)
					break; 				/* no more readable descriptors */
			}
		}
	}
	return 0;
}

epoll 高并发服务器

1.epoll原理图

epoll原理

优点
1.没有文件描述符限制
2.每次监听都不需要拷贝进内核
3.遍历次数会减少
4.返回的是以及变化的文件描述符

2.epoll api
a> 创建红黑树
#include <sys/epoll.h>

​    int epoll_create(int size);

  参数:

   size : 监听的文件描述符的上限, 2.6版本之后写1即可,

  返回: 返回树的句柄

b> 上树 下树 修改节点
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
    epfd : 树的句柄
    op : EPOLL_CTL_ADD 上树   EPOLL_CTL_DEL 下树 EPOLL_CTL_MOD 修改
    fd : 上树,下树的文件描述符
    event :   上树的节点
///////////////////////////////////////////////////////
   struct epoll_event {
               uint32_t     events;      /* Epoll events */  需要监听的事件
               epoll_data_t data;        /* User data variable */ 需要监听的文件描述符
           };


 typedef union epoll_data {
               void        *ptr;
               int          fd;
               uint32_t     u32;
               uint64_t     u64;
           } epoll_data_t;

       //例子
 int epfd =  epoll_create(1);
struct epoll_event ev;
ev. data.fd = cfd;
ev.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD,cfd, &ev);


c> 监听
#include <sys/epoll.h>

       int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
    功能: 监听树上文件描述符的变化
    epfd : 数的句柄
    events : 接收变化的节点的数组的首地址
    maxevents :  数组元素的个数
    timeout : -1 永久监听  大于等于0 限时等待
返回值: 返回的是变化的文件描述符个数
epoll高并发服务器编写
#include <stdio.h>
#include <fcntl.h>
#include "wrap.h"
#include <sys/epoll.h>
void prtip(struct sockaddr_in* cliaddr)
{
	char ip[16] = "";
	printf("主机 ip=%s port=%d 即将链接\n", inet_ntop(AF_INET, &(cliaddr->sin_addr.s_addr), ip, 16),
		ntohs(cliaddr->sin_port));

}
int main()
{


	printf("请输入服务器端口号\n");
	char buf[8] = "";

	read(STDIN_FILENO, buf, sizeof(buf));
	printf("   \n");
	printf("等待客户端链接\n");
	int port = atoi((char*)buf);
	//创建 绑定
	int lfd = tcp4bind(port, NULL);
	//监听
	Listen(lfd, 128);
	//前置操作
	int hhs = epoll_create(1);//创建树
	struct epoll_event ev, evs[1024];//进行上树等等结构体
	//将lfd上树
	ev.data.fd = lfd;
	ev.events = EPOLLIN;
	epoll_ctl(hhs, EPOLL_CTL_ADD, lfd, &ev);

	//存放信息
	struct sockaddr_in as;
	socklen_t len = sizeof(&as);

	while (1)
	{
		//监听开始
		int sum = epoll_wait(hhs, evs, 1024, -1);
		printf("监听中\n");
		if (sum == 0)
		{
			continue;
		}
		else if (sum > 0)
		{
			//判断是lfd 还是cfd
			for (int i = 0; i < sum; i++)
			{
				if (evs[i].data.fd == lfd && evs[i].events & EPOLLIN)
				{
					//lfd变化 
					int cfd = Accept(lfd, (struct sockaddr*)&as, &len);
					prtip(&as);
					//加入到监听集合
					//上树
					ev.data.fd = cfd;
					ev.events = EPOLLIN;
					epoll_ctl(hhs, EPOLL_CTL_ADD, cfd, &ev);

				}
				else if (evs[i].events & EPOLLIN)
				{
					//cfd读变化
					printf("读");
					char buf[4] = "";

					int n = read(evs[i].data.fd, buf, sizeof(buf));
					if (n == 0)
					{
						printf("服务器 %d 即将退出", evs[i].data.fd);
						close(evs[i].data.fd);//将cfd关闭
						epoll_ctl(hhs, EPOLL_CTL_DEL, evs[i].data.fd, &evs[i]);//下树
						continue;
					}
					else if (n > 0)
					{
						printf("客户端 %d : %s" ,evs[i].data.fd,buf);
					}


				}//else if





			}//for



		}//else



	}///while
	return 0;
}

5.epoll两种工作方式

监听读缓冲区

水平触发 数据来一次触发一次监听 读1024 发1m

边沿触发

数据来一次epol_wait只触发一次

监听写缓冲区
水平触发 只要可以写就触发 没满就触发
边沿触发 数据从有到无就触发 数据满 发一次就会有无

边沿触发代码步骤

//1.设置cfd为边沿触发
ev.data.fd =cfd;
ev.events =EPOLLIN | EPOLLET;
//2.设置vfd rad为非阻塞
  
					int flags = fcntl(cfd,F_GETFL);//获取的cfd的标志位
					flags |= O_NONBLOCK;
					fcntl(cfd,F_SETFL,flags);
//3.防止read 返回报错 read接一个死循环
int n = read(evs[i].data.fd,buf,sizeof(buf));
						if(n < 0)//出错,cfd下树
						{
							//如果缓冲区读干净了,这个时候应该跳出while(1)循环,继续监听
							if(errno == EAGAIN)
							{
								break;
							
							}

6.线程池

1.线程池原理

线程池原理

线程池处理的任务,所需要处理的时间很短,如果太长任务 卡着 缓冲区都没人拿走

一个锁
条件变量
循环队列

2.线程池代码分析

头文件

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>


typedef struct _PoolTask
{
    int tasknum;//模拟任务编号
    void *arg;//回调函数参数
    void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;

typedef struct _ThreadPool
{
    int max_job_num;//最大任务个数
    int job_num;//实际任务个数
    PoolTask *tasks;//任务队列数组
    int job_push;//入队位置
    int job_pop;// 出队位置

    int thr_num;//线程池内线程个数
    pthread_t *threads;//线程池内线程数组
    int shutdown;//是否关闭线程池
    pthread_mutex_t pool_lock;//线程池的锁
    pthread_cond_t empty_task;//任务队列为空的条件
    pthread_cond_t not_empty_task;//任务队列不为空的条件

}ThreadPool;

void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数

#endif

.c文件

//简易版线程池
#include "threadpoolsimple.h"
//初始化结构体

ThreadPool *thrPool = NULL;

int beginnum = 1000;
//线程回调 线程抢任务的具体方法
void *thrRun(void *arg)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    ThreadPool *pool = (ThreadPool*)arg;//参数给予
    int taskpos = 0;//任务位置
    PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));//开辟一个任务数组

    while(1)
	{
        //获取任务,先要尝试加锁
        pthread_mutex_lock(&thrPool->pool_lock);

		//无任务并且线程池不是要摧毁
        while(thrPool->job_num <= 0 && !thrPool->shutdown )
		{
			//如果没有任务,线程会阻塞
            pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);//阻塞此处等待信号
        }
        
        if(thrPool->job_num)
		{
            //有任务需要处理
            taskpos = (thrPool->job_pop++)%thrPool->max_job_num;
            //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
			//为什么要拷贝?避免任务被修改,生产者会添加任务
            memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));//将任务数组结构体拷贝一份 防止覆盖任务
            task->arg = task;
            thrPool->job_num--;
            //task = &thrPool->tasks[taskpos];
            pthread_cond_signal(&thrPool->empty_task);//通知生产者
        }

        if(thrPool->shutdown)
		{
            //代表要摧毁线程池,此时线程退出即可
            //pthread_detach(pthread_self());//临死前分家
            pthread_mutex_unlock(&thrPool->pool_lock);
            free(task);
			pthread_exit(NULL);
        }

        //释放锁
        pthread_mutex_unlock(&thrPool->pool_lock);
        task->task_func(task->arg);//执行回调函数
    }
    
    //printf("end call %s-----\n",__FUNCTION__);
}

//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{
    printf("begin call %s-----\n",__FUNCTION__);
    thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));//给结构体赋值

    thrPool->thr_num = thrnum;//线程个数
    thrPool->max_job_num = maxtasknum;//最大任务个数
    thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
    thrPool->job_push = 0;//任务队列添加的位置
    thrPool->job_pop = 0;//任务队列出队的位置
    thrPool->job_num = 0;//初始化的任务个数为0

    thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列数组

    //初始化锁和条件变量
    pthread_mutex_init(&thrPool->pool_lock,NULL);//上锁
    pthread_cond_init(&thrPool->empty_task,NULL);//条件变量1
    pthread_cond_init(&thrPool->not_empty_task,NULL);//条件变量2

    int i = 0;
    thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间 线程数组 
	//设置线程自动分离
	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    //创建线程
    for(i = 0;i < thrnum;i++)
	{
        pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程->追线程回调
    }
    //printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{
    pool->shutdown = 1;//开始自爆
    pthread_cond_broadcast(&pool->not_empty_task);//诱杀 

    int i = 0;
    for(i = 0; i < pool->thr_num ; i++)
	{
        pthread_join(pool->threads[i],NULL);
    }

    pthread_cond_destroy(&pool->not_empty_task);
    pthread_cond_destroy(&pool->empty_task);
    pthread_mutex_destroy(&pool->pool_lock);

    free(pool->tasks);
    free(pool->threads);
    free(pool);
}

//添加任务到线程池
void addtask(ThreadPool *pool)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    pthread_mutex_lock(&pool->pool_lock);

	//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
    while(pool->max_job_num <= pool->job_num)
	{
        pthread_cond_wait(&pool->empty_task,&pool->pool_lock);
    }

    int taskpos = (pool->job_push++)%pool->max_job_num;
    //printf("add task %d  tasknum===%d\n",taskpos,beginnum);
    pool->tasks[taskpos].tasknum = beginnum++;
    pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
    pool->tasks[taskpos].task_func = taskRun;
    pool->job_num++;

    pthread_mutex_unlock(&pool->pool_lock);

    pthread_cond_signal(&pool->not_empty_task);//通知包身工
    //printf("end call %s-----\n",__FUNCTION__);
}

//任务回调函数
void taskRun(void *arg)
{
    PoolTask *task = (PoolTask*)arg;
    int num = task->tasknum;
    printf("task %d is runing %lu\n",num,pthread_self());

    sleep(1);
    printf("task %d is done %lu\n",num,pthread_self());
}


int main()
{
    //建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
    create_threadpool(3,20);
    int i = 0;
    for(i = 0;i < 50 ; i++)
	{
        addtask(thrPool);//模拟添加任务
    }

    sleep(20);
    destroy_threadpool(thrPool);

    return 0;
}

代码逻辑图片

线程池代码运行逻辑

3.线程池配epoll

.c

//简易版线程池
#include "threadpoolsimple.h"
//初始化结构体

ThreadPool *thrPool = NULL;

int beginnum = 1000;
//线程回调 线程抢任务的具体方法
void *thrRun(void *arg)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    ThreadPool *pool = (ThreadPool*)arg;//参数给予
    int taskpos = 0;//任务位置
    PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));//开辟一个任务数组

    while(1)
	{
        //获取任务,先要尝试加锁
        pthread_mutex_lock(&thrPool->pool_lock);

		//无任务并且线程池不是要摧毁
        while(thrPool->job_num <= 0 && !thrPool->shutdown )
		{
			//如果没有任务,线程会阻塞
            pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);//阻塞此处等待信号
        }
        
        if(thrPool->job_num)
		{
            //有任务需要处理
            taskpos = (thrPool->job_pop++)%thrPool->max_job_num;
            //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
			//为什么要拷贝?避免任务被修改,生产者会添加任务
            memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));//将任务数组结构体拷贝一份 防止覆盖任务
            task->arg = task;
            thrPool->job_num--;
            //task = &thrPool->tasks[taskpos];
            pthread_cond_signal(&thrPool->empty_task);//通知生产者
        }

        if(thrPool->shutdown)
		{
            //代表要摧毁线程池,此时线程退出即可
            //pthread_detach(pthread_self());//临死前分家
            pthread_mutex_unlock(&thrPool->pool_lock);
            free(task);
			pthread_exit(NULL);
        }

        //释放锁
        pthread_mutex_unlock(&thrPool->pool_lock);
        task->task_func(task->arg);//执行回调函数
    }
    
    //printf("end call %s-----\n",__FUNCTION__);
}

//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{
    printf("begin call %s-----\n",__FUNCTION__);
    thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));//给结构体赋值

    thrPool->thr_num = thrnum;//线程个数
    thrPool->max_job_num = maxtasknum;//最大任务个数
    thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
    thrPool->job_push = 0;//任务队列添加的位置
    thrPool->job_pop = 0;//任务队列出队的位置
    thrPool->job_num = 0;//初始化的任务个数为0

    thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列数组

    //初始化锁和条件变量
    pthread_mutex_init(&thrPool->pool_lock,NULL);//上锁
    pthread_cond_init(&thrPool->empty_task,NULL);//条件变量1
    pthread_cond_init(&thrPool->not_empty_task,NULL);//条件变量2

    int i = 0;
    thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间 线程数组 
	//设置线程自动分离
	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    //创建线程
    for(i = 0;i < thrnum;i++)
	{
        pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程->追线程回调
    }
    //printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{
    pool->shutdown = 1;//开始自爆
    pthread_cond_broadcast(&pool->not_empty_task);//诱杀 

    int i = 0;
    for(i = 0; i < pool->thr_num ; i++)
	{
        pthread_join(pool->threads[i],NULL);
    }

    pthread_cond_destroy(&pool->not_empty_task);
    pthread_cond_destroy(&pool->empty_task);
    pthread_mutex_destroy(&pool->pool_lock);

    free(pool->tasks);
    free(pool->threads);
    free(pool);
}

//添加任务到线程池
void addtask(ThreadPool *pool,int fd,struct epoll_event *evs)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    pthread_mutex_lock(&pool->pool_lock);

	//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
    while(pool->max_job_num <= pool->job_num)
	{
        pthread_cond_wait(&pool->empty_task,&pool->pool_lock);
    }

    int taskpos = (pool->job_push++)%pool->max_job_num;
    //printf("add task %d  tasknum===%d\n",taskpos,beginnum);
    pool->tasks[taskpos].tasknum = beginnum++;
    pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
    pool->tasks[taskpos].task_func = taskRun;
    pool->tasks[taskpos].fd = fd;
     pool->tasks[taskpos].evs = evs;
    pool->job_num++;

    pthread_mutex_unlock(&pool->pool_lock);

    pthread_cond_signal(&pool->not_empty_task);//通知包身工
    //printf("end call %s-----\n",__FUNCTION__);
}

//任务回调函数
void taskRun(void *arg)
{
   
 
    PoolTask *task = (PoolTask*)arg;
     char buf[1024]="";
    int n = Read(task->fd , buf,sizeof(buf));
    if(n == 0 )
        {
         close(task->fd);//关闭cfd
        epoll_ctl(task->epfd,EPOLL_CTL_DEL,task->fd,task->evs);//将cfd上树
            printf("client close\n");
        }
    else if(n> 0)
        {
         printf("%s\n",buf );
         

        }
 
}


void prtip(struct sockaddr_in* cliaddr)
{
	char ip[16] = "";
	printf("主机 ip=%s port=%d 即将链接\n", inet_ntop(AF_INET, &(cliaddr->sin_addr.s_addr), ip, 16),
		ntohs(cliaddr->sin_port));

}
int main()
{

  create_threadpool(3,20);
	printf("请输入服务器端口号\n");
	char buf[8] = "";

	read(STDIN_FILENO, buf, sizeof(buf));
	printf("   \n");
	printf("等待客户端链接\n");
	int port = atoi((char*)buf);
	//创建 绑定
	int lfd = tcp4bind(port, NULL);
	//监听
	Listen(lfd, 128);
	//前置操作
	int hhs = epoll_create(1);//创建树
	struct epoll_event ev, evs[1024];//进行上树等等结构体
	//将lfd上树
	ev.data.fd = lfd;
	ev.events = EPOLLIN;
	epoll_ctl(hhs, EPOLL_CTL_ADD, lfd, &ev);

	//存放信息
	struct sockaddr_in as;
	socklen_t len = sizeof(&as);

	while (1)
	{
		//监听开始
		int sum = epoll_wait(hhs, evs, 1024, -1);
		printf("监听中\n");
		if (sum == 0)
		{
			continue;
		}
		else if (sum > 0)
		{
			//判断是lfd 还是cfd
			for (int i = 0; i < sum; i++)
			{
				if (evs[i].data.fd == lfd && evs[i].events & EPOLLIN)
				{
					//lfd变化 
					int cfd = Accept(lfd, (struct sockaddr*)&as, &len);
					//设置非阻塞
					int flage = fcntl(cfd, F_GETFL);//获取CFD标志位
					flage |= O_NONBLOCK;
					fcntl(cfd, F_SETFL, flage);
					prtip(&as);
					//加入到监听集合
					//上树
					ev.data.fd = cfd;
					ev.events = EPOLLIN | EPOLLET;
					epoll_ctl(hhs, EPOLL_CTL_ADD, cfd, &ev);

				}
				else if (evs[i].events & EPOLLIN)
				{
					


						addtask(thrPool, evs[i].data.fd, &ev);


					
				}




			}//for



		}//else



	}///while
	return 0;
}



.h

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <fcntl.h>
#include "wrap.h"
#include <sys/epoll.h>

typedef struct _PoolTask
{
    int tasknum;//模拟任务编号
    void *arg;//回调函数参数
    void (*task_func)(void *arg);//任务的回调函数
    int fd;
    int epfd;
    struct epoll_event* evs;
}PoolTask ;

typedef struct _ThreadPool
{
    int max_job_num;//最大任务个数
    int job_num;//实际任务个数
    PoolTask *tasks;//任务队列数组
    int job_push;//入队位置
    int job_pop;// 出队位置

    int thr_num;//线程池内线程个数
    pthread_t *threads;//线程池内线程数组
    int shutdown;//是否关闭线程池
    pthread_mutex_t pool_lock;//线程池的锁
    pthread_cond_t empty_task;//任务队列为空的条件
    pthread_cond_t not_empty_task;//任务队列不为空的条件

}ThreadPool;

void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool,int fd,struct epoll_event *evs);//添加任务到线程池
void taskRun(void *arg);//任务回调函数

#endif

7.udp通信

Tcp 传输控制协议 安全 丢包重传 面向链接(电话模型)

udp 用户数据协议 不安全不可靠 丢包不重传 快(局域网)

tcp通信流程:

服务器: 创建流式套接字 绑定 监听 提取 读写 关闭

客户端: 创建流式套接字 连接 读写 关闭

收发数据:

read recv

ssize_t recv(int sockfd, void *buf, size_t len, int flags);//flags==MSG_PEEK ///读数据不会删除缓冲区的数据

write send

ssize_t send(int sockfd, const void *buf, size_t len, int flags);//flags=1 紧急数据
2.udp通信 api
udp通信流程
服务器: 创建报式套接字 绑定 读写 关闭
客户端:   创建报式套接字 读写  关闭
发数据:
//发送
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                      const struct sockaddr *dest_addr, socklen_t addrlen);

dest_addr: 目的地的地址信息
addrlen: 结构体大小
//收数据:
  ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
  struct sockaddr *src_addr, socklen_t *addrlen);

src_addr: 对方的地址信息
addrlen: 结构体大小的地址
    
    
///创建报式套接字
socket
int socket(int domain, int type, int protocol);
参数:
    domain : AF_INET
    type :SOCK_DGRAM
    protocol :0


udp服务端
int main()
{
	//1.创建套接字
	int fd = socket(AF_INET,SOCK_DGRAM, 0);
	//2.绑定
	struct sockaddr_in myaddr;
	myaddr.sin_family = AF_INET;

	myaddr.sin_port = htons(8080);
	myaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

	//绑定
	bind(fd, (struct sockaddr*)&myaddr, sizeof(myaddr));

	//读写
	char buf[1024] = "";

	struct sockaddr_in src_addr;
	socklen_t addlen = sizeof(src_addr);
	while (1)
	{
		//收数据
		int n=recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr*) & src_addr, &addlen);
		if (n < 0)
		{
			perror("read");
			break;
		}
		else
		{
			printf("客户端信息 %s\n", buf);
		}
	}


}

客户端

int main()
{
	//1.创建套接字
	int fd = socket(AF_INET,SOCK_DGRAM, 0);
	//2.绑定

	//读写
	char buf[1024] = "";

	struct sockaddr_in dstaddr;
	dstaddr.sin_family=AF_INET;
        dstaddr.sin_port=htons(8080);
        dstaddr.sin_addr.s_addr=inet_addr("127.0.0.1");
int da=0;
	while (1)
	{
da=read(STDIN_FILENO,buf,sizeof(buf));
sendto(fd,buf,da,0,(struct sockaddr*)&dstaddr,sizeof(dstaddr));

memset(buf,0,sizeof(buf));
		//收数据	
	}

}

8.本地套接字

1.本地套接字

unix domain socket

本地套接字通信

全双工

套接字用文件来标识,这个文件在绑定之前是不能存在

本地套接字

 创建本地套接字用于tcp通信

int socket(int domain, int type, int protocol);

参数:

   domain : AF_UNIX

   type :SOCK_STREAM

   protocol : 0
绑定

int bind(int sockfd, const struct sockaddr *addr,

​        socklen_t addrlen);

sockfd: 本地套接字

addr: 本地套接字结构体地址

struct sockaddr_un {

​        sa_family_t sun_family;        /* AF_UNIX */

​        char    sun_path[108];      /* pathname *///文件的路径名

​      };

addrlen: sockaddr_un大小
提取

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

addr: struct sockaddr_un 结构体地址

需要注意的点:

客户端可以隐式绑定,但是服务器不可以

绑定指定文件时m,这个文件必须不存在,如果存在绑定失败
2.本地套接字代码
int main()
{
	unlink("sock.s");
	//1.创建套接字
	int lfd = socket(AF_UNIX,SOCK_STREAM, 0);
	//2.绑定
	struct sockaddr_un myaddr;
	myaddr.sun_family = AF_UNIX;
	strcpy(myaddr.sun_path, "sock.s");
	bind(lfd, (struct sockaddr*) & myaddr, sizeof(myaddr));
	listen(lfd, 128);

	//3.提取
	struct sockaddr_un cliaddr;
	socklen_t len = sizeof(cliaddr);
	int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &len);

	//读写
	char buf[1024] = "";

	while (1)
	{
		//收数据
		int n=recv(cfd, buf, sizeof(buf),0);
		if (n < 0)
		{
			perror("read");
			break;
		}
		else
		{
			printf("客户端信息 %s\n", buf);
		}
	}


}

客户端

int main()
{
	
	//1.创建套接字
	int cfd = socket(AF_UNIX,SOCK_STREAM, 0);
	//2.绑定
	struct sockaddr_un myaddr;
	myaddr.sun_family = AF_UNIX;
	strcpy(myaddr.sun_path, "sock.s");
	bind(cfd, (struct sockaddr*) & myaddr, sizeof(myaddr));
	
	//3.提取
	struct sockaddr_un seraddr;
seraddr.sun_family=AF_UNIX;
	strcpy(seraddr.sun_path,"sock.s");
	connect(cfd,(struct  sockaddr*)&seraddr,sizeof(seraddr));
	//读写
	char buf[1024] = "";

	while (1)
	{
printf("1\n");
int da=read(STDIN_FILENO,buf,sizeof(buf));
		//数据
		send(cfd,buf,da,0);
		memset(buf,0,sizeof(buf));
	}


}