Linux多线程(个人笔记)

news2024/11/6 22:47:49

Linux多线程

  • 1.Linux线程概念
    • 1.1线程的优点
    • 1.2线程的缺点
  • 2.Linux线程VS进程
  • 3.Linux线程控制
    • 3.1创建线程
    • 3.2线程tid及进程地址空间布局
    • 3.3线程终止
    • 3.4线程等待
  • 4.分离线程
  • 5.线程互斥
    • 5.1互斥锁mutex
    • 5.2互斥锁接口
    • 5.3互斥锁实现原理
    • 5.4可重入VS线程安全
  • 6.线程同步
    • 6.1条件变量
    • 6.2pthread_cond_wait需要互斥锁的原因
  • 7.生产者消费者模型
    • 7.1信号量(计数器)
  • 8.线程池


1.Linux线程概念

线程是操作系统能够进行运算调度的最小单位,是程序执行的基本单元。
在这里插入图片描述

1.1线程的优点

线程的优点主要包括:
资源共享:同一进程中的线程共享进程的内存和资源,减少了资源使用的开销。
提高效率:通过并发执行,多个线程可以同时处理任务,充分利用多核处理器的计算能力,提高程序的整体效率。
响应性:在用户界面应用中,使用线程可以保持界面响应,避免因为长时间的计算阻塞界面。
快速切换:线程之间的切换比进程之间的切换更快,减少了上下文切换的时间成本。
降低延迟:在需要频繁执行短小任务的场景中,线程可以减少延迟,提高响应速度。

1.2线程的缺点

线程的缺点包括:
复杂性:多线程程序的设计和调试相对复杂,容易出现死锁、竞态条件等问题。
上下文切换开销:尽管线程切换比进程切换快,但频繁的上下文切换仍然会消耗资源,影响性能。
共享数据的安全性:多个线程共享同一块内存区域,可能导致数据不一致,需要额外的同步机制(如锁)来确保数据安全。
资源竞争:多个线程同时访问共享资源可能导致性能下降,尤其是在锁的竞争情况下。
调试困难:多线程环境中的错误通常难以重现和调试,增加了开发和维护的难度。
内存使用:虽然线程共享内存,但每个线程都有自己的栈空间,过多线程会导致内存消耗增加。
平台依赖性:线程的实现和调度可能因操作系统而异,导致跨平台开发时的兼容性问题。

2.Linux线程VS进程

进程是资源分配的基本单位
线程是调度的基本单位
线程共享进程的数据,但也拥有自己的一部分数据:
如栈,寄存器内容,线程id,errno,信号屏蔽字,调度优先级

3.Linux线程控制

头文件#include<pthread.h>
链接线程库函数时要使用编译器命令"-lpthread"选项

3.1创建线程

在这里插入图片描述

int pthread_create(pthread_t* thread,const pthread_arr_t* arr,void*(*start_routine)(void*),void* arg)
//参数
//thread:返回线程ID
//attr:设置线程的属性,一般设置为nullptr
//start_routine:函数地址,线程启动后要执行的函数
//arg:传给线程启动函数的参数
//返回值:成功返回0,失败返回错误码

示例代码:

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>

int gcnt = 100;

// 新线程
void *ThreadRoutine(void *arg)
{
  const char *threadname = (const char *)arg;
  while (true)
  {
    std::cout << "I am a new thread" << threadname << ",pid: " << getpid() << "gcnt: " << gcnt << "&gcnt" << std::endl;
    gcnt--;
    sleep(1);
  }
}

int main()
{
  // 已有进程了
  pthread_t tid;
  pthread_create(&tid, nullptr, ThreadRoutine, (void *)"thread 1");

  // sleep(3);
  pthread_t tid1;
  pthread_create(&tid1, nullptr, ThreadRoutine, (void *)"thread 2");

  // sleep(3);
  pthread_t tid2;
  pthread_create(&tid2, nullptr, ThreadRoutine, (void *)"thread 3");

  // sleep(3);
  pthread_t tid3;
  pthread_create(&tid3, nullptr, ThreadRoutine, (void *)"thread 4");

  // sleep(3);

  // 主线程
  while (true)
  {
    std::cout << "I am main thread" << ",pid: " << getpid() << "gcnt: " << "&gcnt: " << &gcnt << std::endl;
    sleep(1);
  }
  return 0;
}

3.2线程tid及进程地址空间布局

在这里插入图片描述

pthread_create函数会产生一个线程id,存放在第一个参数指向的地址中
在这里插入图片描述

//pthread_t pthread_self(void);
//线程本身调用该函数会返回自身的线程地址,也就是LWP

3.3线程终止

终止某个线程而不是终止整个进程,三种方法:
1.在线程函数return,对主线程不适应,从main函数return相当于调用了exit
2.线程可以调用pthread_exit终止自己。
3.线程可以调用pthread_cancel终止同一个进程中的另一个线程

//void pthread_exit(void* retval);
//retval:这是一个指向任意类型的指针,表示线程的返回值。当线程结束时,这个值可以被其他线程通过 pthread_join 获取。
//pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了
//int pthread_exit(pthread_t thread);
//thread:要取消线程ID
//返回值:成功返回0,失败返回错误码

3.4线程等待

已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
创建新的线程不会复用刚才退出线程的地址空间

//int pthread_join(pthread_t thread,void** value_ptr);
//thread:线程ID
//value_ptr:用于接收线程的返回值,线程的返回值是void* ,所以这里是void**
//返回值:成功返回0,失败返回错误码
//1. 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。 
//2. 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED。 
//3. 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
//4. 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。

4.分离线程

当线程退出时,自动释放线程资源。
一个线程不能既是joinable又是分离的。
线程如果被分离,该线程可以被取消,但是不能被join

//int pthread_detach(pthread_t thread);
//pthread_detach(pthread_self());

5.线程互斥

临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

5.1互斥锁mutex

互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥锁。
在这里插入图片描述

5.2互斥锁接口

初始化互斥锁
初始化互斥锁两种方法:
1.静态分配:

pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;

2.动态分配

int pthread_mutex_init(pthread_mutex_t* mutex,const pthread_mutexattr_t* attr);
//参数:
//mutex:要初始化的互斥锁
//attr:nullptr

销毁互斥锁
使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁
不要销毁一个已经加锁的互斥量
已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t* mutex);

互斥锁加锁和解锁

int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
//返回值:成功返回0,失败返回错误码

5.3互斥锁实现原理

在这里插入图片描述
在这里插入图片描述

cnt++语句其实是由三条汇编指令完成的,所以不是原子的
数据在内存中,本质是被线程共享的。
数据被读取到寄存器中,本质变成了线程的上下文,属于线程私有数据!

线程加锁的本质:
大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换
,这里的交换是由一条汇编语句组成的,所以是原子的
在这里插入图片描述
每一个线程都有一份,属于自己的上下文
xchgb作用:讲一个共享的mutex资源,交换到自己上下文中,属于线程自己

5.4可重入VS线程安全

线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

可重入还是不可重入,描述的是函数的特点!
线程安全与否,描述的是线程的特征
在这里插入图片描述
产生死锁的四个必要条件:
互斥条件:一个资源每次只能被一个执行流使用
请求与保持条件:一个执行流因请求资源而阻塞,对以获得的资源保持不放
不剥夺条件:一个执行流已获得的资源,在未完成之前,不能强行剥夺
循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

解决或避免死锁只需要破坏其中一个条件或者多个条件

6.线程同步

在这里插入图片描述

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。

6.1条件变量

在这里插入图片描述

条件变量是线程同步中一种常用的机制,用于在线程之间进行协作,尤其是当一个线程需要等待某个条件成立时。它提供了一种方式来让线程在某个条件未满足时挂起,并且在条件满足时能够被通知从挂起状态恢复执行。

条件变量的初始化

int pthread_cond_init(pthread_cond_t* cond,const pthread_condattr_t* attr);
//cond:要初始化的条件变量
//attr:nullptr

条件变量的销毁

int pthread_cond_destroy(pthread_cond_t* cond);

条件变量的等待

int pthread_cond_wait(pthread_cond_t* cond,pthread_mutex_t* mutex);
//cond:要在这个条件变量上等待
//mutex:互斥锁,条件变量要配合互斥锁才能使用

唤醒等待

int pthread_cond_broadcast(pthread_cond_t* cond);//唤醒所有线程
int pthread_cond_signal(pthread_cond_t* cond);//唤醒队列中的第一个线程

条件变量的测试示例代码

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

int tickets = 1000;

void *threadRoutine(void *arg)
{
  std::string name = static_cast<const char *>(arg);
  while (true)
  {
    // sleep(1);
    // 单纯的互斥,能保证数据安全,不一定合理高效
    pthread_mutex_lock(&mutex);
    if (tickets > 0)
    {
      std::cout << name << ",get a ticket: " << tickets-- << std::endl; // 模拟抢票
      usleep(1000);
    }
    else
    {
      std::cout << "没有票了," << name << std::endl; // 就是每一个线程在大量的申请锁和释放锁
      // 1.让线程子啊进行等待的时候,会自动释放锁
      // 2.线程被唤醒的时候,是在临界区内唤醒的,当线程被唤醒,线程在pthread_cond_wait返回的时候,要重新申请并持有锁
      // 3.当线程被唤醒的时候,重新申请并持有锁本质也是要参与锁的竞争的
      pthread_cond_wait(&cond, &mutex);
    }
    pthread_mutex_unlock(&mutex);
  }
}

int main()
{
  pthread_t t1, t2, t3;
  pthread_create(&t2, nullptr, threadRoutine, (void *)"thread-2");
  pthread_create(&t1, nullptr, threadRoutine, (void *)"thread-1");
  pthread_create(&t3, nullptr, threadRoutine, (void *)"thread-3");

  sleep(5); // for test,5s开始进行让cond成立,唤醒一个线程

  while (true)
  {
    // pthread_cond_signal(&cond);
    // pthread_cond_broadcast(&cond);

    // 临时
    sleep(6);
    pthread_mutex_lock(&mutex);
    tickets += 100;
    pthread_mutex_unlock(&mutex);
    // pthread_cond_broadcast(&cond);
    pthread_cond_signal(&cond);
  }

  pthread_join(t1, nullptr);
  pthread_join(t2, nullptr);
  pthread_join(t3, nullptr);
  return 0;
}

6.2pthread_cond_wait需要互斥锁的原因

条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据

7.生产者消费者模型

在这里插入图片描述

//LockGuard.hpp
#pragma once

#include <pthread.h>

// 不定义锁,默认认为外部会给我们传入对象
class Mutex
{
public:
  Mutex(pthread_mutex_t *lock)
      : _lock(lock)
  {
  }
  void Lock()
  {
    pthread_mutex_lock(_lock);
  }

  void Unlock()
  {
    pthread_mutex_unlock(_lock);
  }
  ~Mutex()
  {
  }

private:
  pthread_mutex_t *_lock;
};

class LockGuard
{
public:
  LockGuard(pthread_mutex_t *lock)
      : _mutex(lock)
  {
    _mutex.Lock();
  }

  ~LockGuard()
  {
    _mutex.Unlock();
  }

private:
  Mutex _mutex;
};
//Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>

const int defaultvalue = 0;

enum
{
  ok = 0,
  div_zero,
  mod_zero,
  unknow
};

const std::string opers = "+-*/%)(&)";

class Task
{
public:
  Task()
  {
  }

  Task(int x, int y, char op)
      : data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
  {
  }

  void Run()
  {
    switch (oper)
    {
    case '+':
      result = data_x + data_y;
      break;
    case '-':
      result = data_x - data_y;
      break;
    case '*':
      result = data_x * data_y;
      break;
    case '/':
      if (data_y == 0)
      {
        code = div_zero;
      }
      else
      {
        result = data_x / data_y;
      }
      break;
    case '%':
      if (data_y == 0)
      {
        code = mod_zero;
      }
      else
      {
        result = data_x % data_y;
      }
      break;
    default:
      code = unknow;
      break;
    }
  }

  void operator()()
  {
    Run();
    sleep(2);
  }

  std::string PrintTask()
  {
    std::string s;
    s = std::to_string(data_x);
    s += oper;
    s += std::to_string(data_y);
    s += "=?";
    return s;
  }

  std::string PrintResult()
  {
    std::string s;
    s = std::to_string(data_x);
    s += oper;
    s += std::to_string(data_y);
    s += "=";
    s += std::to_string(result);
    s += "[";
    s += std::to_string(code);
    s += "]";
    return s;
  }

  ~Task()
  {
  }

private:
  int data_x;
  int data_y;
  char oper; // + - * / %
  int result;
  int code; // 结果码,0:结果可信 !0:结果不可信,1,2,3,4
};
//Log.hpp
#pragma once

#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>

enum
{
  Debug = 0,
  Info,
  Warning,
  Error,
  Fatal
};

std::string LevelToString(int level)
{
  switch (level)
  {
  case Debug:
    return "Debug";
  case Info:
    return "Info";
  case Warning:
    return "Warning";
  case Error:
    return "Error";
  case Fatal:
    return "Fatal";
  default:
    return "Unknown";
  }
}

class Log
{
public:
  Log() {}
  void LogMessage(int level, const char *format, ...) // 类C的一个日志接口
  {
    char content[1024];
    va_list args; // char*,void*
    va_start(args, format);
    // args 指向了可变参数部分
    vsnprintf(content, sizeof(content), format, args);
    va_end(args); // args=nullptr;
    uint64_t currtime = time(nullptr);
    printf("[%s][%s]%s\n", LevelToString(level).c_str(), std::to_string(currtime).c_str(), content);
  }

  ~Log() {}

private:
};

生产者消费者模型(临界资源整体版)参考代码:
在这里插入图片描述

//BlockQueue.hpp
#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include "LockGuard.hpp"

const int defaultcap = 5; // for test

template <class T>
class BlockQueue
{
public:
  BlockQueue(int cap = defaultcap)
      : _capacity(cap)
  {
    pthread_mutex_init(&mutex, nullptr);
    pthread_cond_init(&_p_cond, nullptr);
    pthread_cond_init(&_c_cond, nullptr);
  }

  bool IsFull()
  {
    return _q.size() == _capacity;
  }

  bool IsEmpty()
  {
    return _q.size() == 0;
  }

  void Push(const T &in) // 生产者
  {
    LockGuard lockguard(&_mutex);
    // pthread_mutex_lock(&_mutex);//2.lockguard 3.重新理解生产消费者模型(代码+理论)4.代码整体改成多生产多消费
    while (IsFull())
    {
      // 阻塞等待
      pthread_cond_wait(&_p_cond, &mutex);
    }

    _q.push(in);
    // if(_q.size()>_productor_water_line) pthread_cond_signal(&_c_cond);
    pthread_cond_signal(&_c_cond);
    // pthread_mutex_unlock(&_mutex);
  }

  void Pop(T *out)
  {
    LockGuard lockguard(&_mutex);
    // pthread_mutex_lock(&_mutex);
    while (IsEmpty())
    {
      pthread_cond_wait(&_c_cond, &_mutex);
    }
    *out = _q.front();
    _q.pop();
    // if(_q.size()<_consumer_water_line) pthread_cond_signal(&_p_cond);
    pthread_cond_signal(&_p_cond);
    // pthread_mutex_unlock(&_mutex);
  }

  ~BlockQueue()
  {
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_p_cond);
    pthread_cond_destroy(&_c_cond);
  }

private:
  std::queue<T> _q;
  int _capacity; //_q.size()==_capacity,满了,不能生产,_q.size()==0,空,不能消费了
  pthread_mutex_t _mutex;
  pthread_cond_t _p_cond; // 给生产者的
  pthread_cond_t _c_cond; // 给消费者的

  // int _consumer_water_line;//_comsumer_water_line=_capacity/3*2
  // int _productor_water_line;//_productor_water_line=_capacity/3
};

//Main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

class ThreadData
{
public:
  BlockQueue<Task> *bq;
  std::string name;
};

void *consumer(void *args)
{
  ThreadData *td = static_cast<ThreadData *>(args);
  // BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
  while (true)
  {
    // sleep(1);
    Task t;
    // 1.消费数据bq->pop(&data);
    td->bq->Pop(&t);

    // 2.进行处理
    // t.Run();
    t(); // 处理任务的时候会消耗时间
    std::cout << "consumer data: " << t.PrintResult() << ", " << td->name << std::endl;
    // 消费者没有sleep
  }
  return nullptr;
}

void *productor(void *args)
{
  BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
  // BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
  // BlockQueue* bq=static_cast<BlockQueue *>(args);//生产者和消费者同时看到了一份公共资源
  while (true)
  {
    // 1.有数据,从具体的场景中来,从网络中拿数据
    // 生产前,任务从哪里来
    int data1 = rand() % 10; //[0,9]
    usleep(rand() % 123);
    int data2 = rand() % 10;
    usleep(rand() % 123);
    char oper = opers[rand() % (opers.size())];
    Task t(data1, data2, oper);
    std::cout << "productor task: " << t.PrintTask() << std::endl;

    // 2.进行生产
    // bq->Push(data);
    bq->Push(t);
    sleep(1);
    // for debug
  }
  return nullptr;
}

int main()
{
  srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self()); // 只是为了形成更随机的数据
  // 生产者给消费者分派任务
  BlockQueue<Task> *bq = new BlockQueue<Task>();
  pthread_t c[3], p[2]; // 消费者和生产者

  ThreadData *td = new ThreadData();
  td->bq = bq;
  td->name = "thread-1";
  pthread_create(&c[0], nullptr, consumer, td);

  ThreadData *td1 = new ThreadData();
  td1->bq = bq;
  td1->name = "thread-2";
  pthread_create(&c[1], nullptr, consumer, td1);

  ThreadData *td2 = new ThreadData();
  td2->bq = bq;
  td2->name = "thread-3";
  pthread_create(&c[2], nullptr, consumer, td2);

  pthread_create(&p[0], nullptr, productor, bq);
  pthread_create(&p[1], nullptr, productor, bq);

  pthread_join(c[0], nullptr);
  pthread_join(c[1], nullptr);
  pthread_join(c[2], nullptr);
  pthread_join(p[0], nullptr);
  pthread_join(p[1], nullptr);

  return 0;
}

7.1信号量(计数器)

在这里插入图片描述

初始化信号量

//include<semaphone.h>
int sem_init(sem_t* sem,int pshared,unsigned int value);
//pshared:0表示线程间共享,非0表示进程间共享
//value:信号量初始值多少

销毁信号量

int sem_destroy(sem_t* sem);

等待信号量

int sem_wait(sem_t* sem);//P()
//信号量的值减一

发布信号量

int sem_post(sem_t* sem);//V()
//信号量的值加一

生产者消费者模型(临界资源局部版)参考代码:
在这里插入图片描述

//RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include "LockGuard.hpp"

const int defaultsize = 5;

template <class T>
class RingQueue
{
private:
  void P(sem_t &sem)
  {
    sem_wait(&sem);
  }

  void V(sem_t &sem)
  {
    sem_post(&sem);
  }

public:
  RingQueue(int size = defaultsize)
      : _ringqueue(size), _size(size), _p_step(0), _c_step(0)
  {
    sem_init(&_space_sem, 0, size);
    sem_init(&_data_sem, 0, 0);

    pthread_mutex_init(&_p_mutex, nullptr);
    pthread_mutex_init(&_c_mutex, nullptr);
  }

  void Push(const T &in)
  {
    // 生产
    // 先加锁1,还是先申请信号量?2
    P(_space_sem);
    {
      LockGuard lockguard(&_p_mutex);
      _ringqueue[_p_step] = in;
      _p_step++;
      _p_step %= _size;
    }
    V(_data_sem);
  }

  void Pop(T *out)
  {
    // 消费
    P(_data_sem);
    {
      LockGuard lockguard(&_c_mutex);
      *out = _ringqueue[_c_step];
      _c_step++;
      _c_step %= _size;
    }
    V(_space_sem);
  }

  ~RingQueue()
  {
    sem_destroy(&_space_sem);
    sem_destroy(&_data_sem);

    pthread_mutex_destroy(&_p_mutex);
    pthread_mutex_destroy(&_c_mutex);
  }

private:
  std::vector<T> _ringqueue;
  int _size;

  int _p_step; // 生产者的生产位置
  int _c_step; // 消费位置

  sem_t _space_sem; // 生产者
  sem_t _data_sem;  // 消费者

  pthread_mutex_t _p_mutex;
  pthread_mutex_t _c_mutex;
};
//Main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include "Log.hpp"
#include <unistd.h>
#include <pthread.h>
#include <ctime>

void *Productor(void *args)
{
  // sleep(5);
  RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
  while (true)
  {
    // 数据怎么来的?
    // 1. 有数据,从具体场景中来,从网络中拿数据
    // 生产前,你的任务从哪里来的呢???
    int data1 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODO
    usleep(rand() % 123);
    int data2 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODO
    usleep(rand() % 123);
    char oper = opers[rand() % (opers.size())];
    Task t(data1, data2, oper);
    std::cout << "productor task: " << t.PrintTask() << std::endl;

    // rq->push();
    rq->Push(t);

    // sleep(1);
  }
}

void *Consumer(void *args)
{
  RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
  while (true)
  {
    // sleep(1);
    Task t;
    rq->Pop(&t);

    t();
    std::cout << "consumer done,data is : " << t.PrintResult() << std::endl;
  }
}

int main()
{
  Log log;
  log.LogMessage(Debug, "hello %d,%s,%f", 10, "LJH", 9.24);
  log.LogMessage(Warning, "hello %d,%s,%f", 10, "LJH", 9.24);
  log.LogMessage(Error, "hello %d,%s,%f", 10, "LJH", 9.24);
  log.LogMessage(Info, "hello %d,%s,%f", 10, "LJH", 9.24);

  // //单生产,单消费:321
  // //如果是多生产多消费

  // srand((uint64_t)time(nullptr)^pthread_self());
  // pthread_t c[3],c[2];

  // //唤醒队列中只能放置整形???
  // //RingQueue<int>* rq=new RingQueue<int>();
  // RingQueue<Task>* rq=new RingQueue<Task>();

  // pthread_create(&p[0], nullptr, Productor, rq);
  // pthread_create(&p[1], nullptr, Productor, rq);
  // pthread_create(&c[0], nullptr, Consumer, rq);
  // pthread_create(&c[1], nullptr, Consumer, rq);
  // pthread_create(&c[2], nullptr, Consumer, rq);

  // pthread_join(p[0], nullptr);
  // pthread_join(p[1], nullptr);
  // pthread_join(c[0], nullptr);
  // pthread_join(c[1], nullptr);
  // pthread_join(c[2], nullptr);

  return 0;
}

8.线程池

在这里插入图片描述

//Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>

// 设计者的视角
// typedef std::function<void()> func_t
template <class T>
using func_t = std::function<void(T &)>;

template <class T>
class Thread
{
public:
  Thread(const std::string &threadname, func_t<T> func, T &data)
      : _tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data)
  {
  }

  static void *ThreadRoutine(void *args)
  {
    //(void)args;//仅仅是为了防止编译器有告警
    Thread *ts = static_cast<Thread *>(args);
    ts->_func(ts->_data);
    return nullptr;
  }

  bool Start()
  {
    int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);
    if (n == 0)
    {
      _isrunning = true;
      return true;
    }
    else
    {
      return false;
    }
  }

  bool Join()
  {
    if (_isrunning)
      return true;
    int n = pthread_join(_tid, nullptr);
    if (n == 0)
    {
      _isrunning = false;
      return true;
    }
    return false;
  }

  std::string ThreadName()
  {
    return _threadname;
  }

  bool IsRunning()
  {
    return _isrunning;
  }

  ~Thread()
  {
  }

private:
  pthread_t _tid;
  std::string _threadname;
  bool _isrunning;
  func_t<T> _func;
  T _data;
};
//ThreadBool.hpp
#pragma once

#include <iostream>
#include <queue>
#include <vector>
#include <pthread.h>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"

static const int defaultnum = 5;

class ThreadData
{
public:
  ThreadData(const std::string &name)
      : threadname(name)
  {
  }
  ~ThreadData()
  {
  }

public:
  std::string threadname;
};

template <class T>
class ThreadPool
{
private:
  ThreadPool(int thread_num = defaultnum)
      : _thread_num(thread_num)
  {
    pthread_mutex_init(&_mutex, nullptr);
    pthread_cond_init(&_cond, nullptr);
    // 构建指定个数的线程
    for (int i = 0; i < _thread_num; i++)
    {
      // 待优化
      std::string threadname = "thread-";
      threadname += std::to_string(i + 1);

      ThreadData td(threadname);
      // Thread<ThreadData> t(threadname, std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td);
      //_threads.push_back(t);
      _threads.emplace_back(threadname, std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td);
      lg.LogMessage(Info, "%s is created...\n", threadname.c_str());
    }
  }

  ThreadPool(const ThreadPool<T> &tp) = delete;
  const ThreadPool<T> &operator=(const ThreadPool<T>) = delete;

public:
  // 线程安全问题
  static ThreadPool<T> *GetInstance()
  {
    if (instance == nullptr)
    {
      LockGuard lockguard(&sig_lock);
      if (instance == nullptr)
      {
        lg.LogMessage(Info, "创建单例成功...\n");
        instance = new ThreadPool<T>();
      }
    }
    return instance;
  }

  bool Start()
  {
    // 启动
    for (auto &thread : _threads)
    {
      thread.Start();
      lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
    }
    return true;
  }

  void ThreadWait(const ThreadData &td)
  {
    lg.LogMessage(Debug, "no task,%s is sleeping...\n", td.threadname.c_str());
    pthread_cond_wait(&_cond, &_mutex);
  }

  void ThreadWakeup()
  {
    pthread_cond_signal(&_cond);
  }

  void checkSelf()
  {
    // 1._task_num>_task_num_high_water && _thread_num<_thread_num_high_water
    // 创建更多的线程,并且更新_thread_num

    // 2._task_num==_task_num_low_water && _thread_num>=_thread_num_high_water
    // 把自己退出了,并且更新_thread_num
  }

  void ThreadRun(ThreadData &td)
  {
    while (true)
    {
      checkSelf();
      // 取任务
      T t;
      {
        LockGuard lockguard(&_mutex);
        while (_q.empty())
        {
          ThreadWait(td);
          lg.LogMessage(Debug, "thread %s is wakeup\n", td.threadname.c_str());
        }
        t = _q.front();
        _q.pop();
      }
      // 处理任务
      t();
      lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
                    td.threadname, t.PrintTask().c_str(), t.PrintResult().c_str());
    }
  }

  void Push(T &in)
  {
    lg.LogMessage(Debug, "other thread push a task, task is : %s\n", in.PrintTask().c_str());
    LockGuard lockguard(&_mutex);
    _q.push(in);
    ThreadWakeup();
  }

  ~ThreadPool()
  {
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_cond);
  }

  // for debug
  void Wait()
  {
    for (auto &thread : _threads)
    {
      thread.Join();
    }
  }

private:
  std::queue<T> _q;
  std::vector<Thread<ThreadData>> _threads;
  int _thread_num;
  pthread_mutex_t _mutex;
  pthread_cond_t _cond;

  static ThreadPool<T> *instance;
  static pthread_mutex_t sig_lock;

  // 扩展1:
  // int _thread_num;
  // int _task_num;

  // int _thread_num_low_water;  // 3
  // int _thread_num_high_water; // 10
  // int _task_num_low_water;    // 0
  // int _task_num_high_water;   // 30
};

template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;

template <class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

//main.cc
#include <iostream>
#include <memory>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"

// std::autmic<int> cnt;
int main()
{
  // std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
  // tp->Start();

  sleep(10);
  ThreadPool<Task>::GetInstance()->Start();
  srand((uint64_t)time(nullptr) ^ getpid());

  while (true)
  {
    // 可以搞一个任务
    int x = rand() % 100 + 1;
    usleep(1234);
    int y = rand() % 200;
    usleep(1234);
    char oper = opers[rand() % opers.size()];

    Task t(x, y, oper);
    // std::cout<<"make task: "<<t.PrintTask()<<std::endl;
    ThreadPool<Task>::GetInstance()->Push(t);

    sleep(1);
  }
  ThreadPool<Task>::GetInstance()->Wait();
  // TODO
  return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2234575.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【案例】故障雪花屏

开发平台&#xff1a;Unity 6.0 开发工具&#xff1a;Shader Graph 参考视频&#xff1a;【U2D Shader Graph】❄️雪❄️花❄️屏❄️   一、效果图 二、Shader Graph 路线图 三、案例分析 核心思路&#xff1a;雪花屏幕效果 &#xff08;混合&#xff09; 原图像 最终图像…

Hunyuan-Large:推动AI技术进步的下一代语言模型

腾讯近期推出了基于Transformer架构的混合专家&#xff08;MoE&#xff09;模型——Hunyuan-Large&#xff08;Hunyuan-MoE-A52B&#xff09;。该模型目前是业界开源的最大MoE模型之一&#xff0c;拥有3890亿总参数和520亿激活参数&#xff0c;展示了极强的计算能力和资源优化优…

arkUI:Column和Rom的间距设置(列向,横向)

arkUI&#xff1a;Column和Rom的间距设置&#xff08;列向&#xff0c;横向&#xff09; 1 主要内容说明2 相关内容举例和说明2.1 Column的间距&#xff08;列的间距&#xff09;2.1.1 源码1 &#xff08;Column的间距&#xff09;2.1.2 源码1运行效果 2.2 Row的间距&#xff0…

QML项目实战:自定义Combox

目录 一.添加模块 import QtQuick.Controls 2.4 import QtQuick.Templates 2.4 as T import QtGraphicalEffects 1.15 import QtQuick 2.15 as T2 二.自定义Combox 1.combox文字显示 2.设置下拉图标显示 3.下拉框中选中背景设置 4.下拉框中选中文字设置 5.下拉框设置…

【设计模式系列】原型模式(十一)

一、什么是原型模式 原型模式&#xff08;Prototype Pattern&#xff09;是一种创建型设计模式&#xff0c;它使得一个对象可以复制自身&#xff0c;从而创建一个与自己属性一致的新对象&#xff0c;而无需知晓对象创建的细节。这种模式允许动态地增加对象的数量&#xff0c;并…

h5web浏览器获取腾讯地图经纬度

https://lbs.qq.com/dev/console/application/mine 去腾讯地图申请key 然后前端页面引用 <script type"text/javascript" src"https://apis.map.qq.com/tools/geolocation/min?key自己的key&referertest"></script>调用代码 let geoloca…

微积分复习笔记 Calculus Volume 1 - 4.6 | Limits at Infinity and Asymptotes

4.6 Limits at Infinity and Asymptotes - Calculus Volume 1 | OpenStax

开源的flash浏览器 CelfFlashBrowser

特点 不用安装flash就可以玩flash游戏。 可播放在线和本地的swf文件 下载地址 &#xff1a;https://github.com/Mzying2001/CefFlashBrowser

游戏引擎中的颜色科学

游戏引擎中的渲染组件的作用是生成一个二维图片&#xff0c;在特定的时间从给定的视点观察的方向看到的一个三维空间的状态。他们的生成每一张图片都会被称为帧&#xff0c;他们生成的速度称为帧率。 像素 在每一帧中&#xff0c;游戏引擎的视觉输出基本上是一大堆彩色像素&a…

css中pointer-events:none属性对div里面元素的鼠标事件的影响

文章目录 前倾提要当没有设置属性pointer-events时候结果 当子元素设置了pointer-events: none修改后的代码结果如下所示 当父元素设置了pointer-events: none若两个div同级也就是兄弟级 前倾提要 在gis三维开发的地图组件上放一个背景图片&#xff0c;左右两侧的颜色渐变等&a…

中科蓝汛GPIO操作说明

概述 本篇文章介绍如何使用中科蓝汛AB5681&#xff0c;GPIO管脚使用说明。 一、第一种写法 1&#xff09;、GPIO配置输入模式 //内部上拉 GPIOBDE | BIT(4); //数字IO使能: 0为模拟IO, 1 为数字IO GPIOBDIR | BIT(4); //控制IO的方向: 0为输出, 1为输入. GPIOBFEN &…

Kotlin 协程使用及其详解

Kotlin协程&#xff0c;好用&#xff0c;但是上限挺高的&#xff0c;我一直感觉自己就处于会用&#xff0c;知其然不知其所以然的地步。 做点小总结&#xff0c;比较浅显。后面自己再继续补充吧。 一、什么是协程&#xff1f; Kotlin 协程是一种轻量级的并发编程方式&#x…

LabVIEW 离心泵机组故障诊断系统

开发了一套基于LabVIEW图形化编程语言设计的离心泵机组故障诊断系统。系统利用先进的数据采集技术和故障诊断方法&#xff0c;通过远程在线监测与分析&#xff0c;有效提升了离心泵的预测性维护能力&#xff0c;保证了石油化工生产的连续性和安全性。 项目背景及意义 离心泵作…

线程函数和线程启动的几种不同形式

线程函数和线程启动的几种不同形式 在C中&#xff0c;线程函数和线程启动可以通过多种形式实现。以下是几种常见的形式&#xff0c;并附有相应的示例代码。 1. 使用函数指针启动线程 最基本的方式是使用函数指针来启动线程。 示例代码&#xff1a; #include <iostream&g…

辐射传输方程的分解

Decomposition of the Boundary Value Problem for Radiative Transfer Equation of MODIS and MISR instruments 0.Notions Let L L L be the straming-collision operator, and S S S is scattering operator: L I Ω ⋅ ∇ I ( r , Ω ) σ ( r , Ω ) I ( r , Ω ) S…

智会智展,活动必备

智会智展 APP 各大应用市场均可下载统一链接https://m.malink.cn/s/r6nQVf

Hive操作库、操作表及数据仓库的简单介绍

数据仓库和数据库 数据库和数仓区别 数据库与数据仓库的区别实际讲的是OLTP与OLAP的区别 操作型处理(数据库)&#xff0c;叫联机事务处理OLTP&#xff08;On-Line Transaction Processing&#xff09;&#xff0c;也可以称面向用户交易的处理系统&#xff0c;它是针对具体业务…

如何选择适合小团队的项目管理工具?免费与开源软件推荐

目录 一、小团队项目管理工具的重要性 二、热门项目管理工具介绍 &#xff08;一&#xff09;禅道 &#xff08;二&#xff09;Trello &#xff08;三&#xff09;Asana &#xff08;四&#xff09;JIRA 三、免费项目管理软件推荐 &#xff08;一&#xff09;ES 管理器 …

kafka如何获取 topic 主题的列表?

大家好&#xff0c;我是锋哥。今天分享关于【kafka如何获取 topic 主题的列表&#xff1f;】面试题&#xff1f;希望对大家有帮助&#xff1b; kafka如何获取 topic 主题的列表&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在Kafka中&#xff0c;可以…

Maven详解—(详解Maven,包括Maven依赖管理以及声明周期,Maven仓库、idea集成Maven)

文章目录 Maven详解一.初始Maven1.1 概述1.2 作用 二.Maven模型2.1 概述2.2 构建生命周期/阶段2.3 项目对象模型2.4 依赖管理模型 三.Maven仓库四.Maven安装4.1 下载4.2 安装步骤 五.Idea集成Maven Maven详解 一.初始Maven 1.1 概述 Maven是Apache旗下的一个开源项目&#x…