Frage Kafka Stream beitreten


Ich habe 2 kafka Themen - recommendations und clicks. Das erste Thema enthält Empfehlungen, die durch eine eindeutige ID (genannt recommendationsId). Jedes Produkt hat eine URL, auf die der Benutzer klicken kann.

Das clicks topic ruft die Nachrichten ab, die durch Klicks auf die für den Benutzer empfohlenen Produkt-URLs generiert werden. Es wurde so eingerichtet, dass diese Klick - Nachrichten auch vom recommendationId.

Beachten Sie, dass

  1. Die Beziehung zwischen Empfehlungen und Klicks ist eins zu viele. Eine Empfehlung kann zu mehreren Klicks führen, ein Klick ist jedoch immer mit einer einzelnen Empfehlung verknüpft.

  2. Jedes Klickobjekt hätte ein entsprechendes Empfehlungsobjekt.

  3. Ein Klickobjekt hätte einen Zeitstempel später als das Empfehlungsobjekt.

  4. Der Abstand zwischen einer Empfehlung und dem / den entsprechenden Klick (en) könnte einige Sekunden bis einige Tage betragen (z. B. höchstens 7 Tage).

Mein Ziel ist es, diese beiden Themen mit Kafka Streams Join zu verbinden. Was mir nicht klar ist, ist, ob ich einen KStream x KStream Join oder einen KStream x KTable Join verwenden soll.

Ich habe das implementiert KStream x KTable Beitritt durch Beitritt clicks strömen durch recommendations Tabelle. Wenn die Empfehlungen generiert wurden, kann ich jedoch keine verknüpften Klicks sehen Vor der Tischler wurde gestartet und der Klick kommt nachdem der Tischler angefangen hat.

Benutze ich den richtigen Join? Sollte ich verwenden KStream x KStream Beitreten? Wenn dies der Fall ist, sollte ich die Fenstergröße auf 7 Tage setzen, um an einem Klick mit einer Empfehlung von höchstens 7 Tagen in der Vergangenheit teilnehmen zu können? Muss ich in diesem Fall auch die "Aufbewahrungsfrist" einstellen?

Mein Code zum Ausführen KStream x KTable Beitreten ist wie folgt. Beachten Sie, dass ich Klassen definiert habe Recommendations und Click und ihre entsprechende serde. Die Klick-Nachrichten sind einfach String (URL). Dieser URL-String ist mit verbunden Recommendations Objekt zum Erstellen eines Click Objekt, das an die emittiert wird jointTopic.

public static void main(String[] args){
    if(args.length!=4){
      throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
    }

    final String booststrapList = args[0];
    final String clicksTopic = args[1];
    final String recsTopic = args[2];
    final String jointTopic = args[3];

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());

    KStreamBuilder builder = new KStreamBuilder();

    // load clicks as KStream
    KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);

    // load recommendations as KTable
    KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);

    // join the two
    KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));

    // emit the join to the jointTopic
    join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

    // let the action begin
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
  }

Dies funktioniert, solange sowohl Empfehlungen als auch Klicks generiert wurden nach der Tischler (das obige Programm) wird ausgeführt. Wenn jedoch ein Klick eintrifft, für den die Empfehlung generiert wurde Vor Der Tischler wurde geführt, ich sehe keinen Beitritt. Wie behebe ich das?

Wenn die Lösung zu verwenden ist KStream x KSTream beitreten, dann bitte helfen Sie mir zu verstehen, welche Fenstergröße ich auswählen und welche Aufbewahrungszeit wählen soll.


5
2017-09-25 19:09


Ursprung


Antworten:


Deine Gesamtbeobachtung ist korrekt. Konzeptionell können Sie das richtige Ergebnis in beide Richtungen erhalten. Wenn Sie einen Stream-Table-Join verwenden, haben Sie zwei Nachteile (dies könnte in zukünftigen Versionen von Kafka jedoch noch einmal überprüft und verbessert werden).

  • Sie haben bereits erwähnt, dass, wenn ein Klick vor der entsprechenden Empfehlung verarbeitet wird, der (innere) Join fehlschlägt. Da Sie jedoch wissen, dass es eine Empfehlung geben wird, könnten Sie einen Linksjoin statt Innerjoin verwenden, das Join-Ergebnis überprüfen und das Klickereignis in das Eingabethema zurückschreiben, wenn die Empfehlung lautet null (Das heißt, Sie erhalten eine Wiederholungslogik) - oder, konsekutive Klicks für eine einzelne Empfehlung könnten nicht in Ordnung sein, und Sie müssen dies möglicherweise in Ihrem Anwendungscode berücksichtigen.
  • Ein zweiter Nachteil von KTable wäre, dass es für immer und unbegrenzt im Laufe der Zeit wachsen wird, wie Sie mehr und mehr einzigartige Empfehlungen dazu hinzufügen werden. Daher müssen Sie eine "Ablauflogik" implementieren, indem Sie Tombstones-Datensätze des Formulars senden <recommendationsId, null> zu dem Empfehlungsthema, um alte Empfehlungen zu löschen, die Ihnen nicht mehr wichtig sind.
  • Der Vorteil dieses Ansatzes besteht darin, dass Sie im Vergleich zu einem Stream-Stream-Join weniger Speicherplatz / Speicherplatz benötigen, da Sie nur alle Empfehlungen in Ihrer Anwendung puffern müssen (aber keine Klicks).

Wenn Sie einen Stream-Stream-Join verwenden und ein Klick 7 Tage nach einer Empfehlung erfolgen kann, muss Ihre Fenstergröße 7 Tage betragen. Andernfalls würde der Klick nicht mit der Empfehlung verknüpft.

  • Der Nachteil dieses Ansatzes ist, dass Sie viel mehr Speicher / Festplatte benötigen, da Sie alle Klicks und alle Empfehlungen der letzten 7 Tage in Ihren Anwendungen puffern.
  • Der Vorteil ist, dass die Reihenfolge oder Verarbeitung (dh Empfehlung vs. Klick) keine Rolle mehr spielt (dh Sie müssen die Wiederholungsstrategie nicht wie oben beschrieben implementieren)
  • Außerdem werden alte Empfehlungen automatisch veraltet und Sie müssen keine spezielle "Ablauflogik" implementieren.

Beim Stream-Stream-Beitritt ist die Antwort auf die Aufbewahrungszeit etwas anders. Es muss mindestens 7 Tage sein, da die Fenstergröße 7 Tage beträgt. Andernfalls würden Sie Datensätze Ihres "laufenden Fensters" löschen. Sie können den Aufbewahrungszeitraum auch länger festlegen, um "späte Daten" verarbeiten zu können. Angenommen, ein Benutzer klickt am Ende des Fensterzeitraums (5 Minuten vor Ablauf der 7-Tage-Zeitspanne der Empfehlung), aber der Klick wird erst 1 Stunde später an Ihre Anwendung gemeldet. Wenn Ihre Aufbewahrungsfrist 7 Tage als Fenstergröße beträgt, kann dieser spät ankommende Datensatz nicht mehr verarbeitet werden (da die Empfehlung bereits gelöscht worden wäre). Wenn Sie einen längeren Aufbewahrungszeitraum von beispielsweise 8 Tagen festlegen, können Sie weiterhin verspätete Datensätze verarbeiten. Es hängt von Ihrer Anwendung / semantischen Notwendigkeit ab, welche Aufbewahrungszeit Sie verwenden möchten.

Zusammenfassung: Aus der Sicht der Implementierung ist die Verwendung von Stream-Stream-Join einfacher als die Verwendung von Stream-Table-Join. Speicher- / Datenträgereinsparungen werden jedoch erwartet und können abhängig von der Datenrate des Klickstroms groß sein.


6
2017-09-26 00:34