
Merhaba arkadaşlar! Bugün Kubernetes dünyasında çok önemli ama bazen gözden kaçan iki konuya dalacağız: Jobs ve CronJobs. Eğer Kubernetes’i bir süredir kullanıyorsanız, muhtemelen Deployment’lar, ReplicaSet’ler ve StatefulSet’ler gibi pod’larınızı sürekli çalıştıran denetleyicilere (controllers) aşinasınızdır. Peki ya sadece bir kez çalışması gereken işler için ne yapmalıyız? İşte tam da bu noktada Jobs ve CronJobs devreye giriyor!
Neden Her Şey Sürekli Çalışmak Zorunda Değil?
Deployment’lar harika. Pod’larınızı sürekli ayakta tutarlar, bir sorun olduğunda yeniden başlatırlar. Ama şunu düşünelim: veritabanınızın yedeğini (backup) almak istiyorsunuz. Bu işlem saatlerce çalışacak bir şey değil, sadece birkaç dakika sürecek bir görev. Ya da toplu olarak e-posta göndermek istiyorsunuz. Bu işlemler tamamlandıktan sonra pod’un çalışmaya devam etmesinin bir anlamı yok.
Kubernetes Jobs, batch (toplu) görevleri çalıştırmak için tasarlanmıştır ve belirlenen sayıda pod’un başarıyla tamamlanmasını sağlar [1]. İşte tam olarak bu tür senaryolar için Jobs ve CronJobs mükemmel çözümler sunuyor.
Job Nedir ve Nasıl Çalışır?
Kubernetes Job’u, bir görevi başarıyla tamamlamak üzere tasarlanmış özel bir kaynaktır (resource). Job’lar tek seferlik görevleri temsil eder ve tamamlandıktan sonra dururlar [2]. Bir Job oluşturduğunuzda şunlar olur:
Job Yaşam Döngüsü (Job Lifecycle)
- Pod Oluşturma: Job, manifest dosyasında belirttiğiniz image ile bir pod oluşturur
- Hata Yönetimi: Eğer pod çalışırken hata alırsa (bellek, CPU vb. sorunlardan dolayı), Job belirli sayıda yeniden deneme yapar
- Tamamlanma Kontrolü: Pod başarıyla tamamlandığında, Job tamamlanmış olarak işaretlenir
Gelin bu süreci biraz daha detaylandıralım:
backoffLimit (Geri Çekilme Limiti)
Manifest dosyanızda backoffLimit: 3
şeklinde bir ayar yaptığınızı düşünelim. Bu, pod başarısız olursa Job’un maksimum 3 kez yeniden deneme yapacağı anlamına gelir. Üç denemeden sonra hala başarısız olursa, Job “BackoffLimitExceeded” hatası ile başarısız olarak işaretlenir [1].
Eğer hiçbir değer belirtmezseniz, backoffLimit varsayılan olarak 6’dır.
activeDeadlineSeconds (Aktif Süre Limiti)
Bu özellik ile bir Job’un maksimum ne kadar süre çalışabileceğini belirlersiniz. Örneğin activeDeadlineSeconds: 100
derseniz, Job 100 saniyeden fazla çalışırsa şunlar olur:
- Pod Hemen Sonlandırılır: Çalışan tüm pod’lar anında terminate edilir (SIGTERM sinyali gönderilir)
- Job Failed Olarak İşaretlenir: Job’un durumu “Failed” olur ve reason alanında “DeadlineExceeded” hatası görünür
- Veri Kaybı Riski: İşlem ortada kesildiği için veritabanına eksik/tutarsız veri yazılmış olabilir
- Rollback Yok: Otomatik bir rollback mekanizması yoktur, bu yüzden transaction yönetimi kritiktir
Gerçek Hayattan Örnek:
Diyelim ki ETL pipeline’ınız S3’ten 1000 dosya işliyor ve her dosyayı PostgreSQL’e yazıyor. activeDeadlineSeconds: 100 ayarladınız ama 500. dosyayı işlerken süre doldu:
İşlem Adımları: ├─ Dosya 1-499: ✅ Başarıyla PostgreSQL'e yazıldı ├─ Dosya 500: ⏳ İşleniyor... (activeDeadlineSeconds doldu!) └─ Dosya 501-1000: ❌ Hiç işlenmedi
Sonuç: 500 dosya veritabanında, 500 dosya eksik. Bir sonraki çalıştırmada duplikasyon riski var!
Önemli: activeDeadlineSeconds, backoffLimit’ten önceliklidir. Yani süre dolduğunda kaç pod oluşturulduğu, kaç kez retry yapıldığı önemli değildir – Job anında sonlandırılır.
completions (Tamamlanma Sayısı)
Bu parametre ile Job’un pod’u kaç kez başarıyla çalıştırması gerektiğini belirlersiniz. Deployment’lardaki replicas kavramına benzer şekilde çalışır. completions: 3
derseniz, Job toplamda 3 pod başarıyla tamamlanana kadar yeni pod’lar oluşturmaya devam eder.
parallelism (Paralellik)
Varsayılan olarak pod’lar sırayla (sequentially) oluşturulur. Ancak parallelism: 2
ayarı yaparsanız, aynı anda 2 pod çalışabilir. Örneğin, completions: 3 ve parallelism: 2 ayarlarıyla, önce 2 pod başlatılır. Bu 2 pod tamamlandığında, kalan 1 pod oluşturulur [1].
Job Manifest Örneği
Basit bir Job manifest’i şöyle görünür:
apiVersion: batch/v1 kind: Job metadata: name: database-backup spec: backoffLimit: 5 activeDeadlineSeconds: 100 ttlSecondsAfterFinished: 60 template: spec: containers: - name: backup image: mongo:latest command: ["mongodump", "--host=mongodb-service"] restartPolicy: Never
ttlSecondsAfterFinished özelliği de çok kullanışlıdır. Bu ayar ile Job tamamlandıktan sonra otomatik olarak silinme süresi belirlenir ve Job silindiğinde ilişkili pod’lar da silinir [2]. Yukarıdaki örnekte Job tamamlandıktan 60 saniye sonra otomatik olarak silinecektir.
Job’ları Hangi Durumlarda Kullanmalıyız?
Jobs, görüntü işleme veya rapor oluşturma gibi toplu işleri çalıştırmak ve paralellik gerektiren karmaşık görevleri yürütmek için idealdir [3]. İşte bazı yaygın kullanım senaryoları:
1. Veritabanı Yedekleme (Database Backup)
Her gece veritabanınızın yedeğini almak istiyorsunuz. Bu işlem 5-10 dakika sürüyor ve tamamlandığında pod’un çalışmaya devam etmesine gerek yok.
2. Toplu E-posta Gönderimi
Binlerce kullanıcıya newsletter göndermek gibi. İşlem tamamlandığında container’ın çalışmaya devam etmesine gerek yok.
3. Log Rotasyonu (Log Rotation)
Log dosyalarının çok büyümemesini sağlamak ve gerektiğinde geçmiş log verilerine erişim için log rotasyonu yapılabilir [3].
4. Veri İşleme ve Dönüştürme
Log dosyalarını analiz etmek, veriyi parse etmek, dönüştürmek gibi işlemler. Bu tür görevler genellikle belirli aralıklarla veya ihtiyaç duyulduğunda çalıştırılır [3].
CronJob: Zamanlanmış Görevler için Mükemmel Çözüm
Peki ya bu Job’ları düzenli aralıklarla çalıştırmak istersek? İşte burada CronJob devreye giriyor!
CronJob, tekrarlayan bir programa göre tek seferlik Job’lar başlatır ve cron formatında yazılmış bir programa göre periyodik olarak Job çalıştırır [1].
CronJob Nasıl Çalışır?
CronJob’un çalışma mantığı şöyle:
- Siz bir CronJob oluşturursunuz ve cron expression (cron ifadesi) belirlersiniz
- Kubernetes, belirlenen zamanlarda otomatik olarak Job oluşturur
- Her Job kendi yaşam döngüsünü tamamlar
- Bir sonraki zamanlanmış saatte yeni bir Job oluşturulur
Cron Expression (Cron İfadesi) Anlayalım
Cron ifadesi, boşluklarla ayrılmış beş giriş kullanılarak tanımlanır: dakika, saat, ayın günü, ay ve haftanın günü [4]. İşte format:
# ┌───────────── dakika (0 - 59) # │ ┌───────────── saat (0 - 23) # │ │ ┌───────────── ayın günü (1 - 31) # │ │ │ ┌───────────── ay (1 - 12) # │ │ │ │ ┌───────────── haftanın günü (0 - 6) # │ │ │ │ │ # * * * * *
Örnekler:
* * * * *
→ Her dakika0 0 * * *
→ Her gün gece yarısı (00:00)0 2 * * *
→ Her gün saat 02:00’de*/5 * * * *
→ Her 5 dakikada bir0 9 * * 1
→ Her Pazartesi saat 09:00’da30 14 1 * *
→ Her ayın 1. günü saat 14:30’da
Cron expression’larınızı test etmek için crontab.guru gibi online araçları kullanabilirsiniz [4].
CronJob Manifest Örneği
apiVersion: batch/v1 kind: CronJob metadata: name: database-backup-cron spec: schedule: "0 0 * * *" # Her gece yarısı concurrencyPolicy: Forbid successfulJobsHistoryLimit: 2 failedJobsHistoryLimit: 1 jobTemplate: spec: backoffLimit: 3 template: spec: containers: - name: backup image: mongo:latest command: ["mongodump", "--host=mongodb-service"] restartPolicy: Never
CronJob’un Önemli Özellikleri
1. concurrencyPolicy (Eşzamanlılık Politikası)
Concurrency policy, aynı anda kaç tane eşzamanlı Job instance’ının çalışmasına izin verileceğini kontrol eder [1]. Üç seçenek var:
- Allow: Birden fazla Job aynı anda çalışabilir
- Forbid (varsayılan): Bir Job çalışırken yeni Job başlatılmaz
- Replace: Çalışan Job durdurulur ve yeni Job başlatılır
Örneğin, yedekleme işleminiz 10 dakika sürüyor ama CronJob her 5 dakikada bir çalışacak şekilde ayarlanmış. Forbid
politikası ile önceki yedekleme bitmeden yeni bir Job başlatılmaz.
2. successfulJobsHistoryLimit ve failedJobsHistoryLimit
Varsayılan olarak successfulJobsHistoryLimit 3 ve failedJobsHistoryLimit 1 olarak ayarlanır [5]. Bu ayarlar ile ne kadar Job geçmişinin saklanacağını belirlersiniz. Eski Job’lar otomatik olarak silinir.
3. startingDeadlineSeconds
Bu alan, Job’un zamanlanmış saatini kaçırması durumunda başlamak için maksimum süreyi saniye cinsinden belirtir ve bu süreden sonra Job başlatılmaz [4]. Örneğin startingDeadlineSeconds: 100
derseniz, zamanlanmış saatten 100 saniye sonrasına kadar Job başlatılabilir. Bu süre geçerse “missed” olarak işaretlenir.
4. suspend (Askıya Alma)
Suspend alanı true olarak ayarlandığında yeni Job’ların çalışmasını engeller ancak mevcut çalışmaların bitmesine izin verir [1]. Bu özellik çok kullanışlıdır. Örneğin geçici olarak CronJob’u durdurmak istiyorsanız:
kubectl patch cronjob database-backup-cron -p '{"spec":{"suspend":true}}'
Tekrar aktif etmek için:
kubectl patch cronjob database-backup-cron -p '{"spec":{"suspend":false}}'
Pratik Örnek: MongoDB Yedekleme ile Öğrenelim
Hadi teoriden pratiğe geçelim ve gerçek bir senaryo üzerinden Job ve CronJob’u öğrenelim.
Adım 1: MongoDB Job ile Tek Seferlik Yedekleme
Diyelim ki StatefulSet ile çalışan bir MongoDB’niz var. Şimdi bu veritabanının yedeğini Job ile alalım:
apiVersion: batch/v1 kind: Job metadata: name: mongodb-backup-job spec: backoffLimit: 5 activeDeadlineSeconds: 100 ttlSecondsAfterFinished: 60 template: spec: containers: - name: backup image: mongo:latest command: - mongodump - --host=mongodb-0.mongodb-service.default.svc.cluster.local:27017 - --username=admin - --password=secret123 - --out=/backup/dump volumeMounts: - name: backup-volume mountPath: /backup volumes: - name: backup-volume persistentVolumeClaim: claimName: backup-pvc restartPolicy: Never
Job’u çalıştıralım:
kubectl apply -f mongodb-backup-job.yaml
Job’u kontrol edelim:
kubectl get jobs
Pod’ları görelim:
kubectl get pods
Job başarıyla tamamlandıktan sonra, ttlSecondsAfterFinished: 60 ayarı sayesinde 60 saniye sonra otomatik olarak silinecektir.
Adım 2: CronJob ile Düzenli Yedekleme
Şimdi bu yedeklemeyi her gece saat 02:00’de otomatik çalışacak şekilde ayarlayalım:
apiVersion: batch/v1 kind: CronJob metadata: name: mongodb-backup-cron spec: schedule: "0 2 * * *" # Her gece saat 02:00 concurrencyPolicy: Forbid successfulJobsHistoryLimit: 2 failedJobsHistoryLimit: 1 jobTemplate: spec: backoffLimit: 3 template: spec: containers: - name: backup image: mongo:latest command: - mongodump - --host=mongodb-0.mongodb-service.default.svc.cluster.local:27017 - --username=admin - --password=secret123 - --out=/backup/dump volumeMounts: - name: backup-volume mountPath: /backup volumes: - name: backup-volume persistentVolumeClaim: claimName: backup-pvc restartPolicy: Never
CronJob’u oluşturalım:
kubectl apply -f mongodb-backup-cron.yaml
CronJob’u kontrol edelim:
kubectl get cronjobs # veya kısa hali kubectl get cj
Oluşturulan Job’ları görmek için:
kubectl get jobs --watch
CronJob Yönetimi için Kullanışlı Komutlar
CronJob’u Manuel Çalıştırma
Bazen bir CronJob’u zamanlanmış saatinin dışında hemen çalıştırmanız gerekebilir ve bunun için CronJob’un mevcut yapılandırmasını kullanarak tek seferlik bir Job oluşturabilirsiniz [6]. Şöyle yapabilirsiniz:
# CronJob'un YAML'ini alın kubectl get cronjob mongodb-backup-cron -o yaml > temp-job.yaml # YAML'i düzenleyin (kind: CronJob -> kind: Job, schedule kaldırın) # Sonra Job'u oluşturun kubectl create -f temp-job.yaml
CronJob’u Geçici Olarak Durdurma
# Suspend etmek için kubectl patch cronjob mongodb-backup-cron -p '{"spec":{"suspend":true}}' # Tekrar aktif etmek için kubectl patch cronjob mongodb-backup-cron -p '{"spec":{"suspend":false}}'
CronJob Loglarını İnceleme
# Son çalışan Job'u bulun kubectl get jobs -l cron-job-name=mongodb-backup-cron # O Job'un loglarını görün kubectl logs job/mongodb-backup-cron-1234567890
CronJob’u Silme
kubectl delete cronjob mongodb-backup-cron
CronJob silindiğinde, oluşturduğu tüm Job’lar ve pod’lar da silinir ve yeni Job’lar oluşturulmaz [5].
Job vs CronJob: Hangisini Ne Zaman Kullanmalı?
Job Kullanın:
- Tek seferlik görevler için
- Hemen çalışması gereken işlemler için
- Başka bir sistem tarafından tetiklenecek görevler için
- Test ve geliştirme sırasında
CronJob Kullanın:
- Düzenli tekrarlayan görevler için
- Zamanlanmış yedeklemeler için
- Periyodik raporlama için
- Düzenli temizlik işlemleri için
En İyi Pratikler ve İpuçları
1. Daima İdempotent İşlemler Yazın
Kubernetes, belirli durumlarda iki Job oluşturulabileceği veya hiç Job oluşturulmayabileceği için Job’ların idempotent olması gerekir [1]. Yani işleminiz birden fazla kez çalışsa bile aynı sonucu vermeli.
2. ttlSecondsAfterFinished Kullanın
Yönetilmeyen Job’lar, tamamlandıktan sonra pod’ları bıraktığı için ttlSecondsAfterFinished alanını ayarlamak önerilir [2]. Bu, cluster’ınızı temiz tutar.
3. Resource Limit’leri Belirleyin
Her Job için CPU ve memory limit’leri belirleyin:
resources: limits: memory: "256Mi" cpu: "500m" requests: memory: "128Mi" cpu: "250m"
4. Hata Durumlarını Yönetin
backoffLimit ve activeDeadlineSeconds ayarlarını mutlaka yapın. Aksi halde Job sonsuza kadar retry yapabilir.
5. Log’ları Saklamak İçin External Storage Kullanın
Job tamamlandığında pod silineceği için log’lar kaybolur. Önemli log’ları external storage’a yönlendirin.
6. Monitoring ve Alerting Kurun
CronJob monitoring için CronJob’ların aktivitelerini izlemek ve hatalarına yanıt vermek önemlidir [7]. Prometheus ve Grafana gibi araçlarla Job’larınızı izleyin.
Yaygın Hatalar ve Sorun Giderme
1. BackoffLimitExceeded Hatası
Bu hata, Job’un belirlenen retry sayısını aştığı anlamına gelir. Çözüm:
- Pod loglarını kontrol edin:
kubectl logs <pod-name>
- Event’leri inceleyin:
kubectl describe job <job-name>
- backoffLimit değerini artırın veya uygulama kodunuzu düzeltin
2. DeadlineExceeded Hatası
Job, activeDeadlineSeconds süresini aştı. Çözüm:
- İşleminiz gerçekten çok mu uzun sürüyor?
- activeDeadlineSeconds değerini artırın
- Paralellik ekleyerek işlemi hızlandırın
3. CronJob Çalışmıyor
Kontrol edilecekler:
- Cron expression doğru mu? crontab.guru’da test edin
- CronJob suspend durumda mı?
kubectl get cj
ile kontrol edin - startingDeadlineSeconds çok kısa mı?
4. Çok Fazla Pod Oluşuyor
CronJob geçmişi periyodik garbage collection’a tabidir ve gördüğünüz giriş sayısı beklenenden fazla olabilir [5]. successfulJobsHistoryLimit ve failedJobsHistoryLimit ayarlarını düşürün.
Gerçek Dünya Örneği: S3’ten PostgreSQL’e ETL Pipeline
Şimdi çok yaygın bir senaryoya bakalım: S3’te saklanan veriyi okuyup, dönüştürüp PostgreSQL’e yazan bir ETL (Extract-Transform-Load) pipeline’ı. Birçok şirket bu tür ETL süreçlerini Kubernetes CronJob’ları ile başarıyla uygulamaktadır.
Senaryo
Diyelim ki:
- Her saat başı S3 bucket’ınıza yeni CSV dosyaları geliyor
- Bu dosyalar müşteri aktivite logları içeriyor
- Bu verileri okuyup, temizleyip, zenginleştirip PostgreSQL’e yazmanız gerekiyor
- Bu işlem her saat başında otomatik olarak çalışmalı
Neden CronJob İdeal Bir Çözüm?
✅ Periyodik Görev: Saatte bir çalışması gerekiyor – CronJob için ideal ✅ Kısa Ömürlü İşlem: Veri işleme tamamlandıktan sonra pod’un çalışmaya devam etmesine gerek yok ✅ Kaynak Verimliliği: Deployment gibi sürekli çalışan bir pod yerine, sadece ihtiyaç olduğunda kaynak kullanılır ✅ Hata Yönetimi: backoffLimit ile otomatik retry mekanizması ✅ İzlenebilirlik: Her çalıştırma için ayrı Job ve pod oluşur, log’ları takip etmek kolay
Python ETL Uygulaması
Önce basit bir Python ETL uygulaması yazalım:
# etl_s3_to_postgres.py import boto3 import pandas as pd from sqlmodel import SQLModel, Field, create_engine, Session, select from typing import Optional from datetime import datetime import os import logging import signal import sys logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # SQLModel ile model tanımla class CustomerEvent(SQLModel, table=True): __tablename__ = "customer_events" id: Optional[int] = Field(default=None, primary_key=True) customer_id: int = Field(index=True) event_type: str = Field(max_length=100) event_time: datetime = Field(index=True) event_data: Optional[str] = None # JSON string olarak saklanacak processed_at: datetime = Field(default_factory=datetime.now) class Config: # unique constraint için table_args = ( {'extend_existing': True}, ) class S3ToPostgresETL: def __init__(self): self.shutdown_requested = False signal.signal(signal.SIGTERM, self.handle_shutdown) # S3 yapılandırması self.s3_client = boto3.client('s3', aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'), aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'), region_name=os.getenv('AWS_REGION', 'eu-west-1') ) self.bucket_name = os.getenv('S3_BUCKET_NAME') self.s3_prefix = os.getenv('S3_PREFIX', 'customer-logs/') # PostgreSQL yapılandırması - SQLModel engine database_url = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT', '5432')}/{os.getenv('POSTGRES_DB')}" self.engine = create_engine(database_url, echo=False) # Tabloları oluştur SQLModel.metadata.create_all(self.engine) def handle_shutdown(self, signum, frame): """SIGTERM sinyalini yakala ve temiz kapanış yap""" logger.warning("SIGTERM alındı! Temiz kapanış başlatılıyor...") self.shutdown_requested = True sys.exit(0) def extract_from_s3(self): """S3'ten son 1 saatteki dosyaları oku""" logger.info(f"S3'ten veri çekiliyor: {self.bucket_name}/{self.s3_prefix}") # Son 1 saatteki dosyaları listele current_hour = datetime.now().strftime('%Y-%m-%d-%H') prefix = f"{self.s3_prefix}{current_hour}" response = self.s3_client.list_objects_v2( Bucket=self.bucket_name, Prefix=prefix ) dataframes = [] if 'Contents' in response: for obj in response['Contents']: if self.shutdown_requested: logger.info("Shutdown request - extract durduruluyor") break key = obj['Key'] logger.info(f"Dosya okunuyor: {key}") # CSV dosyasını oku obj_data = self.s3_client.get_object(Bucket=self.bucket_name, Key=key) df = pd.read_csv(obj_data['Body']) dataframes.append(df) if not dataframes: logger.warning("İşlenecek dosya bulunamadı") return pd.DataFrame() # Tüm dataframe'leri birleştir combined_df = pd.concat(dataframes, ignore_index=True) logger.info(f"Toplam {len(combined_df)} satır okundu") return combined_df def transform(self, df): """Veriyi dönüştür ve temizle""" if df.empty: return df logger.info("Veri dönüştürme başlıyor...") # Null değerleri temizle df = df.dropna() # Timestamp ekle df['processed_at'] = datetime.now() # Duplikaları kaldır df = df.drop_duplicates(subset=['customer_id', 'event_time']) # Veri tipleri dönüşümü df['customer_id'] = df['customer_id'].astype(int) df['event_time'] = pd.to_datetime(df['event_time']) # event_data JSON string'e çevir (eğer dict ise) if 'event_data' in df.columns: df['event_data'] = df['event_data'].apply( lambda x: str(x) if pd.notna(x) else None ) logger.info(f"Dönüştürme tamamlandı. Temiz veri: {len(df)} satır") return df def load_to_postgres(self, df): """SQLModel kullanarak veriyi PostgreSQL'e yükle""" if df.empty: logger.info("Yüklenecek veri yok") return logger.info("PostgreSQL'e veri yükleniyor (SQLModel)...") # Session oluştur with Session(self.engine) as session: try: inserted_count = 0 skipped_count = 0 # DataFrame'i CustomerEvent objelerine dönüştür records = df.to_dict('records') for i, record in enumerate(records): # Her 100 kayıtta bir shutdown kontrolü if i % 100 == 0 and self.shutdown_requested: logger.info("Shutdown request - load durduruluyor") session.rollback() return # Aynı event var mı kontrol et (duplicate check) statement = select(CustomerEvent).where( CustomerEvent.customer_id == record['customer_id'], CustomerEvent.event_time == record['event_time'] ) existing_event = session.exec(statement).first() if existing_event: skipped_count += 1 continue # Yeni event oluştur event = CustomerEvent( customer_id=record['customer_id'], event_type=record['event_type'], event_time=record['event_time'], event_data=record.get('event_data'), processed_at=record['processed_at'] ) session.add(event) inserted_count += 1 # Her 500 kayıtta bir commit (batch processing) if inserted_count % 500 == 0: session.commit() logger.info(f"Progress: {inserted_count} kayıt eklendi...") # Kalan kayıtları commit et session.commit() logger.info(f"✅ Başarıyla {inserted_count} kayıt eklendi, {skipped_count} duplicate atlandı") except Exception as e: session.rollback() logger.error(f"❌ PostgreSQL yükleme hatası: {e}") raise def load_to_postgres_bulk(self, df): """Daha hızlı bulk insert için alternatif method""" if df.empty: logger.info("Yüklenecek veri yok") return logger.info("PostgreSQL'e bulk insert yapılıyor...") with Session(self.engine) as session: try: # DataFrame'deki tüm kayıtları CustomerEvent objelerine çevir events = [ CustomerEvent( customer_id=row['customer_id'], event_type=row['event_type'], event_time=row['event_time'], event_data=row.get('event_data'), processed_at=row['processed_at'] ) for _, row in df.iterrows() ] # Bulk insert - çok daha hızlı ama duplicate check yok session.bulk_save_objects(events) session.commit() logger.info(f"✅ Bulk insert: {len(events)} kayıt eklendi") except Exception as e: session.rollback() logger.error(f"❌ Bulk insert hatası: {e}") raise def get_statistics(self): """SQLModel query örneği - istatistik al""" with Session(self.engine) as session: # Bugün işlenen toplam kayıt sayısı today = datetime.now().date() statement = select(CustomerEvent).where( CustomerEvent.processed_at >= today ) today_count = len(session.exec(statement).all()) logger.info(f"📊 Bugün işlenen kayıt: {today_count}") return today_count def run(self): """ETL pipeline'ını çalıştır""" try: logger.info("🚀 ETL süreci başlatıldı") start_time = datetime.now() # Extract df = self.extract_from_s3() if self.shutdown_requested: return # Transform df = self.transform(df) if self.shutdown_requested: return # Load (duplicate check ile - daha güvenli) self.load_to_postgres(df) # Alternatif: Bulk insert (daha hızlı, duplicate check yok) # self.load_to_postgres_bulk(df) # İstatistik göster self.get_statistics() duration = (datetime.now() - start_time).total_seconds() logger.info(f"✅ ETL süreci tamamlandı. Süre: {duration:.2f} saniye") except Exception as e: logger.error(f"❌ ETL hatası: {e}") raise if __name__ == "__main__": etl = S3ToPostgresETL() etl.run()
Dockerfile
FROM python:3.12-slim # Metadata LABEL maintainer="your-email@example.com" LABEL description="S3 to PostgreSQL ETL Pipeline" # Çalışma dizini WORKDIR /app # Sistem bağımlılıklarını kur (psycopg2 için gerekli) RUN apt-get update && apt-get install -y \ libpq-dev \ gcc \ && rm -rf /var/lib/apt/lists/* # Python bağımlılıklarını önce kopyala (Docker cache için) COPY requirements.txt . # Bağımlılıkları kur RUN pip install --no-cache-dir --upgrade pip && \ pip install --no-cache-dir -r requirements.txt # ETL script'ini kopyala COPY etl_s3_to_postgres.py . # Root olmayan bir user oluştur (güvenlik için) RUN useradd -m -u 1000 etluser && \ chown -R etluser:etluser /app # User'ı değiştir USER etluser # Health check (opsiyonel) HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ CMD python -c "import sys; sys.exit(0)" || exit 1 # Container başlatıldığında ETL'i çalıştır CMD ["python", "-u", "etl_s3_to_postgres.py"]
Multi-Stage Build ile Optimize Edilmiş Dockerfile (Önerilen)
# Builder stage - compile dependencies FROM python:3.12-slim AS builder WORKDIR /app # Sistem bağımlılıklarını kur RUN apt-get update && apt-get install -y \ libpq-dev \ gcc \ && rm -rf /var/lib/apt/lists/* # Python bağımlılıklarını kur COPY requirements.txt . RUN pip install --no-cache-dir --upgrade pip && \ pip install --no-cache-dir --user -r requirements.txt # Runtime stage - minimal image FROM python:3.12-slim LABEL maintainer="your-email@example.com" LABEL description="S3 to PostgreSQL ETL Pipeline - Optimized" WORKDIR /app # Runtime bağımlılıkları RUN apt-get update && apt-get install -y \ libpq5 \ && rm -rf /var/lib/apt/lists/* # Builder stage'den Python paketlerini kopyala COPY --from=builder /root/.local /root/.local # PATH'e ekle ENV PATH=/root/.local/bin:$PATH # ETL script'ini kopyala COPY etl_s3_to_postgres.py . # Root olmayan user oluştur RUN useradd -m -u 1000 etluser && \ chown -R etluser:etluser /app USER etluser # Unbuffered Python output için (loglar için önemli) ENV PYTHONUNBUFFERED=1 CMD ["python", "-u", "etl_s3_to_postgres.py"]
.dockerignore (Build’i hızlandırır)
# Python __pycache__/ *.py[cod] *$py.class *.so .Python env/ venv/ ENV/ build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg # IDE .vscode/ .idea/ *.swp *.swo *~ # Git .git/ .gitignore # Documentation README.md *.md # Test tests/ .pytest_cache/ .coverage htmlcov/ # CI/CD .github/ .gitlab-ci.yml # Docker Dockerfile* docker-compose*.yml .dockerignore # Kubernetes *.yaml *.yml
requirements.txt
boto3==1.40.47 pandas==2.3.3 sqlmodel==0.0.25 psycopg2-binary==2.9.10
Kubernetes CronJob Manifest
apiVersion: batch/v1 kind: CronJob metadata: name: s3-to-postgres-etl namespace: data-pipeline spec: # Her saat başında çalış schedule: "0 * * * *" # Önceki Job bitmeden yenisi başlamasın concurrencyPolicy: Forbid # Son 3 başarılı ve 1 başarısız Job'u sakla successfulJobsHistoryLimit: 3 failedJobsHistoryLimit: 1 # Zamanlanmış saatten 300 saniye sonrasına kadar başlatılabilir startingDeadlineSeconds: 300 jobTemplate: spec: # Maksimum 2 saat çalışabilir activeDeadlineSeconds: 7200 # Başarısız olursa 3 kez dene backoffLimit: 3 # Job tamamlandıktan 1 saat sonra otomatik sil ttlSecondsAfterFinished: 3600 template: metadata: labels: app: s3-postgres-etl version: v1 spec: restartPolicy: Never containers: - name: etl-worker image: your-registry/s3-postgres-etl:v1.0 imagePullPolicy: IfNotPresent env: # AWS yapılandırması - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-credentials key: access-key-id - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-credentials key: secret-access-key - name: AWS_REGION value: "eu-west-1" - name: S3_BUCKET_NAME value: "customer-data-lake" - name: S3_PREFIX value: "raw-logs/customer-events/" # PostgreSQL yapılandırması - name: POSTGRES_HOST value: "postgresql-service.database.svc.cluster.local" - name: POSTGRES_PORT value: "5432" - name: POSTGRES_DB value: "analytics" - name: POSTGRES_USER valueFrom: secretKeyRef: name: postgres-credentials key: username - name: POSTGRES_PASSWORD valueFrom: secretKeyRef: name: postgres-credentials key: password resources: requests: memory: "512Mi" cpu: "500m" limits: memory: "1Gi" cpu: "1000m" # Eğer AWS IAM Roles for Service Accounts (IRSA) kullanıyorsanız serviceAccountName: s3-postgres-etl-sa --- # Secret'ları oluşturmak için (gerçek değerleri kullanın!) apiVersion: v1 kind: Secret metadata: name: aws-credentials namespace: data-pipeline type: Opaque stringData: access-key-id: "YOUR_AWS_ACCESS_KEY" secret-access-key: "YOUR_AWS_SECRET_KEY" --- apiVersion: v1 kind: Secret metadata: name: postgres-credentials namespace: data-pipeline type: Opaque stringData: username: "etl_user" password: "super_secret_password"
Deploy Etme
# Namespace oluştur kubectl create namespace data-pipeline # Docker image'ı build et (basit versiyon) docker build -t your-registry/s3-postgres-etl:v1.0 . # Veya multi-stage build ile (önerilen - daha küçük image) docker build -f Dockerfile -t your-registry/s3-postgres-etl:v1.0 . # Image boyutunu kontrol et docker images | grep s3-postgres-etl # Image'ı push et docker push your-registry/s3-postgres-etl:v1.0 # Alternatif: Docker Hub kullanıyorsanız docker login docker tag your-registry/s3-postgres-etl:v1.0 username/s3-postgres-etl:v1.0 docker push username/s3-postgres-etl:v1.0 # CronJob'u deploy et kubectl apply -f s3-postgres-etl-cronjob.yaml # CronJob'u kontrol et kubectl get cronjob -n data-pipeline # Detaylı bilgi kubectl describe cronjob s3-to-postgres-etl -n data-pipeline # Manuel test için hemen çalıştır kubectl create job --from=cronjob/s3-to-postgres-etl manual-test-1 -n data-pipeline # Job'un durumunu izle kubectl get jobs -n data-pipeline --watch # Pod'ları listele kubectl get pods -n data-pipeline # Log'ları izle (pod adını yukarıdan al) kubectl logs -f <pod-name> -n data-pipeline # Veya job'dan direkt kubectl logs -f job/manual-test-1 -n data-pipeline # Hata ayıklama için pod'a gir (eğer hala çalışıyorsa) kubectl exec -it <pod-name> -n data-pipeline -- /bin/bash # CronJob'u geçici olarak durdur kubectl patch cronjob s3-to-postgres-etl -n data-pipeline \ -p '{"spec":{"suspend":true}}' # Tekrar aktif et kubectl patch cronjob s3-to-postgres-etl -n data-pipeline \ -p '{"spec":{"suspend":false}}' # CronJob'u sil (tüm job'lar ve pod'lar silinir) kubectl delete cronjob s3-to-postgres-etl -n data-pipeline
Local’de Test Etme (Docker Compose ile)
Production’a deploy etmeden önce local’de test edin:
# docker-compose.yml version: '3.8' services: postgres: image: postgres:16-alpine environment: POSTGRES_DB: analytics POSTGRES_USER: etl_user POSTGRES_PASSWORD: test_password ports: - "5432:5432" volumes: - postgres_data:/var/lib/postgresql/data etl: build: . depends_on: - postgres environment: AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} AWS_REGION: eu-west-1 S3_BUCKET_NAME: test-bucket S3_PREFIX: test-data/ POSTGRES_HOST: postgres POSTGRES_PORT: 5432 POSTGRES_DB: analytics POSTGRES_USER: etl_user POSTGRES_PASSWORD: test_password volumes: postgres_data:
# Local test docker-compose up --build # Sadece ETL çalıştır docker-compose run --rm etl # Temizle docker-compose down -v
Monitoring ve Alerting
ETL pipeline’ınızı izlemek için Prometheus metrics ekleyin:
# etl_s3_to_postgres.py içine eklenecek from prometheus_client import Counter, Histogram, Gauge, push_to_gateway import time # Metrics tanımla etl_runs_total = Counter('etl_runs_total', 'Total ETL runs') etl_failures_total = Counter('etl_failures_total', 'Total ETL failures') etl_duration_seconds = Histogram('etl_duration_seconds', 'ETL duration') etl_rows_processed = Gauge('etl_rows_processed', 'Rows processed in last run') def run(self): etl_runs_total.inc() start_time = time.time() try: df = self.extract_from_s3() df = self.transform(df) self.load_to_postgres(df) etl_rows_processed.set(len(df)) etl_duration_seconds.observe(time.time() - start_time) # Metrics'i Pushgateway'e gönder push_to_gateway('pushgateway:9091', job='s3-postgres-etl', registry=...) except Exception as e: etl_failures_total.inc() raise
Bu Yaklaşımın Avantajları
1. Maliyet Optimizasyonu: Deployment’tan %95 daha az kaynak kullanır çünkü sadece saatte 1 kez 5-10 dakika çalışır
2. Basitlik: Her ETL tap’ı ayrı container’da olduğu için yeni pipeline’lar oluşturmak ve deploy etmek kolaydır
3. İzlenebilirlik: Her çalıştırma için ayrı Job ve pod oluşur, debug ve monitoring kolay
4. Esneklik: Schedule’ı değiştirmek sadece bir cron expression değişikliği
5. Güvenilirlik: Kubernetes’in built-in retry ve error handling mekanizmaları
Dikkat Edilmesi Gerekenler
⚠️ İdempotency: ETL kodunuz aynı veriyi birden fazla kez işlese bile aynı sonucu vermeli
⚠️ State Management: Hangi dosyaların işlendiğini takip etmek için S3’te veya veritabanında state bilgisi saklayın
⚠️ Timeout Ayarı Kritik: activeDeadlineSeconds çok kısa olursa işlem yarıda kesilir! ETL pipeline’ınızın ortalama çalışma süresini ölçün ve en az %50 fazlasını ayarlayın. Örneğin ortalama 30 dakika sürüyorsa, 45-60 dakika (2700-3600 saniye) ayarlayın:
spec: jobTemplate: spec: # 1 saat = 3600 saniye activeDeadlineSeconds: 3600
⚠️ Transaction Yönetimi: PostgreSQL’e yazarken SQLModel’in Session yönetimi otomatik transaction sağlar. Hata durumunda rollback yapılır:
def load_to_postgres(self, df): """SQLModel Session otomatik transaction yönetimi sağlar""" with Session(self.engine) as session: try: # Session otomatik olarak transaction başlatır for record in df.to_dict('records'): event = CustomerEvent(**record) session.add(event) # Her şey başarılıysa commit session.commit() logger.info("✅ Transaction başarıyla commit edildi") except Exception as e: # Hata varsa otomatik rollback session.rollback() logger.error(f"❌ Hata! Transaction rollback edildi: {e}") raise
⚠️ Graceful Shutdown: Python uygulamanıza SIGTERM sinyali geldiğinde temiz bir şekilde kapanmasını sağlayın:
import signal import sys class S3ToPostgresETL: def __init__(self): self.shutdown_requested = False # SIGTERM sinyalini yakala signal.signal(signal.SIGTERM, self.handle_shutdown) def handle_shutdown(self, signum, frame): logger.warning("SIGTERM alındı! Temiz kapanış başlatılıyor...") self.shutdown_requested = True # Mevcut transaction'ı rollback et if hasattr(self, 'current_conn'): self.current_conn.rollback() self.current_conn.close() sys.exit(0) def load_to_postgres(self, df): conn = psycopg2.connect(**self.pg_config) self.current_conn = conn # Shutdown handler için try: for i, record in enumerate(df.to_dict('records')): # Her 100 kayıtta bir shutdown kontrolü if i % 100 == 0 and self.shutdown_requested: logger.info("Shutdown request - işlem durduruluyor") conn.rollback() return cur.execute("INSERT INTO ...", record) conn.commit() except Exception as e: conn.rollback() raise finally: conn.close()
⚠️ Resource Limits: Memory ve CPU limit’lerini veri boyutuna göre ayarlayın. Pandas büyük CSV’leri okurken çok memory kullanır:
resources: requests: memory: "512Mi" # Minimum ihtiyaç cpu: "500m" limits: memory: "2Gi" # Maksimum izin verilen cpu: "1000m"
⚠️ Monitoring: activeDeadlineSeconds dolduğunda alert alın:
# Prometheus alert rule örneği - alert: ETLJobDeadlineExceeded expr: kube_job_status_failed{reason="DeadlineExceeded",job_name=~"s3-to-postgres-etl.*"} > 0 for: 1m annotations: summary: "ETL Job {{ $labels.job_name }} deadline'ı aştı!" description: "activeDeadlineSeconds doldu, işlem yarıda kesildi."
⚠️ Checkpoint Mekanizması: Uzun süren işlemler için checkpoint kullanın. Böylece işlem yarıda kesilse bile kaldığı yerden devam edebilir:
def run(self): # Son checkpoint'i oku last_checkpoint = self.read_checkpoint_from_s3() files = self.list_s3_files() for file in files: if file in last_checkpoint['processed_files']: continue # Bu dosya zaten işlendi, atla # Dosyayı işle df = self.read_file(file) self.load_to_postgres(df) # Checkpoint güncelle last_checkpoint['processed_files'].append(file) self.save_checkpoint_to_s3(last_checkpoint)
Gelişmiş Kullanım Senaryoları
1. Paralel İşleme
Büyük veri setlerini paralel işlemek için:
spec: completions: 10 parallelism: 3
Bu, toplam 10 pod çalıştırır ama aynı anda maksimum 3 pod çalışır.
2. Job Zincirleme
Kubernetes native olarak Job zincirleme desteklemez. Bu işlevsellik gerektiğinde Argo veya Apache Airflow gibi harici araçlarla entegrasyon yapmak en iyisidir [7].
3. Conditional CronJob
CronJob’ları sadece belirli koşullarda çalıştırmak için init container’lar kullanabilirsiniz:
initContainers: - name: check-condition image: busybox command: ['sh', '-c', 'if [ condition ]; then exit 0; else exit 1; fi']
Sonuç
Jobs ve CronJobs, Kubernetes ekosisteminde çok güçlü araçlar. Sürekli çalışması gerekmeyen işlemleri yönetmek için mükemmel çözümler sunuyorlar. Veritabanı yedeklemelerinden log rotasyonuna, toplu e-posta gönderiminden veri işlemeye kadar birçok senaryoda kullanabilirsiniz.
Önemli Noktalar:
- Job’lar tek seferlik görevler içindir
- CronJob’lar zamanlanmış tekrarlayan görevler içindir
- backoffLimit ve activeDeadlineSeconds ile hata yönetimi yapın
- ttlSecondsAfterFinished ile temizlik otomasyonu sağlayın
- concurrencyPolicy ile eşzamanlılığı kontrol edin
- İdempotent işlemler yazın
- Resource limit’leri belirleyin
Artık kendi Job ve CronJob’larınızı oluşturmaya hazırsınız! Herhangi bir sorunla karşılaşırsanız, loglara bakın, event’leri inceleyin ve community’den yardım isteyin.
Umarım bu yazı sizin için faydalı olmuştur. Bir sonraki yazıda görüşmek üzere, çav…
Kaynaklar
[1] Kubernetes. (2025). “CronJob | Kubernetes”. https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/
[2] Kubernetes. (2025). “Jobs | Kubernetes”. https://kubernetes.io/docs/concepts/workloads/controllers/job/
[3] K21Academy. (2025). “Kubernetes Jobs and CronJobs: A beginner’s guide”. https://k21academy.com/docker-kubernetes/k8s-jobs-and-cronjobs/
[4] Spacelift. (2025). “CronJob in Kubernetes – Automating Tasks on a Schedule”. https://spacelift.io/blog/kubernetes-cronjob
[5] Google Cloud. “CronJobs | Google Kubernetes Engine (GKE)”. https://cloud.google.com/kubernetes-engine/docs/how-to/cronjobs
[6] Nulldog. (2025). “Manually Trigger Kubernetes CronJobs & Scheduled Jobs”. https://nulldog.com/manually-trigger-kubernetes-cronjobs-scheduled-jobs
[7] Cronitor. “Kubernetes Cron Job Recipes”. https://cronitor.io/guides/kubernetes-cron-job-recipes