Veri Bilimi Okulu

Airflow’da Idempotent Veri Akışları: Context ve Object Storage
Airflow’da Idempotent Veri Akışları: Context ve Object Storage
airflow_context_object_storage_kapak_960x640

Loading

Veri işleme süreçlerinde tekrar edilebilirlik (idempotency) ve depolama sistemlerinden bağımsız çalışabilme, güvenilir mimarilerin olmazsa olmaz özellikleridir. Apache Airflow bu noktada yalnızca işleri orkestre eden bir araç değil, aynı zamanda bu tür ihtiyaçları karşılayan mekanizmalar sunan bir platformdur. Bu yazıda Airflow’un iki önemli kavramı üzerinde duracağız: Airflow Context ve Airflow Object Storage.

Airflow Context

Airflow’da her görev çalıştırıldığında beraberinde bir context nesnesi gelir. Bu nesne bir sözlük yapısındadır ve görevle ilgili ayrıntılı bilgiler barındırır:

  • DAG’ın çalıştırılma zamanı
  • Task Instance (ti) bilgisi
  • Parametreler
  • Çalıştırma kimliği (run_id)

Bu bilgiler sayesinde görevler, yalnızca yazılan kodla değil, aynı zamanda çalıştırıldıkları bağlama göre davranış sergileyebilir.

Context’in kullanım alanları

  1. Görevler arası veri paylaşımı (XCom):
    Bir görevden diğerine veri aktarmak için ti.xcom_pull() kullanılabilir.
  2. Tarih bilgisinin alınması:
    Çoğu senaryoda üretilen dosyaların çalıştırma tarihine göre adlandırılması gerekir. datetime.now() kullanmak yerine context içindeki logical_date veya run_after değerleri tercih edilmelidir.
  3. Dinamik akış yönetimi:
    Çalıştırma günü, saat ya da parametresine göre farklı işlemlerin tetiklenmesi mümkündür.

Context bu yönüyle Airflow’da idempotency sağlamak için en kritik araçlardan biridir.

Airflow Context ve Jinja Template

Airflow’da birçok parametre (özellikle BashOperator, PythonOperator veya custom operator’ların parametreleri) Jinja templating ile context’ten gelen değerlerle doldurulabilir.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="example_jinja_context",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
):
    run_report = BashOperator(
        task_id="run_report",
        bash_command="echo 'Bugünün tarihi: {{ ds }}' > /tmp/report_{{ ds_nodash }}.txt",
    )

Burada:

  • {{ ds }} → DAG’ın execution date’i (YYYY-MM-DD formatında)
  • {{ ds_nodash }} → tire işaretsiz tarih (YYYYMMDD)

Bu değerler context’ten otomatik gelir. Yani bash_command içinde Python kodu yazmadan context kullanabilmiş oluyoruz.

Sık Kullanılan Jinja Context Değişkenleri

  • {{ ds }} → Çalıştırma tarihi (2025-09-23)
  • {{ ds_nodash }} → 20250923
  • {{ prev_ds }} → Bir önceki çalıştırma tarihi
  • {{ next_ds }} → Bir sonraki çalıştırma tarihi
  • {{ run_id }} → Çalıştırmanın benzersiz kimliği
  • {{ macros }} → Tarih işlemleri için fonksiyonlar

Idempotency’nin Önemi

Idempotency, aynı girdiden her zaman aynı çıktıyı üretmek anlamına gelir. Veri mühendisliğinde bu özellik, güvenilirlik açısından büyük önem taşır.

Örneğin:

  • Çıktı dosya adını datetime.now() ile oluşturmak → idempotent değildir.
  • Çıktı dosya adını context["dag_run"].logical_date ile oluşturmak → idempotenttir.

Bu yaklaşım, bir görevin tekrar çalıştırıldığında farklı sonuç üretmesini engeller ve veri bütünlüğünü korur.

Airflow Object Storage

Airflow’un Object Storage yapısı, farklı dosya sistemleri üzerinde tek tip bir erişim imkânı sağlar. Yerel disk, S3, GCS, Azure Blob veya HDFS gibi sistemlere aynı kod yapısıyla erişmek mümkündür.

Bunu sağlayan temel sınıf ObjectStoragePath’tir. Dosya yolları file://, s3://, gs:// gibi öneklerle tanımlanır.

Örneğin:

from airflow.sdk import ObjectStoragePath

store = ObjectStoragePath(
    "file://outputs/reports",
    conn_id=None
)

(store / "example.txt").write_text("Deneme raporu")

Bağlantı bilgisini değiştirmek yeterlidir; kod tarafında ek değişiklik yapılmaz. Bu da uygulamaların farklı ortamlara taşınmasını kolaylaştırır.

Örnek: Günlük Satış Raporu

Context ve Object Storage birlikte kullanıldığında idempotent ve taşınabilir bir iş akışı elde edilir. Aşağıdaki örnekte günlük satış verilerinden bir rapor üretilmekte ve depolama sistemine kaydedilmektedir.

import os
from airflow.sdk import asset, ObjectStoragePath

STORAGE_SYSTEM = os.getenv("OBJECT_STORAGE_SYSTEM", "file")
STORAGE_PATH = os.getenv("OBJECT_STORAGE_PATH", "outputs/reports")

@asset
def generate_sales_report(context: dict):
    """
    Günlük satış raporunu hazırlar.
    """
    # Depolama bağlantısı
    store = ObjectStoragePath(
        f"{STORAGE_SYSTEM}://{STORAGE_PATH}",
        conn_id=os.getenv("OBJECT_STORAGE_CONN_ID")
    )

    # Çalıştırma tarihi context'ten alınır
    run_date = context["dag_run"].logical_date.strftime("%Y-%m-%d")

    # Upstream task'tan satış verisi çekilir
    sales_data = context["ti"].xcom_pull(task_ids="fetch_sales")

    # Basit rapor oluşturma
    total_amount = sum([row["amount"] for row in sales_data])
    report_text = f"{run_date} toplam satış: {total_amount} TL\n"

    # Rapor dosyasını yaz
    output_path = store / f"{run_date}_sales_report.txt"
    output_path.write_text(report_text)

Bu örnekte neler oldu?

  1. Context kullanıldı: Çalıştırma tarihi context’ten alındı.
  2. Object Storage kullanıldı: Rapor farklı sistemlere aynı kodla yazılabilir.
  3. İdempotency sağlandı: Aynı gün için tekrar çalıştırıldığında aynı dosya üretilecektir.

Gerçek Kullanım Alanları

  • Veri ambarı yüklemeleri: Günlük veya saatlik partisyonların aynı tarih bilgisiyle oluşturulması.
  • Makine öğrenmesi eğitimleri: Belirli bir günün verisiyle eğitilen modelin her tekrar çalıştırmada aynı kalması.
  • Finansal raporlama: Tekrar eden çalıştırmalarda tutarlı sonuç üreten rapor dosyaları.

Sonuç

Airflow Context ve Object Storage, güvenilir veri işleme akışlarının temelini oluşturan iki kavramdır. Context, görevlerin çalıştığı bağlamı sağlayarak idempotency’yi mümkün kılar. Object Storage ise depolama sistemlerinden bağımsız bir erişim katmanı sunar.

Bu iki özellik birlikte kullanıldığında iş akışları hem tekrar edilebilir, hem taşınabilir, hem de bakımı kolay hale gelir.

Airflow 3 Veri Bilimi Okulu Data Engineering Bootcamp‘te. Eğer veri mühendisliğini kapsamlı ve uygulama öğrenmek istiyorsanız Türkiye’nin en kapsamlı ve uygulamalı veri mühendisliği eğitimini tavsiye ederim.

Kaynaklar

0

Bir yanıt yazın

Password Requirements:

  • At least 8 characters
  • At least 1 lowercase letter
  • At least 1 uppercase letter
  • At least 1 numerical number
  • At least 1 special character