I have some code that needs to run against several other systems that may hang or have problems not under my control. I would like to use python's multiprocessing to spawn child processes that run independently of the main program, and then terminate them when they hang or have problems, but I am not sure of the best way to go about this.
When terminate is called it does kill the child process, but it then becomes a defunct zombie that is not released until the process object is gone. The example code below, where the loop never ends, works to kill it and allow a respawn when called again, but does not seem like a good way of going about this (i.e. it would be better to create the multiprocessing.Process() in __init__()).
Anyone have a suggestion?
import multiprocessing
import time

class Process(object):
    def __init__(self):
        self.thing = Thing()
        self.running_flag = multiprocessing.Value("i", 1)

    def run(self):
        self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
        self.process.start()
        print(self.process.pid)

    def pause_resume(self):
        self.running_flag.value = not self.running_flag.value

    def terminate(self):
        self.process.terminate()

class Thing(object):
    def __init__(self):
        self.count = 1

    def worker(self, running_flag):
        while True:
            if running_flag.value:
                self.do_work()

    def do_work(self):
        print("working {0} ...".format(self.count))
        self.count += 1
        time.sleep(1)
You might run the child processes as daemons in the background.
process.daemon = True
Any errors and hangs (or an infinite loop) in a daemon process will not affect the main process, and it will only be terminated once the main process exits.
This will work for simple problems until you run into many child daemon processes that keep consuming memory from the parent process without any explicit control.
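A minimal sketch of the daemon approach (the hang_forever task and helper names are illustrative, not from the post):

```python
import multiprocessing
import time

def hang_forever():
    # Stand-in for an external call that never returns
    while True:
        time.sleep(1)

def start_daemon_worker():
    process = multiprocessing.Process(target=hang_forever)
    process.daemon = True  # daemon children are killed when the parent exits
    process.start()
    return process

if __name__ == "__main__":
    p = start_daemon_worker()
    time.sleep(0.5)  # main process does its own work
    print("worker alive:", p.is_alive())
    # No join/terminate needed here: the hung daemon dies when this process exits
```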
The best way is to set up a Queue to have all the child processes communicate with the parent process, so that we can join them and clean up nicely. Here is some simple code that will check if a child process is hanging (aka time.sleep(1000)), and send a message to the queue for the main process to take action on it:
import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print("working {0} ...".format(count))
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print("hanging...")
                time.sleep(1000)

def watchdog(q):
    """
    This checks the queue for updates and sends a signal to it
    when the child process hasn't sent anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty:
            print("[WATCHDOG]: Maybe WORKER is slacking")
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()
    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))
    # Run the watchdog as a daemon so it terminates with the main process
    wdog.daemon = True
    workr.start()
    print("[MAIN]: starting process P1")
    wdog.start()
    # Poll the queue
    while True:
        msg = q.get()
        if msg == "KILL WORKER":
            print("[MAIN]: Terminating slacking WORKER")
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print("[MAIN]: WORKER is a goner")
                workr.join(timeout=1.0)
                print("[MAIN]: Joined WORKER successfully!")
                q.close()
                break  # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()
Without terminating worker, an attempt to join() it to the main process would block forever, since worker never finishes.
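A related note on the question's zombie complaint: on POSIX systems, calling join() right after terminate() reaps the defunct child immediately, so the Process object can then be replaced in place. A minimal sketch (stuck_worker is illustrative, not from the post):

```python
import multiprocessing
import time

def stuck_worker():
    # Stand-in for a call into an external system that hangs
    while True:
        time.sleep(1)

if __name__ == "__main__":
    proc = multiprocessing.Process(target=stuck_worker)
    proc.start()
    time.sleep(0.5)
    proc.terminate()  # kills the child, but leaves a defunct entry...
    proc.join()       # ...until join() reaps it; the object can now be discarded
    print("exitcode:", proc.exitcode)  # negative exitcode: killed by a signal
```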