邮箱(deepin-mail)主要使用Qt框架开发,是一个有大量并行任务且需要监控进度和结果的项目,任务的优先级调整和支持取消回滚也是必不可少。Qt已经为我们提供了多种线程设计的方法,可以满足绝大部分使用场景,但是每一种方案单拎出来都不能很恰到好处的在项目中使用,本文主要对项目中的线程设计进行介绍,通过将多个Qt线程方案相结合,衍生出一种适合邮箱这类型项目的应用方法。
方案改造
为了更好的利用这个特性,需要对它进行了改造以接地气一些,我们通过使用
QThreadPool
+
QRunnable
方案中的设计思路来控制线程池任务 。
首先继承
QRunnable
创建一个类模板用于后面衍生出各式各样的任务:
template
<
typename
T
>
class
AbstractTask
:
public
QRunnable
{
public
:
QFuture
<
T
>
future
();
protected
:
inline void
reportStarted
();
inline void
setProgressRange
(
int
minimum,
int
maximum);
inline void
setProgressValueAndText
(
int
progressValue,
const
QString
&progressText =
""
);
inline void
reportResult
(
const
T
&result,
const int
&index = -
1
);
inline void
reportFinished
(
const
T
*result =
0
);
virtual void
run
() =
0
;
private
:
QFutureInterface
<
T
>
d
;
};
模板参数是每个任务想要对外提供的返回结果,可以只返回错误码和错误描述用于表示执行结果,也可以添加更多的参数比如同时将下载的文件通过future返回。不用担心额外的拷贝开销,因为另外一个让人省心的地方是,
QFutureInterface
已经通过引用计数为自己实现了隐式共享。
template
<
typename
T
>
class
QFutureInterface
:
public
QFutureInterfaceBase
{
public
:
QFutureInterface
(
State
initialState
=
NoState
)
:
QFutureInterfaceBase
(
initialState
)
{
refT
();
}
QFutureInterface
(
const
QFutureInterface
&
other
)
:
QFutureInterfaceBase
(
other
)
{
refT
();
}
~
QFutureInterface
()
{
if
(!
derefT
())
resultStoreBase
().
template
clear<
T
>();
}
...
}
QFutureInterface
通过原子变量来实现引用计数,提供一个平台无关的原子操作,但并不是所有的处理器都支持
QAtomicInt
,如今国产芯片百家争鸣,如果你是特殊的架构,使用前检测一下当前处理器是否支持某个API是很重要的。使用原子变量会比使用互斥锁的方式更加简单和高效:
class
RefCount
{
public
:
inline
RefCount
(
int
r
=
0
,
int
rt
=
0
):
m_refCount
(
r
),
m_refCountT
(
rt
) {}
inline bool
ref
() {
return
m_refCount.ref(); }
inline bool
deref
() {
return
m_refCount.deref(); }
inline int
load
()
const
{
return
m_refCount.load(); }
inline bool
refT
() {
return
m_refCountT.ref(); }
inline bool
derefT
() {
return
m_refCountT.deref(); }
inline int
loadT
()
const
{
return
m_refCountT.load(); }
private
:
QAtomicInt
m_refCount
;
QAtomicInt
m_refCountT
;
};
QFutureInterface
通过
future
()方法创建出的
QFuture
都会存有一份自己的引用实例,参与了引用计数 的计算。只有当所有的
QFutureInterface
对象都被析构(包括
QFuture
中的),他们所指向的
result
()结果空间才也会释放。出于灵活同一个任务是可以返回多个
QFuture
对象分发到不同的模块以被监控的, 但在任务完成后记得重置
QFuture
以释放内存,赋值为一个
QFuture
()即可。
template
<
typename
T
>
class
QFuture
{
public
:
explicit
QFuture
(
QFutureInterface
<
T
> *
p
)
:
d
(*
p
)
{ }
mutable
QFutureInterface
<
T
>
d
;
}
template
<
typename
T
>
class
QFutureInterface
:
public
QFutureInterfaceBase
{
inline
QFuture
<
T
>
future
()
{
return
QFuture
<
T
>(
this
);
}
}
使用案例
上图是任务流转的一个简要流程,结合这个流程下面将给出项目中的一个实现,一个为自己的账号创建邮件目录的任务:
class
CreateMailFolderTask
:
public
AbstractTask
<
ResultResponse
>
{
public
:
CreateMailFolderTask
(
QSharedPointer
<
ProtocolEngine
>
engine
,
QSharedPointer
<
ProtocolCache
>
cache
);
void
run
();
void
initParam
(
const
QString
&
folderName
);
private
:
QSharedPointer
<
ProtocolEngine
>
m_engine
;
QSharedPointer
<
ProtocolCache
>
m_cache
;
QString
m_folderName
;
};
...
void
CreateMailFolderTask
::
run
()
{
setProgressRange
(
0
,
100
);
reportProgressValueAndText
(
0,"
Started
"
);
//do something
const
ResultResponse
&
result
=
m_engine
->createMailFolder
(
m_folderName
);
reportProgressValueAndText
(
50,"
Ongoing");
//different processing according to
d
.
isPaused
()
,
d
.
isCanceled
()
if
(
RESULT_SUCCESS
==
result
.
type
)
m_cache
->createFolderId
(
m_folderName
);
//do something
reportProgressValueAndText
(
100,"
Finished");
reportResult
(
result
);
reportFinished
();
}
首先按我们的模板实现一个创建目录的任务,在任务的run()方法中实现相关功能,这时候就可以根据需要自由的通过多种
report
()方法将进度、状态描述和结果抛出,以便外部可以在每个节点获取当前任务的状态,根据是否被暂停或者被取消等通过
QFuture
设置的状态来做出不同的处理,如果有必要比如在邮箱项目中,我们传递了一个
QAtomic
原子变量到任务甚至子任务中,进行更加精的控制。类中有两个成员变量
m_engine
和
m_cache
,这个是项目中用于执行邮件协议和本地缓存代码的控制类,线程安全,不做过多扩展说明。接下来是使用:
QFuture
<
ResultResponse
>
createFolder
(
const
QString
&
folderId
){
CreateMailFolderTask
*
task
=
new
CreateMailFolderTask
(
m_protocolEngine
,
m_protocolCache
);
task
->
initParam
(
folderId
);
QFuture
<
PrepareResponse
>
future
=
task
->
future
();
emit
sigNewTask
(
task
);
return
future
;
}
我们创建了一个任务,但是任务并不需要立刻开始,而是通过信号将task抛出等待处理,可以在合适的时候通过线程池
pool
->
tryStart
(
task
)来执行,可以丢在数据结构中保存下来进行优先级排序后等待唤起,还可以格式化存储到文件中保存退出等待下次应用启动后继续执行。拿到
QFuture
对象的模块立刻就能够进行监控,是否开始、是否结束和进度都可以通过
QFuture
的方法获取或使用
QFutursSynchronizer
组合监控,也可以通过
QFutureWatcher
监控
QFuture
实现被动处理,这个具体看看官方说明即可:
QFuture
represents the result of an asynchronous computation.
QFutureIterator
allows iterating through results available via
QFuture
.
QFutureWatcher
allows monitoring a
QFuture
using signals-and-slots.
QFutureSynchronizer
is a convenience class that automatically synchronizes several
QFutures.
QFutureWatcher
<
ResultResponse
>
watcher
;
watcher
.
setFuture
(
future
);
QObject
::
connect
(&
watcher
, &
QFutureWatcher
<
ResultResponse
>::
progressValueChanged
,
[ = ](
int
progressValue
) {
progressValue
;
//do something
});
任务的返回可以通过
QFuture
的
result
()方法获取,如果是逐步抛出结果的批处理任务,可以通过
results
()或者
resultAt
(int index)方法获取已取得的结果列表。
result
()的提前调用不会产生错误,它会阻塞当前的线程,等待任务完成后得到结果才会继续向下执行,也可以主动调用
waitForFinished
() 方法阻塞等待任务完成达到一样的效果。阻塞等待可以不用为了耗时极短的任务去写监控代码,也为写单元测试代码带来了非常大的便利性:
#
include
<
gtest
/
gtest
.
h
>
TEST_F
(
ut_session
,
createFolder
){
QFuture
<
ResultResponse
>
future
=
session
->
createFolder
("MyBox");
future
.
waitForFinished
();
EXPECT_TRUE
(
ResultCode
::
Success
==
future
.
result
().
code
);
}
小结
总结一下,Qt future-promise结合
QRunnable
的方案十分灵活,其实还有很多特性没有在此演
示,我们已经将它落地在邮箱项目中,接口稳定运行,取得了不错的效果。