C++多线程复习
下面的代码搭建了一个简单的生产者-消费者模型:在capture()函数中进行入队操作,在infer()函数中进行出队操作。为了模拟采图-推理流程,在两个函数中调用Sleep()函数延时。
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <windows.h>
// Queue of image file names shared by the two threads: capture() pushes, infer() pops.
// No mutex guards it in this version of the program.
std::queue<std::string> jobs;
// Producer: once per iteration prints the next "<id>.jpg" name together with
// the current queue length, appends the name to the global `jobs` queue and
// then sleeps. Never returns. `jobs` is accessed here without any locking.
void capture()
{
    for (int next_id = 0;; ++next_id)
    {
        const std::string file_name = std::to_string(next_id) + ".jpg";
        std::cout << "capture: " << file_name << " jobs.size():" << jobs.size() << std::endl;
        jobs.push(file_name);
        Sleep(1000);  // simulated image-acquisition delay
    }
}
// Consumer: polls the global `jobs` queue in a tight loop. Whenever the queue
// is non-empty it removes the front name, prints it and sleeps. Never returns.
// `jobs` is accessed here without any locking.
void infer()
{
    for (;;)
    {
        if (jobs.empty())
            continue;  // nothing queued: poll again immediately

        const std::string picture = jobs.front();
        jobs.pop();
        std::cout << "infer: " << picture << std::endl;
        Sleep(1000);  // simulated inference delay
    }
}
// Starts the producer and the consumer on their own threads and joins both.
// Both worker functions loop forever, so the joins do not return.
int main()
{
    std::thread producer(capture);
    std::thread consumer(infer);

    producer.join();
    consumer.join();
    return 0;
}
输出结果:
capture: 0.jpg jobs.size():0
infer: 0.jpg
capture: 1.jpg jobs.size():0
infer: 1.jpg
capture: 2.jpg jobs.size():0
infer: 2.jpg
capture: 3.jpg jobs.size():0
infer: 3.jpg
capture: 4.jpg jobs.size():0
infer: 4.jpg
capture: 5.jpg jobs.size():0
infer: 5.jpg
capture: 6.jpg jobs.size():0
infer: 6.jpg
capture: 7.jpg jobs.size():0
infer: 7.jpg
capture: 8.jpg jobs.size():0
infer: 8.jpg
capture: 9.jpg jobs.size():0
infer: 9.jpg
capture: 10.jpg jobs.size():0
infer: 10.jpg
...
现在我们把capture函数中的Sleep(1000)改成Sleep(500),再次执行程序,则输出:
capture: 0.jpg jobs.size():0
infer: 0.jpg
capture: 1.jpg jobs.size():0
infer: 1.jpg
capture: 2.jpg jobs.size():0
capture: 3.jpg jobs.size():1
infer: 2.jpg
capture: 4.jpg jobs.size():1
capture: 5.jpg jobs.size():2
infer: 3.jpg
capture: 6.jpg jobs.size():2
capture: 7.jpg jobs.size():3
infer: 4.jpg
capture: 8.jpg jobs.size():3
capture: 9.jpg jobs.size():4
infer: 5.jpg
capture: 10.jpg jobs.size():4
...
此时采图速度快于推理速度,队列中的任务不断堆积,采图-推理流程不能同步。为了解决这个问题,加入对队列长度的限制:
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <windows.h>
// Queue of image file names shared by the two threads: capture() pushes, infer() pops.
// No mutex guards it in this version of the program.
std::queue<std::string> jobs;
// capture() only pushes while jobs.size() is below this value.
const int limit = 3;
// Producer with a bounded queue: every iteration prints the next "<id>.jpg"
// name and the current queue length, then enqueues the name only if the queue
// holds fewer than `limit` entries; otherwise the name is dropped.
// Never returns. `jobs` is accessed here without any locking.
void capture()
{
    for (int next_id = 0;; ++next_id)
    {
        const std::string file_name = std::to_string(next_id) + ".jpg";
        std::cout << "capture: " << file_name << " jobs.size():" << jobs.size() << std::endl;

        const bool has_room = jobs.size() < limit;
        if (has_room)
        {
            jobs.push(file_name);
        }
        Sleep(500);  // simulated image-acquisition delay
    }
}
// Consumer: busy-polls the global `jobs` queue; each time an entry is
// available it is removed, printed and followed by a sleep. Never returns.
// `jobs` is accessed here without any locking.
void infer()
{
    for (;;)
    {
        const bool nothing_queued = jobs.empty();
        if (nothing_queued)
            continue;

        const std::string picture = jobs.front();
        jobs.pop();
        std::cout << "infer: " << picture << std::endl;
        Sleep(1000);  // simulated inference delay
    }
}
// Launches capture() and infer() concurrently and waits for both threads.
// Both worker functions loop forever, so the joins do not return.
int main()
{
    std::thread capture_thread(capture);
    std::thread infer_thread(infer);

    capture_thread.join();
    infer_thread.join();
    return 0;
}
此时输出结果:
capture: 0.jpg jobs.size():0
infer: 0.jpg
capture: 1.jpg jobs.size():0
infer: 1.jpg
capture: 2.jpg jobs.size():0
capture: 3.jpg jobs.size():1
infer: 2.jpg
capture: 4.jpg jobs.size():1
capture: 5.jpg jobs.size():2
infer: 3.jpg
capture: 6.jpg jobs.size():2
capture: 7.jpg jobs.size():3
infer: 4.jpg
capture: 8.jpg jobs.size():2
capture: 9.jpg jobs.size():3
infer: 5.jpg
capture: 10.jpg jobs.size():2
...
由于std::queue不是线程安全的数据结构,故引入锁std::mutex:
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <windows.h>
// Queue of image file names shared between capture() and infer().
std::queue<std::string> jobs;
// Mutex that capture() and infer() lock inside their loop bodies.
std::mutex lock;
// Producer: every iteration builds the next "<id>.jpg" name, logs it together
// with the current queue length and enqueues it, all while holding the global
// mutex `lock`. The delay happens after the mutex has been released.
// Never returns.
void capture()
{
    int id = 0;
    while (true)
    {
        {
            std::lock_guard<std::mutex> l(lock);
            std::string name = std::to_string(id++) + ".jpg";
            std::cout << "capture: " << name << " " << "jobs.size(): " << jobs.size() << std::endl;
            // Hand the job to the consumer. Without this push the queue stays
            // empty forever, so infer() never has anything to process and only
            // "capture:" lines are ever printed.
            jobs.push(name);
        }
        Sleep(500);  // simulated image-acquisition delay, outside the critical section
    }
}
// Consumer: repeatedly locks the global mutex `lock`, and if the queue holds a
// job, removes and prints it. After a job has been handled the thread sleeps;
// when the queue is empty it yields and polls again. Never returns.
void infer()
{
    while (true)
    {
        bool handled = false;
        {
            std::lock_guard<std::mutex> l(lock);
            // The emptiness check must happen under the same mutex as
            // front()/pop(): std::queue gives no guarantee for a read that
            // runs concurrently with the producer's push().
            if (!jobs.empty())
            {
                auto job = jobs.front();
                jobs.pop();
                std::cout << "infer: " << job << std::endl;
                handled = true;
            }
        }
        if (handled)
        {
            Sleep(1000);  // simulated inference delay, outside the critical section
        }
        else
        {
            // Let the producer run instead of hammering the mutex.
            std::this_thread::yield();
        }
    }
}
// Runs capture() and infer() on two threads and joins them.
// Both worker functions loop forever, so the joins do not return.
int main()
{
    std::thread producer(capture);
    std::thread consumer(infer);

    producer.join();
    consumer.join();
    return 0;
}
此时输出:
capture: 0.jpg jobs.size(): 0
capture: 1.jpg jobs.size(): 0
capture: 2.jpg jobs.size(): 0
capture: 3.jpg jobs.size(): 0
capture: 4.jpg jobs.size(): 0
capture: 5.jpg jobs.size(): 0
capture: 6.jpg jobs.size(): 0
capture: 7.jpg jobs.size(): 0
capture: 8.jpg jobs.size(): 0
capture: 9.jpg jobs.size(): 0
capture: 10.jpg jobs.size(): 0
...
有时候生产者还需要拿到消费者处理之后的结果,因此引入std::promise和std::condition_variable对程序进行完善:
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <windows.h>
// One unit of work handed from capture() to infer().
struct Job
{
// Image name filled in by capture().
std::string input;
// Promise that infer() fulfils with the result string; capture() reads the
// value through get_future().
std::shared_ptr<std::promise<std::string>> pro;
};
// Pending jobs: capture() pushes, infer() pops.
std::queue<Job> jobs;
// Mutex that capture() and infer() lock around their queue operations.
std::mutex lock;
// capture() waits on this before pushing; infer() calls notify_all() after each pop.
std::condition_variable cv;
// Queue-length bound used in capture()'s wait predicate.
const int limit = 5;
// Producer: for each iteration builds the next "<id>.jpg" name, waits (on the
// condition variable `cv`) until the queue holds fewer than `limit` jobs,
// enqueues a Job carrying a fresh promise, then blocks until the consumer has
// fulfilled that promise and prints the returned string. Never returns.
void capture()
{
    int id = 0;
    while (true)
    {
        Job job;
        {
            std::unique_lock<std::mutex> l(lock);
            std::string name = std::to_string(id++) + ".jpg";
            // The queue is named `jobs`; the former `qjobs` identifier is not
            // declared anywhere in this program and did not compile.
            std::cout << "capture: " << name << " " << "jobs.size(): " << jobs.size() << std::endl;
            cv.wait(l, [&]() { return jobs.size() < limit; });
            job.input = name;
            job.pro.reset(new std::promise<std::string>());
            // The queue stores a copy; both copies share the same promise
            // through the shared_ptr.
            jobs.push(job);
        }
        // Wait for the consumer's result outside the critical section so the
        // consumer can take the mutex.
        auto result = job.pro->get_future().get();
        std::cout << result << std::endl;
        Sleep(500);  // simulated image-acquisition delay
    }
}
void infer()
{
while (true)
{
if (!qjobs.empty())
{
{
std::lock_guard<std::mutex> l(lock);
auto job = jobs.front();
jobs.pop();
cv.notify_all();
std::cout << "infer: " << job.input << std::endl;
auto result = job.input + " after infer";
job.pro->set_value(result);
}
Sleep(1000);
}
}
}
// Spawns the producer and consumer threads and joins them.
// Both worker functions loop forever, so the joins do not return.
int main()
{
    std::thread producer(capture);
    std::thread consumer(infer);

    producer.join();
    consumer.join();
    return 0;
}
输出:
capture: 0.jpg jobs.size(): 0
infer: 0.jpg
0.jpg after infer
capture: 1.jpg jobs.size(): 0
infer: 1.jpg
1.jpg after infer
capture: 2.jpg jobs.size(): 0
infer: 2.jpg
2.jpg after infer
capture: 3.jpg jobs.size(): 0
infer: 3.jpg
3.jpg after infer
capture: 4.jpg jobs.size(): 0
infer: 4.jpg
4.jpg after infer
capture: 5.jpg jobs.size(): 0
infer: 5.jpg
5.jpg after infer
capture: 6.jpg jobs.size(): 0
infer: 6.jpg
6.jpg after infer
capture: 7.jpg jobs.size(): 0
infer: 7.jpg
7.jpg after infer
capture: 8.jpg jobs.size(): 0
infer: 8.jpg
8.jpg after infer
capture: 9.jpg jobs.size(): 0
infer: 9.jpg
9.jpg after infer
capture: 10.jpg jobs.size(): 0
infer: 10.jpg
10.jpg after infer
...
yolov5目标检测多线程C++部署
有了上面的基础,我们来写一个基本的目标检测多线程部署程序,为了简单起见选用OpenCV的dnn作为推理框架,出于篇幅限制下面只给出main.cpp部分:
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <windows.h>
#include "yolov5.h"
// One frame handed from capture() to infer().
struct Job
{
// Frame read from the video by capture().
cv::Mat input_image;
// Promise that infer() fulfils with the post-processed image; capture()
// reads it through get_future().
std::shared_ptr<std::promise<cv::Mat>> output_image;
};
// Pending jobs: capture() pushes, infer() pops.
std::queue<Job> jobs;
// Mutex that capture() and infer() lock around their queue operations.
std::mutex lock;
// capture() waits on this before pushing; infer() calls notify_all() after each pop.
std::condition_variable c_v;
// Queue-length bound used in capture()'s wait predicate.
const int limit = 10;
// Producer: reads frames from `cap` until an empty frame is returned. Each
// frame is queued as a Job (after waiting for the queue to hold fewer than
// `limit` entries); the function then blocks on the job's future and shows the
// resulting image in the "result" window.
//
// cap: video source to read from (taken by value).
void capture(cv::VideoCapture cap)
{
    for (;;)
    {
        cv::Mat frame;
        cap.read(frame);
        if (frame.empty())
            break;  // no more frames

        Job job;
        {
            std::unique_lock<std::mutex> guard(lock);
            c_v.wait(guard, [&]() { return jobs.size() < limit; });
            job.input_image = frame;
            job.output_image.reset(new std::promise<cv::Mat>());
            jobs.push(job);
        }

        // Block until the consumer has fulfilled this job's promise.
        cv::Mat annotated = job.output_image->get_future().get();
        cv::imshow("result", annotated);
        cv::waitKey(1);
    }
}
// Consumer: takes one Job at a time from the global queue, runs
// pre_process / process / post_process on its frame with `net` and fulfils the
// job's promise with the output image. When the queue is empty it yields and
// polls again. The loop has no exit condition.
//
// net: network passed to process() (taken by value).
// NOTE(review): pre_process, process and post_process are declared outside this
// listing (presumably in yolov5.h); their exact contracts are not visible here.
void infer(cv::dnn::Net net)
{
    while (true)
    {
        Job job;
        bool has_job = false;
        {
            // Only the queue operations need the mutex. The emptiness check is
            // done under it as well, because it would otherwise race with the
            // producer's push().
            std::lock_guard<std::mutex> l(lock);
            if (!jobs.empty())
            {
                job = jobs.front();
                jobs.pop();
                has_job = true;
            }
        }
        if (!has_job)
        {
            std::this_thread::yield();
            continue;
        }

        // The queue has shrunk: wake a producer blocked on the size limit.
        c_v.notify_all();

        // Inference runs without the mutex held so the producer is never
        // blocked behind it.
        cv::Mat input_image = job.input_image, blob, output_image;
        pre_process(input_image, blob);
        std::vector<cv::Mat> network_outputs;
        process(blob, net, network_outputs);
        post_process(input_image, output_image, network_outputs);
        job.output_image->set_value(output_image);
    }
}
int main(int argc, char* argv[])
{
cv::VideoCapture cap("test.mp4");
cv::dnn::Net net = cv::dnn::readNet("yolov5n.onnx");
std::thread t0(capture, cap);
std::thread t1(infer, net);
t0.join();
t1.join();
return 0;
}
接下来我们模拟多个模型同时推理,先给出单线程串行的程序:
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <windows.h>
#include "yolov5.h"
// Single-threaded baseline: for every frame of "test.mp4" runs both networks
// one after the other, prints the elapsed clock() ticks for the pair and shows
// the two post-processed images. Stops when an empty frame is read.
// NOTE(review): pre_process, process and post_process are declared outside this
// listing (presumably in yolov5.h); their exact contracts are not visible here.
int main(int argc, char* argv[])
{
    cv::VideoCapture cap("test.mp4");
    // cap.open(0);
    cv::dnn::Net net1 = cv::dnn::readNet("yolov5n.onnx");
    cv::dnn::Net net2 = cv::dnn::readNet("yolov5s.onnx");
    cv::Mat frame;
    while (true)
    {
        clock_t start = clock();
        cap.read(frame);
        if (frame.empty())
            break;

        // The blob is built once and fed to both networks.
        cv::Mat input_image = frame, blob;
        pre_process(input_image, blob);

        std::vector<cv::Mat> network_outputs1, network_outputs2;
        process(blob, net1, network_outputs1);
        process(blob, net2, network_outputs2);

        cv::Mat output_image1, output_image2;
        post_process(input_image, output_image1, network_outputs1);
        post_process(input_image, output_image2, network_outputs2);
        clock_t end = clock();

        // Label the figure so the line matches the documented
        // "infer1+infer2:<n>ms" output and is distinguishable from other logs.
        // NOTE(review): clock() ticks are printed as milliseconds, which
        // assumes CLOCKS_PER_SEC == 1000 — confirm for the target platform.
        std::cout << "infer1+infer2:" << end - start << "ms" << std::endl;

        cv::imshow("result1", output_image1);
        cv::imshow("result2", output_image2);
        cv::waitKey(1);
    }
    return 0;
}
输出结果:
infer1+infer2:191ms
infer1+infer2:142ms
infer1+infer2:134ms
infer1+infer2:130ms
infer1+infer2:129ms
infer1+infer2:124ms
infer1+infer2:124ms
infer1+infer2:121ms
infer1+infer2:124ms
infer1+infer2:122ms
...
多线程并行的写法修改如下:
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <windows.h>
#include "yolov5.h"
// One frame handed from capture() to one of the inference threads.
struct Job
{
// Frame read from the video by capture().
cv::Mat input_image;
// Promise that the inference thread fulfils with the post-processed image;
// capture() reads it through get_future().
std::shared_ptr<std::promise<cv::Mat>> output_image;
};
// One queue per inference thread: jobs1 is popped by infer1(), jobs2 by infer2().
std::queue<Job> jobs1,jobs2;
// lock1 is locked around jobs1 operations, lock2 around jobs2 operations.
std::mutex lock1, lock2;
// infer1() notifies cv1 after popping from jobs1; infer2() notifies cv2 after popping from jobs2.
std::condition_variable cv1, cv2;
// Queue-length bound used in capture()'s wait predicates.
const int limit = 10;
// Producer for two parallel consumers: reads frames from `cap` until an empty
// frame is returned. Every frame is queued twice, once in jobs1 and once in
// jobs2, each with its own promise. The function then waits for both results,
// prints the elapsed clock() ticks for the frame and shows both images.
//
// cap: video source to read from (taken by value).
void capture(cv::VideoCapture cap)
{
    while (true)
    {
        Job job1, job2;
        cv::Mat frame;
        clock_t start = clock();
        cap.read(frame);
        if (frame.empty())
            break;
        {
            std::unique_lock<std::mutex> l1(lock1);
            cv1.wait(l1, [&]() { return jobs1.size() < limit; });
            job1.input_image = frame;
            job1.output_image.reset(new std::promise<cv::Mat>());
            jobs1.push(job1);
        }
        {
            std::unique_lock<std::mutex> l2(lock2);
            // Must wait on cv2, not cv1: infer2() signals cv2 after popping
            // from jobs2, and a std::condition_variable may only be used with
            // one mutex by its concurrent waiters (cv1 belongs to lock1).
            cv2.wait(l2, [&]() { return jobs2.size() < limit; });
            job2.input_image = frame;
            job2.output_image.reset(new std::promise<cv::Mat>());
            jobs2.push(job2);
        }
        // Both jobs are queued before either result is awaited, so the two
        // inference threads can work on the frame at the same time.
        cv::Mat result1 = job1.output_image->get_future().get();
        cv::Mat result2 = job2.output_image->get_future().get();
        clock_t end = clock();
        // NOTE(review): clock() ticks are printed as milliseconds, which
        // assumes CLOCKS_PER_SEC == 1000 — confirm for the target platform.
        std::cout <<"capture: "<< end - start << "ms" << std::endl;
        cv::imshow("result1", result1);
        cv::imshow("result2", result2);
        cv::waitKey(1);
    }
}
// Consumer for jobs1: takes one Job at a time, runs pre_process / process /
// post_process on its frame with `net`, fulfils the job's promise with the
// output image and prints the elapsed clock() ticks. When the queue is empty
// it yields and polls again. The loop has no exit condition.
//
// net: network passed to process() (taken by value).
// NOTE(review): pre_process, process and post_process are declared outside this
// listing (presumably in yolov5.h); their exact contracts are not visible here.
void infer1(cv::dnn::Net net)
{
    while (true)
    {
        Job job;
        bool has_job = false;
        {
            // Check and pop under lock1; an unlocked empty() would race with
            // the producer's push().
            std::lock_guard<std::mutex> l1(lock1);
            if (!jobs1.empty())
            {
                job = jobs1.front();
                jobs1.pop();
                has_job = true;
            }
        }
        if (!has_job)
        {
            std::this_thread::yield();
            continue;
        }

        // The queue has shrunk: wake a producer blocked on the size limit.
        cv1.notify_all();

        // Inference runs without the mutex held.
        clock_t start = clock();
        cv::Mat input_image = job.input_image, blob, output_image;
        pre_process(input_image, blob);
        std::vector<cv::Mat> network_outputs;
        process(blob, net, network_outputs);
        post_process(input_image, output_image, network_outputs);
        job.output_image->set_value(output_image);
        clock_t end = clock();
        // NOTE(review): clock() ticks are printed as milliseconds, which
        // assumes CLOCKS_PER_SEC == 1000 — confirm for the target platform.
        std::cout << "infer1: " << end - start << "ms" << std::endl;
    }
}
// Consumer for jobs2: takes one Job at a time, runs pre_process / process /
// post_process on its frame with `net`, fulfils the job's promise with the
// output image and prints the elapsed clock() ticks. When the queue is empty
// it yields and polls again. The loop has no exit condition.
//
// net: network passed to process() (taken by value).
// NOTE(review): pre_process, process and post_process are declared outside this
// listing (presumably in yolov5.h); their exact contracts are not visible here.
void infer2(cv::dnn::Net net)
{
    while (true)
    {
        Job job;
        bool has_job = false;
        {
            // Check and pop under lock2; an unlocked empty() would race with
            // the producer's push().
            std::lock_guard<std::mutex> l2(lock2);
            if (!jobs2.empty())
            {
                job = jobs2.front();
                jobs2.pop();
                has_job = true;
            }
        }
        if (!has_job)
        {
            std::this_thread::yield();
            continue;
        }

        // The queue has shrunk: wake a producer blocked on the size limit.
        cv2.notify_all();

        // Inference runs without the mutex held.
        clock_t start = clock();
        cv::Mat input_image = job.input_image, blob, output_image;
        pre_process(input_image, blob);
        std::vector<cv::Mat> network_outputs;
        process(blob, net, network_outputs);
        post_process(input_image, output_image, network_outputs);
        job.output_image->set_value(output_image);
        clock_t end = clock();
        // NOTE(review): clock() ticks are printed as milliseconds, which
        // assumes CLOCKS_PER_SEC == 1000 — confirm for the target platform.
        std::cout << "infer2: " << end - start << "ms" << std::endl;
    }
}
int main(int argc, char* argv[])
{
cv::VideoCapture cap("test.mp4");
//cap.open(0);
cv::dnn::Net net1 = cv::dnn::readNet("yolov5n.onnx");
cv::dnn::Net net2 = cv::dnn::readNet("yolov5s.onnx");
std::thread t0(capture, cap);
std::thread t1(infer1, net1);
std::thread t2(infer2, net2);
t0.join();
t1.join();
t2.join();
return 0;
}
输出:
infer1: 98ms
infer2: 136mscapture: 155ms
infer1: 80ms
infer2: 110ms
capture: 113ms
infer1: 92ms
infer2: 101mscapture: 103ms
infer1: 85ms
infer2: 97ms
capture: 100ms
infer1: 85ms
infer2: 100mscapture: 102ms
...
上面的程序还有一点小问题:视频播放完时程序无法正常退出。继续修正如下:
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <windows.h>
#include "yolov5.h"
// One frame handed from capture() to one of the inference threads.
struct Job
{
// Frame read from the video by capture().
cv::Mat input_image;
// Promise that the inference thread fulfils with the post-processed image;
// capture() reads it through get_future().
std::shared_ptr<std::promise<cv::Mat>> output_image;
};
// One queue per inference thread: jobs1 is popped by infer1(), jobs2 by infer2().
std::queue<Job> jobs1,jobs2;
// lock1 is locked around jobs1 operations, lock2 around jobs2 operations.
std::mutex lock1, lock2;
// infer1() notifies cv1 after popping from jobs1; infer2() notifies cv2 after popping from jobs2.
std::condition_variable cv1, cv2;
// Queue-length bound used in capture()'s wait predicates.
const int limit = 10;
// Set to true by capture() when the video yields an empty frame; infer1() and
// infer2() leave their loops once they see it. It is a plain bool, not an atomic.
bool stop = false;
// Producer for two parallel consumers: reads frames from `cap` until an empty
// frame is returned, at which point it raises the global `stop` flag and
// returns. Every frame is queued twice, once in jobs1 and once in jobs2, each
// with its own promise. The function then waits for both results, prints the
// elapsed clock() ticks for the frame and shows both images.
//
// cap: video source to read from (taken by value).
void capture(cv::VideoCapture cap)
{
    while (true)
    {
        Job job1, job2;
        cv::Mat frame;
        clock_t start = clock();
        cap.read(frame);
        if (frame.empty())
        {
            // `stop` is a plain bool shared with the inference threads. Write
            // it while holding both queue mutexes so that any consumer reading
            // it under its own queue mutex is properly synchronised with this
            // store.
            std::lock_guard<std::mutex> g1(lock1);
            std::lock_guard<std::mutex> g2(lock2);
            stop = true;
            break;
        }
        {
            std::unique_lock<std::mutex> l1(lock1);
            cv1.wait(l1, [&]() { return jobs1.size()<limit; });
            job1.input_image = frame;
            job1.output_image.reset(new std::promise<cv::Mat>());
            jobs1.push(job1);
        }
        {
            std::unique_lock<std::mutex> l2(lock2);
            // Must wait on cv2, not cv1: infer2() signals cv2 after popping
            // from jobs2, and a std::condition_variable may only be used with
            // one mutex by its concurrent waiters (cv1 belongs to lock1).
            cv2.wait(l2, [&]() { return jobs2.size() < limit; });
            job2.input_image = frame;
            job2.output_image.reset(new std::promise<cv::Mat>());
            jobs2.push(job2);
        }
        // Both jobs are queued before either result is awaited, so the two
        // inference threads can work on the frame at the same time.
        cv::Mat result1 = job1.output_image->get_future().get();
        cv::Mat result2 = job2.output_image->get_future().get();
        clock_t end = clock();
        // NOTE(review): clock() ticks are printed as milliseconds, which
        // assumes CLOCKS_PER_SEC == 1000 — confirm for the target platform.
        std::cout <<"capture: "<< end - start << "ms" << std::endl;
        cv::imshow("result1", result1);
        cv::imshow("result2", result2);
        cv::waitKey(1);
    }
}
// Consumer for jobs1: loops until the global `stop` flag is set. Each
// iteration takes at most one Job, runs pre_process / process / post_process
// on its frame with `net`, fulfils the job's promise with the output image and
// prints the elapsed clock() ticks.
//
// net: network passed to process() (taken by value).
// NOTE(review): pre_process, process and post_process are declared outside this
// listing (presumably in yolov5.h); their exact contracts are not visible here.
void infer1(cv::dnn::Net net)
{
    while (true)
    {
        Job job;
        bool has_job = false;
        {
            // Read `stop` and the queue state under lock1. Taking the mutex on
            // every iteration also guarantees the flag is re-read each time
            // round the loop, which is what lets the thread terminate.
            std::lock_guard<std::mutex> l1(lock1);
            if (stop)
                break;
            if (!jobs1.empty())
            {
                job = jobs1.front();
                jobs1.pop();
                has_job = true;
            }
        }
        if (!has_job)
        {
            // Nothing to do yet: give other threads a chance to run.
            std::this_thread::yield();
            continue;
        }

        // The queue has shrunk: wake a producer blocked on the size limit.
        cv1.notify_all();

        // Inference runs without the mutex held.
        clock_t start = clock();
        cv::Mat input_image = job.input_image, blob, output_image;
        pre_process(input_image, blob);
        std::vector<cv::Mat> network_outputs;
        process(blob, net, network_outputs);
        post_process(input_image, output_image, network_outputs);
        job.output_image->set_value(output_image);
        clock_t end = clock();
        // NOTE(review): clock() ticks are printed as milliseconds, which
        // assumes CLOCKS_PER_SEC == 1000 — confirm for the target platform.
        std::cout << "infer1: " << end - start << "ms" << std::endl;
        std::this_thread::yield();
    }
}
// Consumer for jobs2: loops until the global `stop` flag is set. Each
// iteration takes at most one Job, runs pre_process / process / post_process
// on its frame with `net`, fulfils the job's promise with the output image and
// prints the elapsed clock() ticks.
//
// net: network passed to process() (taken by value).
// NOTE(review): pre_process, process and post_process are declared outside this
// listing (presumably in yolov5.h); their exact contracts are not visible here.
void infer2(cv::dnn::Net net)
{
    while (true)
    {
        Job job;
        bool has_job = false;
        {
            // Read `stop` and the queue state under lock2. Taking the mutex on
            // every iteration also guarantees the flag is re-read each time
            // round the loop, which is what lets the thread terminate.
            std::lock_guard<std::mutex> l2(lock2);
            if (stop)
                break;
            if (!jobs2.empty())
            {
                job = jobs2.front();
                jobs2.pop();
                has_job = true;
            }
        }
        if (!has_job)
        {
            // Nothing to do yet: give other threads a chance to run.
            std::this_thread::yield();
            continue;
        }

        // The queue has shrunk: wake a producer blocked on the size limit.
        cv2.notify_all();

        // Inference runs without the mutex held.
        clock_t start = clock();
        cv::Mat input_image = job.input_image, blob, output_image;
        pre_process(input_image, blob);
        std::vector<cv::Mat> network_outputs;
        process(blob, net, network_outputs);
        post_process(input_image, output_image, network_outputs);
        job.output_image->set_value(output_image);
        clock_t end = clock();
        // NOTE(review): clock() ticks are printed as milliseconds, which
        // assumes CLOCKS_PER_SEC == 1000 — confirm for the target platform.
        std::cout << "infer2: " << end - start << "ms" << std::endl;
        std::this_thread::yield();
    }
}
int main(int argc, char* argv[])
{
cv::VideoCapture cap("test.mp4");
cv::dnn::Net net1 = cv::dnn::readNet("yolov5n.onnx");
cv::dnn::Net net2 = cv::dnn::readNet("yolov5s.onnx");
std::thread t0(capture, cap);
std::thread t1(infer1, net1);
std::thread t2(infer2, net2);
t0.join();
t1.join();
t2.join();
return 0;
}
多个视频不同模型同时推理:
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <windows.h>
#include "yolov5.h"
// Set to true by capture() when its video yields an empty frame. Nothing in
// this listing reads it. It is a plain bool, not an atomic.
bool stop = false;
// Prints one log line of the form
//   infer <video> 当前时间为:YYYY-MM-DD HH:MM:SS mmm
// to std::cout, using the current local time with millisecond resolution.
//
// video: label inserted into the line (callers pass the video file name).
void print_time(std::string video)
{
    const auto now = std::chrono::system_clock::now();
    const auto since_epoch = now.time_since_epoch();

    // Milliseconds elapsed within the current second.
    const long long total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    const long long whole_s = std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count();
    const int millis = static_cast<int>(total_ms - whole_s * 1000);

    const time_t tt = std::chrono::system_clock::to_time_t(now);

    // This function is called from several threads. Plain localtime() returns
    // a pointer to storage it owns, which is not safe to share between threads
    // on every platform, so convert into a caller-owned struct instead.
    struct tm local_tm = {};
#if defined(_WIN32)
    localtime_s(&local_tm, &tt);
#else
    localtime_r(&tt, &local_tm);
#endif

    // snprintf bounds the write to the buffer size.
    char stamp[100] = { 0 };
    snprintf(stamp, sizeof(stamp), "%d-%02d-%02d %02d:%02d:%02d %03d", local_tm.tm_year + 1900,
        local_tm.tm_mon + 1, local_tm.tm_mday, local_tm.tm_hour,
        local_tm.tm_min, local_tm.tm_sec, millis);
    std::cout << "infer " << video << " 当前时间为:" << stamp << std::endl;
}
// Per-video worker: opens `video`, then for every frame runs pre_process /
// process / post_process with `net`, logs a timestamp through print_time() and
// shows the output image in a window titled with the video name. The loop ends
// when cv::waitKey(1) returns a non-negative value or when an empty frame is
// read; in the latter case the global `stop` flag is set first.
//
// video: path of the video to open; also used as window title and log label.
// net:   network passed to process() (taken by value).
// NOTE(review): `stop` is a plain bool and main() runs this function on two
// threads — confirm whether the flag needs synchronisation.
void capture(std::string video, cv::dnn::Net net)
{
    cv::VideoCapture cap(video);
    for (;;)
    {
        if (cv::waitKey(1) >= 0)
            break;

        cv::Mat frame;
        cap.read(frame);
        if (frame.empty())
        {
            stop = true;
            break;
        }

        cv::Mat input_image = frame;
        cv::Mat blob;
        cv::Mat output_image;
        pre_process(input_image, blob);

        std::vector<cv::Mat> network_outputs;
        process(blob, net, network_outputs);
        post_process(input_image, output_image, network_outputs);

        print_time(video);
        cv::imshow(video, output_image);
    }
}
int main(int argc, char* argv[])
{
std::string video1("test1.mp4");
std::string video2("test2.mp4");
cv::dnn::Net net1 = cv::dnn::readNet("yolov5n.onnx");
cv::dnn::Net net2 = cv::dnn::readNet("yolov5s.onnx");
std::thread t1(capture, video1, net1);
std::thread t2(capture, video2, net2);
t1.join();
t2.join();
return 0;
}
推理效果如下: