Veri Bilimi Okulu

Airflow Spark Kubernetes: Spark Uygulamasına Dışarıdan Argüman Göndermek
Airflow Spark Kubernetes: Spark Uygulamasına Dışarıdan Argüman Göndermek
airflow-spark-kubernetes-spark-uygulamasina-disaridan-arguman-gondermek_kapak_960x640

Loading

Bu yazıda Airflow, Spark, Kubernetes ile çalışan veri mühendisleri için oldukça pratik bir konuyu ele alacağız: Kubernetes üzerinde çalışan bir Apache Airflow ortamından, Spark uygulamalarına dinamik olarak argüman göndermeyi gerçek kod örnekleriyle birlikte inceleyeceğiz.

Eğer siz de modern bir data lakehouse mimarisi kuruyorsanız ve iş akışlarınızı orkestre etmek için Airflow kullanıyorsanız, bu yazı tam size göre.

Mimari Genel Bakış

Senaryo şu: bir veri kaynağından NYC Taksi verilerini (Parquet formatında) bir nesne depolama alanına (Object Storage) aktarıyoruz, bronz katman (bronze layer). Ardından Apache Spark ile bu ham veriyi dönüştürüp (transform), Apache Iceberg tablo formatında, Nessie katalog üzerinden Gümüş katmana (Silver Layer) yazıyoruz.

Bu mimari, veri mühendisliğinde popüler olan Madalyon Mimarisi (Medallion Architecture) desenine dayanıyor. Madalyon Mimarisi, veriyi bir data lakehouse içinde mantıksal olarak düzenlemek için kullanılan bir veri tasarım kalıbıdır (data design pattern) [1]. Veriler, Bronz (ham veri), Gümüş (doğrulanmış ve temizlenmiş veri) ve Altın (zenginleştirilmiş, iş kullanımına hazır veri) olmak üzere üç katmandan geçerek kalitesini aşamalı bir şekilde artırır [2].

Bizim senaryomuzda şu bileşenler devrede:

  • Apache Airflow: İş akışı orkestratörü (Workflow Orchestrator) — Kubernetes üzerinde çalışıyor
  • Spark Operatörü (Spark Operator): Kubernetes üzerinde Spark uygulamalarını yöneten bir CRD (Custom Resource Definition) [3]
  • Apache Iceberg: Veri gölü evi tablo formatı (Table Format) — ACID işlemleri, şema evrimi (schema evolution) ve zaman yolculuğu (time travel) desteği sağlıyor [4]
  • Nessie: Git benzeri sürüm kontrolü sunan bir Iceberg kataloğu (Catalog) [5]
  • S3-uyumlu Nesne Depolama (Object Storage): Bronz ve Gümüş katman verileri burada tutuluyor

Argüman gönderme mekanizması üç katmandan oluşuyor: Airflow DAG’ı → SparkApplication YAML → PySpark uygulaması. Şimdi bu zinciri adım adım inceleyelim.

Adım 1: Airflow DAG’ı — SparkKubernetesOperator ve DAG Parametreleri

İlk olarak Airflow tarafına bakalım. Burada SparkKubernetesOperator kullanıyoruz. Bu operatör, Kubernetes üzerinde doğrudan bir SparkApplication özel kaynağı (Custom Resource) oluşturan, Spark Operatörü ile entegre çalışan bir Airflow operatörüdür [17]. (Çok fazla operatör dedik açıklayalım 🙂 Kubernetes operator ile Airflow operator aynı şeyler değil farklı bunu bilelim). Yani bir Kubernetes pod’u başlatmak yerine, Spark Operatörü’ne bir iş tanımı (job specification) gönderiyoruz ve o da driver ile executor pod’larını bizim için yönetiyor.

DAG dosyamız:

from datetime import datetime, timedelta
from airflow.sdk import DAG
from airflow.sdk.definitions.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='load_facts_to_silver',
    default_args=default_args,
    description='Load facts data to Silver layer using Spark on Kubernetes',
    schedule=None,
    start_date=datetime(2026, 3, 1),
    catchup=False,
    max_active_runs=1,
    tags=['spark', 'kubernetes', 'silver', 'facts', 'triggered'],
    params={
        "start_date": Param(
            default="",
            type="string",
            description="Start date for fact trip (YYYY-MM-DD). Leave empty to use execution date - 1"
        ),
        "end_date": Param(
            default="",
            type="string",
            description="End date for fact trip (YYYY-MM-DD). Leave empty to use execution date - 1"
        ),
        "data_year_offset": Param(
            default="",
            type="string",
            description="Years of offset back to 2026 -> 2024"
        ),
        "fact_trip_overwrite_mode": Param(
            default="append",
            type="string",
            enum=["append", "overwrite"],
            description="append or overwrite table"
        )
    }
) as dag:

    ingest_task = SparkKubernetesOperator(
        task_id="load-facts",
        namespace="default",
        name="load-facts",
        application_file="spark_applications/write_fact_trip_sparkApplication.yaml",
        get_logs=True,
        do_xcom_push=False,
        success_run_history_limit=1,
        startup_timeout_seconds=600,
        log_events_on_failure=True,
        reattach_on_restart=True,
        delete_on_termination=True,
        kubernetes_conn_id="kubernetes_default",
        random_name_suffix=True
    )

Bu DAG’da üç kritik mekanizma var. Bunlara tek tek bakalım.

application_file — YAML ile Spark Tanımı

application_file="spark_applications/write_fact_trip_sparkApplication.yaml" satırı, DAG’ın en önemli parçasıdır. Bu parametre, Spark uygulamasının tüm yapılandırmasını (Spark konfigürasyonu, bağımlılıklar (maven coordinates, jars), argümanlar, driver/executor kaynakları) barındıran bir YAML dosyasına referans verir [15]. YAML dosyası, DAG’lar dizini (dags folder) altında bulunmalıdır [15]. application_file bir şablon alanıdır (template field), yani YAML dosyasının içindeki Jinja ifadeleri de Airflow tarafından çalışma zamanında değerlendirilir [17]. İşte bütün sihir burada gerçekleşiyor — birazdan YAML’ın içine dalacağız.

DAG Parametreleri (Params) — Elle Tetikleme Formu

DAG nesnesi içindeki params bloğu, DAG elle tetiklendiğinde Airflow arayüzünde kullanıcı dostu bir form oluşturur [28]. Tanımlanan her Param, formda bir giriş alanı olarak gösterilir. Kullanıcı girdiği değerler doğrulamadan geçmezse, Airflow bir uyarı gösterir ve DAG çalıştırmasını oluşturmaz [28]. schedule=None (burada @daily’de olabilir ancak ben bu DAG’ı başka bir DAG üzerinden tetikledim ve o kısım bu yazının kapsamı dışında kaldı o yüzden None) olan DAG’larda parametre doğrulaması DAG parsing sırasında değil, tetikleme anında yapılır — bu da zorunlu varsayılan değer tanımlamadan DAG oluşturabilmeyi sağlar [28].

Dört parametre tanımlıyoruz (sizin ihtiyacınıza kalmış benim bu dördüne ihtiyacım vardı veya öyle tasarladım diyelim):

  • start_date ve end_date: Varsayılan olarak boş. Boş kalırsa YAML’daki Jinja ifadesi otomatik tarih hesaplar. Ama kullanıcı geçmiş bir tarih aralığını yeniden işlemek istediğinde elle doldurabilir. Böylece kendi otomatik tetiklendiğinde bir gün öncesi tarihiyle çalışırken elle tetiklediğinizde istediğiniz bir tarih aralığında çalıştırabilirsiniz.
  • data_year_offset: Yıl ofsetini değiştirmek için. NYC Taksi verileri 2024’e ait, biz 2026’da çalıştırıyoruz — varsayılan 2 yıl geri ofset. Böyle birşey gerekli değilse 0 (sıfır) girip geçiyorum.
  • fact_trip_overwrite_mode: enum=["append", "overwrite"] tanımı sayesinde Airflow arayüzünde bir açılır menü (dropdown) olarak gösterilir [29]. Kullanıcının yalnızca geçerli değerlerden birini seçmesi garanti edilir. Dilerseniz kodda değişiklik yaparak upsert modu da ekleyebilirsiniz.

Diğer Önemli Parametreler

random_name_suffix=True: Pod adına rastgele bir sonek (suffix) ekleyerek ad çakışmalarını önler [17].

schedule=None: DAG kendi başına çalışmaz, yalnızca dışarıdan tetiklendiğinde çalışır. Bu, başka bir DAG’dan TriggerDagRunOperator ile veya Airflow arayüzünden elle tetikleme ile olabilir.

max_active_runs=1: Eşzamanlı (concurrent) çalışma sayısını bire sınırlar. Aynı Iceberg tablosuna paralel yazma çakışmalarını (write conflicts) engeller.

get_logs=True: Spark driver pod’unun günlüklerini (logs) Airflow arayüzünde gösterir [17].

reattach_on_restart=True: Airflow zamanlayıcısı (scheduler) çökerse, çalışan pod’a yeniden bağlanır. Operatör otomatik olarak pod’lara görev bağlam etiketleri (task context labels — dag_id, task_id, run_id) ekler [17].

Adım 2: SparkApplication YAML — Üç Katmanlı Öncelik Zinciri

Şimdi SparkKubernetesOperator‘ün referans verdiği YAML dosyasının içine bakalım. Bu dosya, Spark Operatörünün anlayacağı bir SparkApplication tanımıdır. Spark Operatörü, Kubernetes üzerinde bu tanıma göre driver ve executor pod’larını oluşturur [10].

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: initial-load-fact-trip
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "data-transformation-spark-iceberg-nessie:1.0"
  imagePullPolicy: IfNotPresent
  mainApplicationFile: local:///app/load_facts/load_trip_fact_initial_load.py
  sparkVersion: "3.5.3"
  timeToLiveSeconds: 3600

timeToLiveSeconds: 3600 ayarıyla SparkApplication tamamlandıktan 1 saat sonra ilgili tüm Kubernetes kaynakları otomatik olarak silinir.

Bağımlılıklar ve Spark Yapılandırması

Spark ile Iceberg ve Nessie entegrasyonu için gerekli bağımlılıkları deps bölümünde Maven paketleri olarak belirtiyoruz [11]:

deps:
    packages:
      - "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1"
      - "org.apache.iceberg:iceberg-spark-extensions-3.5_2.12:1.8.1"
      - "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.102.5"
      - "org.apache.hadoop:hadoop-aws:3.3.4"
      - "org.apache.hadoop:hadoop-common:3.3.4"
    repositories:
      - "https://repo1.maven.org/maven2"
  sparkConf:
    "spark.sql.catalog.nessie": "org.apache.iceberg.spark.SparkCatalog"
    "spark.sql.catalog.nessie.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog"
    "spark.sql.catalog.nessie.uri": "http://nessie.nessie.svc.cluster.local:19120/api/v2"
    "spark.sql.catalog.nessie.warehouse": "s3a://warehouse/"
    "spark.sql.catalog.nessie.ref": "main"
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions"

Nessie kataloğunun URI’sini, Kubernetes servis keşfi (service discovery) aracılığıyla küme içi DNS adı olarak belirttiğimize dikkat edin. Nessie, Iceberg tablolarının meta verilerini izleyen açık kaynaklı bir katalog olup, Git benzeri dallanma (branching) ve sürüm kontrolü özellikleri sunar [5].

sparkConf bölümünde ayrıca performans ve dayanıklılık ayarları da var:

"spark.kubernetes.driver.deleteOnTermination": "true"
"spark.kubernetes.executor.deleteOnTermination": "true"
"spark.sql.execution.timeout": "3600s"
"spark.network.timeout": "800s"
"spark.executor.heartbeatInterval": "20s"
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"

Argümanlar — Üç Katmanlı Öncelik Zinciri

Şimdi yazının en kritik kısmına geliyoruz: arguments bölümü. application_file bir şablon alanı (template field) olduğundan, YAML’daki Jinja ifadeleri Airflow tarafından çalışma zamanında değerlendirilir [17]. Burada DAG’daki params tanımlarıyla birlikte üç katmanlı bir öncelik zinciri (three-tier priority chain) ortaya çıkıyor:

arguments:
    - "--source_bucket"
    - "{{ var.value.source_bronze_bucket | default('bronze') }}"
    - "--namespace"
    - "{{ var.value.silver_namespace | default('nessie.silver') }}"
    - "--static_bronze_path"
    - "{{ var.value.static_bronze_path | default('nyc-taxi-data') }}"
    - "--start_date"
    - "{{ params.start_date if params.start_date else macros.ds_add(ds, -1) }}"
    - "--end_date"
    - "{{ params.end_date if params.end_date else macros.ds_add(ds, -1) }}"
    - "--data_year_offset"
    - "{{ params.data_year_offset if params.data_year_offset else (var.value.data_year_offset | default('2')) }}"
    - "--fact_table_name"
    - "{{ var.value.fact_trip_table_name | default('facttrip') }}"
    - "--overwride_mode"
    - "{{ params.fact_trip_overwrite_mode if params.fact_trip_overwrite_mode else (var.value.fact_trip_overwrite_mode | default('append')) }}"
    - "--access_key_id"
    - "{{ var.value.aws_access_key_id | default('rustfsadmin') }}"
    - "--secret_key"
    - "{{ var.value.aws_secret_access_key | default('rustfsadmin') }}"

Bu YAML’da üç farklı Jinja mekanizmasının iç içe geçtiğini görüyoruz. Bunu üç katmanlı öncelik zinciri olarak düşünebiliriz:

Katman 1 — DAG Parametreleri (params): En yüksek önceliğe sahip. {{ params.start_date if params.start_date else ... }} ifadesi, kullanıcı Airflow arayüzünden DAG’ı elle tetiklerken bir değer girdiyse onu kullanır. DAG’daki Param tanımları bu değerleri sağlar [28].

Katman 2 — Airflow Değişkenleri (var.value) veya Makrolar (macros): Parametre boş bırakıldığında devreye girer. Tarih argümanları için macros.ds_add(ds, -1) ile bir önceki gün otomatik hesaplanır [6]. Yapılandırma değerleri için var.value.data_year_offset ile Airflow değişken deposundan (key-value store) değer çekilir [8].

Katman 3 — Varsayılan Değerler (default): Hem parametre hem de Airflow değişkeni tanımsızsa, | default('2') ile sabit kodlanmış (hardcoded) bir varsayılan devreye girer.

Bu zinciri somut bir örnekle açıklayalım. --data_year_offset argümanını ele alalım:

  1. Kullanıcı Airflow arayüzünden DAG’ı tetiklerken data_year_offset = 3 girerse → 3 kullanılır (params öncelikli)
  2. Kullanıcı boş bırakırsa ama Airflow’da data_year_offset değişkeni 2 olarak tanımlıysa → 2 kullanılır (var.value devreye girer)
  3. Ne parametre ne değişken varsa → ‘2’ kullanılır (default son çare)

Tarih argümanları için ise zincir biraz farklı çalışır. --start_date örneği:

  1. Kullanıcı start_date = 2026-01-15 girerse → 2026-01-15 kullanılır (geçmiş veriyi yeniden işleme)
  2. Kullanıcı boş bırakırsa → macros.ds_add(ds, -1) ile bir önceki gün otomatik hesaplanır (günlük çalışma)

Bu tasarım üç farklı kullanım senaryosunu zarif bir şekilde destekliyor: günlük otomatik tetikleme (parametreler boş, makrolar devrede), elle yeniden işleme (Airflow arayüzünden belirli tarih ve mod seçerek), ve ortam bazlı yapılandırma (Airflow değişkenleri ile dev/staging/prod ayrımı).

Spark Kubernetes Operator Airflow Argüman Akış Diyagramı

ConfigMap ve Secret ile Ortam Değişkenleri (Environment Variables) Yönetimi

YAML dosyasında argümanların yanı sıra, driver ve executor pod’larına ortam değişkenleri de enjekte ediyoruz. Kubernetes’in iki temel yapılandırma mekanizmasını kullanıyoruz:

ConfigMap (hassas olmayan yapılandırma verileri):

driver:
    cores: 2
    coreLimit: "2000m"
    memory: "2048m"
    serviceAccount: spark
    env:
      - name: SOURCE_BRONZE_BUCKET
        valueFrom:
          configMapKeyRef:
            name: silver-transformations-configmap
            key: bronze-bucket-name
      - name: SILVER_NAMESPACE
        valueFrom:
          configMapKeyRef:
            name: silver-transformations-configmap
            key: silver-namespace

Secret (hassas kimlik bilgileri):

- name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: silver-transformations-secret
            key: access-key-id
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: silver-transformations-secret
            key: secret-key

Spark Operatörü kullanıcı kılavuzuna (User Guide) göre, hem driver hem de executor tanımlarında ConfigMap ve Secret montajları için isteğe bağlı alanlar mevcuttur [12]. Hassas olmayan yapılandırmaları ConfigMap’te, kimlik bilgilerini ise Secret’ta tutmak en iyi uygulama (best practice) olarak kabul edilir [13].

Önemli bir nokta: Aynı çevresel değişkenlerin hem driver hem de executor bölümlerinde tanımlanması gerekir. Spark kümesinde bunlar ayrı pod’larda çalışır ve her birinin bu yapılandırmalara erişmesi gerekir.

Peki neden hem arguments ile hem de ConfigMap/Secret ile değer gönderiyoruz? Argümanlar, Airflow’un çalışma zamanında dinamik olarak belirlediği değerler içindir (tarih aralıkları, yazma modu). ConfigMap ve Secret’lar ise Kubernetes seviyesinde yönetilen, daha az değişen altyapı yapılandırmaları içindir (erişim anahtarları, servis adresleri). PySpark uygulaması ise her iki kaynağı da argparse varsayılan değerleri aracılığıyla zarif bir şekilde birleştiriyor — bunu bir sonraki adımda göreceğiz.

Adım 3: PySpark Uygulaması — Argümanları Tüketme

Artık argümanların Airflow DAG’ından → YAML → Spark pod’una nasıl ulaştığını gördük. Şimdi PySpark tarafında bu argümanları nasıl aldığımıza ve kullandığımıza bakalım.

argparse ile Komut Satırı Argümanlarını Alma

import argparse, os

def parse_args():
    parser = argparse.ArgumentParser(description="Data Transformation.")

    parser.add_argument("--source_bucket",
        default=os.getenv("SOURCE_BRONZE_BUCKET"),
        help="Bronze Bucket Name")
    parser.add_argument("--namespace",
        default=os.getenv("SILVER_NAMESPACE"),
        help="Target iceberg namespace nessie.silver")
    parser.add_argument("--static_bronze_path",
        default=os.getenv("STATIC_BRONZE_PATH"),
        help="Source bronze bucket static path")
    parser.add_argument("--start_date",
        default=os.getenv("FACT_TRIP_BRONZE_START_DATE"),
        help="start date of fact trip")
    parser.add_argument("--end_date",
        default=os.getenv("FACT_TRIP_BRONZE_END_DATE"),
        help="end date of fact trip")
    parser.add_argument("--data_year_offset", type=int,
        default=int(os.getenv("DATA_YEAR_OFFSET", "0")),
        help="Year offset to apply to dates (e.g., 2 means 2026->2024)")
    parser.add_argument("--fact_table_name",
        default=os.getenv("FACT_TRIP_TABLE_NAME"),
        help="Target silver table default facttrip")
    parser.add_argument("--overwride_mode",
        default=os.getenv("FACT_TRIP_OVERWRITE_MODE"),
        help="overwrite for recreate, default append")
    parser.add_argument("--access_key_id",
        default=os.getenv("AWS_ACCESS_KEY_ID"),
        help="S3 AWS_ACCESS_KEY_ID")
    parser.add_argument("--secret_key",
        default=os.getenv("AWS_SECRET_ACCESS_KEY"),
        help="S3 AWS_SECRET_ACCESS_KEY")

    return parser.parse_args()

Burada çok güzel bir çift katmanlı varsayılan değer (dual-layer default) deseni var: argparse ile tanımlanan her argüman, önce komut satırından gelen değeri kontrol eder (yani YAML’daki arguments bölümünden Airflow aracılığıyla gelen); eğer yoksa os.getenv() ile çevresel değişkene (ConfigMap/Secret’tan gelen) geri düşer.

Bu esnekliği somut bir örnekle açıklayalım: Canlı (production) ortamında Airflow, YAML’daki arguments aracılığıyla --start_date 2026-03-15 gönderir ve bu değer önceliklidir. Ancak bir lokal geliştirme ortamında test yaparken, argüman göndermeden sadece FACT_TRIP_BRONZE_START_DATE=2026-03-15 çevresel değişkenini ayarlayarak aynı PySpark kodu çalıştırılabilir.

from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

def get_bronze_data(spark, bucket_name, path, start_date, end_date, year_offset=0):
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    if year_offset:
        start = start - relativedelta(years=year_offset)
        end = end - relativedelta(years=year_offset)

    num_days = (end - start).days + 1

    target_paths = [
        f"s3a://{bucket_name}/{path}/{(start + timedelta(days=i)).strftime('%Y/%m/%d')}/"
        for i in range(num_days)
    ]

    df = (spark.read
        .option("recursiveFileLookup", "true")
        .parquet(*target_paths)
    )
    return df

year_offset argümanına dikkat edin. YAML’dan gelen tarih (örneğin 2026-03-15), PySpark içinde year_offset=2 ile 2024-03-15’e dönüştürülüyor. Böylece S3 yolu s3a://bronze/nyc-taxi-data/2024/03/15/ olarak hesaplanıyor. Bu tür parametrelerin Airflow arayüzünden dışarıdan kontrol edilebilmesi, uygulamayı son derece esnek kılıyor.

Dönüşümler ve Iceberg Tablosuna Yazma

PySpark uygulaması ham veriyi okuduktan sonra bir dizi dönüşüm (transformation) uygular:

from pyspark.sql import functions as F

def main():
    args = parse_args()
    spark = get_spark_session(s3_access_key=args.access_key_id, s3_secret_key=args.secret_key)

    df = get_bronze_data(spark, bucket_name=args.source_bucket,
                    path=args.static_bronze_path,
                    start_date=args.start_date,
                    end_date=args.end_date,
                    year_offset=args.data_year_offset)

    # Benzersiz yolculuk kimliği oluştur
    df2 = df.withColumn("tripid", F.monotonically_increasing_id())

    # Tarih sütunlarını dakika hassasiyetinde sayısal formata dönüştür
    df3 = df2.withColumn("tpep_pickup_datetime_minute",
        F.date_format(F.col("tpep_pickup_datetime"), "yyyyMMddHHmm").cast("long"))

    df4 = df3.withColumn("tpep_dropoff_datetime_minute",
        F.date_format(F.col("tpep_dropoff_datetime"), "yyyyMMddHHmm").cast("long"))

    # Veri tipini düzelt
    df5 = df4.withColumn("RatecodeID", F.col("RatecodeID").cast("int"))

    # Iceberg tablosuna yaz
    write_to_iceberg(spark=spark, df=df5,
        table_name=args.fact_table_name, mode=args.overwride_mode)
def write_to_iceberg(spark, df, table_name, mode):
    table_exists = spark.catalog.tableExists(f"nessie.silver.{table_name}")

    if not table_exists:
        spark.createDataFrame([], df.schema) \
             .writeTo(f"nessie.silver.{table_name}").create()

    df.write.format("iceberg").mode(mode) \
        .save(f"nessie.silver.{table_name}")

Argüman Akış Özeti

Bir argümanın kaynağından Spark uygulamasına kadar yolculuğunu özetleyelim:

1. Kaynaklar: Üç farklı kaynaktan değer gelebilir — DAG Parametreleri (Airflow arayüzünden elle tetikleme) [28], Airflow Değişkenleri (merkezi anahtar-değer deposu) [8], ve Airflow Makroları (otomatik tarih hesaplama) [6].

2. SparkKubernetesOperator → YAML: Operatör, application_file ile referans verilen YAML dosyasını okur ve Jinja ifadelerini üç katmanlı öncelik zinciriyle değerlendirir: önce params, sonra var.value / macros, son olarak default [17].

3. SparkApplication → Kubernetes API: Değerlendirilmiş YAML, bir SparkApplication kaynağı olarak Kubernetes API sunucusuna gönderilir. Spark Operatörü bu kaynağı algılayıp driver ve executor pod’larını oluşturur [10].

4. arguments → Python argparse: Driver pod’u başladığında, YAML’daki arguments listesi komut satırı argümanları olarak PySpark betiğine iletilir.

5. ConfigMap/Secret → os.getenv() (Yedek): Argüman komut satırından gelmezse, argparse varsayılan değer olarak os.getenv() ile ConfigMap/Secret’tan gelen çevresel değişkeni kullanır.

En İyi Pratikler ve İpuçları

DAG Parametrelerini (Params) yeniden işleme senaryoları için kullanın. Param sınıfı ile elle tetiklemede kullanıcı dostu bir form oluşturabilirsiniz. enum özelliği ile geçerli değerleri sınırlayın (örneğin ["append", "overwrite"]). Varsayılan değerleri boş bırakarak günlük çalışmalarda makroların devreye girmesini sağlayın [28].

YAML dosyasını DAG’lar dizini altına koyun. SparkKubernetesOperator kullanırken YAML dosyası, DAG’lar dizininden erişilebilir olmalıdır [15]. Bizim örneğimizde spark_applications/write_fact_trip_sparkApplication.yaml şeklinde bir alt dizin yapısı kullanıyoruz.

Hassas veriyi asla YAML’a gömmeyin. Erişim anahtarları (access keys) ve gizli anahtarlar (secret keys) gibi hassas bilgileri Kubernetes Secret nesnelerinde saklayın. YAML dosyasında secretKeyRef ile referans verin [12].

Tarihler için makroları, yapılandırma için değişkenleri kullanın. macros.ds_add(ds, -1) gibi makrolar her çalışma için doğru tarihi hesaplar. Yapılandırma değerleri ise var.value ile yönetilir. Bu iki mekanizmayı karıştırmamak, bakım kolaylığı sağlar.

Driver ve executor yapılandırmasını senkronize tutun. YAML’da driver ve executor bölümlerindeki env değişkenlerini eşitlemeyi unutmayın.

reattach_on_restart=True ayarını kullanın. Airflow zamanlayıcısı çökerse, çalışan Spark işine yeniden bağlanabilirsiniz [17].

max_active_runs=1 ile eşzamanlılığı sınırlayın. Aynı Iceberg tablosuna yazan işlerde yazma çakışmalarını önler.

Sonuç

Bu yazıda, Kubernetes üzerinde Airflow ile Spark uygulamalarına argüman göndermenin mekanizmalarını gerçek örneklerle inceledik:

  • SparkKubernetesOperator + DAG Parametreleri (Params): Param sınıfı ile Airflow arayüzünden elle tetiklemede kullanıcı dostu form. enum ile geçerli değer kısıtlama.
  • SparkApplication YAML + Üç Katmanlı Öncelik Zinciri: paramsvar.value / macrosdefault sıralamasıyla Jinja şablonlama. Hem otomatik hem de elle tetikleme senaryolarını tek bir YAML’da destekleme.
  • ConfigMap ve Secret: Kubernetes yerel mekanizmaları ile çevresel değişken enjeksiyonu. Hassas olmayan yapılandırma → ConfigMap, kimlik bilgileri → Secret.
  • argparse + os.getenv(): PySpark uygulamasında çift katmanlı varsayılan değer deseni — hem komut satırı argümanlarını hem de çevresel değişkenleri destekleyerek esneklik sağlama.

Bu yaklaşım, kodunuzu hiç değiştirmeden farklı senaryolarda (farklı tarih aralıkları, farklı veri kaynakları, farklı yazma modları) çalıştırmanıza olanak tanır. Veri gölü evi mimarinizde esneklik ve sürdürülebilirlik istiyorsanız, bu desenleri benimsemenizi şiddetle tavsiye ederim. Eğer bu örnekleri uygulamalı olarak tecrübe etmek istiyorsanız VBO Data Engineering Project Bootcamp harika bir adres.

Bir sonraki yazıda görüşmek üzere!

Kaynaklar

[1] Databricks, “What is Medallion Architecture?”, https://www.databricks.com/glossary/medallion-architecture

[2] Microsoft Learn, “What is the medallion lakehouse architecture?”, https://learn.microsoft.com/en-us/azure/databricks/lakehouse/medallion

[3] Kubeflow, “Spark Operator – User Guide”, https://kubeflow.github.io/spark-operator/docs/user-guide.html

[4] Dremio, “Learn Apache Iceberg Locally with Spark, Nessie”, https://www.dremio.com/blog/hands-on-with-apache-iceberg-nessie-dremio-apache-spark/

[5] Project Nessie, “Nessie + Iceberg + Spark”, https://projectnessie.org/iceberg/spark/

[6] Apache Airflow Documentation, “Templates reference”, https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

[7] Astronomer, “Use Airflow templates”, https://www.astronomer.io/docs/learn/templating

[8] Reintech, “Parameterizing Workflows with Airflow Variables and Templating”, https://reintech.io/blog/airflow-variables-templating

[9] Apache Airflow Documentation, “KubernetesPodOperator”, https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html

[10] Kubeflow, “Quick Start Guide – Spark Operator”, https://kubeflow.github.io/spark-operator/docs/quick-start-guide.html

[11] Kubeflow, “Writing a SparkApplication”, https://www.kubeflow.org/docs/components/spark-operator/user-guide/writing-sparkapplication/

[12] Spark Operator GitHub, “User Guide – Mounting Secrets and ConfigMaps”, https://github.com/mesosphere/spark-on-k8s-operator/blob/master/docs/user-guide.md

[13] MinIO Blog, “Spark, MinIO and Kubernetes”, https://blog.min.io/spark-minio-kubernetes/

[14] AWS Blog, “Best practices for running Spark on Amazon EKS”, https://aws.amazon.com/blogs/containers/best-practices-for-running-spark-on-amazon-eks/

[15] Medium – Young Gyu Kim, “Running Spark Applications on Kubernetes with Spark Operator and Airflow 3.0”, https://medium.com/@nsalexamy/running-spark-applications-on-kubernetes-with-spark-operator-and-airflow-3-0-adf7d01a1023

[16] Apache Airflow Documentation, “Operators – Jinja Templating”, https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html

[17] Apache Airflow Documentation, “SparkKubernetesOperator API”, https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/operators/spark_kubernetes/index.html

[18] Medium – OSDS, “Executing Spark Operator using Airflow on Kubernetes — Part 2”, https://medium.com/@howdyservices9/executing-spark-operator-using-airflow-on-kubernetes-part-2-62ae8fae7a7a

[19] Cloud Native Daily, “Apache Airflow, Spark, and Kubernetes for Streamlined Workflow Management”, https://medium.com/cloud-native-daily/apache-airflow-spark-and-kubernetes-for-streamlined-workflow-management-869c6b48a026

[20] lakefs.io, “Apache Iceberg Catalogs: Types & How to Choose”, https://lakefs.io/blog/iceberg-catalog/

[21] Conduktor, “Iceberg Catalog Management: REST, Hive, Glue, and Nessie”, https://www.conduktor.io/glossary/iceberg-catalog-management-hive-glue-and-nessie

[22] Medium – Young Gyu Kim, “Building a Production-Ready Data Lakehouse Locally”, https://medium.com/@nsalexamy/building-a-production-ready-data-lakehouse-locally-apache-iceberg-nessie-trino-and-spark-on-eea4445888ab

[23] Dremio, “Getting Started with Project Nessie, Apache Iceberg, and Apache Spark Using Docker”, https://www.dremio.com/getting-started-with-project-nessie-apache-iceberg-and-apache-spark-using-docker/

[24] lakefs.io, “Nessie Catalog: Key Features, Use Cases & How to Use”, https://lakefs.io/blog/nessie-catalog/

[25] meain.io Blog, “Templating things in Airflow DAG”, https://blog.meain.io/2021/airflow-template-in-dag

[26] GitHub Issue #16728, “SparkKubernetesOperator to support arguments”, https://github.com/apache/airflow/issues/16728

[27] Spark Operator API Docs, https://kubeflow.github.io/spark-operator/docs/api-docs.html

0

Bir yanıt yazın

Password Requirements:

  • At least 8 characters
  • At least 1 lowercase letter
  • At least 1 uppercase letter
  • At least 1 numerical number
  • At least 1 special character