Problem Description
I'm using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious in my design, because I'm having a hard time effectively telling when all of the data has been processed.
I have two separate tasks that run; one that feeds the other. I guess this is a producer/consumer problem. I use a shared Queue between all processes, where the producers fill up the queue, and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.
It would seem to make sense to use the map_async() function, but since the producers are filling up the queue, I don't know all of the items up front, so I have to go into a while loop and use apply_async() and try to detect when everything is done with some sort of timeout...ugly.
I feel like I'm missing something obvious. How can this be better designed?
Producer
import multiprocessing
import time

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item):  # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    # Keep at most max_running producer processes alive at any one time.
    running = []
    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)
Consumer
import Queue  # Python 2 stdlib module; provides the Queue.Empty exception

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.
            record = queue.get(True, 10)
        except Queue.Empty:
            break
        do_stuff_with_record(record)
Main
if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024 * 1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0, 10)
    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes,
                                args=(producer_queue, consumer_queue, 8))
    p.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
Here is where it gets cheesy. I can't use map, because the list to consume is being filled up at the same time. So I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it up, so I can't just detect an empty queue and quit on that.
    chunksize = 10000  # matches the default in process_consumer_chunk
    timed_out = False
    timeout = 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk,
                                               (consumer_queue, ),
                                               dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False
        except Queue.Empty:
            if timed_out:
                break
            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()
I thought that maybe I could get() the records in the main thread and pass those into the consumers instead of passing the queue in, but I think I end up with the same problem that way. I still have to run a while loop and use apply_async(). Thank you in advance for any advice!
Recommended Answer
You could use a manager.Event to signal the end of the work. This event can be shared between all of your processes, and then, when you signal it from your main process, the other workers can shut down gracefully:
while not event.is_set():
    ...rest of code...
So, your consumers would wait for the event to be set and handle the cleanup once it is set.
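For example, here is a minimal sketch of an event-aware consumer loop. The consumer_worker name and the one-second poll timeout are illustrative assumptions, not part of the answer; do_stuff_with_record is the function from the question. The key point is that an empty queue alone is not a termination signal; only "empty queue plus event set" is.

import Queue  # Python 2; on Python 3 this module is named "queue"

def consumer_worker(consumer_queue, shutdown_event):
    while True:
        try:
            # Use a short timeout so the shutdown event is re-checked regularly.
            record = consumer_queue.get(True, 1)
        except Queue.Empty:
            if shutdown_event.is_set():
                break     # producers are done and the queue is drained
            continue      # queue is momentarily empty; producers may still be filling it
        do_stuff_with_record(record)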
To determine when to set this flag, you can do a join on the producer processes; when those are all complete, you can then join on the consumer processes.
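Putting it together, here is a hedged sketch of the main process. It is simplified relative to the question's setup: it starts all producers at once rather than throttling to max_running, and it uses plain Process workers running the consumer_worker loop above instead of a Pool. Only manager.Event() and the join-then-set ordering come from this answer; the rest is assumption.

import multiprocessing

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024 * 1024)
    shutdown_event = manager.Event()

    producers = [ProducerProcess(item, consumer_queue) for item in xrange(10)]
    consumers = [multiprocessing.Process(target=consumer_worker,
                                         args=(consumer_queue, shutdown_event))
                 for _ in xrange(16)]
    for proc in producers + consumers:
        proc.start()

    for prod in producers:
        prod.join()           # wait until every producer has finished feeding the queue
    shutdown_event.set()      # tell the consumers that no more data is coming

    for cons in consumers:
        cons.join()           # consumers drain what is left, then exit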