linux 8 基于线程池和epoll监听实现聊天服务器

发布时间 2023-07-31 15:41:08作者: snowa
1.立项

功能
1.聊天总人数显示
2.账号密码注册功能-保留名字-永久保留id->保留id功能取消
3.总聊天室-进入前可输入名字 顺序id
4.聊天室聊天
5.单对单聊天
6.id=cfd串联起来

4.服务器代码
#include "threadpoolsimple.h"
//初始化结构体
#include<stdio.h>
ThreadPool* thrPool = NULL;
typedef struct  sumfd
{
    int cfd;
    char name[64];

}sumfd;
struct sumfd sumfd1[1024];
int beginnum = 1000;
///int sumfd[1024] = { 0 };//所有fd保存处 拷贝一份 不去对ev进行读取 或者再次写入
int lx = 0;
//线程回调 线程抢任务的具体方法
void* thrRun(void* arg)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    ThreadPool* pool = (ThreadPool*)arg;//参数给予
    int taskpos = 0;//任务位置
    PoolTask* task = (PoolTask*)malloc(sizeof(PoolTask));//开辟一个任务数组

    while (1)
    {
        //获取任务,先要尝试加锁
        pthread_mutex_lock(&thrPool->pool_lock);

        //无任务并且线程池不是要摧毁
        while (thrPool->job_num <= 0 && !thrPool->shutdown)
        {
            //如果没有任务,线程会阻塞
            pthread_cond_wait(&thrPool->not_empty_task, &thrPool->pool_lock);//阻塞此处等待信号
        }

        if (thrPool->job_num)
        {
            //有任务需要处理
            taskpos = (thrPool->job_pop++) % thrPool->max_job_num;
            //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
            //为什么要拷贝?避免任务被修改,生产者会添加任务
            memcpy(task, &thrPool->tasks[taskpos], sizeof(PoolTask));//将任务数组结构体拷贝一份 防止覆盖任务
            task->arg = task;
            thrPool->job_num--;
            //task = &thrPool->tasks[taskpos];
            pthread_cond_signal(&thrPool->empty_task);//通知生产者
        }

        if (thrPool->shutdown)
        {
            //代表要摧毁线程池,此时线程退出即可
            //pthread_detach(pthread_self());//临死前分家
            pthread_mutex_unlock(&thrPool->pool_lock);
            free(task);
            pthread_exit(NULL);
        }

        //释放锁
        pthread_mutex_unlock(&thrPool->pool_lock);
        task->task_func(task->arg);//执行回调函数
    }

    //printf("end call %s-----\n",__FUNCTION__);
}

//创建线程池
void create_threadpool(int thrnum, int maxtasknum)
{
    printf("begin call %s-----\n", __FUNCTION__);
    thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));//给结构体赋值

    thrPool->thr_num = thrnum;//线程个数
    thrPool->max_job_num = maxtasknum;//最大任务个数
    thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
    thrPool->job_push = 0;//任务队列添加的位置
    thrPool->job_pop = 0;//任务队列出队的位置
    thrPool->job_num = 0;//初始化的任务个数为0

    thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask) * maxtasknum));//申请最大的任务队列数组

    //初始化锁和条件变量
    pthread_mutex_init(&thrPool->pool_lock, NULL);//上锁
    pthread_cond_init(&thrPool->empty_task, NULL);//条件变量1
    pthread_cond_init(&thrPool->not_empty_task, NULL);//条件变量2

    int i = 0;
    thrPool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrnum);//申请n个线程id的空间 线程数组 
    //设置线程自动分离
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    //创建线程
    for (i = 0; i < thrnum; i++)
    {
        pthread_create(&thrPool->threads[i], &attr, thrRun, (void*)thrPool);//创建多个线程->追线程回调
    }
    //printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool* pool)
{
    pool->shutdown = 1;//开始自爆
    pthread_cond_broadcast(&pool->not_empty_task);//诱杀 

    int i = 0;
    for (i = 0; i < pool->thr_num; i++)
    {
        pthread_join(pool->threads[i], NULL);
    }

    pthread_cond_destroy(&pool->not_empty_task);
    pthread_cond_destroy(&pool->empty_task);
    pthread_mutex_destroy(&pool->pool_lock);

    free(pool->tasks);
    free(pool->threads);
    free(pool);
}

//添加任务到线程池
void addtask(ThreadPool* pool, int fd, struct epoll_event* evs)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    pthread_mutex_lock(&pool->pool_lock);

    //实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
    while (pool->max_job_num <= pool->job_num)
    {
        pthread_cond_wait(&pool->empty_task, &pool->pool_lock);
    }

    int taskpos = (pool->job_push++) % pool->max_job_num;
    //printf("add task %d  tasknum===%d\n",taskpos,beginnum);
    pool->tasks[taskpos].tasknum = beginnum++;
    pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
    pool->tasks[taskpos].task_func = taskRun;
    pool->tasks[taskpos].fd = fd;
    pool->tasks[taskpos].evs = evs;
    pool->job_num++;

    pthread_mutex_unlock(&pool->pool_lock);

    pthread_cond_signal(&pool->not_empty_task);//通知包身工
    //printf("end call %s-----\n",__FUNCTION__);
}

//任务回调函数
void taskRun(void* arg)
{

    //
    PoolTask* task = (PoolTask*)arg;
    char buf[1024] = "";
    int n = Read(task->fd, buf, sizeof(buf));
    if (n == 0)
    {
        printf("客户端%d 即将关闭\n", task->fd);
        for (int i = 0; i < 1024; i++)
        {
            if (sumfd1[i].cfd == task->fd)
            {
                sumfd1[i].cfd = 0;

                break;
            }

        }
        close(task->fd);//关闭cfd
        epoll_ctl(task->epfd, EPOLL_CTL_DEL, task->fd, task->evs);//将cfd上树

    }
    else if (n > 0)
    {
        for (int i = 0; i < 1024; i++)
        {
            if (sumfd1[i].cfd != 0)
            {
                //"ID XX 发送:
                char buf1[80] = "id ";
int iu=0;
for(iu=0;iu<1024;iu++)
{
if(sumfd1[iu].cfd==task->fd)
{
break;
}


}
               
                sprintf(buf1, "name: %s  发送:%s ", sumfd1[iu].name,buf);
                send(sumfd1[i].cfd, buf1, sizeof(buf1), 0);
               // send(sumfd1[i].cfd, buf, sizeof(buf), 0);
            }
            else
            {
                lx++;

            }
            if (lx >= 4)
            {
                lx = 0;
                break;
            }

        }
    }
    lx = 0;


}


void prtip(struct sockaddr_in* cliaddr)
{
    char ip[16] = "";
    printf("主机 ip=%s port=%d 即将链接\n", inet_ntop(AF_INET, &(cliaddr->sin_addr.s_addr), ip, 16),
        ntohs(cliaddr->sin_port));

}
char bufname[90] = "";
int kg=0;
int main()
{

    create_threadpool(3, 20);
    printf("请输入服务器端口号\n");
    char buf[8] = "";

    read(STDIN_FILENO, buf, sizeof(buf));
    printf("   \n");
    printf("等待客户端链接\n");

    //
   


    int port = atoi((char*)buf);
    //创建 绑定
    int lfd = tcp4bind(port, NULL);
    //监听
    Listen(lfd, 128);
    //前置操作
    int hhs = epoll_create(1);//创建树
    struct epoll_event ev, evs[1024];//进行上树等等结构体
    //将lfd上树
    ev.data.fd = lfd;
    ev.events = EPOLLIN;
    epoll_ctl(hhs, EPOLL_CTL_ADD, lfd, &ev);

    //存放信息
    struct sockaddr_in as;
    socklen_t len = sizeof(&as);
    char buf12[600] = "";
    while (1)
    {
        //监听开始
int sum;
if(kg==0)
{
  sum= epoll_wait(hhs, evs, 1024, -1);
}else
{
sum=0;
}
       
        printf("监听中\n");
        if (sum == 0)
        {
            continue;
        }
        else if (sum > 0)
        {
            //判断是lfd 还是cfd
            for (int i = 0; i < sum; i++)
            {
                if (evs[i].data.fd == lfd && evs[i].events & EPOLLIN)
                {
                    //lfd变化 
                    int cfd = Accept(lfd, (struct sockaddr*)&as, &len);
                    //设置非阻塞
                    int flage = fcntl(cfd, F_GETFL);//获取CFD标志位
                    flage |= O_NONBLOCK;
                    fcntl(cfd, F_SETFL, flage);
                    prtip(&as);
                    int m = 0;
                    int i2 = 0;
                       for (i2 = 0; i2 < 1024; i2++)//实际循环次数会很少
                    {
                        if (sumfd1[i2].cfd == 0)
                        {
                            sumfd1[i2].cfd = cfd;
kg=1;
                             printf("请在5S内输入名字\n");

                             sleep(5);
                             char buf2[80]="";
                             Read(cfd ,buf2, sizeof(buf2));
                             strcpy(sumfd1[i2].name,buf2);
                             
                             kg=0;
                            m = i2;
                            break;

                        }
                    }

                    //加入到监听集合
                    //上树
                    ev.data.fd = cfd;
                    ev.events = EPOLLIN | EPOLLET;
                    epoll_ctl(hhs, EPOLL_CTL_ADD, cfd, &ev);
                   
                   // sumfd1
                    
                    //"ID XX 发送:
                    strcpy(buf12, " ");
                    sprintf(buf12, "你的编号 %d 成功注册 切记不要随意告诉别人哦\n当前聊天室总人数%d \n",cfd, m + 1);
                    send(cfd, buf12, sizeof(buf12), 0);


                    lx = 0;

                }
                else if (evs[i].events & EPOLLIN)
                {
                    addtask(thrPool, evs[i].data.fd, &ev);

                }




            }//for



        }//else



    }///while
    return 0;
}


3.总结

不完善 客户端没写 会等技术上来用qt重写一次聊天服务器