Frage Spark Streaming: Staatenlose OverAlp Windows im Vergleich zu halten Zustand


Was wären einige Überlegungen zum Auswählen von statuslosen Gleitfensteroperationen (z. B. reduceByKeyAndWindow) im Gegensatz zum Auswählen des Beibehaltens eines Zustands (z. B. über updateStateByKey oder den neuen mapStateByKey) beim Behandeln eines Stroms sequentieller, endlicher Ereignissitzungen mit Spark Streaming?

Betrachten Sie beispielsweise das folgende Szenario:

Ein tragbares Gerät verfolgt körperliche Übungen von   der Träger. Das Gerät erkennt automatisch, wann eine Übung beginnt,   und sendet eine Nachricht aus; gibt während der Übung zusätzliche Nachrichten aus   wird durchgemacht (z. B. Herzfrequenz); und gibt schließlich eine Nachricht aus, wenn der   Übung ist getan.

Das gewünschte Ergebnis ist ein Strom aggregierter Datensätze pro Trainingseinheit. d.h. alle Ereignisse derselben Sitzung sollten zusammen aggregiert werden (z. B. so, dass jede Sitzung in einer einzelnen DB-Zeile gespeichert werden könnte). Beachten Sie, dass jede Sitzung eine endliche Länge hat, der gesamte Datenstrom von mehreren Geräten jedoch kontinuierlich ist. Nehmen wir an, dass das Gerät eine GUID für jede Trainingseinheit erstellt.

Ich kann zwei Ansätze für die Handhabung dieses Anwendungsfalls mit Spark Streaming sehen:

  1. Nicht überlappende Fenster verwenden und Status beibehalten. Ein Status wird pro GUID gespeichert, wobei alle Ereignisse übereinstimmen. Wenn ein neues Ereignis eintrifft, wird der Status aktualisiert (z. B. unter Verwendung von mapWithState), und falls das Ereignis "Ende der Trainingseinheit" ist, wird ein aggregierter Datensatz basierend auf dem Status ausgegeben und der Schlüssel entfernt.

  2. Überlappende Schiebefenster verwenden und nur die ersten Sitzungen beibehalten. Nehmen Sie ein Schiebefenster der Länge 2 und des Intervalls 1 an (siehe Abbildung unten). Angenommen, die Fensterlänge beträgt 2 X (maximal mögliche Trainingszeit). In jedem Fenster werden Ereignisse durch GUID, z. Verwenden von reduceByKeyAndWindow. Dann werden alle Sitzungen, die in der zweiten Hälfte des Fensters begonnen haben, ausgegeben und die verbleibenden Sitzungen ausgegeben. Dies ermöglicht die Verwendung jedes Ereignisses genau einmal und stellt sicher, dass alle Ereignisse, die zu derselben Sitzung gehören, zusammen aggregiert werden.

Diagramm für Ansatz # 2:

Only sessions starting in the areas marked with \\\ will be emitted. 
-----------
|window 1 |
|\\\\|    |
-----------
     ----------
     |window 2 |
     |\\\\|    |  
     -----------
          ----------
          |window 3 |
          |\\\\|    |
          -----------

Vor- und Nachteile sehe ich:

Ansatz Nr. 1 ist weniger rechenintensiv, erfordert jedoch das Speichern und Verwalten des Zustands (z. B. wenn die Anzahl gleichzeitiger Sitzungen zunimmt, kann der Zustand größer als der Speicher werden). Wenn jedoch die maximale Anzahl gleichzeitiger Sitzungen begrenzt ist, ist dies möglicherweise kein Problem.

Ansatz Nr. 2 ist doppelt so teuer (jedes Ereignis wird zweimal verarbeitet) und mit höherer Latenz (2 X maximale Trainingszeit), aber einfacher und leicht zu handhaben, da kein Zustand beibehalten wird.

Was wäre der beste Weg, um mit diesem Anwendungsfall umzugehen - ist einer dieser Ansätze der "richtige", oder gibt es bessere Möglichkeiten?

Welche anderen Vor- / Nachteile sollten in Betracht gezogen werden?


18
2018-01-06 09:59


Ursprung


Antworten:


Normalerweise gibt es keine Recht Ansatz, jeder hat Kompromisse. Daher würde ich eine zusätzliche Herangehensweise an den Mix hinzufügen und meine Überlegungen zu ihren Vor- und Nachteilen skizzieren. So können Sie entscheiden, welches für Sie besser geeignet ist.

Externer Staatenansatz (Ansatz Nr. 3)

Sie können den Status der Ereignisse im externen Speicher akkumulieren. Cassandra wird dafür oft benutzt. Sie können abschließende und laufende Ereignisse getrennt behandeln, zum Beispiel wie folgt:

val stream = ...

val ongoingEventsStream = stream.filter(!isFinalEvent)
val finalEventsStream = stream.filter(isFinalEvent)

ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }
finalEventsStream.foreachRDD { /*finalize state in casssandra, move to final destination if needed*/ }

trackStateByKey Ansatz (Ansatz # 1.1)

Es könnte möglicherweise die optimale Lösung für Sie sein, da es die Nachteile von updateStateByKey beseitigt, aber wenn man bedenkt, dass es gerade als Teil von Spark 1.6 veröffentlicht wurde, könnte es auch riskant sein (da es aus irgendeinem Grund nicht sehr beworben wird). Du kannst den ... benutzen Verknüpfung als Ausgangspunkt, wenn Sie mehr erfahren möchten

Für und Wider

Ansatz # 1 (updateStateByKey)

Pros

  • Leicht zu verstehen oder zu erklären (Rest des Teams, Neulinge, etc.) (subjektiv)
  • Lager: Eine bessere Nutzung des Speichers speichert nur den letzten Trainingszustand
  • Lager: Behält nur laufende Übungen bei und wirft sie ab, sobald sie fertig sind
  • Die Latenz ist nur durch die Durchführung jeder Mikro-Batch-Verarbeitung begrenzt

Nachteile

  • Lager: Wenn die Anzahl der Schlüssel (gleichzeitige Übungen) groß ist, passt sie möglicherweise nicht in den Speicher Ihres Clusters
  • wird bearbeitet: Es wird die updateState-Funktion für jeden Schlüssel innerhalb der Status-Map ausgeführt. Wenn also die Anzahl gleichzeitiger Übungen groß ist, wird die Leistung darunter leiden

Ansatz # 2 (Fenster)

Während es möglich ist, mit Windows zu erreichen, was Sie brauchen, sieht es in Ihrem Szenario deutlich weniger natürlich aus.

Pros

  • wird bearbeitet in einigen Fällen (abhängig von den Daten) möglicherweise effektiver als updateStateByKey, aufgrund der updateStateByKey Tendenz, Updates für jeden Schlüssel auszuführen, auch wenn keine aktuellen Updates vorhanden sind

Nachteile

  • "Maximale mögliche Trainingszeit" - das klingt nach einem riesigen Risiko - es könnte eine ziemlich willkürliche Dauer sein, basierend auf einem menschlichen Verhalten. Manche Leute vergessen vielleicht, "Sport zu machen". Dies hängt auch von der Art des Trainings ab, kann aber von Sekunden bis zu Stunden reichen, wenn Sie eine niedrigere Latenzzeit für schnelle Übungen wünschen, während Sie die Latenzzeit so hoch halten müssen wie die längste mögliche Übung
  • Fühlt sich schwerer an, anderen zu erklären, wie es funktionieren wird (subjektiv)
  • Lager: Muss alle Daten innerhalb des Fensterrahmens behalten, nicht nur den letzten. Außerdem wird der Speicher nur freigegeben, wenn das Fenster von diesem Zeitfenster wegrutscht, nicht wenn das Training tatsächlich beendet ist. Es ist zwar kein großer Unterschied, wenn Sie nur die letzten beiden Zeitfenster behalten möchten - es erhöht sich, wenn Sie versuchen, mehr Flexibilität zu erreichen, indem Sie das Fenster öfter verschieben.

Ansatz # 3 (externer Zustand)

Pros

  • Leicht zu erklären usw. (subjektiv)
  • Pure streaming processing approach, was bedeutet, dass spark verantwortlich ist, auf jedes einzelne Ereignis zu reagieren, aber nicht versucht, den Zustand usw. zu speichern (subjektiv)
  • Lager: Nicht begrenzt durch den Speicher des Clusters, um den Status zu speichern - kann eine große Anzahl gleichzeitiger Übungen bewältigen
  • wird bearbeitet: State wird nur aktualisiert, wenn es tatsächlich Updates gibt (im Gegensatz zu updateStateByKey)
  • Die Latenz ist vergleichbar mit updateStateByKey und nur durch die Zeit begrenzt, die für die Verarbeitung der einzelnen Mikrochargen benötigt wird

Nachteile

  • Zusätzliche Komponente in Ihrer Architektur (es sei denn, Sie verwenden Cassandra bereits für Ihre endgültige Ausgabe)
  • wird bearbeitet: Standardmäßig ist langsamer als die Verarbeitung nur in Spark als nicht im Speicher + Sie müssen die Daten über das Netzwerk übertragen
  • Sie müssen genau einmal semantisch implementieren, um Daten in Cassandra auszugeben (für den Fall eines Arbeitsausfalls während der foreachRDD).

Vorgeschlagener Ansatz

Ich würde Folgendes versuchen:

  • Testen Sie den updateStateByKey-Ansatz für Ihre Daten und Ihren Cluster
  • Prüfen Sie, ob der Speicherverbrauch und die Verarbeitung auch bei einer großen Anzahl gleichzeitiger Übungen akzeptabel sind (zu Stoßzeiten erwartet)
  • zurückfallen, um sich mit Cassandra zu nähern, falls nicht

11
2018-01-11 13:03



Ich denke, einer der anderen Nachteile des dritten Ansatzes ist, dass die RDDs nicht chronologisch empfangen werden, wenn man sie auf einem Cluster laufen lässt.

ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }

Und was ist mit dem Check-Pointing und dem Versagen des Treiberknotens? In diesem Fall lesen Sie die gesamten Daten erneut? Neugierig zu wissen, wie du damit umgehen willst?

Ich denke, vielleicht ist mapwithstate ein besserer Ansatz, warum Sie all diese Szenarien in Betracht ziehen.


1
2017-08-18 04:21