Veri Bilimi Okulu

Spark Structured Streaming Output Mode'ları: Complete ve Update Modunu Anlamak
Spark Structured Streaming Output Mode’ları: Complete ve Update Modunu Anlamak
spark_structured_streaming_kapak

Loading

Bugün Spark Structured Streaming output mode’ları Spark ile stream data işlemenin en önemli kavramlarından biridir. Bu yazı bunun, output mode’lar (çıktı modları), hakkında olacak. Özellikle Complete ve Update modları arasındaki farkları detaylıca inceleyeceğiz. Bu iki mod hep bana flu gelmişti umarım bu yazıda farkları iyice netleşir.

Output Mode Nedir?

Öncelikle temel bir soruyla başlayalım: Output mode (çıktı modu) tam olarak nedir? [1]

Spark Structured Streaming’de output mode, streaming sorgunuzun işlediği verileri sink’e (hedef konuma) nasıl yazacağını belirler. Yani her tetikleme (trigger) sonrasında hangi satırların konsola, Kafka’ya, dosyaya veya veritabanına yazılacağına karar verir [2].

Spark’ta üç ana output mode vardır:

  • Append Mode (Ekleme Modu): Sadece yeni satırlar yazılır
  • Complete Mode (Tam Mod): Her seferinde tüm sonuç tablosu yazılır
  • Update Mode (Güncelleme Modu): Sadece güncellenen satırlar yazılır

Şimdi özellikle Complete ve Update modlarına odaklanalım çünkü bunlar aggregation (toplama) işlemlerinde en çok kafamızı karıştıran modlar!

Complete Mode (Tam Mod) Nedir?

Complete mode, isminden de anlaşılacağı gibi, her tetiklemede tüm sonuç tablosunu sink’e yazar [3]. Düşünün ki bir kelime sayma (word count) uygulamanız var. Complete mode’da her yeni veri geldiğinde, o ana kadar gördüğünüz tüm kelimelerin sayısını tekrar tekrar yazarsınız.

Complete Mode’un Özellikleri

  1. Sadece Aggregation İçin: Complete mode yalnızca aggregation sorguları için desteklenir [4]. Yani groupBy(), count(), sum() gibi işlemler yapıyorsanız kullanabilirsiniz.
  2. Watermark Kullanmaz: Complete mode, watermark (su işareti) kullanmaz [5]. Bu çok önemli bir detay! Watermark ile eski verileri temizleyemez çünkü her seferinde tüm sonuçları yazması gerekir.
  3. Her Şeyi Yazar: Her tetiklemede result table’daki (sonuç tablosu) tüm satırları sink’e yazar [6].

Complete Mode Örneği

Kod örneğiyle görelim [7]:

val wordCountDF = df
  .select(explode(split(col("value")," ")).alias("word"))
  .groupBy("word")
  .count()

wordCountDF.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()

Bu örnekte, stream’e her yeni cümle geldiğinde, başlangıçtan beri gördüğümüz tüm kelimelerin sayısını ekrana yazdırıyoruz. Mesela ilk batch’te “merhaba” kelimesini 5 kez görmüşsek, ikinci batch’te de bu 5’i tekrar yazıyoruz, artı yeni gelen verilerle güncellenmiş değerleri.

Update Mode (Güncelleme Modu) Nedir?

Update mode ise Complete mode’a göre daha akıllı bir yaklaşım sergiler. Sadece son tetiklemeden beri güncellenen satırları sink’e yazar [8]. Bu, Complete mode’a kıyasla çok daha verimli bir yöntemdir!

Update Mode’un Özellikleri

  1. Daha Verimli: Complete mode’un aksine, sadece değişen satırları yazar. Tüm sonuç tablosunu her seferinde yazmaz [9].
  2. Watermark Kullanır: Update mode, watermark ile birlikte kullanılabilir ve geç gelen verileri (late data) yönetebilir [10].
  3. Hem Aggregation Hem de Diğerleri: Aggregation olmayan sorgularda update mode, append mode gibi davranır [11].
  4. State Management: Update mode, eski aggregate’leri otomatik olarak temizlemez [12]. Bu yüzden watermark kullanmak önemlidir.

Update Mode Örneği

Aynı word count örneğini update mode ile yapalım [13]:

val wordCountDF = df
  .select(explode(split(col("value")," ")).alias("word"))
  .groupBy("word")
  .count()

wordCountDF.writeStream
  .format("console")
  .outputMode("update")
  .start()
  .awaitTermination()

Bu örnekte, her batch’te sadece yeni gelen veya sayısı değişen kelimeleri yazıyoruz. İlk batch’te “merhaba” kelimesini 5 kez görmüşsek ve ikinci batch’te bu kelime hiç gelmemişse, ikinci batch’te “merhaba”yı yazmıyoruz bile!

Complete vs Update: Detaylı Karşılaştırma

Şimdi gelelim asıl konuya: bu iki mod arasındaki farklar nelerdir ve hangisini ne zaman kullanmalıyız?

1. Yazılan Veri Miktarı

Complete Mode: Her tetiklemede result table’daki tüm satırları yazar [14].

  • Örnek: 1000 farklı kelime varsa, her batch’te 1000 satır yazılır
  • Veri büyüdükçe performans sorunları olabilir

Update Mode: Sadece değişen satırları yazar [15].

  • Örnek: 1000 kelimeden sadece 50’sinin sayısı değiştiyse, sadece 50 satır yazılır
  • Çok daha verimli ve ölçeklenebilir

2. Watermark Desteği

Complete Mode: Watermark kullanmaz [16].

  • Eski verileri state’ten temizleyemez
  • Zamanla state’in büyümesi kaçınılmazdır
  • Memory sorunlarına yol açabilir

Update Mode: Watermark ile uyumludur [17].

  • Geç gelen verileri yönetebilir
  • Eski state’leri temizleyebilir
  • Daha sürdürülebilir bir çözüm sunar

3. Kullanım Senaryoları

Complete Mode İçin İdeal Senaryolar [18]:

  • Dashboard’lar için toplam sonuçlar
  • Küçük veri setleri
  • Tüm sonuçların sürekli görülmesi gereken durumlar
  • Reporting amaçlı kullanımlar

Update Mode İçin İdeal Senaryoları [19]:

  • Büyük veri setleri
  • Gerçek zamanlı güncellemeler
  • Window aggregations (pencere toplamaları) ile birlikte
  • Watermark gerektiren senaryolar
  • Production ETL işlemleri

4. Sink Compatibility (Hedef Uyumluluğu)

Her sink (hedef) her output mode’u desteklemez [20].

Complete Mode:

  • Console: ✅ Destekler
  • File: ❌ Desteklemez (çoğu file format için)
  • Kafka: ✅ Destekler
  • Delta Lake: ✅ Destekler

Update Mode:

  • Console: ✅ Destekler
  • File: ⚠️ Kısmen destekler
  • Kafka: ✅ Destekler
  • Delta Lake: ✅ Destekler

Pratik Örnek: Window Aggregation

Gelin gerçek bir production senaryosuna bakalım. Diyelim ki bir e-ticaret sitesinin dakikalık satış verilerini takip ediyoruz [21]:

val windowedAggregation = transactions
  .withWatermark("transactionTimestamp", "5 minutes")
  .groupBy(
    window($"transactionTimestamp", "1 minute", "1 minute"),
    $"shopId"
  )
  .agg(
    sum("totalCost").as("totalRevenue"),
    count("*").as("transactionCount")
  )

// Update Mode ile
windowedAggregation.writeStream
  .format("console")
  .outputMode("update")
  .start()

Bu örnekte:

  • 1 dakikalık pencerelerle gruplama yapıyoruz
  • 5 dakikalık watermark tanımlıyoruz
  • Update mode sayesinde sadece güncellenen pencereleri yazıyoruz

Eğer Complete mode kullansaydık:

  • Her batch’te tüm pencerelerin sonuçlarını tekrar yazardık
  • Watermark işe yaramazdı
  • State sürekli büyürdü ve memory sorunları yaşardık

Complete Mode Ne Zaman Tercih Edilmeli?

Complete mode’un da avantajlı olduğu durumlar var [22]:

  1. Küçük State: Result table’ınız küçükse (örneğin 100-1000 satır), complete mode sorun yaratmaz.
  2. Dashboard Requirements: Real-time dashboard’larda her zaman tam sonucu göstermek istiyorsanız.
  3. Basit Aggregations: Karmaşık window aggregation olmayan basit durumlarda.
  4. Testing: Development ve test ortamlarında tam sonuçları görmek için.

Update Mode Ne Zaman Tercih Edilmeli?

Update mode çoğu production senaryosunda daha uygun [23]:

  1. Büyük Veri Setleri: Binlerce veya milyonlarca unique key varsa.
  2. Window Aggregations: Özellikle sliding veya tumbling window’lar kullanıyorsanız.
  3. Late Data Handling: Geç gelen verileri watermark ile yönetmeniz gerekiyorsa.
  4. Performance: Sink’e yazma performansının kritik olduğu durumlarda.
  5. Cost Optimization: Cloud ortamlarında veri yazma maliyetini düşürmek için.

Dikkat Edilmesi Gereken Noktalar

Complete Mode İçin

  • Aggregation olmayan sorgularda hata alırsınız [24]
  • State sürekli büyür, temizlenmez
  • Büyük veri setlerinde performans sorunları yaşayabilirsiniz

Update Mode İçin

  • Non-aggregated sorgularda append mode gibi davranır [25]
  • Watermark olmadan eski state’ler temizlenmez
  • Stream-stream join’lerde kullanılamaz (sadece append) [26]

Gerçek Dünya Örneği: E-Ticaret Analytics

Son olarak, gerçek bir senaryoyla bitirelim. Bir e-ticaret platformu için real-time analytics sistemi kurduğunuzu düşünün [27]:

Senaryo 1: Günlük Toplam Satış Dashboard’u

// Complete mode uygun - küçük state, tüm sonuç gerekli
dailySales
  .groupBy("productCategory")
  .agg(sum("amount"))
  .writeStream
  .outputMode("complete")
  .format("memory")
  .start()

Senaryo 2: Saatlik Trend Analizi (Son 24 Saat)

// Update mode uygun - window aggregation, watermark var
hourlySales
  .withWatermark("eventTime", "2 hours")
  .groupBy(
    window($"eventTime", "1 hour"),
    $"productCategory"
  )
  .agg(sum("amount"))
  .writeStream
  .outputMode("update")
  .format("kafka")
  .start()

Sonuç

Complete ve Update mode’ları arasındaki farkı anlamak, Spark Structured Streaming ile çalışırken çok önemli [28].

Özet:

  • Complete Mode: Tüm sonuçları her seferinde yaz, küçük veri setleri için ideal
  • Update Mode: Sadece değişenleri yaz, production’da tercih edilen, ölçeklenebilir çözüm

Hangi modu seçeceğiniz, kullanım senaryonuza, veri büyüklüğünüze ve performans gereksinimlerinize bağlı. Genelde production ortamlarında Update mode ile başlamanızı öneririm. Eğer spesifik bir ihtiyacınız varsa (örneğin küçük bir dashboard), o zaman Complete mode’a geçebilirsiniz [29].

Unutmayın: doğru output mode seçimi, streaming uygulamanızın performansını, maliyetini ve sürdürülebilirliğini doğrudan etkiler [30]!

Umarım bu yazı Spark Structured Streaming output mode’larını anlamanıza yardımcı olmuştur. Uygulamalı olarak Spark öğrenmek isterseniz VBO Data Engineering Bootcamp harika bir seçim olabilir.

Başka bir yazıda görüşmek üzere hoşçakalın.

Kaynaklar

[1] https://www.waitingforcode.com/apache-spark-structured-streaming/output-modes-apache-spark-structured-streaming/read

[2] https://medium.com/@kiranvutukuri/output-modes-in-apache-spark-structured-streaming-part-3-9f4086c8667b

[3] https://medium.com/itversity/comprehensive-guide-to-output-modes-in-spark-structured-streaming-798e1d45aee1

[4] https://dvirgiln.github.io/spark-structured-streaming-output-modes/

[5] https://www.waitingforcode.com/apache-spark-structured-streaming/output-modes-structured-streaming/read

[6] https://sparkbyexamples.com/spark/spark-streaming-outputmode/

[7] https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-OutputMode.html

[8] https://docs.databricks.com/aws/en/structured-streaming/output-mode

[9] https://medium.com/@vndhya/all-about-output-modes-in-spark-structured-streaming-efb008e22d1f

[10] https://www.databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

[11] https://stackoverflow.com/questions/48927192/what-is-the-real-difference-between-append-mode-and-update-mode-in-spark-streami

[12] http://49.235.228.196/sparkbyexamples/spark/spark-streaming-difference-between-complete-append-update-outputmode/index.html

[13] https://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-3

[14] https://books.japila.pl/spark-structured-streaming-internals/streaming-aggregation/

[15] https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-aggregation.html

[16] https://docs.databricks.com/en/structured-streaming/output-mode.html

[17] https://medium.com/@kiranvutukuri/output-modes-in-apache-spark-structured-streaming-part-3-9f4086c8667b

[18] https://medium.com/itversity/comprehensive-guide-to-output-modes-in-spark-structured-streaming-798e1d45aee1

[19] https://dvirgiln.github.io/spark-structured-streaming-output-modes/

[20] https://docs.databricks.com/aws/en/structured-streaming/output-mode

[21] https://dvirgiln.github.io/spark-structured-streaming-output-modes/

[22] https://medium.com/itversity/comprehensive-guide-to-output-modes-in-spark-structured-streaming-798e1d45aee1

[23] https://docs.databricks.com/aws/en/structured-streaming/output-mode

[24] https://sparkbyexamples.com/spark/spark-streaming-outputmode/

[25] https://stackoverflow.com/questions/48927192/what-is-the-real-difference-between-append-mode-and-update-mode-in-spark-streami

[26] https://medium.com/@vndhya/all-about-output-modes-in-spark-structured-streaming-efb008e22d1f

[27] https://dvirgiln.github.io/spark-structured-streaming-output-modes/

[28] https://www.waitingforcode.com/apache-spark-structured-streaming/output-modes-apache-spark-structured-streaming/read

[29] https://medium.com/@kiranvutukuri/output-modes-in-apache-spark-structured-streaming-part-3-9f4086c8667b

[30] https://docs.databricks.com/aws/en/structured-streaming/output-mode

0

Bir yanıt yazın

Password Requirements:

  • At least 8 characters
  • At least 1 lowercase letter
  • At least 1 uppercase letter
  • At least 1 numerical number
  • At least 1 special character