I have a function which performs some simulation and returns an array in string format.
I want to run the simulation (the function) for varying input parameter values, over 10000 possible input values, and write the results to a single file.
I am using multiprocessing, specifically, pool.map function to run the simulations in parallel.
Since the whole process of running the simulation function over 10000 times takes a very long time, I really would like to track the process of the entire operation.
I think the problem with my current code below is that pool.map runs the function 10000 times without any progress tracking during those operations. Once the parallel processing finishes running all 10000 simulations (which could take hours to days), I then start tracking when the 10000 simulation results are saved to a file. So this is not really tracking the processing of the pool.map operation.
Is there an easy fix to my code that will allow process tracking?
import multiprocessing
import numpy as np

def simFunction(input):
    # Does some simulation and outputs simResult
    return str(simResult)

# Parallel processing
inputs = np.arange(0, 10000, 1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=numCores)
    t = pool.map(simFunction, inputs)
    with open('results.txt', 'w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter % 100 == 0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')
Note that I'm using pathos.multiprocessing instead of multiprocessing. It's just a fork of multiprocessing that enables you to do map functions with multiple inputs, has much better serialization, and allows you to execute map calls anywhere (not just in __main__). You could use multiprocessing to do the below as well, however the code would be slightly different.
If you use an iterated map function, it's pretty easy to keep track of progress.
from pathos.multiprocessing import ProcessingPool as Pool

def simFunction(x, y):
    import time
    time.sleep(2)
    return x**2 + y

x, y = range(100), range(-100, 100, 2)
res = Pool().imap(simFunction, x, y)
with open('results.txt', 'w') as out:
    for i in x:
        out.write("%s\n" % next(res))
        if i % 10 == 0:
            print("%s of %s simulated" % (i, len(x)))
0 of 100 simulated
10 of 100 simulated
20 of 100 simulated
30 of 100 simulated
40 of 100 simulated
50 of 100 simulated
60 of 100 simulated
70 of 100 simulated
80 of 100 simulated
90 of 100 simulated
Or, you can use an asynchronous map. Here I'll do things a little differently, just to mix it up.
import time
res = Pool().amap(simFunction, x, y)
while not res.ready():
    print("waiting...")
    time.sleep(5)
waiting...
waiting...
waiting...
waiting...
res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]
Either an iterated or asynchronous map will enable you to write whatever code you want to do better process tracking. For example, pass a unique "id" to each job and watch which come back, or have each job return its process id. There are lots of ways to track progress and processes… but the above should give you a start.
You can get pathos here.