Question
Initially, I have a class that stores some processed values and reuses them in its other methods.
The problem is that when I tried to divide a class method into multiple processes to speed things up, Python spawned the processes but it didn't seem to work (as I saw in Task Manager, only one process was running) and the result was never delivered.
I did a couple of searches and found that `pathos.multiprocessing` can do this instead, but I wonder whether the standard library can solve this problem?
```python
from multiprocessing import Pool

class A():
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)
a.run(list(range(10)))
```
Answer
Your code fails because it cannot pickle the instance method (`self.cal`), which is what Python attempts to do when you spawn multiple processes by mapping them to `multiprocessing.Pool` (well, there is a way to do it, but it's way too convoluted and not extremely useful anyway) - since there is no shared memory access, it has to 'pack' the data and send it to the spawned process for unpacking. The same would happen if you tried to pickle the `a` instance.
The only shared-memory access available in the `multiprocessing` package is the little-known `multiprocessing.pool.ThreadPool`, so if you really want to do this:
```python
from multiprocessing.pool import ThreadPool

class A():
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = ThreadPool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
But this will not give you parallelization, as it essentially maps onto your regular threads, which do have access to the shared memory. You should instead pass class/static methods (if you need them called), accompanied by the data you want them to work with (in your case `self.vl`). If you need to share that data across processes, you'll have to use some shared-memory abstraction, like `multiprocessing.Value`, applying a mutex along the way, of course.
Update
I said you could do it (and there are modules that more or less do this - check `pathos.multiprocessing`, for example), but I don't think it's worth the trouble - when you reach a point where you have to trick your system into doing what you want, chances are you're either using the wrong system or you should rethink your design. But for the sake of completeness, here is one way to do what you want in a multiprocessing setting:
```python
import sys
from multiprocessing import Pool

def parallel_call(params):  # a helper for calling 'remote' instances
    cls = getattr(sys.modules[__name__], params[0])  # get our class type
    instance = cls.__new__(cls)  # create a new instance without invoking __init__
    instance.__dict__ = params[1]  # apply the passed state to the new instance
    method = getattr(instance, params[2])  # get the requested method
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
    return method(*args)  # expand arguments, call our method and return the result

class A(object):
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(parallel_call, self.prepare_call("cal", dt))
        t.close()
        return rs

    def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
        for arg in args:
            yield [self.__class__.__name__, self.__dict__, name, arg]

if __name__ == "__main__":  # important protection for cross-platform use
    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
I think it's pretty self-explanatory how it works, but in short: it passes the name of your class, its current state (sans signals, though), the desired method to be called, and the arguments to invoke it with to the `parallel_call` function, which is called for each item mapped over the `Pool`. Python automatically pickles and unpickles all this data, so all `parallel_call` needs to do is reconstruct the original object, find the desired method in it, and call it with the provided parameter(s).
This way we pass only the data, without trying to pass active objects, so Python doesn't complain (well, in this case, try adding a reference to an instance method to your class parameters and see what happens), and everything works just fine.
If you want to go heavy on the 'magic', you can make it look exactly like your code (create your own `Pool` handler, pick up names from the functions, send the names to the actual processes, etc.), but this should serve well enough for your example.
However, before you raise your hopes up, keep in mind that this will only work when sharing a 'static' instance (an instance that doesn't change its initial state once you start invoking it in a multiprocessing context). If the `A.cal` method were to change the internal state of the `vl` attribute, it would affect only the instance where it changed (unless it changes in the main instance that calls the `Pool` between calls). If you want to share the state as well, you can upgrade `parallel_call` to pick up `instance.__dict__` after the call and return it together with the method call result; on the calling side you'd then have to update the local `__dict__` with the returned data to change the original state. And that's still not enough - you'd actually have to create a shared dict and handle all the mutex stuff to have it concurrently accessed by all the processes (you can use `multiprocessing.Manager` for that).
So, as I was saying, more trouble than it's worth...