Apache Spark Elasticsearch Entegrasyonu

Merhabalar. Bu yazımızda csv dosyasından okuduğumuz bir veri setini Elasticsearch’e bir index olarak yazacağız ve yazdığımız index’i Spark ile okuyacağız.

1. Giriş

Apache Spark ve Elasticsearch büyük veri dünyasının en popüler araçlarından ikisi. Spark Elasticsearch ile harika işler çıkarabilirsiniz. Spark ile büyük veriyi işleyebilirken, Elasticserach ile büyük veriyi aranabilir, analiz edilebilir ve görselleştirilebilir bir şekilde depoluyoruz. Spark ile hem parti parti (batch) hem de akan veri (streaming) modunda Elasticsearch’e veri yazabiliriz. Uygulamamız parti (batch) modunda olacaktır. Yani okuduğumuz verinin tamamını bir solukta yazacağız.

Uygulama esnasında kullanılan ortam bilgileri:

  • İşletim sistemi: Windows 10 64 bit
  • Java: Java 8
  • Kod geliştirme: Intellij IDEA
  • Proje türü: Maven
  • Programlama dili: Scala 2.11
  • Spark: 2.4.0
  • Elasticsearch: 7.5.0
  • Kibana: 7.5.0

2. Uygulama

2.1. Veri seti hakkında bilgi

Veri seti, 1990 Kaliforniya nüfus sayımından alınan bilgileri içermektedir. Spesifik olarak bir konut fiyatını tahmin etmenize yardımcı olmasa da, makine öğreniminin temelleri hakkında bilgi vermek ve pratik yaptırmak için sıkça kullanılan bir veri setidir. Detaylı bilgiyi buradan edinebilirsiniz.

2.2. SparkSession Oluşturma

Intellij IDEA’da maven projesini açtım ve SparkESIntegrationBatch adında bir object oluşturdum. Takip eden kodlar ile Log seviyesini ERROR yapalım ve SparkSession oluşturalım. SparkSession Spark uygulamalarının giriş kapısıdır.

import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.log4j.{Logger, Level}

object SparkESIntegrationBatch extends App {
  Logger.getLogger("org").setLevel(Level.ERROR)

  val spark = SparkSession.builder
    .master("local[2]")
    .appName("SparkESIntegrationBatch")
    .getOrCreate()

  import spark.implicits._

// Following codes will be here

}

SparkSession oluşturduktan sonra veri setimizi okuyalım ve bir dataset/dataframe oluşturalım. Veri setine buradan ulaşabilirsiniz.

2.3. Veriyi Okuma

  // Read data
  val df = spark.read.format("csv")
    .option("header", true)
    .option("sep",",")
    .option("inferSchema", true)
    .load("D:\\Datasets\\housing.csv")

2.4. Veri Keşfi

Okuduğumuz verinin ilk 5 satırına bir göz atalım:

df.show(5)

Çıktı:

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+

Dataset olan df nasıl bir şemaya sahip bakalım.

df.printSchema()

Çıktı şu şekilde olacaktır:

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

Şemadan gördüğümüz gibi koordinatlar iki farklı sütun altında tutulmuş (longitude, latitude). Biz bunları Elasticsearch’e tek bir sütun altında göndermeliyiz. Niçin derseniz, Elasticsearch coğrafi koordinatlar için özel bir veri türü belirlemiş, biz de ona uymak durumundayız. geo_point adındaki bu veri türünün farklı yazım türlerini buradan öğrenebilirsiniz.

2.5. Veri Temizliği

Burada veri temizliği adına yapacağım tek şey boş değerler var ise onları düşürmek olacak. df’ten null değerleri düşürdükten sonra yeni bir dataset oluşacaktır. Bunu artık df2 ile tutacağım. Ve sonrasında satır sayılarına bakarak null dolayısı ile ne kadar satır silinmiş bakacağım.

// drop nulls
  val df2 = df.na.drop()

 // row counts before drop nulls
  println("row counts before drop nulls: " + df.count())

  // row counts after drop nulls
  println("row counts after drop nulls: " + df2.count())

Çıktı:

row counts before drop nulls: 20640
row counts after drop nulls: 20433

2.6. Veri Dönüştürme

Veri dönüştürme olarak tek ihtiyacım veri keşfinde bahsettiğimiz coğrafi koordinatları tek sütun altında toplamak. Bunun için case class ve kullanıcı tanımlı fonksiyon (udf) kullanacağım. Dönüşüm sonrasında yeni bir dataset oluşacak ve onu da df3 olarak tutacağım.

// datastructure to keep geolocation (fields must be named lat and lon)
  case class GeoPoint(lat: Double, lon: Double)
  // A function to transform two separate column into one single column (location)
  val makeGoepointForES = (latitude: Double, longitude: Double) => {
    GeoPoint(latitude, longitude)
  }
  // register function
  val makeGoepointForESFunc = F.udf(makeGoepointForES)

  // Put "latitude","longitude" column into one column named location
  val df3 = df2.withColumn("location", makeGoepointForESFunc('latitude, 'longitude))

Dönüşüm sonrası ne oldu bir görelim:

df3.show(5)

Çıktı:

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|        location|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[37.88, -122.23]|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[37.86, -122.22]|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[37.85, -122.24]|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|[37.85, -122.25]|
|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|[37.85, -122.25]|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------------+

Evet dönüşüm başarıyla tamamlandı. Şimdi longitude ve latitude sütunlarını düşürebilirim. Düşürme sonrası oluşacak dataset df4 olacaktır.

// If transformation operation successfull, drop "latitude","longitude" columns
  val df4 = df3.drop("latitude","longitude")

2.7. Elasticsearch Index ve Mapping Oluşturma

Spark tarafında verimiz hazır. Elasticsearch’e yazmadan önce bu tarafta son bir hazırlık yapmamız gerekir. Şimdi Kibana Dev Tools’u açalım. Burası aslında Elasticsearch için shell fonksiyonu görüyor. Buradan Elasticsearch ile her türlü etkileşimi sağlamak mümkün. Aşağıdaki kodlar ile hem Elasticsearch’te housing adında bir index (ilişkisel veri tabanlarındaki bir tablo olarak düşünebilirsiniz) hem de bu index’e ait mapping (şema) oluşturuyorum. Elbette bunu kafamıza göre değil Spark tarafındaki verinin yapısına göre oluşturacağız. Bu arada size bir ipucu vereyim. Aslında bu işlemi yapmadan doğrudan Spark ile Elasticsearch’e yazabilirsiniz ve Elasticsearch gelen veriden çıkarımda bulunarak kendine mapping oluşturur. Ancak bu mapping her zaman bizim istediğimiz gibi olmayabilir. Bu nedenle doğrudan yazıp mapping oluştuktan sonra onu kopyalayıp düzenleyebiliriz. Daha sonra eski index’i silip düzenlediğimiz mapping ile tekrar index oluşturabiliriz. Bu yöntem daha akıllıca ve pratik bir yöntem olur.

PUT /housing
{
    "mappings" : {
      "properties" : {
        "households" : {
          "type" : "float"
        },
        "housing_median_age" : {
          "type" : "float"
        },
        "location" : {
         "type": "geo_point"
        },
        "median_house_value" : {
          "type" : "float"
        },
        "median_income" : {
          "type" : "float"
        },
        "ocean_proximity" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "population" : {
          "type" : "float"
        },
        "total_bedrooms" : {
          "type" : "float"
        },
        "total_rooms" : {
          "type" : "float"
        }
      }
    }
  }

2.8. Spark ile Elasticsearch’e Yazma

Yüzdük yüzdük ve kuyruğuna geldik. Şimdi yazmak için hazırız. Yazmadan önce lütfen Elasticsearch tarafından maven repository’e yüklenen dependency’i pom.xml dosyanıza eklemeyi unutmayın.

 <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>7.5.0</version>
</dependency>

Yazma başlasın…

 df4.write
    .format("org.elasticsearch.spark.sql")
    .mode("overwrite")
    .option("es.nodes", "cloudera")
    .option("es.port","9200")
    .save("housing")

Herhangi bir hata almadıysanız başarıyla yazdığınız anlamına gelir.

2.9. Yazma Kontrolü

Yazma sonucunun kontrolü için tekrar Kibana Dev Tool’a gideli ve aşağıdaki komut ile Elasticsearch’teki mevcut tüm index’leri listeleyelim.

GET /_cat/indices?v

Çıktı:

health status index		uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   housing   oFa94aWrQ1KbuD9Xl2OfBw   1   1      20433            0      2.7mb          2.7mb

Gördüğümüz gibi 20.433 satır housing index’inde duruyor. Aslında bu tarafta satır dememeliyiz. Spark tarafındaki bir satır bu tarafta oldu bir belge (document).

2.10. Spark ile Elasticsearch’den Veri Okuma

Şimdi buraya kadar gelmişken Spark ile Elasticsearch’den veri okumadan gitmek olmaz. Bunca yoldan sonra aslında veri okumak o kadar da zor değil. Yazdığımız kod bloğunu birazcık düzenleyerek bunu yapabiliriz.

val dfFromES = spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", "cloudera")
    .option("es.port","9200")
    .load("housing")

  dfFromES.show(5)

Çıktı:

+----------+------------------+----------------+------------------+-------------+---------------+----------+--------------+-----------+
|households|housing_median_age|        location|median_house_value|median_income|ocean_proximity|population|total_bedrooms|total_rooms|
+----------+------------------+----------------+------------------+-------------+---------------+----------+--------------+-----------+
|     126.0|              41.0|[37.88, -122.23]|          452600.0|       8.3252|       NEAR BAY|     322.0|         129.0|      880.0|
|    1138.0|              21.0|[37.86, -122.22]|          358500.0|       8.3014|       NEAR BAY|    2401.0|        1106.0|     7099.0|
|     177.0|              52.0|[37.85, -122.24]|          352100.0|       7.2574|       NEAR BAY|     496.0|         190.0|     1467.0|
|     219.0|              52.0|[37.85, -122.25]|          341300.0|       5.6431|       NEAR BAY|     558.0|         235.0|     1274.0|
|     259.0|              52.0|[37.85, -122.25]|          342200.0|       3.8462|       NEAR BAY|     565.0|         280.0|     1627.0|
+----------+------------------+----------------+------------------+-------------+---------------+----------+--------------+-----------+

Gayet başarılı. Bu yazımızda Spark ile Elasticsearch’e nasıl veri yazılacağını ve Elasticsearch’den nasıl veri okunacağını gördük. Yazı ile ilgili tüm kodlara VBO reposundaki bu bağlantıdan ulaşabilirsiniz. Başka bir yazıda görüşünceye dek hoşçakalın…

Yazar Hakkında
Toplam 177 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara