Apache Spark, Minio, Nessie Catalog, Iceberg ve Docker ile Lakehouse Örneği

Veri ambarı (data warehouse) ile veri gölünün (data lake) en iyi yönlerini birleştirerek bizlere ilişkisel veri tabanı konforunu büyük veri üzerinde sunan lakehouse çözümleri gün geçtikçe hayatımızdaki yerini alıyor. Bugün burada tamamen açık kaynak kodlu bileşenleri kullanarak docker üzerinde basit bir lakehouse örneği yapacağız.

1. Altyapıyı Oluşturan Bileşenler

1.1. Nessie

Nessie veri gölleri için transactional katalog sunan Apache Hive Metastore işlevi gören açık kaynaklı bir projedir. Hive metastore ile arasındaki en büyük fark veri versiyonlaması (Git-inspired data version control) özelliğidir.

1.2. MinIO

MinIO Amazon S3 uyumlu popüler bir nesne tabanlı depolama (object storage) çözümüdür. Özellikle S3 kullanılamayan kapalı ortamlarda ve Kubernetes ile birlikte nesne tabanlı depolama ihtiyacını çözmektedir.

1.3. Apache Spark

Apache Spark açık kaynaklı dağıtık veri işleme motorudur. Veri mühendisliği, veri bilimi, veri analizi, graf analizi, makine öğrenmesi, gerçek zamanlı veri işleme gibi bir çok ihtiyaç için yaygın olarak kullanılmaktadır.

1.4. Apache Iceberg

Iceberg, büyük analitik tablolar için yüksek performanslı bir formattır. Iceberg, SQL tablolarının güvenilirliğini ve basitliğini büyük verilere taşırken Spark, Trino, Flink, Presto, Hive ve Impala gibi araçların aynı anda aynı tablolarla güvenli bir şekilde çalışmasını mümkün kılar.

2. Docker Compose ile Altyapıyı Ayağa Kaldırma

Aşağıdaki docker-compose.yaml dosyasını kullanarak örneğimiz için altyapıyı ayağa kaldıracağız.

version: "3"

services:
  # Nessie Catalog Server
  nessie:
    image: projectnessie/nessie:0.67.0
    container_name: nessie
    networks:
      vbo:
    ports:
      - 19120:19120
  # Minio
  minio:
    image: "minio/minio:RELEASE.2023-05-04T21-44-30Z"
    container_name: minio
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    networks:
      vbo:
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
  # Spark
  spark:
    container_name: spark
    image: veribilimiokulu/pyspark-3.4.1_python-3.8:1.0
    ports:
      - "8888:8888"
      - "4041:4040"
    networks:
      - vbo
    volumes:
      - ./spark/examples:/opt/examples
    command: sleep infinity
networks:
  vbo:

Docker compose konteynırlarımızı ayağa kaldıralım.

docker-compose up -d

3. MinIO Web Arayüzü

http://localhost:9001/login adresinden kullanıcı: minioadmin,  şifre: minioadmin ile login olalım.

warehouse adında bir bucket oluşturalım.

4. Spark

Spark konteynıra bağlanalım.

docker exec -it spark bash

4.1. Jupyter Lab Çalıştırma

Jupyter lab kullanmak istiyorum. Bunun için jupyterlab paketini ve Jupyter’e Spark’ı göstermek için findspark paketlerini yükleyeceğim.

pip install jupyterlab findspark

Jupyter Lab’ı başlatalım

jupyter lab --ip 0.0.0.0 --port 8888 --allow-root

Info loglarındaki linki alıp tarayıcımıza yapıştırınca karşımızda Jupyter Lab’ı bulacağız.

4.2. Spark Uygulaması

İlk hücrede gerekli kütüphaneleri import edip Spark’ın yerini söyleyelim.

import findspark
findspark.init("/opt/spark/")
from pyspark.sql import SparkSession, functions as F

Önemli değişkenleri tanımlayalım. Gerekli deteylar yorum olarak mevcuttur.

# Spark'ın katalog olarak Nessie'ye erişeceği url
url = "http://nessie:19120/api/v1"

# Nessie tablolarının tutulacağı bucket
full_path_to_warehouse = 's3a://warehouse'

# Bessie için kullanacağımız branch
ref = "main"

# Nessie authentication türü. Diğer seçenekler (NONE, BEARER, OAUTH2 or AWS)
auth_type = "NONE"

# AWS S3 yerine MinIO kullandığımız için. Spark'a amazona gitme burada kal demek için.
s3_endpoint = "http://minio:9000"

# MinIO'ya erişim için. Bunlar root olarak docker-compose içinde belirtiliyor. Bu haliyle canlı ortamlarda kullanılmamalıdır.
accessKeyId='minioadmin'
secretAccessKey='minioadmin'

4.2.1. Spark Konfigürasyon

SarkSession oluştururken bazı konfigürasyonlar kullanacağız.

spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Spark Nessie Iceberg Demo")
    .config("spark.driver.memory", "2g")
    # Nessie, Iceberg, Delta, MinIO (S3) paketlerini uygun versiyonları ile buraya eklemeliyiz.
    .config('spark.jars.packages',
            'org.apache.hadoop:hadoop-aws:3.3.0,io.delta:delta-core_2.12:2.4.0,org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.4_2.12:0.75.0')
    # MinIO (S3) erişimi için gerekli anahtarlar.
    .config("spark.hadoop.fs.s3a.access.key", accessKeyId)
    .config("spark.hadoop.fs.s3a.secret.key", secretAccessKey)
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Spark Amazon S3 varsayılan API'sine değil lokaldeki MinIO'ya gitsin.
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    # Spark extensions arasından Iceberg ve Nessie
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    # Spark Nessie'yi nerede bulacak onun adresi
    .config("spark.sql.catalog.nessie.uri", url)
    # Hangi branch ile çalışacak
    .config("spark.sql.catalog.nessie.ref", ref)
    # Nessie'ye her gelen birşey sormasın. Hangi auth yöntemi ile sorulacak. Burada yok.
    .config("spark.sql.catalog.nessie.authentication.type", auth_type)
    # Katalog nessie olsun.
     .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    # Spark katalog implementasyonu iceberg.nessie olsun. Varsayılan kendi lokali
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    # Varsayılan warehouse adresini minio s3 warehouse bucket gösteriyoruz.
    .config("spark.sql.catalog.nessie.warehouse", full_path_to_warehouse)
    .getOrCreate()
)

4.2.2. Spark Dataframe

Github’dan bir veri seti okuyup spark dataframe oluşturalım.

from pyspark import SparkFiles
sc = spark.sparkContext
github_url="https://raw.githubusercontent.com/erkansirin78/datasets/master/Churn_Modelling.csv"
sc.addFile(github_url)
df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"),header= True, inferSchema=True)
df.show(3)

4.2.3.Nessie Namespace ve Tablo Oluşturma

spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.demo;")

spark.sql("DROP TABLE IF EXISTS nessie.demo.churn;")

spark.createDataFrame([], df.schema).writeTo("nessie.demo.churn").create()

4.2.4.Spark Dataframe’i Nessie Tabloya Yazma

Dataframe’i tabloya yazma

df.write.format("iceberg").mode("overwrite") \
    .save("nessie.demo.churn")

4.2.4. Nessie Tablo’dan Spark ile Okuyarak Spark Dataframe Oluşturma

Yazdığımız tablodan okyup tekrar dataframe oluşturma.

df_from_iceberg = spark.table("nessie.demo.churn")
df_from_iceberg.show()

Bu yazımızda açık kaynak araçlarla lakehouse üzerinde  bir tablo oluşturma ve bu tabloya spark ile okuma yazma örneği yapmış olduk. Başka bir yazıda görüşene dek hoşçakalın.

Notebook ve compose dosyası burada.

Not: Bu yazının hazırlanmasında [1, 2] yazılarından çok faydalandım teşekkür ederim.

Kapak Foto: Photo by Alexander Hafemann on Unsplash

Kaynaklar

  1. https://www.dremio.com/a-notebook-for-getting-started-with-project-nessie-apache-iceberg-and-apache-spark/
  2. https://medium.com/@khurrammeraj17/creating-a-lakehouse-by-using-apache-spark-minio-nessie-catalog-and-dremio-67c23a335616

Yazar Hakkında
Toplam 174 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara