出处:B站码出名企路
个人笔记:因为是跟着b站的教学视频以及文档初步学习,可能存在诸多的理解有误,对大家仅供借鉴,参考,然后是B站up阳哥的视频,我是跟着他学。大家有兴趣的可以到b站搜索。加油,一起学习。我的问题,大家如果看见,希望可以提出指正,谢谢大家。
应用场景
多线程的应用场景非常多,常见的有:
-
网络通信:在网络通信应用中,一般需要同时处理多个请求,如果使用单线程模式,会阻塞其他请求,造成性 能瓶颈,因此使用多线程可以提高并发处理能力。
-
数据库操作:在数据库操作中,有时需要同时对多个数据表进行操作,使用多线程可以提高处理效率。
-
图像处理:在图像处理应用中,需要对多个图像进行处理,在单线程模式下,处理速度会很慢,使用多线程可 以提高处理速度。
-
游戏开发:在游戏开发中,常常需要同时处理多个任务,比如处理游戏画面、物理效果、声音效果等,使用多 线程可以提高游戏的运行速度和流畅度。
-
并行计算:在科学计算领域中,常常需要对大量数据进行处理和计算,使用多线程可以将计算任务划分到多个 线程中进行,从而提高计算速度。
总之,多线程在提高程序性能、响应性和资源利用率方面有着广泛的应用。然而,需要注意在多线程编程中处理线程同步、共享数据等问题,以确保程序的正确性和稳定性。
图解结构
模块拆解
第一步:StateSubmitor耗时内容处理类
此处并没有很多具体实现,因为要结合业务。比如耗时处理逻辑
class StateSubmitor
{
public:
explicit StateSubmitor(const std::string& str);
~StateSubmitor();
//submit: 提交到队列中
//const std::string& content 内容,包括海量数据
void submit(const std::string& content);//content可任意
//flush: 将队列中的所有状态信息发往远程收集端
//具体的业务逻辑
void flush();
private:
StateSubmitor(const StateSubmitor&) = delete;
StateSubmitor& operator=(const StateSubmitor&) = delete;
};
void StateSubmitor::submit(const std::string& content){
/*
@ 对 content的耗时处理逻辑
*/
}
第二步:NodeMonitor线程启动类
//节点监控, 监控任务的发生, 业务的产生. 多线程同步等控制逻辑的封装
class NodeMonitor
{
public:
~NodeMonitor();
static NodeMonitor* instance();
void start();
void shutdown();
bool init();
private:
NodeMonitor();
NodeMonitor(const NodeMonitor&) = delete;
NodeMonitor& operator=(const NodeMonitor&) = delete;
void stateInfo(const std::string& strs);
void ThreadFunc(); //消费者线程入口函数
bool shutdown_; //开关
std::mutex mutex_;
std::thread thread_; //消费者线程
std::condition_variable cond_;
//queue
std::queue<std::string> task_queue_; //任务队列
std::unique_ptr<StateSubmitor> submitor_; //unique_ptr管理submitor对象
};
}
具体实现,这里才是多线程同步互斥的重点部分,核心,利用任务队列做缓冲容器,解耦合。使得生产者线程和消费者线程之间的耦合度降低,生产者只管将任务放入任务队列,然后即可返回,无需等待消费者处理。消费者只管从任务队列中拿取任务处理。大大提高效率。通过缓存大大减低了生产者和消费者之间的耦合程度。
生活场景:快递驿站,快递小哥就是生产者,我们就是消费者。快递驿站就是容器队列。
//析构一般独立一个函数
NodeMonitor::~NodeMonitor(){
this->shutdown();//做资源释放等等操作
}
//创建线程安全的单例
//call_once 确保多线程下仅仅创建一个NodeMonitor对象
NodeMonitor* NodeMonitor::instance(){
static NodeMonitor* instance = nullptr;
static std::once_flag flag;
std::call_once(flag, [&]{
instance = new (std::nothrow) NodeMonitor();
});
return instance;
}
//线程启动
void NodeMonitor::start(){
//创建消费者
thread_ = std::thread(&NodeMonitor::ThreadFunc, this);
//启动生产者
if (!init()){
return;
}
}
//生产者函数
bool NodeMonitor::init(){
submitor_.reset(new StateSubmitor("lyy")); //创建submitor
/*
@ 不断地填充stateInfo
@ 如果是实际应用场景可能会采取轮询, 或者是event事件触发,
此处阳哥按照最简单的塞入文本信息作为事件(任务)
*/
while (true)
{
stateInfo("lxk");
}
return true;
}
//填入需要的信息 <=> push任务
void NodeMonitor::stateInfo(const std::string& strs){
std::unique_lock<std::mutex> lock(mutex_);
task_queue_.push(strs); //生产, 塞入任务
cond_.notify_one(); //通知消费
}
//线程销毁
void NodeMonitor::shutdown(){
std::unique_lock<std::mutex> lock(mutex_);
shutdown_ = true;
cond_.notify_all();
if (thread_.joinable()){
thread_.join();
}
}
//消费者函数
void NodeMonitor::ThreadFunc(){
while (!shutdown_)
{
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this]{
return shutdown_ || !task_queue_.empty();
});
if (shutdown_){
break;
}
std::string str = task_queue_.front();
task_queue_.pop();
lock.unlock();
submitor_->submit(str);//提交状态信息
}
}
具体案例
消息队列作业实现
#include <iostream>
#include <queue>
#include <mutex>
#include <thread>
#include <memory>
#include <condition_variable>
#include <string>
#include <chrono>
namespace XX
{
class MessageQueue {//封装消息队列类
public:
void push(const std::string& message);
std::string pop();
bool empty();
private:
std::mutex mutex_; //互斥锁, 保障互斥操作
std::condition_variable cond_; //通知, 保障同步
std::queue<std::string> msg_queue_; //容器
};
class StateSubmitor {//消息处理类, 业务处理, 管理消息队列
public:
explicit StateSubmitor(MessageQueue& msg_queue);
~StateSubmitor();
void submit(const std::string& content); //提交状态信息并将其添加到队列中
void flush(); //flush: 将队列中的所有状态信息发往远程收集端, 清空处理所有消息.
private:
StateSubmitor(const StateSubmitor &) = delete;
StateSubmitor &operator=(const StateSubmitor &) = delete;
private:
MessageQueue& msg_queue_; //消息队列
};
// 节点监控, 监控任务的发生, 业务的产生. 多线程同步等控制逻辑的封装
class NodeMonitor {
public:
~NodeMonitor();
static NodeMonitor *instance();
void start();
void shutdown();
bool init();
private:
NodeMonitor();
void ProducerThreadFunc(); //线程函数
void ConsumerThreadFunc(); //线程函数
NodeMonitor(const NodeMonitor &) = delete;
NodeMonitor &operator=(const NodeMonitor &) = delete;
private:
std::thread producer_thread_; //生产者线程,不停的往消息队列塞入监控到的用户状态信息消息.
static int count_;
std::unique_ptr<StateSubmitor> submitor_;
MessageQueue msg_queue_; //消息队列
std::thread consumer_thread_;//消费者线程, 不停的从消息队列中抽出消息进行处理
bool shutdown_; //开关
};
}
namespace XX {
int NodeMonitor::count_ = 0;//初始化
void MessageQueue::push(const std::string& message) {
std::unique_lock<std::mutex> lock(mutex_);
msg_queue_.push(message);//塞入消息
cond_.notify_one();//通知消费
}
std::string MessageQueue::pop() {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this]{//等待消息到来
return !empty();
});
std::string msg = msg_queue_.front();//拿到消息
msg_queue_.pop();
return msg;
}
bool MessageQueue::empty() {
return msg_queue_.empty();
}
StateSubmitor::StateSubmitor(MessageQueue& msg_queue)
: msg_queue_(msg_queue) {}
void StateSubmitor::submit(const std::string& content) {
//提交状态信息消息的业务操作
std::cout << "消息为: " << content << std::endl;
//将业务状态消息push到消息队列中
msg_queue_.push(content);
}
void StateSubmitor::flush() {
//清空所有消息
}
StateSubmitor::~StateSubmitor() {
this->flush();
}
NodeMonitor::NodeMonitor():shutdown_(false){
}
NodeMonitor::~NodeMonitor(){
this->shutdown();//释放资源...操作
}
void NodeMonitor::ProducerThreadFunc() {
while (!shutdown_) { //不断生产
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
std::string msg = "消息";
msg += std::to_string(count_);
count_ ++;
submitor_->submit(msg);
}
}
NodeMonitor* NodeMonitor::instance(){
static NodeMonitor* instance = nullptr;
static std::once_flag flag;
std::call_once(flag, [&]{
instance = new (std::nothrow) NodeMonitor();
});
return instance;
}
void NodeMonitor::ConsumerThreadFunc() {
while (!shutdown_) { //不断消费
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::string msg = msg_queue_.pop();//弹出一条消息
std::cout << "处理了: " << msg << std::endl;
}
}
void NodeMonitor::start() {
init();
}
void NodeMonitor::shutdown() {
shutdown_ = true;
}
bool NodeMonitor::init() {
submitor_.reset(new StateSubmitor(msg_queue_)); //创建submitor
//创建生产者,消费者线程并且join
producer_thread_ = std::thread(&NodeMonitor::ProducerThreadFunc, this);
consumer_thread_ = std::thread(&NodeMonitor::ConsumerThreadFunc, this);
producer_thread_.join();
consumer_thread_.join();
return true;
}
}
int main() {
XX::NodeMonitor::instance()->start();
return 0;
}