Airflow-GitHub Entegrasyonu: GitHub DAG Dosyalarınız Anında Airflow’da

Airflow-Github veri akışları (data pipelines) orkestrasyonu için sektörde yaygın olarak kullanılan bir ikili. Python ile yazdığınız DAG dosyaları Airflow ile buluştuğunda Airflow bu DAG dosyalarında sizin kendisine ne iş yaptırmak istediğinizi anlıyor ve işi sizin için takip ediyor. Airflow dag dosyalarını bir dizinde arar (AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags/ veya airflow.cfg içinde dags_folder = /opt/airflow/dags/) ve bu dizni varsayılan olarak 5 dakikada bir kontrol eder. Ancak geliştirme ortamından dag dosyalarının Airflow dags diznine taşınması konusunda Airflow ile hazır gelen bir özellik yok. Kendi DataOps sisteminizi kendiniz kurmalı ve geliştirmeden canlı sisteme uzanan süreçte dag kodlarının Airflow dags diznine ulaşmasını sağlamalısınız. Bunun en bodos yöntemi kodları kopyalamak olur, daha kötüsü de bu dizinde geliştirme yapmak olur.

Bu yazımızda sürüm kontrol sistemi kullanarak çalışan veri mühendisleri, veri bilimciler, ml mühendisleri vb. geliştiricilerin Airflow kodlarını kod reposundan (Örn: GitHub) Airflow dags klasörüne nasıl ulaştıracağını göreceğiz. Çalışma ortamımız Docker olacak. İşi git-sync adında bir imajdan oluşturacağımız konteynır ile yapacağız.

Git-sync

git-sync, bir Git reposunu (Örn: GitHub) yerel bir dizine çeken, bir süre bekleyen ve ardından bu işlemi tekrarlayan basit bir komuttur. git-sync, bir defalık çekim yapabileceği gibi düzenli aralıklarla da çekim yapabilir. Bir dalın (branch) HEAD’inden, bir Git etiketinden (tag) veya belirli bir Git hash’inden veri çekebilir. Ancak, yalnızca repodaki hedef değiştiyse (örneğin, bir dalda yeni bir commit varsa) tekrar veri çeker.  git-sync, HTTP(S) (kimlik doğrulamalı veya kimlik doğrulamasız) ya da SSH üzerinden veri çekebilir. Daha fazla bilgi için: GitHub – git-sync

GitHub Reposu Yaratma

GitHub üzerinde oturum açın ve kendinize yeni bir repo yaratın. Ben adını airflow-git-sync verdim. Repoyu private yaptım.

Docker Compose ile Airflow

Örnek yapmak için Airflow’a ihtiyacımız olacak. Bu yazıdan faydalanarak docker-compose ile airflow ortamını kurabilirsiniz.

Airflow Konfigürasyonu

.env dosyasına ekleyeceğimiz değişkenlerle bu ayarlamaları yapacağız.

AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags/<your_github_reponame>/
DAGS_REPO="<your_github_repo>"
GITHUB_USERNAME="<your_github_username>"

GitHub Parola/Token için Volume

GitHub private repodan çekim yapmak için kullanıcı adı ve parola ihtiyacı olacak. Git-sync’e bunu bir volume ile dosya içinde vereceğiz. Kullanıcı adı ise .env dosyasındaki GITHUB_USERNAME‘den docker-compose’a geçecek. docker-compose.yaml dosyasının olduğu dizinde aşağıdaki linux komutlarını çalıştıralım.

mkdir git-credentials
echo "mysupersecretpassword" > git-credentials/password

mysupersecretpassword yerine GitHub token yerleştiriniz. git-credentials klasörünü volume olarak kullanacağız.

docker-compose.yaml Güncelleme

Dosya içinde uygun bir yere aşağıdaki git-sync servisini ekleyin.

...
...
  git-sync:
    image: registry.k8s.io/git-sync/git-sync:v4.2.3
    container_name: git-sync
    user: "${AIRFLOW_UID:-50000}:0"
    env_file: .env
    command:
      - --repo=${DAGS_REPO}
      - --root=/git
      - --period=10s
      - --ref=HEAD
      - --max-failures=1000000000
      - --verbose=2
      - --username=${GITHUB_USERNAME}
      - --password-file=/git-creds/password
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/git
      - ./git-credentials:/git-creds:ro
...
...

Geliştirme Ortamı

Geliştirme ortamı olarak kullandığınız ne varsa VSCode/PyCharm gibi onu açınız ve GitHub reposunu kopyalayarak bir proje yaratınız. Buraya aşağıdaki python kodlarından oluşan dag dosyasını veya kendi hazırladığınız basit bir dag dosyasını ekleyiniz.

  • my_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator

start_date = datetime(2023, 10, 11)

default_args = {
    'owner': 'airflow',
    'start_date': start_date,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

with DAG('my_dag', default_args=default_args, schedule_interval='@daily', catchup=False) as dag:
    t0 = BashOperator(task_id='ls_data', bash_command='ls -l /tmp', retries=2, retry_delay=timedelta(seconds=15))

    t1 = BashOperator(task_id='download_data',
                      bash_command='curl -o /tmp/dirty_store_transactions.csv https://github.com/erkansirin78/datasets/raw/master/dirty_store_transactions.csv',
                      retries=2, retry_delay=timedelta(seconds=15))

    t2 = BashOperator(task_id='check_file_exists', bash_command='sha256sum /tmp/dirty_store_transactions.csv',
                      retries=2, retry_delay=timedelta(seconds=15))

    t0 >> t1 >> t2

Add, commit ve push ile main dalına değişikliği gönderiniz.

Airflow Arayüz Sonuç Kontrol

Airflow web arayüzünü açın ve my_dag’ı ekranda görünüz.

Şekil-1: Airflow Web UI

İşte bu kadar. Artık GitHub main dalındaki dag kodlarınız anında Airflow’da.

Airflow ve diğer veri mühendisliği temel yeteneklerinizi geliştirmek için VBO Data Engineering Bootcamp‘i öneririm.

Bir sonraki yazıda görüşünceye dek iyi geliştirmeler…

Kapak Görseli: Photo by Roman Synkevych on Unsplash

Yazar Hakkında
Toplam 180 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara