Frage C # Produzent / Verbraucher


Ich bin kürzlich auf eine c # Implementierung von Produzenten / Konsumenten gestoßen. es ist sehr einfach und (zumindest für mich) sehr elegant.

es scheint um 2006 herum entwickelt worden zu sein, also habe ich mich gefragt, ob diese Implementierung ist
- sicher
- noch anwendbar

Der Code ist unten (der ursprüngliche Code wurde unter http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}

21
2017-11-01 04:36


Ursprung


Antworten:


Der Code ist älter als das - ich schrieb es einige Zeit bevor .NET 2.0 herauskam. Das Konzept einer Erzeuger / Verbraucher-Warteschlange ist Weg älter als das obwohl :)

Ja, dieser Code ist soweit ich weiß sicher - aber er hat einige Mängel:

  • Es ist nicht-generisch. Eine moderne Version wäre sicherlich generisch.
  • Es gibt keine Möglichkeit, die Warteschlange anzuhalten. Eine einfache Möglichkeit, die Warteschlange anzuhalten (so dass alle Consumer-Threads in den Ruhestand gehen), besteht darin, ein Token für "Arbeit stoppen" zu haben, das in die Warteschlange eingereiht werden kann. Sie fügen dann so viele Token hinzu, wie Sie Threads haben. Alternativ haben Sie ein separates Flag, um anzuzeigen, dass Sie anhalten möchten. (Dadurch können die anderen Threads angehalten werden, bevor die aktuelle Arbeit in der Warteschlange beendet wird.)
  • Wenn die Jobs sehr klein sind, ist es oft nicht die effizienteste Aufgabe, einen einzelnen Job zu einem bestimmten Zeitpunkt zu konsumieren.

Die Ideen hinter dem Code sind, um ehrlich zu sein, wichtiger als der Code selbst.


29
2017-11-01 08:04



Sie könnten etwas wie das folgende Code-Snippet machen. Es ist generisch und verfügt über eine Methode zum Einreihen von Nullen (oder eines anderen Flags, das Sie verwenden möchten), um den Worker-Threads zu sagen, dass sie beendet werden sollen.

Der Code stammt von hier: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}

23
2017-11-01 08:29



Damals habe ich gelernt, wie Monitor.Wait / Pulse funktioniert (und eine Menge über Threads im Allgemeinen) Artikelserie es ist aus. Also, wie Jon sagt, es hat viel Wert und ist in der Tat sicher und anwendbar.

Ab .NET 4 gibt es jedoch ein Producer-Consumer-Queue-Implementierung im Framework. Ich habe es gerade erst selbst gefunden, aber bis jetzt macht es alles, was ich brauche.


13
2018-05-02 14:10



Warnung: Wenn Sie die Kommentare lesen, werden Sie verstehen, dass meine Antwort falsch ist :)

Es gibt ein mögliches Sackgasse in deinem Code.

Stellen Sie sich den folgenden Fall vor, aus Gründen der Übersichtlichkeit verwendete ich einen Single-Thread-Ansatz, sollte aber leicht in Multi-Thread mit Schlaf konvertiert werden können:

// We create some actions...
object locker = new object();

Action action1 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action1");
    }
};

Action action2 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action2");
    }
};

// ... (stuff happens, etc.)

// Imagine both actions were running
// and there's 0 items in the queue

// And now the producer kicks in...
lock (locker)
{
    // This would add a job to the queue

    Console.WriteLine("Pulse now!");
    System.Threading.Monitor.Pulse(locker);
}

// ... (more stuff)
// and the actions finish now!

Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();

Bitte lassen Sie mich wissen, ob das keinen Sinn ergibt.

Wenn dies bestätigt wird, lautet die Antwort auf Ihre Frage: "Nein, es ist nicht sicher";) Ich hoffe das hilft.


1
2018-01-22 00:01