Veri Bilimi Okulu

Apache Kafka Exactly-Once Semantics: Mesajlar Ne Kaybolsun, Ne Tekrar Etsin!
Apache Kafka Exactly-Once Semantics: Mesajlar Ne Kaybolsun, Ne Tekrar Etsin!
apache_kafka_exacly_once

Loading

Bu yazıda sizlerle Apache Kafka’nın en kritik ve en çok yanlış anlaşılan özelliklerinden biri olan exactly-once semantics (tam bir kez semantiği) konusunu detaylıca konuşacağız. Bu özellik dağıtık sistemlerde çözmesi en zor problemlerden biri olarak kabul ediliyor ve bazı uzmanlar bunun matematiksel olarak imkansız olduğunu bile iddia ediyordu [2][7]. Ama Kafka bunu başardı! Gelin birlikte nasıl olduğunu öğrenelim.

Üç Garanti Seviyesi

Mesaj iletiminde üç farklı garanti seviyesi var. Bunları günlük hayattan örneklerle açıklayayım [2][7]:

At-Most-Once (En Fazla Bir Kez): Mesajı bir kez gönderirsiniz, eğer hata olursa tekrar denemezsiniz. Mesaj kaybolabilir ama asla çift gönderim olmaz. Tıpkı WhatsApp’tan mesaj gönderirken internetiniz kesildiğinde, “canım boş ver, tekrar göndermeyeyim” demeniz gibi [2].

At-Least-Once (En Az Bir Kez): Mesajın mutlaka ulaşmasını istersiniz, bu yüzden hata olursa tekrar tekrar denersiniz. Mesaj kaybolmaz ama bazen aynı mesaj birden fazla gidebilir. Bankacıya “paranın geçtiğinden emin misin?” diye tekrar tekrar sormanız gibi – para mutlaka gider ama belki iki kez kesilir [2].

Exactly-Once (Tam Bir Kez): İşte biz buradayız! Mesaj ne kaybolur ne de tekrar eder. Her mesaj bir kez ve sadece bir kez işlenir [2][7].

Tarihi Bir Adım: Kafka 0.11.0

Kafka başlangıçta sadece at-most-once ve at-least-once desteği sunuyordu. Ama 2017 yılında Kafka 0.11.0 sürümü ile birlikte exactly-once semantics dünyasına geldi ve bu gerçekten devrim niteliğinde bir özellik oldu [7][12]. Confluent’ın CTO’su Neha Narkhede bu özelliği açıkladığında topluluğun şoku büyüktü çünkü herkes bunun dağıtık sistemlerde imkansız olduğuna inanıyordu [6][7].

İlginç bir detay: Kafka 3.0 sürümünden itibaren (2021), idempotent producer özellikleri varsayılan olarak aktif hale getirildi [2][17]. Yani artık özel bir ayar yapmasanız bile, producer’larınız acks=all ve enable.idempotence=true ayarlarıyla çalışıyor [2][17][27].

1. İdempotent Producer

Ne İşe Yarar?

İdempotent producer, tek bir partition içinde mesajların tekrarlanmasını engelleyen mekanizmadır [3][5]. Peki nasıl çalışır?

Kafka her producer’a özel bir Producer ID (PID) verir ve her mesaja bir sequence number (sıra numarası) ekler. Broker bu numaraları takip eder [2][9].

Şöyle düşünün: Bir arkadaşınıza aynı mesajı yanlışlıkla iki kez göndermek istiyorsunuz ama telefon akıllı ve diyor ki “Dur, bu mesajı zaten bir dakika önce gönderdin!” İşte Kafka’nın yaptığı da bu [9].

Konfigürasyonu

enable.idempotence=true

Bu tek satır, producer’ınızı idempotent yapar [1][4]. Kafka 3.0’dan sonra bu varsayılan olarak aktif, yani elle ayarlamanıza gerek yok [2][17][27].

Performans Etkisi

İyi haber: İdempotent producer’ın performans maliyeti ihmal edilebilir seviyede [7][30]. Confluent’ın raporlarına göre, idempotent olmayan producer’a kıyasla neredeyse hiç fark yok [30].

Kritik Sınırlamalar: İdempotence Ne Zaman Yetmez?

Burası çok önemli! İdempotent producer şu durumlarda yeterli DEĞİLDİR [3][5][13]:

  1. Producer yeniden başlatılırsa: Her yeniden başlatmada yeni bir PID alınır, bu yüzden önceki PID ile gönderilen mesajların tekrar elenmesi garanti edilemez [5][13][26].
  2. Birden fazla partition’a yazıyorsanız: İdempotence sadece tek bir partition için geçerlidir [3][5].
  3. Birden fazla topic’e aynı anda yazıyorsanız: Farklı topic’lere yapılan yazmaların atomik olması garanti edilmez [3].
  4. Consume-transform-produce pattern’i kullanıyorsanız: Bir topic’ten okuyup, işleyip, başka bir topic’e yazıyorsanız, sadece idempotence yeterli değil [3][8].

İşte bu durumlar için transactional producer devreye giriyor!

2. Transactional Producer: Tam Koruma

Ne Zaman Kullanmalıyız?

Transactional producer, yukarıda saydığımız durumlar için tasarlanmıştır [3][4]. Özellikle şu senaryolarda kritiktir:

  • Çoklu partition yazma: Birden fazla partition’a yazıyorsanız [1][3]
  • Çoklu topic yazma: Farklı topic’lere yazıyorsanız [1][3]
  • Stream processing: Kafka’dan oku, işle, Kafka’ya yaz senaryoları [1][3]
  • Offset yönetimi: Consumer offset’lerini de transaction’a dahil etmek istiyorsanız [1][3]

Nasıl Çalışır?

Bankadan para transfer ederken olduğu gibi düşünün: Ya hem sizin hesabınızdan para çekilecek hem de karşı tarafa yatırılacak, ya da hiçbir işlem olmayacak. Yarım kalan işlem olmaz [1][10].

Kafka’da da aynı mantık [1][10]:

  1. Producer bir transaction başlatır: beginTransaction()
  2. Birden fazla topic/partition’a mesaj gönderir
  3. commitTransaction() veya abortTransaction() çağrısı yapar
  4. Transaction coordinator tüm mesajları ya hep birlikte görünür yapar, ya da hiçbirini göstermez

Konfigürasyonu

enable.idempotence=true
transactional.id=benzersiz-transaction-id

Önemli: transactional.id ayarlandığında, idempotence otomatik olarak aktif hale gelir [4][17]. Ayrıca bu ID her producer instance için benzersiz olmalı ama yeniden başlatmalarda aynı kalmalıdır [3][4].

Performans Etkisi: Maliyeti Var mı?

Evet, transaction’ların bir maliyeti var [11][25][26]:

Throughput etkisi:

  • Kısa commit interval (100ms): %15-30 throughput düşüşü (100-1000 byte mesajlar için) [30]
  • Uzun commit interval (30 saniye): 1KB+ mesajlarda neredeyse sıfır maliyet [30]

Latency etkisi:

  • Her producer aynı anda sadece bir transaction çalıştırabilir (seri çalışır) [2][26]
  • Transaction başlatma, commit ve abort işlemleri ek RPC çağrıları gerektirir [25][26]
  • Az partition, çok mesaj: Maliyet dağılır, performans iyi [26]
  • Çok partition, az mesaj: RPC maliyetleri artabilir [26]

Çözüm: Uygulamanızın ihtiyacına göre commit.interval.ms ayarını optimize edin [11][28]. Düşük latency mi önemli, yoksa yüksek throughput mu? Bu dengeyi siz kurmalısınız.

3. Transactional Coordinator: Trafik Polisi

Broker tarafında çalışan bu bileşen, transaction’ları yöneten trafik polisi gibidir [1][10]. İki fazlı commit protokolüne (2PC) benzer şekilde çalışır [1]:

  • Hangi producer’ların açık transaction’ları olduğunu __transaction_state topic’inde takip eder [2][17]
  • commitTransaction() geldiğinde, tüm mesajları atomik olarak “committed” işaretler [1]
  • Commit edilmemiş mesajlar consumer’lar tarafından görülemez [1][2]
  • Transaction timeout kontrolü yapar, zamanında tamamlanmayan transaction’ları abort eder [15]

4. Read-Committed Consumer: Güvenli Okuma

Consumer tarafında da bir ayar yapmamız gerekiyor [1][4]:

isolation.level=read_committed

Bu ayarla consumer’ınız sadece commit edilmiş mesajları okur. Yarım kalmış veya iptal edilmiş transaction’ların mesajlarını asla görmez [1][2].

İki seçenek var [1][16]:

  • read_committed: Sadece commit edilmiş mesajları okur (EOS için gerekli)
  • read_uncommitted: Her şeyi okur (varsayılan, eski client’lar için)

Pratik Örnekler

Python confluent_kafka ile Transactional Producer

Python’da confluent_kafka kütüphanesi ile exactly-once semantics kullanmak çok kolay [33][34]:

from confluent_kafka import Producer, Consumer

# Producer konfigürasyonu
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'python-txn-producer-1',
    'enable.idempotence': True  # Otomatik aktif olur ama açıkça yazabiliriz
}

producer = Producer(producer_config)

# Transaction'ları başlat
producer.init_transactions()

# Consumer konfigürasyonu (consume-transform-produce için)
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python-consumer-group',
    'enable.auto.commit': False,  # Manuel commit için
    'isolation.level': 'read_committed'  # Sadece commit edilmiş mesajları oku
}

consumer = Consumer(consumer_config)
consumer.subscribe(['input-topic'])

try:
    while True:
        # Mesajları oku
        msg = consumer.poll(timeout=1.0)
        
        if msg is None:
            continue
            
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        
        # Transaction başlat
        producer.begin_transaction()
        
        try:
            # Mesajı işle ve yaz
            processed_value = msg.value().decode('utf-8').upper()
            producer.produce(
                'output-topic',
                key=msg.key(),
                value=processed_value.encode('utf-8')
            )
            
            # Offset'i transaction'a dahil et
            producer.send_offsets_to_transaction(
                [{'topic': msg.topic(), 
                  'partition': msg.partition(), 
                  'offset': msg.offset() + 1}],
                consumer.consumer_group_metadata()
            )
            
            # Transaction'ı commit et
            producer.commit_transaction()
            
        except Exception as e:
            # Hata durumunda abort et
            print(f"Error: {e}")
            producer.abort_transaction()
            
except KeyboardInterrupt:
    pass
finally:
    producer.flush()
    consumer.close()

Bu kod [33][36]:

  • Input topic’ten mesaj okur
  • Mesajı işler (büyük harfe çevirir)
  • Output topic’e yazar
  • Offset’i aynı transaction’da commit eder
  • Hata durumunda tüm transaction’ı abort eder

Apache Flink ile Exactly-Once

Apache Flink checkpoint mekanizması ve two-phase commit protokolü ile Kafka’da exactly-once garantisi sağlar [42][49]:

// Scala/Java örneği
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util.Properties

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Checkpointing aktif et
env.enableCheckpointing(5000)  // 5 saniyede bir checkpoint

// Kafka Consumer
val consumerProps = new Properties()
consumerProps.setProperty("bootstrap.servers", "localhost:9092")
consumerProps.setProperty("group.id", "flink-consumer")
consumerProps.setProperty("isolation.level", "read_committed")

val kafkaConsumer = new FlinkKafkaConsumer[String](
    "input-topic",
    new SimpleStringSchema(),
    consumerProps
)

// Kafka Producer - Exactly-Once semantics
val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
producerProps.setProperty("transaction.timeout.ms", "900000")  // 15 dakika

val kafkaProducer = new KafkaSink.Builder[String]
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(...)
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-kafka-")
    .setKafkaProducerConfig(producerProps)
    .build()

// Stream processing
env
    .addSource(kafkaConsumer)
    .map(value => value.toUpperCase())
    .sinkTo(kafkaProducer)

env.execute("Flink Exactly-Once Example")

Önemli Flink detayları [42][43][46]:

  1. Checkpoint mekanizması: Flink düzenli aralıklarla state snapshot’ları alır [42][45]
  2. Two-phase commit: Kafka transaction’ları Flink checkpoint’leriyle koordine edilir [42][49]
  3. Transaction timeout: Checkpoint süresinden ve beklenen downtime’dan uzun olmalı [46][48]
  4. İzolasyon seviyesi: Consumer’lar read_committed olmalı [43][50]

PySpark Structured Streaming ile Exactly-Once

PySpark Structured Streaming checkpoint ve idempotent sink mekanizmaları ile exactly-once garanti verir [54][58]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_json, struct

# Spark session
spark = SparkSession.builder \
    .appName("Kafka Exactly-Once") \
    .getOrCreate()

# Kafka'dan oku
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "input-topic") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()

# Mesajı işle
processed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .selectExpr("upper(value) as value")

# Kafka'ya yaz - foreachBatch ile exactly-once
def write_to_kafka(batch_df, batch_id):
    """
    Her batch için çağrılan fonksiyon
    batch_id: Monotonic artan unique ID (checkpoint'lerle tutarlı)
    """
    batch_df.selectExpr("CAST(value AS STRING) as value") \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "output-topic") \
        .save()
    
    print(f"Batch {batch_id} yazıldı")

# Streaming query başlat
query = processed_df.writeStream \
    .foreachBatch(write_to_kafka) \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .trigger(processingTime='10 seconds') \
    .start()

query.awaitTermination()

PySpark’ta exactly-once nasıl çalışır [54][55][57]:

  1. Checkpointing: Spark her batch için offset’leri Write-Ahead Log’a kaydeder [54][55]
  2. Idempotent sinks: Sink’ler reprocessing’e dayanıklı tasarlanmıştır [54][58]
  3. Replay mekanizması: Hata durumunda source replay edilebilir olmalı [54][58]
  4. Batch ID: Her batch unique ve monotonic artan ID alır [55][57]

Alternatif: Doğrudan Kafka sink kullanımı:

query = processed_df \
    .selectExpr("CAST(value AS STRING) as value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output-topic") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .outputMode("append") \
    .start()

Önemli Sınırlamalar ve Gerçekler

1. Sadece Kafka İçinde Geçerli

Kafka’nın exactly-once garantisi sadece Kafka içinde geçerlidir [8][11][14][28]. Yani:

❌ External database’e yazıyorsanız: Kafka bunu geri alamaz
❌ REST API çağrısı yapıyorsanız: Kafka bunu kontrol edemez
❌ Email/SMS gönderiyorsanız: Kafka bunu geri çekemez

Çözüm: İki ayrı transaction olarak tasarlayın [11][28]:

  1. Kafka içinde transaction ile çıktı üretin
  2. Bu çıktıyı idempotent şekilde external sisteme yazın

Transactional Outbox Pattern kullanarak external sistemlerle entegrasyonu iyileştirebilirsiniz [8].

2. Upstream Producer’ları Kontrol Edemezsiniz

Eğer başka ekiplerin/servislerin topic’lerinden veri okuyorsanız, onların exactly-once kullanıp kullanmadığını kontrol edemezsiniz [8]. Bu durumda kendi tarafınızda ek deduplication (tekrar eleme) mekanizmaları eklemeniz gerekebilir.

3. Performans vs Garanti Trade-off’u

Exactly-once kullanmak her zaman gerekli değildir [8]:

  • At-least-once + application-level deduplication bazen daha pratik ve performanslı olabilir
  • Finansal işlemler, tıbbi veriler gibi kritik durumlarda exactly-once şarttır [19]
  • Metrik toplama gibi durumlarda at-most-once bile yeterli olabilir [3]

4. Framework-Spesifik Dikkat Edilmesi Gerekenler

Flink için [46][48][51]:

  • Transaction timeout > (max checkpoint duration + max downtime)
  • Plansız kapanmalarda data loss riski var, planlı kapanmalarda stop komutu kullanın [51]

PySpark için [54][56][57]:

  • foreachBatch içinde exception varsa batch tekrar çalıştırılır, deduplication önemli [56]
  • (partitionId, epochId) garantisi yok, optimizer değişimleri sonucu farklı partition sayıları olabilir [54][58]

Özet Tablo

Özellikİdempotent ProducerTransactional Producer
KapsamTek partitionÇoklu partition/topic
Producer restart koruması❌ Yok✅ Var (transactional.id ile)
Çoklu topic yazma❌ Atomik değil✅ Atomik
Offset commit entegrasyonu❌ Yok✅ Var
Performans maliyetiİhmal edilebilir%15-30 (ayarlara göre)
Kafka 3.0+ varsayılan✅ Evet❌ Hayır (manuel aktif)
Python desteği✅ confluent_kafka✅ confluent_kafka
Flink desteği✅ Otomatik✅ Checkpoint ile entegre
PySpark desteği✅ Checkpoint ile✅ foreachBatch ile

Son Sözler ve Öneriler

Exactly-once semantics kullanmak için [8][19]:

  1. İhtiyacınızı değerlendirin: Her uygulama exactly-once gerektirmez
  2. Doğru framework’ü seçin: Python için confluent_kafka, büyük ölçek için Flink, Spark ekosistemindeyseniz PySpark
  3. Performans testleri yapın: Commit interval, checkpoint interval’ı optimize edin
  4. Monitoring ekleyin: Transaction başarı/başarısızlık metriklerini izleyin
  5. İdempotent tüketimi planlayın: External sistemlerle çalışırken uygulama seviyesinde idempotence sağlayın
  6. Kafka sürümünüzü kontrol edin: Kafka 3.0+ kullanıyorsanız idempotence zaten aktif

Özetle: Kafka’nın exactly-once semantics özelliği dağıtık sistemlerin en zor problemlerinden birini çözüyor. İdempotent producer tek partition için, transactional producer ise çoklu partition/topic senaryoları için tam koruma sağlıyor [1][3][7]. Python, Flink ve PySpark gibi farklı ekosistemlerden bu özelliği kullanabilir, her biri kendi avantajlarıyla exactly-once garantisi verebilirsiniz.

Umarım bu yazıyla Kafka’nın exactly-once garantisini nasıl sağladığını ve farklı framework’lerle nasıl kullanacağınızı tam anlamıyla kavramışsınızdır. Pratik yaparken bu mekanizmaları kullanacak, kendi uygulamalarınızda veri kaybı ve tekrar sorunlarından kurtulacaksınız!

Sorularınız varsa yorum bölümünde buluşalım! 🚀

Kaynaklar

[1] Apache Kafka Documentation – KIP-98: Exactly Once Delivery and Transactional Messaging
[2] Strimzi Blog – Exactly-once semantics with Kafka transactions (2023)
[3] Apache Kafka Confluence – KIP-98 Technical Specification
[4] Baeldung – Exactly Once Processing in Kafka with Java (2018)
[5] Stack Overflow – Kafka Producer Idempotence Discussion
[6] Hevo Data – What is Kafka Exactly Once Semantics (2025)
[7] Confluent Blog – Exactly-once Semantics: How Apache Kafka Does it (2017)
[8] Nejc Korasa – Idempotent Processing with Kafka (2023)
[9] Medium – Kafka Idempotent Producer and Consumer (2020)
[10] AutoMQ – Kafka Exactly Once Semantics Implementation (2024)

0

Bir yanıt yazın

Password Requirements:

  • At least 8 characters
  • At least 1 lowercase letter
  • At least 1 uppercase letter
  • At least 1 numerical number
  • At least 1 special character