Frage Peinliche parallele Probleme mit Python-Multiprocessing lösen


Wie benutzt man? Mehrfachverarbeitung anpacken peinlich parallele Probleme?

Schrecklich parallele Probleme bestehen typischerweise aus drei grundlegenden Teilen:

  1. Lesen Eingabedaten (aus einer Datei, Datenbank, TCP-Verbindung, etc.).
  2. Lauf Berechnungen zu den Eingangsdaten, wo jede Berechnung ist unabhängig von einer anderen Berechnung.
  3. Schreiben Ergebnisse von Berechnungen (zu einer Datei, Datenbank, TCP-Verbindung, etc.).

Wir können das Programm in zwei Dimensionen parallelisieren:

  • Teil 2 kann auf mehreren Kernen laufen, da jede Berechnung unabhängig ist; Reihenfolge der Verarbeitung spielt keine Rolle.
  • Jeder Teil kann unabhängig voneinander laufen. Teil 1 kann Daten in eine Eingabewarteschlange stellen, Teil 2 kann Daten aus der Eingabewarteschlange ziehen und Ergebnisse in eine Ausgabewarteschlange stellen, und Teil 3 kann Ergebnisse aus der Ausgabewarteschlange ziehen und aufschreiben.

Dies scheint ein grundlegendes Muster bei der gleichzeitigen Programmierung zu sein, aber ich bin immer noch verloren, wenn ich versuche, es zu lösen Lassen Sie uns ein kanonisches Beispiel schreiben, um zu veranschaulichen, wie dies mit Multiprozessing gemacht wird.

Hier ist das Beispielproblem: Gegeben ein CSV-Datei mit Zeilen von ganzen Zahlen als Eingabe berechnen Sie ihre Summen. Trennen Sie das Problem in drei Teile, die alle parallel laufen können:

  1. Verarbeitet die Eingabedatei in Rohdaten (Listen / Iterables von Ganzzahlen)
  2. Berechnen Sie die Summen der Daten parallel
  3. Ausgabe der Summen

Im Folgenden finden Sie ein traditionelles Python-Programm mit einem Prozess, das diese drei Aufgaben löst:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file.
"""

import csv
import optparse
import sys

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    return cli_parser


def parse_input_csv(csvfile):
    """Parses the input CSV and yields tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.reader` instance

    """
    for i, row in enumerate(csvfile):
        row = [int(entry) for entry in row]
        yield i, row


def sum_rows(rows):
    """Yields a tuple with the index of each input list of integers
    as the first element, and the sum of the list of integers as the
    second element.

    The index is zero-index based.

    :Parameters:
    - `rows`: an iterable of tuples, with the index of the original row
      as the first element, and a list of integers as the second element

    """
    for i, row in rows:
        yield i, sum(row)


def write_results(csvfile, results):
    """Writes a series of results to an outfile, where the first column
    is the index of the original row of data, and the second column is
    the result of the calculation.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.writer` instance to which to write results
    - `results`: an iterable of tuples, with the index (zero-based) of
      the original row as the first element, and the calculated result
      from that row as the second element

    """
    for result_row in results:
        csvfile.writerow(result_row)


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)
    # gets an iterable of rows that's not yet evaluated
    input_rows = parse_input_csv(in_csvfile)
    # sends the rows iterable to sum_rows() for results iterable, but
    # still not evaluated
    result_rows = sum_rows(input_rows)
    # finally evaluation takes place as a chain in write_results()
    write_results(out_csvfile, result_rows)
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

Nehmen wir dieses Programm und schreiben es um, um Multiprocessing zu verwenden, um die drei oben beschriebenen Teile zu parallelisieren. Unten ist ein Skelett dieses neuen, parallelisierten Programms, das ausgearbeitet werden muss, um die Teile in den Kommentaren zu adressieren:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)

    # Parse the input file and add the parsed data to a queue for
    # processing, possibly chunking to decrease communication between
    # processes.

    # Process the parsed data as soon as any (chunks) appear on the
    # queue, using as many processes as allotted by the user
    # (opts.numprocs); place results on a queue for output.
    #
    # Terminate processes when the parser stops putting data in the
    # input queue.

    # Write the results to disk as soon as they appear on the output
    # queue.

    # Ensure all child processes have terminated.

    # Clean up files.
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

Diese Teile des Codes sowie ein weiterer Code, mit dem Beispiel-CSV-Dateien generiert werden können zu Testzwecken, kann sein gefunden auf GitHub.

Ich würde mich über jede Einsicht freuen, wie Sie sich mit Nebenläufigkeitsgurus diesem Problem nähern würden.


Hier sind einige Fragen, die ich hatte, als ich über dieses Problem nachdachte. Bonuspunkte für beliebige / alle:

  • Sollte ich untergeordnete Prozesse haben, um die Daten einzulesen und in die Warteschlange zu stellen, oder kann der Hauptprozess dies ohne Blockierung tun, bis alle Eingaben gelesen sind?
  • Ebenso, sollte ich einen untergeordneten Prozess haben, um die Ergebnisse aus der verarbeiteten Warteschlange zu schreiben, oder kann der Hauptprozess dies tun, ohne auf alle Ergebnisse warten zu müssen?
  • Sollte ich ein Prozesse Pool für die Summenoperationen?
    • Wenn ja, welche Methode rufe ich den Pool auf, damit er mit der Verarbeitung der in die Eingabewarteschlange eingehenden Ergebnisse beginnt, ohne die Eingabe- und Ausgabeprozesse zu blockieren? apply_async ()? map_async ()? imap ()? imap_unordered ()?
  • Angenommen, wir mussten die Eingabe- und Ausgabewarteschlangen nicht als eingegebene Daten absaugen, sondern konnten warten, bis alle Eingaben analysiert wurden und alle Ergebnisse berechnet wurden (z. B. weil wir wissen, dass alle Eingaben und Ausgaben in den Systemspeicher passen). Sollten wir den Algorithmus in irgendeiner Weise ändern (z. B. keine Prozesse gleichzeitig mit I / O ausführen)?

76
2018-03-01 21:38


Ursprung


Antworten:


Meine Lösung hat eine zusätzliche Glocke und Pfeife, um sicherzustellen, dass die Reihenfolge der Ausgabe die gleiche wie die Reihenfolge der Eingabe ist. Ich benutze multiprocessing.queues, um Daten zwischen Prozessen zu senden, stopp Nachrichten zu senden, so dass jeder Prozess weiß, die Warteschlangen zu überprüfen. Ich denke, die Kommentare in der Quelle sollten klarstellen, was vor sich geht, aber wenn nicht, lass es mich wissen.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
            """Parses the input CSV and yields tuples with the index of the row
            as the first element, and the integers of the row as the second
            element.

            The index is zero-index based.

            The data is then sent over inqueue for the workers to do their
            thing.  At the end the input process sends a 'STOP' message for each
            worker.
            """
            for i, row in enumerate(self.in_csvfile):
                row = [ int(entry) for entry in row ]
                self.inq.put( (i, row) )

            for i in range(self.numprocs):
                self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across processes so open/close
        # and use it all in the same process or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])

64
2018-03-02 16:16



Ich merke, dass ich ein bisschen spät für die Party bin, aber ich habe kürzlich entdeckt GNU parallelund zeigen möchten, wie einfach es ist, diese typische Aufgabe damit zu bewältigen.

cat input.csv | parallel ./sum.py --pipe > sums

So etwas wird es tun sum.py:

#!/usr/bin/python

from sys import argv

if __name__ == '__main__':
    row = argv[-1]
    values = (int(value) for value in row.split(','))
    print row, ':', sum(values)

Parallel wird ausgeführt sum.py für jede Zeile in input.csv (parallel, natürlich), dann geben Sie die Ergebnisse an sums. Klar besser als multiprocessing Ärger


5
2017-08-23 11:00



Zu spät zur Party kommen ...

Joblib hat eine Schicht über Multiprocessing, um parallele for-Schleifen zu erstellen. Es bietet Ihnen Funktionen wie eine faule Verteilung von Jobs und eine bessere Fehlerberichterstattung zusätzlich zu seiner sehr einfachen Syntax.

Als Haftungsausschluss bin ich der ursprüngliche Autor von joblib.


5
2018-01-15 08:13



Alte Schule.

p1.py

import csv
import pickle
import sys

with open( "someFile", "rb" ) as source:
    rdr = csv.reader( source )
    for line in eumerate( rdr ):
        pickle.dump( line, sys.stdout )

p2.py

import pickle
import sys

while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    pickle.dump( i, sum(row) )

p3.py

import pickle
import sys
while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    print i, row

Hier ist die Multi-Processing-Endstruktur.

python p1.py | python p2.py | python p3.py

Ja, die Shell hat diese auf OS-Ebene zusammengestrickt. Es scheint mir einfacher zu sein und es funktioniert sehr gut.

Ja, es gibt etwas mehr Aufwand bei der Verwendung von Pickle (oder cPickle). Die Vereinfachung scheint jedoch die Mühe wert.

Wenn Sie möchten, dass der Dateiname ein Argument ist p1.pyDas ist eine leichte Veränderung.

Noch wichtiger ist, dass eine Funktion wie die folgende sehr nützlich ist.

def get_stdin():
    while True:
        try:
            yield pickle.load( sys.stdin )
        except EOFError:
            return

Das ermöglicht Ihnen Folgendes:

for item in get_stdin():
     process item

Das ist sehr einfach, tut es aber nicht leicht Damit können Sie mehrere Kopien von P2.py ausführen.

Sie haben zwei Probleme: Fan-Out und Fan-In. Das P1.py muss sich irgendwie auf mehrere P2.py's verteilen. Und die P2.pys müssen ihre Ergebnisse irgendwie in einen einzigen P3.py zusammenführen.

Der Old-School-Ansatz für Fan-Out ist eine "Push" -Architektur, die sehr effektiv ist.

Theoretisch sind mehrere P2.pys, die aus einer gemeinsamen Warteschlange ziehen, die optimale Zuweisung von Ressourcen. Dies ist oft ideal, aber es ist auch eine Menge Programmierung. Ist die Programmierung wirklich notwendig? Oder wird die Round-Robin-Verarbeitung gut genug sein?

Praktisch, Sie werden feststellen, dass P1.py einen einfachen "Round-Robin" -Tausch zwischen mehreren P2.pys machen kann. Sie hätten P1.py für die Verarbeitung konfiguriert n Kopien von P2.py über Named Pipes. Die P2.pys würden jeweils von ihrer passenden Pipe lesen.

Was passiert, wenn ein P2.py alle "worst case" Daten bekommt und weit hinterher läuft? Ja, Round-Robin ist nicht perfekt. Aber es ist besser als nur eine P2.py und Sie können diese Verzerrung mit einfacher Randomisierung adressieren.

Fan-in von mehreren P2.py's zu einem P3.py ist ein bisschen komplexer, immer noch. An diesem Punkt hört der Old-School-Ansatz auf, vorteilhaft zu sein. P3.py muss von mehreren Named Pipes mit Hilfe des select Bibliothek zum Verschachteln der Lesevorgänge.


4
2018-03-01 21:55



Es ist wahrscheinlich möglich, auch in Teil 1 etwas Parallelität einzuführen. Wahrscheinlich kein Problem mit einem Format, das so einfach wie CSV ist, aber wenn die Verarbeitung der Eingabedaten merklich langsamer ist als das Lesen der Daten, könnten Sie größere Teile lesen und weiterlesen, bis Sie ein "Zeilentrennzeichen" finden ( Newline im CSV-Fall, aber auch das hängt vom gelesenen Format ab; funktioniert nicht, wenn das Format ausreichend komplex ist).

Diese Chunks, von denen jeder wahrscheinlich mehrere Einträge enthält, können dann an eine Menge paralleler Prozesse übergeben werden, die Jobs aus einer Warteschlange lesen, wo sie geparst und aufgeteilt werden und dann in die Warteschlange für Stufe 2 gelegt werden.


0
2018-03-10 16:39