Problem Description
I'm trying to implement basic multiprocessing and I've run into an issue. The Python script is attached below.
import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue(10)
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        item = append_queue.get()
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
        append_queue.task_done()
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    append_queue.join()
    #append_queue_process.join()
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()
AnalyzeFrequency analyzes the word frequencies in a file, and get_results() returns a sorted list of those words and their frequencies. The list is very large, perhaps 10000 items.
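For illustration only: the question never shows the exact shape of get_results(), but judging from the FrequencyStore.append code further down, which reads each item as x[0] for the word and x[1] for the count, each item is presumably a [word, count] pair, something like:

results = [
    ["the", 1523],    # assumed [word, frequency] pairs, sorted by frequency
    ["quick", 210],
    ["fox", 87],
    # ... perhaps 10000 items in total
]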
This list is then passed to the add_to_append_queue method, which adds it to a queue. process_append_queue takes the items one by one and adds the frequencies to a "database". This operation takes a bit longer than the actual analysis in main(), so I am trying to use a separate process for this method. When I do this with the threading module, everything works perfectly fine, no errors. When I use Process, the script hangs at item = append_queue.get().
Could someone please explain what is happening here, and perhaps direct me toward a fix?
Thanks for any and all answers!
UPDATE
The pickle error was my fault, just a typo. Now I am using the Queue class from multiprocessing, but the append_queue.get() method still hangs. New code:
import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue()
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        database.append(append_queue.get())
        print("Appended to database in %.4f seconds" % database.append_time)
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    #append_queue.join()
    #append_queue_process.join()
    print str(append_queue.qsize())
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()
UPDATE 2
Here is the database code:
class FrequencyStore:

    def __init__(self):
        self.sorter = Sorter()
        self.db = {}
        self.load_time = -1
        self.save_time = -1
        self.append_time = -1
        self.sort_time = -1

    def load_db(self):
        start_time = time.time()

        try:
            file = open("results.txt", 'r')
        except:
            raise IOError

        self.db = {}
        for line in file:
            word, count = line.strip("\n").split("=")
            self.db[word] = int(count)
        file.close()

        self.load_time = time.time() - start_time

    def save_db(self):
        start_time = time.time()

        _db = []
        for key in self.db:
            _db.append([key, self.db[key]])
        _db = self.sort(_db)

        try:
            file = open("results.txt", 'w')
        except:
            raise IOError

        file.truncate(0)
        for x in _db:
            file.write(x[0] + "=" + str(x[1]) + "\n")
        file.close()

        self.save_time = time.time() - start_time

    def create_sorted_db(self):
        _temp_db = []
        for key in self.db:
            _temp_db.append([key, self.db[key]])
        _temp_db = self.sort(_temp_db)
        _temp_db.reverse()
        return _temp_db

    def get_db(self):
        return self.db

    def sort(self, _list):
        start_time = time.time()

        _list = self.sorter.mergesort(_list)
        _list.reverse()

        self.sort_time = time.time() - start_time
        return _list

    def append(self, _list):
        start_time = time.time()

        for x in _list:
            if x[0] not in self.db:
                self.db[x[0]] = x[1]
            else:
                self.db[x[0]] += x[1]

        self.append_time = time.time() - start_time
Recommended Answer
Comments suggest you're trying to run this on Windows. As I said in a comment,
If you're running this on Windows, it can't work - Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main(), and pass it as an argument to the worker function.
Here's a fleshed-out version of what you need to do to make it portable, although I removed all the database stuff because it's irrelevant to the problems you've described so far. I also removed the daemon fiddling, because that's usually just a lazy way to avoid shutting things down cleanly, and as often as not will come back to bite you later:
def process_append_queue(append_queue):
    while True:
        x = append_queue.get()
        if x is None:
            break
        print("processed %d" % x)
    print("worker done")

def main():
    import multiprocessing as mp
    append_queue = mp.Queue(10)
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,))
    append_queue_process.start()

    for i in range(100):
        append_queue.put(i)
    append_queue.put(None)  # tell worker we're done
    append_queue_process.join()

if __name__=="__main__":
    main()
The output is the "obvious" stuff:
processed 0
processed 1
processed 2
processed 3
processed 4
...
processed 96
processed 97
processed 98
processed 99
worker done
Note: because Windows doesn't (can't) fork(), it's impossible for worker processes to inherit any Python object on Windows. Each process runs the entire program from its start. That's why your original program couldn't work: each process created its own Queue, wholly unrelated to the Queue in the other process. In the approach shown above, only the main process creates a Queue, and the main process passes it (as an argument) to the worker process.
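Applied back to the question's program, a minimal sketch of that approach could look like the code below. This is an assumption-laden adaptation, not the answer's own code: FrequencyStore and AnalyzeFrequency are taken to behave exactly as described in the question, and since the two processes don't share memory, the worker process owns the database end to end (loads it, appends to it, and saves it), with a None sentinel replacing the daemon flag as the shutdown signal:

from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

def process_append_queue(append_queue):
    # The worker owns the database: processes don't share memory, so
    # loading, appending, and saving all happen in this one process.
    database = FrequencyStore()
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    while True:
        item = append_queue.get()
        if item is None:      # sentinel: no more work coming
            break
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)

def main():
    append_queue = Queue(10)
    worker = Process(target=process_append_queue, args=(append_queue,))
    worker.start()
    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        append_queue.put(a.get_results())
    append_queue.put(None)    # tell the worker we're done
    worker.join()             # wait for the worker to finish saving

if __name__ == "__main__":
    main()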