
Bu yazıda sizlerle Apache Kafka’nın en kritik ve en çok yanlış anlaşılan özelliklerinden biri olan exactly-once semantics (tam bir kez semantiği) konusunu detaylıca konuşacağız. Bu özellik dağıtık sistemlerde çözmesi en zor problemlerden biri olarak kabul ediliyor ve bazı uzmanlar bunun matematiksel olarak imkansız olduğunu bile iddia ediyordu [2][7]. Ama Kafka bunu başardı! Gelin birlikte nasıl olduğunu öğrenelim.
Üç Garanti Seviyesi
Mesaj iletiminde üç farklı garanti seviyesi var. Bunları günlük hayattan örneklerle açıklayayım [2][7]:
At-Most-Once (En Fazla Bir Kez): Mesajı bir kez gönderirsiniz, eğer hata olursa tekrar denemezsiniz. Mesaj kaybolabilir ama asla çift gönderim olmaz. Tıpkı WhatsApp’tan mesaj gönderirken internetiniz kesildiğinde, “canım boş ver, tekrar göndermeyeyim” demeniz gibi [2].
At-Least-Once (En Az Bir Kez): Mesajın mutlaka ulaşmasını istersiniz, bu yüzden hata olursa tekrar tekrar denersiniz. Mesaj kaybolmaz ama bazen aynı mesaj birden fazla gidebilir. Bankacıya “paranın geçtiğinden emin misin?” diye tekrar tekrar sormanız gibi – para mutlaka gider ama belki iki kez kesilir [2].
Exactly-Once (Tam Bir Kez): İşte biz buradayız! Mesaj ne kaybolur ne de tekrar eder. Her mesaj bir kez ve sadece bir kez işlenir [2][7].
Tarihi Bir Adım: Kafka 0.11.0
Kafka başlangıçta sadece at-most-once ve at-least-once desteği sunuyordu. Ama 2017 yılında Kafka 0.11.0 sürümü ile birlikte exactly-once semantics dünyasına geldi ve bu gerçekten devrim niteliğinde bir özellik oldu [7][12]. Confluent’ın CTO’su Neha Narkhede bu özelliği açıkladığında topluluğun şoku büyüktü çünkü herkes bunun dağıtık sistemlerde imkansız olduğuna inanıyordu [6][7].
İlginç bir detay: Kafka 3.0 sürümünden itibaren (2021), idempotent producer özellikleri varsayılan olarak aktif hale getirildi [2][17]. Yani artık özel bir ayar yapmasanız bile, producer’larınız acks=all
ve enable.idempotence=true
ayarlarıyla çalışıyor [2][17][27].
1. İdempotent Producer
Ne İşe Yarar?
İdempotent producer, tek bir partition içinde mesajların tekrarlanmasını engelleyen mekanizmadır [3][5]. Peki nasıl çalışır?
Kafka her producer’a özel bir Producer ID (PID) verir ve her mesaja bir sequence number (sıra numarası) ekler. Broker bu numaraları takip eder [2][9].
Şöyle düşünün: Bir arkadaşınıza aynı mesajı yanlışlıkla iki kez göndermek istiyorsunuz ama telefon akıllı ve diyor ki “Dur, bu mesajı zaten bir dakika önce gönderdin!” İşte Kafka’nın yaptığı da bu [9].
Konfigürasyonu
enable.idempotence=true
Bu tek satır, producer’ınızı idempotent yapar [1][4]. Kafka 3.0’dan sonra bu varsayılan olarak aktif, yani elle ayarlamanıza gerek yok [2][17][27].
Performans Etkisi
İyi haber: İdempotent producer’ın performans maliyeti ihmal edilebilir seviyede [7][30]. Confluent’ın raporlarına göre, idempotent olmayan producer’a kıyasla neredeyse hiç fark yok [30].
Kritik Sınırlamalar: İdempotence Ne Zaman Yetmez?
Burası çok önemli! İdempotent producer şu durumlarda yeterli DEĞİLDİR [3][5][13]:
- Producer yeniden başlatılırsa: Her yeniden başlatmada yeni bir PID alınır, bu yüzden önceki PID ile gönderilen mesajların tekrar elenmesi garanti edilemez [5][13][26].
- Birden fazla partition’a yazıyorsanız: İdempotence sadece tek bir partition için geçerlidir [3][5].
- Birden fazla topic’e aynı anda yazıyorsanız: Farklı topic’lere yapılan yazmaların atomik olması garanti edilmez [3].
- Consume-transform-produce pattern’i kullanıyorsanız: Bir topic’ten okuyup, işleyip, başka bir topic’e yazıyorsanız, sadece idempotence yeterli değil [3][8].
İşte bu durumlar için transactional producer devreye giriyor!
2. Transactional Producer: Tam Koruma
Ne Zaman Kullanmalıyız?
Transactional producer, yukarıda saydığımız durumlar için tasarlanmıştır [3][4]. Özellikle şu senaryolarda kritiktir:
- Çoklu partition yazma: Birden fazla partition’a yazıyorsanız [1][3]
- Çoklu topic yazma: Farklı topic’lere yazıyorsanız [1][3]
- Stream processing: Kafka’dan oku, işle, Kafka’ya yaz senaryoları [1][3]
- Offset yönetimi: Consumer offset’lerini de transaction’a dahil etmek istiyorsanız [1][3]
Nasıl Çalışır?
Bankadan para transfer ederken olduğu gibi düşünün: Ya hem sizin hesabınızdan para çekilecek hem de karşı tarafa yatırılacak, ya da hiçbir işlem olmayacak. Yarım kalan işlem olmaz [1][10].
Kafka’da da aynı mantık [1][10]:
- Producer bir transaction başlatır:
beginTransaction()
- Birden fazla topic/partition’a mesaj gönderir
commitTransaction()
veyaabortTransaction()
çağrısı yapar- Transaction coordinator tüm mesajları ya hep birlikte görünür yapar, ya da hiçbirini göstermez
Konfigürasyonu
enable.idempotence=true transactional.id=benzersiz-transaction-id
Önemli: transactional.id
ayarlandığında, idempotence otomatik olarak aktif hale gelir [4][17]. Ayrıca bu ID her producer instance için benzersiz olmalı ama yeniden başlatmalarda aynı kalmalıdır [3][4].
Performans Etkisi: Maliyeti Var mı?
Evet, transaction’ların bir maliyeti var [11][25][26]:
Throughput etkisi:
- Kısa commit interval (100ms): %15-30 throughput düşüşü (100-1000 byte mesajlar için) [30]
- Uzun commit interval (30 saniye): 1KB+ mesajlarda neredeyse sıfır maliyet [30]
Latency etkisi:
- Her producer aynı anda sadece bir transaction çalıştırabilir (seri çalışır) [2][26]
- Transaction başlatma, commit ve abort işlemleri ek RPC çağrıları gerektirir [25][26]
- Az partition, çok mesaj: Maliyet dağılır, performans iyi [26]
- Çok partition, az mesaj: RPC maliyetleri artabilir [26]
Çözüm: Uygulamanızın ihtiyacına göre commit.interval.ms
ayarını optimize edin [11][28]. Düşük latency mi önemli, yoksa yüksek throughput mu? Bu dengeyi siz kurmalısınız.
3. Transactional Coordinator: Trafik Polisi
Broker tarafında çalışan bu bileşen, transaction’ları yöneten trafik polisi gibidir [1][10]. İki fazlı commit protokolüne (2PC) benzer şekilde çalışır [1]:
- Hangi producer’ların açık transaction’ları olduğunu
__transaction_state
topic’inde takip eder [2][17] commitTransaction()
geldiğinde, tüm mesajları atomik olarak “committed” işaretler [1]- Commit edilmemiş mesajlar consumer’lar tarafından görülemez [1][2]
- Transaction timeout kontrolü yapar, zamanında tamamlanmayan transaction’ları abort eder [15]
4. Read-Committed Consumer: Güvenli Okuma
Consumer tarafında da bir ayar yapmamız gerekiyor [1][4]:
isolation.level=read_committed
Bu ayarla consumer’ınız sadece commit edilmiş mesajları okur. Yarım kalmış veya iptal edilmiş transaction’ların mesajlarını asla görmez [1][2].
İki seçenek var [1][16]:
read_committed
: Sadece commit edilmiş mesajları okur (EOS için gerekli)read_uncommitted
: Her şeyi okur (varsayılan, eski client’lar için)
Pratik Örnekler
Python confluent_kafka ile Transactional Producer
Python’da confluent_kafka kütüphanesi ile exactly-once semantics kullanmak çok kolay [33][34]:
from confluent_kafka import Producer, Consumer # Producer konfigürasyonu producer_config = { 'bootstrap.servers': 'localhost:9092', 'transactional.id': 'python-txn-producer-1', 'enable.idempotence': True # Otomatik aktif olur ama açıkça yazabiliriz } producer = Producer(producer_config) # Transaction'ları başlat producer.init_transactions() # Consumer konfigürasyonu (consume-transform-produce için) consumer_config = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'python-consumer-group', 'enable.auto.commit': False, # Manuel commit için 'isolation.level': 'read_committed' # Sadece commit edilmiş mesajları oku } consumer = Consumer(consumer_config) consumer.subscribe(['input-topic']) try: while True: # Mesajları oku msg = consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): print(f"Consumer error: {msg.error()}") continue # Transaction başlat producer.begin_transaction() try: # Mesajı işle ve yaz processed_value = msg.value().decode('utf-8').upper() producer.produce( 'output-topic', key=msg.key(), value=processed_value.encode('utf-8') ) # Offset'i transaction'a dahil et producer.send_offsets_to_transaction( [{'topic': msg.topic(), 'partition': msg.partition(), 'offset': msg.offset() + 1}], consumer.consumer_group_metadata() ) # Transaction'ı commit et producer.commit_transaction() except Exception as e: # Hata durumunda abort et print(f"Error: {e}") producer.abort_transaction() except KeyboardInterrupt: pass finally: producer.flush() consumer.close()
Bu kod [33][36]:
- Input topic’ten mesaj okur
- Mesajı işler (büyük harfe çevirir)
- Output topic’e yazar
- Offset’i aynı transaction’da commit eder
- Hata durumunda tüm transaction’ı abort eder
Apache Flink ile Exactly-Once
Apache Flink checkpoint mekanizması ve two-phase commit protokolü ile Kafka’da exactly-once garantisi sağlar [42][49]:
// Scala/Java örneği import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka._ import org.apache.flink.api.common.serialization.SimpleStringSchema import java.util.Properties val env = StreamExecutionEnvironment.getExecutionEnvironment // Checkpointing aktif et env.enableCheckpointing(5000) // 5 saniyede bir checkpoint // Kafka Consumer val consumerProps = new Properties() consumerProps.setProperty("bootstrap.servers", "localhost:9092") consumerProps.setProperty("group.id", "flink-consumer") consumerProps.setProperty("isolation.level", "read_committed") val kafkaConsumer = new FlinkKafkaConsumer[String]( "input-topic", new SimpleStringSchema(), consumerProps ) // Kafka Producer - Exactly-Once semantics val producerProps = new Properties() producerProps.setProperty("bootstrap.servers", "localhost:9092") producerProps.setProperty("transaction.timeout.ms", "900000") // 15 dakika val kafkaProducer = new KafkaSink.Builder[String] .setBootstrapServers("localhost:9092") .setRecordSerializer(...) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("flink-kafka-") .setKafkaProducerConfig(producerProps) .build() // Stream processing env .addSource(kafkaConsumer) .map(value => value.toUpperCase()) .sinkTo(kafkaProducer) env.execute("Flink Exactly-Once Example")
Önemli Flink detayları [42][43][46]:
- Checkpoint mekanizması: Flink düzenli aralıklarla state snapshot’ları alır [42][45]
- Two-phase commit: Kafka transaction’ları Flink checkpoint’leriyle koordine edilir [42][49]
- Transaction timeout: Checkpoint süresinden ve beklenen downtime’dan uzun olmalı [46][48]
- İzolasyon seviyesi: Consumer’lar
read_committed
olmalı [43][50]
PySpark Structured Streaming ile Exactly-Once
PySpark Structured Streaming checkpoint ve idempotent sink mekanizmaları ile exactly-once garanti verir [54][58]:
from pyspark.sql import SparkSession from pyspark.sql.functions import col, from_json, to_json, struct # Spark session spark = SparkSession.builder \ .appName("Kafka Exactly-Once") \ .getOrCreate() # Kafka'dan oku kafka_df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "input-topic") \ .option("startingOffsets", "earliest") \ .option("failOnDataLoss", "false") \ .load() # Mesajı işle processed_df = kafka_df.selectExpr("CAST(value AS STRING)") \ .selectExpr("upper(value) as value") # Kafka'ya yaz - foreachBatch ile exactly-once def write_to_kafka(batch_df, batch_id): """ Her batch için çağrılan fonksiyon batch_id: Monotonic artan unique ID (checkpoint'lerle tutarlı) """ batch_df.selectExpr("CAST(value AS STRING) as value") \ .write \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("topic", "output-topic") \ .save() print(f"Batch {batch_id} yazıldı") # Streaming query başlat query = processed_df.writeStream \ .foreachBatch(write_to_kafka) \ .option("checkpointLocation", "/path/to/checkpoint") \ .trigger(processingTime='10 seconds') \ .start() query.awaitTermination()
PySpark’ta exactly-once nasıl çalışır [54][55][57]:
- Checkpointing: Spark her batch için offset’leri Write-Ahead Log’a kaydeder [54][55]
- Idempotent sinks: Sink’ler reprocessing’e dayanıklı tasarlanmıştır [54][58]
- Replay mekanizması: Hata durumunda source replay edilebilir olmalı [54][58]
- Batch ID: Her batch unique ve monotonic artan ID alır [55][57]
Alternatif: Doğrudan Kafka sink kullanımı:
query = processed_df \ .selectExpr("CAST(value AS STRING) as value") \ .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("topic", "output-topic") \ .option("checkpointLocation", "/path/to/checkpoint") \ .outputMode("append") \ .start()
Önemli Sınırlamalar ve Gerçekler
1. Sadece Kafka İçinde Geçerli
Kafka’nın exactly-once garantisi sadece Kafka içinde geçerlidir [8][11][14][28]. Yani:
❌ External database’e yazıyorsanız: Kafka bunu geri alamaz
❌ REST API çağrısı yapıyorsanız: Kafka bunu kontrol edemez
❌ Email/SMS gönderiyorsanız: Kafka bunu geri çekemez
Çözüm: İki ayrı transaction olarak tasarlayın [11][28]:
- Kafka içinde transaction ile çıktı üretin
- Bu çıktıyı idempotent şekilde external sisteme yazın
Transactional Outbox Pattern kullanarak external sistemlerle entegrasyonu iyileştirebilirsiniz [8].
2. Upstream Producer’ları Kontrol Edemezsiniz
Eğer başka ekiplerin/servislerin topic’lerinden veri okuyorsanız, onların exactly-once kullanıp kullanmadığını kontrol edemezsiniz [8]. Bu durumda kendi tarafınızda ek deduplication (tekrar eleme) mekanizmaları eklemeniz gerekebilir.
3. Performans vs Garanti Trade-off’u
Exactly-once kullanmak her zaman gerekli değildir [8]:
- At-least-once + application-level deduplication bazen daha pratik ve performanslı olabilir
- Finansal işlemler, tıbbi veriler gibi kritik durumlarda exactly-once şarttır [19]
- Metrik toplama gibi durumlarda at-most-once bile yeterli olabilir [3]
4. Framework-Spesifik Dikkat Edilmesi Gerekenler
Flink için [46][48][51]:
- Transaction timeout > (max checkpoint duration + max downtime)
- Plansız kapanmalarda data loss riski var, planlı kapanmalarda
stop
komutu kullanın [51]
PySpark için [54][56][57]:
foreachBatch
içinde exception varsa batch tekrar çalıştırılır, deduplication önemli [56](partitionId, epochId)
garantisi yok, optimizer değişimleri sonucu farklı partition sayıları olabilir [54][58]
Özet Tablo
Özellik | İdempotent Producer | Transactional Producer |
---|---|---|
Kapsam | Tek partition | Çoklu partition/topic |
Producer restart koruması | ❌ Yok | ✅ Var (transactional.id ile) |
Çoklu topic yazma | ❌ Atomik değil | ✅ Atomik |
Offset commit entegrasyonu | ❌ Yok | ✅ Var |
Performans maliyeti | İhmal edilebilir | %15-30 (ayarlara göre) |
Kafka 3.0+ varsayılan | ✅ Evet | ❌ Hayır (manuel aktif) |
Python desteği | ✅ confluent_kafka | ✅ confluent_kafka |
Flink desteği | ✅ Otomatik | ✅ Checkpoint ile entegre |
PySpark desteği | ✅ Checkpoint ile | ✅ foreachBatch ile |
Son Sözler ve Öneriler
Exactly-once semantics kullanmak için [8][19]:
- ✅ İhtiyacınızı değerlendirin: Her uygulama exactly-once gerektirmez
- ✅ Doğru framework’ü seçin: Python için confluent_kafka, büyük ölçek için Flink, Spark ekosistemindeyseniz PySpark
- ✅ Performans testleri yapın: Commit interval, checkpoint interval’ı optimize edin
- ✅ Monitoring ekleyin: Transaction başarı/başarısızlık metriklerini izleyin
- ✅ İdempotent tüketimi planlayın: External sistemlerle çalışırken uygulama seviyesinde idempotence sağlayın
- ✅ Kafka sürümünüzü kontrol edin: Kafka 3.0+ kullanıyorsanız idempotence zaten aktif
Özetle: Kafka’nın exactly-once semantics özelliği dağıtık sistemlerin en zor problemlerinden birini çözüyor. İdempotent producer tek partition için, transactional producer ise çoklu partition/topic senaryoları için tam koruma sağlıyor [1][3][7]. Python, Flink ve PySpark gibi farklı ekosistemlerden bu özelliği kullanabilir, her biri kendi avantajlarıyla exactly-once garantisi verebilirsiniz.
Umarım bu yazıyla Kafka’nın exactly-once garantisini nasıl sağladığını ve farklı framework’lerle nasıl kullanacağınızı tam anlamıyla kavramışsınızdır. Pratik yaparken bu mekanizmaları kullanacak, kendi uygulamalarınızda veri kaybı ve tekrar sorunlarından kurtulacaksınız!
Sorularınız varsa yorum bölümünde buluşalım! 🚀
Kaynaklar
[1] Apache Kafka Documentation – KIP-98: Exactly Once Delivery and Transactional Messaging
[2] Strimzi Blog – Exactly-once semantics with Kafka transactions (2023)
[3] Apache Kafka Confluence – KIP-98 Technical Specification
[4] Baeldung – Exactly Once Processing in Kafka with Java (2018)
[5] Stack Overflow – Kafka Producer Idempotence Discussion
[6] Hevo Data – What is Kafka Exactly Once Semantics (2025)
[7] Confluent Blog – Exactly-once Semantics: How Apache Kafka Does it (2017)
[8] Nejc Korasa – Idempotent Processing with Kafka (2023)
[9] Medium – Kafka Idempotent Producer and Consumer (2020)
[10] AutoMQ – Kafka Exactly Once Semantics Implementation (2024)