Problem Description
I am using multiprocessing's Process and Queue. I start several functions in parallel and most behave nicely: they finish, their output goes to their Queue, and they show up as .is_alive() == False. But for some reason a couple of functions are not behaving. They always show .is_alive() == True, even after the last line in the function (a print statement saying "Finished") is complete. This happens regardless of the set of functions I launch, even if there's only one. If not run in parallel, the functions behave fine and return normally. What kind of thing might be the problem?
Here's the generic function I'm using to manage the jobs. All I'm not showing is the functions I'm passing to it. They're long, often use matplotlib, sometimes launch some shell commands, but I cannot figure out what the failing ones have in common.
def runFunctionsInParallel(listOf_FuncAndArgLists):
    """
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.
    """
    from multiprocessing import Process, Queue

    def storeOutputFFF(fff, theArgs, que):  # add an argument to the function for assigning a queue
        print 'MULTIPROCESSING: Launching %s in parallel ' % fff.func_name
        que.put(fff(*theArgs))  # we're putting the return value into the queue
        print 'MULTIPROCESSING: Finished %s in parallel! ' % fff.func_name
        # We get this far even for "bad" functions
        return

    queues = [Queue() for fff in listOf_FuncAndArgLists]  # create a queue object for each function
    jobs = [Process(target=storeOutputFFF, args=[funcArgs[0], funcArgs[1:], queues[iii]])
            for iii, funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs:
        job.start()  # Launch them all

    import time
    from math import sqrt
    n = 1
    while any([jj.is_alive() for jj in jobs]):  # debugging section shows progress updates
        n += 1
        time.sleep(5 + sqrt(n))  # Wait a while before the next update. Slow down updates for really long runs.
        print('\n---------------------------------------------------\n' +
              ' '.join(['alive?', 'Job', 'exitcode', 'Func']) +
              '\n---------------------------------------------------')
        print('\n'.join(['%s: %s: %s: %s' % (job.is_alive() * 'Yes', job.name, job.exitcode,
                                             listOf_FuncAndArgLists[ii][0].func_name)
                         for ii, job in enumerate(jobs)]))
        print('---------------------------------------------------\n')
    # I never get to the following line when one of the "bad" functions is running.
    for job in jobs:
        job.join()  # Wait for them all to finish... Hm, is this needed to get at the Queues?
    # And now, collect all the outputs:
    return [queue.get() for queue in queues]
Recommended Answer
Alright, it seems that the pipe used to fill the Queue gets plugged when a function's output is too big (my crude understanding; this is an unresolved/closed bug: http://bugs.python.org/issue8237). I have modified the code in my question so that there is some buffering (the queues are regularly emptied while the processes are running), which solves all my problems. So now this takes a collection of tasks (functions and their arguments), launches them, and collects the outputs. I wish it looked simpler/cleaner.
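To make the failure mode concrete, here is a minimal sketch of what that bug report describes (this is an illustration, not code from the original post; Python 2 to match the code above, and the function name big_output is invented). put() itself returns and the final print runs, but the child's queue feeder thread stays blocked writing the large result into the full pipe, so the process cannot exit: is_alive() stays True and join() hangs, exactly the symptom described in the question.

from multiprocessing import Process, Queue

def big_output(que):
    que.put('x' * 10**7)  # far larger than the OS pipe buffer; put() still returns
    print 'Finished'      # this prints, yet the process never dies on its own

que = Queue()
p = Process(target=big_output, args=(que,))
p.start()
p.join()         # deadlocks: the feeder thread cannot flush until someone reads
print que.get()  # never reached; the get() must happen before the join()

Reading from the queue before joining, or draining the queues periodically while the jobs run, releases the feeder thread and lets the child exit.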
Edit (2014 Sep; update 2017 Nov: rewritten for readability): I'm updating the code with the enhancements I've made since. The new code (same function, but better features) is here: https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py
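The full implementation is at that link. As a rough sketch of the buffering idea only (this is not the linked library's actual code; run_and_drain, the drain helper, and the two-second poll interval are all illustrative), the monitoring loop can empty every queue on each pass, so that no pipe ever fills and the final join() is safe:

from Queue import Empty  # the exception raised by get_nowait(); the module is "queue" in Python 3
import time

def drain(que, into):
    try:
        while True:  # pull everything currently readable, without blocking
            into.append(que.get_nowait())
    except Empty:
        pass

def run_and_drain(jobs, queues):
    buffered = [[] for _ in queues]  # partial output collected per job
    while any(jj.is_alive() for jj in jobs):
        for ii, que in enumerate(queues):
            drain(que, buffered[ii])  # keep the pipes from filling up
        time.sleep(2)
    for job in jobs:
        job.join()  # safe now: the queues have been kept empty
    for ii, que in enumerate(queues):
        drain(que, buffered[ii])  # final sweep for output that arrived late
    return buffered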
The calling documentation is also below.
def runFunctionsInParallel(*args, **kwargs):
    """ This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments.
    """
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs()
###########################################################################################
###
class cRunFunctionsInParallel():
###
#######################################################################################
"""Run any list of functions, each with any arguments and keyword-arguments, in parallel.
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied.
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name.
Parameters
----------
listOf_FuncAndArgLists : a list of lists
List of up-to-three-element-lists, like [function, args, kwargs],
specifying the set of functions to be launched in parallel. If an
element is just a function, rather than a list, then it is assumed
to have no arguments or keyword arguments. Thus, possible formats
for elements of the outer list are:
function
[function, list]
[function, list, dict]
kwargs: dict
One can also supply the kwargs once, for all jobs (or for those
without their own non-empty kwargs specified in the list)
names: an optional list of names to identify the processes.
If omitted, the function name is used, so if all the functions are
the same (ie merely with different arguments), then they would be
named indistinguishably
offsetsSeconds: int or list of ints
delay some functions' start times
expectNonzeroExit: True/False
Normal behaviour is to not proceed if any function exits with a
failed exit code. This can be used to override this behaviour.
parallel: True/False
Whenever the list of functions is longer than one, functions will
be run in parallel unless this parameter is passed as False
maxAtOnce: int
If nonzero, this limits how many jobs will be allowed to run at
once. By default, this is set according to how many processors
the hardware has available.
showFinished : int
Specifies the maximum number of successfully finished jobs to show
in the text interface (before the last report, which should always
show them all).
Returns
-------
Returns a tuple of (return codes, return values), each a list in order of the jobs provided.
Issues
-------
Only tested on POSIX OSes.
Examples
--------
See the testParallel() method in this module
"""