女人自慰AV免费观看内涵网,日韩国产剧情在线观看网址,神马电影网特片网,最新一级电影欧美,在线观看亚洲欧美日韩,黄色视频在线播放免费观看,ABO涨奶期羡澄,第一导航fulione,美女主播操b

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

關于Python多進程和多線程詳解

新機器視覺 ? 來源:pythonhosted ? 2023-11-06 14:46 ? 次閱讀

在學習Python的過程中,有接觸到多線程編程相關的知識點,先前一直都沒有徹底的搞明白。今天準備花一些時間,把里面的細節盡可能的梳理清楚。

線程與進程的區別

進程(process)和線程(thread)是操作系統的基本概念,但是它們比較抽象,不容易掌握。關于多進程和多線程,教科書上最經典的一句話是“進程是資源分配的最小單位,線程是CPU調度的最小單位”。線程是程序中一個單一的順序控制流程。進程內一個相對獨立的、可調度的執行單元,是系統獨立調度和分派CPU的基本單位指運行中的程序的調度單位。在單個程序中同時運行多個線程完成不同的工作,稱為多線程。

7004dea8-7b16-11ee-939d-92fbcf53809c.png

進程和線程區別

進程是資源分配的基本單位。所有與該進程有關的資源,都被記錄在進程控制塊PCB中。以表示該進程擁有這些資源或正在使用它們。另外,進程也是搶占處理機的調度單位,它擁有一個完整的虛擬地址空間。當進程發生調度時,不同的進程擁有不同的虛擬地址空間,而同一進程內的不同線程共享同一地址空間。 與進程相對應,線程與資源分配無關,它屬于某一個進程,并與進程內的其他線程一起共享進程的資源。線程只由相關堆棧(系統棧或用戶棧)寄存器和線程控制表TCB組成。寄存器可被用來存儲線程內的局部變量,但不能存儲其他線程的相關變量。 通常在一個進程中可以包含若干個線程,它們可以利用進程所擁有的資源。在引入線程的操作系統中,通常都是把進程作為分配資源的基本單位,而把線程作為獨立運行和獨立調度的基本單位。 由于線程比進程更小,基本上不擁有系統資源,故對它的調度所付出的開銷就會小得多,能更高效的提高系統內多個程序間并發執行的程度,從而顯著提高系統資源的利用率和吞吐量。 因而近年來推出的通用操作系統都引入了線程,以便進一步提高系統的并發性,并把它視為現代操作系統的一個重要指標。

70258676-7b16-11ee-939d-92fbcf53809c.png

線程與進程的區別可以歸納為以下4點:

地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。

通信:進程間通信IPC,線程間可以直接讀寫進程數據段(如全局變量)來進行通信——需要進程同步和互斥手段的輔助,以保證數據的一致性。

調度和切換:線程上下文切換比進程上下文切換要快得多。

在多線程OS中,進程不是一個可執行的實體。

多進程和多線程的比較

對比維度 多進程 多線程 總結
數據共享、同步 數據共享復雜,同步簡單 數據共享簡單,同步復雜 各有優劣
內存、CPU 占用內存多,切換復雜,CPU利用率低 占用內存少,切換簡單,CPU利用率高 線程占優
創建、銷毀、切換 復雜,速度慢 簡單,速度快 線程占優
編程、調試 編程簡單,調試簡單 編程復雜,調試復雜 進程占優
可靠性 進程間不會互相影響 一個線程掛掉將導致整個進程掛掉 進程占優
分布式 適用于多核、多機,擴展到多臺機器簡單 適合于多核 進程占優

總結,進程和線程還可以類比為火車和車廂:

線程在進程下行進(單純的車廂無法運行)

一個進程可以包含多個線程(一輛火車可以有多個車廂)

不同進程間數據很難共享(一輛火車上的乘客很難換到另外一輛火車,比如站點換乘)

同一進程下不同線程間數據很易共享(A車廂換到B車廂很容易)

進程要比線程消耗更多的計算機資源(采用多列火車相比多個車廂更耗資源)

進程間不會相互影響,一個線程掛掉將導致整個進程掛掉(一列火車不會影響到另外一列火車,但是如果一列火車上中間的一節車廂著火了,將影響到該趟火車的所有車廂)

進程可以拓展到多機,進程最多適合多核(不同火車可以開在多個軌道上,同一火車的車廂不能在行進的不同的軌道上)

進程使用的內存地址可以上鎖,即一個線程使用某些共享內存時,其他線程必須等它結束,才能使用這一塊內存。(比如火車上的洗手間)-”互斥鎖(mutex)”

進程使用的內存地址可以限定使用量(比如火車上的餐廳,最多只允許多少人進入,如果滿了需要在門口等,等有人出來了才能進去)-“信號量(semaphore)”

Python全局解釋器鎖GIL

全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),并不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。由于CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。那么CPython實現中的GIL又是什么呢?來看看官方的解釋:

The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.

Python代碼的執行由Python 虛擬機(也叫解釋器主循環,CPython版本)來控制,Python 在設計之初就考慮到要在解釋器的主循環中,同時只有一個線程在執行,即在任意時刻,只有一個線程在解釋器中運行。對Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。

7036224c-7b16-11ee-939d-92fbcf53809c.png

GIL 有什么好處?簡單來說,它在單線程的情況更快,并且在和 C 庫結合時更方便,而且不用考慮線程安全問題,這也是早期 Python 最常見的應用場景和優勢。另外,GIL的設計簡化了CPython的實現,使得對象模型,包括關鍵的內建類型如字典,都是隱含可以并發訪問的。鎖住全局解釋器使得比較容易的實現對多線程的支持,但也損失了多處理器主機的并行計算能力。

在多線程環境中,Python 虛擬機按以下方式執行:

設置GIL

切換到一個線程去運行

運行直至指定數量的字節碼指令,或者線程主動讓出控制(可以調用sleep(0))

把線程設置為睡眠狀態

解鎖GIL

再次重復以上所有步驟

7049d6de-7b16-11ee-939d-92fbcf53809c.png

Python3.2前,GIL的釋放邏輯是當前線程遇見IO操作或者ticks計數達到100(ticks可以看作是python自身的一個計數器,專門做用于GIL,每次釋放后歸零,這個計數可以通過 sys.setcheckinterval 來調整),進行釋放。因為計算密集型線程在釋放GIL之后又會立即去申請GIL,并且通常在其它線程還沒有調度完之前它就已經重新獲取到了GIL,就會導致一旦計算密集型線程獲得了GIL,那么它在很長一段時間內都將占據GIL,甚至一直到該線程執行結束。 Python 3.2開始使用新的GIL。新的GIL實現中用一個固定的超時時間來指示當前的線程放棄全局鎖。在當前線程保持這個鎖,且其他線程請求這個鎖時,當前線程就會在5毫秒后被強制釋放該鎖。該改進在單核的情況下,對于單個線程長期占用GIL的情況有所好轉。 在單核CPU上,數百次的間隔檢查才會導致一次線程切換。在多核CPU上,存在嚴重的線程顛簸(thrashing)。而每次釋放GIL鎖,線程進行鎖競爭、切換線程,會消耗資源。單核下多線程,每次釋放GIL,喚醒的那個線程都能獲取到GIL鎖,所以能夠無縫執行,但多核下,CPU0釋放GIL后,其他CPU上的線程都會進行競爭,但GIL可能會馬上又被CPU0拿到,導致其他幾個CPU上被喚醒后的線程會醒著等待到切換時間后又進入待調度狀態,這樣會造成線程顛簸(thrashing),導致效率更低。 另外,從上面的實現機制可以推導出,Python的多線程對IO密集型代碼要比CPU密集型代碼更加友好。 針對GIL的應對措施:

使用更高版本Python(對GIL機制進行了優化)

使用多進程替換多線程(多進程之間沒有GIL,但是進程本身的資源消耗較多)

指定cpu運行線程(使用affinity模塊)

使用Jython、IronPython等無GIL解釋器

全IO密集型任務時才使用多線程

使用協程(高效的單線程模式,也稱微線程;通常與多進程配合使用)

將關鍵組件用C/C++編寫為Python擴展,通過ctypes使Python程序直接調用C語言編譯的動態鏈接庫的導出函數。(with nogil調出GIL限制)

Python的多進程包multiprocessing

Python的threading包主要運用多線程的開發,但由于GIL的存在,Python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,大部分情況需要使用多進程。在Python 2.6版本的時候引入了multiprocessing包,它完整的復制了一套threading所提供的接口方便遷移。唯一的不同就是它使用了多進程而不是多線程。每個進程有自己的獨立的GIL,因此也不會出現進程之間的GIL爭搶。 借助這個multiprocessing,你可以輕松完成從單進程到并發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

Multiprocessing產生的背景

除了應對Python的GIL以外,產生multiprocessing的另外一個原因時Windows操作系統與Linux/Unix系統的不一致。 Unix/Linux操作系統提供了一個fork()系統調用,它非常特殊。普通的函數,調用一次,返回一次,但是fork()調用一次,返回兩次,因為操作系統自動把當前進程(父進程)復制了一份(子進程),然后,分別在父進程和子進程內返回。子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調用getpid()就可以拿到父進程的ID。 Python的os模塊封裝了常見的系統調用,其中就包括fork,可以在Python程序中輕松創建子進程:

importos

print('Process(%s)start...'%os.getpid())

#OnlyworksonUnix/Linux/Mac:

pid=os.fork()

ifpid==0:

print('Iamchildprocess(%s)andmyparentis%s.'%(os.getpid(),os.getppid()))

else:

print('I(%s)justcreatedachildprocess(%s).'%(os.getpid(),pid))

上述代碼在Linux、Unix和Mac上的執行結果為:

Process(876)start...

I(876)justcreatedachildprocess(877).

Iamchildprocess(877)andmyparentis876.
有了fork調用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的Apache服務器就是由父進程監聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。 由于Windows沒有fork調用,上面的代碼在Windows上無法運行。由于Python是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing模塊就是跨平臺版本的多進程模塊。multiprocessing模塊封裝了fork()調用,使我們不需要關注fork()的細節。由于Windows沒有fork調用,因此,multiprocessing需要“模擬”出fork的效果。

multiprocessing常用組件及功能

7057ecce-7b16-11ee-939d-92fbcf53809c.jpg

創建管理進程模塊:

Process(用于創建進程)

Pool(用于創建管理進程池)

Queue(用于進程通信,資源共享)

Value,Array(用于進程通信,資源共享)

Pipe(用于管道通信)

Manager(用于資源共享)

同步子進程模塊:

Condition(條件變量)

Event(事件)

Lock(互斥鎖)

RLock(可重入的互斥鎖(同一個進程可以多次獲得它,同時不會造成阻塞)

Semaphore(信號量)

接下來就一起來學習下每個組件及功能的具體使用方法。

Process(用于創建進程)

multiprocessing模塊提供了一個Process類來代表一個進程對象。

在multiprocessing中,每一個進程都用一個Process類來表示。

構造方法:Process([group [, target [, name [, args [, kwargs]]]]])

group:分組,實際上不使用,值始終為None

target:表示調用對象,即子進程要執行的任務,你可以傳入方法名

name:為子進程設定名稱

args:要傳給target函數的位置參數,以元組方式進行傳入。

kwargs:要傳給target函數的字典參數,以字典方式進行傳入。

實例方法:

start():啟動進程,并調用該子進程中的p.run()

run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法

terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖

is_alive():返回進程是否在運行。如果p仍然運行,返回True

join([timeout]):進程同步,主進程等待子進程完成后再執行后面的代碼。線程等待p終止(強調:是主線程處于等的狀態,而p是處于運行的狀態)。timeout是可選的超時時間(超過這個時間,父線程不再等待子線程,繼續往下執行),需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

屬性介紹:

daemon:默認值為False,如果設為True,代表p為后臺運行的守護進程;當p的父進程終止時,p也隨之終止,并且設定為True后,p不能創建自己的新進程;必須在p.start()之前設置

name:進程的名稱

pid:進程的pid

exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)

authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

使用示例:(注意:在windows中Process()必須放到if name == ‘main’:下)

frommultiprocessingimportProcess

importos

defrun_proc(name):

print('Runchildprocess%s(%s)...'%(name,os.getpid()))

if__name__=='__main__':

print('Parentprocess%s.'%os.getpid())

p=Process(target=run_proc,args=('test',))

print('Childprocesswillstart.')

p.start()

p.join()

print('Childprocessend.')

Pool(用于創建管理進程池)

7062dfda-7b16-11ee-939d-92fbcf53809c.png

Pool類用于需要執行的目標很多,而手動限制進程數量又太繁瑣時,如果目標少且不用控制進程數量則可以用Process類。Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。 構造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes :要創建的進程數,如果省略,將默認使用cpu_count()返回的數量。

initializer:每個工作進程啟動時要執行的可調用對象,默認為None。如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。

initargs:是要傳給initializer的參數組。

maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活。

context: 用在制定工作進程啟動時的上下文,一般使用Pool() 或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context。

實例方法:

apply(func[, args[, kwargs]]):在一個池工作進程中執行func(args,*kwargs),然后返回結果。需要強調的是:此操作并不會在所有池工作進程中并執行func函數。如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()。它是阻塞的。apply很少使用

apply_async(func[, arg[, kwds={}[, callback=None]]]):在一個池工作進程中執行func(args,*kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。它是非阻塞。

map(func, iterable[, chunksize=None]):Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到返回結果。注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。

map_async(func, iterable[, chunksize=None]):map_async與map的關系同apply與apply_async

imap():imap 與 map的區別是,map是當所有的進程都已經執行完了,并將結果返回了,imap()則是立即返回一個iterable可迭代對象。

imap_unordered():不保證返回的結果順序與進程添加的順序一致。

close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成。

join():等待所有工作進程退出。此方法只能在close()或teminate()之后調用,讓其不再接受新的Process。

terminate():結束工作進程,不再處理未處理的任務。

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法:

get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發異常。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。

ready():如果調用完成,返回True

successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常

wait([timeout]):等待結果變為可用。

terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數

使用示例:

#-*-coding:utf-8-*-

#Pool+map

frommultiprocessing importPool

deftest(i):

print(i)

if__name__=="__main__":

lists=range(100)

pool=Pool(8)

pool.map(test,lists)

pool.close()

pool.join()
#-*-coding:utf-8-*-

#異步進程池(非阻塞)

frommultiprocessingimportPool

deftest(i):

print(i)

if__name__=="__main__":

pool=Pool(8)

foriinrange(100):

'''

 For循環中執行步驟:

(1)循環遍歷,將100個子進程添加到進程池(相對父進程會阻塞)

(2)每次執行8個子進程,等一個子進程執行完后,立馬啟動新的子進程。(相對父進程不阻塞)

 apply_async為異步進程池寫法。異步指的是啟動子進程的過程,與父進程本身的執行(print)是異步的,而For循環中往進程池添加子進程的過程,與父進程本身的執行卻是同步的。

'''

pool.apply_async(test,args=(i,))#維持執行的進程總數為8,當一個進程執行完后啟動一個新進程.

print("test")

pool.close()

pool.join()
#-*-coding:utf-8-*-

#異步進程池(非阻塞)

frommultiprocessingimportPool

deftest(i):

print(i)

if__name__=="__main__":

pool=Pool(8)

foriinrange(100):

'''

實際測試發現,for循環內部執行步驟:

(1)遍歷100個可迭代對象,往進程池放一個子進程

(2)執行這個子進程,等子進程執行完畢,再往進程池放一個子進程,再執行。(同時只執行一個子進程)

 for循環執行完畢,再執行print函數。

'''

pool.apply(test,args=(i,))#維持執行的進程總數為8,當一個進程執行完后啟動一個新進程.

print("test")

pool.close()

pool.join()

Queue(用于進程通信,資源共享)

在使用多進程的過程中,最好不要使用共享資源。普通的全局變量是不能被子進程所共享的,只有通過Multiprocessing組件構造的數據結構可以被共享。 Queue是用來創建進程間資源共享的隊列的類,使用Queue可以達到多進程間數據傳遞的功能(缺點:只適用Process類,不能在Pool進程池中使用)。 構造方法:Queue([maxsize])

maxsize是隊列中允許最大項數,省略則無大小限制。

實例方法:

put():用以插入數據到隊列。put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。

get():可以從隊列讀取并且刪除一個元素。get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。若不希望在empty的時候拋出異常,令blocked為True或者參數全部置空即可。

get_nowait():同q.get(False)

put_nowait():同q.put(False)

empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。

full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。

qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

使用示例:

frommultiprocessingimportProcess,Queue

importos,time,random

defwrite(q):

print('Processtowrite:%s'%os.getpid())

forvaluein['A','B','C']:

print('Put%stoqueue...'%value)

q.put(value)

time.sleep(random.random())

defread(q):

print('Processtoread:%s'%os.getpid())

whileTrue:

value=q.get(True)

print('Get%sfromqueue.'%value)

if__name__=="__main__":

q=Queue()

pw=Process(target=write,args=(q,))

pr=Process(target=read,args=(q,))

pw.start()

pr.start()

pw.join()#等待pw結束

pr.terminate()#pr進程里是死循環,無法等待其結束,只能強行終止
JoinableQueue就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 構造方法:JoinableQueue([maxsize])

maxsize:隊列中允許最大項數,省略則無大小限制。

實例方法 JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:

task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常

join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

使用示例:

#-*-coding:utf-8-*-

frommultiprocessingimportProcess,JoinableQueue

importtime,random

defconsumer(q):

whileTrue:

res=q.get()

print('消費者拿到了%s'%res)

q.task_done()

defproducer(seq,q):

foriteminseq:

time.sleep(random.randrange(1,2))

q.put(item)

print('生產者做好了%s'%item)

q.join()

if__name__=="__main__":

q=JoinableQueue()

seq=('產品%s'%iforiinrange(5))

p=Process(target=consumer,args=(q,))

p.daemon=True#設置為守護進程,在主線程停止時p也停止,但是不用擔心,producer內調用q.join保證了consumer已經處理完隊列中的所有元素

p.start()

producer(seq,q)

print('主線程')

Value,Array(用于進程通信,資源共享)

multiprocessing 中Value和Array的實現原理都是在共享內存中創建ctypes()對象來達到共享數據的目的,兩者實現方法大同小異,只是選用不同的ctypes數據類型而已。 Value 構造方法:Value((typecode_or_type, args[, lock])

typecode_or_type:定義ctypes()對象的類型,可以傳Type code或 C Type,具體對照表見下文。

args:傳遞給typecode_or_type構造函數的參數

lock:默認為True,創建一個互斥鎖來限制對Value對象的訪問,如果傳入一個鎖,如Lock或RLock的實例,將用于同步。如果傳入False,Value的實例就不會被鎖保護,它將不是進程安全的。

typecode_or_type支持的類型:

|Typecode|CType|PythonType|Minimumsizeinbytes|

|---------|------------------|-----------------|---------------------|

|`'b'`|signedchar|int|1|

|`'B'`|unsignedchar|int|1|

|`'u'`|Py_UNICODE|Unicodecharacter|2|

|`'h'`|signedshort|int|2|

|`'H'`|unsignedshort|int|2|

|`'i'`|signedint|int|2|

|`'I'`|unsignedint|int|2|

|`'l'`|signedlong|int|4|

|`'L'`|unsignedlong|int|4|

|`'q'`|signedlonglong|int|8|

|`'Q'`|unsignedlonglong|int|8|

|`'f'`|float|float|4|

|`'d'`|double|float|8|

參考地址:https://docs.python.org/3/library/array.html

Array

構造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

typecode_or_type:同上

size_or_initializer:如果它是一個整數,那么它確定數組的長度,并且數組將被初始化為零。否則,size_or_initializer是用于初始化數組的序列,其長度決定數組的長度。

kwds:傳遞給typecode_or_type構造函數的參數

lock:同上

使用示例:

importmultiprocessing

deff(n,a):

n.value=3.14

a[0]=5

if__name__=='__main__':

num=multiprocessing.Value('d',0.0)

arr=multiprocessing.Array('i',range(10))

p=multiprocessing.Process(target=f,args=(num,arr))

p.start()

p.join()

print(num.value)

print(arr[:])

注意:Value和Array只適用于Process類。

Pipe(用于管道通信)

多進程還有一種數據傳遞方式叫管道原理和 Queue相同。Pipe可以在進程之間創建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道。 構造方法:Pipe([duplex])

dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發送。

實例方法:

send(obj):通過連接發送對象。obj是與序列化兼容的任意對象

recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。

close():關閉連接。如果conn1被垃圾回收,將自動調用此方法

fileno():返回連接使用的整數文件描述符

poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。

recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,并且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。

send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收

recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,并把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大于可用的緩沖區空間,將引發BufferTooShort異常。

使用示例:

frommultiprocessingimportProcess,Pipe

importtime

#子進程執行方法

deff(Subconn):

time.sleep(1)

Subconn.send("吃了嗎")

print("來自父親的問候:",Subconn.recv())

Subconn.close()

if__name__=="__main__":

parent_conn,child_conn=Pipe()#創建管道兩端

p=Process(target=f,args=(child_conn,))#創建子進程

p.start()

print("來自兒子的問候:",parent_conn.recv())

parent_conn.send("嗯")

Manager(用于資源共享)

Manager()返回的manager對象控制了一個server進程,此進程包含的python對象可以被其他的進程通過proxies來訪問。從而達到多進程間數據通信且安全。Manager模塊常與Pool模塊一起使用。 Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。 管理器是獨立運行的子進程,其中存在真實的對象,并以服務器的形式運行,其他進程通過使用代理訪問共享對象,這些代理作為客戶端運行。Manager()是BaseManager的子類,返回一個啟動的SyncManager()實例,可用于創建共享對象并返回訪問這些共享對象的代理。 BaseManager,創建管理器服務器的基類 構造方法:BaseManager([address[, authkey]])

address:(hostname,port),指定服務器的網址地址,默認為簡單分配一個空閑的端口

authkey:連接到服務器的客戶端的身份驗證,默認為current_process().authkey的值

實例方法:

start([initializer[, initargs]]):啟動一個單獨的子進程,并在該子進程中啟動管理器服務器

get_server():獲取服務器對象

connect():連接管理器對象

shutdown():關閉管理器對象,只能在調用了start()方法之后調用

實例屬性:

address:只讀屬性,管理器服務器正在使用的地址

SyncManager以下類型均不是進程安全的,需要加鎖.. 實例方法:

Array(self,*args,**kwds)

BoundedSemaphore(self,*args,**kwds)

Condition(self,*args,**kwds)

Event(self,*args,**kwds)

JoinableQueue(self,*args,**kwds)

Lock(self,*args,**kwds)

Namespace(self,*args,**kwds)

Pool(self,*args,**kwds)

Queue(self,*args,**kwds)

RLock(self,*args,**kwds)

Semaphore(self,*args,**kwds)

Value(self,*args,**kwds)

dict(self,*args,**kwds)

list(self,*args,**kwds)

使用示例:

importmultiprocessing

deff(x,arr,l,d,n):

x.value=3.14

arr[0]=5

l.append('Hello')

d[1]=2

n.a=10

if__name__=='__main__':

server=multiprocessing.Manager()

x=server.Value('d',0.0)

arr=server.Array('i',range(10))

l=server.list()

d=server.dict()

n=server.Namespace()

proc=multiprocessing.Process(target=f,args=(x,arr,l,d,n))

proc.start()

proc.join()

print(x.value)

print(arr)

print(l)

print(d)

print(n)

同步子進程模塊

Lock(互斥鎖)

Lock鎖的作用是當多個進程需要訪問共享資源的時候,避免訪問的沖突。加鎖保證了多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,犧牲了速度但保證了數據安全。Lock包含兩種狀態——鎖定和非鎖定,以及兩個基本的方法。

構造方法:Lock()

實例方法:

acquire([timeout]): 使線程進入同步阻塞狀態,嘗試獲得鎖定。

release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

使用示例:

frommultiprocessingimportProcess,Lock

defl(lock,num):

lock.acquire()

print("HelloNum:%s"%(num))

lock.release()

if__name__=='__main__':

lock=Lock()#這個一定要定義為全局

fornuminrange(20):

Process(target=l,args=(lock,num)).start()

RLock(可重入的互斥鎖(同一個進程可以多次獲得它,同時不會造成阻塞)

RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級”的概念,處于鎖定狀態時,RLock被某個線程擁有。擁有RLock的線程可以再次調用acquire(),釋放鎖時需要調用release()相同次數。可以認為RLock包含一個鎖定池和一個初始值為0的計數器,每次成功調用 acquire()/release(),計數器將+1/-1,為0時鎖處于未鎖定狀態。

構造方法:RLock()

實例方法:

acquire([timeout]):同Lock

release(): 同Lock

Semaphore(信號量)

信號量是一個更高級的鎖機制。信號量內部有一個計數器而不像鎖對象內部有鎖標識,而且只有當占用信號量的線程數超過信號量時線程才阻塞。這允許了多個線程可以同時訪問相同的代碼區。比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去,如果指定信號量為3,那么來一個人獲得一把鎖,計數加1,當計數等于3時,后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖。

構造方法:Semaphore([value])

value:設定信號量,默認值為1

實例方法:

acquire([timeout]):同Lock

release(): 同Lock

使用示例:

frommultiprocessingimportProcess,Semaphore

importtime,random

defgo_wc(sem,user):

sem.acquire()

print('%s占到一個茅坑'%user)

time.sleep(random.randint(0,3))

sem.release()

print(user,'OK')

if__name__=='__main__':

sem=Semaphore(2)

p_l=[]

foriinrange(5):

p=Process(target=go_wc,args=(sem,'user%s'%i,))

p.start()

p_l.append(p)

foriinp_l:

i.join()

Condition(條件變量)

可以把Condition理解為一把高級的鎖,它提供了比Lock, RLock更高級的功能,允許我們能夠控制復雜的線程同步問題。Condition在內部維護一個鎖對象(默認是RLock),可以在創建Condigtion對象的時候把瑣對象作為參數傳入。Condition也提供了acquire, release方法,其含義與鎖的acquire, release方法一致,其實它只是簡單的調用內部鎖對象的對應的方法而已。Condition還提供了其他的一些方法。

構造方法:Condition([lock/rlock])

可以傳遞一個Lock/RLock實例給構造方法,否則它將自己生成一個RLock實例。

實例方法:

acquire([timeout]):首先進行acquire,然后判斷一些條件。如果條件不滿足則wait

release():釋放 Lock

wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。處于wait狀態的線程接到通知后會重新判斷條件。

notify(): 調用這個方法將從等待池挑選一個線程并通知,收到通知的線程將自動調用acquire()嘗試獲得鎖定(進入鎖定池);其他線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

notifyAll(): 調用這個方法將通知等待池中所有的線程,這些線程都將進入鎖定池嘗試獲得鎖定。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

使用示例:

importmultiprocessing

importtime

defstage_1(cond):

"""performfirststageofwork,

thennotifystage_2tocontinue

"""

name=multiprocessing.current_process().name

print('Starting',name)

withcond:

print('{}doneandreadyforstage2'.format(name))

cond.notify_all()

defstage_2(cond):

"""waitfortheconditiontellingusstage_1isdone"""

name=multiprocessing.current_process().name

print('Starting',name)

withcond:

cond.wait()

print('{}running'.format(name))

if__name__=='__main__':

condition=multiprocessing.Condition()

s1=multiprocessing.Process(name='s1',

target=stage_1,

args=(condition,))

s2_clients=[

multiprocessing.Process(

name='stage_2[{}]'.format(i),

target=stage_2,

args=(condition,),

)

foriinrange(1,3)

]

forcins2_clients:

c.start()

time.sleep(1)

s1.start()

s1.join()

forcins2_clients:

c.join()

Event(事件)

Event內部包含了一個標志位,初始的時候為false。可以使用set()來將其設置為true;或者使用clear()將其從新設置為false;可以使用is_set()來檢查標志位的狀態;另一個最重要的函數就是wait(timeout=None),用來阻塞當前線程,直到event的內部標志位被設置為true或者timeout超時。如果內部標志位為true則wait()函數理解返回。

使用示例:

importmultiprocessing

importtime

defwait_for_event(e):

"""Waitfortheeventtobesetbeforedoinganything"""

print('wait_for_event:starting')

e.wait()

print('wait_for_event:e.is_set()->',e.is_set())

defwait_for_event_timeout(e,t):

"""Waittsecondsandthentimeout"""

print('wait_for_event_timeout:starting')

e.wait(t)

print('wait_for_event_timeout:e.is_set()->',e.is_set())

if__name__=='__main__':

e=multiprocessing.Event()

w1=multiprocessing.Process(

name='block',

target=wait_for_event,

args=(e,),

)

w1.start()

w2=multiprocessing.Process(

name='nonblock',

target=wait_for_event_timeout,

args=(e,2),

)

w2.start()

print('main:waitingbeforecallingEvent.set()')

time.sleep(3)

e.set()

print('main:eventisset')

其他內容

multiprocessing.dummy 模塊與 multiprocessing 模塊的區別:dummy 模塊是多線程,而 multiprocessing 是多進程, api 都是通用的。所有可以很方便將代碼在多線程和多進程之間切換。multiprocessing.dummy通常在IO場景可以嘗試使用,比如使用如下方式引入線程池。

frommultiprocessing.dummyimportPoolasThreadPool

multiprocessing.dummy與早期的threading,不同的點好像是在多多核CPU下,只綁定了一個核心(具體未考證)。

參考文檔:

https://docs.python.org/3/library/multiprocessing.html

https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/

Python并發之concurrent.futures

Python標準庫為我們提供了threading和multiprocessing模塊編寫相應的多線程/多進程代碼。從Python3.2開始,標準庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的更高級的抽象,對編寫線程池/進程池提供了直接的支持。concurrent.futures基礎模塊是executor和future。

Executor

Executor是一個抽象類,它不能被直接使用。它為具體的異步執行定義了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來創建線程池和進程池的代碼。

ThreadPoolExecutor對象

ThreadPoolExecutor類是Executor子類,使用線程池執行異步調用。

classconcurrent.futures.ThreadPoolExecutor(max_workers)

使用max_workers數目的線程池執行異步調用。

ProcessPoolExecutor對象

ThreadPoolExecutor類是Executor子類,使用進程池執行異步調用。

classconcurrent.futures.ProcessPoolExecutor(max_workers=None)
使用max_workers數目的進程池執行異步調用,如果max_workers為None則使用機器的處理器數目(如4核機器max_worker配置為None時,則使用4個進程進行異步并發)。

submit()方法

Executor中定義了submit()方法,這個方法的作用是提交一個可執行的回調task,并返回一個future實例。future對象代表的就是給定的調用。

Executor.submit(fn, *args, **kwargs)

fn:需要異步執行的函數

*args, **kwargs:fn參數

使用示例:

fromconcurrentimportfutures

deftest(num):

importtime

returntime.ctime(),num

withfutures.ThreadPoolExecutor(max_workers=1)asexecutor:

future=executor.submit(test,1)

print(future.result())

map()方法

除了submit,Exectuor還為我們提供了map方法,這個方法返回一個map(func, *iterables)迭代器,迭代器中的回調執行返回的結果有序的。

Executor.map(func, *iterables, timeout=None)

func:需要異步執行的函數

*iterables:可迭代對象,如列表等。每一次func執行,都會從iterables中取參數。

timeout:設置每次異步操作的超時時間,timeout的值可以是int或float,如果操作超時,會返回raisesTimeoutError;如果不指定timeout參數,則不設置超時間。

使用示例:

fromconcurrentimportfutures

deftest(num):

importtime

returntime.ctime(),num

data=[1,2,3]

withfutures.ThreadPoolExecutor(max_workers=1)asexecutor:

forfutureinexecutor.map(test,data):

print(future)

shutdown()方法

釋放系統資源,在Executor.submit()或 Executor.map()等異步操作后調用。使用with語句可以避免顯式調用此方法。

Executor.shutdown(wait=True)

Future

Future可以理解為一個在未來完成的操作,這是異步編程的基礎。通常情況下,我們執行io操作,訪問url時(如下)在等待結果返回之前會產生阻塞,cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。 Future類封裝了可調用的異步執行。Future 實例通過 Executor.submit()方法創建。

cancel():試圖取消調用。如果調用當前正在執行,并且不能被取消,那么該方法將返回False,否則調用將被取消,方法將返回True。

cancelled():如果成功取消調用,返回True。

running():如果調用當前正在執行并且不能被取消,返回True。

done():如果調用成功地取消或結束了,返回True。

result(timeout=None):返回調用返回的值。如果調用還沒有完成,那么這個方法將等待超時秒。如果調用在超時秒內沒有完成,那么就會有一個Futures.TimeoutError將報出。timeout可以是一個整形或者浮點型數值,如果timeout不指定或者為None,等待時間無限。如果futures在完成之前被取消了,那么 CancelledError 將會報出。

exception(timeout=None):返回調用拋出的異常,如果調用還未完成,該方法會等待timeout指定的時長,如果該時長后調用還未完成,就會報出超時錯誤futures.TimeoutError。timeout可以是一個整形或者浮點型數值,如果timeout不指定或者為None,等待時間無限。如果futures在完成之前被取消了,那么 CancelledError 將會報出。如果調用完成并且無異常報出,返回None.

add_done_callback(fn):將可調用fn捆綁到future上,當Future被取消或者結束運行,fn作為future的唯一參數將會被調用。如果future已經運行完成或者取消,fn將會被立即調用。

wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待fs提供的 Future 實例(possibly created by different Executor instances) 運行結束。返回一個命名的2元集合,分表代表已完成的和未完成的

return_when 表明什么時候函數應該返回。它的值必須是一下值之一:

FIRST_COMPLETED :函數在任何future結束或者取消的時候返回。

FIRST_EXCEPTION :函數在任何future因為異常結束的時候返回,如果沒有future報錯,效果等于

ALL_COMPLETED :函數在所有future結束后才會返回。

as_completed(fs, timeout=None):參數是一個 Future 實例列表,返回值是一個迭代器,在運行結束后產出 Future實例 。

使用示例:

fromconcurrent.futuresimportThreadPoolExecutor,wait,as_completed

fromtimeimportsleep

fromrandomimportrandint

defreturn_after_5_secs(num):

sleep(randint(1,5))

return"Returnof{}".format(num)

pool=ThreadPoolExecutor(5)

futures=[]

forxinrange(5):

futures.append(pool.submit(return_after_5_secs,x))

print(1)

forxinas_completed(futures):

print(x.result())

print(2)

作者:錢魏Way

編輯:黃飛

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • cpu
    cpu
    +關注

    關注

    68

    文章

    11031

    瀏覽量

    215933
  • 計算機
    +關注

    關注

    19

    文章

    7626

    瀏覽量

    90135
  • python
    +關注

    關注

    56

    文章

    4823

    瀏覽量

    86142
  • 進程
    +關注

    關注

    0

    文章

    206

    瀏覽量

    14211
  • 多進程
    +關注

    關注

    0

    文章

    14

    瀏覽量

    2674

原文標題:詳解Python多線程、多進程

文章出處:【微信號:vision263com,微信公眾號:新機器視覺】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦
    熱點推薦

    請問如何在Python中實現多線程多進程的協作?

    大家好!我最近在開發一個Python項目時,需要同時處理多個任務,且每個任務需要不同的計算資源。我想通過多線程多進程的組合來實現并發,但遇到了一些問題。 具體來說,我有兩個任務,一個是I/O密集型
    發表于 03-11 06:57

    多線程多進程的區別

    6.你的數據庫一會又500個連接數,一會有10個,你分析一下情況7.udp和tcp的區別8.多線程多進程的區別9.有一臺web服務器,你選擇用多線程還是多進程,...
    發表于 07-19 07:21

    淺談多進程多線程的選擇

    魚還是熊掌:淺談多進程多線程的選擇關于多進程多線程,教科書上最經典的一句話是“進程是資源分配的
    發表于 08-24 07:38

    python多線程多進程對比

    電視邊吃飯邊聊天。這就是我們的 多進程 才能做的事了。2. 單線程VS多線程VS多進程文字總是蒼白無力的,不如用代碼直接來測試一下。開始對比之前,首先定義四種類型的場景 - CPU計算
    發表于 03-15 16:42

    LINUX系統下多線程多進程性能分析

    采用多進程處理多個任務,會占用很多系統資源(主要是CPU 和內存的使用)。在LINUX 中,則對這種弊端進行了改進,在用戶態實現了多線程處理多任務。本文系統論述了多線程
    發表于 08-13 08:31 ?20次下載

    VC-MFC多線程編程詳解

    VC編程中關于 MFC多線程編程的詳解文檔
    發表于 09-01 15:01 ?0次下載

    如何選好多線程多進程

    關于多進程多線程,教科書上最經典的一句話是“進程是資源分配的最小單位,線程是CPU調度的最小單位”,這句話應付考試基本上夠了,但如果在工作
    的頭像 發表于 05-11 16:16 ?3135次閱讀
    如何選好<b class='flag-5'>多線程</b>和<b class='flag-5'>多進程</b>

    多進程多線程的深度比較

    嵌入式Linux中文站,關于多進程多線程,教科書上最經典的一句話是“進程是資源分配的最小單位,線程是CPU調度的最小單位”
    發表于 04-02 14:42 ?582次閱讀

    多進程多線程的基本概念

    stack),自己的寄存器環境(register context),自己的線程本地存儲(thread-local storage)。一個進程可以有很多線程,每條線程并行執行不同的任務
    發表于 04-02 14:49 ?829次閱讀

    使用Python多進程的理由

    Python 是運行在解釋器中的語言,查找資料知道, python 中有一個全局鎖( GI),在使用多進程( Threa)的情況下,不能發揮多核的優勢。而使用多進程( Multipro
    的頭像 發表于 04-04 16:50 ?1774次閱讀
    使用<b class='flag-5'>Python</b><b class='flag-5'>多進程</b>的理由

    Python多進程學習

    Python 多進程 (Multiprocessing) 是一種同時利用計算機多個處理器核心 (CPU cores) 進行并行處理的技術,它與 Python多線程 (Multith
    的頭像 發表于 04-26 11:04 ?716次閱讀

    淺談Linux網絡編程中的多進程多線程

    在Linux網絡編程中,我們應該見過很多網絡框架或者server,有多進程的處理方式,也有多線程處理方式,孰好孰壞并沒有可比性,首先選擇多進程還是多線程我們需要考慮業務場景,其次結合當
    發表于 08-08 16:56 ?1008次閱讀
    淺談Linux網絡編程中的<b class='flag-5'>多進程</b>和<b class='flag-5'>多線程</b>

    Linux系統上多線程多進程的運行效率

    關于多進程多線程,教科書上最經典的一句話是“進程是資源分配的最小單位,線程是CPU調度的最小單位”,這句話應付考試基本上夠了,但如果在工作
    的頭像 發表于 11-10 10:54 ?1775次閱讀
    Linux系統上<b class='flag-5'>多線程</b>和<b class='flag-5'>多進程</b>的運行效率

    你還是分不清多進程多線程嗎?一文搞懂!

    你還是分不清多進程多線程嗎?一文搞懂! 多進程多線程是并發編程中常見的兩個概念,它們都可以用于提高程序的性能和效率。但是它們的實現方式和使用場景略有不同。 1.
    的頭像 發表于 12-19 16:07 ?849次閱讀

    Python多線程多進程的區別

    Python作為一種高級編程語言,提供了多種并發編程的方式,其中多線程多進程是最常見的兩種方式之一。在本文中,我們將探討Python多線程
    的頭像 發表于 10-23 11:48 ?901次閱讀
    <b class='flag-5'>Python</b>中<b class='flag-5'>多線程</b>和<b class='flag-5'>多進程</b>的區別