![]()
【USparkle專欄】如果你深懷絕技,愛“搞點(diǎn)研究”,樂于分享也博采眾長(zhǎng),我們期待你的加入,讓智慧的火花碰撞交織,讓知識(shí)的傳遞生生不息!
這是侑虎科技第1917篇文章,感謝作者南京周潤發(fā)供稿。歡迎轉(zhuǎn)發(fā)分享,未經(jīng)作者授權(quán)請(qǐng)勿轉(zhuǎn)載。如果您有任何獨(dú)到的見解或者發(fā)現(xiàn)也歡迎聯(lián)系我們,一起探討。(QQ群:793972859)
作者主頁:
https://www.zhihu.com/people/xu-chen-71-65
當(dāng)有持續(xù)時(shí)間短,又比較雜的異步任務(wù)時(shí),可以使用ThreadPool,用固定數(shù)量的工作線程執(zhí)行任務(wù),不每次都創(chuàng)建新線程。UE4和UE5的線程池有很大區(qū)別,UE4線程池會(huì)真的創(chuàng)建很多線程,而UE5主要線程池底層復(fù)用了TaskGraph的線程,線程池只是邏輯上的概念。
一、創(chuàng)建線程池
線程池在FEngineLoop::PreInitPreStartupScreen函數(shù)中創(chuàng)建。
GThreadPool
類型為FQueuedLowLevelThreadPool,是UE5中的新實(shí)現(xiàn),線程數(shù)量由FPlatformMisc::NumberOfWorkerThreadsToSpawn()確定。
GIOThreadPool
類型為FQueuedThreadPool,線程數(shù)量由FPlatformMisc::NumberOfIOWorkerThreadsToSpawn()確定,Client為4,Server為2。
GBackgroundPriorityThreadPool
類型為FQueuedThreadPool,Client為2,Server為1。
GLargeThreadPool
類型為FQueuedLowLevelThreadPool,數(shù)量由FPlatformMisc::NumberOfCoresIncludingHyperthreads()確定。
二、使用線程池
雖然線程池實(shí)現(xiàn)比Runnable復(fù)雜,但使用方式也比較簡(jiǎn)單。
1. Async函數(shù)
最常見用法,Async函數(shù)可設(shè)置EAsyncExecution::ThreadPool參數(shù),指定任務(wù)在ThreadPool里執(zhí)行。
![]()
函數(shù)內(nèi)部會(huì)創(chuàng)建TAsyncQueuedWork封裝Function和Promise,然后使用AddQueuedWork接口把任務(wù)加到GThreadPool中。
AddQueuedWork是線程池最重要的接口。
![]()
2. AsyncPool函數(shù)
與Async類似,但可以指定線程池和Work優(yōu)先級(jí)。
![]()
3.手動(dòng)調(diào)用AddQueuedWork
AddQueuedWork函數(shù)只需要接受IQueuedWork作為參數(shù),TAsyncQueuedWork只是一個(gè)子類,我們可以創(chuàng)建子類,做自定義操作,這樣也能指定使用哪個(gè)線程池。
比如引擎中Encode LightMap的操作,就使用了FAsyncEncode類:
![]()
三、線程池實(shí)現(xiàn)
1.類型定義
類型定義可分為線程池,線程池線程,任務(wù)。
1. 線程池
FQueuedThreadPool:線程池基類,定義了線程池的接口。
Allocate:創(chuàng)建線程池,類型為FQueuedThreadPoolBase。
Create:創(chuàng)建若干工作線程。
AddQueuedWork:向線程池添加任務(wù)。
RetractQueuedWork:撤回任務(wù)。
AddQueuedWork和RetractQueuedWork是線程池提供給外部調(diào)用的主要接口,注意會(huì)在多線程中被調(diào)用。
FQueuedThreadPool有多種實(shí)現(xiàn):
FQueuedThreadPoolBase
最常用,線程池的基礎(chǔ)實(shí)現(xiàn),GIOThreadPool和GBackgroundPriorityThreadPool都會(huì)使用。
成員:
FThreadPoolPriorityQueue QueuedWork:待處理任務(wù)的隊(duì)列。
TArray QueuedThreads:等待接收任務(wù)的空閑線程。
TArray AllThreads:所有工作線程。
FCriticalSection* SynchQueue:保護(hù)任務(wù)隊(duì)列的CriticalSection,因?yàn)槿蝿?wù)隊(duì)列會(huì)被多線程修改。
FQueuedLowLevelThreadPool
底層線程使用TaskGraph的ThreadPool,UE5中GThreadPool的默認(rèn)實(shí)現(xiàn)。
FQueuedThreadPoolWrapper
FQueuedThreadPoolDynamicWrapper
FQueuedThreadPoolTaskGraphWrapper
2. 線程池線程
FQueuedThread:繼承自FRunnable,表示線程池中的工作線程?梢韵胂,它大部分時(shí)間都處于idle狀態(tài),當(dāng)有任務(wù)來時(shí)才工作。
成員:
DoWorkEvent:通知線程有任務(wù)要執(zhí)行的Event。
QueuedWork:當(dāng)前線程正在執(zhí)行的Work。
Thread:Runnable對(duì)應(yīng)的線程。
函數(shù):
Run:主函數(shù),可認(rèn)為是一個(gè)等待、執(zhí)行任務(wù)的循環(huán)。
DoWork:由ThreadPool調(diào)用,傳入一個(gè)任務(wù)并執(zhí)行。
3. 任務(wù)
IQueuedWork:可排隊(duì)任務(wù)的基類接口,供線程池使用。
接口:
DoThreadedWork:執(zhí)行任務(wù)。
IQueuedWork有多種實(shí)現(xiàn):
TAsyncQueuedWork
最常用,Async和AsyncPool函數(shù)中使用。
DoThreadedWork:通過SetPromise執(zhí)行任務(wù)。
FAsyncTaskBase
可操作內(nèi)容更多。
DoThreadedWork:通過Task執(zhí)行任務(wù)。
類圖如下:
![]()
常用部分已高亮顯示
2. FQueuedThreadPoolBase
線程池創(chuàng)建
FQueuedThreadPoolBase是默認(rèn)線程池,F(xiàn)QueuedThreadPool::Allocate函數(shù)中構(gòu)造。
![]()
線程池通過Create函數(shù)初始化,主要工作是創(chuàng)建InNumQueuedThreads數(shù)量的工作線程,使用FQueuedThread類封裝,并把創(chuàng)建的線程加入QueuedThreads和AllThreads容器中,QueuedThreads中存儲(chǔ)了當(dāng)前線程池中處于空閑狀態(tài)的線程。還要?jiǎng)?chuàng)建CriticalSection對(duì)象SynchQueue,用于保護(hù)對(duì)QueuedWork和QueuedThreads的訪問。
![]()
FQueuedThread
FQueuedThread繼承自FRunnable,是一個(gè)可運(yùn)行任務(wù)的抽象,其Create函數(shù)如下。首先創(chuàng)建DoWorkEvent,用于做多線程同步,然后創(chuàng)建一個(gè)底層的Thread。線程創(chuàng)建好后進(jìn)入Run方法,初始沒有任務(wù),線程在DoWorkEvent上等待,處于休眠狀態(tài)。
![]()
添加任務(wù)
觀察AddQueuedWork函數(shù),添加任務(wù)時(shí)分成了兩種情況。
如果線程池中尚有空閑線程,即下圖中的情況1,QueuedThreads中有元素,那么把任務(wù)分配給其中一個(gè)線程即可,這里還有一個(gè)細(xì)節(jié),QueuedThreads采用棧管理,先進(jìn)后出,這可以更好利用CPU Cache,因?yàn)檫@個(gè)Thread可能剛運(yùn)行過,同時(shí)也可以避免數(shù)組中的元素移動(dòng)。得到Thread后,調(diào)用DoWork方法添加任務(wù)。
另一種情況是所有線程都在忙碌,QueuedThreads中沒有元素,這時(shí)只能把InQueuedWork暫存到QueuedWork中,等線程執(zhí)行完之前任務(wù)后再做處理。
![]()
FQueuedThread::DoWork方法用于通知一個(gè)Thread要執(zhí)行任務(wù)了,首先把InQueuedWork設(shè)置到其QueuedWork屬性上,然后執(zhí)行DoWorkEvent的Trigger方法,喚醒該Thread。注意這里加了一個(gè)MemoryBarrier,是為了避免CPU指令亂序優(yōu)化導(dǎo)致1071行在1074行之后執(zhí)行,導(dǎo)致錯(cuò)誤。
![]()
執(zhí)行任務(wù)
執(zhí)行任務(wù)通過屬性的Run函數(shù)實(shí)現(xiàn)。Thread一開始會(huì)在DoWorkEvent上等待,被DoWork函數(shù)喚醒后,會(huì)獲取之前被賦值的QueuedWork,執(zhí)行DoThreadedWork函數(shù),這里是真正執(zhí)行任務(wù)。執(zhí)行完成后再調(diào)用ThreadPool的ReturnToPoolOrGetNextJob函數(shù),嘗試獲取暫存的QueuedWork并執(zhí)行,若沒有就把Thread歸還到QueuedThreads中,之后在DoWorkEvent上等待,進(jìn)入休眠狀態(tài)。
![]()
![]()
流程圖示:
![]()
3. TAsyncQueuedWork
線程池中的任務(wù),包裝了一個(gè)Function對(duì)象,DoThreadWork函數(shù)中使用給Promise SetValue的形式來執(zhí)行Function。
![]()
以上就是UE線程池常用的FQueuedThreadPoolBase,F(xiàn)QueuedThread,TAsyncQueuedWork組合。
以下內(nèi)容是UE5的改動(dòng)。
4. FQueuedLowLevelThreadPool
在UE5中,非Editor模式下GThreadPool實(shí)現(xiàn)變成了FQueuedLowLevelThreadPool。底層使用了TaskGraph,相關(guān)內(nèi)容放在后面看,這里只分析與線程池相關(guān)的部分。
UE希望把多線程操作盡量放在TaskGraph里,這樣好管理。CPU物理核心數(shù)量是有限的,如果TaskGraph和ThreadPool都創(chuàng)建了核心數(shù)量的線程,其實(shí)在各自管理,兩邊線程都跑滿就會(huì)產(chǎn)生更多的CPU調(diào)度開銷。
Create
其實(shí)不需要Create了,因?yàn)樽约翰粍?chuàng)建線程,初始化在構(gòu)造函數(shù)里完成,主要任務(wù)是獲取LowLevelTasks::FScheduler單例。
![]()
FQueuedThreadPool::Create只是實(shí)現(xiàn)一下純虛函數(shù)。
![]()
LowLevelTasks::Fscheduler管理了TaskGraph中的Workers線程,包括ForegroundWorkers和BackgroundWorkers,向Worker線程分發(fā)任務(wù),細(xì)節(jié)后面再看。
5. AddQueuedWork
![]()
首先創(chuàng)建FQueuedWorkInternalData對(duì)象來存儲(chǔ)QueuedWork相關(guān)數(shù)據(jù),然后設(shè)置到InQueuedWork.InternalData屬性。
FQueuedWorkInternalData類包裝了一個(gè)LowLevelTasks::FTask,F(xiàn)Task用于把QueuedWork包裝成TaskGraph里可執(zhí)行的東西。Retract函數(shù)用于取消任務(wù),但線程池場(chǎng)景下不需要考慮取消。
![]()
Task.Init函數(shù)調(diào)用有點(diǎn)繞,464行先把InQueuedWork包裝成一個(gè)Lambda函數(shù),然后在Init實(shí)現(xiàn)里面再把Lambda包裝到另一個(gè)TFunction里面。這樣就把InQueuedWork存到Task里面了,往后操作只和TaskGraph有關(guān),與線程池?zé)o關(guān)了。
![]()
FScheduler::TryLaunch把Task添加到任務(wù)隊(duì)列中,等待Worker線程來消費(fèi)。
![]()
6. 執(zhí)行任務(wù)
TaskGraph中Worker線程的Run函數(shù)會(huì)循環(huán)獲取任務(wù)執(zhí)行,細(xì)節(jié)放后面TashGraph里看,這里只看一個(gè)調(diào)用棧。
下圖中1的位置是Worker線程取Task,2的位置是執(zhí)行InQueuedWork->DoThreadedWork(),終于又回到了線程池。
![]()
總體來看,F(xiàn)QueuedLowLevelThreadPool其實(shí)就是TaskGraph,和Async函數(shù)中傳EAsyncExecution::TaskGraph是一個(gè)效果。
7. FQueuedThreadPoolWrapper
不是真正的線程池,而是另一個(gè)線程池的包裝,任務(wù)都會(huì)轉(zhuǎn)發(fā)過去。UE5 Editor下GThreadPool就會(huì)設(shè)置成這個(gè),包裝了GLargeThreadPool,目的為共用GLargeThreadPool中的線程,類似FQueuedLowLevelThreadPool共用TaskGraph的線程,因?yàn)镋ditor下后臺(tái)任務(wù)更多,因此單獨(dú)使用了GLargeThreadPool。這么做的目的還是減少線程創(chuàng)建。
![]()
主要成員
FQueuedThreadPool* WrappedQueuedThreadPool; 包裝的ThreadPool。
TArray WorkPool; Work集合。
TMap ScheduledWork; 當(dāng)前正在被執(zhí)行的Work。
std::atomic MaxConcurrency; 最多允許多少Work在后臺(tái)線程池中運(yùn)行。
std::atomic CurrentConcurrency; 當(dāng)前在后臺(tái)線程池中運(yùn)行的Work。
FScheduledWork
成員中出現(xiàn)了FScheduledWork類型,它是一個(gè)容器,存儲(chǔ)了真正的IQueuedWork,同時(shí)也是IQueuedWork的子類,有DoThreadedWork接口。
![]()
其中128行執(zhí)行了異步任務(wù),131行通知FQueuedThreadPoolWrapper任務(wù)執(zhí)行完,可調(diào)度下個(gè)任務(wù),會(huì)在下面介紹。
初始化
構(gòu)造函數(shù)如下,主要接受一個(gè)線程池作為后臺(tái)線程池,InMaxConcurrency表示最多同時(shí)在后臺(tái)線程池中執(zhí)行多少個(gè)任務(wù)。
![]()
AddQueuedWork
AddQueuedWork首先把任務(wù)加到QueuedWork中,然后執(zhí)行Schedule函數(shù),默認(rèn)參數(shù)為空。
![]()
Schedule函數(shù)最重要的是下面幾行。首先從QueuedWork中獲取要執(zhí)行的任務(wù),然后遞增CurrentConcurrency。接著通過AllocateWork獲取一個(gè)FScheduledWork對(duì)象,并把InnerWork封裝在里面,然后把FScheduledWork交給后臺(tái)線程池運(yùn)行。
WorkPool容器就緩存了已創(chuàng)建的FScheduledWork對(duì)象,AllocateWork會(huì)首先從中獲取,沒有再創(chuàng)建,避免性能上的浪費(fèi)。
![]()
執(zhí)行
FScheduledWork執(zhí)行完DoThreadedWork后,會(huì)調(diào)用Release,繼續(xù)讓線程池執(zhí)行剩余任務(wù),并把自己重置,加入WorkPool中,等待下次使用。
![]()
圖示如下:
![]()
文末,再次感謝南京周潤發(fā) 的分享, 作者主頁:https://www.zhihu.com/people/xu-chen-71-65, 如果您有任何獨(dú)到的見解或者發(fā)現(xiàn)也歡迎聯(lián)系我們,一起探討。(QQ群: 793972859 )。
近期精彩回顧
特別聲明:以上內(nèi)容(如有圖片或視頻亦包括在內(nèi))為自媒體平臺(tái)“網(wǎng)易號(hào)”用戶上傳并發(fā)布,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.