Frage Wie bekomme ich ThreadPoolExecutor, um die Threads vor dem Einreihen in die Warteschlange zu erhöhen?


Ich war einige Zeit mit dem Standardverhalten von frustriert ThreadPoolExecutor was unterstützt die ExecutorService Thread-Pools, die so viele von uns verwenden. Um aus den Javadocs zu zitieren:

Wenn mehr als corePoolSize aber weniger als maximumPoolSize-Threads ausgeführt werden, wird ein neuer Thread erstellt nur wenn die Warteschlange voll ist.

Dies bedeutet, dass wenn Sie einen Thread-Pool mit dem folgenden Code definieren noch nie starte den 2. Thread, weil der LinkedBlockingQueue ist unbegrenzt.

ExecutorService threadPool =
    new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Nur wenn du eine hast begrenzt Warteschlange und die Warteschlange ist voll Sind irgendwelche Threads über der Core-Nummer gestartet? Ich vermute, dass eine große Anzahl von Junior-Java-Multithread-Programmierern dieses Verhalten der ThreadPoolExecutor.

Jetzt habe ich einen spezifischen Anwendungsfall, wo dies nicht optimal ist. Ich suche nach Wegen, ohne meine eigene TPE-Klasse zu schreiben, um es zu umgehen.

Meine Anforderungen sind für einen Webservice, der Rückrufe zu einer möglicherweise unzuverlässigen dritten Partei macht.

  • Ich möchte den Rückruf nicht synchron mit der Web-Anfrage durchführen, deshalb möchte ich einen Thread-Pool verwenden.
  • Normalerweise bekomme ich ein paar davon, also möchte ich keine haben newFixedThreadPool(...) mit einer großen Anzahl von Threads, die meist ruhend sind.
  • Hin und wieder bekomme ich diesen Traffic und ich möchte die Anzahl der Threads auf einen maximalen Wert erhöhen (sagen wir 50).
  • Ich muss einen machen Beste versuche, alle Callbacks durchzuführen, damit ich alle weiteren über 50 in die Warteschlange stellen kann. Ich möchte den Rest meines Webservers nicht mit a überlasten newCachedThreadPool().

Wie kann ich diese Einschränkung umgehen? ThreadPoolExecutor wo die Warteschlange begrenzt und voll sein muss Vor Mehr Threads werden gestartet? Wie kann ich es bekommen, um mehr Threads zu starten Vor Warteschlangenaufgaben?

Bearbeiten:

@Flavio macht einen guten Punkt über die Verwendung der ThreadPoolExecutor.allowCoreThreadTimeOut(true) Zeitüberschreitung und Beenden des Kernthreads. Ich dachte darüber nach, aber ich wollte immer noch die Core-Threads-Funktion. Ich wollte nicht, dass die Anzahl der Threads im Pool möglichst unter die Core-Größe fällt.


76
2017-10-22 21:02


Ursprung


Antworten:


Wie kann ich diese Einschränkung umgehen? ThreadPoolExecutor Die Warteschlange muss begrenzt und voll sein, bevor weitere Threads gestartet werden.

Ich glaube, ich habe endlich eine etwas elegante (vielleicht etwas hacky) Lösung für diese Einschränkung gefunden ThreadPoolExecutor. Es beinhaltet Erweiterung LinkedBlockingQueue um es zurückkehren zu lassen false zum queue.offer(...) wenn sich bereits einige Aufgaben in der Warteschlange befinden. Wenn die aktuellen Threads nicht mit den Aufgaben in der Warteschlange mithalten, fügt das TPE zusätzliche Threads hinzu. Wenn der Pool bereits bei max Threads ist, dann die RejectedExecutionHandler wird angerufen werden. Es ist der Handler, der dann das tut put(...) in die Warteschlange.

Es ist sicherlich seltsam, eine Warteschlange zu schreiben offer(...) kann zurückkehren false und put() blockiert nie, das ist der Hack-Teil. Aber das funktioniert gut mit TPEs Verwendung der Warteschlange, so sehe ich kein Problem damit.

Hier ist der Code:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        /*
         * Offer it to the queue if there is 0 items already queued, else
         * return false so the TPE will add another thread. If we return false
         * and max threads have been reached then the RejectedExecutionHandler
         * will be called which will do the put into the queue.
         */
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            /*
             * This does the actual put into the queue. Once the max threads
             * have been reached, the tasks will then queue up.
             */
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Mit diesem Mechanismus, wenn ich Aufgaben an die Warteschlange übergebe, ThreadPoolExecutor werden:

  1. Skalieren Sie die Anzahl der Threads zunächst auf die Kerngröße (hier 1).
  2. Biete es der Warteschlange an. Wenn die Warteschlange leer ist, wird sie in die Warteschlange eingereiht, um von den vorhandenen Threads behandelt zu werden.
  3. Wenn die Warteschlange bereits 1 oder mehr Elemente enthält, offer(...) wird falsch zurückgeben.
  4. Wenn false zurückgegeben wird, skalieren Sie die Anzahl der Threads im Pool, bis sie die maximale Anzahl erreichen (hier 50).
  5. Wenn das Maximum ist, ruft es die RejectedExecutionHandler
  6. Das RejectedExecutionHandler stellt dann die Aufgabe in die Warteschlange, um von dem ersten verfügbaren Thread in der FIFO-Reihenfolge verarbeitet zu werden.

Obwohl in meinem Beispielcode die Warteschlange unbegrenzt ist, können Sie sie auch als beschränkte Warteschlange definieren. Zum Beispiel, wenn Sie eine Kapazität von 1000 hinzufügen LinkedBlockingQueue dann wird es:

  1. skaliere die Gewinde bis max
  2. dann anstehen, bis es mit 1000 Aufgaben voll ist
  3. Sperren Sie dann den Anrufer, bis Speicherplatz für die Warteschlange verfügbar wird.

Außerdem, wenn Sie wirklich gebraucht haben offer(...) in dem RejectedExecutionHandler dann könntest du das benutzen offer(E, long, TimeUnit) Methode stattdessen mit Long.MAX_VALUE als die Zeitüberschreitung.

Bearbeiten:

Ich habe meine verändert offer(...) Methode Override per @ Ralf's Feedback. Dadurch wird die Anzahl der Threads im Pool nur erhöht, wenn sie nicht mit der Last Schritt halten.

Bearbeiten:

Eine weitere Optimierung dieser Antwort könnte darin bestehen, das TPE tatsächlich zu fragen, ob es inaktive Threads gibt und das Element nur dann in die Warteschlange stellt, wenn dies der Fall ist. Sie müssten dafür eine echte Klasse erstellen und eine hinzufügen ourQueue.setThreadPoolExecutor(tpe); Methode darauf.

Dann dein offer(...) Methode könnte etwa so aussehen:

  1. Überprüfen Sie, ob die tpe.getPoolSize() == tpe.getMaximumPoolSize() In diesem Fall rufen Sie einfach an super.offer(...).
  2. Sonst wenn tpe.getPoolSize() > tpe.getActiveCount() Dann ruf an super.offer(...) da es scheinbar untätige Threads gibt.
  3. Sonst zurück false einen anderen Thread abzweigen.

Vielleicht das:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Beachten Sie, dass die get-Methoden auf TPE teuer sind, da sie darauf zugreifen volatile Felder oder (im Falle von getActiveCount()) sperre das TPE und beginne die Thread-Liste. Außerdem gibt es hier Race Conditions, die dazu führen können, dass eine Task falsch in die Warteschlange eingereiht wird, oder ein anderer Thread wird gegabelt, wenn ein Leerlauf-Thread vorhanden war.


35
2017-10-22 21:22



Setzen Sie die Kerngröße und die maximale Größe auf den gleichen Wert und lassen Sie Kernthreads mit dem Pool aus dem Pool entfernen allowCoreThreadTimeOut(true).


21
2018-06-30 15:37



Ich habe bereits zwei andere Antworten auf diese Frage, aber ich vermute, dass dieser der Beste ist.

Es basiert auf der Technik von die derzeit akzeptierte Antwort, nämlich:

  1. Überschreiben Sie die Warteschlangen offer() Methode, um (manchmal) falsch zurückzugeben,
  2. was verursacht die ThreadPoolExecutor entweder einen neuen Thread spawnen oder die Aufgabe ablehnen, und
  3. setze die RejectedExecutionHandler zu tatsächlich die Aufgabe bei Ablehnung anordnen.

Das Problem ist, wann offer() sollte falsch zurückgeben. Die aktuell akzeptierte Antwort gibt false zurück, wenn die Warteschlange einige Aufgaben enthält, aber wie ich in meinem Kommentar darauf hingewiesen habe, verursacht dies unerwünschte Effekte. Wenn Sie immer den Wert false zurückgeben, werden Sie auch dann neue Threads generieren, wenn Threads auf die Warteschlange warten.

Die Lösung ist Java 7 zu verwenden LinkedTransferQueue und haben offer() Anruf tryTransfer(). Wenn ein wartender Consumer-Thread vorhanden ist, wird die Aufgabe nur an diesen Thread übergeben. Andernfalls, offer() wird falsch und die zurückgeben ThreadPoolExecutor wird einen neuen Thread spawnen.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });

14
2017-11-22 19:43



Hinweis: Ich bevorzuge und empfehle jetzt meine andere Antwort.

Hier ist eine Version, die mir viel einfacher erscheint: Erhöhen Sie die corePoolSize (bis zum Limit von maximumPoolSize), wenn eine neue Aufgabe ausgeführt wird, und verringern Sie dann die corePoolSize (bis zur Grenze der vom Benutzer angegebenen "Kernpoolgröße"), wenn a Aufgabe wird abgeschlossen.

Anders ausgedrückt: Verfolgen Sie die Anzahl der ausgeführten oder in die Warteschlange eingereihten Aufgaben, und stellen Sie sicher, dass corePoolSize der Anzahl der Aufgaben entspricht, solange sie zwischen der vom Benutzer angegebenen "Kernpoolgröße" und der maximumPoolSize liegt.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Wie geschrieben, unterstützt die Klasse nicht das Ändern der vom Benutzer angegebenen corePoolSize oder maximumPoolSize nach der Konstruktion und unterstützt nicht das direkte Bearbeiten der Arbeitswarteschlange oder über remove() oder purge().


7
2017-10-23 10:15



Wir haben eine Unterklasse von ThreadPoolExecutor das braucht ein zusätzliches creationThreshold und überschreibt execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

vielleicht hilft das auch, aber deins sieht natürlich künstlerischer aus ...


5
2018-06-20 14:02



Die empfohlene Antwort löst nur einen (1) des Problems mit dem JDK-Threadpool auf:

  1. JDK-Thread-Pools sind auf Warteschlangen ausgerichtet. Anstatt also einen neuen Thread zu erstellen, wird die Aufgabe in eine Warteschlange gestellt. Nur wenn die Warteschlange ihr Limit erreicht, erstellt der Thread-Pool einen neuen Thread.

  2. Die Fadenabschaltung tritt nicht auf, wenn die Ladung hellt. Wenn zum Beispiel eine Anzahl von Jobs den Pool erreicht, die den Pool auf Maximum bringen, gefolgt von einer geringen Auslastung von maximal 2 Tasks gleichzeitig, verwendet der Pool alle Threads, um die leichte Last zu unterstützen, die den Thread-Ausfall verhindert. (nur 2 Threads werden benötigt ...)

Unzufrieden mit dem obigen Verhalten, ging ich voran und implementierte einen Pool, um die oben genannten Mängel zu beheben.

Lösung 2) Die Verwendung der Lifo-Planung behebt das Problem. Diese Idee wurde von Ben Maurer auf der ACM Applicative 2015 Konferenz vorgestellt: Systems @ Facebook skalieren 

So wurde eine neue Implementierung geboren:

LifoThreadPoolExecutorSQP

Bis jetzt verbessert diese Implementierung die asynchrone Ausführungsleistung für ZEL.

Die Implementierung ist spinfähig, um den Overhead des Kontextwechsels zu reduzieren, was für bestimmte Anwendungsfälle eine überlegene Leistung liefert.

Ich hoffe es hilft...

PS: JDK Fork Join Pool implementieren ExecutorService und arbeitet als "normalen" Thread-Pool, Implementierung ist performant, Es nutzt LIFO Thread-Planung, aber es gibt keine Kontrolle über die interne Größe der Warteschlange, Ruhestand Timeout ...


3
2017-11-27 19:15



Hinweis: Ich bevorzuge und empfehle jetzt meine andere Antwort.

Ich habe einen weiteren Vorschlag, der der ursprünglichen Idee folgt, die Warteschlange so zu ändern, dass sie false zurückgibt. In diesem können alle Aufgaben in die Warteschlange eingereiht werden, jedoch immer dann, wenn eine Aufgabe in die Warteschlange eingereiht wird execute()Wir folgen mit einer Sentinel-no-op Aufgabe, die die Warteschlange verwirft, ein neuer Thread, um laichen verursacht, welche der nicht-op unmittelbar gefolgt von etwas aus der Warteschlange ausgeführt wird.

Weil Worker-Threads den Aufruf abfragen können LinkedBlockingQueue Bei einer neuen Aufgabe ist es möglich, dass eine Aufgabe in die Warteschlange eingereiht wird, auch wenn ein verfügbarer Thread vorhanden ist. Um zu vermeiden, dass neue Threads erstellt werden, selbst wenn Threads verfügbar sind, müssen wir verfolgen, wie viele Threads auf neue Tasks in der Warteschlange warten, und nur dann einen neuen Thread erzeugen, wenn mehr Aufgaben in der Warteschlange sind als wartende Threads.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});

1
2017-10-22 21:52



Die beste Lösung, die ich mir vorstellen kann, ist zu erweitern.

ThreadPoolExecutor bietet ein paar Hook-Methoden: beforeExecute und afterExecute. In Ihrer Erweiterung könnten Sie eine beschränkte Warteschlange zum Eingeben von Aufgaben und eine zweite unbegrenzte Warteschlange zum Behandeln von Überläufen verwenden. Wenn jemand anruft submit, könnten Sie versuchen, die Anfrage in die beschränkte Warteschlange zu stellen. Wenn Sie eine Ausnahme haben, stecken Sie die Aufgabe einfach in Ihre Überlaufwarteschlange. Sie könnten dann die afterExecute haken, um zu sehen, ob nach Beendigung einer Aufgabe etwas in der Überlaufwarteschlange vorhanden ist. Auf diese Weise kümmert sich der Executor zuerst um die Objekte in der begrenzten Warteschlange und zieht automatisch aus dieser unbegrenzten Warteschlange, sobald es die Zeit erlaubt.

Es scheint mehr Arbeit als Ihre Lösung zu sein, aber es bedeutet zumindest nicht, dass Warteschlangen unerwartetes Verhalten erhalten. Ich stelle mir auch vor, dass es eine bessere Möglichkeit gibt, den Status der Warteschlange und der Threads zu überprüfen, statt sich auf Ausnahmen zu verlassen, die ziemlich langsam zu werfen sind.


0