Apache Spark, Apache Airflow, Delta Lake ve MinIO ile ETL Çalışması
Veri odaklı kuruluşlarda, çeşitli ham ve karmaşık verilerden içgörü elde etmek için gerektiğinde büyük miktarda verinin düzenlenmesi, basitleştirilmesi veya zenginleştirilmesi gerekir.
ETL (Çıkart, Dönüştür, Yükle) konsepti, büyük ölçekli verileri parçalara ayırır ve veri bilimcilerin/analistlerin verilere erişmesini, verileri analiz etmesini ve bunları iş zekasına dönüştürmesini kolaylaştırır. Aynı zamanda ETL, belirli bir veri hattı (pipeline) türünü ifade eder. Genel olarak tanımlamak gerekirse, bir veri hattı, genellikle verilerin alınmasıyla başlayan ve her adımın bir sonraki adıma girdi olan bir çıktı sağladığı bir dizi veri işleme adımıdır.
Bu yazımızda yukarıda özetlediğimiz ETL sürecini Spark, Airflow, Delta Lake ve MinIO araçlarını kullanarak örnek bir çalışmada deneyimleyeceğiz. Hazırsak başlayalım mı?🔎
Çalışma Açıklaması | Movies & Credits Veri Seti
Bu örnek çalışmada, veri analistlerinin film verileriyle ilgili bazı gelişmiş analizlere ihtiyacı vardır. Aşağıdaki sorulara rahatlıkla yanıt verebilmeleri için, ham verileri işleyerek SQL ile kolayca sorgulayabilecekleri hale getirmemiz beklenmektedir.
- Sam Worthington’ın oynadığı, en çok hasılat yapan film hangisidir?
- Film türleri ile gişe hasılatı arasındaki ilişki nedir?
- Çıkış tarihleri ile gelir arasındaki ilişki nedir?
- Bir yönetmen hep aynı ekiple mi çalışır?
- Bir oyuncu, kariyeri boyunca hangi film setlerinde ve rollerde yer almıştır?
Bunu sağlamak için veri kaynağından ham verileri alacağız, dönüşüm işlemlerini yaparak “expected_outputs.ipynb” dosyasındaki formatlara uygun hale getireceğiz, başka bir hedef kaynağa kaydedeceğiz ve süreklilik için bu işlemleri otomatize edeceğiz. 😎
Çalışma Ortamı Hazırlığı
Aşağıdaki araçların kurulumlarını gerçekleştirdiğimiz varsayımı ile çalışmaya devam edeceğiz. Gerekli Python kütüphane/paketlerini “requirements.txt” dosyasından edinebiliriz.
- Apache Spark
- Apache Airflow
- MinIO
- Delta Lake
- AWS CLI
- Python
- Docker
- CentOS7 (veya benzeri)
- PyCharm (veya benzeri)
İhtiyaç duyulan araçları sağladıysak çalışma adımlarına geçebiliriz. ✔
Çalışma Adımları
Örnek çalışmamız özelinde, genel akış şemamızı aşağıdaki gibi düşünebiliriz.
- MinIO sunucusunun çalıştırılması
- MinIO “bronze bucket”a veri üretilmesi
- Verinin MinIO “bronze bucket”dan alınması, dönüştürülmesi ve MinIO “silver bucket”a yazılması/birleştirilmesi
- Operasyonun (3. Adım) günlük olarak Airflow ile tetiklenmesi
1. MinIO sunucusunun çalıştırılması
Aşağıdaki kodların yardımıyla Docker’ı başlatalım ve “docker-compose.yml” dosyası kullanılarak MinIO’yu çalıştıralım.
sudo systemctl start docker
cd <location_of_docker_compose.yaml> && docker-compose up -d
2. MinIO “bronze bucket”a veri üretilmesi
MinIO çalışırken, “http://localhost:9001/” adresini kullanılarak, MinIO web arayüzüne bağlanalım ve hesap oluşturalım. Bu çalışma için kullanıcı adı olarak “root”, kullanıcı şifresi olarak da “root12345” kullanabiliriz.
Ham verileri ve daha sonra rafine verileri depolamak için “tmdb-bronze” ve “tmdb-silver” isimleriyle “bucket” oluşturalım.
Veri setlerini içe aktaralım ve zip dosyasından çıkaralım.
wget -O <your_file_location_to_save> https://github.com/erkansirin78/datasets/raw/master/tmdb_5000_movies_and_credits.zip
unzip tmdb_5000_movies_and_credits.zip
Veri akışının simüle edilebilmesi için data-generator deposundaki adımları takip ederek data-generator’u kullanıma hazır hale getirelim.
Aşağıdaki kodları, data-generator dosyalarının kaydedildiği dizinde çalıştıralım.
python dataframe_to_s3.py -buc tmdb-bronze \ -k credits/credits_part \ -aki root -sac root12345 \ -eu http://localhost:9000 \ -i /<tmdb_file_location>/tmdb_5000_credits.csv \ -ofp True -z 500 -b 0.1
python dataframe_to_s3.py -buc tmdb-bronze \ -k movies/movies_part \ -aki root -sac root12345 \ -eu http://localhost:9000 \ -i /<tmdb_file_location>/tmdb_5000_movies.csv \ -ofp True -z 500 -b 0.1
Bir süre sonra MinIO “tmdb-bronze” kaynağını kontrol edebiliriz ve nesnelerimizin buraya kaydedildiğini görebiliriz. ❤
3. Verinin MinIO “bronze bucket”dan alınması, dönüştürülmesi ve MinIO “silver bucket”a yazılması/birleştirilmesi
Çalışmanın ana bölümünü oluşturmadan önce ham verileri incelemeye ihtiyacımız olacak. Öncelikle S3 uyumlu MinIO ile iletişim kurabilecek bir Spark oturumu oluşturacağız. Bu işlem çalışmamızın zorlayıcı bölümlerinden biri sayılabilir ancak gerekli bilgileri edindiğimizde tabii ki zor diye bir şey yok.✨
Erkan Şirin’e ait “Apache Spark AWS S3 Datasource” makalesini inceleyelim ve kendisinin önerdiği şekilde jar dosyalarını indirelim. Spark’a ait jar dosyalarının kayıtlı olduğu dizine bu dosyaları taşıyalım. Makalede de belirtildiği üzere burada önemli nokta, jar dosyalarının versiyonlarına dikkat etmemizdir.
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar
mv aws-java-sdk-bundle-1.11.375.jar hadoop-aws-3.2.0.jar <your_spark_jar_files_directory>
Spark oturumunu aşağıdaki kodlar ile kullanmamız ön çalışmamız için yeterli olacaktır. Notebook kullanarak, bir grup ham veriyi MinIO’dan okuyalım ve “prestudy_of_dataframes.ipynb” dosyasında olduğu gibi dönüşüm işlemlerini yapalım.
- MinIO’da bulunan nesnelerin “key” leri (adresleri) hakkında dikkatli olmalıyız. Gün ve saat data-generator’un çalıştığı zamana göre değişiklik göstermektedir.
- Ön çalışmada, verinin yeniden şekillendirilmesi için kullanılan “from_json” ve “explode_outer” anahtar fonksiyonlardır.
spark = (SparkSession.builder .appName("prestudy") .config("spark.hadoop.fs.s3a.access.key", "root") .config("spark.hadoop.fs.s3a.secret.key", "root12345") .config("spark.hadoop.fs.s3a.path.style.access", "true") .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") .getOrCreate())
Verinin rafine hale nasıl dönüştürüleceğini çözdükten sonra artık bu işlemi “tmdb-bronze” da bulunan tüm nesnelere nasıl yansıtacağımız ve bunu nasıl otomatize edeceğimiz hakkında düşünebiliriz. Heyecan verici değil mi?
Farklı yaklaşımlar da olabileceği gibi, bu çalışma özelinde otomatize sistemimizin iş akışını aşağıdaki gibi düşünebiliriz.
Bilindiği gibi, bir proje geliştirmenin birden çok yöntemi olabilir. Bu örnek çalışmada ise görevlerin farklı kod dosyalarına bölünmesi şeklinde ilerleyeceğiz.
Aşağıdaki kod dosyalarını “bronze_to_silver_data_transformation” klasörü altında oluşturalım.
- session.py
- operation.py
- transformation.py
- schema.py
Yukarıdaki iş akışında görüldüğü üzere, MinIO “tmdb-bronze” da bulunan günlük verileri kontrol etmeye ve hedef nesnelerin “key” lerini almaya ihtiyacımız olacak. Bu adım için, Boto3 kütüphanesinin “client” ve/veya “resource” oturumlarından yardım alacağız. Ayrıca, farklı AWS profillerinin nasıl kullanıldığını öğrenmek için “Named profiles for the AWS CLI” yazısı inceleyebiliriz ve MinIO için oluşturduğumuz kullanıcı adı ve şifremizi bu şekilde kullanabiliriz.
Dönüşüm işlemleri yapıldıktan sonra rafine verimizi “tmdb-silver” kaynağına yazacağız. Delta Lake depolama katmanı kullanarak buradaki mevcut veriler ile birleştirmek için ise delta-core jar dosyalarına ihtiyaç duyacağız. Bu adımda delta-core maven koordinatlarını “spark.jars.packages” özelliği ile birlikte kullanacağız (veya daha önce uyguladığımız gibi dosyayı indirilerek jar dosyalarının olduğu dizine alabiliriz). Son olarak, Delta Lake dokümanlarında bahsi geçen “spark.sql.extensions” ve “spark.sql.catalog.spark_catalog” özelliklerini de ekleyerek konfigürasyonu tamamlayacağız.
Bu bilgiler doğrultusunda, Boto3 ve Spark oturumlarımız için gerekli fonksiyonları “session.py” dosyasında aşağıdaki gibi oluşturalım.
def get_s3_cli_res(profile, url): session = boto3.Session(profile_name=profile) s3_cli = session.client("s3", endpoint_url=url) s3_res = session.resource("s3", endpoint_url=url) return {"cli": s3_cli, "res": s3_res}
def get_spark_minio_session(name, access_key, secret_key, url, spark_home): findspark.init(spark_home) spark = (SparkSession.builder .appName(name) .master("local[2]") # -- delta lake configuration .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") # -- .config("spark.hadoop.fs.s3a.access.key", access_key) .config("spark.hadoop.fs.s3a.secret.key", secret_key) .config("spark.hadoop.fs.s3a.path.style.access", "true") .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .config("spark.hadoop.fs.s3a.endpoint", url) .getOrCreate()) return spark
Gerekli oturumlar için fonksiyonlarımızı oluşturduktan sonra artık dönüşüm işlemlerinden ve bu adımdaki iş akışından sorumlu dosyalarımızı oluşturabiliriz.
Dönüşüm işlerini gerçekleştirmek için, daha önce yaptığımız ön çalışmayı (prestudy.ipynb) yeniden organize ederek “Transformation” sınıfı ve buna ait metotları “transformation.py” dosyasında oluşturalım. Hedeflediğimiz çıktı biçimlerine uygun şemaları da “schema.py” dosyasında oluşturarak gerekli noktalarda kullanalım.
- get_cast_crew_df
- get_movies_df
- get_genres_keywords_companies_df
- get_languages_country_df
- get_silver_df
Veri setinden bağımsız fonksiyonlarımızı, iş akışından sorumlu olan “operation.py” dosyasında oluşturalım.
- write_to_delta
- transform_and_save_to_silver
- get_list_of_objects
- check_bucket
4. Operasyonun (3. Adım) günlük olarak Airflow ile tetiklenmesi
İlk olarak, operation.py dosyasında bulunan “check_bucket” ve “transform_and_save_to_silver” fonksiyonlarını tetikleyecek olan “bronze_to_silver_transformation_dag.py ” DAG dosyasını oluşturalım.
Airflow’u aşağıdaki kodları kullanarak çalıştıralım.
sudo systemctl start airflow
sudo systemctl start airflow-scheduler
Airflow’un erişebilmesi için, “bronze_to_silver_transformation_dag.py ” dosyasını ve “bronze_to_silver_data_transformation” klasörünü Airflow dag dosyalarının olduğu klasöre kopyalayalım.
cp bronze_to_silver_transformation_dag.py <airflow_dags_directory>
cp bronze_to_silver_data_transformation <airflow_dags_directory>
Airflow dag dosyalarının web arayüzüne “http://localhost:1502/” adresi ile erişelim. Dag dosyasının yüklenmesi zaman alabilir ve hatalar ile karşılaşabiliriz. Endişe etmeyelim ve bizden kaynaklanan bir durum olup olmadığını uyarılardan takip edelim. Dag dosyası yüklendikten sonra artık iş akışını”graph” sekmesinden takip edebiliriz.
Bir süre sonra MinIO “tmdb-silver” kaynağını kontrol ettiğimizde, dönüşümü gerçekleşen verilerimizin buraya aktarıldığını göreceğiz. 🥳🥳
Son olarak, aşağıdaki örnek python kodunu kullanarak, buraya aktarılan verileri “delta” formatında okuyabilir ve sonuçları görmenin keyfine varabiliriz.
cast_delta = spark.read.format("delta").load("s3a://tmdb-silver/cast")
Evet, hazırlaması ve paylaşması keyifli bir yazımızın sonuna geldik. Yazımızın temelini oluşturan örnek çalışmanın sayesinde, çok yönlü düşünmemize katkı sağlayacak görevler ve bunlara bağlı fonksiyonlar/metotlar geliştirdik, güncel teknolojiler/araçlar ile bir ETL projesi inşa etmenin keyfine vardık. Ayrıca sistemimizin çıktıları sayesinde veri bilimcilerin/analistlerin daha hızlı ve kolay bir şekilde soruları cevaplamasını, içgörüler elde etmesini sağladık.
Diğer keyifli yazılar ve çalışmalarda görüşmek üzere. Şimdilik hoşçakalın! 🙋♀️
Kaynaklar
- Ana veri kaynağı: https://www.kaggle.com/datasets/tmdb/tmdb-movie-metadata?select=tmdb_5000_movies.csv
- Aws-Spark bağlantısı: https://medium.com/@erkansirin/apache-spark-aws-s3-datasource-eb65ee906e64
- Boto3 “client”: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client
- Boto3 “resource”: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.resource
- Spark konfigürasyonları: https://spark.apache.org/docs/latest/configuration.html
- Delta Lake giriş: https://docs.delta.io/latest/delta-intro.html
- Airflow genel bakış: https://airflow.apache.org/docs/apache-airflow/stable/index.html
- Kapak fotoğrafı: https://www.pexels.com/photo/simple-workspace-at-home-6476587/