Airflow-GitHub Entegrasyonu: GitHub DAG Dosyalarınız Anında Airflow’da
Airflow-Github veri akışları (data pipelines) orkestrasyonu için sektörde yaygın olarak kullanılan bir ikili. Python ile yazdığınız DAG dosyaları Airflow ile buluştuğunda Airflow bu DAG dosyalarında sizin kendisine ne iş yaptırmak istediğinizi anlıyor ve işi sizin için takip ediyor. Airflow dag dosyalarını bir dizinde arar (AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags/
veya airflow.cfg içinde dags_folder = /opt/airflow/dags/
) ve bu dizni varsayılan olarak 5 dakikada bir kontrol eder. Ancak geliştirme ortamından dag dosyalarının Airflow dags diznine taşınması konusunda Airflow ile hazır gelen bir özellik yok. Kendi DataOps sisteminizi kendiniz kurmalı ve geliştirmeden canlı sisteme uzanan süreçte dag kodlarının Airflow dags diznine ulaşmasını sağlamalısınız. Bunun en bodos yöntemi kodları kopyalamak olur, daha kötüsü de bu dizinde geliştirme yapmak olur.
Bu yazımızda sürüm kontrol sistemi kullanarak çalışan veri mühendisleri, veri bilimciler, ml mühendisleri vb. geliştiricilerin Airflow kodlarını kod reposundan (Örn: GitHub) Airflow dags klasörüne nasıl ulaştıracağını göreceğiz. Çalışma ortamımız Docker olacak. İşi git-sync adında bir imajdan oluşturacağımız konteynır ile yapacağız.
Git-sync
git-sync, bir Git reposunu (Örn: GitHub) yerel bir dizine çeken, bir süre bekleyen ve ardından bu işlemi tekrarlayan basit bir komuttur. git-sync, bir defalık çekim yapabileceği gibi düzenli aralıklarla da çekim yapabilir. Bir dalın (branch) HEAD’inden, bir Git etiketinden (tag) veya belirli bir Git hash’inden veri çekebilir. Ancak, yalnızca repodaki hedef değiştiyse (örneğin, bir dalda yeni bir commit varsa) tekrar veri çeker. git-sync, HTTP(S) (kimlik doğrulamalı veya kimlik doğrulamasız) ya da SSH üzerinden veri çekebilir. Daha fazla bilgi için: GitHub – git-sync
GitHub Reposu Yaratma
GitHub üzerinde oturum açın ve kendinize yeni bir repo yaratın. Ben adını airflow-git-sync
verdim. Repoyu private yaptım.
Docker Compose ile Airflow
Örnek yapmak için Airflow’a ihtiyacımız olacak. Bu yazıdan faydalanarak docker-compose ile airflow ortamını kurabilirsiniz.
Airflow Konfigürasyonu
.env dosyasına ekleyeceğimiz değişkenlerle bu ayarlamaları yapacağız.
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags/<your_github_reponame>/ DAGS_REPO="<your_github_repo>" GITHUB_USERNAME="<your_github_username>"
GitHub Parola/Token için Volume
GitHub private repodan çekim yapmak için kullanıcı adı ve parola ihtiyacı olacak. Git-sync’e bunu bir volume ile dosya içinde vereceğiz. Kullanıcı adı ise .env dosyasındaki GITHUB_USERNAME
‘den docker-compose’a geçecek. docker-compose.yaml dosyasının olduğu dizinde aşağıdaki linux komutlarını çalıştıralım.
mkdir git-credentials echo "mysupersecretpassword" > git-credentials/password
mysupersecretpassword yerine GitHub token yerleştiriniz. git-credentials klasörünü volume olarak kullanacağız.
docker-compose.yaml Güncelleme
Dosya içinde uygun bir yere aşağıdaki git-sync servisini ekleyin.
... ... git-sync: image: registry.k8s.io/git-sync/git-sync:v4.2.3 container_name: git-sync user: "${AIRFLOW_UID:-50000}:0" env_file: .env command: - --repo=${DAGS_REPO} - --root=/git - --period=10s - --ref=HEAD - --max-failures=1000000000 - --verbose=2 - --username=${GITHUB_USERNAME} - --password-file=/git-creds/password volumes: - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/git - ./git-credentials:/git-creds:ro ... ...
Geliştirme Ortamı
Geliştirme ortamı olarak kullandığınız ne varsa VSCode/PyCharm gibi onu açınız ve GitHub reposunu kopyalayarak bir proje yaratınız. Buraya aşağıdaki python kodlarından oluşan dag dosyasını veya kendi hazırladığınız basit bir dag dosyasını ekleyiniz.
- my_dag.py
from airflow import DAG from datetime import datetime, timedelta from airflow.operators.bash import BashOperator start_date = datetime(2023, 10, 11) default_args = { 'owner': 'airflow', 'start_date': start_date, 'retries': 1, 'retry_delay': timedelta(seconds=5) } with DAG('my_dag', default_args=default_args, schedule_interval='@daily', catchup=False) as dag: t0 = BashOperator(task_id='ls_data', bash_command='ls -l /tmp', retries=2, retry_delay=timedelta(seconds=15)) t1 = BashOperator(task_id='download_data', bash_command='curl -o /tmp/dirty_store_transactions.csv https://github.com/erkansirin78/datasets/raw/master/dirty_store_transactions.csv', retries=2, retry_delay=timedelta(seconds=15)) t2 = BashOperator(task_id='check_file_exists', bash_command='sha256sum /tmp/dirty_store_transactions.csv', retries=2, retry_delay=timedelta(seconds=15)) t0 >> t1 >> t2
Add, commit ve push ile main dalına değişikliği gönderiniz.
Airflow Arayüz Sonuç Kontrol
Airflow web arayüzünü açın ve my_dag’ı ekranda görünüz.
İşte bu kadar. Artık GitHub main dalındaki dag kodlarınız anında Airflow’da.
Airflow ve diğer veri mühendisliği temel yeteneklerinizi geliştirmek için VBO Data Engineering Bootcamp‘i öneririm.
Bir sonraki yazıda görüşünceye dek iyi geliştirmeler…
Kapak Görseli: Photo by Roman Synkevych on Unsplash