
Veri işleme süreçlerinde tekrar edilebilirlik (idempotency) ve depolama sistemlerinden bağımsız çalışabilme, güvenilir mimarilerin olmazsa olmaz özellikleridir. Apache Airflow bu noktada yalnızca işleri orkestre eden bir araç değil, aynı zamanda bu tür ihtiyaçları karşılayan mekanizmalar sunan bir platformdur. Bu yazıda Airflow’un iki önemli kavramı üzerinde duracağız: Airflow Context ve Airflow Object Storage.
Airflow Context
Airflow’da her görev çalıştırıldığında beraberinde bir context nesnesi gelir. Bu nesne bir sözlük yapısındadır ve görevle ilgili ayrıntılı bilgiler barındırır:
- DAG’ın çalıştırılma zamanı
- Task Instance (ti) bilgisi
- Parametreler
- Çalıştırma kimliği (run_id)
Bu bilgiler sayesinde görevler, yalnızca yazılan kodla değil, aynı zamanda çalıştırıldıkları bağlama göre davranış sergileyebilir.
Context’in kullanım alanları
- Görevler arası veri paylaşımı (XCom):
Bir görevden diğerine veri aktarmak içinti.xcom_pull()
kullanılabilir. - Tarih bilgisinin alınması:
Çoğu senaryoda üretilen dosyaların çalıştırma tarihine göre adlandırılması gerekir.datetime.now()
kullanmak yerine context içindekilogical_date
veyarun_after
değerleri tercih edilmelidir. - Dinamik akış yönetimi:
Çalıştırma günü, saat ya da parametresine göre farklı işlemlerin tetiklenmesi mümkündür.
Context bu yönüyle Airflow’da idempotency sağlamak için en kritik araçlardan biridir.
Airflow Context ve Jinja Template
Airflow’da birçok parametre (özellikle BashOperator
, PythonOperator
veya custom operator’ların parametreleri) Jinja templating ile context’ten gelen değerlerle doldurulabilir.
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id="example_jinja_context", start_date=datetime(2025, 1, 1), schedule_interval="@daily", catchup=False, ): run_report = BashOperator( task_id="run_report", bash_command="echo 'Bugünün tarihi: {{ ds }}' > /tmp/report_{{ ds_nodash }}.txt", )
Burada:
{{ ds }}
→ DAG’ın execution date’i (YYYY-MM-DD
formatında){{ ds_nodash }}
→ tire işaretsiz tarih (YYYYMMDD
)
Bu değerler context’ten otomatik gelir. Yani bash_command
içinde Python kodu yazmadan context kullanabilmiş oluyoruz.
Sık Kullanılan Jinja Context Değişkenleri
{{ ds }}
→ Çalıştırma tarihi (2025-09-23){{ ds_nodash }}
→ 20250923{{ prev_ds }}
→ Bir önceki çalıştırma tarihi{{ next_ds }}
→ Bir sonraki çalıştırma tarihi{{ run_id }}
→ Çalıştırmanın benzersiz kimliği{{ macros }}
→ Tarih işlemleri için fonksiyonlar
Idempotency’nin Önemi
Idempotency, aynı girdiden her zaman aynı çıktıyı üretmek anlamına gelir. Veri mühendisliğinde bu özellik, güvenilirlik açısından büyük önem taşır.
Örneğin:
- Çıktı dosya adını
datetime.now()
ile oluşturmak → idempotent değildir. - Çıktı dosya adını
context["dag_run"].logical_date
ile oluşturmak → idempotenttir.
Bu yaklaşım, bir görevin tekrar çalıştırıldığında farklı sonuç üretmesini engeller ve veri bütünlüğünü korur.
Airflow Object Storage
Airflow’un Object Storage yapısı, farklı dosya sistemleri üzerinde tek tip bir erişim imkânı sağlar. Yerel disk, S3, GCS, Azure Blob veya HDFS gibi sistemlere aynı kod yapısıyla erişmek mümkündür.
Bunu sağlayan temel sınıf ObjectStoragePath
’tir. Dosya yolları file://
, s3://
, gs://
gibi öneklerle tanımlanır.
Örneğin:
from airflow.sdk import ObjectStoragePath store = ObjectStoragePath( "file://outputs/reports", conn_id=None ) (store / "example.txt").write_text("Deneme raporu")
Bağlantı bilgisini değiştirmek yeterlidir; kod tarafında ek değişiklik yapılmaz. Bu da uygulamaların farklı ortamlara taşınmasını kolaylaştırır.
Örnek: Günlük Satış Raporu
Context ve Object Storage birlikte kullanıldığında idempotent ve taşınabilir bir iş akışı elde edilir. Aşağıdaki örnekte günlük satış verilerinden bir rapor üretilmekte ve depolama sistemine kaydedilmektedir.
import os from airflow.sdk import asset, ObjectStoragePath STORAGE_SYSTEM = os.getenv("OBJECT_STORAGE_SYSTEM", "file") STORAGE_PATH = os.getenv("OBJECT_STORAGE_PATH", "outputs/reports") @asset def generate_sales_report(context: dict): """ Günlük satış raporunu hazırlar. """ # Depolama bağlantısı store = ObjectStoragePath( f"{STORAGE_SYSTEM}://{STORAGE_PATH}", conn_id=os.getenv("OBJECT_STORAGE_CONN_ID") ) # Çalıştırma tarihi context'ten alınır run_date = context["dag_run"].logical_date.strftime("%Y-%m-%d") # Upstream task'tan satış verisi çekilir sales_data = context["ti"].xcom_pull(task_ids="fetch_sales") # Basit rapor oluşturma total_amount = sum([row["amount"] for row in sales_data]) report_text = f"{run_date} toplam satış: {total_amount} TL\n" # Rapor dosyasını yaz output_path = store / f"{run_date}_sales_report.txt" output_path.write_text(report_text)
Bu örnekte neler oldu?
- Context kullanıldı: Çalıştırma tarihi context’ten alındı.
- Object Storage kullanıldı: Rapor farklı sistemlere aynı kodla yazılabilir.
- İdempotency sağlandı: Aynı gün için tekrar çalıştırıldığında aynı dosya üretilecektir.
Gerçek Kullanım Alanları
- Veri ambarı yüklemeleri: Günlük veya saatlik partisyonların aynı tarih bilgisiyle oluşturulması.
- Makine öğrenmesi eğitimleri: Belirli bir günün verisiyle eğitilen modelin her tekrar çalıştırmada aynı kalması.
- Finansal raporlama: Tekrar eden çalıştırmalarda tutarlı sonuç üreten rapor dosyaları.
Sonuç
Airflow Context ve Object Storage, güvenilir veri işleme akışlarının temelini oluşturan iki kavramdır. Context, görevlerin çalıştığı bağlamı sağlayarak idempotency’yi mümkün kılar. Object Storage ise depolama sistemlerinden bağımsız bir erişim katmanı sunar.
Bu iki özellik birlikte kullanıldığında iş akışları hem tekrar edilebilir, hem taşınabilir, hem de bakımı kolay hale gelir.
Airflow 3 Veri Bilimi Okulu Data Engineering Bootcamp‘te. Eğer veri mühendisliğini kapsamlı ve uygulama öğrenmek istiyorsanız Türkiye’nin en kapsamlı ve uygulamalı veri mühendisliği eğitimini tavsiye ederim.