Frage Wie verwende ich Threading in Python?


Ich versuche das Threading in Python zu verstehen. Ich habe mir die Dokumentation und Beispiele angeschaut, aber ehrlich gesagt, viele Beispiele sind übermäßig ausgeklügelt und ich habe Probleme, sie zu verstehen.

Wie zeigen Sie deutlich, dass Aufgaben für Multi-Threading aufgeteilt werden?


928
2018-05-17 04:24


Ursprung


Antworten:


Seit diese Frage im Jahr 2010 gestellt wurde, gab es eine echte Vereinfachung, wie man einfaches Multithreading mit Python macht Karte und Schwimmbad.

Der folgende Code stammt aus einem Artikel / Blogbeitrag, den Sie unbedingt auschecken sollten (keine Mitgliedschaft) - Parallelität in einer Zeile: Ein besseres Modell für tägliche Threading-Aufgaben. Ich fasse unten zusammen - es endet nur ein paar Zeilen Code:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

Welches ist die Multithread-Version von:

results = []
for item in my_array:
    results.append(my_function(item))

Beschreibung

Map ist eine coole kleine Funktion und der Schlüssel, um einfach Parallelität in Ihren Python-Code einzufügen. Für diejenigen, die nicht vertraut sind, ist Map etwas, das aus funktionalen Sprachen wie Lisp entfernt wurde. Es ist eine Funktion, die eine andere Funktion einer Sequenz zuordnet.

Map verarbeitet die Iteration über die Sequenz für uns, wendet die Funktion an und speichert alle Ergebnisse am Ende in einer praktischen Liste.

enter image description here


Implementierung

Parallele Versionen der Map-Funktion werden von zwei Bibliotheken zur Verfügung gestellt: Multiprocessing und auch sein wenig bekannter, aber ebenso phantastischer Step-Child: multiprocessing.dummy.

multiprocessing.dummy ist genau das gleiche wie Multiprocessing-Modul, verwendet aber stattdessen Threads (eine wichtige Unterscheidung - Verwenden Sie mehrere Prozesse für CPU-intensive Aufgaben; Threads für (und während) IO):

multiprocessing.dummy repliziert die API der Mehrfachverarbeitung, ist jedoch nicht mehr als ein Wrapper um das Threading-Modul.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

Und das Timing resultiert:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Mehrere Argumente übergeben (funktioniert so nur in Python 3.3 und höher):

Um mehrere Arrays zu übergeben:

results = pool.starmap(function, zip(list_a, list_b))

oder um eine Konstante und ein Array zu übergeben:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

Wenn Sie eine frühere Version von Python verwenden, können Sie mehrere Argumente übergeben diese Problemumgehung.

(Dank an user136036 für den hilfreichen Kommentar)


1008
2018-02-11 19:53



Hier ein einfaches Beispiel: Sie müssen einige alternative URLs ausprobieren und den Inhalt des ersten zur Antwort zurückgeben.

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

Dies ist ein Fall, in dem Threading als einfache Optimierung verwendet wird: Jeder Unterpfad wartet auf eine URL, die aufgelöst und beantwortet wird, um den Inhalt in die Warteschlange zu stellen. Jeder Thread ist ein Daemon (wird den Prozess nicht aufrecht erhalten, wenn der Haupt-Thread endet - das ist häufiger als nicht); der Hauptthread startet alle Unterthreads, tut a get in der Warteschlange warten, bis einer von ihnen a put, gibt dann die Ergebnisse aus und beendet sie (wodurch Unterthreads, die möglicherweise noch ausgeführt werden, heruntergefahren werden, da sie Daemon-Threads sind).

Die korrekte Verwendung von Threads in Python ist immer mit E / A-Operationen verbunden (da CPython nicht mehrere Kerne verwendet, um CPU-gebundene Tasks auszuführen, ist der einzige Grund für das Threading der Prozess nicht zu blockieren, während auf einige E / A gewartet wird ). Warteschlangen sind übrigens fast immer die beste Möglichkeit, um die Arbeit an Threads zu verteilen und / oder die Ergebnisse der Arbeit zu sammeln, und sie sind von Natur aus threadsicher, so dass sie Sie davor bewahren, sich um Sperren, Bedingungen, Ereignisse, Semaphore und andere Interaktionen zu sorgen. Thread-Koordination / Kommunikationskonzepte.


672
2018-05-17 04:36



HINWEIS: Für die tatsächliche Parallelisierung in Python sollten Sie die verwenden Mehrfachverarbeitung Modul zur Verzweigung mehrerer Prozesse, die parallel ausgeführt werden (aufgrund der globalen Interpretersperre bieten Python-Threads Interleaving, werden aber tatsächlich seriell und nicht parallel ausgeführt und sind nur nützlich, wenn I / O-Operationen verschachtelt werden).

Wenn Sie jedoch nur nach Interleaving suchen (oder I / O-Operationen durchführen, die trotz der globalen Interpretersperre parallelisiert werden können), dann wird das Threading Modul ist der Ort, um zu starten. Als ein sehr einfaches Beispiel betrachten wir das Problem der Summierung eines großen Bereichs, indem wir Teilbereiche parallel summieren:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()  
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Beachten Sie, dass das Obige ein sehr dummes Beispiel ist, da es absolut keine I / O-Operationen ausführt und seriell ausgeführt wird, obwohl es in CPython aufgrund der globalen Interpretersperre verschachtelt ist (mit zusätzlichem Overhead der Kontextumschaltung).


226
2018-05-17 04:35



Wie bereits erwähnt, kann CPython Threads nur für I \ O Waits aufgrund von GIL verwenden. Wenn Sie von mehreren Kernen für CPU-gebundene Aufgaben profitieren möchten, verwenden Sie Mehrfachverarbeitung:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

86
2018-03-08 22:22



Nur eine Anmerkung, Warteschlange ist nicht für Threading erforderlich.

Dies ist das einfachste Beispiel, das ich mir vorstellen kann, dass 10 Prozesse gleichzeitig ausgeführt werden.

import threading
from random import randint
from time import sleep


def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

84
2017-09-23 16:07



Die Antwort von Alex Martelli hat mir geholfen, aber hier ist eine modifizierte Version, die ich für nützlicher hielt (zumindest für mich).

import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url= queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)

        except Queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

38
2017-10-01 15:50



Ich fand das sehr nützlich: Erzeuge so viele Threads wie Cores und lasse sie eine (große) Anzahl von Aufgaben ausführen (in diesem Fall ein Shell-Programm aufrufen):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done

19
2018-06-06 23:51



Das perfekte Beispiel für Threading ist für mich die Überwachung von asynchronen Ereignissen. Schau dir diesen Code an.

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

Sie können mit diesem Code spielen, indem Sie eine IPython-Sitzung öffnen und Folgendes tun:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Warte ein paar Minuten

>>>a[0] = 2
Mon = 2

15
2018-04-14 04:18



Gegeben eine Funktion, f, fädle es so:

import threading
threading.Thread(target=f).start()

Argumente übergeben f

threading.Thread(target=f, args=(a,b,c)).start()

15
2018-03-16 16:07



Python 3 hat die Möglichkeit von Parallele Aufgaben starten. Dies erleichtert unsere Arbeit.

Es hat für Thread-Pooling und Prozess-Pooling.

Folgendes gibt einen Einblick:

ThreadPoolExecutor Beispiel

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

14
2017-07-20 11:17



Mit dem flammenden neuen concurrent.futures Modul

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

Der Executor-Ansatz mag all denen bekannt vorkommen, die sich vorher mit Java die Hände schmutzig gemacht haben.

Auch eine Randnotiz: Um das Universum gesund zu halten, vergessen Sie nicht, Ihre Pools / Executors zu schließen, wenn Sie nicht verwenden with Kontext (was so großartig ist, dass es das für dich tut)


13
2017-10-29 21:42