Spark on Kubernetes

Merhabalar. Bu yazımızda bir Spark uygulamasını Kubernetes cluster üzerinde çalıştıracağız, yaygın adıyla Spark on Kubernetes. Bildiğimiz gibi Spark 2.3 sürümüne kadar sadece 3 cluster yöneticisi vardı: Hadoop YARN, Apache Mesos ve Spark Standalone. Sürüm 2.3 ile beraber Kubernetes de bunların arasına katıldı. Şimdiye kadar Spark kullanımında hakim cluster yöneticisi YARN idi, belki hala öyledir. Peki niçin hala öyle? Kubernetes gibi bir kahraman varken neden hala YARN çok yaygın? Canlı ortamdaki sistemlerin yeniliklere reaksiyon göstermesi çok çabuk olmuyor, biraz zaman alıyor. Öncelikle bir Kubernetes ortamının kullanılmaya başlanması, yani uygulamalarda mikro servis mimarisine geçiş yapılması, tüm geliştirme, yedekleme, izleme, devops vb. süreçlerin mikro servis yaklaşımına uyması gerekiyor. Ancak bundan sonra Spark uygulamalarının da Kubernetes üzerinde koşulması mümkün olabilir. Elbette diğer uygulamalarınızı eski usül devam ettirip sadece Spark işleri için bir Kubernetes cluster kurabilirsiniz fakat sadece Spark için bunu yapar mısınız tartışılır. Yoksa diğer uygulamaların mikro servis mimarisinde çalışması Spark’ı Kubernetes’de koşmak için teknik bir ön koşul değil.

Belki belli bir süre daha konteyner teknolojilerini görmezden gelebilirsiniz fakat trend bu yönde ilerliyor ve her geçen gün daha fazla şirket Spark’ı K8s üzerinde kullanmaya başlıyor. Üstelik 2018’den beri yapılan uzun soluklu geliştirmeler sonunda  sürüm 3.1 (Mart 2021) ile beraber Spark Kubernetes üzerinde canlı ortamlar için hazır (general availability/production ready) olduğunu duyurdu. Özetlemek gerekirse; önümüzdeki dönemde Spark olsun veya başka uygulamalar olsun, hatta veri tabanlarında bile, kaçınılmaz bir şekilde konteyner ortamları ile muhatap olacağız.

Spark on Kubernetes cluster managers
Şekil-1: Spark Cluster Managers

Bir spark uygulamasını Kubernetes üzerinde 2 şekilde başlatabiliriz.

  • spark-submit
  • spark operator

İlki diğer cluster yöneticilerinden de alışkın olduğumuz yöntem, ancak ikincisi Kubernetes dünyasına özgü ve daha çok tercih edilen yöntem. Operatörler özellikle statefull uygulamalarda veya veri tabanlarında insan müdahalesi gerektiren durumların önceden tespit edilerek operatör nesnesi tarafından yapılmasını sağlar. Örneğin nosql veri tabanlarının bazılarında önce master nodeları sonra slave nodeları başlatmalısınız. Operatör sizin yerinize bu gereksinimi gözetiyor. Statefull uygulamaların aksine stateless uygulamalar ise durum bilgisi tutmak gerekmediği için deployment objeleriyle rahatlıkla ölçeklenerek Kubernetes üzerinde çalıştırılabilir, yani pek bir operatör ihtiyacı yoktur. Bu yazımızdaki örnek spark operator yöntemiyle olacaktır.

Spark uygulamasını Kubernetes üzerinde çalıştırmanın faydaları:

  • Konteyner teknolojisinin sunduğu genel avantajlar Spark uygulamaları için de geçerlidir. Konteynerler uygulamalarınızı daha taşınabilir hale getirir, bağımlılıkların (dependencies) paketlenmesini basitleştirir, tekrarlanabilir ve güvenilir iş akışları inşa etmeye olanak tanır. Genel DevOps yükünü azaltır ve kodunuzu daha hızlı yinelemenize olanak tanır.
  • Aynı cluster üzerinde farklı sürüm Spark kullanılabilir.
  • Aynı cluster dev/test/prod amacıyla kullanılabilir.
  • Tam olarak izolasyon imkanı mevcuttur. Bu imkan YARN’da kısıtlıdır, genel Hadoop cluster ortam bağımlılıklarına uymak gerekir.
  • Big data uygulamaları haricindeki uygulamaları da aynı cluster üzerinde çalıştırabilirsiniz. YARN ile bu pek mümkün değil.
  • İlk geliştirme sonrası müteakip değişiklikler imaj içinde sadece kod katmanında olduğu için küçük bir değişikliği denemek bazen bir dakikadan bile az sürebilir. Klasik sistemlerde nokta değiştirseniz tüm kodu derleme yapıp çalıştıracağınız ortama taşımanız zaman alır.
  • Kaynakları daha etkin kullanabilir ve böylelikle maliyetleri azaltabilirsiniz.
  • Client kütüphanelerine bağımlı kalmadan API ile Spark uygulamasını başlatabilirsiniz.

Spark uygulamasını Kubernetes üzerinde çalıştırmanın kısıtları:

  • Sürekli olarak operatör için bir pod çalışmalıdır. Ancak bu ihmal edilebilecek kadar küçük bir kaynak tüketir ve üstelik sadece operatör kullandığımızda geçerlidir.
  • Dinamik kaynak kullanımı (Dynamic Resource Allocation) özelliği halen YARN’daki olgunluğa sahip değil veya biraz farklı diyelim.
  • YARN kadar performanslı çalışmadığı yönünde bazı kıyaslama sonuçları var.
  • Spark web arayüzüne ulaşmak için port yönlendirme veya ingress oluşturma gibi ilave işler yapmanız gerekli.

Ortam Bilgileri

Kubernetes Cluster: 2 master, 4 worker ve 1 rancher server’dan oluşan kubernetes cluster. Rancher server’da kubectl komutlarını çalıştırdım.

Kubernetes sürümü: 1.20.9

İşletim Sistemi: CentOS7

İmaj Repo: Dockerhub

 

 

Şekil-2: Rancher Server Web UI – Cluster Nodes

 

Varsayımlar

Kubernetes cluster mevcut, kubectl komutlarını çalıştırdığınız (bende Rancher Server) makinede Spark, Helm ve Docker yüklü. AWS S3 bucket ve içinde verisi hazır (Bu şart değil veriyi başka yerden de okuyabilirsiniz).

Spark Uygulaması Ne İş Yapacak?

Bu örnekte kullandığım spark uygulaması bir perakende veri setini AWS S3’den okuyacak ve veri üzerinde basit manipülasyonlar yapacak. Veri setinin önemi yok, sizler farklı veri setleri üzerinde farklı analizler ve manipülasyonlar yapabilirsiniz.

Baz Spark İmajı Oluşturma (İlk imaj)

Uygulamanız için size baz bir imaj lazım. İsterseniz mevcut imajları (bir sürü bulabilirsiniz) veya benim hazırladığım imajı kullanabilirsiniz. Ancak yine de kendime ait bir Spark imajım olsun diyorsanız buyurun birlikte oluşturalım. Spark bu konuda bize yardımcı oluyor. Spark kurulum dosyalarının içinde imaj oluşturmak için bir script (docker-image-tool.sh) hazırlamışlar. Bunu kullanarak rahatlıkla bir imaj derleyebiliriz. Ben Rancher server üzerinde kubectl kullandığımı, burada Spark ve Docker yüklü olduğunu belirtmiştim. SPARK_HOME dizinine geçiyorum ve orada neler var bir göz atıyoruz.

cd /opt/spark
ls -l
total 124
drwxr-xr-x 2 root root  4096 Feb 22 05:11 bin
drwxr-xr-x 2 root root   244 Apr  7 07:17 conf
drwxr-xr-x 5 root root    50 Feb 22 05:11 data
drwxr-xr-x 4 root root    29 Feb 22 05:11 examples
drwxr-xr-x 2 root root 12288 Feb 22 05:11 jars
drwxr-xr-x 4 root root    38 Feb 22 05:11 kubernetes
-rw-r--r-- 1 root root 23235 Feb 22 05:11 LICENSE
drwxr-xr-x 2 root root  4096 Feb 22 05:11 licenses
drwxr-xr-x 2 root root    94 Apr  7 07:14 logs
-rw-r--r-- 1 root root 57677 Feb 22 05:11 NOTICE
drwxr-xr-x 9 root root   327 Feb 22 05:11 python
drwxr-xr-x 3 root root    17 Feb 22 05:11 R
-rw-r--r-- 1 root root  4488 Feb 22 05:11 README.md
-rw-r--r-- 1 root root   183 Feb 22 05:11 RELEASE
drwxr-xr-x 2 root root  4096 Feb 22 05:11 sbin
drwxr-xr-x 2 root root    42 Feb 22 05:11 yarn

Yukarıda bin dosyası içinde docker-image-tool.sh var. İşte bununla kendi Spark baz imajımızı oluşturabiliriz. Bu script eğer bir seçenek olarak python (-p) belirtmezsek varsayılan olarak scala/java imajı oluşturuyor, yani python kütüphanelerini dahil etmiyor. Bizim Spark uygulama kodumuz python olduğu için imajımız da pyspark imajı olmalı.

bin/docker-image-tool.sh \
-r docker.io/erkansirin78 \
-t 3.1.1 \
-u 1000 \
-b java_image_tag=11-jre-slim \
-p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile build

Scriptin ürettiği ekran çıktısının son kısmı aşağıdaki gibi olacaktır.

. . . 
. . .
Step 12/12 : USER ${spark_uid}
 ---> Running in 4f7299e91b03
Removing intermediate container 4f7299e91b03
 ---> 35cc4bf94308
[Warning] One or more build-args [java_image_tag] were not consumed
Successfully built 35cc4bf94308
Successfully tagged erkansirin78/spark-py:3.1.1

Scripti çalıştırırken kullandığımız seçenekler (options) ne işe yarıyor kısaca açıklayalım.

r: İmaj reposu. Ben kendi dockerhub hesabımı seçtim.

-t: İmaj etiketi (tag)

-u: Kullanıcı id. Herhangi bir id belirtmezsek 185 id numaralı kullanıcı varsayılan olarak yer alıyor. Ben kendi kullanıcı id numaram olan 1000’i kullandım.

-b: Baz imaj

-p: PySpark imajı için farklı bir Dockerfile kullan

Çalışacak Spark Kodu

from pyspark.sql import SparkSession, functions as F
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
import os

spark = SparkSession.builder \
.appName("Spark on K8s") \
.getOrCreate()

sc = spark.sparkContext

sc.setLogLevel('WARN')

accessKeyId = os.getenv('ACCESS_KEY')
secretAccessKey = os.getenv('SECRET_KEY')

sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', accessKeyId)
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secretAccessKey)
sc._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')
sc._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.amazonaws.com')


dfmarket5mil = spark.read.format('parquet') \
.load('s3a://<your-bucket>/<your-data>')

print("*******************************")

print(dfmarket5mil.count())

dfmarket5mil.printSchema()

dfmarket5mil2 = dfmarket5mil \
.withColumn("AMOUNT", F.col("AMOUNT").cast(IntegerType())) \
.withColumn("PRICE", F.regexp_replace("PRICE", ",",".").cast(FloatType())) \
.withColumn("LINENETTOTAL", F.regexp_replace("LINENETTOTAL", ",",".").cast(FloatType())) \
.withColumn("LINENET", F.regexp_replace("LINENET", ",",".").cast(FloatType())) \
.withColumn("LATITUDE", F.regexp_replace("LATITUDE", ",",".").cast(FloatType())) \
.withColumn("LONGITUDE", F.regexp_replace("LONGITUDE", ",",".").cast(FloatType())) \
.withColumn("DATE_", F.to_timestamp("DATE_")) \
.withColumn("STARTDATE", F.to_timestamp("STARTDATE")) \
.withColumn("ENDDATE", F.to_timestamp("ENDDATE")) \
.withColumn("CAPIBLOCK_CREADEDDATE", F.to_timestamp("CAPIBLOCK_CREADEDDATE"))


dfmarket5mil2 \
.groupBy("REGION").count() \
.limit(28) \
.show()

dfmarket5mil2 \
.groupBy("CITY").count() \
.limit(128) \
.orderBy(F.desc("count")) \
.show(100)

for col_name in dfmarket5mil2.dtypes:
    if col_name[1] == 'string':
        print(col_name[0], dfmarket5mil2.select(col_name[0]).distinct().count())

print("Spark is shutting down.")

spark.stop()

Yukarıda kodun tamamını görüyoruz. Dediğim gibi veriyi okuduktan sonra yapılan manipülasyonların önemi yok, anlamak için boşuna kafa yormayın.  Önemli olan kısım print(“*******************************”) satırına kadar olan kısım. Adım adım bu kodu çalıştırmadan önce neler yapmalıyız onu konuşalım. Şimdiye kadar elimizde bir K8s cluster ve bir tane Spark baz imajı (erkansirin78/spark-py:3.1.1) var. Biz kodumuzu bu imaj içerisine koymalıyız ve ilave bağımlılıklar varsa (örneğin python paketleri) onları eklemeliyiz. Bunun için Spark içinde hazır gelen Dockerfile haricinde ayrı bir Dockerfile ihtiyacımız olacak ve burada baz imaj olarak yukarıda yarattığım imajı (erkansirin78/spark-py:3.1.1) kullanacağım. Bunun için dizin değiştirip spark kodlarımı yazdığım dizine geçiyorum. Artık SPARK_HOME içinde değilim. Bu çalışmada iki farklı dizin kullandım. İlki baz Spark imajını oluşturduğum SPARK_HOME (bende /opt/spark) diğeri ise kod geliştirmesi yaptığım dizin. Tüm spark kodu ve yaml dosyaları bu ikinci dizinde yer alıyor.

Spark İmajı Oluşturma (Kodların dahil olduğu ikinci imaj)

Spark dizninden çıktım kodlarımın, Dockerfile, requirements.txt gibi dosyalarımın olduğu dizine geldim. Hadi buna proje dizini diyelim.

FROM erkansirin78/spark-py:3.1.1

USER root

WORKDIR /app

COPY requirements.txt .

RUN pip3 install -r requirements.txt

COPY spark_on_k8s_app.py .

requirements.txt dosyasına ben sadece ve öylesine findspark paketini koydum. Şimdi bu Dockerfile’ı kullanarak içinde uygulama kodlarının bulunduğu ikinci imajı oluşturalım. Bu kolay olacak çünkü katmanların çoğu zaten yukarıdaki baz Spark imajında oluştu, burada bir katman daha ekleyip imajı mühürleyeceğiz.

docker build -t spark-k8s-app:1.0 .
Sending build context to Docker daemon  81.92kB
Step 1/6 : FROM erkansirin78/spark-py:3.1.1
 ---> 35cc4bf94308
Step 2/6 : USER root
 ---> Using cache
 ---> b88aef81d9c2
Step 3/6 : WORKDIR /app
 ---> Using cache
 ---> d51d05bb63de
Step 4/6 : COPY requirements.txt .
 ---> Using cache
 ---> 952a0e698068
Step 5/6 : RUN pip3 install -r requirements.txt
 ---> Using cache
 ---> 15ed8ac6a23c
Step 6/6 : COPY spark_on_k8s_app.py .
 ---> Using cache
 ---> 566390dc9f11
Successfully built 566390dc9f11
Successfully tagged spark-k8s-app:1.0

Bu imajımızı Dockerhub’a göndermeden önce bir etiket verelim docker tag spark-k8s-app:1.0 erkansirin78/spark-k8s-app:1.0

Docker login ile oturum açalım ve arkasından imajımızı Dockerhub’a gönderelim.

docker login
docker image push erkansirin78/spark-k8s-app:1.0

Peki güzel. İmaj yavaş yavaş Dockerhub’a gidedursun  biz bu esnada kodun içinden kendimize vazife çıkaralım. Şimdi spark kodunda şu iki satıra odaklanalım.

accessKeyId = os.getenv('ACCESS_KEY') 
secretAccessKey = os.getenv('SECRET_KEY')

Bu bilgiler bizim AWS S3 bucket erişimimiz için gerekli bilgiler. Spark uygulamasının içinden bunlara nasıl erişeceğiz? Öncelikle bu iki değeri tutan bir K8s Secret objesi oluşturalım. Bu dosya şu şekildedir.  s3_credentials_secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: s3-credentials
type: Opaque
data:
  ACCESS_KEY: <base64 encoded accesskey>
  SECRET_KEY: <base64 encoded secretkey>

Yukarıda base64 encoded değerlerini oluşturmak için komut satırında öncelikle access ve secret keylerimizi bir ortam değişkenine atıyoruz. Sonrasında base64 ile şifreliyoruz.

export ACCESS_KEY=<YOUR ACCESS_KEY HERE>
export SECRET_KEY=<YOUR SECRET_KEY HERE>

echo -n $ACCESS_KEY | base64
<encoded output will be here>
echo -n $SECRET_KEY | base64
<encoded output will be here>

Secret yaml dosyasında (s3_credentials_secret.yaml) ACCESS_KEY ve SECRET_KEY değerlerine base64 ile ürettiklerimizi yazıyoruz.

Spark Operator Kurulumu

Spark uygulaması için Google Cloud tarafından özel olarak yazılmış Spark Operator’ü kuralım. Operatörün github sayfasında detaylı bilgi bulabilirsiniz. Ben helm kullanarak kurulum yapacağım.

helm repo add \
spark-operator \
https://googlecloudplatform.github.io/spark-on-k8s-operator 


kubectl create ns spark-operator

helm install \
my-spark-operator \
spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true

Bazı ilave özellikleri kullanmak istiyorsak webhook’u aktif ediyoruz.  Bakalım operatör kurulmuş mu?

kubectl get all -n spark-operator
NAME                                                       READY   STATUS      RESTARTS   AGE
pod/my-sparkop-release-spark-operator-875cf4546-ccdf7      1/1     Running     2          26h
pod/my-sparkop-release-spark-operator-webhook-init-ng5gv   0/1     Completed   0          26h

NAME                                                TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)   AGE
service/my-sparkop-release-spark-operator-webhook   ClusterIP   10.43.18.40   <none>        443/TCP   26h

NAME                                                READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-sparkop-release-spark-operator   1/1     1            1           26h

NAME                                                          DESIRED   CURRENT   READY   AGE
replicaset.apps/my-sparkop-release-spark-operator-875cf4546   1         1         1       26h

NAME                                                       COMPLETIONS   DURATION   AGE
job.batch/my-sparkop-release-spark-operator-webhook-init   1/1           2m29s      26h

Kubernetes SparkApplication Objesi Yaratma

Kod yazmaktan sonra asıl ikinci önemli iş burada.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: pyspark-on-k8s
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "erkansirin78/spark-k8s-app:1.0"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/spark_on_k8s_app.py
  sparkVersion: "3.1.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    envSecretKeyRefs:
      ACCESS_KEY:
        name: s3-credentials
        key: ACCESS_KEY
      SECRET_KEY:
        name: s3-credentials
        key: SECRET_KEY
  executor:
    cores: 1
    instances: 2
    memory: "1000m"
    labels:
      version: 3.1.1
  deps:
    jars:
      - https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
      - https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar

 

Yukarıdaki yaml dosyasını açıklayalım:

name: pyspark-on-k8s – Spark uygulamasının adı ne olacak.

image: “erkansirin78/spark-k8s-app:1.0” – Kodlarımızı içine koyup Dockerhub’a gönderdiğimiz imaj.

imagePullPolicy: Always – Kodumuzu her değiştirdiğimizde yeniden derleyip Dockerhub’a push ediyoruz ve Spark’ı Kubernetes üzerinde her çalıştırdığımızda en güncel halini dockerhub’dan alıyor. Kod çok yer kaplamadığı ve fazla yer tutan katmanlar değişmediği için her seferinde koca imajı yeniden indirmiyor.

mainApplicationFile: local:///app/spark_on_k8s_app.py – Çalışacak kodun imaj içinde nerede olduğunu söylüyoruz. Bunun yerini kodun dahil olduğu ikinci imajı oluştururken Docker file içinde ayrlamıştık.

envSecretKeyRefs – Burada Secret objesiyle oluşturduğumuz AWS S3 access ve secret anahtarlara ulaşıyoruz ortam değişkeni oluşturuyoruz. Kodun içinden de bu ortam değişkeinde bulunan anahtarları kullanarak AWS S3’e erişiyoruz.

deps –  Burası AWS S3’e bağlanmak için ilave kütüphaneler. Bununla ilgili detayı buradaki yazımda bulabilirsiniz.

Servis Hesapları

Hata almamak için servis hesaplarını oluşturalım.

kubectl create serviceaccount spark

kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default

İşte Spark on Kubernetes dediğimiz an

Son olarak SparkApplication objesini çalıştırmak kalıyor. İşte tam burada spark-submit’i Kubernetes Operator jargonuyla yapmış oluyoruz. Spark on Kubernetes başlasın 🙂

kubectl apply -f sparkapplication-without-vols-object.yaml

Yeni bir terminalden podları, mevcut terminalden ise logları izleyelim.

Pod watch (Yeni terminal):

kubectl get pods -w
NAME                    READY   STATUS    RESTARTS   AGE
pyspark-on-k8s-driver   0/1     Pending   0          0s
pyspark-on-k8s-driver   0/1     Pending   0          0s
pyspark-on-k8s-driver   0/1     ContainerCreating   0          0s
pyspark-on-k8s-driver   0/1     ContainerCreating   0          3s
pyspark-on-k8s-driver   1/1     Running             0          6s
spark-on-k8s-382c037b6f4053bc-exec-1   0/1     Pending             0          0s
spark-on-k8s-382c037b6f4053bc-exec-1   0/1     Pending             0          0s
spark-on-k8s-382c037b6f4053bc-exec-1   0/1     ContainerCreating   0          0s
spark-on-k8s-382c037b6f4053bc-exec-2   0/1     Pending             0          0s
spark-on-k8s-382c037b6f4053bc-exec-2   0/1     Pending             0          0s
spark-on-k8s-382c037b6f4053bc-exec-2   0/1     ContainerCreating   0          0s
spark-on-k8s-382c037b6f4053bc-exec-1   0/1     ContainerCreating   0          1s
spark-on-k8s-382c037b6f4053bc-exec-2   0/1     ContainerCreating   0          2s
spark-on-k8s-382c037b6f4053bc-exec-2   1/1     Running             0          5s
spark-on-k8s-382c037b6f4053bc-exec-1   1/1     Running             0          7s

Log watch (Mevcut terminal): Son kısımları sadece kubectl logs -f pyspark-on-k8s-driver

21/08/22 19:05:20 INFO BlockManagerMasterEndpoint: Registering block manager 10.42.4.53:38950 with 400.0 MiB RAM, BlockManagerId(1, 10.42.4.53, 38950, None)
21/08/22 19:05:22 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/app/spark-warehouse').
21/08/22 19:05:22 INFO SharedState: Warehouse path is 'file:/app/spark-warehouse'.
21/08/22 19:05:27 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
*******************************
5387858
root
 |-- LOGICALREF: integer (nullable = true)
 |-- COUNT_: integer (nullable = true)
 |-- ITEMCODE: string (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- FICHENO: string (nullable = true)
 |-- DATE_: timestamp (nullable = true)
 |-- AMOUNT: integer (nullable = true)
 |-- PRICE: float (nullable = true)
 |-- LINENETTOTAL: float (nullable = true)
 |-- LINENET: float (nullable = true)
 |-- BRANCHNR: string (nullable = true)
 |-- BRANCH: string (nullable = true)
 |-- SALESMAN: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- CLIENTCODE: string (nullable = true)
 |-- CLIENTNAME: string (nullable = true)
 |-- BRANDCODE: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- CATEGORY_NAME1: string (nullable = true)
 |-- CATEGORY_NAME2: string (nullable = true)
 |-- CATEGORY_NAME3: string (nullable = true)
 |-- STARTDATE: timestamp (nullable = true)
 |-- ENDDATE: timestamp (nullable = true)
 |-- SPECODE: string (nullable = true)
 |-- CAPIBLOCK_CREADEDDATE: timestamp (nullable = true)

+-----------------+-------+
|           REGION|  count|
+-----------------+-------+
|              Ege| 764290|
|Güneydoğu Anadolu| 564834|
|          Akdeniz| 677270|
|          Marmara|1541540|
|     Doğu Anadolu| 397168|
|       İç Anadolu| 845430|
|        Karadeniz| 597326|
+-----------------+-------+

+--------------+------+
|          CITY| count|
+--------------+------+
|      İstanbul|985742|
|        Ankara|355698|
|         İzmir|282202|
|         Bursa|190060|
|       Antalya|159392|
|         Adana|144470|
|         Konya|143582|
|     Gaziantep|131924|
|     Şanlıurfa|129210|
|     Zonguldak|128478|
|       Kocaeli|122734|
|        Mersin|116148|
|    Diyarbakır|110058|
|         Hatay|103668|
|        Manisa| 93762|
|       Kayseri| 91866|
|        Samsun| 85322|
|     Balıkesir| 79494|
|           Van| 76440|
| Kahramanmaraş| 70816|
|         Aydın| 69862|
|       Denizli| 68160|
|      Tekirdağ| 65326|
|       Sakarya| 65080|
|         Muğla| 60518|
|     Eskişehir| 54856|
|       Malatya| 52214|
|        Mardin| 51914|
|       Erzurum| 50944|
|       Trabzon| 49846|
|          Ordu| 48008|
|Afyonkarahisar| 47294|
|         Tokat| 41222|
|      Adıyaman| 39788|
|         Sivas| 39522|
|       Kütahya| 38394|
|        Batman| 37554|
|        Elazığ| 36650|
|      Osmaniye| 35498|
|         Çorum| 34954|
|          Ağrı| 34382|
|     Çanakkale| 34164|
|        Şırnak| 32478|
|       Isparta| 28964|
|        Yozgat| 27756|
|       Giresun| 27040|
|        Edirne| 25898|
|       Aksaray| 25752|
|           Muş| 25132|
|         Düzce| 24996|
|          Uşak| 24604|
|     Kastamonu| 23640|
|         Niğde| 23318|
|    Kırklareli| 22940|
|          Rize| 22612|
|        Amasya| 22524|
|        Bitlis| 22342|
|         Siirt| 21732|
|          Kars| 20046|
|          Bolu| 19956|
|      Nevşehir| 19112|
|        Bingöl| 18806|
|       Hakkari| 18800|
|     Kırıkkale| 18410|
|        Burdur| 18314|
|       Karaman| 17034|
|      Erzincan| 16722|
|      Kırşehir| 15776|
|        Yalova| 15722|
|       Karabük| 15274|
|       Bilecik| 13874|
|         Sinop| 13370|
|         Iğdır| 12900|
|       Çankırı| 12748|
|        Artvin| 12292|
|        Bartın| 12064|
|     Gümüşhane| 10818|
|         Kilis| 10176|
|       Ardahan|  6282|
|       Tunceli|  5508|
|       Bayburt|  4910|
+--------------+------+

ITEMCODE 12230
ITEMNAME 12125
FICHENO 629110
BRANCHNR 81
BRANCH 81
SALESMAN 536
CITY 81
REGION 7
CLIENTCODE 53413
CLIENTNAME 53282
BRANDCODE 366
BRAND 366

Uygulamayı kapatmak için

kubectl delete -f sparkapplication-without-vols-object.yaml

Bir çok detay daha eklenebilir, ancak bu bir blog yazısının boyunu aşar. Artık yazıyı burada bitiriyorum. Spark’ı Kubernetes üzerinde çalıştırmanın hazırlık zahmeti var. İlk başlarda biraz saç baş yoldurabilir ve neymiş bu Spark on Kubernetes diyebilirsiniz. Ancak yeterince ısrarlı ve dikkatli bakarsanız duvarın arkasını bile görebilirsiniz 🙂

Başka bir yazıda görüşmek dileğiyle…

Kaynaklar

  1. https://www.datamechanics.co/blog-post/pros-and-cons-of-running-apache-spark-on-kubernetes
  2. Kapak Photo by frank mckenna on Unsplash
  3. https://www.youtube.com/watch?v=FX0F_OnsUOU&t=241s
  4. https://spark.apache.org/docs/latest/running-on-kubernetes.html

Yazar Hakkında
Toplam 179 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara