Qt中多线程下载大文件
#pragma once
#include <QWidget>
#include <QPushButton>
#include "ThreadPool.h"
#include <QProgressBar>
#include <QLabel>
#include <QHBoxLayout>
#include <QVBoxLayout>
/// Demo window: one "DownLoad" button plus a label/progress-bar row per queued download task.
class MainWindow : public QWidget
{
    Q_OBJECT

public:
    /// @param parent Optional parent widget, forwarded to QWidget.
    /// `explicit` prevents a QWidget* from silently converting into a MainWindow.
    explicit MainWindow(QWidget *parent = Q_NULLPTR);

private:
    /// Creates the button, the per-task rows and the download tasks.
    void initUI();
    /// Wires the button and the thread-pool signals to the widgets.
    void initConnect();

private:
    /// Start button; created in initUI(). Initialised to nullptr so it is never read indeterminate.
    QPushButton* m_dlBtn = nullptr;
    /// Pool that queues and runs the download tasks.
    DownLoad::ThreadPool threadPool;
    /// Task id -> (status label, progress bar) of the row that displays that task.
    QMap<QString ,std::pair<QLabel*,QProgressBar*> > controlMap;
};
#include "MainWindow.h"
#include "Task.h"
#include "ThreadPool.h"
// Builds the window: creates the widgets first, then wires their signals.
MainWindow::MainWindow(QWidget *parent)
: QWidget(parent)
{
initUI();
// Connections are made after initUI() because they reference m_dlBtn, which initUI() creates.
initConnect();
}
void MainWindow::initUI()
{
m_dlBtn = new QPushButton(this);
m_dlBtn->setText(QString("DownLoad"));
QVBoxLayout* layout = new QVBoxLayout();
layout->setSpacing(10);
layout->setContentsMargins(10, 10, 10, 10);
layout->addWidget(m_dlBtn);
for (int i = 0; i < 10; i++)
{
DownLoad::Task* task = new DownLoad::Task("http://mirrors.tuna.tsinghua.edu.cn/archlinux/iso/2023.12.01/archlinux-2023.12.01-x86_64.iso", QString("C:/Users/gd09861-hlw/Desktop/11111/archlinux-2023.12.01-x86_64_%1.iso").arg(i), DownLoad::Task::WorkModel::DOWNLOAD);
threadPool.push(task);
QLabel *label = new QLabel(this);
label->setText(QString("%1").arg(i));
QProgressBar *progressBar = new QProgressBar(this);
controlMap.insert(task->id(), std::make_pair(label, progressBar));
QHBoxLayout *hLayout = new QHBoxLayout;
hLayout->addWidget(label);
hLayout->addWidget(progressBar);
layout->addLayout(hLayout);
}
this->setLayout(layout);
}
void MainWindow::initConnect()
{
connect(m_dlBtn, &QPushButton::clicked, [&]() {
threadPool.startAll();
}
);
connect(&threadPool, &DownLoad::ThreadPool::sigUpdateTaskProgress, this, [&](QString id, qint64 bytesR, qint64 bytesT) {
controlMap[id].second->setValue((bytesR*100.0f) / (bytesT*1.0f));
});
connect(&threadPool, &DownLoad::ThreadPool::sigUpdateTaskState, this, [&](QString id,DownLoad::Task::State state) {
switch (state)
{
case DownLoad::Task::Start:
{
controlMap[id].second->setValue(0);
}
break;
case DownLoad::Task::Stop: {
controlMap[id].first->setText("Stop");
}
break;
case DownLoad::Task::Finish:
controlMap[id].first->setText("Finish");
break;
case DownLoad::Task::Error:
controlMap[id].first->setText("error");
break;
default:
break;
}
});
}
#ifndef __TASK_QUEUE_H__
#define __TASK_QUEUE_H__
#include <QObject>
#include <QString>
#include "Task.h"
#include <QQueue>
#include <QMutex>
namespace DownLoad {

// Number of worker threads allocated by ThreadPool::init().
#define DEFAULT_THREAD_MAX_COUNT 3

/// Holds a FIFO queue of tasks and a fixed list of worker threads, and hands queued
/// tasks to threads that are not running.
class ThreadPool : public QObject
{
    Q_OBJECT

public:
    ThreadPool();
    ~ThreadPool();

    /// Allocates the worker threads (does not start them).
    void init();

    /// Appends a task to the queue.
    void push(Task *task);

    /// Removes and returns the task at the head of the queue.
    Task* pop();

    /// Gives a queued task to each thread that is not running and starts that thread.
    void startAll();

    /// Forwards a task's state change and reschedules when the state is Finish, Error or Stop.
    void slotUpdateTaskState(QString id, Task::State state);

signals:
    /// Re-emitted from Task::sigUpdateProgress for the task with the given id.
    void sigUpdateTaskProgress(QString id, qint64 bytesReceived, qint64 bytesTotal);

    /// Emitted from slotUpdateTaskState() for the task with the given id.
    void sigUpdateTaskState(QString id, Task::State state);

private:
    QQueue<Task*> m_tasks;       ///< Tasks waiting for a thread.
    QList<QThread*> m_threads;   ///< Worker threads created by init().
};

} // namespace DownLoad
#endif
#include "ThreadPool.h"
#include <QMutexLocker>
#include <QThread>
#include "Task.h"
// Constructs the pool and immediately allocates its worker threads via init().
DownLoad::ThreadPool::ThreadPool()
{
init();
}
// Stops every worker thread and releases everything the pool still owns.
DownLoad::ThreadPool::~ThreadPool()
{
    // The threads are allocated with `new` in init() and have no parent, so nothing else
    // deletes them. Ask each event loop to quit and wait for the thread to end before
    // deleting it: destroying a QThread that is still running is a fatal error in Qt.
    for (int i = 0; i < m_threads.count(); i++) {
        QThread *thread = m_threads.at(i);
        thread->quit();
        thread->wait();
        delete thread;
    }
    m_threads.clear();

    // Tasks still in the queue were never handed to a thread, so no
    // finished() -> deleteLater() connection exists for them; delete them here.
    qDeleteAll(m_tasks);
    m_tasks.clear();
}
// Allocates DEFAULT_THREAD_MAX_COUNT worker threads; they are started later by startAll().
void DownLoad::ThreadPool::init()
{
    int remaining = DEFAULT_THREAD_MAX_COUNT;
    while (remaining > 0) {
        m_threads.append(new QThread());
        --remaining;
    }
}
// Appends a task to the end of the waiting queue.
// A null task is ignored: startAll() dereferences every task it takes from the queue.
void DownLoad::ThreadPool::push(Task *task)
{
    if (task == nullptr) {
        return;
    }
    m_tasks.enqueue(task);
}
// Removes and returns the task at the head of the queue.
// Returns nullptr when the queue is empty; QQueue::dequeue() requires a non-empty queue.
DownLoad::Task* DownLoad::ThreadPool::pop()
{
    if (m_tasks.isEmpty()) {
        return nullptr;
    }
    return m_tasks.dequeue();
}
// Hands one queued task to every worker thread that is not running and starts that thread.
// Stops as soon as the queue is empty.
void DownLoad::ThreadPool::startAll()
{
    for (int i = 0; i < m_threads.count(); i++)
    {
        QThread *thread = m_threads.at(i);

        // A task calls thread()->exit() and then reports its final state through a queued
        // signal. That report can be handled here while the thread is still leaving its
        // event loop, in which case isRunning() is still true, the thread is skipped and
        // nothing would ever schedule the remaining tasks. Rescheduling when the thread
        // reports finished() closes that gap. Qt::UniqueConnection keeps repeated calls
        // from stacking duplicate connections.
        connect(thread, &QThread::finished, this, &ThreadPool::startAll, Qt::UniqueConnection);

        if (thread->isRunning()) {
            continue;
        }
        if (m_tasks.isEmpty()) {
            return;
        }

        Task *task = pop();
        // Move the task to the worker so its slots run in that thread.
        task->moveToThread(thread);
        connect(task, &Task::sigUpdateProgress, this, &ThreadPool::sigUpdateTaskProgress, Qt::QueuedConnection);
        connect(task, &Task::sigUpdateState, this, &ThreadPool::slotUpdateTaskState, Qt::QueuedConnection);
        connect(thread, &QThread::started, task, &Task::slotDoWork, Qt::QueuedConnection);
        // The task is disposed of when its thread ends.
        connect(thread, &QThread::finished, task, &Task::deleteLater);
        thread->start();
    }
}
// Forwards a task's state change to listeners; when the task has reached Finish, Error
// or Stop, tries to schedule further queued tasks.
void DownLoad::ThreadPool::slotUpdateTaskState(QString id, Task::State state)
{
    emit sigUpdateTaskState(id, state);

    switch (state)
    {
    case DownLoad::Task::Finish:
    case DownLoad::Task::Error:
    case DownLoad::Task::Stop:
        startAll();
        break;
    default:
        break;
    }
}
#ifndef __TASK_H__
#define __TASK_H__
#include <QObject>
#include <QNetworkRequest>
#include <QNetworkReply>
#include <QNetworkAccessManager>
#include <QSharedPointer>
#include <QUrl>
#include <QString>
#include <QEventLoop>
#include <QMetaType>
namespace DownLoad {

// Suffix appended to the target path while a download is still in progress.
#define DOWNLOAD_FILE_SUFFIX ".tmp"

/// One transfer job identified by a UUID string. Data is written to
/// "<filePath>.tmp" and renamed to <filePath> when the download finishes without error.
class Task : public QObject
{
    Q_OBJECT

public:
    /// Kind of transfer performed by slotDoWork().
    enum WorkModel
    {
        UPLOAD,
        DOWNLOAD,
    };

    /// States reported through sigUpdateState().
    enum State
    {
        Start,
        Stop,
        Finish,
        Error,
    };

    /// Returns the task's identifier.
    QString id();

    /// @param strUrl    Remote URL of the transfer.
    /// @param filePath  Final path of the local file.
    /// @param workModel Whether to upload or download.
    Task(const QString &strUrl, const QString &filePath, const WorkModel& workModel);
    ~Task();

    /// When enabled, doDownWork() sends a "Range" header starting at the bytes already received.
    void setSupportBreakPoint(bool isSupport);

    /// Returns the most recently recorded error text.
    QString lastError();

signals:
    void sigUpdateState(QString id, State state);
    void sigUpdateProgress(QString id, qint64 bytesReceived, qint64 bytesTotal);

public slots:
    /// Entry point: dispatches to doUploadWork() or doDownWork() according to the work model.
    void slotDoWork();
    /// Aborts the active reply, asks the task's thread to exit and reports State::Stop.
    void slotStopWork();
    /// Stops the transfer, resets the byte counters and removes the temporary file.
    void slotCancelWork();

protected:
    void removeTmpFile(const QString &filePath);
    void slotUpdateProgress(qint64 bytesReceived, qint64 bytesTotal);
    void slotWriteFile();
    void slotFinish();
    void slotError(QNetworkReply::NetworkError code);

protected:
    void doDownWork();
    void doUploadWork();

private:
    QNetworkReply* m_reply = nullptr;     ///< Reply of the active request, or nullptr.
    QNetworkRequest m_request;
    QNetworkAccessManager m_manager;
    QUrl m_url;
    QString m_filePath = "";
    WorkModel m_workModel = UPLOAD;
    QSharedPointer<QEventLoop> m_loop;
    bool m_bSupportBPoint = false;
    qint64 m_bytesReceived;               ///< Bytes reported by the current reply.
    qint64 m_bytesTotal;                  ///< Total reported by the current reply.
    qint64 m_bytesCurrentReceived;        ///< Bytes carried over from before the current reply.
    QString m_error = "";
    QString m_id = "";
};

} // namespace DownLoad
Q_DECLARE_METATYPE(DownLoad::Task::State);
#endif //__TASK_H__
#include "Task.h"
#include <QFileInfo>
#include <QUuid>
#include <QDebug>
#include <QThread>
#include <QDir>
// Returns this task's identifier (the UUID string generated in the constructor).
QString DownLoad::Task::id()
{
return m_id;
}
// Creates a task for the given URL and target file and assigns it a fresh UUID as id.
//
// strUrl    - remote URL of the transfer
// filePath  - final path of the local file
// workModel - whether slotDoWork() uploads or downloads
DownLoad::Task::Task(const QString &strUrl, const QString &filePath, const WorkModel& workModel)
    : QObject(nullptr)
    // The manager is made a child of the task so that moveToThread() on the task moves the
    // manager as well. Without a parent it stays in the creating thread while get() is
    // called from the worker thread, and Qt cannot create the reply as its child.
    // Being a data member, it is destroyed before the QObject base deletes children,
    // so it is never deleted twice.
    , m_manager(this)
    , m_url(strUrl)
    , m_filePath(filePath)
    , m_workModel(workModel)
    // Initialisers are listed in declaration order, which is the order they actually run in.
    , m_bSupportBPoint(false)
    , m_bytesReceived(0)
    , m_bytesTotal(0)
    , m_bytesCurrentReceived(0)
    , m_id(QUuid::createUuid().toString())
{
}
// Schedules the network reply, if one is still held, for deletion.
DownLoad::Task::~Task()
{
if (m_reply) {
m_reply->deleteLater();
}
}
// Enables or disables resuming: when enabled, doDownWork() adds a "Range" header that
// starts at the number of bytes already received.
void DownLoad::Task::setSupportBreakPoint(bool isSupport)
{
m_bSupportBPoint = isSupport;
}
// Returns the most recently recorded error text (empty if none has been recorded).
QString DownLoad::Task::lastError()
{
return m_error;
}
void DownLoad::Task::slotDoWork()
{
qDebug() << "UUID:" << m_id << "TID:" << QThread::currentThreadId()<<"\t"<<m_filePath;
if (m_url.isEmpty() || m_filePath.isEmpty()) {
return;
}
switch (m_workModel)
{
case DownLoad::Task::UPLOAD:
doUploadWork();
break;
case DownLoad::Task::DOWNLOAD:
doDownWork();
break;
default:
break;
}
}
// Adds the bytes of the current reply to the carried-over total; if a reply is active,
// detaches from it, aborts it, asks the task's thread to exit and reports State::Stop.
void DownLoad::Task::slotStopWork()
{
    m_bytesCurrentReceived += m_bytesReceived;

    if (!m_reply) {
        return;
    }

    // Detach first so that signals emitted while aborting no longer reach this task.
    disconnect(m_reply, nullptr, this, nullptr);
    m_reply->abort();
    m_reply->deleteLater();
    m_reply = nullptr;

    this->thread()->exit();
    emit sigUpdateState(m_id, State::Stop);
}
void DownLoad::Task::slotCancelWork()
{
slotStopWork();
m_bytesCurrentReceived = 0;
m_bytesReceived = 0;
m_bytesTotal = 0;
removeTmpFile(m_filePath + DOWNLOAD_FILE_SUFFIX);
}
void DownLoad::Task::removeTmpFile(const QString &filePath)
{
QFileInfo fileInfo(filePath);
if (fileInfo.exists()) {
QFile::remove(filePath);
}
}
// Stores the counters of the current reply and publishes them, each offset by the
// bytes carried over from before this reply.
void DownLoad::Task::slotUpdateProgress(qint64 bytesReceived, qint64 bytesTotal)
{
    m_bytesTotal = bytesTotal;
    m_bytesReceived = bytesReceived;

    const qint64 carriedOver = m_bytesCurrentReceived;
    emit sigUpdateProgress(m_id, m_bytesReceived + carriedOver, m_bytesTotal + carriedOver);
}
void DownLoad::Task::slotWriteFile()
{
QFile file(m_filePath + DOWNLOAD_FILE_SUFFIX);
QDir dir=QFileInfo(m_filePath + DOWNLOAD_FILE_SUFFIX).absoluteDir();
if (!dir.exists()) {
dir.mkpath(dir.absolutePath());
}
if (file.open(QIODevice::WriteOnly | QIODevice::Append)) {
file.write(m_reply->readAll());
}
file.close();
}
void DownLoad::Task::slotFinish()
{
QVariant code = m_reply->attribute(QNetworkRequest::HttpStatusCodeAttribute);
qDebug() << "Error Code:" << code.toInt();
if (m_reply->error() == QNetworkReply::NoError) {
QFileInfo fileInfo(m_filePath + DOWNLOAD_FILE_SUFFIX);
if (fileInfo.exists()) {
QFile::rename(m_filePath+DOWNLOAD_FILE_SUFFIX,m_filePath);
this->thread()->exit();
emit sigUpdateState(m_id, State::Finish);
}
}
else {
m_error = m_reply->errorString();
this->thread()->exit();
emit sigUpdateState(m_id, State::Error);
}
}
void DownLoad::Task::slotError(QNetworkReply::NetworkError code)
{
if (code == QNetworkReply::NoError)
return;
slotStopWork();
removeTmpFile(m_filePath + DOWNLOAD_FILE_SUFFIX);
this->thread()->exit();
emit sigUpdateState(m_id, State::Error);
m_error = m_reply->errorString();
}
void DownLoad::Task::doDownWork()
{
if (m_bytesCurrentReceived <= 0) {
removeTmpFile(m_filePath + DOWNLOAD_FILE_SUFFIX);
}
QFileInfo fileInfo(m_filePath + DOWNLOAD_FILE_SUFFIX);
if (fileInfo.exists()) {
m_bytesCurrentReceived = fileInfo.size();
}
QString strUrl = m_url.toString();
m_request.setUrl(strUrl);
if (m_bSupportBPoint) {
QString strRange = QString("bytes=%1-").arg(m_bytesCurrentReceived);
m_request.setRawHeader("Range", strRange.toLatin1());
}
m_reply = m_manager.get(m_request);
connect(m_reply, &QNetworkReply::downloadProgress, this, &Task::slotUpdateProgress);
connect(m_reply, &QNetworkReply::readyRead, this, &Task::slotWriteFile);
connect(m_reply, &QNetworkReply::finished, this, &Task::slotFinish);
connect(m_reply, SIGNAL(error(QNetworkReply::NetworkError code)), this,SLOT(slotError(QNetworkReply::NetworkError code)));
}
void DownLoad::Task::doUploadWork()
{
}
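从上述代码的运行结果可以看出:对同一个 QThread 对象,每次调用 start() 后,打印出的线程 ID 都会改变。
因此,Qt 中 start() 每次都是在创建一个新的系统线程,只不过该线程内部带有事件循环机制。另外,自定义类型(例如 Task::State)若要通过队列连接(Qt::QueuedConnection)跨线程传递,必须先注册到 Qt 的元类型系统(Q_DECLARE_METATYPE / qRegisterMetaType)。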
运行后