目录
概念
Async函数应用案例
概念
Async
函数的原型如下
template<typename TFunction>
auto Async(EAsyncExecution::Type ExecutionType, TFunction&& Function) -> decltype(Function);
Async
函数是一个模板函数,接受两个主要参数:
(1)EAsyncExecution::Type ExecutionType
:用于指定任务执行的方式或线程相关的属性。可指定如下参数:
/** Execute in Task Graph (for short running tasks). */
TaskGraph,
/** Execute in Task Graph on the main thread (for short running tasks). */
TaskGraphMainThread,
/** Execute in separate thread if supported (for long running tasks). */
Thread,
/** Execute in separate thread if supported or supported post fork (see FForkProcessHelper::CreateThreadIfForkSafe) (for long running tasks). */
ThreadIfForkSafe,
/** Execute in global queued thread pool. */
ThreadPool,
(2)TFunction&& Function
:这是一个可调用对象,通常可以是函数指针、lambda 表达式等,定义了具体要执行的任务逻辑。lambda表达式示例如下:
Async(EAsyncExecution::Thread, []() {
// 这里放置需要在单独线程中执行的任务逻辑,比如执行一些耗时计算
int sum = 0;
for (int i = 0; i < 1000; i++)
{
sum += i;
}
});
Async函数应用案例
同时提交100个耗时任务。
如下代码所示,通过循环 100 次调用 AsyncTask
向 ENamedThreads::AnyThread
提交异步任务,每个任务只是执行 FPlatformProcess::Sleep(2)
,也就是让执行该任务的线程休眠 2 秒。
通过执行结果可以看到,执行任务的线程有很多重复的线程。由于提交了大量这样的任务,会使得线程池中大量的线程被占用并处于阻塞休眠状态。在这 2 秒内,这些线程无法去执行其他任务,而如果此时还有其他需要线程资源来执行的任务,就没办法获取到可用线程,进而导致整个程序的运行像是卡住了一样,无法顺利推进后续业务逻辑。
改进:
通过循环 100 次调用 Async
函数提交异步任务,每个任务执行 FPlatformProcess::Sleep(10)
让线程休眠 10 秒,但虚幻引擎内部有一套相对复杂且智能的线程调度机制。在这种机制下,它会尝试合理地分配线程资源来执行这些任务,不会像 BlockThreadPool
函数那样简单粗暴地让大量线程同时进入阻塞休眠状态。虚幻引擎可能会根据系统当前的负载情况、线程优先级等因素,对任务进行排队或者分批次地安排线程去执行,避免一次性将所有线程资源耗尽,使得其他必要的任务依然有机会获取到线程来执行,从而保证程序整体上还能继续运行,不会出现明显的卡顿或卡住情况。
执行结果如下,可以看到执行任务的线程是100个不同的线程。
为了获取每个任务的结果,继续做如下改进:
void UThreadSubsystem::GetAsyncFuture()
{
AsyncTask(ENamedThreads::AnyThread, [this]() {
TArray<TFuture<int32>> FutureResults;
for (size_t i = 0; i < 100; i++)
{
FutureResults.AddDefaulted();
FutureResults.Last()=
Async(EAsyncExecution::Thread, [this, i]()->int32 {
int32 SleepTime = i % 5;
FPlatformProcess::Sleep(SleepTime);
uint32 CurrentID = FPlatformTLS::GetCurrentThreadId();
FString CurrentThread = FThreadManager::Get().GetThreadName(CurrentID);
FString Info = FString::Printf(TEXT("-- Thread Finished -- CurrentThreadID:[%d] -- CurrentThreadName:[%s] -- SleepTime:%d"), CurrentID, *CurrentThread, SleepTime);
PrintLogInThread(Info);
return SleepTime;
});
}
PrintLogInThread(TEXT("Task construction completed"));
for (auto& TmpFuture : FutureResults)
{
int32 SleepTime = TmpFuture.Get();
PrintLogInThread(FString::FromInt(SleepTime));
}
PrintLogInThread(TEXT("Task execution completed"));
});
}
首先通过 AsyncTask
函数将一个 lambda 表达式提交到 ENamedThreads::AnyThread
所代表的任意可用线程中执行。在 lambda 表达式中捕获了当前类指针 this
,以便在这个异步任务内部能够访问类的成员函数和成员变量。
然后在 AsyncTask
函数内部创建了一个 TArray<TFuture<int32>>
类型的数组 FutureResults
,用于存放 100 个 TFuture
对象,每个对象将与一个内层异步任务相关联,后续通过这些对象来获取对应异步任务的执行结果。
在for循环中,使用 FutureResults.AddDefaulted()
向 FutureResults
数组添加一个默认初始化的 TFuture<int32>
对象,然后通过 FutureResults.Last()
获取数组中刚添加的这个默认元素,并将 Async
函数返回的 TFuture
对象赋值给它。这里的 Async
函数用于创建实际执行具体任务逻辑的内层异步任务,传入参数 EAsyncExecution::Thread
表示每个内层异步任务会在单独的线程中执行。传递的 lambda 表达式定义了具体的任务逻辑,它捕获了当前类指针 this
和循环变量 i
,并指定返回值类型为 int32
。
最后通过一个 for
循环遍历 FutureResults
数组中的每个 TFuture
对象,然后调用 TmpFuture.Get()
来获取对应的内层异步任务结果。
执行效果如下:
应用案例代码:
“ThreadSubsystem.h”
// Fill out your copyright notice in the Description page of Project Settings.
#pragma once
#include "CoreMinimal.h"
#include "SimpleRunnable.h"
#include "HAL/ThreadManager.h"
#include "ThreadSubsystem.generated.h"
UCLASS()
class STUDY_API UThreadSubsystem : public UGameInstanceSubsystem
{
GENERATED_BODY()
public:
virtual bool ShouldCreateSubsystem(UObject* Outer) const override;
virtual void Initialize(FSubsystemCollectionBase& Collection) override;
virtual void Deinitialize() override;
public:
UFUNCTION(BlueprintCallable)
void BlockThreadPool(); //阻塞主线程的测试函数
UFUNCTION(BlueprintCallable)
void InitAsync();
UFUNCTION(BlueprintCallable)
void GetAsyncFuture();
void PrintLogInThread(FString Info);
};
“ThreadSubsystem.cpp”
// Fill out your copyright notice in the Description page of Project Settings.
#include "Thread/ThreadSubsystem.h"
bool UThreadSubsystem::ShouldCreateSubsystem(UObject* Outer) const
{
return true;
}
void UThreadSubsystem::Initialize(FSubsystemCollectionBase& Collection)
{
Super::Initialize(Collection);
//InitSimpleThread();
}
void UThreadSubsystem::Deinitialize()
{
//ReleaseSimpleThread();
Super::Deinitialize();
}
void UThreadSubsystem::BlockThreadPool()
{
for (size_t i = 0; i < 100; i++)
{
AsyncTask(ENamedThreads::AnyThread, []() {
FPlatformProcess::Sleep(2);
uint32 CurrentID = FPlatformTLS::GetCurrentThreadId();
FString CurrentThread = FThreadManager::Get().GetThreadName(CurrentID);
FString Info = FString::Printf(TEXT("-- Thread Finished -- CurrentThreadID:[%d] -- CurrentThreadName:[%s]"), CurrentID, *CurrentThread);
AsyncTask(ENamedThreads::GameThread, [Info]() {
UE_LOG(LogTemp, Warning, TEXT("ThreadLog:[%s]"), *Info);
});
});
}
}
void UThreadSubsystem::InitAsync()
{
for (size_t i = 0; i < 100; i++)
{
Async(EAsyncExecution::Thread, [this]() {
FPlatformProcess::Sleep(10);
uint32 CurrentID = FPlatformTLS::GetCurrentThreadId();
FString CurrentThread = FThreadManager::Get().GetThreadName(CurrentID);
FString Info = FString::Printf(TEXT("-- Thread Finished -- CurrentThreadID:[%d] -- CurrentThreadName:[%s]"), CurrentID, *CurrentThread);
PrintLogInThread(Info);
});
}
}
void UThreadSubsystem::GetAsyncFuture()
{
AsyncTask(ENamedThreads::AnyThread, [this]() {
TArray<TFuture<int32>> FutureResults;
for (size_t i = 0; i < 100; i++)
{
FutureResults.AddDefaulted();
FutureResults.Last()=
Async(EAsyncExecution::Thread, [this, i]()->int32 {
int32 SleepTime = i % 5;
FPlatformProcess::Sleep(SleepTime);
uint32 CurrentID = FPlatformTLS::GetCurrentThreadId();
FString CurrentThread = FThreadManager::Get().GetThreadName(CurrentID);
FString Info = FString::Printf(TEXT("-- Thread Finished -- CurrentThreadID:[%d] -- CurrentThreadName:[%s] -- SleepTime:%d"), CurrentID, *CurrentThread, SleepTime);
PrintLogInThread(Info);
return SleepTime;
});
}
PrintLogInThread(TEXT("Task construction completed"));
for (auto& TmpFuture : FutureResults)
{
int32 SleepTime = TmpFuture.Get();
PrintLogInThread(FString::FromInt(SleepTime));
}
PrintLogInThread(TEXT("Task execution completed"));
});
}
void UThreadSubsystem::PrintLogInThread(FString Info)
{
AsyncTask(ENamedThreads::GameThread, [Info]() {
UE_LOG(LogTemp, Warning, TEXT("ThreadLog:[%s]"), *Info);
});
}