Veri Bilimi Okulu

Kubernetes Jobs ve CronJobs ile Otomatik Görevleri Yönetmeyi Öğrenelim
Kubernetes Jobs ve CronJobs ile Otomatik Görevleri Yönetmeyi Öğrenelim
kubernetes_jobs_and_cronjobs_kapak_960x640

Loading

Merhaba arkadaşlar! Bugün Kubernetes dünyasında çok önemli ama bazen gözden kaçan iki konuya dalacağız: Jobs ve CronJobs. Eğer Kubernetes’i bir süredir kullanıyorsanız, muhtemelen Deployment’lar, ReplicaSet’ler ve StatefulSet’ler gibi pod’larınızı sürekli çalıştıran denetleyicilere (controllers) aşinasınızdır. Peki ya sadece bir kez çalışması gereken işler için ne yapmalıyız? İşte tam da bu noktada Jobs ve CronJobs devreye giriyor!

Neden Her Şey Sürekli Çalışmak Zorunda Değil?

Deployment’lar harika. Pod’larınızı sürekli ayakta tutarlar, bir sorun olduğunda yeniden başlatırlar. Ama şunu düşünelim: veritabanınızın yedeğini (backup) almak istiyorsunuz. Bu işlem saatlerce çalışacak bir şey değil, sadece birkaç dakika sürecek bir görev. Ya da toplu olarak e-posta göndermek istiyorsunuz. Bu işlemler tamamlandıktan sonra pod’un çalışmaya devam etmesinin bir anlamı yok.

Kubernetes Jobs, batch (toplu) görevleri çalıştırmak için tasarlanmıştır ve belirlenen sayıda pod’un başarıyla tamamlanmasını sağlar [1]. İşte tam olarak bu tür senaryolar için Jobs ve CronJobs mükemmel çözümler sunuyor.

Job Nedir ve Nasıl Çalışır?

Kubernetes Job’u, bir görevi başarıyla tamamlamak üzere tasarlanmış özel bir kaynaktır (resource). Job’lar tek seferlik görevleri temsil eder ve tamamlandıktan sonra dururlar [2]. Bir Job oluşturduğunuzda şunlar olur:

Job Yaşam Döngüsü (Job Lifecycle)

  1. Pod Oluşturma: Job, manifest dosyasında belirttiğiniz image ile bir pod oluşturur
  2. Hata Yönetimi: Eğer pod çalışırken hata alırsa (bellek, CPU vb. sorunlardan dolayı), Job belirli sayıda yeniden deneme yapar
  3. Tamamlanma Kontrolü: Pod başarıyla tamamlandığında, Job tamamlanmış olarak işaretlenir

Gelin bu süreci biraz daha detaylandıralım:

backoffLimit (Geri Çekilme Limiti)

Manifest dosyanızda backoffLimit: 3 şeklinde bir ayar yaptığınızı düşünelim. Bu, pod başarısız olursa Job’un maksimum 3 kez yeniden deneme yapacağı anlamına gelir. Üç denemeden sonra hala başarısız olursa, Job “BackoffLimitExceeded” hatası ile başarısız olarak işaretlenir [1].

Eğer hiçbir değer belirtmezseniz, backoffLimit varsayılan olarak 6’dır.

activeDeadlineSeconds (Aktif Süre Limiti)

Bu özellik ile bir Job’un maksimum ne kadar süre çalışabileceğini belirlersiniz. Örneğin activeDeadlineSeconds: 100 derseniz, Job 100 saniyeden fazla çalışırsa şunlar olur:

  1. Pod Hemen Sonlandırılır: Çalışan tüm pod’lar anında terminate edilir (SIGTERM sinyali gönderilir)
  2. Job Failed Olarak İşaretlenir: Job’un durumu “Failed” olur ve reason alanında “DeadlineExceeded” hatası görünür
  3. Veri Kaybı Riski: İşlem ortada kesildiği için veritabanına eksik/tutarsız veri yazılmış olabilir
  4. Rollback Yok: Otomatik bir rollback mekanizması yoktur, bu yüzden transaction yönetimi kritiktir

Gerçek Hayattan Örnek:

Diyelim ki ETL pipeline’ınız S3’ten 1000 dosya işliyor ve her dosyayı PostgreSQL’e yazıyor. activeDeadlineSeconds: 100 ayarladınız ama 500. dosyayı işlerken süre doldu:

İşlem Adımları:
├─ Dosya 1-499: ✅ Başarıyla PostgreSQL'e yazıldı
├─ Dosya 500: ⏳ İşleniyor... (activeDeadlineSeconds doldu!)
└─ Dosya 501-1000: ❌ Hiç işlenmedi

Sonuç: 500 dosya veritabanında, 500 dosya eksik. Bir sonraki çalıştırmada duplikasyon riski var!

Önemli: activeDeadlineSeconds, backoffLimit’ten önceliklidir. Yani süre dolduğunda kaç pod oluşturulduğu, kaç kez retry yapıldığı önemli değildir – Job anında sonlandırılır.

completions (Tamamlanma Sayısı)

Bu parametre ile Job’un pod’u kaç kez başarıyla çalıştırması gerektiğini belirlersiniz. Deployment’lardaki replicas kavramına benzer şekilde çalışır. completions: 3 derseniz, Job toplamda 3 pod başarıyla tamamlanana kadar yeni pod’lar oluşturmaya devam eder.

parallelism (Paralellik)

Varsayılan olarak pod’lar sırayla (sequentially) oluşturulur. Ancak parallelism: 2 ayarı yaparsanız, aynı anda 2 pod çalışabilir. Örneğin, completions: 3 ve parallelism: 2 ayarlarıyla, önce 2 pod başlatılır. Bu 2 pod tamamlandığında, kalan 1 pod oluşturulur [1].

Job Manifest Örneği

Basit bir Job manifest’i şöyle görünür:

apiVersion: batch/v1
kind: Job
metadata:
  name: database-backup
spec:
  backoffLimit: 5
  activeDeadlineSeconds: 100
  ttlSecondsAfterFinished: 60
  template:
    spec:
      containers:
      - name: backup
        image: mongo:latest
        command: ["mongodump", "--host=mongodb-service"]
      restartPolicy: Never

ttlSecondsAfterFinished özelliği de çok kullanışlıdır. Bu ayar ile Job tamamlandıktan sonra otomatik olarak silinme süresi belirlenir ve Job silindiğinde ilişkili pod’lar da silinir [2]. Yukarıdaki örnekte Job tamamlandıktan 60 saniye sonra otomatik olarak silinecektir.

Job’ları Hangi Durumlarda Kullanmalıyız?

Jobs, görüntü işleme veya rapor oluşturma gibi toplu işleri çalıştırmak ve paralellik gerektiren karmaşık görevleri yürütmek için idealdir [3]. İşte bazı yaygın kullanım senaryoları:

1. Veritabanı Yedekleme (Database Backup)

Her gece veritabanınızın yedeğini almak istiyorsunuz. Bu işlem 5-10 dakika sürüyor ve tamamlandığında pod’un çalışmaya devam etmesine gerek yok.

2. Toplu E-posta Gönderimi

Binlerce kullanıcıya newsletter göndermek gibi. İşlem tamamlandığında container’ın çalışmaya devam etmesine gerek yok.

3. Log Rotasyonu (Log Rotation)

Log dosyalarının çok büyümemesini sağlamak ve gerektiğinde geçmiş log verilerine erişim için log rotasyonu yapılabilir [3].

4. Veri İşleme ve Dönüştürme

Log dosyalarını analiz etmek, veriyi parse etmek, dönüştürmek gibi işlemler. Bu tür görevler genellikle belirli aralıklarla veya ihtiyaç duyulduğunda çalıştırılır [3].

CronJob: Zamanlanmış Görevler için Mükemmel Çözüm

Peki ya bu Job’ları düzenli aralıklarla çalıştırmak istersek? İşte burada CronJob devreye giriyor!

CronJob, tekrarlayan bir programa göre tek seferlik Job’lar başlatır ve cron formatında yazılmış bir programa göre periyodik olarak Job çalıştırır [1].

CronJob Nasıl Çalışır?

CronJob’un çalışma mantığı şöyle:

  1. Siz bir CronJob oluşturursunuz ve cron expression (cron ifadesi) belirlersiniz
  2. Kubernetes, belirlenen zamanlarda otomatik olarak Job oluşturur
  3. Her Job kendi yaşam döngüsünü tamamlar
  4. Bir sonraki zamanlanmış saatte yeni bir Job oluşturulur

Cron Expression (Cron İfadesi) Anlayalım

Cron ifadesi, boşluklarla ayrılmış beş giriş kullanılarak tanımlanır: dakika, saat, ayın günü, ay ve haftanın günü [4]. İşte format:

# ┌───────────── dakika (0 - 59)
# │ ┌───────────── saat (0 - 23)
# │ │ ┌───────────── ayın günü (1 - 31)
# │ │ │ ┌───────────── ay (1 - 12)
# │ │ │ │ ┌───────────── haftanın günü (0 - 6)
# │ │ │ │ │
# * * * * *

Örnekler:

  • * * * * * → Her dakika
  • 0 0 * * * → Her gün gece yarısı (00:00)
  • 0 2 * * * → Her gün saat 02:00’de
  • */5 * * * * → Her 5 dakikada bir
  • 0 9 * * 1 → Her Pazartesi saat 09:00’da
  • 30 14 1 * * → Her ayın 1. günü saat 14:30’da

Cron expression’larınızı test etmek için crontab.guru gibi online araçları kullanabilirsiniz [4].

CronJob Manifest Örneği

apiVersion: batch/v1
kind: CronJob
metadata:
  name: database-backup-cron
spec:
  schedule: "0 0 * * *"  # Her gece yarısı
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 2
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 3
      template:
        spec:
          containers:
          - name: backup
            image: mongo:latest
            command: ["mongodump", "--host=mongodb-service"]
          restartPolicy: Never

CronJob’un Önemli Özellikleri

1. concurrencyPolicy (Eşzamanlılık Politikası)

Concurrency policy, aynı anda kaç tane eşzamanlı Job instance’ının çalışmasına izin verileceğini kontrol eder [1]. Üç seçenek var:

  • Allow: Birden fazla Job aynı anda çalışabilir
  • Forbid (varsayılan): Bir Job çalışırken yeni Job başlatılmaz
  • Replace: Çalışan Job durdurulur ve yeni Job başlatılır

Örneğin, yedekleme işleminiz 10 dakika sürüyor ama CronJob her 5 dakikada bir çalışacak şekilde ayarlanmış. Forbid politikası ile önceki yedekleme bitmeden yeni bir Job başlatılmaz.

2. successfulJobsHistoryLimit ve failedJobsHistoryLimit

Varsayılan olarak successfulJobsHistoryLimit 3 ve failedJobsHistoryLimit 1 olarak ayarlanır [5]. Bu ayarlar ile ne kadar Job geçmişinin saklanacağını belirlersiniz. Eski Job’lar otomatik olarak silinir.

3. startingDeadlineSeconds

Bu alan, Job’un zamanlanmış saatini kaçırması durumunda başlamak için maksimum süreyi saniye cinsinden belirtir ve bu süreden sonra Job başlatılmaz [4]. Örneğin startingDeadlineSeconds: 100 derseniz, zamanlanmış saatten 100 saniye sonrasına kadar Job başlatılabilir. Bu süre geçerse “missed” olarak işaretlenir.

4. suspend (Askıya Alma)

Suspend alanı true olarak ayarlandığında yeni Job’ların çalışmasını engeller ancak mevcut çalışmaların bitmesine izin verir [1]. Bu özellik çok kullanışlıdır. Örneğin geçici olarak CronJob’u durdurmak istiyorsanız:

kubectl patch cronjob database-backup-cron -p '{"spec":{"suspend":true}}'

Tekrar aktif etmek için:

kubectl patch cronjob database-backup-cron -p '{"spec":{"suspend":false}}'

Pratik Örnek: MongoDB Yedekleme ile Öğrenelim

Hadi teoriden pratiğe geçelim ve gerçek bir senaryo üzerinden Job ve CronJob’u öğrenelim.

Adım 1: MongoDB Job ile Tek Seferlik Yedekleme

Diyelim ki StatefulSet ile çalışan bir MongoDB’niz var. Şimdi bu veritabanının yedeğini Job ile alalım:

apiVersion: batch/v1
kind: Job
metadata:
  name: mongodb-backup-job
spec:
  backoffLimit: 5
  activeDeadlineSeconds: 100
  ttlSecondsAfterFinished: 60
  template:
    spec:
      containers:
      - name: backup
        image: mongo:latest
        command:
        - mongodump
        - --host=mongodb-0.mongodb-service.default.svc.cluster.local:27017
        - --username=admin
        - --password=secret123
        - --out=/backup/dump
        volumeMounts:
        - name: backup-volume
          mountPath: /backup
      volumes:
      - name: backup-volume
        persistentVolumeClaim:
          claimName: backup-pvc
      restartPolicy: Never

Job’u çalıştıralım:

kubectl apply -f mongodb-backup-job.yaml

Job’u kontrol edelim:

kubectl get jobs

Pod’ları görelim:

kubectl get pods

Job başarıyla tamamlandıktan sonra, ttlSecondsAfterFinished: 60 ayarı sayesinde 60 saniye sonra otomatik olarak silinecektir.

Adım 2: CronJob ile Düzenli Yedekleme

Şimdi bu yedeklemeyi her gece saat 02:00’de otomatik çalışacak şekilde ayarlayalım:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: mongodb-backup-cron
spec:
  schedule: "0 2 * * *"  # Her gece saat 02:00
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 2
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      backoffLimit: 3
      template:
        spec:
          containers:
          - name: backup
            image: mongo:latest
            command:
            - mongodump
            - --host=mongodb-0.mongodb-service.default.svc.cluster.local:27017
            - --username=admin
            - --password=secret123
            - --out=/backup/dump
            volumeMounts:
            - name: backup-volume
              mountPath: /backup
          volumes:
          - name: backup-volume
            persistentVolumeClaim:
              claimName: backup-pvc
          restartPolicy: Never

CronJob’u oluşturalım:

kubectl apply -f mongodb-backup-cron.yaml

CronJob’u kontrol edelim:

kubectl get cronjobs
# veya kısa hali
kubectl get cj

Oluşturulan Job’ları görmek için:

kubectl get jobs --watch

CronJob Yönetimi için Kullanışlı Komutlar

CronJob’u Manuel Çalıştırma

Bazen bir CronJob’u zamanlanmış saatinin dışında hemen çalıştırmanız gerekebilir ve bunun için CronJob’un mevcut yapılandırmasını kullanarak tek seferlik bir Job oluşturabilirsiniz [6]. Şöyle yapabilirsiniz:

# CronJob'un YAML'ini alın
kubectl get cronjob mongodb-backup-cron -o yaml > temp-job.yaml

# YAML'i düzenleyin (kind: CronJob -> kind: Job, schedule kaldırın)
# Sonra Job'u oluşturun
kubectl create -f temp-job.yaml

CronJob’u Geçici Olarak Durdurma

# Suspend etmek için
kubectl patch cronjob mongodb-backup-cron -p '{"spec":{"suspend":true}}'

# Tekrar aktif etmek için
kubectl patch cronjob mongodb-backup-cron -p '{"spec":{"suspend":false}}'

CronJob Loglarını İnceleme

# Son çalışan Job'u bulun
kubectl get jobs -l cron-job-name=mongodb-backup-cron

# O Job'un loglarını görün
kubectl logs job/mongodb-backup-cron-1234567890

CronJob’u Silme

kubectl delete cronjob mongodb-backup-cron

CronJob silindiğinde, oluşturduğu tüm Job’lar ve pod’lar da silinir ve yeni Job’lar oluşturulmaz [5].

Job vs CronJob: Hangisini Ne Zaman Kullanmalı?

Job Kullanın:

  • Tek seferlik görevler için
  • Hemen çalışması gereken işlemler için
  • Başka bir sistem tarafından tetiklenecek görevler için
  • Test ve geliştirme sırasında

CronJob Kullanın:

  • Düzenli tekrarlayan görevler için
  • Zamanlanmış yedeklemeler için
  • Periyodik raporlama için
  • Düzenli temizlik işlemleri için

En İyi Pratikler ve İpuçları

1. Daima İdempotent İşlemler Yazın

Kubernetes, belirli durumlarda iki Job oluşturulabileceği veya hiç Job oluşturulmayabileceği için Job’ların idempotent olması gerekir [1]. Yani işleminiz birden fazla kez çalışsa bile aynı sonucu vermeli.

2. ttlSecondsAfterFinished Kullanın

Yönetilmeyen Job’lar, tamamlandıktan sonra pod’ları bıraktığı için ttlSecondsAfterFinished alanını ayarlamak önerilir [2]. Bu, cluster’ınızı temiz tutar.

3. Resource Limit’leri Belirleyin

Her Job için CPU ve memory limit’leri belirleyin:

resources:
  limits:
    memory: "256Mi"
    cpu: "500m"
  requests:
    memory: "128Mi"
    cpu: "250m"

4. Hata Durumlarını Yönetin

backoffLimit ve activeDeadlineSeconds ayarlarını mutlaka yapın. Aksi halde Job sonsuza kadar retry yapabilir.

5. Log’ları Saklamak İçin External Storage Kullanın

Job tamamlandığında pod silineceği için log’lar kaybolur. Önemli log’ları external storage’a yönlendirin.

6. Monitoring ve Alerting Kurun

CronJob monitoring için CronJob’ların aktivitelerini izlemek ve hatalarına yanıt vermek önemlidir [7]. Prometheus ve Grafana gibi araçlarla Job’larınızı izleyin.

Yaygın Hatalar ve Sorun Giderme

1. BackoffLimitExceeded Hatası

Bu hata, Job’un belirlenen retry sayısını aştığı anlamına gelir. Çözüm:

  • Pod loglarını kontrol edin: kubectl logs <pod-name>
  • Event’leri inceleyin: kubectl describe job <job-name>
  • backoffLimit değerini artırın veya uygulama kodunuzu düzeltin

2. DeadlineExceeded Hatası

Job, activeDeadlineSeconds süresini aştı. Çözüm:

  • İşleminiz gerçekten çok mu uzun sürüyor?
  • activeDeadlineSeconds değerini artırın
  • Paralellik ekleyerek işlemi hızlandırın

3. CronJob Çalışmıyor

Kontrol edilecekler:

  • Cron expression doğru mu? crontab.guru’da test edin
  • CronJob suspend durumda mı? kubectl get cj ile kontrol edin
  • startingDeadlineSeconds çok kısa mı?

4. Çok Fazla Pod Oluşuyor

CronJob geçmişi periyodik garbage collection’a tabidir ve gördüğünüz giriş sayısı beklenenden fazla olabilir [5]. successfulJobsHistoryLimit ve failedJobsHistoryLimit ayarlarını düşürün.

Gerçek Dünya Örneği: S3’ten PostgreSQL’e ETL Pipeline

Şimdi çok yaygın bir senaryoya bakalım: S3’te saklanan veriyi okuyup, dönüştürüp PostgreSQL’e yazan bir ETL (Extract-Transform-Load) pipeline’ı. Birçok şirket bu tür ETL süreçlerini Kubernetes CronJob’ları ile başarıyla uygulamaktadır.

Senaryo

Diyelim ki:

  • Her saat başı S3 bucket’ınıza yeni CSV dosyaları geliyor
  • Bu dosyalar müşteri aktivite logları içeriyor
  • Bu verileri okuyup, temizleyip, zenginleştirip PostgreSQL’e yazmanız gerekiyor
  • Bu işlem her saat başında otomatik olarak çalışmalı

Neden CronJob İdeal Bir Çözüm?

✅ Periyodik Görev: Saatte bir çalışması gerekiyor – CronJob için ideal ✅ Kısa Ömürlü İşlem: Veri işleme tamamlandıktan sonra pod’un çalışmaya devam etmesine gerek yok ✅ Kaynak Verimliliği: Deployment gibi sürekli çalışan bir pod yerine, sadece ihtiyaç olduğunda kaynak kullanılır ✅ Hata Yönetimi: backoffLimit ile otomatik retry mekanizması ✅ İzlenebilirlik: Her çalıştırma için ayrı Job ve pod oluşur, log’ları takip etmek kolay

Python ETL Uygulaması

Önce basit bir Python ETL uygulaması yazalım:

# etl_s3_to_postgres.py
import boto3
import pandas as pd
from sqlmodel import SQLModel, Field, create_engine, Session, select
from typing import Optional
from datetime import datetime
import os
import logging
import signal
import sys

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# SQLModel ile model tanımla
class CustomerEvent(SQLModel, table=True):
    __tablename__ = "customer_events"
    
    id: Optional[int] = Field(default=None, primary_key=True)
    customer_id: int = Field(index=True)
    event_type: str = Field(max_length=100)
    event_time: datetime = Field(index=True)
    event_data: Optional[str] = None  # JSON string olarak saklanacak
    processed_at: datetime = Field(default_factory=datetime.now)
    
    class Config:
        # unique constraint için
        table_args = (
            {'extend_existing': True},
        )

class S3ToPostgresETL:
    def __init__(self):
        self.shutdown_requested = False
        signal.signal(signal.SIGTERM, self.handle_shutdown)
        
        # S3 yapılandırması
        self.s3_client = boto3.client('s3',
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            region_name=os.getenv('AWS_REGION', 'eu-west-1')
        )
        self.bucket_name = os.getenv('S3_BUCKET_NAME')
        self.s3_prefix = os.getenv('S3_PREFIX', 'customer-logs/')
        
        # PostgreSQL yapılandırması - SQLModel engine
        database_url = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT', '5432')}/{os.getenv('POSTGRES_DB')}"
        self.engine = create_engine(database_url, echo=False)
        
        # Tabloları oluştur
        SQLModel.metadata.create_all(self.engine)
        
    def handle_shutdown(self, signum, frame):
        """SIGTERM sinyalini yakala ve temiz kapanış yap"""
        logger.warning("SIGTERM alındı! Temiz kapanış başlatılıyor...")
        self.shutdown_requested = True
        sys.exit(0)
        
    def extract_from_s3(self):
        """S3'ten son 1 saatteki dosyaları oku"""
        logger.info(f"S3'ten veri çekiliyor: {self.bucket_name}/{self.s3_prefix}")
        
        # Son 1 saatteki dosyaları listele
        current_hour = datetime.now().strftime('%Y-%m-%d-%H')
        prefix = f"{self.s3_prefix}{current_hour}"
        
        response = self.s3_client.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=prefix
        )
        
        dataframes = []
        if 'Contents' in response:
            for obj in response['Contents']:
                if self.shutdown_requested:
                    logger.info("Shutdown request - extract durduruluyor")
                    break
                    
                key = obj['Key']
                logger.info(f"Dosya okunuyor: {key}")
                
                # CSV dosyasını oku
                obj_data = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
                df = pd.read_csv(obj_data['Body'])
                dataframes.append(df)
        
        if not dataframes:
            logger.warning("İşlenecek dosya bulunamadı")
            return pd.DataFrame()
            
        # Tüm dataframe'leri birleştir
        combined_df = pd.concat(dataframes, ignore_index=True)
        logger.info(f"Toplam {len(combined_df)} satır okundu")
        return combined_df
    
    def transform(self, df):
        """Veriyi dönüştür ve temizle"""
        if df.empty:
            return df
            
        logger.info("Veri dönüştürme başlıyor...")
        
        # Null değerleri temizle
        df = df.dropna()
        
        # Timestamp ekle
        df['processed_at'] = datetime.now()
        
        # Duplikaları kaldır
        df = df.drop_duplicates(subset=['customer_id', 'event_time'])
        
        # Veri tipleri dönüşümü
        df['customer_id'] = df['customer_id'].astype(int)
        df['event_time'] = pd.to_datetime(df['event_time'])
        
        # event_data JSON string'e çevir (eğer dict ise)
        if 'event_data' in df.columns:
            df['event_data'] = df['event_data'].apply(
                lambda x: str(x) if pd.notna(x) else None
            )
        
        logger.info(f"Dönüştürme tamamlandı. Temiz veri: {len(df)} satır")
        return df
    
    def load_to_postgres(self, df):
        """SQLModel kullanarak veriyi PostgreSQL'e yükle"""
        if df.empty:
            logger.info("Yüklenecek veri yok")
            return
            
        logger.info("PostgreSQL'e veri yükleniyor (SQLModel)...")
        
        # Session oluştur
        with Session(self.engine) as session:
            try:
                inserted_count = 0
                skipped_count = 0
                
                # DataFrame'i CustomerEvent objelerine dönüştür
                records = df.to_dict('records')
                
                for i, record in enumerate(records):
                    # Her 100 kayıtta bir shutdown kontrolü
                    if i % 100 == 0 and self.shutdown_requested:
                        logger.info("Shutdown request - load durduruluyor")
                        session.rollback()
                        return
                    
                    # Aynı event var mı kontrol et (duplicate check)
                    statement = select(CustomerEvent).where(
                        CustomerEvent.customer_id == record['customer_id'],
                        CustomerEvent.event_time == record['event_time']
                    )
                    existing_event = session.exec(statement).first()
                    
                    if existing_event:
                        skipped_count += 1
                        continue
                    
                    # Yeni event oluştur
                    event = CustomerEvent(
                        customer_id=record['customer_id'],
                        event_type=record['event_type'],
                        event_time=record['event_time'],
                        event_data=record.get('event_data'),
                        processed_at=record['processed_at']
                    )
                    
                    session.add(event)
                    inserted_count += 1
                    
                    # Her 500 kayıtta bir commit (batch processing)
                    if inserted_count % 500 == 0:
                        session.commit()
                        logger.info(f"Progress: {inserted_count} kayıt eklendi...")
                
                # Kalan kayıtları commit et
                session.commit()
                logger.info(f"✅ Başarıyla {inserted_count} kayıt eklendi, {skipped_count} duplicate atlandı")
                
            except Exception as e:
                session.rollback()
                logger.error(f"❌ PostgreSQL yükleme hatası: {e}")
                raise
    
    def load_to_postgres_bulk(self, df):
        """Daha hızlı bulk insert için alternatif method"""
        if df.empty:
            logger.info("Yüklenecek veri yok")
            return
            
        logger.info("PostgreSQL'e bulk insert yapılıyor...")
        
        with Session(self.engine) as session:
            try:
                # DataFrame'deki tüm kayıtları CustomerEvent objelerine çevir
                events = [
                    CustomerEvent(
                        customer_id=row['customer_id'],
                        event_type=row['event_type'],
                        event_time=row['event_time'],
                        event_data=row.get('event_data'),
                        processed_at=row['processed_at']
                    )
                    for _, row in df.iterrows()
                ]
                
                # Bulk insert - çok daha hızlı ama duplicate check yok
                session.bulk_save_objects(events)
                session.commit()
                
                logger.info(f"✅ Bulk insert: {len(events)} kayıt eklendi")
                
            except Exception as e:
                session.rollback()
                logger.error(f"❌ Bulk insert hatası: {e}")
                raise
    
    def get_statistics(self):
        """SQLModel query örneği - istatistik al"""
        with Session(self.engine) as session:
            # Bugün işlenen toplam kayıt sayısı
            today = datetime.now().date()
            statement = select(CustomerEvent).where(
                CustomerEvent.processed_at >= today
            )
            today_count = len(session.exec(statement).all())
            
            logger.info(f"📊 Bugün işlenen kayıt: {today_count}")
            return today_count
    
    def run(self):
        """ETL pipeline'ını çalıştır"""
        try:
            logger.info("🚀 ETL süreci başlatıldı")
            start_time = datetime.now()
            
            # Extract
            df = self.extract_from_s3()
            
            if self.shutdown_requested:
                return
            
            # Transform
            df = self.transform(df)
            
            if self.shutdown_requested:
                return
            
            # Load (duplicate check ile - daha güvenli)
            self.load_to_postgres(df)
            
            # Alternatif: Bulk insert (daha hızlı, duplicate check yok)
            # self.load_to_postgres_bulk(df)
            
            # İstatistik göster
            self.get_statistics()
            
            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"✅ ETL süreci tamamlandı. Süre: {duration:.2f} saniye")
            
        except Exception as e:
            logger.error(f"❌ ETL hatası: {e}")
            raise

if __name__ == "__main__":
    etl = S3ToPostgresETL()
    etl.run()

Dockerfile

FROM python:3.12-slim

# Metadata
LABEL maintainer="your-email@example.com"
LABEL description="S3 to PostgreSQL ETL Pipeline"

# Çalışma dizini
WORKDIR /app

# Sistem bağımlılıklarını kur (psycopg2 için gerekli)
RUN apt-get update && apt-get install -y \
    libpq-dev \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Python bağımlılıklarını önce kopyala (Docker cache için)
COPY requirements.txt .

# Bağımlılıkları kur
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# ETL script'ini kopyala
COPY etl_s3_to_postgres.py .

# Root olmayan bir user oluştur (güvenlik için)
RUN useradd -m -u 1000 etluser && \
    chown -R etluser:etluser /app

# User'ı değiştir
USER etluser

# Health check (opsiyonel)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import sys; sys.exit(0)" || exit 1

# Container başlatıldığında ETL'i çalıştır
CMD ["python", "-u", "etl_s3_to_postgres.py"]

Multi-Stage Build ile Optimize Edilmiş Dockerfile (Önerilen)

# Builder stage - compile dependencies
FROM python:3.12-slim AS builder

WORKDIR /app

# Sistem bağımlılıklarını kur
RUN apt-get update && apt-get install -y \
    libpq-dev \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Python bağımlılıklarını kur
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir --user -r requirements.txt

# Runtime stage - minimal image
FROM python:3.12-slim

LABEL maintainer="your-email@example.com"
LABEL description="S3 to PostgreSQL ETL Pipeline - Optimized"

WORKDIR /app

# Runtime bağımlılıkları
RUN apt-get update && apt-get install -y \
    libpq5 \
    && rm -rf /var/lib/apt/lists/*

# Builder stage'den Python paketlerini kopyala
COPY --from=builder /root/.local /root/.local

# PATH'e ekle
ENV PATH=/root/.local/bin:$PATH

# ETL script'ini kopyala
COPY etl_s3_to_postgres.py .

# Root olmayan user oluştur
RUN useradd -m -u 1000 etluser && \
    chown -R etluser:etluser /app

USER etluser

# Unbuffered Python output için (loglar için önemli)
ENV PYTHONUNBUFFERED=1

CMD ["python", "-u", "etl_s3_to_postgres.py"]

.dockerignore (Build’i hızlandırır)

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
ENV/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# IDE
.vscode/
.idea/
*.swp
*.swo
*~

# Git
.git/
.gitignore

# Documentation
README.md
*.md

# Test
tests/
.pytest_cache/
.coverage
htmlcov/

# CI/CD
.github/
.gitlab-ci.yml

# Docker
Dockerfile*
docker-compose*.yml
.dockerignore

# Kubernetes
*.yaml
*.yml

requirements.txt

boto3==1.40.47
pandas==2.3.3
sqlmodel==0.0.25
psycopg2-binary==2.9.10

Kubernetes CronJob Manifest

apiVersion: batch/v1
kind: CronJob
metadata:
  name: s3-to-postgres-etl
  namespace: data-pipeline
spec:
  # Her saat başında çalış
  schedule: "0 * * * *"
  
  # Önceki Job bitmeden yenisi başlamasın
  concurrencyPolicy: Forbid
  
  # Son 3 başarılı ve 1 başarısız Job'u sakla
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 1
  
  # Zamanlanmış saatten 300 saniye sonrasına kadar başlatılabilir
  startingDeadlineSeconds: 300
  
  jobTemplate:
    spec:
      # Maksimum 2 saat çalışabilir
      activeDeadlineSeconds: 7200
      
      # Başarısız olursa 3 kez dene
      backoffLimit: 3
      
      # Job tamamlandıktan 1 saat sonra otomatik sil
      ttlSecondsAfterFinished: 3600
      
      template:
        metadata:
          labels:
            app: s3-postgres-etl
            version: v1
        spec:
          restartPolicy: Never
          
          containers:
          - name: etl-worker
            image: your-registry/s3-postgres-etl:v1.0
            imagePullPolicy: IfNotPresent
            
            env:
            # AWS yapılandırması
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-credentials
                  key: access-key-id
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: aws-credentials
                  key: secret-access-key
            - name: AWS_REGION
              value: "eu-west-1"
            - name: S3_BUCKET_NAME
              value: "customer-data-lake"
            - name: S3_PREFIX
              value: "raw-logs/customer-events/"
            
            # PostgreSQL yapılandırması
            - name: POSTGRES_HOST
              value: "postgresql-service.database.svc.cluster.local"
            - name: POSTGRES_PORT
              value: "5432"
            - name: POSTGRES_DB
              value: "analytics"
            - name: POSTGRES_USER
              valueFrom:
                secretKeyRef:
                  name: postgres-credentials
                  key: username
            - name: POSTGRES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: postgres-credentials
                  key: password
            
            resources:
              requests:
                memory: "512Mi"
                cpu: "500m"
              limits:
                memory: "1Gi"
                cpu: "1000m"
          
          # Eğer AWS IAM Roles for Service Accounts (IRSA) kullanıyorsanız
          serviceAccountName: s3-postgres-etl-sa
---
# Secret'ları oluşturmak için (gerçek değerleri kullanın!)
apiVersion: v1
kind: Secret
metadata:
  name: aws-credentials
  namespace: data-pipeline
type: Opaque
stringData:
  access-key-id: "YOUR_AWS_ACCESS_KEY"
  secret-access-key: "YOUR_AWS_SECRET_KEY"
---
apiVersion: v1
kind: Secret
metadata:
  name: postgres-credentials
  namespace: data-pipeline
type: Opaque
stringData:
  username: "etl_user"
  password: "super_secret_password"

Deploy Etme

# Namespace oluştur
kubectl create namespace data-pipeline

# Docker image'ı build et (basit versiyon)
docker build -t your-registry/s3-postgres-etl:v1.0 .

# Veya multi-stage build ile (önerilen - daha küçük image)
docker build -f Dockerfile -t your-registry/s3-postgres-etl:v1.0 .

# Image boyutunu kontrol et
docker images | grep s3-postgres-etl

# Image'ı push et
docker push your-registry/s3-postgres-etl:v1.0

# Alternatif: Docker Hub kullanıyorsanız
docker login
docker tag your-registry/s3-postgres-etl:v1.0 username/s3-postgres-etl:v1.0
docker push username/s3-postgres-etl:v1.0

# CronJob'u deploy et
kubectl apply -f s3-postgres-etl-cronjob.yaml

# CronJob'u kontrol et
kubectl get cronjob -n data-pipeline

# Detaylı bilgi
kubectl describe cronjob s3-to-postgres-etl -n data-pipeline

# Manuel test için hemen çalıştır
kubectl create job --from=cronjob/s3-to-postgres-etl manual-test-1 -n data-pipeline

# Job'un durumunu izle
kubectl get jobs -n data-pipeline --watch

# Pod'ları listele
kubectl get pods -n data-pipeline

# Log'ları izle (pod adını yukarıdan al)
kubectl logs -f <pod-name> -n data-pipeline

# Veya job'dan direkt
kubectl logs -f job/manual-test-1 -n data-pipeline

# Hata ayıklama için pod'a gir (eğer hala çalışıyorsa)
kubectl exec -it <pod-name> -n data-pipeline -- /bin/bash

# CronJob'u geçici olarak durdur
kubectl patch cronjob s3-to-postgres-etl -n data-pipeline \
  -p '{"spec":{"suspend":true}}'

# Tekrar aktif et
kubectl patch cronjob s3-to-postgres-etl -n data-pipeline \
  -p '{"spec":{"suspend":false}}'

# CronJob'u sil (tüm job'lar ve pod'lar silinir)
kubectl delete cronjob s3-to-postgres-etl -n data-pipeline

Local’de Test Etme (Docker Compose ile)

Production’a deploy etmeden önce local’de test edin:

# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: analytics
      POSTGRES_USER: etl_user
      POSTGRES_PASSWORD: test_password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  etl:
    build: .
    depends_on:
      - postgres
    environment:
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      AWS_REGION: eu-west-1
      S3_BUCKET_NAME: test-bucket
      S3_PREFIX: test-data/
      POSTGRES_HOST: postgres
      POSTGRES_PORT: 5432
      POSTGRES_DB: analytics
      POSTGRES_USER: etl_user
      POSTGRES_PASSWORD: test_password

volumes:
  postgres_data:
# Local test
docker-compose up --build

# Sadece ETL çalıştır
docker-compose run --rm etl

# Temizle
docker-compose down -v

Monitoring ve Alerting

ETL pipeline’ınızı izlemek için Prometheus metrics ekleyin:

# etl_s3_to_postgres.py içine eklenecek
from prometheus_client import Counter, Histogram, Gauge, push_to_gateway
import time

# Metrics tanımla
etl_runs_total = Counter('etl_runs_total', 'Total ETL runs')
etl_failures_total = Counter('etl_failures_total', 'Total ETL failures')
etl_duration_seconds = Histogram('etl_duration_seconds', 'ETL duration')
etl_rows_processed = Gauge('etl_rows_processed', 'Rows processed in last run')

def run(self):
    etl_runs_total.inc()
    start_time = time.time()
    
    try:
        df = self.extract_from_s3()
        df = self.transform(df)
        self.load_to_postgres(df)
        
        etl_rows_processed.set(len(df))
        etl_duration_seconds.observe(time.time() - start_time)
        
        # Metrics'i Pushgateway'e gönder
        push_to_gateway('pushgateway:9091', job='s3-postgres-etl', registry=...)
        
    except Exception as e:
        etl_failures_total.inc()
        raise

Bu Yaklaşımın Avantajları

1. Maliyet Optimizasyonu: Deployment’tan %95 daha az kaynak kullanır çünkü sadece saatte 1 kez 5-10 dakika çalışır

2. Basitlik: Her ETL tap’ı ayrı container’da olduğu için yeni pipeline’lar oluşturmak ve deploy etmek kolaydır

3. İzlenebilirlik: Her çalıştırma için ayrı Job ve pod oluşur, debug ve monitoring kolay

4. Esneklik: Schedule’ı değiştirmek sadece bir cron expression değişikliği

5. Güvenilirlik: Kubernetes’in built-in retry ve error handling mekanizmaları

Dikkat Edilmesi Gerekenler

⚠️ İdempotency: ETL kodunuz aynı veriyi birden fazla kez işlese bile aynı sonucu vermeli

⚠️ State Management: Hangi dosyaların işlendiğini takip etmek için S3’te veya veritabanında state bilgisi saklayın

⚠️ Timeout Ayarı Kritik: activeDeadlineSeconds çok kısa olursa işlem yarıda kesilir! ETL pipeline’ınızın ortalama çalışma süresini ölçün ve en az %50 fazlasını ayarlayın. Örneğin ortalama 30 dakika sürüyorsa, 45-60 dakika (2700-3600 saniye) ayarlayın:

spec:
  jobTemplate:
    spec:
      # 1 saat = 3600 saniye
      activeDeadlineSeconds: 3600

⚠️ Transaction Yönetimi: PostgreSQL’e yazarken SQLModel’in Session yönetimi otomatik transaction sağlar. Hata durumunda rollback yapılır:

def load_to_postgres(self, df):
    """SQLModel Session otomatik transaction yönetimi sağlar"""
    with Session(self.engine) as session:
        try:
            # Session otomatik olarak transaction başlatır
            
            for record in df.to_dict('records'):
                event = CustomerEvent(**record)
                session.add(event)
            
            # Her şey başarılıysa commit
            session.commit()
            logger.info("✅ Transaction başarıyla commit edildi")
            
        except Exception as e:
            # Hata varsa otomatik rollback
            session.rollback()
            logger.error(f"❌ Hata! Transaction rollback edildi: {e}")
            raise

⚠️ Graceful Shutdown: Python uygulamanıza SIGTERM sinyali geldiğinde temiz bir şekilde kapanmasını sağlayın:

import signal
import sys

class S3ToPostgresETL:
    def __init__(self):
        self.shutdown_requested = False
        # SIGTERM sinyalini yakala
        signal.signal(signal.SIGTERM, self.handle_shutdown)
    
    def handle_shutdown(self, signum, frame):
        logger.warning("SIGTERM alındı! Temiz kapanış başlatılıyor...")
        self.shutdown_requested = True
        # Mevcut transaction'ı rollback et
        if hasattr(self, 'current_conn'):
            self.current_conn.rollback()
            self.current_conn.close()
        sys.exit(0)
    
    def load_to_postgres(self, df):
        conn = psycopg2.connect(**self.pg_config)
        self.current_conn = conn  # Shutdown handler için
        
        try:
            for i, record in enumerate(df.to_dict('records')):
                # Her 100 kayıtta bir shutdown kontrolü
                if i % 100 == 0 and self.shutdown_requested:
                    logger.info("Shutdown request - işlem durduruluyor")
                    conn.rollback()
                    return
                
                cur.execute("INSERT INTO ...", record)
            
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise
        finally:
            conn.close()

⚠️ Resource Limits: Memory ve CPU limit’lerini veri boyutuna göre ayarlayın. Pandas büyük CSV’leri okurken çok memory kullanır:

resources:
  requests:
    memory: "512Mi"  # Minimum ihtiyaç
    cpu: "500m"
  limits:
    memory: "2Gi"    # Maksimum izin verilen
    cpu: "1000m"

⚠️ Monitoring: activeDeadlineSeconds dolduğunda alert alın:

# Prometheus alert rule örneği
- alert: ETLJobDeadlineExceeded
  expr: kube_job_status_failed{reason="DeadlineExceeded",job_name=~"s3-to-postgres-etl.*"} > 0
  for: 1m
  annotations:
    summary: "ETL Job {{ $labels.job_name }} deadline'ı aştı!"
    description: "activeDeadlineSeconds doldu, işlem yarıda kesildi."

⚠️ Checkpoint Mekanizması: Uzun süren işlemler için checkpoint kullanın. Böylece işlem yarıda kesilse bile kaldığı yerden devam edebilir:

def run(self):
    # Son checkpoint'i oku
    last_checkpoint = self.read_checkpoint_from_s3()
    
    files = self.list_s3_files()
    
    for file in files:
        if file in last_checkpoint['processed_files']:
            continue  # Bu dosya zaten işlendi, atla
        
        # Dosyayı işle
        df = self.read_file(file)
        self.load_to_postgres(df)
        
        # Checkpoint güncelle
        last_checkpoint['processed_files'].append(file)
        self.save_checkpoint_to_s3(last_checkpoint)

Gelişmiş Kullanım Senaryoları

1. Paralel İşleme

Büyük veri setlerini paralel işlemek için:

spec:
  completions: 10
  parallelism: 3

Bu, toplam 10 pod çalıştırır ama aynı anda maksimum 3 pod çalışır.

2. Job Zincirleme

Kubernetes native olarak Job zincirleme desteklemez. Bu işlevsellik gerektiğinde Argo veya Apache Airflow gibi harici araçlarla entegrasyon yapmak en iyisidir [7].

3. Conditional CronJob

CronJob’ları sadece belirli koşullarda çalıştırmak için init container’lar kullanabilirsiniz:

initContainers:
- name: check-condition
  image: busybox
  command: ['sh', '-c', 'if [ condition ]; then exit 0; else exit 1; fi']

Sonuç

Jobs ve CronJobs, Kubernetes ekosisteminde çok güçlü araçlar. Sürekli çalışması gerekmeyen işlemleri yönetmek için mükemmel çözümler sunuyorlar. Veritabanı yedeklemelerinden log rotasyonuna, toplu e-posta gönderiminden veri işlemeye kadar birçok senaryoda kullanabilirsiniz.

Önemli Noktalar:

  • Job’lar tek seferlik görevler içindir
  • CronJob’lar zamanlanmış tekrarlayan görevler içindir
  • backoffLimit ve activeDeadlineSeconds ile hata yönetimi yapın
  • ttlSecondsAfterFinished ile temizlik otomasyonu sağlayın
  • concurrencyPolicy ile eşzamanlılığı kontrol edin
  • İdempotent işlemler yazın
  • Resource limit’leri belirleyin

Artık kendi Job ve CronJob’larınızı oluşturmaya hazırsınız! Herhangi bir sorunla karşılaşırsanız, loglara bakın, event’leri inceleyin ve community’den yardım isteyin.

Umarım bu yazı sizin için faydalı olmuştur. Bir sonraki yazıda görüşmek üzere, çav…

Kaynaklar

[1] Kubernetes. (2025). “CronJob | Kubernetes”. https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/

[2] Kubernetes. (2025). “Jobs | Kubernetes”. https://kubernetes.io/docs/concepts/workloads/controllers/job/

[3] K21Academy. (2025). “Kubernetes Jobs and CronJobs: A beginner’s guide”. https://k21academy.com/docker-kubernetes/k8s-jobs-and-cronjobs/

[4] Spacelift. (2025). “CronJob in Kubernetes – Automating Tasks on a Schedule”. https://spacelift.io/blog/kubernetes-cronjob

[5] Google Cloud. “CronJobs | Google Kubernetes Engine (GKE)”. https://cloud.google.com/kubernetes-engine/docs/how-to/cronjobs

[6] Nulldog. (2025). “Manually Trigger Kubernetes CronJobs & Scheduled Jobs”. https://nulldog.com/manually-trigger-kubernetes-cronjobs-scheduled-jobs

[7] Cronitor. “Kubernetes Cron Job Recipes”. https://cronitor.io/guides/kubernetes-cron-job-recipes

0

Bir yanıt yazın

Password Requirements:

  • At least 8 characters
  • At least 1 lowercase letter
  • At least 1 uppercase letter
  • At least 1 numerical number
  • At least 1 special character