Problem Description
I am using the multiprocessing Python library to spawn 4 Process() objects to parallelize a CPU-intensive task. The task (inspiration and code from this great article) is to compute the prime factors for every integer in a list.
main.py:
import random
import multiprocessing
import sys

num_inputs = 4000
num_procs = 4
proc_inputs = num_inputs/num_procs
input_list = [int(1000*random.random()) for i in xrange(num_inputs)]

output_queue = multiprocessing.Queue()
procs = []

for p_i in xrange(num_procs):
    print "Process [%d]"%p_i
    proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
    print " - num inputs: [%d]"%len(proc_list)

    # Using target=worker1 HANGS on join
    p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
    # Using target=worker2 RETURNS with success
    #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))

    procs.append(p)
    p.start()

for p in procs:
    print "joining ", p, output_queue.qsize(), output_queue.full()
    p.join()
    print "joined ", p, output_queue.qsize(), output_queue.full()

print "Processing complete."
ret_vals = []

while output_queue.empty() == False:
    ret_vals.append(output_queue.get())

print len(ret_vals)
print sys.getsizeof(ret_vals)
Observations:
- If the target for each process is the function worker1, for an input list larger than 4000 elements the main thread gets stuck on .join(), waiting for the spawned processes to terminate, and never returns.
- If the target for each process is the function worker2, for the same input list the code works just fine and the main thread returns.
這讓我很困惑,因?yàn)?worker1
和 worker2
之間的唯一區(qū)別(見下文)是前者在 Queue
而后者為每個(gè)進(jìn)程插入一個(gè)列表.
This is very confusing to me, as the only difference between worker1 and worker2 (see below) is that the former inserts individual lists into the Queue whereas the latter inserts a single list of lists for each process.
Why is there a deadlock when using worker1 as the target but not when using worker2?
Shouldn't both (or neither) exceed the multiprocessing Queue maxsize limit of 32767?
worker1 vs worker2:
def worker1(proc_num, proc_list, output_queue):
    '''worker function which deadlocks'''
    for num in proc_list:
        output_queue.put(factorize_naive(num))

def worker2(proc_num, proc_list, output_queue):
    '''worker function that works'''
    workers_stuff = []
    for num in proc_list:
        workers_stuff.append(factorize_naive(num))
    output_queue.put(workers_stuff)
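Both workers call factorize_naive, which is defined in the article linked above rather than in the question. For completeness, a minimal trial-division sketch (my own stand-in, not the article's exact code) could look like:

def factorize_naive(n):
    '''Return the prime factors of n by naive trial division.'''
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:      # divide out each prime factor completely
            factors.append(d)
            n //= d
        d += 1
    if n > 1:                  # whatever remains is itself prime
        factors.append(n)
    return factors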
There are a lot of similar questions on SO, but I believe the core of this question is clearly distinct from all of them.
Related links:
- https://sopython.com/canon/82/programs-using-multiprocessing-hang-deadlock-and-never-complete/
- Python multiprocessing issues
- Python multiprocessing - process hangs on join for large queue
- Process.join() and queue don't work with large numbers
- Python 3 multiprocessing queue deadlock when calling join before the queue is empty
- Script using multiprocessing module does not terminate
- Why does multiprocessing.Process.join() hang?
- When to call .join() on a process?
- What exactly is Python multiprocessing Module's .join() Method Doing?
Recommended Answer
The docs warn about this:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
While a Queue appears to be unbounded, under the covers queued items are buffered in memory to avoid overloading inter-process pipes. A process cannot end normally before those memory buffers are flushed. Your worker1() puts a lot more items on the queue than your worker2(), and that's all there is to it. Note that the number of items that can be queued before the implementation resorts to buffering in memory isn't defined: it can vary across OS and Python release.
As the docs suggest, the normal way to avoid this is to .get() all the items off the queue before you attempt to .join() the processes. As you've discovered, whether it's necessary to do so depends in an undefined way on how many items have been put on the queue by each worker process.
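Applied to the question's main.py, one possible restructuring (a sketch assuming worker1 is the target, so exactly num_inputs results are expected because the inputs divide evenly across the processes) is to drain the queue first and join afterwards:

ret_vals = []
for _ in xrange(num_inputs):               # worker1 puts one result per input
    ret_vals.append(output_queue.get())    # blocks until each result arrives

for p in procs:
    p.join()                               # safe now: all buffered items consumed

print "Processing complete."
print len(ret_vals)

Counting the expected results avoids relying on Queue.empty() or qsize(), which are documented as unreliable (qsize() may even raise NotImplementedError on platforms such as macOS); with worker2 as the target, only num_procs results would be expected instead.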