【導(dǎo)讀】本文介紹了 Go 移步任務(wù)隊(duì)列的實(shí)現(xiàn)。
在一些常見(jiàn)的場(chǎng)景中,如果遇到了某些請(qǐng)求特別耗時(shí)間,為了不影響其它用戶的請(qǐng)求以及節(jié)約服務(wù)器資源,我們通常會(huì)考慮使用異步任務(wù)隊(duì)列去解決,這樣可以快速地處理請(qǐng)求、只返回給用戶任務(wù)創(chuàng)建結(jié)果,等待任務(wù)完成之后,我們?cè)俑嬷脩羧蝿?wù)的完成情況。
對(duì)于 Golang,我們可以通過(guò) Worker pool 異步處理任務(wù),在大多數(shù)情況下,如果不在意數(shù)據(jù)丟失以及服務(wù)器性能足夠,我們就沒(méi)有必要考慮別的方案,畢竟這樣實(shí)現(xiàn)非常簡(jiǎn)單。
接下來(lái)我們先來(lái)說(shuō)說(shuō)如何用 Worker pool 解決異步任務(wù)的問(wèn)題。
Worker pool
Worker pool,也有叫做 Goroutine pool 的,都是指用 Go channel 以及 Goroutine 來(lái)實(shí)現(xiàn)的任務(wù)隊(duì)列。Go channel 本質(zhì)上就是一個(gè)隊(duì)列,因此用作任務(wù)隊(duì)列是很自然的。
在我們不用 Go channel 的時(shí)候,我們也許會(huì)使用這樣的方式來(lái)處理異步任務(wù):
fori:=0;i100;i++{
gofunc(){
//processjob
}()
}
這樣的方式是不推薦的,因?yàn)樵谡?qǐng)求量到達(dá)一定程度,系統(tǒng)處理不過(guò)來(lái)的時(shí)候,會(huì)造成 Goroutine 的爆炸,拖慢整個(gè)系統(tǒng)的運(yùn)行,甚至程序的崩潰,數(shù)據(jù)也就完全丟失了。
如果我們用簡(jiǎn)單的方式,可以看看接下來(lái)的例子:一個(gè)發(fā)送者(也叫做生產(chǎn)者),一個(gè)接受者(也叫做消費(fèi)者,或者 Worker):
typeJobstruct{...}
jobChan:=make(chanJob)
quit:=make(chanbool)
gofunc(){
select{
casejob:=<-jobChan:
???case<-?quit:
???return
}
}()
fori:=0;i100;i++{
jobChan<-?Job{...}
}
close(jobChan)
quit<-?true
如果 Worker 不夠,我們可以增加,這樣可以并行處理任務(wù):
fori:=0;i10;i++{
gofunc(){
forjob:=rangejobChan{
//processjob
}
}()
}
這樣,一個(gè)非常簡(jiǎn)單的 Worker pool 就完成了,只是,它對(duì)任務(wù)的處理還會(huì)有問(wèn)題,比如無(wú)法設(shè)置超時(shí)、無(wú)法處理 panic 錯(cuò)誤等。
實(shí)際上,目前已經(jīng)有很多的開(kāi)源庫(kù)可以幫你實(shí)現(xiàn)了,以worker pool為關(guān)鍵詞在 GitHub 上可以搜到一大堆:
- GitHub - Jeffail/tunny: A goroutine pool for Go
- GitHub - gammazero/workerpool: Concurrency limiting goroutine pool
那么,它們的缺點(diǎn)呢?
很明顯,它們的缺點(diǎn)就在于缺乏管理,可以說(shuō)是完全不管任務(wù)的結(jié)果,即使我們加日志輸出也只是為了簡(jiǎn)單監(jiān)控,更要命的就是進(jìn)程重啟的時(shí)候,比如進(jìn)程掛了,或者程序更新,都會(huì)導(dǎo)致數(shù)據(jù)丟失,畢竟生產(chǎn)者與消費(fèi)者在一個(gè)進(jìn)程中的時(shí)候,會(huì)互相影響(搶占 CPU 與內(nèi)存資源)。因此前面我也說(shuō)了,在不管這兩個(gè)問(wèn)題的時(shí)候,可以考慮用。
如果數(shù)據(jù)很重要(實(shí)際上,我認(rèn)為用戶上傳的業(yè)務(wù)數(shù)據(jù)都重要,不能丟失),為了解決這些問(wèn)題,我們必須換一種解決方案。
分布式異步任務(wù)隊(duì)列
接下來(lái)再說(shuō)說(shuō)異步的分布式任務(wù)隊(duì)列,要用到這個(gè)工具的時(shí)候,我們大致有以下幾個(gè)需求:
- 分布式:生產(chǎn)者與消費(fèi)者隔離;
- 數(shù)據(jù)持久化:在程序重啟的時(shí)候,不丟失已有的數(shù)據(jù);
- 任務(wù)重試:會(huì)有任務(wù)偶然失敗的場(chǎng)景,重試是最簡(jiǎn)單的方式,但需要保證任務(wù)的執(zhí)行時(shí)是冪等的;
- 任務(wù)延時(shí):延遲執(zhí)行,比如 5 分鐘后給用戶發(fā)紅包;
- 任務(wù)結(jié)果的臨時(shí)存儲(chǔ),可用于儲(chǔ)存;
- 任務(wù)處理情況監(jiān)控:及時(shí)發(fā)現(xiàn)任務(wù)執(zhí)行出錯(cuò)情況;
對(duì)于 Python 來(lái)說(shuō),有個(gè)大名鼎鼎的 Celery(https://github.com/celery/celery),它完全包含上面的功能。它包含兩個(gè)比較重要的組件:一個(gè)是消息隊(duì)列,比如 Redis/RabbitMQ 等,Celery中叫做Broker,然后還需要有數(shù)據(jù)庫(kù),用于存儲(chǔ)任務(wù)狀態(tài),叫做Result Backend。
顯然對(duì)于 Go 也有很多不錯(cuò)的開(kāi)源庫(kù),其中一個(gè)學(xué) Celery 的是 Machinery(github.com/RichardKnop/machinery),它目前能滿足大部分需求,而且一直在積極維護(hù),也是我們團(tuán)隊(duì)目前在用的。
它目前支持的 Broker 有 AMQP(RabbitMQ)、Redis、AWS SQS、GCP Pub/Sub,目前對(duì)國(guó)內(nèi)同行來(lái)說(shuō),RabbitMQ 或者 Redis 會(huì)相對(duì)比較合適。
另外它還支持幾個(gè)高級(jí)用法:
- Groups:允許你定義多個(gè)并行的任務(wù),在最后取任務(wù)結(jié)果的時(shí)候,可以一起返回;
- Chords:允許你定義一個(gè)回調(diào)任務(wù),在 Group 任務(wù)執(zhí)行完畢后執(zhí)行對(duì)應(yīng)的回調(diào)任務(wù);
- Chains:允許你定義串行執(zhí)行的任務(wù),任務(wù)將會(huì)被串行執(zhí)行;
說(shuō)了優(yōu)點(diǎn),再說(shuō)說(shuō)它的缺點(diǎn):
- 任務(wù)監(jiān)控支持不夠,目前只有分布式追蹤 opentracing 的支持,假如我要使用 prometheus,會(huì)比較困難,它的自定義錯(cuò)誤處理過(guò)于簡(jiǎn)單,連上下文都不給你;
- 傳入的參數(shù)目前只支持非常簡(jiǎn)單的參數(shù),不支持 struct、map,還得定義參數(shù)的類型,這樣的方式會(huì)將這個(gè)庫(kù)限制在 Golang 世界中,而無(wú)法拓展適用于其它語(yǔ)言;
P.S.
其實(shí)對(duì)于 Goroutine 的方案,在以下兩種情況下,可以考慮使用:
- 必須同步返回給用戶請(qǐng)求結(jié)果;
- 服務(wù)器資源足夠,僅僅用 Worker pool 就能降低請(qǐng)求的響應(yīng)時(shí)長(zhǎng)到可接受范圍;
這兩種方案都會(huì)返回請(qǐng)求結(jié)果,失敗的情況下靠客戶端重新請(qǐng)求來(lái)解決數(shù)據(jù)丟失的問(wèn)題。
原文標(biāo)題:Golang 中的異步任務(wù)隊(duì)列
文章出處:【微信公眾號(hào):Linux愛(ài)好者】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
-
Channel
+關(guān)注
關(guān)注
0文章
31瀏覽量
12161 -
異步
+關(guān)注
關(guān)注
0文章
62瀏覽量
18299 -
Worker
+關(guān)注
關(guān)注
0文章
8瀏覽量
6587
原文標(biāo)題:Golang 中的異步任務(wù)隊(duì)列
文章出處:【微信號(hào):LinuxHub,微信公眾號(hào):Linux愛(ài)好者】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
Spring Boot如何實(shí)現(xiàn)異步任務(wù)
鴻蒙原生應(yīng)用開(kāi)發(fā)-ArkTS語(yǔ)言基礎(chǔ)類庫(kù)多線程TaskPool和Worker的對(duì)比(一)
鴻蒙原生應(yīng)用開(kāi)發(fā)-ArkTS語(yǔ)言基礎(chǔ)類庫(kù)多線程TaskPool和Worker的對(duì)比(二)
鴻蒙原生應(yīng)用開(kāi)發(fā)-ArkTS語(yǔ)言基礎(chǔ)類庫(kù)多線程TaskPool和Worker的對(duì)比(三)
TaskPool和Worker的對(duì)比分析
CPU密集型任務(wù)開(kāi)發(fā)指導(dǎo)
同步任務(wù)開(kāi)發(fā)指導(dǎo)
HarmonyOS CPU與I/O密集型任務(wù)開(kāi)發(fā)指導(dǎo)
如何用VxWorks的信號(hào)量機(jī)制實(shí)現(xiàn)任務(wù)同步
Android異步任務(wù)處理
詳解移動(dòng)通信領(lǐng)域里的組POOL

normal worker_pool詳細(xì)的創(chuàng)建過(guò)程代碼分析

為何需要CMWQ?CMWQ如何解決問(wèn)題的呢?
鴻蒙語(yǔ)言基礎(chǔ)類庫(kù):ohos.worker 啟動(dòng)一個(gè)Worker

評(píng)論