Apache Flink FileStream Window Aggregation

Merhabalar. Bu yazımızda Apache Flink ile FileStream kaynağından veri okuyacağız ve okuduğumuz bu veri üzerinde zaman pencereli (window) aggregation yaparak ekrana yazdıracağız. Çalışmamda kullandığım ortam bilgileri şu şekildedir: Ubuntu, Intellij IDEA (maven projesi), Java8, Scala 2.11, Flink 1.9.2, github projesine buradan erişebilirsiniz.

Uygulamamızda iris veri setini (data-generator input klasörü içinde mevcuttur) data-generator ile bir dizine loglar halinde üreteceğiz. Flink datastream api ile bu dizini izleyerek okuduğumuz veri üzerinden 7 saniyelik pencereler ile çiçek türlerini sayacağız. Yani son yedi saniyede hangi çiçek türünden ne kadar üretiliyor onu göreceğiz. Şimdi başlayalım kodlamaya.

Kütüphaneleri indirelim

import java.sql.Timestamp
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

Flink StreamExecutionEnvironment

Her Flink uygulaması bir environment oluşturmakla başlar burada StreamExecutionEnvironment oluşturuyoruz.

// Create stream execution env
 val env = StreamExecutionEnvironment.getExecutionEnvironment

Apache Flink ile Veri Okuma

Veri kaynağı olan lokal dizini izleyelim. Monitör etme veya okuma da diyebiliriz.

val irisDataStreams = env.readFileStream(StreamPath = "file:///home/erkan/data-generator/output",
    intervalMillis=1000L)

Apache Flink Datastream Transformations ve Aggregations

Şimdi veriye şema giydirelim. Ben data-generator ile bir kaç log dosyası ürettim ve şemaya aşağıdaki şekilde karar verdim. Scala’nın meşhur case class yapısını kullanacağıız.

case class Iris(SepalLengthCm: Double,
                 SepalWidthCm: Double,
                 PetalLengthCm: Double,
                 PetalWidthCm: Double,
                 Species: String,
                 ts: String)

Yukarıda tanımladığımız şemayı veriye giydirelim. Aynı zamanda ayraçtan(,) alanları bölüyoruz.

val structuredIris = irisDataStreams.map(line => {
    val SepalLengthCm = line.split(",")(0).toDouble
    val SepalWidthCm = line.split(",")(1).toDouble
    val PetalLengthCm = line.split(",")(2).toDouble
    val PetalWidthCm = line.split(",")(3).toDouble
    val Species = line.split(",")(4)
    val ts = line.split(",")(5)

    Iris(SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species, ts)
  })

Şimdi geldik asıl operasyona. Benim amacım son 7 saniyede üretilen çiçek türlerini saymaktı bu yüzden sadece ilgilendiğim alanı (Species) seçiyorum ve yanına 1 koyuyorum (structuredIris.map(x => (x.Species, 1)). Böyle yaparak (String, Int) yapısında bir tuple elde ettim. Bunun üzerine window kullanıyorum. Flink’te farklı bir çok window var. Buradaki TumblingWindow zaman bazlı olup hiç bindirme yapmadan aralıkları kesin olarak ayırır. sum(1) ifadesi ise (String, Int) yapısındaki Int üzerinde yani 1’inci indeks üzerinde toplayarak git demek. Rakamlar hep 1 olduğu için bunlar toplandığında satırları saymış oluruz. Yani her satırda farklı bir çiçek türü olduğundan o türe bir ilave etmiş oluyoruz.

structuredIris.map(x => (x.Species, 1))
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(7)))
    .sum(1)
    .print()

Son olarak ekrana print ediyoruz.

Akışı başlatmak için env.execute() diyoruz.

env.execute("Iris Windowed Count")

Kodları çalıştırma

Uygulamayı ide üzerinden çalıştıralım. Ardından da data-generator’ü çalıştıralım. Bununla ilgili bilgi github sayfasında olduğu için buraya dahil etmiyorum oradaki bilgilerle rahatlıkla çalıştırabileceğinizi düşünüyorum. Benim data-generator ile çalıştırdığım komut:

(datagen) erkan@ubuntu:~/data-generator$ python dataframe_to_log.py -shf True -b 0.1 -r 5

Belli bir süre çalıştıktan sonra ide çıktı penceresine aşağıdakine benzer bir sonuç yazdıracaktır. Her seferinde kaç farklı çiçek türü varsa o kadar satır yazdıracaktır. Rakamlar son 7 saniyede o çiçek türünden üretilen miktarı gösterir.

3> (Iris-virginica,22)
4> (Iris-setosa,14)
1> (Iris-versicolor,14)
4> (Iris-setosa,25)
1> (Iris-versicolor,24)
3> (Iris-virginica,21)
3> (Iris-virginica,24)
4> (Iris-setosa,22)
1> (Iris-versicolor,24)
3> (Iris-virginica,22)
...
...
...

Sonuç

Yukarıda gördüğümüz gibi her üç çiçek türünden son 7 saniyede ne kadar üretildiğini sayan bir Flink Streaming uygulaması yazmış olduk.

Başka bir yazıda görüşmek dileğiyle hoşça kalın.

 

Kapak görseli: Alexey Savchenko on Unsplash

Yazar Hakkında
Toplam 174 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara