Spark Structured Streaming: Birden Fazla Kafka Topic’e Produce Etmek

Merhabalar. Apache Spark Structured Streaming yaygın kullanılan akan veri işleme platformlarından birisi. Elbette akan verinin işlendiği bir ekosistemde Apache Kafka da bir şekilde yerini alıyor. Spark+Kafka bu tür tasarımlar için iyi bir ikili. Bu yazımızda Spark Structured Streaming ile işlenen bir verinin üretilen çıktıya bağlı olarak birden fazla topic’e gönderimine dair bir örnek yapacağız.

 

 

Şekil-1: Spark Structured Streaming Uygulama Veri Akışı ve Bileşenler
Şekil-1: Spark Structured Streaming Uygulama Veri Akışı ve Bileşenler

 

 

Uygulamanın bileşenleri ve veri akışı yukarıda Şekil-1‘de görülmektedir. Buna göre kafka-console-producer‘dan elle veri girişi yapılacaktır. Girdi veri yapısı iki sütundan oluşacak ve virgülle ayrılacaktır. Birinci sütun (virgülün solu) girdileri temsil ederken, ikincisi (virgülün sağı) ise hedef değişkeni temsil edecektir.  Bu kafka-console-producer’dan ürettiğimiz mesajlar kafka input_topic‘e gidecektir (1 numaralı akış). Apache Spark Streaming uygulaması tarafından input_topic consume edilecektir (2 numaralı akış). Okunan veriler daha öncesinden eğitilmiş modelden geçirilerek hedef değişken tespit edilecektir (Is Outlier kontrolü). Yeri gelmişken söyleyeyim yazıyı sade tutmak amacıyla gerçekte böyle birşey yapmayacağım, bunu basit bir filtreleme ile temsil edeceğiz. Is Outlier kontrolünden elde edilen sonuca göre (outlier veya normal) ilgili Kafka topic’e mesaj gönderilecektir (3 numaralı akış). Son olarak iki kafka-console-consumer tarafından iki faklı topic’teki mesajlar okunacaktır (4 numaralı akış). Uygulamanın mantığını anladıysak kod kısmına geçelim.

Ortam Bilgileri

Spark: 2.4.0

Kafka: 2.5.0

Kafka Broker IP: 192.168.206.130

Java: 1.8.0

Scala: 2.11.8

İşletim sistemi: Windows 10 (Spark geliştirme), Centos7 (Kafka single broker)

IDE: IntelliJ IDEA

Kafka Topic’leri Oluşturmak

Toplamda 3 tane topic ihtiyacımız var (Şekil-1’de Kafka üzerindeki turuncu kutucuklar).

[train@localhost ~]$ kafka-topics.sh --bootstrap-server 192.168.206.130:9092 \
--create --topic input_topic \
--partitions 3 --replication-factor 1


[train@localhost ~]$ kafka-topics.sh --bootstrap-server 192.168.206.130:9092 \
--create --topic normal \
--partitions 3 --replication-factor 1


[train@localhost ~]$ kafka-topics.sh --bootstrap-server 192.168.206.130:9092 \
--create --topic outlier \
--partitions 3 --replication-factor 1

Spark Streaming

Kullanılan kütüphaneler:

import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
import org.apache.spark.sql.types._
import org.apache.log4j.{Level, Logger}

Mesaj iletim garantisi için bir checkpoint dizini belirleyelim

val checkpoint_dir = "C:\\tmp\\kafkaCheckPointDir"

Spark Session oluşturma

val spark = SparkSession.builder()
      .master("local[4]")
      .appName("readWriteKafkaForEachBatch")
      .config("spark.driver.memory","2g")
      .config("spark.executor.memory","4g")
      .getOrCreate()

Scala diline özgü implicits kütüphanesini dahil edelim. SparkSession sonrası kullanmamız gerekiyor.

import spark.implicits._

2 numaralı veri akışı için Kafka input_topic’den mesaj okuyan Spark kod bloğu:

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.206.130:9092")
      .option("subscribe", "input_topic")
      .load()

Kafka’dan okuduğumuz veride standart sütunlar var. Bunlardan asıl veriyi tutan value sütununu binary’den string türüne çeviriyoruz ve parse ediyoruz ki dataframe’i daha yapısal hale getirerek filtreleme imkanı bulabilelim. Diğer sütunlar salata tutuyor.

val df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 .withColumn("input_values", F.split(F.col("value"), ",")(0).cast(StringType))
 .withColumn("output_value", F.split(F.col("value"), ",")(1).cast(StringType))

ForeachBatch için fonksiyon

Spark Structured Streaming uygulamalarında birden fazla hedef  (output sink) varsa genellikle foreachBatch kullanılır. Bu yöntemde adından da anlaşılacağı gibi aslında her bir microbatch üzerinde bildiğimiz dataframe operasyonlarını uygulama imkanımız olur. Yani her bir mikro-parti için batch dataframe API’nin zengin dünyası kapılarını bize açar ve böylelikle Spark Structured Streming’de bulunmayan Cassandra, Elasticsearch vb. output sinkler’e streaming uygulaması içinden yazma imkanımız olur. Her nekadar Structured Streaming’de Kafka sinki bulunsa da birden fazla topic’e mesaj göndereceğimiz için foreachBatch sink’ini seçiyoruz. ForeachBatch’te kullanılan yaygın bir yaklaşım ise batch içinde yapılacak tüm pis işleri bir fonksiyonda toplayarak sink tarafına sadece foreachBatch ve fonksiyon adını götürmektir.

Aşağıdaki fonksiyonda yapılanlar şunlar. Önce dataframe’i output_value sütunu (bu producer’a elle girdiğimiz veride virgülün sağ tarafına karşılık geliyor) üzerinden filtreliyoruz. Eğer bu değer outlier ise outlierDF ‘te topluyoruz eğer normal ise inlierDF’te topluyoruz. Böylelikle her microbatch içinde elimizde normal ve outlier kayıtlardan müteşekkil iki dataframe oluyor. Bu aşama Şekil-1’deki Is Outlier kontrolünü temsil ediyor basitçe. Gerçek uygulamada burada veriyi makine öğrenmesi modelinden geçirerek prediction değerlerine göre filtrelemelisiniz.  Sonrada bu iki dataframe’i Kafka’da farklı iki topic’e yazıyoruz (3 numaralı akış).

def myCustomEBFunction( inputDF:DataFrame, batchID:Long ) : Unit = {
      val outlierDF = inputDF.filter(F.col("output_value").equalTo("outlier"))
      outlierDF.show(5)

      val outlierDF2 = outlierDF.withColumn("value",
        F.concat(F.col("input_values"), F.lit(","), F.col("output_value")))

      outlierDF2.select("value").write
        .format("kafka")
        .option("kafka.bootstrap.servers","192.168.206.130:9092")
        .option("topic","outlier")
        .save()
   
    val inlierDF = inputDF.filter(F.col("output_value").equalTo("normal"))
    inlierDF.show(5)

    val inlierDF2 = inlierDF.withColumn("value",
    F.concat(F.col("input_values"), F.lit(","), F.col("output_value")))

    inlierDF2.select("value").write
    .format("kafka")
    .option("kafka.bootstrap.servers","192.168.206.130:9092")
    .option("topic","normal")
    .save()

Spark tarafında son olarak streaming akışını başlatıyoruz.

val query = df2.writeStream.foreachBatch(myCustomEBFunction _).start()

query.awaitTermination()

Yukarıda dikkat ettiyseniz çok sade bir kod var. Bunun sebebi pis işlerin fonksiyon marifetiyle yapılıyor olmasıdır. myCustomEBFunction‘ın yukarıda tanımladığımız fonksiyon olduğuna dikkat edelim. Fonksiyon parametrelerine (inputDF:DataFrame, batchID:Long) gelince inputDF bildiğimiz Kafka’dan gelen veri, batchID ise hani nerede hiç argüman olarak göndermedik derseniz o da Spark her microbatch için bir id üretiyor bu fonksiyona kendi otomatik gidiyor. Elle bir şey belirtmenize gerek yok. Yerini yapın yeter 🙂

Akışın başlatılması

Spark uygulamamız hazır, Kafka broker çalışıyor. IntelliJ içinden uygulamayı çalıştırdık, SparkSession oluştu ve uygulama çalışmaya başladı. Sonrasında bir terminalde kafka-console-producer açıyoruz ve içine mesaj yazıyoruz.

[train@localhost big_data]$ kafka-console-producer.sh \
--bootstrap-server 192.168.206.130:9092 \
--topic input_topic

>bu outlier girdisi,outlier
>bu inlier,normal
>bu outlier girdisi,outlier
>bu inlier,normal
>ikinci outlier,outlier
>ikinci inlier,normal

Daha sonra iki farklı teminalde kafka-console-consumer açıyoruz.

[train@localhost ~]$ kafka-console-consumer.sh \
--bootstrap-server 192.168.206.130:9092 \
--topic outlier



[train@localhost ~]$ kafka-console-consumer.sh \
--bootstrap-server 192.168.206.130:9092 \
--topic normal

Sonuçların ilgili kafka-console-consumer‘a gittiğini gözlemliyoruz. Eğer outlier consumer terminal üzerinden

bu outlier girdisi,outlier
ikinci outlier,outlier

ve normal consumer terminal üzerinden

bu inlier,normal
ikinci inlier,normal

benzeri sonuçlar görüyorsanız uygulamanız başarıyla çalışıyor demektir. Geriye sadece foreachBatch fonksiyonu içinde ML modelinden veri geçirmek kalır.

Bu yazıma ait tüm Intellij projesi ve kodlar buradadır.

Başka bir yazıda görüşmak üzere esen kalın.

Kapak Görseli: Eric Prouzet on Unsplash

Yazar Hakkında
Toplam 174 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (2 yorum)
Nur
Nur Yanıtla
- 14:03

Merhaba, Bilişim Sistemleri Mühendizliği öğrencisiyim. “Büyük veri alanında gerçek zamanlı analizleri etkileyen parametrelerin araştırılması ve öneri modeli (ieee)”
konulu bir projeye başlıyorum. Yol haritamı belirlememde ve projeyi geliştirmemde yardım ve tavsiyelerinizi bekliyorum. Teşekkürler 🙂

    Erkan ŞİRİN
    Erkan ŞİRİN Yanıtla
    - 10:24

    Merhabalar. LinkedIn üzerinden iletişim kurabilirsiniz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara