Frage Shared-Memory-Objekte in Multiprocessing


Angenommen, ich habe ein großes nummy Array im Speicher, ich habe eine Funktion func Das nimmt dieses riesige Array als Eingabe (zusammen mit einigen anderen Parametern). func mit verschiedenen Parametern können parallel ausgeführt werden. Beispielsweise:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Wenn ich eine Multiprocessing-Bibliothek verwende, wird dieses riesige Array für mehrere Male in verschiedene Prozesse kopiert.

Gibt es eine Möglichkeit, verschiedene Prozesse das gleiche Array teilen zu lassen? Dieses Array-Objekt ist schreibgeschützt und wird niemals geändert.

Was ist komplizierter, wenn arr kein Array ist, sondern ein beliebiges Python-Objekt, gibt es eine Möglichkeit, es zu teilen?

[BEARBEITET]

Ich lese die Antwort, aber ich bin immer noch ein bisschen verwirrt. Da fork () Copy-on-Write ist, sollten wir beim Erstellen neuer Prozesse in der Python-Multiprocessing-Bibliothek keine zusätzlichen Kosten berechnen. Aber der folgende Code deutet darauf hin, dass es einen riesigen Overhead gibt:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

Ausgabe (und übrigens, die Kosten steigen mit der Größe des Arrays, so dass ich vermute, es gibt noch Overhead im Zusammenhang mit Speicher kopieren):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Warum gibt es so viel Aufwand, wenn wir das Array nicht kopieren? Und welche Rolle spielt mir die gemeinsame Erinnerung?


75
2018-05-23 14:20


Ursprung


Antworten:


Wenn Sie ein Betriebssystem verwenden, das Copy-on-Write verwendet fork() Semantik (wie jedes gewöhnliche Unix), solange Sie Ihre Datenstruktur nie ändern, steht sie allen untergeordneten Prozessen zur Verfügung, ohne zusätzlichen Speicher zu belegen. Sie müssen nichts Besonderes machen (außer dass Sie absolut sicher sind, dass Sie das Objekt nicht verändern).

Die effizienteste Sache Sie kann für Ihr Problem tun wäre, Ihr Array in eine effiziente Array - Struktur zu packen (mit numpy oder array), platziere das im Shared Memory, wickle es mit multiprocessing.Arrayund gib das an deine Funktionen weiter. Diese Antwort zeigt, wie das geht.

Wenn du willst beschreibbar Shared Object, dann müssen Sie es mit einer Art Synchronisation oder Locking umschließen. multiprocessing bietet zwei Methoden, dies zu tun: eine mit Shared Memory (geeignet für einfache Werte, Arrays oder Ctypes) oder a Manager Proxy, wo ein Prozess den Speicher hält und ein Manager den Zugriff darauf von anderen Prozessen (sogar über ein Netzwerk) vermittelt.

Das Manager approach kann mit beliebigen Python-Objekten verwendet werden, ist aber langsamer als das Äquivalent mit Shared Memory, da die Objekte serialisiert / deserialisiert und zwischen Prozessen gesendet werden müssen.

Es gibt a Vielzahl von Bibliotheken und Ansätzen für die Parallelverarbeitung, die in Python verfügbar sind. multiprocessing ist eine ausgezeichnete und gut abgerundete Bibliothek, aber wenn Sie spezielle Bedürfnisse haben, vielleicht einer der anderen Ansätze möglicherweise besser sein.


75
2018-05-23 16:42



Ich stieß auf das gleiche Problem und schrieb eine kleine Shared-Memory-Utility-Klasse, um es zu umgehen.

Ich benutze multiprocessing.RawArray (lockfree), und auch der Zugriff auf die Arrays ist überhaupt nicht synchronisiert (lockfree), achten Sie darauf, nicht Ihre eigenen Füße zu schießen.

Mit der Lösung erhalte ich auf einem Quad-Core i7 Beschleunigungen um den Faktor 3.

Hier ist der Code: Fühlen Sie sich frei, es zu benutzen und zu verbessern, und melden Sie bitte irgendwelche Fehler zurück.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

11
2018-05-15 10:55