作為一個分布式異步計算框架,Celery雖然常用于Web框架中,但也可以單獨使用。 雖然常規搭配的消息隊列是RabbitMQ,但是由于某些情況下系統已經包含了Redis,那就可以復用。
以下撇開Web框架,介紹基于Redis配置Celery任務的方法。
pip install celery[redis]
項目結構
其中,main.py是觸發Task的業務代碼。當然,文件名可以隨意改。celery.py是Celery的app定義的位置,tasks.py是Task定義的位置,文件名不建議修改。
配置Celery
在celery.py中寫入如下代碼:
其中,REDIS_URL從同一的配置settings.py中引入, 形式大概是redis://localhost:6379/0。這里既用Redis來當broker,又用來當backend。即,既當消息隊列,又當結果反饋的數據庫(默認僅保存1天)。
在include=,需要填一個下游worker的包名列表。這里選擇了同一個包的tasks.py文件。
額外設置的task_track_started,是命令Worker反饋STARTED狀態。默認情況下,是無法知道任務什么時候開始執行的。
編寫任務并調用
在tasks.py文件中,添加異步任務的實現。
在需要發起任務的地方,用.apply_async可以觸發異步調用。即,實際只是向消息隊列發送消息,真正的執行操作在遠程。
運行Worker:
celery -A your_project worker
運行原理
一次Task從觸發到完成,序列圖如下:
其中,main代表業務代碼主進程。它可能是Django、Flask這類Web服務,也可能是一個其它類型的進程。worker就是指Celery的Worker。
main發送消息后,會得到一個AsyncResult,其中包含task_id。僅通過task_id,也可以自己構造一個AsyncResult,查詢相關信息。其中,代表運行過程的,主要是state。
worker會持續保持對Redis(或其它消息隊列,如RabbitMQ)的關注,查詢新的消息。如果獲得新消息,將其消費后,開始運行do_sth。運行完成會把返回值對應的結果,以及一些運行信息,回寫到Redis(或其它backend,如Django數據庫等)上。在系統的任何地方,通過對應的AsyncResult(task_id)就可以查詢到結果。
Celery Task的狀態
以下是狀態圖:
其中,除SUCCESS外,還有失敗(FAILURE)、取消(REVOKED)兩個結束狀態。而RETRY則是在設置了重試機制后,進入的臨時等待狀態。
另外,如果保存在Redis的結果信息被清理(默認僅保存1天),那么任務狀態又會變成PENDING。這在設計上是個巨大的問題,使用時要做對應容錯。
常見控制操作
有時,在業務主進程中需要等待異步運行的結果,這時需要使用wait。如果要取消一個排隊中、或已執行的任務,則可以使用revoke。即使任務已經執行完成,也可以使用revoke,但不會有任何變化。如果需要提前刪除任務記錄,可以使用forget。
責編AJX
-
Web
+關注
關注
2文章
1287瀏覽量
71385 -
分布式
+關注
關注
1文章
997瀏覽量
75392 -
Redis
+關注
關注
0文章
386瀏覽量
11440
發布評論請先 登錄
【幸狐Omni3576邊緣計算套件試用體驗】Redis最新8.0.2版本源碼安裝及性能測試
【昉·星光 2 高性能RISC-V單板計算機體驗】Redis源碼編譯和性能測試以及與樹莓派4B對比
【愛芯派 Pro 開發板試用體驗】Redis源碼編譯和基準測試
Schedule:簡單實用的 Python 周期任務調度工具
Celery Beat 的周期調度機制及實現原理
基于Django的Celery異步任務和定時任務的實戰教程

redis連接數配置多少合適
云容器redis持久化配置
【經驗分享】在Omni3576上編譯Redis-8.0.2源碼,并安裝及性能測試

評論