
![]()
Bu yazıda Airflow Spark Kubernetes ile çalışan veri mühendisleri için oldukça pratik ve sık karşılaşılan bir konuyu ele alacağız: Kubernetes üzerinde çalışan bir Apache Airflow ortamından, Spark uygulamalarına dinamik olarak argüman göndermeyi gerçek kod örnekleriyle birlikte inceleyeceğiz.
Eğer siz de modern bir veri gölü evi (Data Lakehouse) mimarisi kuruyorsanız ve iş akışlarınızı (workflows) orkestre etmek için Airflow kullanıyorsanız, bu yazı tam size göre.
Mimari Genel Bakış
Senaryo şu: GitHub (başka bir veri kaynağı olabilir önemli değil) üzerinde barındırılan NYC Taksi verilerini (Parquet formatında) bir nesne depolama alanına (Object Storage) aktarıyoruz. Ardından Apache Spark ile bu ham veriyi dönüştürüp (transform), Apache Iceberg tablo formatında, Nessie katalog üzerinden Gümüş katmana (Silver Layer) yazıyoruz.
Bu mimari, veri mühendisliğinde popüler olan Madalyon Mimarisi (Medallion Architecture) desenine dayanıyor. Madalyon Mimarisi, veriyi bir veri gölü evi içinde mantıksal olarak düzenlemek için kullanılan bir veri tasarım kalıbıdır (data design pattern) [1]. Veriler, Bronz (ham veri), Gümüş (doğrulanmış ve temizlenmiş veri) ve Altın (zenginleştirilmiş, iş kullanımına hazır veri) olmak üzere üç katmandan geçerek kalitesini aşamalı bir şekilde artırır [2].
Bizim senaryomuzda şu bileşenler devrede:
- Apache Airflow: İş akışı orkestratörü (Workflow Orchestrator) — Kubernetes üzerinde çalışıyor
- Spark Operatörü (Spark Operator): Kubernetes üzerinde Spark uygulamalarını yöneten bir CRD (Custom Resource Definition) [3]
- Apache Iceberg: Veri gölü evi tablo formatı (Table Format) — ACID işlemleri, şema evrimi (schema evolution) ve zaman yolculuğu (time travel) desteği sağlıyor [4]
- Nessie: Git benzeri sürüm kontrolü sunan bir Iceberg kataloğu (Catalog) [5]
- S3-uyumlu Nesne Depolama (Object Storage): Bronz ve Gümüş katman verileri burada tutuluyor
Veri akış hattımız (pipeline) zincirleme iki DAG’dan oluşuyor. Birinci DAG (data_ingestion_github_to_s3_daily_dag) her gün çalışarak ham veriyi GitHub’dan çekip Bronz katmana alıyor ve ardından TriggerDagRunOperator ile ikinci DAG’ı tetikliyor. İkinci DAG (load_facts_to_silver) ise SparkKubernetesOperator aracılığıyla Spark uygulamasını başlatıyor, Bronz katmandaki veriyi dönüştürüp Gümüş katmana Iceberg tablosu olarak yazıyor. Şimdi bu zinciri adım adım inceleyelim.
Adım 1: Günlük Veri Alım DAG’ı — KubernetesPodOperator ve TriggerDagRunOperator
İlk DAG, her gün çalışarak GitHub’dan ham Parquet dosyalarını çekip Bronz katmana (Bronze Layer) aktarıyor. Burada iki operatör birlikte çalışıyor: KubernetesPodOperator veri alımını yapıyor, TriggerDagRunOperator ise başarılı alımdan sonra Spark işini tetikliyor.
Airflow, şablonlama motoru olarak Jinja2 kullanır. Çift süslü parantez {{ }} içine yazılan ifadeler çalışma zamanında (runtime) değerlendirilir [6]. Bu sayede görev parametrelerini (task parameters) dinamik olarak belirleyebiliyoruz [7].
İşte günlük veri alım DAG dosyamız:
from airflow.sdk import dag
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.models import Variable
from datetime import datetime, timedelta
@dag(
schedule="@daily",
start_date=datetime(2024, 3, 1),
catchup=False,
tags=["ingestion", "github", "s3"],
description="From Github parquet file to S3",
)
def data_ingestion_github_to_s3_daily_dag():
pod_arguments = [
"--token", "{{ var.value.GITHUB_TOKEN }}",
"--date", "{{ macros.ds_add(ds, -731) }}",
"--owner", "{{ var.value.REPO_OWNER }}",
"--repo", "{{ var.value.REPO_NAME }}",
"--path-prefix", "{{ var.value.PATH_PREFIX }}",
"--bucket", "{{ var.value.BRONZE_BUCKET }}",
"--s3-prefix", "{{ var.value.BRONZE_BUCKET_S3_PREFIX }}",
"--endpoint", "{{ var.value.S3_ENDPOINT }}"
]
ingest_task = KubernetesPodOperator(
task_id="run_ingestion_container",
name="github-to-s3-ingester",
namespace="airflow",
image="github_to_s3_ingest:1.0",
image_pull_policy="IfNotPresent",
cmds=["python", "/app/01_github_to_s3.py"],
arguments=pod_arguments,
get_logs=True,
container_resources={
"requests": {"cpu": "200m", "memory": "512Mi"},
"limits": {"cpu": "500m", "memory": "1Gi"}
},
is_delete_operator_pod=True,
in_cluster=True,
retries=3,
retry_delay=timedelta(minutes=2),
retry_exponential_backoff=True,
max_retry_delay=timedelta(minutes=10),
startup_timeout_seconds=300,
security_context={
"run_as_user": 1000,
"run_as_group": 1000,
"fs_group": 1000
},
)
trigger_spark_job = TriggerDagRunOperator(
task_id="trigger_spark_processing",
trigger_dag_id="load_facts_to_silver",
wait_for_completion=False,
reset_dag_run=True,
)
# Görev bağımlılığı: önce alım, sonra Spark tetikleme
ingest_task >> trigger_spark_job
return [ingest_task, trigger_spark_job]
dag_instance = data_ingestion_github_to_s3_daily_dag()
Burada neler oluyor?
Bu DAG’da üç önemli mekanizma var:
1. Airflow Değişkenleri (Airflow Variables) ile Argüman Gönderme: pod_arguments listesinde {{ var.value.GITHUB_TOKEN }} gibi ifadeler, Airflow’un merkezi anahtar-değer deposundan (key-value store) çalışma zamanında değerlerle yer değiştirir [8]. Bu sayede kod değiştirmeden farklı ortamlar ve yapılandırmalar arasında geçiş yapabiliyoruz.
2. Airflow Makroları (Macros) ile Tarih Hesaplama: {{ macros.ds_add(ds, -731) }} ifadesine dikkat edin. Burada ds, Airflow’un mantıksal çalışma tarihini (logical execution date) YYYY-MM-DD formatında temsil eden yerleşik bir şablon değişkenidir (template variable) [6]. macros.ds_add ise bu tarihe gün ekleyip çıkaran bir yardımcı fonksiyondur [6]. -731 değeri (yaklaşık 2 yıl) ile güncel tarihten 2 yıl öncesinin verisini çekiyoruz. Bu, NYC Taksi verilerinin 2024 yılına ait olması nedeniyle 2026’dan geriye eşleme (mapping) yapmamızı sağlıyor.
3. TriggerDagRunOperator ile DAG Zincirleme: İşte boru hattımızın en kritik noktalarından biri. TriggerDagRunOperator, bir DAG’dan başka bir DAG’ı tetiklememizi sağlar. Burada trigger_dag_id="load_facts_to_silver" ile Spark dönüşüm DAG’ını tetikliyoruz. wait_for_completion=False ayarıyla tetikleme görevinin Spark işinin bitmesini beklemeden başarılı sayılmasını sağlıyoruz. reset_dag_run=True ise aynı gün için zaten bir çalışma varsa sıfırlayıp yeniden başlatır.
Görev bağımlılığı (task dependency) ingest_task >> trigger_spark_job satırıyla tanımlanıyor. Bu, Spark işinin ancak veri alımı başarıyla tamamlandıktan sonra tetikleneceğini garanti eder. Böylece Bronz katmana veri yazılmadan Spark dönüşümünün çalışması engelleniyor.
Bronz Katman Python Betiği
KubernetesPodOperator’ün çalıştırdığı Python betiği, GitHub API’sinden Parquet dosyalarını çekip S3-uyumlu nesne depolama alanına yükler:
import requests
import boto3
import argparse
import os
from datetime import datetime, timedelta
def parse_args():
parser = argparse.ArgumentParser(
description="Migrate Parquet files from GitHub to S3/MinIO for a date range."
)
parser.add_argument("--token", default=os.getenv("GITHUB_TOKEN"), help="GitHub PAT")
parser.add_argument("--start-date", required=True, help="Start date in YYYY-MM-DD")
parser.add_argument("--end-date", required=True, help="End date in YYYY-MM-DD")
parser.add_argument("--owner", default="erkansirin78", help="GitHub Repo Owner")
parser.add_argument("--repo", default="datasets", help="GitHub Repo Name")
parser.add_argument("--path-prefix", default="yellow_tripdata_partitioned_by_day")
parser.add_argument("--bucket", default="bronze", help="Target S3/MinIO Bucket")
parser.add_argument("--s3-prefix", default="nyc-taxi-data", help="S3 Folder Prefix")
parser.add_argument("--endpoint", default="http://localhost:30902", help="S3 Endpoint URL")
return parser.parse_args()
def process_single_day(s3_client, headers, args, current_date):
year = current_date.strftime("%Y")
month = current_date.strftime("%m")
day = current_date.strftime("%d")
github_path = f"{args.path_prefix}/year={year}/month={int(month)}/day={int(day)}"
s3_dest_prefix = f"{args.s3_prefix}/{year}/{month}/{day}/"
api_url = f"https://api.github.com/repos/{args.owner}/{args.repo}/contents/{github_path}"
response = requests.get(api_url, headers=headers)
if response.status_code != 200:
return
for file_info in response.json():
if file_info['name'].endswith('.parquet'):
s3_key = f"{s3_dest_prefix}{file_info['name']}"
with requests.get(file_info['download_url'], headers=headers, stream=True) as r:
r.raise_for_status()
s3_client.upload_fileobj(r.raw, args.bucket, s3_key)Bu betik, DAG’daki pod_arguments aracılığıyla gelen argümanları argparse ile alıyor. Airflow’un Jinja şablonlama ile gönderdiği --token, --start-date, --bucket gibi değerler, çalışma zamanında gerçek değerlere dönüşüyor. Burada da Spark betiğinde göreceğimiz aynı argparse + os.getenv() çift katmanlı varsayılan değer (dual-layer default) desenini görüyoruz.
Adım 2: Spark Dönüşüm DAG’ı — SparkKubernetesOperator ile Tetiklenen Orkestrasyon
Bronz katmana veri aktarıldıktan sonra, birinci DAG’daki TriggerDagRunOperator ikinci DAG’ı tetikliyor. Burada SparkKubernetesOperator devreye giriyor. Bu operatör, Kubernetes kümesinde doğrudan bir SparkApplication özel kaynağı (Custom Resource) oluşturan, Spark Operatörü ile entegre çalışan bir Airflow operatörüdür [17].
İşte Spark dönüşüm DAG’ımız:
from datetime import datetime, timedelta
from airflow.sdk import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='load_facts_to_silver',
default_args=default_args,
description='Load facts data to Silver layer using Spark on Kubernetes (triggered by ingestion DAG)',
schedule=None, # Zamanlanmış değil, tetikleniyor
start_date=datetime(2026, 3, 1),
catchup=False,
max_active_runs=1,
tags=['spark', 'kubernetes', 'silver', 'facts', 'triggered']
) as dag:
ingest_task = SparkKubernetesOperator(
task_id="load-facts",
namespace="default",
name="load-facts",
application_file="spark_applications/write_fact_trip_sparkApplication.yaml",
get_logs=True,
do_xcom_push=False,
success_run_history_limit=1,
startup_timeout_seconds=600,
log_events_on_failure=True,
reattach_on_restart=True,
delete_on_termination=True,
kubernetes_conn_id="kubernetes_default",
random_name_suffix=True
)Neden schedule=None?
Bu DAG’ın en dikkat çekici özelliği schedule=None ayarıdır. Bu, DAG’ın kendi başına bir zamanlama ile çalışmadığı, yalnızca dışarıdan tetiklendiğinde çalıştığı anlamına gelir. Birinci DAG’daki TriggerDagRunOperator bu DAG’ı tetikliyor. Bu desen sayesinde veri alımı başarıyla tamamlanmadan Spark dönüşümünün çalışması imkansız hale geliyor — yani veri tutarlılığı (data consistency) garanti altına alınmış oluyor.
KubernetesPodOperator vs SparkKubernetesOperator — Fark ne?
Birinci DAG’da KubernetesPodOperator kullandık, ikinci DAG’da ise SparkKubernetesOperator kullanıyoruz. Bu iki operatör arasındaki farkı anlamak çok önemli.
KubernetesPodOperator genel amaçlı bir operatördür. Herhangi bir Docker imajını (image) bir Kubernetes pod’u olarak çalıştırır ve pod’un komut satırı argümanlarını (command-line arguments) doğrudan arguments parametresine Jinja şablonları ile geçebilirsiniz [9]. Veri alım gibi Spark gerektirmeyen görevler için idealdir.
SparkKubernetesOperator ise Spark Operatörü ile entegre çalışan, özelleştirilmiş bir operatördür. Kubernetes kümesinde bir SparkApplication özel kaynağı oluşturur [17]. En kritik parametresi application_file‘dır: bu parametre, Spark uygulamasının tüm yapılandırmasını barındıran bir YAML dosyasına referans verir [15]. application_file bir şablon alanıdır (template field), yani YAML dosyasının içindeki Jinja ifadeleri de Airflow tarafından çalışma zamanında değerlendirilir [17].
Bu ayrımı bir tablo ile özetleyelim:
| Özellik | KubernetesPodOperator | SparkKubernetesOperator |
|---|---|---|
| Kullanım amacı | Genel amaçlı konteyner çalıştırma | Spark uygulamalarını çalıştırma |
| Argüman gönderme | arguments parametresi (doğrudan) | YAML dosyasındaki arguments bölümü |
| Pod yönetimi | Tek bir pod oluşturur | Sürücü + yürütücü pod’larını yönetir |
| Spark Operatörü | Gerekmez | Gerekir (CRD olarak kurulmalı) |
| Yapılandırma | Operatör parametrelerinde | Harici YAML dosyasında |
application_file — Sihir Burada
DAG’daki application_file="spark_applications/write_fact_trip_sparkApplication.yaml" satırına dikkat edin. Bu YAML dosyası, DAG’lar dizini (dags folder) altında bulunmalıdır [15]. Airflow, DAG çalıştırıldığında bu YAML dosyasını okur, içindeki Jinja şablonlarını değerlendirir ve sonucu bir SparkApplication kaynağı olarak Kubernetes API’sine gönderir.
DAG’daki diğer önemli parametrelere de göz atalım:
get_logs=True: Spark sürücü pod’unun günlüklerini (logs) alıp Airflow arayüzünde gösterir [17].reattach_on_restart=True: Airflow zamanlayıcısı (scheduler) çökerse, çalışan pod’a yeniden bağlanır. Bu özellik aktifken operatör, sürücü ve yürütücü pod’larına otomatik olarak Airflow görev bağlam etiketleri (task context labels — dag_id, task_id, run_id) ekler [17].delete_on_termination=True: İş tamamlandığında kaynakları otomatik temizler.random_name_suffix=True: Pod adına rastgele bir sonek (suffix) ekleyerek ad çakışmalarını önler [17].max_active_runs=1: DAG düzeyinde eşzamanlı (concurrent) çalışma sayısını bire sınırlar. Bu, aynı Iceberg tablosuna paralel yazma çakışmalarını engeller.
Adım 3: SparkApplication YAML — Argümanları Tanımlama
Şimdi SparkKubernetesOperator‘ün referans verdiği YAML dosyasının içine bakalım. Bu dosya, Spark Operatörünün anlayacağı bir SparkApplication tanımıdır. Spark Operatörü, Kubernetes kümesinde SparkApplication türünde özel kaynak nesneleri (Custom Resource) oluşturan bir bileşendir [3]. YAML formatında bir tanım dosyası hazırlıyoruz ve Spark Operatörü bu tanıma göre sürücü (driver) ve yürütücü (executor) pod’larını oluşturuyor [10].
İşte YAML dosyamızın temel yapısı:
apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: initial-load-fact-trip namespace: default spec: type: Python pythonVersion: "3" mode: cluster image: "data-transformation-spark-iceberg-nessie:1.0" imagePullPolicy: IfNotPresent mainApplicationFile: local:///app/load_facts/load_trip_fact_initial_load.py sparkVersion: "3.5.3" timeToLiveSeconds: 3600
timeToLiveSeconds: 3600 ayarıyla SparkApplication tamamlandıktan 1 saat sonra ilgili tüm Kubernetes kaynakları otomatik olarak silinir. Bu, kümedeki kaynak birikimini önler.
Bağımlılıklar (Dependencies) ve Spark Yapılandırması (Spark Configuration)
Spark ile Iceberg ve Nessie entegrasyonu için gerekli bağımlılıkları deps bölümünde Maven paketleri olarak belirtiyoruz. Spark yapılandırma parametresinin spark.jars.packages ayarı, Maven koordinatlarını kullanarak belirtilen bağımlılıkları ve geçişli (transitive) bağımlılıkları çeker [11]:
deps:
packages:
- "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1"
- "org.apache.iceberg:iceberg-spark-extensions-3.5_2.12:1.8.1"
- "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.102.5"
- "org.apache.hadoop:hadoop-aws:3.3.4"
- "org.apache.hadoop:hadoop-common:3.3.4"
repositories:
- "https://repo1.maven.org/maven2"
sparkConf:
"spark.sql.catalog.nessie": "org.apache.iceberg.spark.SparkCatalog"
"spark.sql.catalog.nessie.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog"
"spark.sql.catalog.nessie.uri": "http://nessie.nessie.svc.cluster.local:19120/api/v2"
"spark.sql.catalog.nessie.warehouse": "s3a://warehouse/"
"spark.sql.catalog.nessie.ref": "main"
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions"Bu yapılandırmada Nessie kataloğunun URI’sini, Kubernetes servis keşfi (service discovery) aracılığıyla küme içi DNS adı olarak belirttiğimize dikkat edin: http://nessie.nessie.svc.cluster.local:19120/api/v2. Nessie, Iceberg tablolarının meta verilerini izleyen açık kaynaklı bir katalog olup, Git benzeri dallanma (branching) ve sürüm kontrolü özellikleri sunar [5].
sparkConf bölümünde ayrıca performans ve dayanıklılık ayarları da var:
"spark.kubernetes.driver.deleteOnTermination": "true"
"spark.kubernetes.executor.deleteOnTermination": "true"
"spark.sql.execution.timeout": "3600s"
"spark.network.timeout": "800s"
"spark.executor.heartbeatInterval": "20s"
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"Bu ayarlar, sürücü ve yürütücü pod’larının iş bitiminde otomatik silinmesini, SQL işlemleri için 1 saatlik zaman aşımını (timeout), ve 128MB’lık uyarlanabilir bölüm boyutunu (adaptive partition size) sağlıyor.
Argümanlar (Arguments) Bölümü — Jinja Şablonlama ve Makrolar ile
Şimdi en kritik kısma geliyoruz: arguments bölümü. SparkKubernetesOperator‘ün application_file alanı bir şablon alanı (template field) olduğundan, YAML dosyasındaki Jinja ifadeleri DAG çalışma zamanında Airflow tarafından otomatik olarak değerlendirilir [17]:
arguments:
- "--source_bucket"
- "{{ var.value.source_bronze_bucket | default('bronze') }}"
- "--namespace"
- "{{ var.value.silver_namespace | default('nessie.silver') }}"
- "--static_bronze_path"
- "{{ var.value.static_bronze_path | default('nyc-taxi-data') }}"
- "--start_date"
- "{{ macros.ds_add(ds, -1) }}"
- "--end_date"
- "{{ macros.ds_add(ds, -1) }}"
- "--data_year_offset"
- "{{ var.value.data_year_offset | default('2') }}"
- "--fact_table_name"
- "{{ var.value.fact_trip_table_name | default('facttrip') }}"
- "--overwride_mode"
- "{{ var.value.fact_trip_overwrite_mode | default('append') }}"
- "--access_key_id"
- "{{ var.value.aws_access_key_id | default('rustfsadmin') }}"
- "--secret_key"
- "{{ var.value.aws_secret_access_key | default('rustfsadmin') }}"Bu bölümde birden fazla Jinja mekanizmasının bir arada kullanıldığını görüyoruz:
Airflow Makroları (Macros): {{ macros.ds_add(ds, -1) }} ifadesi, DAG’ın mantıksal çalışma tarihinden (logical execution date) bir gün çıkarır [6]. Böylece Spark işi her zaman bir önceki günün verisini işler. Bu, veri alım DAG’ının günlük çalışma düzenine mükemmel uyum sağlar — bugün çalışan alım DAG’ı dünün verisini getirir, tetiklenen Spark DAG’ı da aynı günü hedefler.
Airflow Değişkenleri (Variables) + Varsayılan Değerler (Defaults): {{ var.value.source_bronze_bucket | default('bronze') }} deseni ile Airflow değişkeni tanımlı değilse varsayılan bir değer kullanılmasını sağlıyoruz. Bu, hem esneklik hem de dayanıklılık sağlayan çok güzel bir pratiktir.
Makro ve Değişken Kullanımı Arasındaki Fark: Tarih argümanlarında macros.ds_add kullanılırken diğer yapılandırma değerlerinde var.value kullanılıyor. Bu bilinçli bir tasarım tercihidir: tarihler çalışma zamanı bağlamına (runtime context) bağlıdır ve her DAG çalışmasında otomatik olarak değişir; diğer yapılandırma değerleri ise ortama (environment) bağlıdır ve elle yönetilir.
Akışı somutlaştıralım: Birinci DAG 16 Mart 2026 tarihinde çalışıp Spark DAG’ını tetiklediğinde, YAML’daki {{ macros.ds_add(ds, -1) }} ifadesi 2026-03-15 olarak değerlendirilir. PySpark uygulamasına --start_date 2026-03-15 ve --end_date 2026-03-15 argümanları gönderilir. Uygulama içindeki year_offset=2 sayesinde gerçek veri yolu s3a://bronze/nyc-taxi-data/2024/03/15/ olarak hesaplanır.
ConfigMap ve Secret ile Çevresel Değişken Yönetimi
YAML dosyasında argümanların yanı sıra, sürücü ve yürütücü pod’larına çevresel değişkenler (environment variables) de enjekte ediyoruz. Burada Kubernetes’in iki temel yapılandırma mekanizmasını kullanıyoruz:
ConfigMap (hassas olmayan yapılandırma verileri):
driver:
cores: 2
coreLimit: "2000m"
memory: "2048m"
serviceAccount: spark
env:
- name: SOURCE_BRONZE_BUCKET
valueFrom:
configMapKeyRef:
name: silver-transformations-configmap
key: bronze-bucket-name
- name: SILVER_NAMESPACE
valueFrom:
configMapKeyRef:
name: silver-transformations-configmap
key: silver-namespaceSecret (hassas kimlik bilgileri):
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: silver-transformations-secret
key: access-key-id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: silver-transformations-secret
key: secret-key
Spark Operatörü kullanıcı kılavuzuna (User Guide) göre, hem sürücü hem de yürütücü tanımlarında ConfigMap ve Secret montajları için isteğe bağlı alanlar mevcuttur [12]. Hassas olmayan yapılandırmaları ConfigMap’te, kimlik bilgilerini ise Secret’ta tutmak en iyi uygulama (best practice) olarak kabul edilir [13].
Dikkat edilmesi gereken nokta: Sürücü 2 çekirdek (core) ve 2048MB bellek (memory), yürütücüler ise 2 çekirdek, 2048MB bellek ve 2 kopya (instance) olarak yapılandırılmış. Ayrıca aynı çevresel değişkenlerin hem sürücü hem de yürütücü bölümlerinde tanımlanması gerekiyor çünkü Spark kümesinde bunlar ayrı pod’larda çalışır.
Peki neden hem arguments ile hem de ConfigMap/Secret ile değer gönderiyoruz? Burada bir çift katmanlı yapılandırma stratejisi (dual-layer configuration strategy) var. Argümanlar, Airflow’un çalışma zamanında dinamik olarak belirlediği değerler için kullanılırken (tarih aralıkları, yazma modu gibi), ConfigMap ve Secret’lar Kubernetes seviyesinde yönetilen ve daha az değişen altyapı yapılandırmaları için kullanılıyor (erişim anahtarları, servis adresleri gibi). PySpark uygulaması ise her iki kaynağı da argparse varsayılan değerleri aracılığıyla zarif bir şekilde birleştiriyor — bunu bir sonraki adımda göreceğiz.
Adım 4: PySpark Uygulaması — Argümanları Tüketme
Artık argümanların Airflow’dan → TriggerDagRunOperator → SparkKubernetesOperator → YAML → Spark pod’una nasıl ulaştığını gördük. Şimdi PySpark tarafında bu argümanları nasıl aldığımıza ve kullandığımıza bakalım.
argparse ile Komut Satırı Argümanlarını Alma
import argparse, os
def parse_args():
parser = argparse.ArgumentParser(description="Data Transformation.")
parser.add_argument("--source_bucket",
default=os.getenv("SOURCE_BRONZE_BUCKET"),
help="Silver Bucket Name")
parser.add_argument("--namespace",
default=os.getenv("SILVER_NAMESPACE"),
help="Target iceberg namespace nessie.silver")
parser.add_argument("--start_date",
default=os.getenv("FACT_TRIP_BRONZE_START_DATE"),
help="start date of fact trip")
parser.add_argument("--end_date",
default=os.getenv("FACT_TRIP_BRONZE_END_DATE"),
help="end date of fact trip")
parser.add_argument("--data_year_offset", type=int,
default=int(os.getenv("DATA_YEAR_OFFSET", "0")),
help="Year offset to apply to dates")
parser.add_argument("--fact_table_name",
default=os.getenv("FACT_TRIP_TABLE_NAME"),
help="Target silver table default facttrip")
parser.add_argument("--overwride_mode",
default=os.getenv("FACT_TRIP_OVERWRITE_MODE"),
help="overwrite for recreate, default append")
parser.add_argument("--access_key_id",
default=os.getenv("AWS_ACCESS_KEY_ID"),
help="S3 AWS_ACCESS_KEY_ID")
parser.add_argument("--secret_key",
default=os.getenv("AWS_SECRET_ACCESS_KEY"),
help="S3 AWS_SECRET_ACCESS_KEY")
return parser.parse_args()Burada çok güzel bir çift katmanlı varsayılan değer (dual-layer default) deseni görüyoruz: argparse ile tanımlanan her argüman, önce komut satırından (yani YAML’daki arguments bölümünden Airflow aracılığıyla gelen değeri) kontrol eder; eğer yoksa os.getenv() ile çevresel değişkene (ConfigMap/Secret’tan gelen değere) geri düşer. Bu tasarım, uygulamayı hem Airflow orkestratörü üzerinden hem de doğrudan spark-submit komutuyla çalıştırılabilir kılar.
Bu esnekliği somut bir örnekle açıklayalım: Üretim (production) ortamında Airflow, YAML’daki arguments aracılığıyla --start_date 2026-03-15 gönderir (macros.ds_add ile hesaplanmış) ve bu değer önceliklidir. Ancak bir geliştirici yerel ortamda test yaparken, argüman göndermeden sadece FACT_TRIP_BRONZE_START_DATE=2026-03-15 çevresel değişkenini ayarlayarak aynı PySpark betiğini çalıştırabilir.
Bronz Katmandan Veri Okuma
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
def get_bronze_data(spark, bucket_name, path, start_date, end_date, year_offset=0):
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
if year_offset:
start = start - relativedelta(years=year_offset)
end = end - relativedelta(years=year_offset)
num_days = (end - start).days + 1
target_paths = [
f"s3a://{bucket_name}/{path}/{(start + timedelta(days=i)).strftime('%Y/%m/%d')}/"
for i in range(num_days)
]
df = (spark.read
.option("recursiveFileLookup", "true")
.parquet(*target_paths)
)
return dfBurada year_offset argümanının kullanımına dikkat edin. YAML’daki {{ macros.ds_add(ds, -1) }} ile gelen tarih (örneğin 2026-03-15), PySpark içinde year_offset=2 ile 2024-03-15’e dönüştürülüyor. Böylece S3 yolu s3a://bronze/nyc-taxi-data/2024/03/15/ olarak oluşturuluyor. Bu tarih hesaplama zinciri — Airflow makrosu → argüman → PySpark içi ofset — boru hattının farklı katmanlarında tarihin nasıl yönetildiğinin güzel bir örneği.
Dönüşümler ve Iceberg Tablosuna Yazma
PySpark uygulaması ham veriyi okuduktan sonra bir dizi dönüşüm (transformation) uygular:
from pyspark.sql import functions as F
def main():
args = parse_args()
spark = get_spark_session(s3_access_key=args.access_key_id, s3_secret_key=args.secret_key)
df = get_bronze_data(spark, bucket_name=args.source_bucket,
path=args.static_bronze_path,
start_date=args.start_date,
end_date=args.end_date,
year_offset=args.data_year_offset)
# Benzersiz yolculuk kimliği oluştur
df2 = df.withColumn("tripid", F.monotonically_increasing_id())
# Tarih sütunlarını dakika hassasiyetinde sayısal formata dönüştür
df3 = df2.withColumn("tpep_pickup_datetime_minute",
F.date_format(F.col("tpep_pickup_datetime"), "yyyyMMddHHmm").cast("long"))
df4 = df3.withColumn("tpep_dropoff_datetime_minute",
F.date_format(F.col("tpep_dropoff_datetime"), "yyyyMMddHHmm").cast("long"))
# Veri tipini düzelt
df5 = df4.withColumn("RatecodeID", F.col("RatecodeID").cast("int"))
# Iceberg tablosuna yaz
write_to_iceberg(spark=spark, df=df5, table_name=args.fact_table_name, mode=args.overwride_mode)def write_to_iceberg(spark, df, table_name, mode):
table_exists = spark.catalog.tableExists(f"nessie.silver.{table_name}")
if not table_exists:
spark.createDataFrame([], df.schema) \
.writeTo(f"nessie.silver.{table_name}").create()
df.write.format("iceberg").mode(mode) \
.save(f"nessie.silver.{table_name}")mode parametresi de dışarıdan argüman olarak geliyor (--overwride_mode). Böylece aynı Spark uygulamasını hem append hem de overwrite modunda çalıştırabiliyorsunuz — kod değiştirmeden, sadece Airflow değişkenini güncelleyerek.
Argüman Akış Diyagramı
Toparlayalım: Bir argüman, kaynağından Spark uygulamasına kadar şu yolculuğu izler:
1. Kaynak → Airflow Değişkeni / Makro: Değişkenler Airflow arayüzünden tanımlanır. Tarihler ise ds ve macros.ds_add gibi yerleşik makrolarla otomatik hesaplanır [6].
2. Veri Alım DAG’ı Çalışır: KubernetesPodOperator, Jinja şablonlarıyla argüman alarak veriyi Bronz katmana aktarır.
3. TriggerDagRunOperator → Spark DAG: Veri alımı başarılı olursa, TriggerDagRunOperator ikinci DAG’ı (load_facts_to_silver) tetikler. Spark DAG’ı schedule=None olduğundan yalnızca bu tetikleme ile çalışır.
4. SparkKubernetesOperator → YAML Şablon İşleme: Operatör, application_file ile referans verilen YAML dosyasını okur ve Jinja ifadelerini (hem var.value hem macros.ds_add) gerçek değerleriyle değiştirir [17].
5. SparkApplication → Kubernetes API: Değerlendirilmiş YAML, bir SparkApplication kaynağı olarak Kubernetes API sunucusuna gönderilir. Spark Operatörü bu kaynağı algılayıp sürücü ve yürütücü pod’larını oluşturur [10].
6. arguments → Python argparse: Sürücü pod’u başladığında, YAML’daki arguments listesi komut satırı argümanları olarak PySpark betiğine iletilir.
7. ConfigMap/Secret → os.getenv() (Yedek): Argüman komut satırından gelmezse, argparse varsayılan değer olarak os.getenv() ile ConfigMap/Secret’tan gelen çevresel değişkeni kullanır.
En İyi Uygulamalar ve İpuçları
Bu mimariyi kurarken dikkat etmenizi önerdiğim birkaç nokta var:
DAG zincirleme için TriggerDagRunOperator kullanın. Boru hattınızın aşamalarını ayrı DAG’lar olarak tasarlayıp TriggerDagRunOperator ile bağlamak, her aşamayı bağımsız olarak test etmenize, izlemenize ve yeniden çalıştırmanıza olanak tanır. Tetiklenen DAG’da schedule=None ayarını kullanarak yalnızca dışarıdan tetiklenmeyle çalışmasını sağlayın.
YAML dosyasını DAG’lar dizini altına koyun. SparkKubernetesOperator kullanırken YAML dosyası, DAG’lar dizininden (dags folder) erişilebilir olmalıdır [15]. Bizim örneğimizde spark_applications/write_fact_trip_sparkApplication.yaml şeklinde bir alt dizin yapısı kullanıyoruz.
Hassas veriyi asla YAML’a gömmeyin. Erişim anahtarları (access keys) ve gizli anahtarlar (secret keys) gibi hassas bilgileri Kubernetes Secret nesnelerinde saklayın. YAML dosyasında secretKeyRef ile referans verin. Spark Operatörü kılavuzuna göre, Secret montajı için secretType alanının Generic olarak ayarlanması yeterlidir [12].
Tarihler için makroları, yapılandırma için değişkenleri kullanın. macros.ds_add(ds, -1) gibi makrolar, her çalışma için otomatik olarak doğru tarihi hesaplar. Yapılandırma değerleri (kova adı, tablo adı, yazma modu) ise var.value ile yönetilir. Bu iki mekanizmayı karıştırmamak, bakım kolaylığı sağlar.
Sürücü ve yürütücü yapılandırmasını senkronize tutun. Spark kümesinde her iki tarafın da aynı çevresel değişkenlere erişmesi gerekir. YAML’da driver ve executor bölümlerindeki env değişkenlerini eşitlemeyi unutmayın.
reattach_on_restart=True ayarını kullanın. Airflow zamanlayıcısı (scheduler) beklenmedik şekilde çökerse, bu ayar sayesinde çalışan Spark işine yeniden bağlanabilirsiniz [17].
Kaynak sınırlarını (resource limits) iş yüküne göre ayarlayın. Bizim örneğimizde sürücü 2 çekirdek / 2GB, yürütücüler 2 çekirdek / 2GB / 2 kopya olarak ayarlanmış. Kubernetes üzerinde Spark çalıştırırken kaynak yalıtımı (resource isolation) kritiktir [14].
max_active_runs=1 ile eşzamanlılığı sınırlayın. Özellikle aynı Iceberg tablosuna yazan Spark işleri için eşzamanlı çalışma sayısını bire sınırlamak, yazma çakışmalarını (write conflicts) önler.
Sonuç
Bu yazıda, Kubernetes üzerinde Airflow ile Spark işlerini orkestre ederken argüman göndermenin farklı yaklaşımlarını gerçek bir boru hattı üzerinde inceledik:
- KubernetesPodOperator + TriggerDagRunOperator (Bronz katman — Veri Alım ve Tetikleme):
argumentsparametresine doğrudan Jinja şablonları ile değer gönderme.macros.ds_addile tarih hesaplama. Başarılı alım sonrası Spark DAG’ını tetikleme. - SparkKubernetesOperator (Gümüş katman — Veri Dönüşümü):
schedule=Noneile yalnızca tetiklenerek çalışan DAG. Harici YAML dosyası (application_file) aracılığıyla SparkApplication tanımı. YAML içinde hemvar.valuedeğişkenleri hemmacros.ds_addmakroları ile Jinja şablonlama. - ConfigMap ve Secret: Kubernetes yerel mekanizmaları ile çevresel değişken enjeksiyonu. Hassas olmayan yapılandırma → ConfigMap, kimlik bilgileri → Secret.
- argparse + os.getenv(): PySpark uygulamasında çift katmanlı varsayılan değer deseni — hem komut satırı argümanlarını hem de çevresel değişkenleri destekleyerek esneklik sağlama.
Bu yaklaşım, kodunuzu hiç değiştirmeden farklı senaryolarda (farklı tarih aralıkları, farklı veri kaynakları, farklı yazma modları) çalıştırmanıza olanak tanır. Veri gölü evi mimarinizde esneklik ve sürdürülebilirlik istiyorsanız, bu desenleri benimsemenizi şiddetle tavsiye ederim. Eğer bu örnekleri uygulamalı olarak tecrübe etmek istiyorsanız VBO Data Engineering Project Bootcamp harika bir adres.
Bir sonraki yazıda görüşmek üzere!
Kaynaklar
[1] Databricks, “What is Medallion Architecture?”, https://www.databricks.com/glossary/medallion-architecture
[2] Microsoft Learn, “What is the medallion lakehouse architecture?”, https://learn.microsoft.com/en-us/azure/databricks/lakehouse/medallion
[3] Kubeflow, “Spark Operator – User Guide”, https://kubeflow.github.io/spark-operator/docs/user-guide.html
[4] Dremio, “Learn Apache Iceberg Locally with Spark, Nessie”, https://www.dremio.com/blog/hands-on-with-apache-iceberg-nessie-dremio-apache-spark/
[5] Project Nessie, “Nessie + Iceberg + Spark”, https://projectnessie.org/iceberg/spark/
[6] Apache Airflow Documentation, “Templates reference”, https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
[7] Astronomer, “Use Airflow templates”, https://www.astronomer.io/docs/learn/templating
[8] Reintech, “Parameterizing Workflows with Airflow Variables and Templating”, https://reintech.io/blog/airflow-variables-templating
[9] Apache Airflow Documentation, “KubernetesPodOperator”, https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html
[10] Kubeflow, “Quick Start Guide – Spark Operator”, https://kubeflow.github.io/spark-operator/docs/quick-start-guide.html
[11] Kubeflow, “Writing a SparkApplication”, https://www.kubeflow.org/docs/components/spark-operator/user-guide/writing-sparkapplication/
[12] Spark Operator GitHub, “User Guide – Mounting Secrets and ConfigMaps”, https://github.com/mesosphere/spark-on-k8s-operator/blob/master/docs/user-guide.md
[13] MinIO Blog, “Spark, MinIO and Kubernetes”, https://blog.min.io/spark-minio-kubernetes/
[14] AWS Blog, “Best practices for running Spark on Amazon EKS”, https://aws.amazon.com/blogs/containers/best-practices-for-running-spark-on-amazon-eks/
[15] Medium – Young Gyu Kim, “Running Spark Applications on Kubernetes with Spark Operator and Airflow 3.0”, https://medium.com/@nsalexamy/running-spark-applications-on-kubernetes-with-spark-operator-and-airflow-3-0-adf7d01a1023
[16] Apache Airflow Documentation, “Operators – Jinja Templating”, https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html
[17] Apache Airflow Documentation, “SparkKubernetesOperator API”, https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/operators/spark_kubernetes/index.html
[18] Medium – OSDS, “Executing Spark Operator using Airflow on Kubernetes — Part 2”, https://medium.com/@howdyservices9/executing-spark-operator-using-airflow-on-kubernetes-part-2-62ae8fae7a7a
[19] Cloud Native Daily, “Apache Airflow, Spark, and Kubernetes for Streamlined Workflow Management”, https://medium.com/cloud-native-daily/apache-airflow-spark-and-kubernetes-for-streamlined-workflow-management-869c6b48a026
[20] lakefs.io, “Apache Iceberg Catalogs: Types & How to Choose”, https://lakefs.io/blog/iceberg-catalog/
[21] Conduktor, “Iceberg Catalog Management: REST, Hive, Glue, and Nessie”, https://www.conduktor.io/glossary/iceberg-catalog-management-hive-glue-and-nessie
[22] Medium – Young Gyu Kim, “Building a Production-Ready Data Lakehouse Locally”, https://medium.com/@nsalexamy/building-a-production-ready-data-lakehouse-locally-apache-iceberg-nessie-trino-and-spark-on-eea4445888ab
[23] Dremio, “Getting Started with Project Nessie, Apache Iceberg, and Apache Spark Using Docker”, https://www.dremio.com/getting-started-with-project-nessie-apache-iceberg-and-apache-spark-using-docker/
[24] lakefs.io, “Nessie Catalog: Key Features, Use Cases & How to Use”, https://lakefs.io/blog/nessie-catalog/
[25] meain.io Blog, “Templating things in Airflow DAG”, https://blog.meain.io/2021/airflow-template-in-dag
[26] GitHub Issue #16728, “SparkKubernetesOperator to support arguments”, https://github.com/apache/airflow/issues/16728
[27] Spark Operator API Docs, https://kubeflow.github.io/spark-operator/docs/api-docs.html