前言
学过Android应用开发的大概都知道Handler这个东东,这也是面试中老生常谈的问题。其实不仅仅是Android,iOS以及PC的操作系统,底层也离不开消息机制。这个属于生产消费者问题。
什么是生产者消费者模式
生产者消费者模式(Producer-Consumer Pattern)是一种多线程协作的设计模式,其中有两种不同类型的线程:生产者和消费者。这两种线程通过共享的缓冲区(队列、缓冲池等)进行通信,以协调任务的执行。
在生产者消费者模式中,生产者负责生成一些数据或执行一些任务,并将其放置到共享的缓冲区中。而消费者则负责从缓冲区中获取数据或任务,并进行处理。这样,生产者和消费者可以在不同的时间段内独立地工作,通过共享的缓冲区进行数据交换,实现了解耦和协作。
典型的生产者消费者模式中包含以下元素:
- 生产者(Producer): 生成数据或执行任务,并将其放入共享缓冲区。
- 消费者(Consumer): 从共享缓冲区获取数据或任务,并进行相应的处理。
- 共享缓冲区: 用于存储生产者生成的数据或任务,以供消费者获取。
- 同步机制: 用于确保生产者和消费者之间的协调和同步,防止竞争条件(Race Condition)和死锁(Deadlock)等问题。
生产者消费者模式的一个典型场景是任务队列。生产者将任务放入队列,而消费者从队列中取出任务并执行。这种模式可以有效地利用多线程的优势,提高系统的效率和性能。
使用C++手写一个简易的消息处理机制
效果演示
完整代码
//
// main.cpp
// ProducerCustomer
//
// Created by dora on 2023/12/23.
//
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#define EVENT_TYPE_1 0
#define EVENT_TYPE_2 1
#define EVENT_TYPE_3 2
std::queue<int> buffer; // 共享缓冲区
std::mutex mtx; // 互斥锁
std::condition_variable condition; // 条件变量
const int bufferSize = 5; // 消息的缓冲区大小
// 生产者线程函数
void producer() {
while (true) {
{
// 生产者休眠40毫秒,太快会导致消费者线程处理不过来,如造成UI卡顿掉帧
std::chrono::milliseconds sleepDuration(40);
std::this_thread::sleep_for(sleepDuration);
std::unique_lock<std::mutex> lock(mtx);
// 等待条件满足:缓冲区不满
condition.wait(lock, [] { return buffer.size() < bufferSize; });
// 生产数据并放入缓冲区
buffer.push(EVENT_TYPE_1);
std::cout << "Produced: " << EVENT_TYPE_1 << " Event Size:("<< buffer.size() <<")\n";
}
// 通知消费者缓冲区非空
condition.notify_all();
}
}
void producer2() {
while (true) {
{
// 生产者休眠40毫秒,太快会导致消费者线程处理不过来,如造成UI卡顿掉帧
std::chrono::milliseconds sleepDuration(40);
std::this_thread::sleep_for(sleepDuration);
std::unique_lock<std::mutex> lock(mtx);
// 等待条件满足:缓冲区不满
condition.wait(lock, [] { return buffer.size() < bufferSize; });
// 生产数据并放入缓冲区
buffer.push(EVENT_TYPE_2);
std::cout << "Produced: " << EVENT_TYPE_2 << " Event Size:("<< buffer.size() <<")\n";
}
// 通知消费者缓冲区非空
condition.notify_all();
}
}
void producer3() {
while (true) {
{
// 生产者休眠40毫秒,太快会导致消费者线程处理不过来,如造成UI卡顿掉帧
std::chrono::milliseconds sleepDuration(40);
std::this_thread::sleep_for(sleepDuration);
std::unique_lock<std::mutex> lock(mtx);
// 等待条件满足:缓冲区不满
condition.wait(lock, [] { return buffer.size() < bufferSize; });
// 生产数据并放入缓冲区
buffer.push(EVENT_TYPE_3);
std::cout << "Produced: " << EVENT_TYPE_3 << " Event Size:("<< buffer.size() <<")\n";
}
// 通知消费者缓冲区非空
condition.notify_all();
}
}
// 消费者线程函数
void consumer() {
while (true) {
{
std::unique_lock<std::mutex> lock(mtx);
// 等待条件满足:缓冲区非空
condition.wait(lock, [] { return !buffer.empty(); });
// 消费数据
int data = buffer.front();
buffer.pop();
std::cout << "Consumed: " << data << " Event Size:("<< buffer.size() <<")\n";
}
// 通知生产者缓冲区不满
condition.notify_all();
}
}
int main() {
// 创建生产者和消费者线程
std::thread producerThread(producer);
std::thread producerThread2(producer2);
std::thread producerThread3(producer3);
std::thread consumerThread(consumer);
// 等待线程结束
producerThread.join();
producerThread2.join();
producerThread3.join();
consumerThread.join();
system("pause");
return 0;
}
代码解析
这里一共使用了三个生产者线程,一个消费者线程。消费者线程通常就是我们Android中的UI线程或者叫主线程,Handler mH所对应的就是主线程,因为它对应的就是主线程的Looper。由于这只是一个简单的模拟,实际上还会使用到如thread_local_posix.h,Linux使用的是POSIX标准的线程。生产者模拟的就是消息的发送者,消费者模拟的就是消息处理器Handler,所有消息发送到主线程的Handler统一处理,比如发送广播、启动服务、打开一个Activity等。
mutex: 互斥变量
unique_lock: 锁,作用同
java.util.concurrent.locks.Lock
condition_variable: 条件变量,作用同
java.util.concurrent.Condition
当然Java中我们通常使用的是ReentrantLock
。std::unique_lock的析构函数会自动释放锁,从而避免了手动调用 unlock
。在性能上,ReentrantLock支持可重入性,需要维护更多的状态信息,所以无论是算法复杂度还是语言上性能都是不如std::unique_lock的。queue的front
函数只查看队列最前面的元素,而不移除掉,而pop
函数则是移除掉队列最前面的元素。[] {}
是一个Lambda表达式,C++11开始支持的特性。condition_variable的notify_all
和notify_one
都是用于通知其他等待锁被释放的线程可以开始执行,只不过notify_all会通知所有等待在条件变量上的线程,让它们都尝试重新获取锁并继续执行。这样,所有等待的线程都有机会获得锁。而notify_one只会通知等待在条件变量上的一个线程,具体是哪一个线程取决于实现和调度器的策略。这样,只有一个线程会获得锁,其他线程仍然保持等待状态。
总结
线程安全是多线程开发的一个必不可少的技能,让多线程可以更好的协作执行任务。相当于在马路上制定了交通规则,即使很多车,也不会相撞。当然在现实生活中,会有意外发生,在程序的世界里,除非程序异常退出,否则都是按规则执行任务的。本文主要是让大家熟悉一下C/C++底层语言,为Android NDK开发打基础。