問題描述
我想創建一個在 While True 循環中永遠運行的線程或進程.
I would like to create either a Thread or a Process which runs forever in a While True loop.
我需要以隊列的形式向工作人員發送和接收數據,無論是 multiprocessing.Queue() 還是 collections.deque().我更喜歡使用 collections.deque(),因為它明顯更快.
I need to send and receive data to the worker in the form for queues, either a multiprocessing.Queue() or a collections.deque(). I prefer to use collections.deque() as it is significantly faster.
我還需要最終能夠殺死工作人員(因為它在 while True 循環中運行.這是我整理的一些測試代碼,以嘗試了解線程、進程、隊列和 deque 之間的區別..
I also need to be able to kill the worker eventually (as it runs in a while True loop. Here is some test code I've put together to try and understand the differences between Threads, Processes, Queues, and deque ..
import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque
class ThreadingTest(Thread):
def __init__(self, q):
super(ThreadingTest, self).__init__()
self.q = q
self.toRun = False
def run(self):
print("Started Thread")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Thread deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Thread Queue: " + str(i))
def stop(self):
print("Trying to stop Thread")
self.toRun = False
while self.isAlive():
time.sleep(0.1)
print("Stopped Thread")
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.toRun = False
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.toRun = False
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
q = Queue()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
if type(q) == type(deque()):
q.append(i)
elif type(q) == type(Queue()):
q.put_nowait(i)
time.sleep(1)
t1.stop()
t1.join()
if type(q) == type(deque()):
print(q)
elif type(q) == type(Queue()):
while q.qsize() > 0:
print(str(q.get_nowait()))
如您所見,t1 可以是 ThreadingTest 或 ProcessTest.此外,傳遞給它的隊列可以是 multiprocessing.Queue 或 collections.deque.
As you can see, t1 can either be ThreadingTest, or ProcessTest. Also, the queue passed to it can either be a multiprocessing.Queue or a collections.deque.
ThreadingTest 與 Queue 或 deque() 一起使用.當調用 stop() 方法時,它也會正確終止 run().
ThreadingTest works with a Queue or deque(). It also kills run() properly when the stop() method is called.
Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])
ProcessTest 只有在隊列類型為 multiprocessing.Queue 時才能從隊列中讀取.它不適用于 collections.deque.此外,我無法使用 stop() 終止進程.
ProcessTest is only able to read from the queue if it is of type multiprocessing.Queue. It doesn't work with collections.deque. Furthermore, I am unable to kill the process using stop().
Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process
我想知道為什么?另外,在進程中使用雙端隊列的最佳方法是什么?而且,我將如何使用某種 stop() 方法來終止進程.
I'm trying to figure out why? Also, what would be the best way to use deque with a process? And, how would I go about killing the process using some sort of stop() method.
推薦答案
你不能使用 collections.deque
在兩個 multiprocessing.Process
實例之間傳遞數據,因為 collections.deque
不是進程感知的.multiprocessing.Queue
在內部將其內容寫入 multiprocessing.Pipe
,這意味著其中的數據可以在一個進程中排隊并在另一個進程中檢索.collections.deque
沒有那種管道,所以它不會工作.當您在一個進程中寫入 deque
時,另一個進程中的 deque
實例完全不會受到影響;它們是完全獨立的實例.
You can't use a collections.deque
to pass data between two multiprocessing.Process
instances, because collections.deque
is not process-aware. multiprocessing.Queue
writes its contents to a multiprocessing.Pipe
internally, which means that data in it can be enqueued in once process and retrieved in another. collections.deque
doesn't have that kind of plumbing, so it won't work. When you write to the deque
in one process, the deque
instance in the other process won't be affected at all; they're completely separate instances.
您的 stop()
方法也發生了類似的問題.您正在主進程中更改 toRun
的值,但這根本不會影響子進程.它們是完全獨立的實例.結束孩子的最好方法是向 Queue
發送一些哨兵.當你在child中獲得哨兵時,跳出無限循環:
A similar issue is happening to your stop()
method. You're changing the value of toRun
in the main process, but this won't affect the child at all. They're completely separate instances. The best way to end the child would be to send some sentinel to the Queue
. When you get the sentinel in the child, break out of the infinite loop:
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
if i is None:
break # Got sentinel, so break
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.q.put(None) # Send sentinel
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
如果你確實需要兩個進程之間的 deque
語義,你可以使用 自定義 multiprocessing.Manager()
以在 Manager
進程中創建共享 deque
,以及您的每個 Process
實例都會獲得一個 Proxy
:
If you actually do need deque
semantics between two process, you can use a custom multiprocessing.Manager()
to create a shared deque
in a Manager
process, and each of your Process
instances will get a Proxy
to it:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque
SyncManager.register('deque', deque)
def Manager():
m = SyncManager()
m.start()
return m
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if self.q._getvalue():
i = self.q.popleft()
if i is None:
break
print("Process deque: " + str(i))
def stop(self):
print("Trying to stop Process")
self.q.append(None)
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
m = Manager()
q = m.deque()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
q.append(i)
time.sleep(1)
t1.stop()
t1.join()
print(q)
請注意,這可能不會比 multiprocessing.Queue
快,因為每次訪問 deque
都會產生 IPC 成本.對于以您的方式傳遞消息來說,它也是一種不太自然的數據結構.
Note that this probably isn't going to be faster than a multiprocessing.Queue
, though, since there's an IPC cost for every time you access the deque
. It's also a much less natural data structure for passing messages the way you are.
這篇關于關于使用 Queue()/deque() 和類變量進行通信和“毒丸"的進程與線程的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!