openharmony5.0.0中C++公共基础类测试-线程相关(一)

news2025/4/24 11:58:41

C++公共基础类测试及源码剖析

延续传统,show me the code,除了给出应用示例还重点分析了下openharmony中的实现。

简介

openharmony中提供了C++公共基础类库,为标准系统提供了一些常用的C++开发工具类本文分析其实现,并给出使用示例,库主要涉及内容如下:

  • 文件、路径、字符串相关操作的能力增强接口
  • 读写锁、信号量、定时器、线程增强及线程池等接口
  • 安全数据容器、数据序列化等接口
  • 各子系统的错误码相关定义

环境

系统:openharmony5.0.0
部署:参照源码中的说明文档(quickstart-pkg-3568-helloworld.md)

本节重点说明关于线程相关的内容。

源码目录

commonlibrary/c_utils
├─ base
│   ├── include       # 对各子系统开放的接口头文件
│   ├── src           # 源文件
│   └── test          # 测试代码
├─ Docs
    ├── en            # 英文文档
    └── zh-cn         # 中文文档

线程相关

概述

包含强化线程能力、线程池、线程安全Map、线程安全栈与队列、线程安全阻塞队列

  • 强化线程能力:提供例如启动线程、同步通知、异步通知等功能的接口
  • 线程池:提供线程安全的线程池功能。线程安全是对于线程池本身而非池内线程而言的。 维护一个任务队列,一个线程组。使用者向任务队列中注册需要进行的任务,线程组执行任务队列中的任务
  • 线程安全Map:提供了一个线程安全的map实现。SafeMap在STL map基础上封装互斥锁,以确保对map的操作安全。
  • 线程安全栈与队列:线程安全队列,是在dequeue的基础上封装std::lock_guard,以此实现线程的相关操作。根据继承SafeQueueInner抽象类,并对dequeue的pop方法的重写,可以实现SafeStack和SafeQueue的相关方法
  • 线程安全阻塞队列:线程安全阻塞队列SafeBlockQueue类,提供阻塞和非阻塞版的入队入队和出队接口,并提供可最追踪任务完成状态的的SafeBlockQueueTracking类。

强化线程能力

接口说明

OHOS::Thread

返回值类型名称
Thread() 构造函数, 构造一个Thread对象,但并不会启动线程。
virtual ~Thread() 析构函数
ThreadStatusStart(const std::string& name, int32_t priority = THREAD_PROI_NORMAL, size_t stack = 0); 创建并启动一个子线程,循环执行Run(),当Run()返回false或通知退出时停止。
ThreadStatusNotifyExitSync() 同步通知线程退出,即阻塞式停止子线程。 当前线程被阻塞,等待子线程结束。
voidvirtual NotifyExitAsync() 异步通知线程退出,即子线程退出与否不阻塞当前线程。 通知子线程停止,当前线程继续运行。
boolvirtual ReadyToWork() 判断线程是否已经准备就绪,始终返回true。
boolIsExitPending() const 获取线程退出待定标志位。
boolIsRunning() const 判断线程是否在运行
pthread_tGetThread() const 获取线程ID
类图关系

在这里插入图片描述

源码剖析

从接口和类图中我们可以看到强化线程能力这一节,主要是提供例如启动线程、同步通知、异步通知等功能的接口,此库主要是通过thread类进行接口的封装,客户需要实现一个run的接口函数,此函数由库内部调用。异步通知是使用c++的一个线程同步的工具类condition_variable来实现的,其他的接口由对应的标志位来实现的(不再详细说)。

对condition_variable不熟悉的码友可以看下这篇介绍

启动函数调用流程如下:

创建并启动子线程的函数(start),主要通过linux系统函数pthread_create创建线程,将入口函数设置为ThreadParam类中的代理函数(Proxy),在Proxy函数中设置线程名、优先级等,并通过循环函数(ThreadStart)循环调用用户端的run函数,并实时检测更新线程状态。具体流程如下:

ThreadStatus Thread::Start(const std::string& name, int32_t priority, size_t stack)
    |-->status_ = ThreadStatus::OK //一些变量的初始化
    |-->ThreadParam para; //定义线程参数
	|-->para.startRoutine = ThreadStart;//初始化参数的值,由线程启动时调用
	|-->para.args = this;//将本线程(thread)传到参数中,作为ThreadStart的传参
	|-->para.name = name;//线程名称
	|-->para.priority = priority//线程优先级
	|-->bool res = CreatePThread(para, stack, &thread_)//创建线程
        |-->auto t = new ThreadParam;//新创建线程参数对象
		|-->t->startRoutine = para.startRoutine//将相关参数赋值到t对象中
         |-->para.args = t//将参数对象t作为ThreadParam::Proxy传参
		|-->para.startRoutine = reinterpret_cast<ThreadFunc>(&ThreadParam::Proxy)//强制进行类型转换
        |-->int result = pthread_create(&thread, &attr, reinterpret_cast<PThreadRoutine>(para.startRoutine), para.args)//核心函数将ThreadParam::Proxy作为线程的入口函数

线程创建后会触发代理函数(Proxy)如下:

static int Proxy(const ThreadParam* t)
    |-->(void)setpriority(PRIO_PROCESS, 0, prio);//系统函数,设置当前用户的优先级
	|-->prctl(PR_SET_NAME, threadName.substr(0, MAX_THREAD_NAME_LEN).c_str(), 0, 0, 0)//系统函数,设置进程名
     |-->t->startRoutine(t->args)//回调ThreadStart(由start函数设置)

t->startRoutine为start函数中设置的循环函数(ThreadStart),执行过程如下:

int Thread::ThreadStart(void* args)
    |-->循环执行以下操作
    |-->result = self->Run()//回调客户端重写的run函数
    |-->std::unique_lock<std::mutex> lk(self->lock_);
	|-->if ((!result) || self->exitPending_)//当退出标志为true时
	|-->self->cvThreadExited_.notify_all();//唤醒所有等待的线程
	|-->break;//线程退出循环

在同步通知线程退出时,采用了线程间同步的一种高级工具(std::condition_variable),允许线程在某些条件不满足时挂起,直到其他线程通知它们条件已经满足

ThreadStatus Thread::NotifyExitSync()
    |-->std::unique_lock<std::mutex> lk(lock_);//C++标准库中的一种 RAll风格的互斥锁管理器,,它可以方便地管理互斥锁的锁定和解锁操作。在进入代码块时自动锁定 self->lock_,在离开代码块时(无论是通寸正常执行结束还是通过异常退出),自动解锁 self->lock从而确保互斥锁的正确使用和避免死锁情况的发生
    |-->exitPending_ = true;//退出标志
    |-->while (running_) {cvThreadExited_.wait(lk);}//会释放互斥锁,然后在等待期间重新获取它,
    |-->exitPending_ = false;//退出标志
	|-->return status_;

由ThreadStart函数中已确认线程退出标志(exitPending_)后通过cvThreadExited_.notify_all()唤醒,此函数再进行返回。

应用示例

本案例完成如下工作:

  • 主线程每1秒打印子进程的相关信息。主线程在第5秒时,关闭子线程运行。
  • 创建1个子线程,每隔1秒打印当前运行次数。
// 时间标记量
static const int FORMAX = 5;

// 自定义类,继承OHOS::Thread,Run()重新写
class ThreadSample : public OHOS::Thread {
public:
    ThreadSample() : OHOS::Thread::Thread()
    {
    }
    ~ThreadSample()
    {
    }
protected:
    bool Run() override;
};

// 程序运行代码,每隔1秒打印相关信息
bool ThreadSample::Run(){
    static int current = 0;
    current++;
    cout << "Run(): current = " << current << endl;
    sleep(1);
    return true;
}

int main(int argc, char **argv){
    ThreadSample thread;
    // 启动线程
    thread.Start("thread sample", OHOS::THREAD_PROI_NORMAL, 0);
    // 打印线程相关信息
    for (int i = 0; i < (2 * FORMAX); i++) {
        cout << "main: i = " << i << endl;
        cout << "   ThreadId   = " << thread.GetThread() << endl;
        cout << "   ReadyToWork = " << thread.ReadyToWork() << endl;
        cout << "   IsExitPending = " << thread.IsExitPending() << endl;
        cout << "   IsRunning = " << thread.IsRunning() << endl;
        if (i == (1 * FORMAX)) {
            // 异步停止线程,不用等待,直接返回
            cout << "main: NotifyExitAsync" << endl;
            thread.NotifyExitAsync();
        }
        sleep(1);
    }
    thread.NotifyExitSync();// 等待退出
    return 0;
}
  • 运行结果
# ./utils_thread                                                               
main: i = 0
   ThreadId   = 4152769824
   ReadyToWork = 1
   IsExitPending = 0
Run(): current =  1  IsRunning = 1

Run(): current = main: i = 21

   ThreadId   = 4152769824
   ReadyToWork = 1
   IsExitPending = 0
   IsRunning = 1
Run(): current = 3
main: i = 2
   ThreadId   = 4152769824
   ReadyToWork = 1
   IsExitPending = 0
   IsRunning = 1
Run(): current = 4
main: i = 3
   ThreadId   = 4152769824
   ReadyToWork = 1
   IsExitPending = 0
   IsRunning = 1
Run(): current = 5
main: i = 4
   ThreadId   = 4152769824
   ReadyToWork = 1
   IsExitPending = 0
   IsRunning = 1
Run(): current = 6 
main: i = 5
   ThreadId   = 4152769824
   ReadyToWork = 1
   IsExitPending = 0
   IsRunning = 1
main: NotifyExitAsync
main: i = 6
   ThreadId   = 4294967295
   ReadyToWork = 1
   IsExitPending = 1
   IsRunning = 0
main: i = 7
   ThreadId   = 4294967295
   ReadyToWork = 1
   IsExitPending = 1
   IsRunning = 0
main: i = 8
   ThreadId   = 4294967295
   ReadyToWork = 1
   IsExitPending = 1
   IsRunning = 0
main: i = 9
   ThreadId   = 4294967295
   ReadyToWork = 1
   IsExitPending = 1
   IsRunning = 0

注意在第32行Run(): current = 6之后,便不再打印current,说明此线程已经退出,所以此后打印的信息即为线程退出后的状态。通过多次启动此进程(utils_thread),可见ThreadId在IsRunning为true的状态时呈现的是动态的,在为false时是固定的,也进一步说明threadid退出后固定为INVALID_PTHREAD_T

线程池

接口说明
  • OHOS::ThreadPool
Name
ThreadPool(const std::string& name = std::string())
构造ThreadPool。为线程池内线程命名。
~ThreadPool() override
voidAddTask(const Task& f)
向任务队列中添加一个Task。若未调用Start()则直接执行Task且不会向任务队列添加该Task.
size_tGetCurTaskNum()
获取当前任务数。
size_tGetMaxTaskNum() const
获取最大任务数。
std::stringGetName() const
获取线程池命名。
size_tGetThreadsNum() const
获取线程池内线程数。
voidSetMaxTaskNum(size_t maxSize)
设置任务队列中最大任务数。
uint32_tStart(int threadsNum)
启动给定数量threadsNum的线程,执行任务队列中的任务。
voidStop()
停止线程池,等待所有线程结束。
类关系图

ThreadPool

在这里插入图片描述

源码剖析

从以上类图可知线程池类(ThreadPool)继承了NoCopyable类,禁止拷贝操作,通过成员变量tasks维护一个任务队列,通过成员变量(threads)维护一个线程组。使用者向任务队列中注册需要进行的任务,线程组执行任务队列中的任务实现对任务的管理。以上主要操作涉及接口主要为Start(线程组执行任务队列中的任务)和AddTask(向任务队列中注册需要进行的任务)。

禁止拷贝操作的实现:

讨论禁止拷贝操作的实现首先要了解c++11引入的一种语法,用于显式地禁用某个函数,包括构造函数、析构函数、拷贝构造函数、赋值运算符等。通过将函数声明为 =delete,可以显式地阻止该函数被调用,即使在类内部或者友元函数中也无法使用。

class NoCopyable {
protected: //允许子类继承,但防止直接实例化!
    NoCopyable() {}//构造函数
    virtual ~NoCopyable() {}//析构函数
private://设置为private可以确保NoCopyable类的复制构造函数和移动赋值操作符完全不可访问,包括在其子类中。这样可以更好地隐藏类的实现细节,防止意外的复制和移动操作。如果将DISALLOW_COPY_AND_MOVE设置为protected,子类可能会尝试调用这些被删除的函数,导致编译错误。将它们设置为private可以确保编译器阻止任何对这些函数的调用,包括在子类中。
    DISALLOW_COPY_AND_MOVE(NoCopyable);//DISALLOW_COPY_AND_MOVE为定义的宏,可展开为如下:

DISALLOW_COPY_AND_MOVE(NoCopyable)展开后如下:

NoCopyable(const NoCopyable&) = delete;NoCopyable& operator= (const NoCopyable&) = delete//=delete;用于禁用复制构造函数,这意味着任何尝试复制该对象的代码都会导致编译错误
NoCopyable(NoCopyable&&) = delete;NoCopyable& operator= (NoCopyable&&) = delete  //&& 表示右值引用,意味着该构造函数可以接受一个临时对象(即即将被销毁的对象)作为参数
 //=delete;用于禁用移动构造函数。任何尝试通过移动构造函数来复制该对象的代码都会导致编译错误

线程组执行任务队列中的任务

线程组的执行主要通过start函数执行,传入的参数为启动的线程数量,主要执行过程如下:

uint32_t ThreadPool::Start(int numThreads)
|-->threads_.reserve(numThreads)//预留对应启动线程的空间
|-->for (int i = 0; i < numThreads; ++i) {
|-->std::thread t([this] { this->WorkInThread(); });//这个函数会一直循环,直到线程池停止
    |-->while (running_) {//只要是启动状态便一直循环
    |-->Task task = ScheduleTask();//任务调度,会将最新的任务取出来
        |--> while (tasks_.empty() && running_) {hasTaskToDo_.wait(lock);}//如果任务队列为空,且在运行状态时,便开启等待(hasTaskToDo_:线程间同步的一种高级工具(std::condition_variable))
        |-->task = tasks_.front();//取出最新的任务
        |-->if (maxTaskNum_ > 0)acceptNewTask_.notify_one();//通知给新任务(添加新任务时当超过最大任务数时便会wait)
    |-->task();//执行取出来的任务
|-->int err = pthread_setname_np(t.native_handle(), (myName_ + std::to_string(i)).c_str());//设置线程名称
|-->threads_.push_back(std::move(t));//将新创建的线程对象t移动到threads容器中。使用std::move可以避免不必要的复制操作,提高性能。

向任务队列中注册需要进行的任务

通过AddTask接口向任务队列(tasks_)中注册任务,主要执行过程如下:

void ThreadPool::AddTask(const Task &f)
|-->if (threads_.empty())f();//如果线程组为空则直接执行任务函数
|--> while (Overloaded()) acceptNewTask_.wait(lock)//如果过载时则一直等待
    |-->return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_)//判读是否过载
|-->tasks_.push_back(f)//将任务函数放入任务队列中
|-->hasTaskToDo_.notify_one()//通知线程组有新的任务来了

其中hasTaskToDo_和acceptNewTask_皆为std::condition_variable类型,此类型的详细说明可参考这篇文章

应用示例

本案例完成如下工作:

  • 创建1个线程池,设置该线程池内部有1024个线程空间。
  • 启动5个线程。每个线程每秒打印1段字符串,10秒后停止。
// 线程执行函数
void func(const std::string &name)
{
    for (int i = 0; i < 10; i++) {
        cout << "func: " << name << " and i = " << i << endl;
        sleep(1);
    }
}

int main(int argc, char **argv)
{
    OHOS::ThreadPool thread_poll("thread_poll_name");
    int max_task_num = 1024;
    int start_task_num = 5;
    string str_name;

    cout << "get max task num(default): " << thread_poll.GetMaxTaskNum() << endl;// 查看默认的线程池最大任务数
    
    cout << "set max task num: " << max_task_num << endl;
    thread_poll.SetMaxTaskNum(max_task_num);// 设置线程池的最大任务数
    cout << "get max task num(set): " << thread_poll.GetMaxTaskNum() << endl;// 再查看线程池最大任务数

    cout << "start thread: " << start_task_num << endl; // 开启启动线程
    thread_poll.Start(start_task_num);
    for (int i = 0; i < start_task_num; i++) {
        cout << "add task: i = " << i << endl;
        str_name = "thread_pool_" + to_string(i);
        auto task = std::bind(func, str_name);用于绑定函数和参数,不用立即执行
        // 添加任务到线程池中,并启动运行
        thread_poll.AddTask(task);
        sleep(1);
    }

    cout << "stop thread: start" << endl;    // 等待关闭所有的线程,会等待线程池程序全部结束才返回
    thread_poll.Stop();
    cout << "stop thread: end" << endl;
    return 0;
}
  • 执行过程

由于是多个线程同时执行所以会导致串口打印的时候反应不过来,有一些字段会稍微乱一些。

# ./utils_thread_poll                                                          
get max task num(default): 0
set max task num: 1024
get max task num(set): 1024
start thread: 5
add task: i = 0
func: thread_pool_0 and i = 0
add task: i = 1
func: thread_pool_0 and i = 1
func: thread_pool_1 and i = 0
func: thread_pool_0 and i = add task: i = 22

func: thread_pool_2 and i = 0
func: thread_pool_1 and i = 1
add task: i = func: thread_pool_0 and i = 33

func: thread_pool_1 and i = 2
func: thread_pool_2 and i = 1
func: thread_pool_3 and i = 0
func: thread_pool_0 and i = 4
add task: i = 4
func: thread_pool_2 and i = 2
func: thread_pool_3 and i = 1
func: thread_pool_4 and i = func: 0
thread_pool_1 and i = 3
func: thread_pool_0 and i = func: thread_pool_2 and i = 5
3
func: thread_pool_1 and i = 4
func: thread_pool_3 and i = 2
func: thread_pool_4 and i = 1
stop thread: start
func: thread_pool_0 and i = 6
func: thread_pool_1 and i = 5
func: thread_pool_3 and i = 3
func: thread_pool_4 and i = 2
func: thread_pool_2 and i = 4
func: thread_pool_0 and i = 7
func: thread_pool_4 and i = 3
func: thread_pool_3 and i = func: thread_pool_2func: thread_pool_1 and i = 6
 and i = 5
4
func: thread_pool_0 and i = 8
func: thread_pool_2 and i = func: 6thread_pool_1 and i = 
func: thread_pool_3 and i = 5
7
func: thread_pool_4 and i = 4
func: thread_pool_0 and i = 9
func: thread_pool_3 and i = func: 6
func: thread_pool_4 and i = func: thread_pool_2 and i = 57

thread_pool_1 and i = 8
func: thread_pool_3 and i = 7
func: thread_pool_4 and i = 6func: 
thread_pool_2 and i = 8
func: thread_pool_1 and i = 9
func: thread_pool_3 and i = 8
func: thread_pool_4 and i = 7
func: thread_pool_2 and i = 9
func: thread_pool_3 and i = 9
func: thread_pool_4 and i = 8
func: thread_pool_4 and i = 9
stop thread: end
  • 疑问答疑

问题一:WorkInThread函数中使用while(running_)死循环,发现只有当调用stop时,才会将running_置为false,当不调用stop时便会一直死循环吧?这样明显不合理吧?

答疑:这个循环不会导致死循环,因为它依赖于 ScheduleTask() 函数来获取任务,并且只有在有任务的情况下才会执行 task()。如果 ScheduleTask() 返回一个空的任务(即 taskfalse),线程会在下一次循环中再次尝试获取任务,而不会无限执行同一个任务。在某些情况下确实可能导致线程一直循环而不做任何工作,根据代码片段,ScheduleTask() 函数内部调用了 acceptNewTask_.notify_one();,这表明线程池使用了条件变量来管理任务的调度,从而避免了不必要的CPU消耗。

线程安全Map

接口说明
返回类型名称
SafeMap()
SafeMap(const SafeMap& rhs)
~SafeMap()
voidClear() 删除map中存储的所有键值对。
voidEnsureInsert(const K& key, const V& value) 在map中插入元素。
voidErase(const K& key) 删除map中键为key的键值对。
boolFind(const K& key, V& value) 在map中查找元素。
boolFindOldAndSetNew(const K& key, V& oldValue, const V& newValue) 在map中查找元素并将key对应的oldValue替换为newValue
boolInsert(const K& key, const V& value) 在map中插入新元素。
boolIsEmpty() 判断map是否为空。
voidIterate(const SafeMapCallBack& callback) 遍历map中的元素。
SafeMap&operator=(const SafeMap& rhs)
VReadVal(const K& key) 线程安全地读map内元素
voidChangeValueByLambda(const K& key, LambdaCallback callback) 线程安全地操作safemap内元素,操作行为需要自定义
intSize() 获取map的size大小。
类关系图

在这里插入图片描述

源码剖析

源码是在safe_map.h文件中直接实现的。

主要是对std::map<K, V> map_的二次封装,和c++标准map的接口类似,就不再详细说明了。只看个例子

bool FindOldAndSetNew(const K& key, V& oldValue, const V& newValue)//查找并替换键值对
|-->auto iter = map_.find(key);//查找键为key的键值对
|-->if (iter != map_.end()) {//判断是否找到了键值对
|-->oldValue = iter->second;//将旧值赋给oldValue
|-->map_.erase(iter);//删除原有键值对
|-->map_.insert(std::pair<K, V>(key, newValue));//插入新的键值对
应用示例

本案例主要完成如下工作:

  • 创建1个子线程,负责每秒调用EnsureInsert()插入元素;
  • 创建1个子线程,负责每秒调用Insert()插入元素;
  • 创建1个子线程,负责每秒调用Erase()删除元素;
  • 创建1个子线程,负责每秒调用FindOldAndSetNew()替换元素的值;
  • 主线程等待上述线程结束,Iterate()和Find()查看所有元素;
  • 主线程等待上述线程结束,清空SafeMap,并调用IsEmpty()查看是否确实是空。

通过创建线程池,设置最大任务数,并启动线程,通过AddTask的方式添加不同的任务,例如EnsureInsert、Insert等。

OHOS::ThreadPool threads("name_rwlock_threads")
threads.SetMaxTaskNum(128);
threads.Start(4);
str_name = "Thread_xxx";//Thread_EnsureInsert
auto task_ensure_insert = std::bind(<任务函数名称>, str_name);
threads.AddTask(task_ensure_insert);//添加任务

测试在map中插入元素(EnsureInsert)

static struct MapInfo m_map1_insert[] = {
    { 1, "aaa" },
    { 2, "bbb" },
    { 3, "ccc" },
    { 4, "ddd" },
    { 5, "eee" },
    { 6, "fff" },
    { 7, "ggg" },
    { 8, "hhh" },
    { 9, "iii" },
    { 10, "jjj" }
};
// 使用EnsureInsert()插入元素
void map_ensure_insert(const string& name)
{
    int key = 0;
    string value = "";
    
    for (int i = 0; i < (sizeof(m_map1_insert) / sizeof(struct MapInfo)); i++) {
        key = m_map1_insert[i].key;
        value = m_map1_insert[i].str;
        m_safemap.EnsureInsert(key, value);
        cout << name << ": insert successful and key = " << key << " and value = " << value << endl;
        sleep(1);
    }
}

测试在map中插入元素(Insert)

static struct MapInfo m_map2_insert[] = {
    { 101, "111" },
    { 102, "222" },
    { 103, "333" },
    { 104, "444" },
    { 105, "555" },
    { 106, "666" },
    { 107, "777" },
    { 108, "888" },
    { 109, "999" },
    { 110, "000" }
};
void map_insert(const string& name)
{
    int key = 0;
    string value = "";
    
    for (int i = 0; i < (sizeof(m_map2_insert) / sizeof(struct MapInfo)); i++) {
        key = m_map2_insert[i].key;
        value = m_map2_insert[i].str;
        if (m_safemap.Insert(key, value) == false) {
            cout << name << ": insert failed and key = " << to_string(key) << " and value = " << value << endl;
        } else {
            cout << name << ": insert successful and key = " << to_string(key) << " and value = " << value << endl;
        }
        sleep(1);
    }
}

测试在map中使用erase删除元素

void map_erase(const string& name)
{
    int key = 0;
    string value = "";
    
    for (int i = 0; i < (sizeof(m_map2_insert) / sizeof(struct MapInfo)); i++) {
        key = m_map2_insert[i].key;
        m_safemap.Erase(key);
        cout << name << ": Erase successful and key = " << to_string(key) << endl;
        sleep(1);
    }
}

测试使用FindOldAndSetNew替换元素的值

void map_findold_and_setnew(const string& name)
{
    int key = 0;
    string old_value = "";
    string new_value = "";
    
    for (int i = 0; i < (sizeof(m_map1_insert) / sizeof(struct MapInfo)); i++) {
        key = m_map1_reset[i].key;
        old_value = "";
        new_value = m_map1_reset[i].str;
        
        if (m_safemap.FindOldAndSetNew(key, old_value, new_value) == false) {
            cout << name << ": FindOldAndSetNew failed and key = " << to_string(key) << " and old_value = " << old_value << endl;
        } else {
            cout << name << ": FindOldAndSetNew successful and key = " << to_string(key)
                 << " and old_value = " << old_value << " and new_value = " << new_value << endl;
        }
        
        sleep(1);
    }
}

最后设置结束,并等待结束,然后打印SafeMap所有元素,最后清空并确认是否清空完成。

// 设置结束,并等待结束
    threads.Stop();
    cout << "Threads Stop" << endl;
    
    // 打印SafeMap所有元素
    cout << "SafeMap Iterate: " << endl;
    m_safemap.Iterate(map_iterate_print);
    cout << "SafeMap Find: " << endl;
    map_find_print();
    
    // 清空SafeMap
    cout << "SafeMap Clear" << endl;
    m_safemap.Clear();
    // 查看是否是空的
    cout << "SafeMap IsEmpty: " << m_safemap.IsEmpty() << endl;
  • 执行结果
# ./utils_thread_map                                                           
Thread_EnsureInsert: insert successful and key = 1Thread_Insert and value = aaa: insert successful and key = 
101 and value = 111
Thread_Erase: Erase successful and key = 101
Thread_FindOldAndSetNew: FindOldAndSetNew successful and key = 1 and old_value = aaa and new_value = abc
Thread_EnsureInsert: insert successful and key = 2 and value = bbb
Thread_Erase: Erase successful and key = 102
Thread_FindOldAndSetNew: FindOldAndSetNew successful and key = 2 and old_value = bbb and new_value = bcd
Thread_Insert: insert successful and key = 102 and value = 222
Thread_Erase: Erase successful and key = 103
Thread_EnsureInsert: insert successful and key = Thread_InsertThread_FindOldAndSetNew: FindOldAndSetNew failed and key = 3 and old_value = 
3 and value = ccc
: insert successful and key = 103 and value = 333
Thread_Erase: Erase successful and key = 104
Thread_FindOldAndSetNew: FindOldAndSetNew successful and key = 4 and old_value = ddd and new_value = def
Thread_EnsureInsert: insert successful and key = Thread_Insert: insert successful and key = 104 and value = 444
4 and value = ddd
Thread_Erase: Erase successful and key = 105
Thread_FindOldAndSetNew: FindOldAndSetNew failed and key = 5 and old_value = 
Thread_Insert: insert successful and key = 105 and value = 555
Thread_EnsureInsert: insert successful and key = 5 and value = eee
Thread_Erase: Erase successful and key = 106
Thread_Insert: insert successful and key = Thread_FindOldAndSetNew: FindOldAndSetNew failed and key = 1066 and value = 666 and old_value = 
Thread_EnsureInsert: insert successful and key = 6 and value = fff

Thread_Erase: Erase successful and key = 107
Thread_Insert: insert successful and key = 107 and value = 777
Thread_EnsureInsert: insert successful and key = Thread_FindOldAndSetNew: FindOldAndSetNew successful and key = 7 and old_value = ggg and new_value = ghi
7 and value = ggg
Thread_Erase: Erase successful and key = 108
Thread_EnsureInsert: insert successful and key = 8 and value = hhh
Thread_Insert: insert successful and key = 108Thread_FindOldAndSetNew and value = : FindOldAndSetNew failed and key = 8 and old_value = 
888
Thread_Erase: Erase successful and key = Thread_FindOldAndSetNew109: FindOldAndSetNew failed and key = 9 and old_value = 

Thread_Insert: insert successful and key = 109 and value = 999
Thread_EnsureInsert: insert successful and key = 9 and value = iii
Thread_Erase: Erase successful and key = 110
Thread_Insert: insert successful and key = 110 and value = 000
Thread_FindOldAndSetNew: FindOldAndSetNew failed and key = 10 and old_value = 
Thread_EnsureInsert: insert successful and key = 10 and value = jjj
Threads Stop
SafeMap Iterate: 
key = 1, value = abc
key = 2, value = bcd
key = 3, value = ccc
key = 4, value = def
key = 5, value = eee
key = 6, value = fff
key = 7, value = ghi
key = 8, value = hhh
key = 9, value = iii
key = 10, value = jjj
key = 103, value = 333
key = 104, value = 444
key = 105, value = 555
key = 106, value = 666
key = 107, value = 777
key = 108, value = 888
key = 109, value = 999
key = 110, value = 000
SafeMap Find: 
key = 1, value = abc
key = 2, value = bcd
key = 3, value = ccc
key = 4, value = def
key = 5, value = eee
key = 6, value = fff
key = 7, value = ghi
key = 8, value = hhh
key = 9, value = iii
key = 10, value = jjj
key = 103, value = 333
key = 104, value = 444
key = 105, value = 555
key = 106, value = 666
key = 107, value = 777
key = 108, value = 888
key = 109, value = 999
key = 110, value = 000
SafeMap Clear
SafeMap IsEmpty: 1

线程安全阻塞队列

这些类模板使用 C++ 标准库中的互斥锁(std::mutex)、条件变量(std::condition_variable)和队列(std::queue)来实现线程安全的操作。

接口说明
#include <safe_block_queue.h>

OHOS::SafeBlockQueue

返回值名称
SafeBlockQueue(int capacity) 构造函数,整数参数 capacity,用于设置队列的最大容量
virtual ~SafeBlockQueue() 析构函数
voidvirtual Push(T const& elem) 入队操作(阻塞版),将一个元素插入队列的末尾,如果队列已满,则使用条件变量阻塞当前线程,直到队列中有空位。插入完成后,通知等待队列不为空的线程。
boolvirtual PushNoWait(T const& elem) 入队操作(非阻塞版),如果队列已满,直接返回 false;否则插入元素并返回 true。
TPop() 出队操作(阻塞版),从队列的开头移除一个元素,如果队列为空,则使用条件变量阻塞当前线程,直到队列中有元素。移除完成后,通知等待队列不满的线程。
boolPopNotWait(T& outtask) 出队操作(非阻塞版),如果队列为空,直接返回 false;否则移除元素并返回 true
unsigned intSize() 获取队列容量
boolIsEmpty() 队列判空
boolIsFull() 队列判满

OHOS::SafeBlockQueueTracking,该类继承自 SafeBlockQueueSafeBlockQueueTracking 旨在提供线程安全的阻塞队列功能,并在此基础上增加对任务完成情况的跟踪能力。

class SafeBlockQueueTracking : public SafeBlockQueue

返回值名称
explicitSafeBlockQueueTracking(int capacity) 构造函数,接受一个 int 类型的参数 capacity,用于设置队列的最大容量,并初始化
virtual ~SafeBlockQueueTracking() 析构函数
voidvirtual Push(T const& elem) 入队操作(阻塞版),首先增加 unfinishedTaskCount_ 计数。如果队列已满,则使用条件变量 cvNotFull_ 阻塞当前线程,直到队列有空位。插入完成后,通知等待队列不为空的线程。
boolvirtual PushNoWait(T const& elem) 入队操作(非阻塞版)先检查队列是否已满。如果已满,则直接返回 false;否则插入元素,并增加 unfinishedTaskCount_ 计数,通知等待队列不为空的线程,返回 true
boolOneTaskDone() 一个任务完成时的响应函数,它会减少 unfinishedTaskCount_ 计数。如果计数减少到 0,则通知所有等待任务完成的线程
voidJoin() 等待未完成队列,会阻塞当前线程,直到队列中的所有任务都已完成(即 unfinishedTaskCount_ 计数变为 0)
intGetUnfinishTaskNum() 获取未完成任务数
类关系图

在这里插入图片描述

源码剖析

源码是在safe_block_queue.h文件中直接实现的。

主要是对std::queue queueT_的二次封装,和c++标准map的接口类似,就不再详细说明了,直接看应用示例吧。

应用示例

(1)使用SafeBlockQueue接口的案例

  • 判断命令行是否使用阻塞,还是非阻塞;
  • 创建子线程生产者,使用阻塞/非阻塞方式,入队操作;
  • 创建子线程消费者,使用阻塞/非阻塞方式,出队操作;
  • 主线程等待所有子线程结束
// 定义常量
const char STRING_WAIT[] = "wait";
const char STRING_NOWAIT[] = "nowait";

// 定义常量
const int SIZE = 5;
// 定义SafeBlockQueue变量
OHOS::SafeBlockQueue<int> m_safeBlockQueue(SIZE);

// 返回时间字符串
static string get_curtime()
{
    string str = "";
    time_t time_now = time(nullptr);
    struct tm tm_now;

    localtime_r(&time_now, &tm_now);
    str += to_string(tm_now.tm_year + 1900) + "-" + to_string(tm_now.tm_mon + 1) + "-" + to_string(tm_now.tm_mday);
    str += " ";
    str += to_string(tm_now.tm_hour) + ":" + to_string(tm_now.tm_min) + ":" + to_string(tm_now.tm_sec);

    return str;
}

static void product_wait(const string &name)
{
    for (int i = 0; i < (2 * SIZE); i++) {
        cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;
        // 使用阻塞方式的SafeBlockQueue
        m_safeBlockQueue.Push(i);
        cout << get_curtime() << ", " << __func__ << ": Push Success, i = " << i << endl;
        // 等待1秒
        cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

static void consume_wait(const string &name)
{
    for (int i = 0; i < (2 * SIZE); i++) {
        cout << get_curtime() << ", " << __func__ << ": Pop Start, i = " << i << endl;
        // 使用阻塞方式的SafeBlockQueue
        int value = m_safeBlockQueue.Pop();
        cout << get_curtime() << ", " << __func__ << ": Pop Success, i = " << i << ", value = " << value << endl;
        // 等待0.5秒
        cout << get_curtime() << ", " << __func__ << ": Sleep 0.5 sec " << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

static void product_nowait(const string &name)
{
    bool ret;
    
    for (int i = 0; i < (2 * SIZE); i++) {
        cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;
        // 使用非阻塞方式的SafeBlockQueue
        ret = m_safeBlockQueue.PushNoWait(i);
        cout << get_curtime() << ", " << __func__ << ": Push ret = " << ret << ", i = " << i << endl;
        // 等待1秒
        cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

static void consume_nowait(const string &name)
{
    for (int i = 0; i < (SIZE * 2); i++) {
        // 等待有新数据
        int value = 0;
        // 使用非阻塞方式的SafeBlockQueue
        bool ret = m_safeBlockQueue.PopNotWait(value);
        cout << get_curtime() << ", " << __func__ << ": PopNotWait ret = " << ret << ", value = " << value << endl;
        // 等待500毫秒
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

int main(int argc, char **argv)
{
    bool enable_wait = true;
    OHOS::ThreadPool threads("threads");
    string str_name = "";

    // 获取命令行参数
    if (argc != 2) {
        cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;
        return -1;
    }
    if (strncmp(argv[1], STRING_WAIT, sizeof(STRING_WAIT)) == 0) {
        enable_wait = true;
    } else if (strncmp(argv[1], STRING_NOWAIT, sizeof(STRING_NOWAIT)) == 0) {
        enable_wait = false;
    } else {
        cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;
        return -1;
    }

    threads.SetMaxTaskNum(4);
    threads.Start(2);

    // 创建生产者线程
    cout << get_curtime() << ", " << __func__ << ": task_product start" << endl;
    auto task_product = (enable_wait) ? (std::bind(product_wait, str_name)) : (std::bind(product_nowait, str_name));
    threads.AddTask(task_product);
    
    // 等待SIZE秒,将SafeBlockQueue容器填满
    cout << get_curtime() << ", " << __func__ << ": sleep " << SIZE << " sec" << endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000 * SIZE));
    
    // 创建消费者线程
    cout << get_curtime() << ", " << __func__ << ": consume start" << endl; 
    auto task_consumer = (enable_wait) ? (std::bind(consume_wait, str_name)) : (std::bind(consume_nowait, str_name));
    threads.AddTask(task_consumer);
    
    threads.Stop();
    cout << get_curtime() << ", " << __func__ << ": Queue Wait End" << endl; 
    
    return 0;
}
  • 执行结果
# ./utils_thread_queue wait                                                    
2017-8-5 17:2:16, main: task_product start
2017-8-5 17:2:16, main: sleep 5 sec
2017-8-5 17:2:16, product_wait: Push Start, i = 0
2017-8-5 17:2:16, product_wait: Push Success, i = 0
2017-8-5 17:2:16, product_wait: Sleep 1 sec 
2017-8-5 17:2:17, product_wait: Push Start, i = 1
2017-8-5 17:2:17, product_wait: Push Success, i = 1
2017-8-5 17:2:17, product_wait: Sleep 1 sec 
2017-8-5 17:2:18, product_wait: Push Start, i = 2
2017-8-5 17:2:18, product_wait: Push Success, i = 2
2017-8-5 17:2:18, product_wait: Sleep 1 sec 
2017-8-5 17:2:19, product_wait: Push Start, i = 3
2017-8-5 17:2:19, product_wait: Push Success, i = 3
2017-8-5 17:2:19, product_wait: Sleep 1 sec 
2017-8-5 17:2:20, product_wait: Push Start, i = 4
2017-8-5 17:2:20, product_wait: Push Success, i = 4
2017-8-5 17:2:20, product_wait: Sleep 1 sec 
2017-8-5 17:2:21, main: consume start
2017-8-5 17:2:21, consume_wait: Pop Start, i = 0
2017-8-5 17:2:21, consume_wait: Pop Success, i = 0, value = 0
2017-8-5 17:2:21, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:21, product_wait: Push Start, i = 5
2017-8-5 17:2:21, product_wait: Push Success, i = 5
2017-8-5 17:2:21, product_wait: Sleep 1 sec 
2017-8-5 17:2:21, consume_wait: Pop Start, i = 1
2017-8-5 17:2:21, consume_wait: Pop Success, i = 1, value = 1
2017-8-5 17:2:21, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:22, consume_wait: Pop Start, i = 2
2017-8-5 17:2:22, consume_wait: Pop Success, i = 2, value = 2
2017-8-5 17:2:22, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:22, product_wait: Push Start, i = 6
2017-8-5 17:2:22, product_wait: Push Success, i = 6
2017-8-5 17:2:22, product_wait: Sleep 1 sec 
2017-8-5 17:2:22, consume_wait: Pop Start, i = 3
2017-8-5 17:2:22, consume_wait: Pop Success, i = 3, value = 3
2017-8-5 17:2:22, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:23, consume_wait: Pop Start, i = 4
2017-8-5 17:2:23, consume_wait: Pop Success, i = 4, value = 4
2017-8-5 17:2:23, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:23, product_wait: Push Start, i = 7
2017-8-5 17:2:23, product_wait: Push Success, i = 7
2017-8-5 17:2:23, product_wait: Sleep 1 sec 
2017-8-5 17:2:23, consume_wait: Pop Start, i = 5
2017-8-5 17:2:23, consume_wait: Pop Success, i = 5, value = 5
2017-8-5 17:2:23, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:24, product_wait: Push Start, i = 8
2017-8-5 17:2:24, product_wait: Push Success, i = 8
2017-8-5 17:2:24, product_wait: Sleep 1 sec 
2017-8-5 17:2:24, consume_wait: Pop Start, i = 6
2017-8-5 17:2:24, consume_wait: Pop Success, i = 6, value = 6
2017-8-5 17:2:24, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:24, consume_wait: Pop Start, i = 7
2017-8-5 17:2:24, consume_wait: Pop Success, i = 7, value = 7
2017-8-5 17:2:24, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:25, product_wait: Push Start, i = 9
2017-8-5 17:2:25, product_wait: Push Success, i = 9
2017-8-5 17:2:25, product_wait: Sleep 1 sec 
2017-8-5 17:2:25, consume_wait: Pop Start, i = 8
2017-8-5 17:2:25, consume_wait: Pop Success, i = 8, value = 8
2017-8-5 17:2:25, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:25, consume_wait: Pop Start, i = 9
2017-8-5 17:2:25, consume_wait: Pop Success, i = 9, value = 9
2017-8-5 17:2:25, consume_wait: Sleep 0.5 sec 
2017-8-5 17:2:26, main: Queue Wait End

(2)使用SafeBlockQueueTracking接口的案例

  • 判断命令行是否使用阻塞,还是非阻塞;
  • 创建子线程生产者,使用阻塞/非阻塞方式,入队操作;
  • 创建子线程消费者,使用阻塞/非阻塞方式,出队操作;
  • 主线程等待所有子线程结束
// 定义常量
const char STRING_WAIT[] = "wait";
const char STRING_NOWAIT[] = "nowait";

// 定义常量
const int SIZE = 5;
// 定义SafeBlockQueue变量
OHOS::SafeBlockQueueTracking<int> m_safeBlockQueueTracking(SIZE);

// 返回时间字符串
static string get_curtime()
{
    string str = "";
    time_t time_now = time(nullptr);
    struct tm tm_now;

    localtime_r(&time_now, &tm_now);
    str += to_string(tm_now.tm_year + 1900) + "-" + to_string(tm_now.tm_mon + 1) + "-" + to_string(tm_now.tm_mday);
    str += " ";
    str += to_string(tm_now.tm_hour) + ":" + to_string(tm_now.tm_min) + ":" + to_string(tm_now.tm_sec);

    return str;
}

static void product_wait(const string &name)
{
    for (int i = 0; i < (2 * SIZE); i++) {
        cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;
        // 使用阻塞方式的SafeBlockQueueTracking
        m_safeBlockQueueTracking.Push(i);
        cout << get_curtime() << ", " << __func__ << ": Push Success, i = " << i << endl;
        // 等待1秒
        cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

static void consume_wait(const string &name)
{
    for (int i = 0; i < (2 * SIZE); i++) {
        cout << get_curtime() << ", " << __func__ << ": Pop Start, i = " << i << endl;
        // 使用阻塞方式的SafeBlockQueueTracking
        int value = m_safeBlockQueueTracking.Pop();
        cout << get_curtime() << ", " << __func__ << ": Pop Success, i = " << i << ", value = " << value << endl;
        m_safeBlockQueueTracking.OneTaskDone();
        cout << get_curtime() << ", " << __func__ << ": Push OneTaskDone successful" << endl;
        // 等待0.5秒
        cout << get_curtime() << ", " << __func__ << ": Sleep 0.5 sec " << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

static void product_nowait(const string &name)
{
    bool ret;
    
    for (int i = 0; i < (2 * SIZE); i++) {
        cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;
        // 使用非阻塞方式的SafeBlockQueueTracking
        ret = m_safeBlockQueueTracking.PushNoWait(i);
        cout << get_curtime() << ", " << __func__ << ": Push ret = " << ret << ", i = " << i << endl;
        if (ret == true) {
            m_safeBlockQueueTracking.OneTaskDone();
            cout << get_curtime() << ", " << __func__ << ": Push OneTaskDone successful" << endl;
        }
        // 等待1秒
        cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

static void consume_nowait(const string &name)
{
    for (int i = 0; i < (SIZE * 2); i++) {
        // 等待有新数据
        int value = 0;
        // 使用非阻塞方式的SafeBlockQueueTracking
        bool ret = m_safeBlockQueueTracking.PopNotWait(value);
        cout << get_curtime() << ", " << __func__ << ": PopNotWait ret = " << ret << ", value = " << value << endl;
        // 等待500毫秒
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

int main(int argc, char **argv)
{
    bool enable_wait = true;
    OHOS::ThreadPool threads("threads");
    string str_name = "";

    // 获取命令行参数
    if (argc != 2) {
        cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;
        return -1;
    }
    if (strncmp(argv[1], STRING_WAIT, sizeof(STRING_WAIT)) == 0) {
        enable_wait = true;
    } else if (strncmp(argv[1], STRING_NOWAIT, sizeof(STRING_NOWAIT)) == 0) {
        enable_wait = false;
    } else {
        cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;
        return -1;
    }

    threads.SetMaxTaskNum(4);
    threads.Start(2);

    // 创建生产者线程
    cout << get_curtime() << ", " << __func__ << ": task_product start" << endl;
    auto task_product = (enable_wait) ? (std::bind(product_wait, str_name)) : (std::bind(product_nowait, str_name));
    threads.AddTask(task_product);
    
    // 等待SIZE秒,将SafeBlockQueue容器填满
    cout << get_curtime() << ", " << __func__ << ": sleep " << SIZE << " sec" << endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000 * SIZE));
    
    // 创建消费者线程
    cout << get_curtime() << ", " << __func__ << ": consume start" << endl; 
    auto task_consumer = (enable_wait) ? (std::bind(consume_wait, str_name)) : (std::bind(consume_nowait, str_name));
    threads.AddTask(task_consumer);
    
    threads.Stop();
    cout << get_curtime() << ", " << __func__ << ": Queue Wait End" << endl; 
    
    return 0;
}
  • 执行结果
# ./utils_thread_queue_track nowait                                            
2017-8-5 17:37:0, main: task_product start
2017-8-5 17:37:0, main: sleep 2017-8-5 17:37:0, product_nowait: Push Start, i = 5 sec
0
2017-8-5 17:37:0, product_nowait: Push ret = 1, i = 0
2017-8-5 17:37:0, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:0, product_nowait: Sleep 1 sec 
2017-8-5 17:37:1, product_nowait: Push Start, i = 1
2017-8-5 17:37:1, product_nowait: Push ret = 1, i = 1
2017-8-5 17:37:1, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:1, product_nowait: Sleep 1 sec 
2017-8-5 17:37:2, product_nowait: Push Start, i = 2
2017-8-5 17:37:2, product_nowait: Push ret = 1, i = 2
2017-8-5 17:37:2, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:2, product_nowait: Sleep 1 sec 
2017-8-5 17:37:3, product_nowait: Push Start, i = 3
2017-8-5 17:37:3, product_nowait: Push ret = 1, i = 3
2017-8-5 17:37:3, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:3, product_nowait: Sleep 1 sec 
2017-8-5 17:37:4, product_nowait: Push Start, i = 4
2017-8-5 17:37:4, product_nowait: Push ret = 1, i = 4
2017-8-5 17:37:4, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:4, product_nowait: Sleep 1 sec 
2017-8-5 17:37:5, main: consume start
2017-8-5 17:37:5, consume_nowait: PopNotWait ret = 1, value = 0
2017-8-5 17:37:5, product_nowait: Push Start, i = 5
2017-8-5 17:37:5, product_nowait: Push ret = 1, i = 5
2017-8-5 17:37:5, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:5, product_nowait: Sleep 1 sec 
2017-8-5 17:37:5, consume_nowait: PopNotWait ret = 1, value = 1
2017-8-5 17:37:6, consume_nowait: PopNotWait ret = 1, value = 2
2017-8-5 17:37:6, product_nowait: Push Start, i = 6
2017-8-5 17:37:6, product_nowait: Push ret = 1, i = 6
2017-8-5 17:37:6, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:6, product_nowait: Sleep 1 sec 
2017-8-5 17:37:6, consume_nowait: PopNotWait ret = 1, value = 3
2017-8-5 17:37:7, consume_nowait: PopNotWait ret = 1, value = 4
2017-8-5 17:37:7, product_nowait: Push Start, i = 7
2017-8-5 17:37:7, product_nowait: Push ret = 1, i = 7
2017-8-5 17:37:7, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:7, product_nowait: Sleep 1 sec 
2017-8-5 17:37:7, consume_nowait: PopNotWait ret = 1, value = 5
2017-8-5 17:37:8, consume_nowait: PopNotWait ret = 1, value = 6
2017-8-5 17:37:8, product_nowait: Push Start, i = 8
2017-8-5 17:37:8, product_nowait: Push ret = 1, i = 8
2017-8-5 17:37:8, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:8, product_nowait: Sleep 1 sec 
2017-8-5 17:37:8, consume_nowait: PopNotWait ret = 1, value = 7
2017-8-5 17:37:9, product_nowait: Push Start, i = 2017-8-5 17:37:9, consume_nowait: PopNotWait ret = 19, value = 8

2017-8-5 17:37:9, product_nowait: Push ret = 1, i = 9
2017-8-5 17:37:9, product_nowait: Push OneTaskDone successful
2017-8-5 17:37:9, product_nowait: Sleep 1 sec 
2017-8-5 17:37:9, consume_nowait: PopNotWait ret = 1, value = 9
2017-8-5 17:37:10, main: Queue Wait End

线程安全栈与队列

线程安全队列,是在dequeue的基础上封装std::lock_guard,以此实现线程的相关操作。根据继承SafeQueueInner抽象类,并对dequeue的pop方法的重写,可以实现SafeStack和SafeQueue的相关方法。
对std::lock_guard不熟悉的码友可以看这篇文章

接口说明
  • OHOS::SafeQueueInner
返回值名称
SafeQueueInner() 构造函数
virtual ~SafeQueueInner() 析构函数
voidErase(T& object) 移除某个元素
boolEmpty() 队列判空
voidClear() 清空队列元素
intSize() 获取队列的容量
voidPush(const T& pt) 入队操作
voidvirtual void DoPush(const T& pt) = 0 Push底层调用DoPush,需要重写
boolPop(T& pt) 出队操作
boolvirtual DoPop(T& pt) = 0 Push底层调用DoPop,需要重写
  • OHOS::SafeQueue

class SafeQueue : public SafeQueueInner

返回值名称
voidDoPush(const T& pt) 入队操作
boolDoPop(T& pt) 出队操作
  • OHOS::SafeStack

class SafeStack : public SafeQueueInner

返回值名称
voidDoPush(const T& pt) 入栈操作
boolDoPop(T& pt) 出栈操作
类关系图

在这里插入图片描述

源码剖析

源码是在safe_queue.h文件中直接实现的,主要包含两个类SafeStack、SafeQueue。

SafeStack模板类,可以存储任何类型的对象。它继承自 SafeQueueInner<T>,这意味着 SafeStack将继承 SafeQueueInner 的所有成员变量和成员函数,除了那些被重写的虚函数。SafeQueue模板类,可以存储任何类型的对象。它继承自 SafeQueueInner<T>,这意味着 SafeQueue 将继承 SafeQueueInner 的所有成员变量和成员函数,除了那些被重写的虚函数。两个类只有重写的push和pop函数有区别,主要在于实现队列为先进先出,栈为后进先出的特点。

应用示例

本案例主要完成如下工作:

  • 创建2个子线程,1个线程负责入队操作,1个线程负责出队操作
  • 子线程入队操作,每1秒做1次入队操作,循环5次
  • 子线程入队操作,每2秒做1次出队操作,循环5次
// 定义栈变量
static OHOS::SafeStack<int> m_safeStack;

static void funcSafeStackPush(const string &name)
{
    for (int i = 0; i < 5; i++) {
        // 入队操作
        cout << name << ", Push Start and i = " << i << endl;
        m_safeStack.Push(i);
        cout << name << ", Push Successful and i = " << i << " and value = " << i << endl;
        
        // 睡眠1秒
        cout << name << ", Sleep 1 sec" << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

static void funcSafeStackPop(const string &name)
{
    bool ret;
    int value;
    
    for (int i = 0; i < 5; i++) {
        // 出队操作
        cout << name << ", Pop Start and i = " << i << endl;
        ret = m_safeStack.Pop(value);
        if (ret) {
            cout << name << ", Pop Successful and i = " << i << " and ret = " << ret << " and value = " << value << endl;
        } else {
            cout << name << ", Pop Failed and i = " << i  << endl;
        }
        
        // 睡眠2秒
        cout << name << ", Sleep 2 sec" << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 2));
    }
}

int main(int argc, char **argv)
{
    OHOS::ThreadPool threads("threads");
    string str_name;
    
    // 清空SafeStack
    m_safeStack.Clear();
    
    threads.SetMaxTaskNum(128);
    threads.Start(2);
    
    // 开启子线程,使用Push
    str_name = "Thread_SafeStack_Push";
    auto task_push = std::bind(funcSafeStackPush, str_name);
    threads.AddTask(task_push);
    
    // 开启子线程,使用Pop
    str_name = "Thread_SafeStack_Pop";
    auto task_pop = std::bind(funcSafeStackPop, str_name);
    threads.AddTask(task_pop);
    
    // 设置结束,并等待结束
    threads.Stop();
    cout << "Threads Stop" << endl;
    
    return 0;
}
  • 执行结果
# ./utils_thread_safestack                                                     
Thread_SafeStack_Push, Push Start and i = 0
Thread_SafeStack_Push, Push Successful and i = 0 and value = 0
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Pop, Pop Start and i = 0
Thread_SafeStack_Pop, Pop Successful and i = 0 and ret = 1 and value = 0
Thread_SafeStack_Pop, Sleep 2 sec
Thread_SafeStack_Push, Push Start and i = 1
Thread_SafeStack_Push, Push Successful and i = 1 and value = 1
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Pop, Pop Start and i = 1
Thread_SafeStack_Pop, Pop Successful and i = 1 and ret = 1 and value = 1
Thread_SafeStack_Pop, Sleep 2 sec
Thread_SafeStack_Push, Push Start and i = 2
Thread_SafeStack_Push, Push Successful and i = 2 and value = 2
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Push, Push Start and i = 3
Thread_SafeStack_Push, Push Successful and i = 3 and value = 3
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Pop, Pop Start and i = 2
Thread_SafeStack_Pop, Pop Successful and i = 2 and ret = 1 and value = 3
Thread_SafeStack_Pop, Sleep 2 sec
Thread_SafeStack_Push, Push Start and i = 4
Thread_SafeStack_Push, Push Successful and i = 4 and value = 4
Thread_SafeStack_Push, Sleep 1 sec
Thread_SafeStack_Pop, Pop Start and i = 3
Thread_SafeStack_Pop, Pop Successful and i = 3 and ret = 1 and value = 4
Thread_SafeStack_Pop, Sleep 2 sec
Thread_SafeStack_Pop, Pop Start and i = 4
Thread_SafeStack_Pop, Pop Successful and i = 4 and ret = 1 and value = 2
Thread_SafeStack_Pop, Sleep 2 sec
Threads Stop

参考文档

C++公共基础库:包含简要说明、编译构建及各个功能节点的使用说明

凌智电子开发板

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2341442.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TDengine 数据订阅设计

简介 数据订阅作为 TDengine 的一个核心功能&#xff0c;为用户提供了灵活获取所需数据的能力。通过深入了解其内部原理&#xff0c;用户可以更加有效地利用这一功能&#xff0c;满足各种实时数据处理和监控需求。 基本概念 主题 与 Kafka 一样&#xff0c;使用 TDengine 数…

URP-UGUI交互功能实现

一、非代码层面实现交互&#xff08;SetActive&#xff09; Button &#xff1a;在OnClick&#xff08;&#xff09;中添加SetActive方法&#xff08;但是此时只首次有效&#xff09; Toggle &#xff1a;在OnClick&#xff08;&#xff09;中添加动态的SetActive方法 &#…

UniGoal 具身导航 | 通用零样本目标导航 CVPR 2025

UniGoal的提出了一个通用的零样本目标导航框架&#xff0c;能够统一处理多种类型的导航任务 &#xff08;如对象类别导航、实例图像目标导航和文本目标导航&#xff09;&#xff0c;而无需针对特定任务进行训练或微调。 它的特点是 图匹配与多阶段探索策略&#xff01;&#x…

通过Quartus II实现Nios II编程

目录 一、认识Nios II二、使用Quartus II 18.0Lite搭建Nios II硬件部分三、软件部分四、运行项目 一、认识Nios II Nios II软核处理器简介 Nios II是Altera公司推出的一款32位RISC嵌入式处理器&#xff0c;专门设计用于在FPGA上运行。作为软核处理器&#xff0c;Nios II可以通…

Linux/AndroidOS中进程间的通信线程间的同步 - IPC方式简介

前言 从来没有总结过Linux/Android系统中进程间的通信方式和线程间的同步方式&#xff0c;这个专栏就系统总结讨论一下。首先从标题可知&#xff0c;讨论问题的主体是进程和线程、通信和同步&#xff1b;在这里默认你理解进程和线程的区别。通信和同步有什么概念上的区别&…

Windows:注册表配置应用

0、简介 本篇博客记录一下&#xff0c;日常的系统注册表配置选项&#xff0c;以防再次遇到问题不知如何解决。 1、开机启动配置 HKEY_LOCAL_MACHINE\Software\Microsoft\Windows\CurrentVersion\Run :: 此位置存储了所有用户登录时需要启动的程序。 在该项下新建字符串值&#…

WebXR教学 05 项目3 太空飞船小游戏

准备工作 自动创建 package.json 文件 npm init -y 安装Three.js 3D 图形库&#xff0c;安装现代前端构建工具Vite&#xff08;用于开发/打包&#xff09; npm install three vite 启动 Vite 开发服务器&#xff08;推荐&#xff09;&#xff08;正式项目开发&#xff09; …

达梦统计信息收集情况检查

查询达梦某个对象上是否有统计信息 select id,T_TOTAL,N_SMAPLE,N_DISTINCT,N_NULL,BLEVEL,N_LEAF_PAGES,N_LEAF_USED_PAGES,LAST_GATHERED from sysstats where id IN (select id from sysobjects where upper(name)upper(&objname));可能有系统对象&#xff0c;可以增加…

【matlab】气泡图的应用

【matlab】气泡图的应用 .rtcContent { padding: 30px; } .lineNode {font-size: 12pt; font-family: "Times New Roman", Menlo, Monaco, Consolas, "Courier New", monospace; font-style: normal; font-weight: normal; } clear load zb_equi.mat load …

飞帆控件:在编辑模式下额外加载的库

飞帆是一个自由的控件设计平台。在飞帆中&#xff0c;我们可以很方便地创建基于 Vue 2 组件的控件&#xff0c;并使用控件来搭建网页。 他山之石&#xff0c;可以攻玉。在创建控件中&#xff0c;使用 js 、css 依赖库能让我们的控件更强大。 有些时候&#xff0c;在编辑模式下…

Super-Vlan和MUX-Vlan的原理、配置、区别

Super-Vlan 原理 Super-Vlan也叫Aggregate-Vlan。 一般的三层交换机中&#xff0c;通常是采用一个VLAN对应一个vlanif接口的方式实现广播域之间的互通&#xff0c;这在某些情况下导致了IP地址的浪费。因为一个VLAN对应的子网中&#xff0c;子网号、子网定向广播地址、子网缺…

el-table怎么显示 特殊单元格的值

1. 在 el-table-column 上绑定了 formatter 方法 formatEntityName &#xff0c;它会对每一行该列的数据&#xff08; cellValue &#xff09;进行处理。 2. 在 formatEntityName 方法中&#xff0c;尝试对传入的 cellValue 进行 JSON.parse 操作&#xff0c;并根…

2025-04-23 Python深度学习3——Tensor

文章目录 1 张量1.1 数学定义1.2 PyTorch中的张量 2 创建 Tensor2.1 直接创建**torch.tensor()****torch.from_numpy()** 2.2 依据数值创建**torch.zeros() / torch.zeros_like()****torch.ones() / torch.ones_like()****torch.full() / torch.full_like()****torch.arange() …

在统信UOS/麒麟Kylin OS操作系统中配置APT和GIT代理

在统信UOS/麒麟Kylin OS操作系统中配置APT和GIT代理 在内网环境中&#xff0c;直接访问外部资源可能会受到限制&#xff0c;这时候配置APT和GIT的代理就显得尤为重要。本文将详细介绍如何在统信UOS和麒麟Kylin OS操作系统中配置APT和GIT的代理。 为什么需要配置APT和GIT代理&…

第十七讲、Isaaclab中使用操作空间控制器

0 前言 官方教程&#xff1a;https://isaac-sim.github.io/IsaacLab/main/source/tutorials/05_controllers/run_osc.html IsaacsimIsaaclab安装&#xff1a;https://blog.csdn.net/m0_47719040/article/details/146389391?spm1001.2014.3001.5502 有时候&#xff0c;仅使用…

基于SpringBoot的校园二手商品在线交易系统+含项目运行说明文档

基于SpringBoot的校园二手商品在线交易系统含项目运行说明文档 专注校园二手交易平台是一个基于Java的在线市场&#xff0c;专为学生设计&#xff0c;便于买卖二手商品。平台提供全面的用户管理功能&#xff0c;包括学生、管理员和二手商品卖家账户管理。商品管理功能允许用户…

详解springcloud gateway工作原理、断言、filter、uri、id、全局跨域、globalfilter等以及关键源码实现

1.gateway概念 网关就是当前微服务项目的"统一入口"程序中的网关就是当前微服务项目对外界开放的统一入口所有外界的请求都需要先经过网关才能访问到我们的程序提供了统一入口之后,方便对所有请求进行统一的检查和管理 2. 网关的主要功能 将所有请求统一经过网关网…

C++面向对象特性之继承篇

C语音是面向过程的语言&#xff0c;而C在其之上多了面向对象的特性&#xff0c;面向对象三大特性:封装性、继承性、多态性。今天主包来讲讲自己学到的关于C继承特性的知识。 一、继承是什么 继承是提高代码复用的一种重要手段。正如C的模版、泛型编程等等都是为了实现代码复用…

【AI News | 20250423】每日AI进展

AI Repos 1、suna Suna是一款完全开源的AI助手&#xff0c;旨在通过自然对话帮助用户轻松完成现实世界的任务。它作为您的数字伙伴&#xff0c;提供研究、数据分析和日常问题解决等功能&#xff0c;并结合强大的能力与直观的界面&#xff0c;理解您的需求并交付成果。Suna的工…

【学习准备】算法和开发知识大纲

1 缘起 今年&#xff08;2025年&#xff09;的职业升级结果&#xff1a;不通过。没办法升职加薪了。 需要开始完善学习&#xff0c;以应对不同的发展趋势&#xff0c;为了督促自己学习&#xff0c;梳理出相关学习大纲。 分为算法和开发两部分。 算法&#xff0c;包括基础算法和…