Frage Java & RabbitMQ - Queuing & Multithreading - oder Couchbase als Job-Queue


Ich habe eine Job Distributor wer veröffentlicht Nachrichten auf verschiedenen Channels.

Außerdem möchte ich zwei (und mehr in der Zukunft) haben Consumers die an verschiedenen Aufgaben arbeiten und auf verschiedenen Maschinen laufen. (Derzeit habe ich nur einen und muss es skalieren)

Lassen Sie uns diese Aufgaben benennen (nur Beispiele):

  • FIBONACCI (erzeugt Fibonacci-Zahlen)
  • RANDOMBOOKS (erzeugt zufällige Sätze, um ein Buch zu schreiben)

Diese Aufgaben laufen bis zu 2-3 Stunden und sollte für jeden gleich aufgeteilt werden Consumer.

Jeder Verbraucher kann haben x  parallel Threads zum Bearbeiten dieser Aufgaben. Also sage ich: (diese Zahlen sind nur Beispiele und werden durch Variablen ersetzt)

  • Maschine 1 kann 3 verbrauchen parallel Jobs für FIBONACCI und 5 parallel Jobs für RANDOMBOOKS
  • Maschine 2 kann 7 verbrauchen parallel Jobs für FIBONACCI und 3 parallel Jobs für RANDOMBOOKS

Wie kann ich das erreichen?

Muss ich anfangen? x Threads für jeden Channel auf jedem zuhören Consumer ?

Wann muss ich das bestätigen?

Mein aktueller Ansatz für nur einen Consumer ist: Start x Threads für jede Aufgabe - jeder Thread ist ein Defaultconsumer, der implementiert Runnable. In dem handleDelivery Methode, rufe ich an basicAck(deliveryTag,false) und dann die Arbeit machen.

Weiter: Ich möchte einige Aufgaben an einen speziellen Verbraucher senden. Wie kann ich das in Kombination mit der oben erwähnten fairen Verteilung erreichen?

Das ist mein Code für publishing

String QUEUE_NAME = "FIBONACCI";

Channel channel = this.clientManager.getRabbitMQConnection().createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

channel.basicPublish("", QUEUE_NAME,
                MessageProperties.BASIC,
                Control.getBytes(this.getArgument()));

channel.close();

Dies ist mein Code für die Consumer

public final class Worker extends DefaultConsumer implements Runnable {
    @Override
    public void run() {

        try {
            this.getChannel().queueDeclare(this.jobType.toString(), true, false, false, null);
            this.getChannel().basicConsume(this.jobType.toString(), this);

            this.getChannel().basicQos(1);
        } catch (IOException e) {
            // catch something
        }
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Control.getLogger().error("Exception!", e);
            }

        }
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        this.getChannel().basicAck(deliveryTag, false); // Is this right?
        // Start new Thread for this task with my own ExecutorService

    }
}

Die Klasse Worker wird in diesem Fall zweimal gestartet: Einmal für FIBUNACCI und einmal für RANDOMBOOKS

AKTUALISIEREN

Wie die Antworten zeigen, wäre RabbitMQ nicht die beste Lösung dafür, aber ein Couchbase- oder MongoDB-Pull-Ansatz wäre am besten. Ich bin neu in diesen Systemen, kann mir irgendjemand erklären, wie das erreicht werden könnte?


10
2018-03-13 23:45


Ursprung


Antworten:


Hier ist eine konzeptionelle Ansicht, wie ich das auf Couchbase bauen würde.

  1. Sie haben eine gewisse Anzahl von Maschinen, um Jobs zu bearbeiten, und einige Maschinen (vielleicht die gleichen), um Jobs zu erstellen.
  2. Sie können ein Dokument für jeden Job in einem Bucket in CouchBase erstellen (und seinen Typ auf "Job" oder etwas einstellen, wenn Sie ihn mit anderen Daten in diesem Bucket mischen).
  3. Jede Jobbeschreibung kann zusammen mit den spezifischen Befehlen, die ausgeführt werden müssen, die Zeit, zu der sie erstellt wurde, die Fälligkeitszeit (wenn eine bestimmte Zeit fällig ist) und eine Art von erzeugtem Arbeitswert enthalten. Dieser Arbeitswert wäre willkürliche Einheiten.
  4. Jeder Verbraucher von Arbeitsplätzen weiß, wie viele Arbeitseinheiten er gleichzeitig ausführen kann und wie viele Arbeitsbereiche verfügbar sind (weil möglicherweise andere Arbeitnehmer arbeiten).
  5. So würde eine Maschine mit beispielsweise 10 Arbeitseinheiten Kapazität, die 6 Arbeitseinheiten hat, eine Anfrage machen, nach Jobs von 4 Arbeitseinheiten oder weniger zu suchen.
  6. In couchbase gibt es Ansichten, die inkrementell aktualisiert werden, um Jobs zu map / reduce zu machen, ich denke du brauchst nur die Map Phase hier. Sie würden eine Ansicht schreiben, in der Sie nach der Fälligkeit, der Zeit, die in das System eingegeben wurde, und nach der Anzahl der Arbeitseinheiten fragen können. Auf diese Weise können Sie "den überfälligsten Job von 4 Arbeitseinheiten oder weniger" bekommen.
  7. Diese Art von Abfrage wird, sobald die Kapazität frei wird, die am meisten überfälligen Jobs zuerst erhalten, obwohl Sie den größten überfälligen Job erhalten könnten, und wenn es keinen gibt, dann den größten nicht überfälligen Job. ("Überfällig" ist das Delta zwischen der aktuellen Zeit und dem Fälligkeitsdatum des Auftrags.)
  8. Couchbase-Ansichten ermöglichen sehr anspruchsvolle Abfragen wie diese. Und während sie inkrementell aktualisiert werden, sind sie nicht perfekt in Echtzeit. Sie suchen also nicht nach einem einzigen Job, sondern nach einer Liste von Stellenbewerbern (geordnet nach Ihren Wünschen).
  9. Der nächste Schritt wäre also, die Liste der Job-Kandidaten zu nehmen und einen zweiten Speicherort zu prüfen - möglicherweise einen Member-Bucket (z. B. RAM-Cache, nicht persistent) für eine Sperrdatei. Die Sperrdatei würde mehrere Phasen haben (hier machen Sie ein bisschen Partitionsauflösungslogik mit CRDTs oder anderen Methoden, die für Ihre Anforderungen am besten geeignet sind.)
  10. Da dieser Bucket RAM-basiert ist, ist er schneller als Sichten und wird weniger Verzögerung vom Gesamtzustand haben. Wenn keine Sperrdatei vorhanden ist, erstellen Sie eine Datei mit dem Statusflag "provisorisch".
  11. Wenn ein anderer Worker denselben Job erhält und die Sperrdatei sieht, kann er diesen Jobkandidaten überspringen und den nächsten auf der Liste ausführen.
  12. Wenn zwei Arbeiter versuchen, Sperrdateien für denselben Job zu erstellen, tritt ein Konflikt auf. Im Falle eines Konflikts können Sie einfach kicken. Oder Sie können eine Logik verwenden, bei der jeder Worker eine Aktualisierung der Sperrdatei vornimmt (CRDT-Auflösung, so dass diese Idempotente so erstellt werden, dass Geschwister zusammengeführt werden können), wobei möglicherweise eine Zufallszahl oder eine Prioritätszahl eingegeben wird.
  13. Nach einer bestimmten Zeit (wahrscheinlich ein paar Sekunden) wird die Sperrdatei vom Arbeiter überprüft, und wenn sie keine Änderungen der Rennauflösung vornehmen musste, ändert sie den Status der Sperrdatei von "vorläufig" auf "genommen" "
  14. Es aktualisiert dann den Job selbst mit dem Status "genommen" oder so, dass er nicht in den Ansichten angezeigt wird, wenn andere Worker nach verfügbaren Jobs suchen.
  15. Schließlich möchten Sie noch einen weiteren Schritt hinzufügen. Bevor Sie die Abfrage ausführen, um die oben beschriebenen Job-Kandidaten zu erhalten, führen Sie eine spezielle Abfrage durch, um Jobs zu finden, die ausgeführt wurden, aber an denen der betreffende Mitarbeiter gestorben ist. (zB: Jobs, die überfällig sind).
  16. Eine Möglichkeit zu wissen, wann Arbeiter sterben, ist, dass die Sperrdatei, die in den Member Bucket gestellt wird, eine Ablaufzeit haben sollte, die sie schließlich verschwinden lässt. Möglicherweise kann diese Zeit kurz sein und der Mitarbeiter berührt sie einfach, um den Ablauf zu aktualisieren (dies wird in der Couchbase-API unterstützt).
  17. Wenn ein Arbeiter stirbt, verschwinden schließlich seine Sperrdateien und die verwaisten Jobs werden als "genommen" markiert, aber ohne Sperrdatei, was eine Bedingung ist, nach der die Arbeiter, die nach Jobs suchen, suchen können.

Zusammengefasst führt jeder Worker eine Abfrage nach verwaisten Jobs durch. Wenn es welche gibt, prüft er, ob für ihn eine Sperrdatei vorhanden ist, und wenn keiner von ihnen einen erstellt, folgt er dem normalen Sperrprotokoll wie oben. Wenn keine verwaisten Jobs vorhanden sind, sucht es nach überfälligen Jobs und folgt dem Sperrprotokoll. Wenn keine überfälligen Jobs vorhanden sind, nimmt es nur den ältesten Job und folgt dem Sperrprotokoll.

Natürlich funktioniert das auch, wenn es für Ihr System keine "Überfälligkeit" gibt, und wenn Pünktlichkeit keine Rolle spielt, können Sie anstelle des ältesten Jobs eine andere Methode verwenden.

Eine andere Methode könnte darin bestehen, einen zufälligen Wert zwischen 1-N zu erzeugen, wobei N eine ziemlich große Zahl ist, etwa 4 × die Anzahl der Arbeiter, und jeder Auftrag mit diesem Wert markiert werden soll. Jedes Mal, wenn ein Arbeiter nach einem Job sucht, könnte er die Würfel werfen und sehen, ob es Jobs mit dieser Nummer gibt. Wenn nicht, würde es dies wieder tun, bis es einen Job mit dieser Nummer findet. Auf diese Weise würden statt mehrerer Arbeiter, die um die wenigen "ältesten" Jobs oder Jobs mit der höchsten Priorität streiten, und die Wahrscheinlichkeit von Sperrkonflikten erhöht werden, und zwar auf Kosten der Zeit in der Warteschlange, die zufälliger als eine FIFO-Situation ist .

Die Zufallsmethode kann auch in Situationen angewendet werden, in denen Lastwerte berücksichtigt werden müssen (damit eine einzelne Maschine nicht zu viel Last aufnimmt) und anstatt den ältesten Kandidaten zu nehmen, nehmen Sie einfach einen zufälligen Kandidaten aus der Liste der lebensfähigen Arbeitsplätze und versuchen Sie es zu tun.

Bearbeiten zum Hinzufügen:

In Schritt 12, wo ich sage "möglicherweise eine Zufallszahl eingeben", was ich meine, ist, wenn die Arbeiter die Priorität kennen (zB: welche Person muss die Arbeit am meisten erledigen), können sie eine Figur in die Datei stellen. Wenn es kein Konzept gibt, den Job "zu brauchen", dann können beide die Würfel werfen. Sie aktualisieren diese Datei mit ihrer Rolle des Würfels. Dann können sie beide sehen und sehen, was der andere rollte. Wenn sie verloren haben, dann kicken sie und der andere Arbeiter weiß, dass er es hat. Auf diese Weise können Sie feststellen, welcher Mitarbeiter eine Aufgabe ohne viel komplexes Protokoll oder Verhandlung übernimmt. Ich gehe davon aus, dass beide Arbeiter die gleiche Lock-Datei hier treffen, könnte es mit zwei Lock-Dateien und einer Abfrage implementiert werden, die alle von ihnen findet. Wenn nach einer gewissen Zeit kein Arbeiter eine höhere Zahl gerollt hat (und neue Arbeiter, die darüber nachdenken, dass der Job schon weiß, dass andere bereits darauf rollen, damit sie ihn überspringen), können Sie den Job sicher annehmen, da Sie der einzige Arbeiter sind arbeiten daran.


7
2018-03-14 00:26



Zuerst möchte ich sagen, dass ich Java nicht für die Kommunikation mit RabbitMQ verwendet habe, so dass ich keine Code-Beispiele bereitstellen kann. Das sollte jedoch kein Problem sein, da Sie nicht danach gefragt haben. Diese Frage bezieht sich mehr auf das allgemeine Design Ihrer Anwendung.

Lasst uns ein bisschen abbrechen, denn hier gibt es viele Fragen.

Aufteilen der Aufgaben auf verschiedene Konsumenten

Nun, eine Möglichkeit ist Round-Robin zu verwenden, aber das ist ziemlich grob und berücksichtigt nicht, dass unterschiedliche Aufgaben unterschiedlich lange dauern können. Also was zu tun ist. Nun, eine Möglichkeit, dies zu tun, ist das Setzen der prefetch zu 1. Prefetch bedeutet, dass der Verbraucher die Nachrichten lokal zwischenspeichert (Hinweis: die Nachricht ist noch nicht verbraucht). Wenn Sie diesen Wert auf 1 setzen, wird kein Vorabruf ausgeführt. Dies bedeutet, dass Ihr Kunde nur die Nachricht kennt und nur die Nachricht hat, an der er gerade arbeitet. Dies ermöglicht es, Nachrichten nur zu empfangen, wenn der Worker im Leerlauf ist.

Wann zu bestätigen

Mit der oben beschriebenen Konfiguration ist es möglich, eine Nachricht aus der Warteschlange zu lesen, sie an einen Ihrer Threads weiterzugeben und dann die Nachricht zu bestätigen. Tun Sie dies für alle verfügbaren Threads -1. Sie möchten die letzte Nachricht nicht bestätigen, da dies bedeutet, dass Sie sich öffnen, um eine weitere Nachricht zu erhalten, die Sie noch nicht an einen Ihrer Mitarbeiter weitergeben können. Wenn einer der Threads beendet ist, dh wenn Sie diese Nachricht bestätigen, haben Sie Ihre Threads immer mit etwas zu tun.

Weitergabe spezieller Nachrichten

Das hängt davon ab, was Sie tun wollen, aber im Allgemeinen würde ich sagen, dass Ihre Produzenten wissen sollten, was sie weitergeben. Dies bedeutet, dass Sie es an eine bestimmte Vermittlungsstelle oder vielmehr mit einem bestimmten Routing-Schlüssel senden könnten, der diese Nachricht an eine geeignete Warteschlange weitergibt, die einen Verbraucher hören lässt, der weiß, was mit dieser Nachricht zu tun ist.

Ich empfehle Ihnen, AMQP und RabbitMQ zu lesen, das könnte gut sein Startpunkt.

Vorbehalte

Es gibt einen großen Fehler in meinem Vorschlag und in Ihrem Design, und das ist es, was wir tun ACK die Nachricht, bevor wir mit der Verarbeitung fertig sind. Dies bedeutet, dass wir, wenn (nicht wenn) unsere Anwendung crackt, keine Möglichkeit haben, die ACKed Mitteilungen. Dies könnte gelöst werden, wenn Sie wissen, wie viele Threads Sie im Voraus starten werden. Ich weiß nicht, ob Sie die Prefetch-Anzahl dynamisch ändern können, aber irgendwie bezweifle ich das.

Einige Gedanken

Von meiner, wenn auch begrenzten, Erfahrung mit RabbitMQ sollten Sie keine Angst davor haben, Austausch und Warteschlangen zu schaffen, diese können Ihr Anwendungsdesign erheblich verbessern und vereinfachen, wenn Sie es richtig machen. Vielleicht solltest du keine Anwendung haben, die eine Reihe von Consumer-Threads startet. Stattdessen möchten Sie vielleicht eine Art Wrapper haben, der die Konsumenten basierend auf dem verfügbaren Speicher in Ihrem System oder ähnlichem startet. Wenn Sie das tun, können Sie sicherstellen, dass keine Nachrichten verloren gehen, sollte Ihre Anwendung abstürzen, denn wenn Sie dies tun, werden Sie natürlich die Nachricht bestätigen, wenn Sie damit fertig sind.

Literatur-Empfehlungen

Lassen Sie mich wissen, wenn etwas unklar ist, oder wenn ich Ihren Punkt vermisse und ich werde versuchen, meine Antwort zu erweitern oder sie zu verbessern, wenn ich kann.


5
2017-09-05 09:21



Hier sind meine Gedanken zu Ihrer Frage. Wie @Daniel in seiner Antwort erwähnt, glaube ich, dass dies eher eine Frage der architektonischen Prinzipien als der Umsetzung ist. Sobald die Architektur klar ist, wird die Implementierung trivial.

Zunächst möchte ich etwas in Zusammenhang mit der Terminplanungstheorie behandeln. Sie haben hier sehr langwierige Aufgaben, und wenn sie nicht ordnungsgemäß geplant werden, werden Sie entweder (a) Ihre Server mit weniger als der vollen Kapazität ausführen oder (b) viel länger dauern, um die Aufgaben zu beenden, als es sonst möglich wäre . Also, ich habe ein paar Fragen an Sie bezüglich Ihres Planungsparadigmas:

  1. Können Sie abschätzen, wie lange jeder Job dauert?
  2. Haben die Jobs ein Fälligkeitsdatum, und wenn ja, wie wird es bestimmt?

Ist RabbitMQ in diesem Fall geeignet?

Ich glaube nicht, dass RabbitMQ die richtige Lösung ist, um extrem lang laufende Jobs zu versenden. In der Tat, ich denke, Sie haben diese Fragen aufgrund der Tatsache, dass RabbitMQ nicht das richtige Werkzeug für den Job ist. Standardmäßig haben Sie nicht genügend Einblick in die Jobs, bevor Sie sie aus der Warteschlange entfernen, um festzustellen, welche als nächstes verarbeitet werden sollen. Zweitens, wie in @ Daniels Antwort erwähnt, wird es Ihnen wahrscheinlich nicht möglich sein, den eingebauten ACK-Mechanismus zu verwenden, da es wahrscheinlich schlecht für einen Job wäre, wenn die Verbindung zum RabbitMQ-Server fehlschlägt.

Stattdessen würde ich nach etwas wie MongoDB oder Couchbase suchen, um Ihre "Warteschlange" für Jobs zu speichern. Dann können Sie die Dispatching-Logik vollständig steuern, anstatt sich auf das integrierte Round-Robin-Verfahren von RabbitMQ verlassen zu müssen.

Andere Überlegungen:

Außerdem möchte ich zwei (und mehr in der Zukunft) Verbraucher haben, die an verschiedenen Aufgaben arbeiten und auf verschiedenen Maschinen laufen. (Derzeit habe ich nur einen und muss es skalieren)

In diesem Fall glaube ich nicht, dass Sie einen Push-basierten Verbraucher verwenden möchten. Verwenden Sie stattdessen ein Pull-basiertes System (in RabbitMQ würde dies Basic.Get heißen). Auf diese Weise würden Sie die Verantwortung für die Jobplanung übernehmen

Consumer 1 hat 3 Threads für FIBONACCI und 5 Threads für RANDOMBOOKS.   Consumer 2 hat 7 Themen für FIBONACCI und 3 Themen für RANDOMBOOKS.   Wie kann ich das erreichen?

In diesem Fall bin ich mir nicht sicher, ob ich das verstehe. Hast du eins fibonacci Job, und Sie führen es irgendwie parallel auf Ihrem Server aus? Oder möchten Sie, dass Ihr Server VIELE ausführt? fibonacci Jobs gleichzeitig? Angenommen, Sie würden die Threads für die Arbeit auf dem Server erstellen und ihnen Jobs zuweisen, bis alle Ihre Threads voll sind. Wenn ein Thread verfügbar wird, werden Sie die Warteschlange für einen anderen Job abfragen, um zu starten.

Andere Fragen, die Sie hatten:

  • Muss ich x Threads für jeden Kanal starten, um auf jedem Consumer zu hören?
  • Wann muss ich das bestätigen?
  • Mein derzeitiger Ansatz für nur einen Consumer ist: Start x Threads für jeden
  • Aufgabe - Jeder Thread ist ein DefaultConsumer, der Runnable implementiert. In der Methode handleDelivery rufe ich basicAck (deliveryTag, false) auf und mache dann die Arbeit.
  • Weiter: Ich möchte einige Aufgaben an einen speziellen Verbraucher senden. Wie kann ich das in Kombination mit der oben erwähnten fairen Verteilung erreichen?

Ich glaube, dass die oben genannten Fragen keine Fragen mehr sein werden, sobald Sie die Dispatching-Verantwortlichkeit von RabbitMQ-Server auf Ihre einzelnen Verbraucher übertragen, wie oben erwähnt (und durch Verbraucher, ich meine Threads zu konsumieren). Außerdem, wenn Sie etwas mehr datenbankgesteuert (sagen Couchbase) verwenden, können Sie diese Dinge selbst programmieren und Sie können die volle Kontrolle über die Logik haben.

Couchbase verwenden

Obwohl eine detaillierte Erklärung, wie Couchbase als Warteschlange verwendet wird, den Rahmen dieser Frage sprengt, kann ich einige Hinweise geben.

  • Zuerst werden Sie sich darüber informieren wollen Couchbase
  • Ich empfehle, Jobs in einem Couchbase-Bucket zu speichern und auf eine indizierte Sicht zu vertrauen, um die verfügbaren Jobs aufzulisten. Es gibt viele Möglichkeiten, einen Schlüssel für jeden Job zu definieren, aber der Job selbst muss in JSON serialisiert werden. Vielleicht benutzen ServiceStack.Text
  • Wenn ein Job zur Verarbeitung abgerufen wird, muss eine Logik vorhanden sein, um den Status des Jobs in Couchbase zu kennzeichnen. Sie müssen ein verwenden CAS Methode um sicherzustellen, dass jemand anderes den Job zur gleichen Zeit nicht bearbeitet hat.
  • Sie benötigen eine Art von Richtlinie zum Löschen fehlgeschlagener und abgeschlossener Jobs aus Ihrer Warteschlange.

Zusammenfassung

  1. Verwenden Sie RabbitMQ nicht dafür
  2. Verwenden Sie die Parameter jedes Jobs, um einen intelligenten Dispatching-Algorithmus zu entwickeln. Ich kann Ihnen dabei helfen, wenn ich mehr über die Art Ihrer Arbeit weiß.
  3. Ziehen Sie Jobs auf Basis des Algorithmus in # 2 in die Worker, anstatt sie vom Server zu schieben.
  4. Machen Sie sich mit Ihrem eigenen Weg vertraut, um den Status von Jobs in Ihrem System zu verfolgen (in der Warteschlange, ausgeführt, fehlgeschlagen, erfolgreich usw.) und wann / ob Sie angehaltene Jobs erneut versenden.

3



Wenn Sie eine Feder verwenden oder eine Feder verwenden möchten, können Sie die Halterung des Federhörers verwenden, um dies zu erreichen. Das wird Ihnen ein ähnliches Callback-Programmiermodell bieten, nach dem Sie suchen.

Beispielcode von der Spring AMQP Referenzdokumentation

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}

1



Vor kurzem habe ich den Zweig bug18384 gedrückt, der die Art und Weise ändert, wie Callbacks an Consumer-Implementierungen gesendet werden.

Nach dieser Änderung behält die Verbindung einen Versand-Thread bei, der zum Senden von Rückrufen an die Konsumenten verwendet wird. Auf diese Weise werden die Methoden zum Blockieren von Anrufen durch den Verbraucher für die Verbindung und den Kanal freigegeben.

Auf Twitter wurde eine Frage gestellt, wie man das konfigurierbar machen kann, indem man einen benutzerdefinierten Executor in die ConnectionFactory einfügt. Ich wollte skizzieren, warum das kompliziert ist, eine mögliche Implementierung diskutieren und sehen, ob es viel Interesse gibt.

Zunächst sollten wir festlegen, dass jeder Verbraucher nur Callbacks in einem einzigen Thread erhalten soll. Wenn dies nicht der Fall ist, entsteht Chaos und die Verbraucher müssen sich um ihre eigene Fadensicherheit kümmern, die über die Initialisierungssicherheit hinausgeht.

Mit nur einem einzigen Versandthread für alle Verbraucher wird dieses Consumer-Thread-Pairing einfach eingehalten.

Wenn wir mehrere Threads einführen, müssen wir sicherstellen, dass jeder Consumer mit nur einem Thread gepaart ist. Wenn Sie die Executor-Abstraktion verwenden, verhindert dies, dass jeder Callback-Versand in eine ausführbare Datei eingeschlossen und an Executor gesendet wird, da Sie nicht garantieren können, welcher Thread verwendet wird.

Um dies zu umgehen, kann der Executor so eingestellt werden, dass er 'n' lange laufende Aufgaben ausführt (n ist die Anzahl der Threads im Executor). Jede dieser Aufgaben ruft Dispatch-Anweisungen aus einer Warteschlange ab und führt sie aus. Jeder Verbraucher ist mit einer Dispatch-Instruktions-Warteschlange gepaart, die wahrscheinlich auf einer Round-Robin-Basis zugewiesen ist. Dies ist nicht zu kompliziert und bietet einen einfachen Ausgleich der Versandlast über die Threads im Executor.

Jetzt gibt es noch einige Probleme:

  1. Die Anzahl der Threads in einem Executor ist nicht unbedingt festgelegt (wie bei ThreadPoolExecutor).
  2. Es gibt keine Möglichkeit, über Executor oder ExecutorService herauszufinden, wie viele Threads es gibt. Daher können wir nicht wissen, wie viele Dispatch-Anweisungswarteschlangen erstellt werden sollen.

Allerdings können wir sicherlich eine ConnectionFactory.setDispatchThreadCount (int) einführen. Hinter den Kulissen wird dies eine Executors.newFixedThreadPool () und die korrekte Anzahl von Versandwarteschlangen und Versandaufgaben erstellen.

Ich bin daran interessiert zu hören, wenn jemand denkt, ich übersehe einen einfacheren Weg, um dies zu lösen, und in der Tat, wenn das überhaupt eine Lösung wert ist.


0