Linux中的多线程剖析

news2024/10/5 14:26:54

目录

1、前言

2、多线程理解

2.1 线程

2.2 通俗了解进程和线程

2.2.1 进程是资源分配的基本单位

2.2.2 Linux中的线程是一种轻量化进程

2.3 进程和线程详解

2.3.1 创建一个线程 (pthread_create)

2.3.2 线程自己的一部分数据

2.3.3 线程组

2.3.4 关于进程的其他操作

2.4 Linux线程互斥

2.4.1 互斥量mutex

2.4.2 互斥量的接口

2.5 可重入和线程安全

2.5.1 线程安全

2.5.2 重入

2.6 死锁

2.6.1 死锁四个必要条件

2.6.2 避免死锁

2.7 线程同步

2.7.1 条件变量

2.7.2 同步概念与竞态条件

2.7.3 条件变量函数

2.8 POSIX信号量

3、实现简易的线程池(生产者消费着模型)

生产者消费者模型:

线程池:

(1),使用条件变量和阻塞队列实现线程池

(2),使用信号量和循环队列实现线程池

4、结语


1、前言

        今天呢,我们来深度理解一下什么时多线程,还有他的相关操作,可是为什么叫Linux中的多线程剖析呢,因为多线程在Linux和Windows系统底层中的实现并不一样,相比之下,Windows的实现更加的复杂,而Linux中的简单一些,有利于理解,好了下面我们进入正片。注:如果本文中有什么错误,请在评论区指出,或者私信作者,谢谢。

2、多线程理解

2.1 线程

        要理解多线程,那我们就先得知道什么叫做线程,,通俗的讲呢,就是一个程序的执行路线就是一个线程,我们写的程序大多都是只有一条执行路线的,从调用main函数,一直结束,都只有一个路线,这就是一个线程。

        可是我们之前接触过一个概念叫做进程,那么线程和进程有什么区别,又有什么联系呢?

2.2 通俗了解进程和线程

        首先呢,线程是运行在进程中的,       

        进程是资源分配的基本单位,而进程是CPU调度的基本单位。

        接下来我们详细理解:

2.2.1 进程是资源分配的基本单位

        在我之前的文章中讲过,在Linux中呢,进程在内存中是由PCB进行管理的,而具体实现就是task_struct 结构体,这个结构体中保存着对应进程的相关信息,比如地址空间,文件描述相关,详细可以参照我之前的文章,

        如下如,Linux中的进程包括,PCB(task_struct),地址空间,页表,以及页表映射在内存中的数据,这些东西组成了一个进程,当然,下面这张图只是一个单线程进程,

2.2.2 Linux中的线程是一种轻量化进程

        那么在Linux中,是怎么对进程进行实现的呢?

         如上图所示,Linux中在设计线程的时候,采用了和进程管理一样的模块描述符,一个task_struct就是一个线程,但是这些task_struct并没有指向自己独有的地址空间和资源,这样的进程结构我们称作轻量化进程(LWP),也就是Linux中对线程的实现,

        这种实现,让CPU不能分辨自己处理的是进程还是线程,但是这就是它设计的精妙之处。

        对于轻量化进程来说,他们都指向同一个地址空间,所以他们可以很轻松的共一些数据和资源,但是对于Windows来说,他们为进程单独设计了一种结构,那样子的话,工程量就非常大了,具体可以自己了解。

2.3 进程和线程详解

        通过上面的通俗了解,我们就对进程和线程有了一定的了解和认识,那么我们详细看看:

2.3.1 创建一个线程 (pthread_create)

        Linux对外提供了一个用户级别的库,不是系统调用接口,这个库当中,就提供了一些对线程操作的函数:

#include <pthread.h>
int pthread_create(
    pthread_t *thread, //返回线程ID
    const pthread_attr_t *attr, //设置线程的属性,attr为NULL表示使用默认属性
    void *(*start_routine) (void*), //是个函数地址,线程启动后要执行的函数
    void *arg //传给线程启动函数的参数
    );
//返回值:成功返回0,错误返回错误码,
int pthread_join(pthread_t thread, void **value_ptr);

        我们可以使用这个函数来创建一个线程,但是等线程执行万指定的函数之后,我们需要对线程资源进行释放

        要注意的是,这里我们使用函数创建的线程是用户态的线程,对应底层的维护是使用轻量级进程来维护的

#include <iostream>
#include <pthread.h>


void* fun1(void* arg){
    std::cout << (char*)arg << std::endl;
    return nullptr;
}
void* fun2(void* arg){
    std::cout << (char*)arg << std::endl;
    return nullptr;
}

int main() {
    pthread_t p1,p2;
    //创建并启动线程
    pthread_create(&p1,nullptr,fun1,(void*)"我是线程p1");
    pthread_create(&p2,nullptr,fun2,(void*)"我是线程p2");

    //阻塞等待线程结束,并对其进行资源释放
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);

    return 0;
}

        在运行的时候,要注意:要指定pthread库的库名,不然会链接错误

makefile:

.PHONY:all
all:thread01

thread01:thread01.cpp
	g++ -o $@ $^ -std=c++11 -lpthread

.PHONY:clean
clean:
	rm -f thread01

运行结果:

2.3.2 线程自己的一部分数据

        大家肯定注意到了一个点,那就是pthread_create中,有一个输出型参数,他返回的是什么呢,我们打印 出来看一下

std::cout << "p1:" << p1 << std::endl;
std::cout << "p2:" << p2 << std::endl;
//运行结果:
//p1:139891316045568
//p2:139891307652864

        这么大长串的东西,叫做他的线程ID,但是进程ID那么一点点,这个东西怎么会那么大,大家不要将这两个东西搞混了,这里所说的线程ID并不是我们在监视窗口上看到的那个ID,在监视窗口上那个是线程的tid,这里的进程ID是他在编码层面上的ID,我们可以详细理解一下:

监视窗口上的进程ID(LWP):

        编码层面上的线程ID到底是个什么鬼,这里我们要知道一个东西,就是线程是CPU调度的最小粒子,就是他是要在单独运行的,那我们知道一个程序都是在栈里运行的,但是现在很多个线程都公用一个地址空间,那栈也是公用的吗,并不是,那样就太复杂了,在弹栈,压栈的时候,并没有那么多的寄存器可以供我们维护这个东西,

        所以呢,在地址空间中,有一块叫做共享区的地方,这块地方上面是栈,下边是堆,他们两个相向而生,所以说这块空间非常大,所以呢,在设计之初,就使用这块地方来为本进程创建的线程开辟栈区和一些线程私有的资源区,

         如上图所示,这就是创建的线程的资源分配,我们使用 pthread_create 的时候,返回给我们的线程ID,其实就是对应的线程资源在地址空间中的地址,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID,

pthread_t pthread_self(void);

        线程自己的东西除了栈还有线程ID,一组寄存器,errno,信号屏蔽字,调度优先级

2.3.3 线程组

        没有线程之前,一个进程对应内核里的一个进程描述符,对应一个进程ID。但是引入线程概念之后,情况发生了变化,一个用户进程下管辖N个用户态线程,每个线程作为一个独立的调度实体在内核态都有自己的进程描述符,进程和内核的描述符一下子就变成了1:N关系,POSIX标准又要求进程内的所有线程调用getpid函数时返回相同的进程ID,如何解决上述问题呢?

        Linux内核引入了线程组的概念。

        多线程的进程,又被称为线程组,线程组内的每一个线程在内核之中都存在一个进程描述符(task_struct)与之对应。进程描述符结构体中的pid,表面上看对应的是进程ID,其实不然,它对应的是线程ID;进程描述符中的tgid,含义是Thread Group ID,该值对应的是用户层面的进程ID

        不同于pthread_t类型的线程ID,和进程ID一样,线程ID是pid_t类型的变量,而且是用来唯一标识线程的一个整型变量。

2.3.4 关于进程的其他操作

线程终止

void pthread_exit(void *value_ptr);
//参数
//value_ptr:value_ptr不要指向一个局部变量。
//返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)

        需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。 

取消一个执行中的线程

int pthread_cancel(pthread_t thread);
//参数
//thread:线程ID
//返回值:成功返回0;失败返回错误码

等待线程结束

int pthread_join(pthread_t thread, void **value_ptr);
//参数
//thread:线程ID
//value_ptr:它指向一个指针,后者指向线程的返回值
//返回值:成功返回0;失败返回错误码

        调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:

        1. 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。

        2. 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_CANCELED。

        3. 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传pthread_exit的参数。

        4. 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。

分离线程

        默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。

        如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。

int pthread_detach(pthread_t thread);

2.4 Linux线程互斥

我们先来了解一些概念性的东西:

        临界资源:多线程执行流共享的资源就叫做临界资源

        临界区:每个线程内部,访问临界自娱的代码,就叫做临界区

        互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用

        原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

2.4.1 互斥量mutex

        大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。

多个线程并发的操作共享变量,会带来一些问题。

我们举一个抢票的例子来说

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cerrno>
#include <cstdlib>
#include <string>
#define THREADNO 5

int residue = 1000;

void *loot(void* arg){
    while (true) {
        
        if (residue > 0) {
            std::cout << "线程" << (long long)arg + 1 << ":" << residue-- << " 余票:" << residue << std::endl;;
            usleep(1000);
        }
        else {
            
            break;
        }
        
        usleep(2000);
    }
}

int main() {
    //创建线程
    pthread_t tids[THREADNO];
    for (long long i = 0; i < THREADNO; ++i) {
        if (pthread_create(tids + i, NULL, loot, (void*)i) != 0){
            perror("pthread_create");
            exit(1);
        }
    }
    std::cout << "进程创建完毕" << std::endl;


    for (int i = 0; i < THREADNO; ++i) {
        pthread_join(tids[i],nullptr);
    }

    return 0;
}

        CPU运算速度是非常快的,所以在访问临界资源的时候,就可能出现数据错误,本来只有一千张票,结果发放了一千多张。

为什么可能无法获得错误结果?

        if 语句判断条件为真以后,代码可以并发的切换到其他线程

        usleep这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段

        --ticket操作本身就不是一个原子操作

2.4.2 互斥量的接口

初始化互斥量

初始化互斥量有两种方法:

        方法1,静态分配:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

        方法2,动态分配:

int pthread_mutex_init(
    pthread_mutex_t *restrict mutex, 
    const pthread_mutexattr_t *restrict attr
);
//参数:
//mutex:要初始化的互斥量 
//attr:NULL

销毁互斥量

销毁互斥量需要注意:

        使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁

        不要销毁一个已经加锁的互斥量

        已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex);

 互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex); 
int pthread_mutex_unlock(pthread_mutex_t *mutex); 
//返回值:成功返回0,失败返回错误号

调用pthread_ lock 时,可能会遇到以下情况:

        互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功

        发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

修改上面的抢票代码:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cerrno>
#include <cstdlib>
#include <string>

#define THREADNO 2

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
int residue = 100;

void *loot(void* arg){
    while (true) {
        pthread_mutex_lock(&mtx);
        if (residue > 0) {
            std::cout << "线程" << (long long)arg + 1 << ":" << residue << " 余票:" << residue - 1 << std::endl;
            residue--;
            usleep(5000);
            //sleep(1);
        }
        else {
            pthread_mutex_unlock(&mtx);
            break;
        }
        pthread_mutex_unlock(&mtx);
        usleep(2000);
    }
}

int main() {
    //创建线程
    pthread_t tids[THREADNO];
    for (long long i = 0; i < THREADNO; ++i) {
        if (pthread_create(tids + i, NULL, loot, (void*)i) != 0){
            perror("pthread_create");
            exit(1);
        }
    }
    std::cout << "进程创建完毕" << std::endl;


    for (int i = 0; i < THREADNO; ++i) {
        pthread_join(tids[i],nullptr);
    }

    return 0;
}

        经过上面的例子,大家已经意识到单纯的i++或者++i都不是原子的,有可能会有数据一致性问题为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

2.5 可重入和线程安全

2.5.1 线程安全

        多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。

2.5.2 重入

        同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

常见的线程不安全的情况

        不保护共享变量的函数

        函数状态随着被调用,状态发生变化的函数

        返回指向静态变量指针的函数

        调用线程不安全函数的函数

常见的线程安全的情况

        每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的

        类或者接口对于线程来说都是原子操作

        多个线程之间的切换不会导致该接口的执行结果存在二义性

常见不可重入的情况

        调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的

        调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构

        可重入函数体内使用了静态的数据结构

常见可重入的情况

        不使用全局变量或静态变量

        不使用用malloc或者new开辟出的空间

        不调用不可重入函数

        不返回静态或全局数据,所有数据都有函数的调用者提供

        使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

可重入与线程安全联系

        函数是可重入的,那就是线程安全的

        函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题

        如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。

可重入与线程安全区别

        可重入函数是线程安全函数的一种

        线程安全不一定是可重入的,而可重入函数则一定是线程安全的。

        如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。

2.6 死锁

        死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

2.6.1 死锁四个必要条件

        互斥条件:一个资源每次只能被一个执行流使用

        请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放

        不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

        循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

2.6.2 避免死锁

        破坏死锁的四个必要条件

        加锁顺序一致

        避免锁未释放的场景

        资源一次性分配

2.7 线程同步

2.7.1 条件变量

        当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

2.7.2 同步概念与竞态条件

        同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步

        竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

2.7.3 条件变量函数

初始化

int pthread_cond_init(
    pthread_cond_t *restrict cond,
    const pthread_condattr_t *restrict attr); 
//参数:
//cond:要初始化的条件变量 
//attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满足

int pthread_cond_wait(
    pthread_cond_t *restrict cond,
    pthread_mutex_t *restrict mutex
); 
//参数:
//cond:要在这个条件变量上等待 
//mutex:互斥量

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond); 
int pthread_cond_signal(pthread_cond_t *cond);

简单案例:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <string>

struct data{
    char name;
    pthread_mutex_t *mtx;
    pthread_cond_t *cond;
};



void* print(void *arg){
    data *d = (data*)arg;
    char name[64];
    snprintf(name, sizeof(name),"线程%c打印--我是线程%c!",d->name,d->name);
    std::string threadname = name;
    
    while (true) {
        pthread_cond_wait(d->cond,d->mtx);
        std::cout << threadname <<std::endl;
        sleep(1);
        pthread_cond_signal(d->cond);
    }
}


int main() {
    pthread_mutex_t mtx;
    pthread_cond_t cond;

    pthread_mutex_init(&mtx,nullptr);
    pthread_cond_init(&cond,nullptr);

    data d1,d2;
    d1.name = 'A';
    d1.mtx = &mtx;
    d1.cond = &cond;
    d2.name = 'B';
    d2.mtx = &mtx;
    d2.cond = &cond;

    pthread_t p1,p2;
    pthread_create(&p1,nullptr,print,(void*)&d1);
    pthread_create(&p2,nullptr,print,(void*)&d2);
    sleep(1);
    pthread_cond_signal(&cond);
    
    pthread_join(p1,nullptr);
    pthread_join(p2,nullptr);
    while (1);
    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

运行结果:

为什么pthread_ cond_ wait 需要互斥量?

        条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。

2.8 POSIX信号量

        POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。  但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); 
//参数:
//pshared:0表示线程间共享,非零表示进程间共享 
//value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem);

发布信号量

//发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。 
int sem_post(sem_t *sem);

3、实现简易的线程池(生产者消费着模型)

生产者消费者模型:

        为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点

        解耦、支持并发、支持忙闲不均

线程池:

        一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

        1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

        2.  对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

        3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

        下来我们分别用条件变量和信号量实现线程池,单机版的,我们就设计在线程池内部自产自销的方案实现

(1),使用条件变量和阻塞队列实现线程池

//thread.hpp
#pragma once

#include <iostream>
#include <pthread.h>
#include <string>
#include <cstdio>
//#include <functional>


class Thread{

    //typedef std::function<void* (void*)> function;
    typedef void*(*function)(void*);

public:
    Thread(){}
    Thread(int num):_tid(0)
    {
        char name[64];
        snprintf(name, sizeof(name),"thread%d",num);
        _name = name;
    }

    void create(function fun,void* arg){
        pthread_create(&_tid, nullptr, fun, arg);
    }

    void join() {
        pthread_join(_tid,nullptr);
    }

    std::string name(){
        return _name;
    }

    ~Thread(){}

private:
    pthread_t _tid;
    std::string _name;
};
//Task.hpp
#pragma once
#include <iostream>

template<class T, class D>
class Task{

public:
    Task(T fun, D num1, D num2):_fun(fun),_arg0(num1),_arg1(num2)
    {}

    T fun() {
        return _fun;
    }

    D arg0() {
        return _arg0;
    }
    D arg1() {
        return _arg1;
    }

private:
    T _fun;
    D _arg0;
    D _arg1;
};
//mypool.hpp
#pragma once

#include "thread.hpp"
#include "Task.hpp"
#include <vector>
#include <queue>
//#include <cstdlib>
#include <ctime>
#include <unistd.h>
#define CAPACITY 10

int add(int x, int y)
{
    return x + y;
}

template <class T, class D>
struct poolData
{
    int _capacity;
    //线程自身
    Thread* _self;
    std::queue<Task<T, D> *> *_task;
    // 锁
    pthread_mutex_t *_mtx;
    // 条件变量
    pthread_cond_t *_full;  // 满了
    pthread_cond_t *_empty; // 空了
};

template <class T, class D>
class Pool
{
public:
    Pool(int num) : _capacity(10)
    {
        _productor = new std::vector<Thread *>(num);
        _consumer = new std::vector<Thread *>(num);
        _task = new std::queue<Task<T, D> *>;
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_full, nullptr);
        pthread_cond_init(&_empty, nullptr);
    }

    std::vector<Thread *> *getpro()
    {
        return _productor;
    }
    std::vector<Thread *> *getcon()
    {
        return _consumer;
    }
    std::queue<Task<T, D> *> *gettask()
    {
        return _task;
    }

    void strat()
    {
        //poolData<T, D> prodata[_productor->size()];
        //poolData<T, D> condata[_productor->size()];
        std::vector<poolData<T, D>> prodata(_productor->size());
        std::vector<poolData<T, D>> condata(_productor->size());
        for (int i = 0; i < _productor->size(); ++i)
        {
            //data[i]._consumer = _consumer;
            //data[i]._productor = _productor;
            prodata[i]._task = _task;
            prodata[i]._full = &_full;
            prodata[i]._empty = &_empty;
            prodata[i]._mtx = &_mtx;
            prodata[i]._capacity = _capacity;
            (*_productor)[i] = new Thread(i);
            prodata[i]._self = (*_productor)[i];
            (*_productor)[i]->create(productor, &prodata[i]);

            condata[i]._task = _task;
            condata[i]._full = &_full;
            condata[i]._empty = &_empty;
            condata[i]._mtx = &_mtx;
            condata[i]._capacity = _capacity;
            (*_consumer)[i] = new Thread(i);
            condata[i]._self = (*_consumer)[i];
            (*_consumer)[i]->create(consumer, &condata[i]);
        }
        for (int i = 0; i < _productor->size(); ++i)
        {
            (*_productor)[i]->join();
            (*_consumer)[i]->join();
        }
        for (int i = 0; i < _productor->size(); ++i)
        {
            delete (*_productor)[i];
            delete (*_consumer)[i];
        }
    }

    // 生产者
    static void *productor(void *args)
    {
        poolData<T, D> *pd = (poolData<T, D> *)args;
        //std::vector<Thread *> *pro = pd->_productor;
        // 消费者队列
        // std::vector<Thread*> *con = pd->_consumer;
        Thread* self = pd->_self;
        // 阻塞队列
        std::queue<Task<T, D> *> *task = pd->_task;
        // 锁
        pthread_mutex_t *mtx = pd->_mtx;
        // 条件变量
        pthread_cond_t *full = pd->_full;   // 满了
        pthread_cond_t *empty = pd->_empty; // 空了
        int capacity = pd->_capacity;
        srand((uint64_t)time(nullptr) ^ 0x6666);
        while (true)
        {
            int x = rand() % 1000 + 10;
            int y = rand() % 1000 + 10;

            pthread_mutex_lock(mtx);
            // 如果满了,就挂起
            while (task->size() == capacity){
                std::cout << "生产者挂起!" << std::endl;
                pthread_cond_wait(full, mtx);
            }
                
            
            std::cout << "生产者拿到锁,开始生产"<< std::endl;
            // 添加任务
            task->push(new Task<int (*)(int, int), int>(add, x, y));
            std::cout << "生产者" << self->name() << " 生产:" << x << "+" << y << "=?" << std::endl;
            usleep(rand()%3000);
            std::cout << "生产者生产完毕,释放锁,唤醒线程"<< std::endl;
            pthread_cond_signal(empty);
            pthread_mutex_unlock(mtx);
            

            
            sleep(1);
        }
    }
    // 消费者
    static void *consumer(void *args)
    {
        poolData<T, D> *pd = (poolData<T, D> *)args;
        // std::vector<Thread*>* pro = pd->_productor;
        // 消费者队列
        //std::vector<Thread *> *con = pd->_consumer;
        Thread* self = pd->_self;
        // 阻塞队列
        std::queue<Task<T, D> *> *task = pd->_task;
        // 锁
        pthread_mutex_t *mtx = pd->_mtx;
        // 条件变量
        pthread_cond_t *full = pd->_full;   // 满了
        pthread_cond_t *empty = pd->_empty; // 空了

        while (true)
        {

            pthread_mutex_lock(mtx);
            while (task->empty()) {
                std::cout << "消费者挂起!" << std::endl;
                pthread_cond_wait(empty, mtx);
            }
            std::cout << "消费者拿到锁,开始消费"<< std::endl;
            // 执行任务
            // task->push(Task(add,x,y));
            Task<T, D> *t = task->front();
            task->pop();
            D num = t->fun()(t->arg0(), t->arg1());
            std::cout << "消费者" << self->name() << " 消费:" << t->arg0() << "+" << t->arg1() << "=" << num << std::endl;
            delete t;
            usleep(rand()%3000);
            std::cout << "消费者消费完毕,释放锁,唤醒线程"<< std::endl;

            pthread_cond_signal(full);
            pthread_mutex_unlock(mtx);
            

            //usleep(rand() % 2000);
            sleep(2);
        }
    }

    ~Pool()
    {
        if (!_productor->empty())
        {
            for (int i = 0; i < _productor->size(); ++i)
            {
                if ((*_productor)[i] != nullptr)
                {
                    delete (*_productor)[i];
                }
            }
        }
        if (!_consumer->empty())
        {
            for (int i = 0; i < _consumer->size(); ++i)
            {
                if ((*_consumer)[i] != nullptr)
                {
                    delete (*_consumer)[i];
                }
            }
        }

        while (!_task->empty())
        {
            Task<T,D> *t = _task->front();
            _task->pop();
            delete t;
        }

        delete _productor;
        delete _consumer;
        delete _task;
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }

private:
    int _capacity;
    // 生产着队列
    std::vector<Thread *> *_productor;
    // 消费者队列
    std::vector<Thread *> *_consumer;
    // 阻塞队列
    std::queue<Task<T, D> *> *_task;
    // 锁
    pthread_mutex_t _mtx;
    // 条件变量
    pthread_cond_t _full;  // 满了
    pthread_cond_t _empty; // 空了
};
//test.cpp
#include "mypool.hpp"

int main() {
    //std::cout << "hello" << std::endl;
    Pool<int(*)(int,int),int> p(1);
    p.strat();

    return 0;
}

(2),使用信号量和循环队列实现线程池

#pragma once

//mutex.hpp
#include <iostream>
#include <pthread.h>


class mutex{

public:
    mutex(){
        pthread_mutex_init(&_mtx,nullptr);
    }

    void lock(){
        pthread_mutex_lock(&_mtx);
    }

    void unlock(){
        pthread_mutex_unlock(&_mtx);
    }

    ~mutex(){
        pthread_mutex_destroy(&_mtx);
    }


private:
    pthread_mutex_t _mtx;
};
//sem.hpp
#pragma once

#include <iostream>
#include <semaphore.h>

class sem{

public:
    sem(int value)
    {
        sem_init(&_sem, 0, value);
    }

    void p(){
        sem_wait(&_sem);
    }

    void v(){
        sem_post(&_sem);
    }

    ~sem(){
        sem_destroy(&_sem);
    }


private:
    sem_t _sem;
};
//ringqueue.hpp
#include <iostream>
#include <vector>
#include "sem.hpp"
#include "mutex.hpp"

template<class T>
class ringqueue {

public:
    ringqueue(int capacity = 10)
    :_ring_queue(capacity),
    _start(0),
    _tail(0),
    _space_sem(capacity),
    _data_sem(0),
    _mtx()
    {}

    void push(const T &in){

        _space_sem.p();
        _mtx.lock();
        _ring_queue[_start++] = in;
        _start %= _ring_queue.size();
        _data_sem.v();
        _mtx.unlock();
    }

    void pop(T & out){
        _data_sem.p();
        _mtx.lock();
        out = _ring_queue[_tail++];
        _tail %= _ring_queue.size();
        _space_sem.v();
        _mtx.unlock();
    }

    ~ringqueue()
    {
        
    }

private:
    std::vector<T> _ring_queue;
    int _start;
    int _tail;
    sem _space_sem;
    sem _data_sem;
    mutex _mtx;
};
//mypool.hpp
#pragma once

#include "thread.hpp"
#include "Task.hpp"
#include "ringQueue.hpp"
//#include <vector>
//#include <queue>
//#include <cstdlib>
#include <ctime>
#include <unistd.h>
//#define CAPACITY 10

int add(int x, int y)
{
    return x + y;
}

template <class T, class D>
struct poolData
{
    Thread* _self;
    ringqueue<Task<T,D>*>* _rq;
};

template <class T, class D>
class Pool
{
public:
    Pool(int num) 
    :_productor(num),
    _consumer(num)
    {
        
    }

    

    void strat()
    {
        poolData<T,D> prodata[_productor.size()];
        poolData<T,D> condata[_consumer.size()];
        for (int i = 0; i < _productor.size(); ++i) {
            _productor[i] = new Thread(i);
            prodata[i]._self = _productor[i];
            prodata[i]._rq = &_rq;
            _productor[i]->create(productor,&prodata[i]);
            _consumer[i] = new Thread(i);
            condata[i]._self = _consumer[i];
            condata[i]._rq = &_rq;
            _consumer[i]->create(consumer,&condata[i]);
        }

        for (int i = 0; i < _productor.size(); ++i) {
            _productor[i]->join();
            delete _productor[i];
            _consumer[i]->join();
            delete _consumer[i];
        }
    }

    // 生产者
    static void *productor(void *args)
    {
        poolData<T,D> *pd = (poolData<T,D>*)args;
        Thread *self = pd->_self;
        ringqueue<Task<T,D>*> *rq = pd->_rq;
        srand(time(nullptr) ^ 0x6666);
        while (true) {
            int x = rand() % 1000 + 10;
            int y = rand() % 1000 + 10;
            rq->push(new Task<T,D>(add,x,y));
            std::cout << "生产线程" << self->name() << "生产任务:" << x << "+" << y << "=?" << std::endl;
            usleep(rand() %5000 + 3000);
            //sleep(1);
        }
    }
    // 消费者
    static void *consumer(void *args)
    {
        poolData<T,D> *pd = (poolData<T,D>*)args;
        Thread *self = pd->_self;
        ringqueue<Task<T,D>*> *rq = pd->_rq;
        srand(time(nullptr) ^ 0x6666);
        while (true) {
            Task<T,D> *t;
            rq->pop(t);
            int z = t->fun()(t->arg0(),t->arg1());
            std::cout << "消费者" << self->name() << "消费任务:" << t->arg0() << "+" << t->arg1() << "=" << z << std::endl;

            delete t;
            usleep(rand() % 5000 + 3000);
        }
    }

    ~Pool()
    {
        
    }

private:
    ringqueue<Task<T,D>*> _rq;
    std::vector<Thread*> _productor;
    std::vector<Thread*> _consumer;
};
//test.cpp
#include "mypool.hpp"

int main() {
    Pool<int(*)(int,int),int> p(5);
    p.strat();
    return 0;
}

4、结语

        好了,今天的分享就到这里了,如果文章对你有帮助,请留下你的评论,如果有错误,也请评论私信作者,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/971833.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android之RecyclerView仿ViewPage滑动

文章目录 前言一、效果图二、实现步骤1.xml主布局2.所有用到的drawable资源文件3.xml item布局4.adapter适配器5.javabean实体类6.activity使用 总结 前言 我们都知道ViewPageFragment滑动&#xff0c;但是的需求里面已经有了这玩意&#xff0c;但是在Fragment中还要有类似功能…

Springboot + Sqlite实战(离线部署成功)

最近有个需求&#xff0c;是手机软件离线使用&#xff0c; 用的springboot mybatis-plus mysql&#xff0c;无法实现&#xff0c;于是考虑使用内嵌式轻量级的数据库SQLlite 引入依赖 <dependency><groupId>org.xerial</groupId><artifactId>sqlite-…

手写Mybatis:第10章-使用策略模式,调用参数处理器

文章目录 一、目标&#xff1a;参数处理器二、设计&#xff1a;参数处理器三、实现&#xff1a;参数处理器3.1 工程结构3.2 参数处理器关系图3.3 入参数校准3.4 参数策略处理器3.4.1 JDBC枚举类型修改3.4.2 类型处理器接口3.4.3 模板模式&#xff1a;类型处理器抽象基类3.4.4 类…

drone的简单使用

&#xff08;一&#xff09;简介 Drone 是一个基于Docker容器技术的可扩展的持续集成引擎&#xff0c;用于自动化测试、构建、发布。每个构建都在一个临时的Docker容器中执行&#xff0c;使开发人员能够完全控制其构建环境并保证隔离。开发者只需在项目中包含 .drone.yml文件&…

Java“牵手”唯品会商品列表数据,关键词搜索唯品会商品数据接口,唯品会API申请指南

唯品会商城是一个网上购物平台&#xff0c;售卖各类商品&#xff0c;包括服装、鞋类、家居用品、美妆产品、电子产品等。要获取唯品会商品列表和商品详情页面数据&#xff0c;您可以通过开放平台的接口或者直接访问唯品会商城的网页来获取商品详情信息。以下是两种常用方法的介…

《智能网联汽车自动驾驶功能测试规程》

一、 编制背景 2018 年4 月12 日&#xff0c;工业和信息化部、公安部、交通运输部联合发布《智能网联汽车道路测试管理规范(试行)》&#xff08;以下简称《管理规范》&#xff09;&#xff0c;对智能网联汽车道路测试申请、审核、管理以及测试主体、测试驾驶人和测试车辆要求等…

webpack5 (三)

webpack 高级配置 其实就是对 webpack 进行优化&#xff0c;让代码在编译/运行时性能更好 1. 提升开发体验 2. 提升打包构建速度 3. 减少代码体积 4. 优化代码运行性能 一、提升开发体验 sourcemap 在编译打包后所有的 css 和 js 都合并为了一个文件&#xff0c;并多了很多…

管理类联考——数学——汇总篇——知识点突破——数据分析——计数原理

角度——⛲️ 一、考点讲解 分类计数原理&#xff08;加法原理&#xff09; (1&#xff09;定义 如果完成一件事有n类办法&#xff0c;只要选择其中一类办法中的任何一种方法&#xff0c;就可以完成这件事。若第一类办法中有 m 1 m_1 m1​种不同的方法&#xff0c;第二类办法中…

SpringCloud(35):Nacos 服务发现快速入门

本小节,我们将演示如何使用Spring Cloud Alibaba Nacos Discovery为Spring cloud 应用程序与 Nacos 的无缝集成。 通过一些原生的spring cloud注解,我们可以快速来实现Spring cloud微服务的服务发现机制,并使用Nacos Server作为服务发现中心,统一管理所有微服务。 1 Spring…

uniapp集成windicss的流程

一、背景介绍 Windicss是一个基于Tailwind CSS 灵感的库,它更快、更兼容,使用 TypeScript 构建。Windicss的目标是为了解决与Tailwind CSS 类似的问题,提供一个可以快速上手开发的组件库,让开发者不再需要繁琐地编写 CSS 样式。Windicss包含了几乎所有的 CSS 样式,因此开发…

MongoDB常用的比较符号和一些功能符号

比较符号 results collection.find({age: {$gt: 20}})功能符号 results collection.find({name: {$regex: ^M.*}})

联合教育部高等学校科学研究发展中心,阿依瓦科技创新教育专项正式发布!

7 月 24 日&#xff0c;教育部科技发展中心官网发布了《中国高校产学研创新基金&#xff0d;阿依瓦科技创新教育专项申请指南》。 针对高校在人工智能、智能制造、智慧校园、大数据等领域科研和教研的创新研究&#xff0c;教育部高等学校科学研究发展中心与阿依瓦(北京)技术有…

RK3399平台开发系列讲解(内核调试篇)IO 数据工具:iostat和iotop

🚀返回专栏总目录 文章目录 一、iostat 命令二、/proc/diskstats 文件三、iotop 命令沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 在 Linux 系统上,iostat 和 iotop 这两个 IO 数据工具非常常用。它们都是性能分析领域中不可缺少的工具性软件。 一、iostat 命令…

linux和docker下mysql安装

目录 一、linux下mysql的安装 1.进入到/etc/yum.repos.d 2.编辑vim mysql-community.repo 3.编辑以下内容 4.保存退出&#xff0c;更新缓存yum makecache 5.下载mysql 6.启动并查看mysql状态 7.查找mysql密码 8.登陆mysql 9.密码修改参考MySQL密码修改 二、docker安…

solidity开发环境配置,vscode搭配remix

#学习笔记 初学solidity&#xff0c;使用remix非常方便&#xff0c;因为需要的环境都配置好了&#xff0c;打开网站就可以使用。 不过在编写代码方面&#xff0c;使用vscode更方便&#xff0c;而vscode本身并不能像remix那样部署合约&#xff0c;它还需要安装插件。 点击红色箭…

springboot整合mybatis实现增删改查(xml)--项目阶段1

目录 一、前言 二、创建项目 创建MySQL数据库和表 创建springboot项目 本文总体代码结构图预览 三、编写代码 &#xff08;一&#xff09;新建实体层属性类 &#xff08;二&#xff09;新建数据层mapper接口 &#xff08;三&#xff09;新建mapper的映射SQL&#xff08…

QProcess 调用 ffmpeg来处理音频

项目场景&#xff1a; 在文章 qt 实现音视频的分贝检测系统中&#xff0c;实现的是边播放变解析音频数据来统计音频的分贝大小&#xff0c;并不满足实际项目的需求&#xff0c;有的视频声音正常&#xff0c;有的视频声音就偏低&#xff0c;即使放到最大音量声音也是比较小&…

Apache实现weblogic集群配置

安装apache&#xff0c;安装相对稳定的版本。如果安装后测试能否正常启动&#xff0c;可以通过访问http://localhost/进行测试。安装Weblogic&#xff0c;参见文档将bea安装目录 weblogic81/server/bin 下的 mod_wl_20.so 文件copy到 apache安装目录下Apache2/modules/目录下A…

90、00后严选出的数据可视化工具:奥威BI工具

90、00后主打一个巧用工具&#xff0c;绝不低效率上班&#xff0c;因此当擅长大数据智能可视化分析的BI数据可视化工具出现后&#xff0c;自然而然地就成了90、00后职场人常用的数据可视化工具。 奥威BI工具三大特点&#xff0c;让职场人眼前一亮&#xff01; 1、零编程&…

【STM32】学习笔记-时间戳RTC

Unix时间戳 Unix 时间戳&#xff08;Unix Timestamp&#xff09;定义为从UTC/GMT的1970年1月1日0时0分0秒开始所经过的秒数&#xff0c;不考虑闰秒 时间戳存储在一个秒计数器中&#xff0c;秒计数器为32位/64位的整型变量 世界上所有时区的秒计数器相同&#xff0c;不同时区通…