1.首先写一个队列来存,线程
queue.c
#ifndef QUEUE_H
#define QUEUE_H
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
typedef struct Queue
{
void **arr;
int cap;
int front;
int rear;
} Queue;
//创建队列
Queue *create_queue(int cap);
//销毁队列
void destroy_queue(Queue *queue);
//入队
void push_queue(Queue *queue, void *arg);
//出队
void *pop_queue(Queue *queue);
//队空
bool empty_queue(Queue *queue);
//队满
bool full_queue(Queue *queue);
//队头
void *front_queue(Queue *queue);
//队尾
void *rear_queue(Queue *queue);
#endif//QUEUE_H
queue.c
#include "queue.h"
//创建队列
Queue *create_queue(int cap)
{
Queue *queue = malloc(sizeof(Queue));
//多开辟一一个空间
queue->arr=malloc(cap*sizeof(void*)*cap+1);
queue->cap=cap;
queue->front=0;
queue->rear=0;
return queue;
}
//销毁队列
void destroy_queue(Queue *queue)
{
free(queue->arr);
free(queue);
}
//入队
void push_queue(Queue *queue, void *arg)
{
//队尾入队
queue->arr[queue->rear++]=arg;
queue->rear%=queue->cap;
}
//出队
void *pop_queue(Queue *queue)
{
queue->front=(queue->front+1)%queue->cap;
}
//队空
bool empty_queue(Queue *queue)
{
return queue->front==queue->rear;
}
//队已满
bool full_queue(Queue *queue)
{
return (queue->rear+1)%queue->cap==queue->front;
}
//队头
void *front_queue(Queue *queue)
{
return queue->arr[queue->front];
}
//队尾
void *rear_queue(Queue *queue)
{
return queue->arr[(queue->rear+queue->cap-1)%queue->cap];
}
2.利用队列,写一个线程池
thread_pool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include "queue.h"
typedef void (*EnterFP)(void *);//线程真正的业务逻辑函数格式
typedef struct ThreadPool {
int thread_cnt;//线程数量
pthread_t *tids;//创建线程id
Queue *store;//队列仓库
EnterFP enter;//线程真正的业务逻辑函数
pthread_mutex_t hlock;//队头互斥锁
pthread_mutex_t tlock;//队尾互斥锁
pthread_cond_t empty;//空仓库条件变量
pthread_cond_t full;//满仓库条件变量
} ThreadPool;
//创建线程池
ThreadPool *create_threadpool(int thread_cnt,int store_cap, EnterFP enter);
//启动线程池
void start_threadpool(ThreadPool *threadpool);
//生成数据
void push_threadpool(ThreadPool *threadpool, void *task);
//消费数据
void *pop_threadpool(ThreadPool *threadpool);
//销毁线程池
void destroy_threadpool(ThreadPool *threadpool);
#endif//THREADPOOL_H
pthread_pool.c
#include<stdlib.h>
#include "threadpool.h"
//线程池的线程的入口函数
static void *run(void *arg)
{
ThreadPool *threadpool=(ThreadPool*)arg;
while(1)
{
//线程就负责消费数据,能返回就意味着从仓库里拿到了数据
void *task=pop_threadpool(threadpool);
//拿到数据,去运行线程要执行的业务逻辑函数
threadpool->enter(task);//?
}
}
//消费数据
//创建线程池
ThreadPool *create_threadpool(int thread_cnt,int store_cap, EnterFP enter)
{
//申请线程池内存
ThreadPool*threadpool=malloc(sizeof(ThreadPool));
//申请线程内存
threadpool->tids=malloc(sizeof(pthread_t)*thread_cnt);
//创建仓库队列
threadpool->store=create_queue(store_cap);
//初始化线程容量
threadpool->thread_cnt=thread_cnt;
//初始化线程业务逻辑函数
threadpool->enter=enter;
//初始化互斥锁,条件变量
pthread_mutex_init(&threadpool->hlock,NULL);
pthread_mutex_init(&threadpool->tlock,NULL);
pthread_cond_init(&threadpool->empty,NULL);
pthread_cond_init(&threadpool->full,NULL);
return threadpool;
}
//启动线程池
void start_threadpool(ThreadPool *threadpool)
{
for(int i=0;i<threadpool->thread_cnt;i++)
{
//创建线程
pthread_create(&threadpool->tids[i],NULL,run,threadpool);
}
}
//生成数据
void push_threadpool(ThreadPool *threadpool, void *task)
{
//队尾加锁
pthread_mutex_lock(&threadpool->tlock);
//如果队满,不生产
while(full_queue(threadpool->store))
{
//唤醒消费者数据的线程
pthread_cond_signal(&threadpool->empty);
//睡眠并解锁队尾
pthread_cond_wait(&threadpool->full,&threadpool->tlock);
}
//生产数据存入队尾
push_queue(threadpool->store,task);
//唤醒一个消费者数据的线程
pthread_cond_signal(&threadpool->empty);
//队尾解锁
pthread_mutex_unlock(&threadpool->tlock);
}
//消费数据
void *pop_threadpool(ThreadPool *threadpool)
{
//队头加锁
pthread_mutex_lock(&threadpool->hlock);
//如果一直队空,不消费
while(empty_queue(threadpool->store))
{
//唤醒生产者数据的线程
pthread_cond_signal(&threadpool->full);
//睡眠并解锁队头
pthread_cond_wait(&threadpool->empty,&threadpool->hlock);
}
//消费数据
void *task=front_queue(threadpool->store);
pop_queue(threadpool->store);
//唤醒一个生产者者数据的线程
pthread_cond_signal(&threadpool->full);
//队头解锁
pthread_mutex_unlock(&threadpool->hlock);
return task;
}
//销毁线程池
void destroy_threadpool(ThreadPool *threadpool)
{
//结束线程池中所有的消费者线程
for(int i=0;i<threadpool->thread_cnt;i++)
{
pthread_cancel(threadpool->tids[i]);
}
pthread_cond_destroy(&threadpool->empty);
pthread_cond_destroy(&threadpool->full);
pthread_mutex_destroy(&threadpool->hlock);
pthread_mutex_destroy(&threadpool->tlock);
destroy_queue(threadpool->store);
free(threadpool->tids);
free(threadpool);
}
3.线程池已经封装好了,先在来写服务器,和客户端来测试
tcp_s.c:服务端
#include <stdio.h>
#include <stdlib.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"threadpool.h"
//业务逻辑
void enter(void*arg)
{
int cli_fd=*(int*)arg;
char buf[4096];
size_t buf_size=sizeof(buf);
for(;;)
{
int ret=recv(cli_fd,buf,buf_size,0);
if(ret<=0||0==strcmp(buf,"quit"))
{
//关闭套接字
close(cli_fd);
//释放资源
free(arg);
printf("客户端退出\n");
return;
}
printf("recv:%s bits:%d \n",buf,ret);
strcat(buf,"return");
ret=send(cli_fd,buf,strlen(buf)+1,0);
if(ret<=0)
{
close(cli_fd);
free(arg);
printf("客户端退出\n");
return;
}
}
}
int main(int argc,char *argv[]) {
int svr_fd=socket(AF_INET,SOCK_STREAM,0);
if(svr_fd<0)
{
printf("socket error\n");
return -1;
}
struct sockaddr_in addr={};
addr.sin_family=AF_INET;
//字符串转化为数字
addr.sin_port=htons(atoi(argv[2]));
addr.sin_addr.s_addr=inet_addr(argv[1]);
socklen_t addrlen=sizeof(addr);
if(bind(svr_fd,(struct sockaddr*)&addr,addrlen))
{
printf("bind error\n");
return -1;
}
//监听
if(listen(svr_fd,10))
{
perror("listen error");
return -1;
}
ThreadPool*threadpool=create_threadpool(10,20,enter);
start_threadpool(threadpool);
while(1)
{
//主线程这边相当于生产者
int *cli_fd_p=malloc(sizeof(int));
*cli_fd_p=accept(svr_fd,(struct sockaddr*)&addr,&addrlen);
push_threadpool(threadpool,cli_fd_p);
}
}
tcp_c.c 客户端
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/un.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <netinet/in.h>
typedef struct sockaddr *SP;
int main(int argc,const char* argv[])
{
//创建socket
int cli_fd=socket(AF_INET,SOCK_STREAM,0);
if(cli_fd<0)
{
perror("socket");
return -1;
}
//准备通信地址
struct sockaddr_in addr={};
addr.sin_family=AF_INET;
addr.sin_port=htons(8888);
addr.sin_addr.s_addr=inet_addr("127.0.0.1");
socklen_t addrlen=sizeof(addr);
//连接服务器
if(connect(cli_fd,(SP)&addr,addrlen))
{
perror("connect");
return -1;
}
char buf[4096];
size_t buf_size=sizeof(buf);
while(1)
{
//发送请求
printf(">>>>>");
scanf("%s",buf);
int ret=send(cli_fd,buf,strlen(buf)+1,0);
//ret=write(cli_fd,buf,strlen(buf)+1);
if(ret<=0)
{
printf("服务器正在升级,请稍后重试\n");
break;
}
if(0==strcmp("quit",buf))
{
printf("通信结束\n");
break;
}
//接收请求
//int ret=read(cli_fd,buf,buf_size);
ret=recv(cli_fd,buf,buf_size,0);
if(ret<=0)
{
printf("服务器正在维护,请稍候重试\n");
break;
}
printf("read:%s bits:%d\n",buf,ret);
}
return 0;
}
4.下面看测试结果
我用了两个客户端
客户端1
客户端2
封装的线程成功!!!