在程序中,通常竞争使用临界资源,但如果不加限制就很可能出现异常或未达到预期的结果。
临界资源一次仅允许被一个线程使用,它可以是一块内存、一个数据结构、一个文件或者任何其他具有排他性使用的东西。
这些必须互斥执行的代码段称为“临界区(Critical Section,CS)”。临界区(代码段)实施对临界资源的操作,为了阻止问题的产生,一次只能有一个线程进入临界区。
一、互斥量
互斥量是通过QMutex和QMutexLocker实现。
QMutex 的目的是保护一个对象、数据结构或代码段,以便一次只有一个线程可以访问它(这类似于 Java 关键字synchronized
)。通常最好将互斥锁与QMutexLocker一起使用,因为这样可以轻松确保锁定和解锁的执行一致。
QMutex构造的锁,默认是QMutex::NonRecursive,也就是说只能锁定一次,如果无法确定时,可以用QMutex::try_lock()得到目前锁的状态;
QMutex::QMutex(RecursionMode mode = NonRecursive)
如下演示锁和临界资源直接的影响:
main.cpp
#include <QCoreApplication>
#include "mythread.h"
#include <QDebug>
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
// creating three thread instances
MyThread thread1("A"), thread2("B"), thread3("C");
qDebug() << "hello from GUI thread " << a.thread()->currentThreadId();
// thread start -> call run()
thread1.start();
thread2.start();
thread3.start();
return a.exec();
}
mythread.h
#ifndef MYTHREAD_H
#define MYTHREAD_H
#include <QThread>
#include <QString>
class MyThread : public QThread
{
public:
// constructor
// set name and Stop is set as false by default
MyThread(QString s, bool b = false);
// overriding the QThread's run() method
void run();
// variable that mutex protects
bool Stop;
private:
QString name;
};
#endif // MYTHREAD_H
mythread.cpp
#include "mythread.h"
#include <QDebug>
#include <QMutex>
MyThread::MyThread(QString s, bool b) : name(s), Stop(b
{
}
// run() will be called when a thread starts
void MyThread::run()
{
qDebug() << this->name << " " << this->Stop;
for(int i = 0; i <= 5; i++)
{
QMutex mutex;
// prevent other threads from changing the "Stop" value
mutex.lock();
if(this->Stop) break;
mutex.unlock();
qDebug() << this->name << " " << i;
}
}
QMutexLocker可以简化互斥量的处理,自动lock和unlock,可以避免复杂情况时出错。
如下所示:在mythread.h增加一个 QMutex mutex;然后mythread.cpp就如下所示了,通常仅使用一条语句叫解决了复杂的lock和unlock问题。
void MyThread::run()
{
qDebug() << this->name << " " << this->Stop;
for(int i = 0; i <= 5; i++)
{
QMutexLocker locker(&mutex);
if(this->Stop) break;
qDebug() << this->name << " " << i;
}
}
二、信号量
信号量可以理解为对互斥量功能的扩展,互斥量只能锁定一次而信号量可以获取多次,它可以用来保护一定数量的同种资源。
信号量的典型用法是控制生产者/消费者之间共享的环形缓冲区。
qmythread.cpp
#ifndef QMYTHREAD_H
#define QMYTHREAD_H
//#include <QObject>
#include <QThread>
//#include <QMutex>
class QThreadDAQ : public QThread
{
Q_OBJECT
private:
bool m_stop=false; //停止线程
protected:
void run() Q_DECL_OVERRIDE;
public:
QThreadDAQ();
void stopThread();
};
class QThreadShow : public QThread
{
Q_OBJECT
private:
bool m_stop=false; //停止线程
protected:
void run() Q_DECL_OVERRIDE;
public:
QThreadShow();
void stopThread();
signals:
void newValue(int *data,int count, int seq);
};
#endif // QMYTHREAD_H
qmythread.h
#include "qmythread.h"
#include <QSemaphore>
//#include <QTime>
const int BufferSize = 8;
int buffer1[BufferSize];
int buffer2[BufferSize];
int curBuf=1; //当前正在写入的Buffer
int bufNo=0; //采集的缓冲区序号
quint8 counter=0;//数据生成器
QSemaphore emptyBufs(2);//信号量:空的缓冲区个数,初始资源个数为2
QSemaphore fullBufs; //满的缓冲区个数,初始资源为0
QThreadDAQ::QThreadDAQ()
{
}
void QThreadDAQ::stopThread()
{
// QMutexLocker locker(&mutex);
m_stop=true;
}
void QThreadDAQ::run()
{
m_stop=false;//启动线程时令m_stop=false
bufNo=0;//缓冲区序号
curBuf=1; //当前写入使用的缓冲区
counter=0;//数据生成器
int n=emptyBufs.available();
if (n<2) //保证 线程启动时emptyBufs.available==2
emptyBufs.release(2-n);
while(!m_stop)//循环主体
{
emptyBufs.acquire();//获取一个空的缓冲区
for(int i=0;i<BufferSize;i++) //产生一个缓冲区的数据
{
if (curBuf==1)
buffer1[i]=counter; //向缓冲区写入数据
else
buffer2[i]=counter;
counter++; //模拟数据采集卡产生数据
msleep(20); //每50ms产生一个数
}
bufNo++;//缓冲区序号
if (curBuf==1) // 切换当前写入缓冲区
curBuf=2;
else
curBuf=1;
fullBufs.release(); //有了一个满的缓冲区,available==1
}
quit();
}
void QThreadShow::run()
{
m_stop=false;//启动线程时令m_stop=false
int n=fullBufs.available();
if (n>0)
fullBufs.acquire(n); //将fullBufs可用资源个数初始化为0
while(!m_stop)//循环主体
{
fullBufs.acquire(); //等待有缓冲区满,当fullBufs.available==0阻塞
int bufferData[BufferSize];
int seq=bufNo;
if(curBuf==1) //当前在写入的缓冲区是1,那么满的缓冲区是2
for (int i=0;i<BufferSize;i++)
bufferData[i]=buffer2[i]; //快速拷贝缓冲区数据
else
for (int i=0;i<BufferSize;i++)
bufferData[i]=buffer1[i];
emptyBufs.release();//释放一个空缓冲区
emit newValue(bufferData,BufferSize,seq);//给主线程传递数据
}
quit();
}
QThreadShow::QThreadShow()
{
}
void QThreadShow::stopThread()
{
// QMutexLocker locker(&mutex);
m_stop=true;
}
dialog.h
#ifndef DIALOG_H
#define DIALOG_H
#include <QDialog>
#include <QTimer>
#include "qmythread.h"
namespace Ui {
class Dialog;
}
class Dialog : public QDialog
{
Q_OBJECT
private:
QThreadDAQ threadProducer;
QThreadShow threadConsumer;
protected:
void closeEvent(QCloseEvent *event);
public:
explicit Dialog(QWidget *parent = 0);
~Dialog();
private slots:
void onthreadA_started();
void onthreadA_finished();
void onthreadB_started();
void onthreadB_finished();
void onthreadB_newValue(int *data, int count, int bufNo);
void on_btnClear_clicked();
void on_btnStopThread_clicked();
void on_btnStartThread_clicked();
private:
Ui::Dialog *ui;
};
#endif // DIALOG_H
dialog.cpp
#include "dialog.h"
#include "ui_dialog.h"
void Dialog::closeEvent(QCloseEvent *event)
{//窗口关闭
if (threadProducer.isRunning())
{
threadProducer.terminate();//结束线程的run()函数执行
threadProducer.wait();//
}
if (threadConsumer.isRunning())
{
threadConsumer.terminate(); //因为threadB可能处于等待状态,所以用terminate强制结束
threadConsumer.wait();//
}
event->accept();
}
Dialog::Dialog(QWidget *parent) :
QDialog(parent),
ui(new Ui::Dialog)
{
ui->setupUi(this);
connect(&threadProducer,SIGNAL(started()),this,SLOT(onthreadA_started()));
connect(&threadProducer,SIGNAL(finished()),this,SLOT(onthreadA_finished()));
connect(&threadConsumer,SIGNAL(started()),this,SLOT(onthreadB_started()));
connect(&threadConsumer,SIGNAL(finished()),this,SLOT(onthreadB_finished()));
connect(&threadConsumer,SIGNAL(newValue(int*,int,int)),
this,SLOT(onthreadB_newValue(int*,int,int)));
}
Dialog::~Dialog()
{
delete ui;
}
void Dialog::onthreadA_started()
{
ui->LabA->setText("Thread Producer状态: started");
}
void Dialog::onthreadA_finished()
{
ui->LabA->setText("Thread Producer状态: finished");
}
void Dialog::onthreadB_started()
{
ui->LabB->setText("Thread Consumer状态: started");
}
void Dialog::onthreadB_finished()
{
ui->LabB->setText("Thread Consumer状态: finished");
}
void Dialog::onthreadB_newValue(int *data, int count, int bufNo)
{ //读取threadConsumer 传递的缓冲区的数据
QString str=QString::asprintf("第 %d 个缓冲区:",bufNo);
for (int i=0;i<count;i++)
{
str=str+QString::asprintf("%d, ",*data);
data++;
}
str=str+'\n';
ui->plainTextEdit->appendPlainText(str);
}
void Dialog::on_btnClear_clicked()
{
ui->plainTextEdit->clear();
}
void Dialog::on_btnStopThread_clicked()
{//结束线程
// threadConsumer.stopThread();//结束线程的run()函数执行
threadConsumer.terminate(); //因为threadB可能处于等待状态,所以用terminate强制结束
threadConsumer.wait();//
threadProducer.terminate();//结束线程的run()函数执行
threadProducer.wait();//
ui->btnStartThread->setEnabled(true);
ui->btnStopThread->setEnabled(false);
}
void Dialog::on_btnStartThread_clicked()
{//启动线程
threadConsumer.start();
threadProducer.start();
ui->btnStartThread->setEnabled(false);
ui->btnStopThread->setEnabled(true);
}
如图可以看出,没有出现丢失缓冲区或数据点的情况,两个线程之间协调的很好,将run函数中模拟采样率的延时时间调整为2毫秒也没问题(正常设置为 50 毫秒)在实际的数据采集中,要保证不丢失缓冲区或数据点,数据读取线程的速度必须快过数据写入缓冲区的线程的速度。
//信号量源代码已经上传为压缩包,可在文章顶部下载;