Frage Wie kann ich synchrone Fortsetzungen bei einer Aufgabe verhindern?


Ich habe einige Bibliothek (Socket-Netzwerk) Code, der eine bereitstellt Taskbasierend auf API für ausstehende Antworten auf Anforderungen TaskCompletionSource<T>. Es ist jedoch ein Ärgernis in der TPL, dass es unmöglich zu sein scheint, synchrone Fortsetzungen zu verhindern. Was ich würde mögen in der Lage zu sein ist entweder

  • sag a TaskCompletionSource<T> das sollte Callern nicht erlauben, sich anzuschließen TaskContinuationOptions.ExecuteSynchronously, oder
  • setze das Ergebnis (SetResult / TrySetResult) in einer Weise, die das spezifiziert TaskContinuationOptions.ExecuteSynchronously sollte ignoriert werden, stattdessen den Pool verwenden

Das Problem, das ich habe, ist, dass die eingehenden Daten von einem dedizierten Lesegerät verarbeitet werden, und wenn ein Anrufer sich anhängen kann TaskContinuationOptions.ExecuteSynchronously sie können den Leser aufhalten (was mehr als nur sie betrifft). Zuvor habe ich das von einigen Hackern, die feststellen, ob das funktioniert irgendein Fortsetzungen sind vorhanden, und wenn sie es sind, drückt es die Vollendung auf die ThreadPoolDies hat jedoch erhebliche Auswirkungen, wenn der Aufrufer seine Arbeitswarteschlange gesättigt hat, da die Vervollständigung nicht rechtzeitig verarbeitet wird. Wenn sie verwenden Task.Wait() (oder ähnlich), sie werden sich dann im Wesentlichen selbst blockieren. Aus diesem Grund ist der Leser auf einem dedizierten Thread, anstatt Arbeiter zu verwenden.

Damit; bevor ich versuche, das TPL-Team zu nerven: Fehle ich eine Option?

Schlüsselpunkte:

  • Ich möchte nicht, dass externe Anrufer meinen Thread entführen können
  • Ich kann das nicht benutzen ThreadPool als eine Implementierung, wie es funktionieren muss, wenn der Pool gesättigt ist

Im folgenden Beispiel wird die Ausgabe erzeugt (die Reihenfolge kann je nach Timing variieren):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

Das Problem ist, dass ein zufälliger Aufrufer eine Fortsetzung auf "Main thread" geschafft hat. Im echten Code würde dies den primären Leser unterbrechen; schlechte Dinge!

Code:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}

76
2018-03-22 14:56


Ursprung


Antworten:


Neu in .NET 4.6:

.NET 4.6 enthält ein neues TaskCreationOptions: RunContinuationsAsynchronously.


Da Sie Reflection für den Zugriff auf private Felder verwenden möchten ...

Sie können die Aufgabe des TCS mit dem markieren TASK_STATE_THREAD_WAS_ABORTED Flagge, die dazu führen würde, dass alle Fortsetzungen nicht inline sind.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Bearbeiten:

Anstatt Reflection emit zu verwenden, schlage ich vor, Ausdrücke zu verwenden. Dies ist deutlich lesbarer und hat den Vorteil PCL-kompatibel zu sein:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Ohne Reflection zu verwenden:

Wenn jemand interessiert ist, habe ich einen Weg gefunden, dies ohne Reflection zu tun, aber es ist auch ein bisschen "dreckig" und hat natürlich eine nicht zu vernachlässigende Perf-Strafe:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

46
2018-03-23 08:06



Ich denke nicht, dass es in TPL etwas gibt, das es geben würde explizit API-Kontrolle über TaskCompletionSource.SetResult Fortsetzungen. Ich beschloss, meine zu behalten erste Antwort zum Steuern dieses Verhaltens für async/await Szenarien.

Hier ist eine andere Lösung, die asynchron ist ContinueWithwenn die tcs.SetResulttriggered Fortsetzung findet auf dem gleichen Thread statt SetResult wurde angerufen:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Aktualisiert, um den Kommentar zu adressieren:

Ich kontrolliere den Anrufer nicht - ich kann sie nicht dazu bringen, einen bestimmten zu verwenden   Fortsetzung Variante: wenn ich könnte, würde das Problem nicht in der   erster Platz

Mir war nicht bewusst, dass du den Anrufer nicht kontrollierst. Dennoch, wenn Sie es nicht kontrollieren, werden Sie wahrscheinlich nicht übergeben TaskCompletionSource Objekt direkt für den Anrufer, entweder. Logischerweise würden Sie die Zeichen ein Teil davon, d.h. tcs.Task. In diesem Fall ist die Lösung möglicherweise noch einfacher, wenn Sie eine weitere Erweiterungsmethode wie folgt hinzufügen:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Benutzen:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

Dies eigentlich funktioniert für beide await und ContinueWith (Geige) und ist frei von Reflektionshacken.


10
2018-03-23 04:16



Was ist mit anstatt zu tun

var task = source.Task;

Sie tun dies stattdessen

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Sie fügen also immer eine Fortsetzung hinzu, die asynchron ausgeführt wird, und dann spielt es keine Rolle, ob die Abonnenten eine Fortsetzung im selben Kontext wünschen. Es ist irgendwie die Aufgabe, nicht wahr?


3
2018-03-22 22:25



Wenn Sie können und sind bereit, Reflexion zu verwenden, sollte dies tun;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}

3
2018-03-23 02:22



Aktualisiert, Habe ich eine getrennte Antwort damit umgehen ContinueWith im Gegensatz zu await (weil ContinueWith kümmert sich nicht um den aktuellen Synchronisationskontext).

Sie können einen dummen Synchronisierungskontext verwenden, um bei einer Fortsetzung durch Aufruf eine Asynchronität zu erzwingen SetResult/SetCancelled/SetException auf TaskCompletionSource. Ich glaube der aktuelle Synchronisationskontext (an der Stelle von await tcs.Task) ist das Kriterium, das TPL verwendet, um zu entscheiden, ob eine solche Fortsetzung synchron oder asynchron gemacht wird.

Folgendes funktioniert für mich:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync ist so implementiert:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext  ist sehr günstig in Bezug auf den Overhead fügt es hinzu. In der Tat wird ein sehr ähnlicher Ansatz von der Implementierung von WPF Dispatcher.BeginInvoke.

TPL vergleicht den Zielsynchronisationskontext an der Stelle von await zu dem des Punktes von tcs.SetResult. Wenn der Synchronisationskontext identisch ist (oder an beiden Stellen kein Synchronisationskontext vorhanden ist), wird die Fortsetzung direkt synchron aufgerufen. Andernfalls wird es in der Warteschlange verwendet SynchronizationContext.Post im Zielsynchronisationskontext, d. h. der Normalen await Verhalten. Was dieser Ansatz bewirkt, ist immer der SynchronizationContext.Post Verhalten (oder eine Pool-Thread-Fortsetzung, wenn kein Zielsynchronisierungskontext vorhanden ist).

Aktualisiert, das wird nicht funktionieren task.ContinueWith, weil ContinueWith interessiert sich nicht für den aktuellen Synchronisationskontext. Es funktioniert jedoch für await task (Geige). Es funktioniert auch für await task.ConfigureAwait(false).

OTOH, dieser Ansatz funktioniert für ContinueWith.


3
2018-03-23 01:42



Das Abbruch simulieren Ansatz sah wirklich gut aus, führte aber zu den TPL-Hijacking-Threads in einigen Szenarien.

Ich hatte dann eine ähnliche Implementierung Überprüfen des Fortsetzungsobjekts, aber nur nach etwas suchen irgendein Fortsetzung, da es tatsächlich zu viele Szenarien für den gegebenen Code gibt, um gut zu funktionieren, aber das bedeutete, dass sogar Dinge wie Task.Wait führte zu einer Thread-Pool-Suche.

Letztendlich, nach der Inspektion von viel und viel IL, das einzige sicher und nützlich Szenario ist das SetOnInvokeMres Szenario (manual-reset-event-slim-Fortsetzung). Es gibt viele andere Szenarien:

  • einige sind nicht sicher und führen zu Thread-Hijacking
  • der Rest ist nicht sinnvoll, da sie letztlich zum Thread-Pool führen

Am Ende entschied ich mich, nach einem Nicht-Null-Fortsetzungsobjekt zu suchen; wenn es null ist, gut (keine Fortsetzungen); wenn es nicht null ist, Sonderprüfung auf SetOnInvokeMres - wenn es das ist: gut (sicher zu rufen); andernfalls lass den Thread-Pool das ausführen TrySetComplete, ohne die Aufgabe zu sagen, irgendetwas Spezielles wie Spoofing abzubrechen. Task.Wait nutzt die SetOnInvokeMres Ansatz, der das spezifische Szenario ist, das wir versuchen möchten Ja wirklich schwer, nicht zu blockieren.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

3
2017-09-01 12:30