Apache Flink HDFS: Okuma ve Yazma

Merhabalar. Bu yazımızda Apache Flink Datastream API ile HDFS’ten veri okuma ve tekrar HDFS’e yazma ile ilgili basit bir örnek yapacağız. Bu yazıyı yazmaya beni motive eden sebep şu oldu: Spark ile HDFS’e yazmaya alışkınız artık çocuk oyuncağı gibi geliyor. Apache Flink HDFS ne kadar zor olabilir ki diye düşündüm ve kendim bir deneyeyim dedim. O da ne? Hiç de beklediğim kadar kolay değilmiş. Aslında bilmeyene kolay şeyler de zor olur ancak ben en azından bir sürü örnek bulur onlardan birini alır kendime uyarlar rahatlıkla bu işi kıvırırım diye düşünmüştüm.  Ancak o bir sürü örnek yokmuş. Olan bir kaç zayıf örnek de eski versiyonlara aitmiş. Çıkan örneklerde kullanılan flink-connector-filesystem kütüphanesi versiyon 1.11.3’te durmuş. Ben şuan itibariyle son sürüm olan 1.13.0 kullanıyorum. Dolayısıyla örneklerden yürüme kolaycılığı işime yaramadı ve Flink dokümanlarını karıştırarak bir şeyler çıkarmaya çalıştım. Aslında işin doğrusu bu,  yani birincil kaynaklarla ilerlemek ancak dokümanların bir sıkıntılı yönü her türlü seçeneği ve alternatifi barındırdıkları için gideceğiniz yolu seçmekte zorlanmanız, hatta çoğu zaman dokümanların içinde kaybolmanız. Neyse laf salatasını fazla uzatmadan geçelim örneğe.

Ortam Bilgileri

Flink: 1.13.0
Hadoop 3.1.2
Scala: 2.12
Java: 1.8.0
IDE: IntelliJ IDEA
Proje türü: Maven
OS: CentOS7

Kütüphaneler

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

Execution Environment Oluşturma

Spark’ta SparkSession olduğu gibi Flink’te de buna benzer bir ExecutionEnvironment var. Bunu oluşturalım.

// create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

HDFS kaynağından okuma

HDFS kaynağından okumak için çalışan bir Hadoop’un olması gerekiyor. Benim namenode’um localhost:9000‘den erişilebilir durumda. Ayrıca okumak istediğim veri seti /user/train/datasets dizininde mevcut.

// read data from hdfs
val stream = env.readTextFile("hdfs://localhost:9000/user/train/datasets/Advertising.csv")

Okuduklarımızı bir görelim. Bunun debugging’den başka bir amacı yok.

// print what you have read to console for debugging
stream.print()

HDFS’e yazma

HDFS’e yazmadan önce biraz hazırlık yapmak gerekiyor. Aslında bir FileSink nesnesi oluşturuyoruz.  Önce HDFS’te nereye yazacağız onu bir tanımlayalım. Bir çok yerde hedef dizini bir string içinde tanımlamaya alışkınız ancak burada Path sınıfından oluşturacağımız bir nesne içinde saklayacağız. Elbette bu benim tercihim değil 🙂

// define an hdfs directory for sink
val outputPath = new Path("hdfs://localhost:9000/user/flink/advertising")

FileSink oluşturma

// create a FileSink
val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(outputPath, new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
      DefaultRollingPolicy.builder()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
        .withMaxPartSize(1024 * 1024)
        .build())
    .build()

Oluşturduğumuz file sink’i okuduğumuz veri ile buluşturalım

// Write to hdfs
stream.addSink(sink)

Akışı başlatma

// start the streaming app
env.execute("Read Write HDFS")

Şimdi artık IntelliJ üzerinden local modda Flink’i çalıştırabiliriz. Bundan önce şu ön hazırlıkları yapmış olmak gerekiyor:

  • Ortam değişkenleri tanımlı olmalıdır: HADOOP_HOME ve HADOOP_CLASSPATH
  • pom.xml içinde şu dependency eklenmelidir
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.2</version>
</dependency>

Çalıştırmak için kodların içine sağ tıklayıp güzel yeşil run üçgeni seçiyoruz.

IntelliJ consolde aşağıdakine benzer bir çıktı görmeliyiz.

3> 103,280.2,10.1,21.4,14.8
2> 52,100.4,9.6,3.6,10.7
4> 152,121,8.4,48.7,11.6
1> ID,TV,Radio,Newspaper,Sales
4> 153,197.6,23.3,14.2,16.6
1> 1,230.1,37.8,69.2,22.1
3> 104,187.9,17.2,17.9,14.7
. . .
. . .

Ayrıca sonucu hdfs komutları ile kontrol edelim.

[train@localhost ~]$ hdfs dfs -head /user/flink/advertising/2021-05-16--13/.part-3-0.inprogress.d05f6e10-b849-41b1-9307-7e623b53ec73
152,121,8.4,48.7,11.6
153,197.6,23.3,14.2,16.6
154,171.3,39.7,37.7,19
155,187.8,21.1,9.5,15.6
156,4.1,11.6,5.7,3.2
157,93.9,43.5,50.5,15.3
158,149.8,1.3,24.3,10.1
159,11.7,36.9,45.2,7.3
160,131.7,18.4,34.6,12.9
161,172.5,18.1,30.7,14.4
162,85.7,35.8,49.3,13.3
163,188.4,18.1,25.6,14.9

Okuduğumuzun aynısını yazmış görünüyoruz.

Belki başka daha iyi yöntemleri olabilir ancak şimdilik benim keşfedebildiğim bu. Ayrıca farklı formatlarda (parquet, avro, orc vb.) yazma ve sıkıştırma seçenekleri de muhakkak vardır. Yazıda bunlara değinemedik. StreamingFileSink içindeki seçeneklere ve bunların ne anlama geldiklerine de değinmedim. Bu yazıdaki temel kaygı iyi kötü farketmez basit bir Flink HDFS okuma yazma örneği paylaşmaktı. Zengin seçenekler artık bu temel yapının üzerine inşa edilebilir.

Başka bir yazıda görüşmek dileğiyle hoşça kalın.

Kapak Görseli:  by Benoit Gauzere on Unsplash

Yazar Hakkında
Toplam 174 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara