Multiprocessing - producer/consumer design
This article works through a producer/consumer design built on Python's multiprocessing module; hopefully it is a useful reference for anyone facing the same shutdown problem.

Problem Description


I'm using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious in my design, because as it stands it's very hard for me to tell reliably when all of the data has been processed.


                  I have two separate tasks that run; one that feeds the other. I guess this is a producer/consumer problem. I use a shared Queue between all processes, where the producers fill up the queue, and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.


It would seem to make sense to use the map_async() function, but since the producers are filling up the queue, I don't know all of the items up front, so I have to go into a while loop and use apply_async() and try to detect when everything is done with some sort of timeout... ugly.


                  I feel like I'm missing something obvious. How can this be better designed?

Producer

import multiprocessing
import time

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item): # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    # keep at most max_running producer processes alive at any one time
    running = []

    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)
                  

Consumer

import Queue  # Python 2 module ("queue" in Python 3); needed for Queue.Empty

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.

            record = queue.get(True, 10)
        except Queue.Empty:
            break

        do_stuff_with_record(record)
                  

Main

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0,10)

    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
    p.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
                  


Here is where it gets cheesy. I can't use map, because the list to consume is being filled up at the same time. So I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it up, so I can't just detect an empty queue and quit on that.

    chunksize = 10000  # not defined in the original post; assumed equal to the consumer's default
    timed_out = False
    timeout = 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False

        except Queue.Empty:
            if timed_out:
                break

            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()
                  


I thought that maybe I could get() the records in the main thread and pass those into the consumer instead of passing the queue in, but I think I'd end up with the same problem that way. I still have to run a while loop and use apply_async(). Thank you in advance for any advice!

Recommended Answer


You could use a manager.Event to signal the end of the work. This event can be shared between all of your processes, and when you signal it from your main process the other workers can shut down gracefully.

                  while not event.is_set():
                   ...rest of code...
                  


                  So, your consumers would wait for the event to be set and handle the cleanup once it is set.


To determine when to set this flag, you can join the producer processes, and once those are all complete, set the event and then join the consumer processes.
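
Putting that together, here is a minimal Python 3 sketch of the suggested design (the original code is Python 2). The function names produce/consume, the worker counts, and the stop_event name are illustrative placeholders, not from the original post; the two load-bearing details are that a consumer only exits once the event is set and the queue is drained, and that the main process joins the producers before setting the event.

import multiprocessing
import queue

def produce(items, consumer_queue):
    # stand-in for get_records_for_item(): expand each item into records
    for item in items:
        consumer_queue.put(item)

def consume(consumer_queue, stop_event):
    # Checking the event alone could drop records queued just before it
    # was set, so only exit once the event is set AND the queue is empty.
    while not (stop_event.is_set() and consumer_queue.empty()):
        try:
            record = consumer_queue.get(True, 1)
        except queue.Empty:
            continue
        print(record)  # stand-in for do_stuff_with_record()

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue()
    stop_event = manager.Event()

    producers = [multiprocessing.Process(target=produce,
                                         args=(range(i, i + 10), consumer_queue))
                 for i in range(0, 40, 10)]
    consumers = [multiprocessing.Process(target=consume,
                                         args=(consumer_queue, stop_event))
                 for _ in range(4)]
    for w in producers + consumers:
        w.start()

    # Join the producers first; once they have all finished, no new
    # records can arrive, so it is safe to tell the consumers to wrap up.
    for w in producers:
        w.join()
    stop_event.set()
    for w in consumers:
        w.join()

With the event in place there is no timeout guesswork: the consumers exit exactly when the producers are done and the queue has been drained.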

