Apache Flink ile Kafka’dan Mesaj Okuma (Scala)

Herkese merhaba. Apache Flink gerçek zamanlı veri işleme konusunda oldukça başarılı ve popüler bir araç. Böyle olmasına rağmen ilginç bir şekilde Flink öğrenmek için yeterli kaynak ve güncel örnekler bulmak zor. Mesela benzer bir araç olan Apache Spark ile tonlarca kitap, kurs, makale ve örnek varken Flink’te bunun onda biri bile yok. Daha önce Apache Flink ile ilgili bir kaç yazı yazmıştım. Şimdi ise Kafka’dan Flink ile nasıl mesaj okuruz ile ilgili oldukça basit bir örnek yapacağız. Kullanacağımız dil Scala olacaktır.

Ortam Bilgileri

Operating System: CentOS7

IDE: IntelliJ IDEA 2022.2.3 (Community Edition)

Proje Türü: Maven

Maven: 3.8.6

Java: Java-11

Scala: 2.12.12

Flink: 1.16.0

Kafka: 3.2.0

Ön Gereksinimler

  • Flink kurulmuş ve çalışıyor ve versiyon 1.16.0. (Dikkat: Eski versiyonlarda Kafka source farklı olabilir.)
  • IntelliJ Scala plugin eklenmiş.
  • Maven projesi oluşturulmuş
    • Şu komutu kullanarak ve sorulan sorulara cevap vererek maven scala projesi oluşturabilirsiniz.
    • mvn archetype:generate -DarchetypeGroupId=net.alchim31.maven -DarchetypeArtifactId=scala-archetype-simple
  • localhost:9092‘den erişilebilir bir Kafka mevcut ve çalışıyor.

pom.xml Bağımlılıklar

Tamamı yazının sonunda paylaşılmıştır. Burada ilgili bağımlıklar mevcuttur.

...
...
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.16.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>1.16.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.16.0</version>
    </dependency>
...
...

Kütüphaneleri indirelim

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

Flink StreamExecutionEnvironment

Her Flink uygulaması bir environment oluşturmakla başlar burada StreamExecutionEnvironment oluşturuyoruz.

val env = StreamExecutionEnvironment.getExecutionEnvironment

Kafka Kaynağı

Eski sürümler için farklı örnekler görebilirsiniz.

val kafkaSource = KafkaSource.builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("flink-example")
    .setGroupId("flink-consumer-group")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build()

Kafkadan Gelen Mesajlar

StreamExecutionEnvironment (env)’a az önce tanımladığımız KafkaSource’u (localhost:9092’deki Kafka’nın “flink-example” topiği) göstererek mesajları okumaya (consume etmeye) başlıyoruz ve sonucu lines isminde DataStream[String] türünde bir değişkene atıyoruz.

val lines = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")

Mesajları Konsola yazdırma

Okuduğumuz mesajlara hiç bir şey yapmadan olduğu gibi konsola yazdıracağız. Dilerseniz siz burada artık dönüşüm, temizleme ve filtreleme gibi operasyonları yapabilirsiniz.

lines.print()

Stream Uygulamasını Başlatma

Aşağıdaki kod stream akışını başlatmak için. Bunu kullanmadığınızda stream başlamayacaktır. Kodu çalıştırmadan bekleyelim ve Kafka tarafına geçip topic oluşturalım.

env.execute("Read from Kafka")

Kafka Topic Oluşturma

kafka/bin/kafka-topics.sh --bootstrap-server kafka2:9092 --create --topic flink-example --partitions 3 --replication-factor 1

Flink Uygulamasını Çalıştırma

IntelliJ’de kodların içinden sağ tıklayıp run dediğinizde Flink çalışacak ve Kafka’dan mesaj consume etmeye başlayacaktır.

Kafka’ya Mesaj Göndermek

Kafka ile hazır gelen kafka-console-producer.sh işimizi fazlasıyla görecektir.

kafka/bin/kafka-console-producer.sh --bootstrap-server kafka2:9092 --topic flink-example

> işaretini gördükten sonra mesaj göndermeye başlayabilirsiniz. Örnek olarak ben aşağıdaki mesajları gönderdim:

hi
flink

IntelliJ konsolundan tüketilen mesajları görebilirsiniz.

Şekil-1: Kafka’dan Okunan Örnek Mesajlar

Kodların Tamamı

package myPackage
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

import java.util.Properties
/*
kafka/bin/kafka-topics.sh --bootstrap-server kafka2:9092 --create --topic flink-example --partitions 3 --replication-factor 3
 */
object ReadFromKafka extends App{
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val kafkaSource = KafkaSource.builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("flink-example")
    .setGroupId("flink-consumer-group")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build()

  val lines = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")

  lines.print()

  env.execute("Read from Kafka")


}

pom.xml Tamamı

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>local.vbo</groupId>
  <artifactId>flink-examples</artifactId>
  <version>0.0.1</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.6</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.16.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>1.16.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.16.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.json4s/json4s-native -->
    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-native_2.12</artifactId>
      <version>4.0.4</version>
    </dependency>


    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.21.0</version>
        <configuration>
          <!-- Tests will be run with scalatest-maven-plugin instead -->
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>TestSuiteReport.txt</filereports>
          <!-- Comma separated list of JUnit test class names to execute -->
          <jUnitClasses>samples.AppTest</jUnitClasses>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Çok basit haliyle ve Scala dilinde Flink ile Kafka’dan mesaj okuma için bir örnek vermeye çalıştık. Umarım faydalı olmuştur.

Başka bir yazıda daha görüşmek dileğiyle. Esen kalın…

 

Görsel kapak: Photo by Shane Young on Unsplash

Yazar Hakkında
Toplam 177 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara