Spark SQL Window Functions

Window functions SQL dünyasında yaygın olarak bilinen fonksiyonlar. SQL’de kullandığımız bir çok fonksiyonu Spark ile de kullanabiliyoruz. Bu yazımızda Spark Scala ve SQL söz dizimleriyle window functions örnekleri yapacağız.

Window functions nedir, ne işe yarar?

Window fonksiyonlar, window olarak adlandırılan bir satır grubu üzerinde çalışır ve satır grubuna dayalı olarak her satır için bir dönüş değeri hesaplar. Window fonksiyonlar, hareketli bir ortalamayı hesaplamak, kümülatif bir istatistik hesaplamak veya geçerli satırın göreli konumu verilen satırların değerine erişmek gibi görevleri işlemek için kullanışlıdır.

Aggregation sorgularında ( Örneğin MAX) bir çok satırdan hesaplama yapıp tek bir satır elde ederiz. Eğer GROUP BY ile kullanmışsak gruplanan kategori sayısı kadar satır elde ederiz. Window fonksiyonlar da benzer şekilde aggregation yapar ancak satır sayısını korur, sonucu her bir satır için yeni bir sütunda tutar. Window fonksiyonlar ile bir sürü iç içe sorgu yazarak elde edeceğimiz sonuca daha sade ve pratik bir şekilde ulaşabiliriz.

Window functions ne tarz sorulara cevap verir?

Her bir departman için kıdemi en büyük 2 kişiyi bulmak. Her bir market şubesinde en çok satılan 5 ürün kategorisini bulmak vb.

Spark dokümanları Window fonksiyonları üç kategoriye ayırmış:

1. Ranking Functions

Syntax: RANK | DENSE_RANK | PERCENT_RANK | NTILE | ROW_NUMBER

2. Analytic Functions

Syntax: CUME_DIST | LAG | LEAD | NTH_VALUE | FIRST_VALUE | LAST_VALUE

3. Aggregate Functions

Syntax: MAX | MIN | COUNT | SUM | AVG | ...

Temel söz dizimi

MAX(salary) over(partition by dept) as dept_max_salary

İlk kısım MAX(salary), standart aggregation fonksiyonlara çok benziyor. OVER eklemek, onu bir pencere işlevi olarak belirler. OVER içine bir partition belirtilmez ise tüm set bir pencere olarak kabul edilir. PARTITION pencere belirlemek içindir. Yukarıdaki örnekte departman kullanılmıştır. Yani veri içinde kaç tane benzersiz departman var ise o kadar pencere oluşacaktır.

Örnekler

Örnekleri yaptığımız ortam bilgileri şu şekilde:

  • İşletim sistemi: CentOS7
  • IDE: IntelliJ 2022
  • Spark: 3.2.1
  • Scala: 2.12.12
  • Java: Java 11

Spark Session ve Dataframe

val spark = SparkSession.builder()
  .master("local[4]")
  .appName("Spark Window Functions")
  .config("spark.driver.memory","2g")
  .getOrCreate()

import spark.implicits._

val df = Seq(
  (1001, "Satılmış", "İdari", 4000),
  (1002, "Özge", "Personel", 3000),
  (1003, "Hüsnü", "Bilgi Sistemleri", 4000),
  (1004, "Menşure", "Muhasebe", 6500),
  (1005, "Doruk", "Personel", 3000),
  (1006, "Şilan",  "Muhasebe", 5000),
  (1007, "Baran", "Personel", 7000),
  (1008, "Ülkü", "İdari", 4000),
  (1009, "Cüneyt", "Bilgi Sistemleri", 6500),
  (1010, "Gülşen", "Bilgi Sistemleri", 7000),
  (1011, "Melih", "Bilgi Sistemleri", 8000),
  (1012, "Gülbahar", "Bilgi Sistemleri", 10000),
  (1013, "Tuna", "İdari", 2000),
  (1014, "Raşel", "Personel", 3000),
  (1015, "Şahabettin", "Bilgi Sistemleri", 4500),
  (1016, "Elmas", "Muhasebe", 6500),
  (1017, "Ahmet Hamdi", "Personel", 3500),
  (1018, "Leyla", "Muhasebe", 5500),
  (1019, "Cuma", "Personel", 8000),
  (1020, "Yelda", "İdari", 5000),
  (1021, "Rojda", "Bilgi Sistemleri", 6000),
  (1022, "İbrahim", "Bilgi Sistemleri", 8000),
  (1023, "Davut", "Bilgi Sistemleri", 8000),
  (1024, "Arzu", "Bilgi Sistemleri", 11000)
).toDF("id", "name", "dept", "salary")

df.createOrReplaceTempView("employee")

1. Ranking Functions RANK | DENSE_RANK | PERCENT_RANK | NTILE | ROW_NUMBER

Spark Scala

df.withColumn("rn", F.row_number()
    .over(Window.partitionBy("dept").orderBy("salary")))
    .withColumn("rank", F.rank()
      .over(Window.partitionBy("dept").orderBy("salary")))
    .withColumn("dense_rank", F.dense_rank()
      .over(Window.partitionBy("dept").orderBy("salary")))
    .withColumn("percent_rank", F.percent_rank()
      .over(Window.partitionBy("dept").orderBy("salary")))
    .withColumn("ntile", F.ntile(3)
      .over(Window.partitionBy("dept").orderBy("salary")))
    .show()

Spark SQL

spark.sql(
    """
      |SELECT a.*,
      |ROW_NUMBER() OVER(PARTITION BY dept ORDER BY salary) as rn,
      |RANK() OVER(PARTITION BY dept ORDER BY salary) as rank,
      |DENSE_RANK() OVER(PARTITION BY dept ORDER BY salary) as dense_rank,
      |PERCENT_RANK() OVER(PARTITION BY dept ORDER BY salary) as percent_rank,
      |NTILE(3) OVER(PARTITION BY dept ORDER BY salary) as ntile
      |FROM employee a
      |""".stripMargin).show(false)
  • Çıktı
+----+-----------+----------------+------+---+----+----------+------------------+-----+
|  id|       name|            dept|salary| rn|rank|dense_rank|      percent_rank|ntile|
+----+-----------+----------------+------+---+----+----------+------------------+-----+
|1003|      Hüsnü|Bilgi Sistemleri|  4000|  1|   1|         1|               0.0|    1|
|1015| Şahabettin|Bilgi Sistemleri|  4500|  2|   2|         2|0.1111111111111111|    1|
|1021|      Rojda|Bilgi Sistemleri|  6000|  3|   3|         3|0.2222222222222222|    1|
|1009|     Cüneyt|Bilgi Sistemleri|  6500|  4|   4|         4|0.3333333333333333|    1|
|1010|     Gülşen|Bilgi Sistemleri|  7000|  5|   5|         5|0.4444444444444444|    2|
|1011|      Melih|Bilgi Sistemleri|  8000|  6|   6|         6|0.5555555555555556|    2|
|1022|    İbrahim|Bilgi Sistemleri|  8000|  7|   6|         6|0.5555555555555556|    2|
|1023|      Davut|Bilgi Sistemleri|  8000|  8|   6|         6|0.5555555555555556|    3|
|1012|   Gülbahar|Bilgi Sistemleri| 10000|  9|   9|         7|0.8888888888888888|    3|
|1024|       Arzu|Bilgi Sistemleri| 11000| 10|  10|         8|               1.0|    3|
|1006|      Şilan|        Muhasebe|  5000|  1|   1|         1|               0.0|    1|
|1018|      Leyla|        Muhasebe|  5500|  2|   2|         2|0.3333333333333333|    1|
|1004|    Menşure|        Muhasebe|  6500|  3|   3|         3|0.6666666666666666|    2|
|1016|      Elmas|        Muhasebe|  6500|  4|   3|         3|0.6666666666666666|    3|
|1002|       Özge|        Personel|  3000|  1|   1|         1|               0.0|    1|
|1005|      Doruk|        Personel|  3000|  2|   1|         1|               0.0|    1|
|1014|      Raşel|        Personel|  3000|  3|   1|         1|               0.0|    2|
|1017|Ahmet Hamdi|        Personel|  3500|  4|   4|         2|               0.6|    2|
|1007|      Baran|        Personel|  7000|  5|   5|         3|               0.8|    3|
|1019|       Cuma|        Personel|  8000|  6|   6|         4|               1.0|    3|
+----+-----------+----------------+------+---+----+----------+------------------+-----+
only showing top 20 rows

Row number her bir window içinde tüm satırları ardışık olarak sıralıyor. Rank ise aynı maaşa sahip çalışanlara aynı sıra numarasını veriyor ancak müteakip çalışan (bir düşük maaş) için row number’da kaldığı yerden devam ediyor. Dense rank, rank’ten farklı olarak aynı maaşta olanları takip edene row number değil ardışık rakamı veriyor. percent_rank dense_rank’in bir benzeri. Sadece yüzdelik dilimleri gösteriyor. Ntile ise içine tam sayı argüman alan bir fonksiyon. Kullanıcı tarafından belirlenen bu n tam sayıya göre pencereyi n adede bölüyor ve her bir gruba ardışık artacak şekilde rakam atıyor.

2. Analytic Functions CUME_DIST | LAG | LEAD | NTH_VALUE | FIRST_VALUE | LAST_VALUE

2.1. CUME_DIST

Spark Scala

df.withColumn("cume_dist", F.cume_dist()
    .over(Window.partitionBy("dept").orderBy(F.col("salary"))))
    .show(false)
  • Çıktı
+----+-----------+----------------+------+------------------+
|id  |name       |dept            |salary|cume_dist         |
+----+-----------+----------------+------+------------------+
|1003|Hüsnü      |Bilgi Sistemleri|4000  |0.1               |
|1015|Şahabettin |Bilgi Sistemleri|4500  |0.2               |
|1021|Rojda      |Bilgi Sistemleri|6000  |0.3               |
|1009|Cüneyt     |Bilgi Sistemleri|6500  |0.4               |
|1010|Gülşen     |Bilgi Sistemleri|7000  |0.5               |
|1011|Melih      |Bilgi Sistemleri|8000  |0.8               |
|1022|İbrahim    |Bilgi Sistemleri|8000  |0.8               |
|1023|Davut      |Bilgi Sistemleri|8000  |0.8               |
|1012|Gülbahar   |Bilgi Sistemleri|10000 |0.9               |
|1024|Arzu       |Bilgi Sistemleri|11000 |1.0               |
|1006|Şilan      |Muhasebe        |5000  |0.25              |
|1018|Leyla      |Muhasebe        |5500  |0.5               |
|1004|Menşure    |Muhasebe        |6500  |1.0               |
|1016|Elmas      |Muhasebe        |6500  |1.0               |
|1002|Özge       |Personel        |3000  |0.5               |
|1005|Doruk      |Personel        |3000  |0.5               |
|1014|Raşel      |Personel        |3000  |0.5               |
|1017|Ahmet Hamdi|Personel        |3500  |0.6666666666666666|
|1007|Baran      |Personel        |7000  |0.8333333333333334|
|1019|Cuma       |Personel        |8000  |1.0               |
+----+-----------+----------------+------+------------------+

2.2. LAG ve LEAD

Özellikle verileri anlamlı bir sırayla aldıysanız, satırları önceki veya sonraki satırlarla karşılaştırmak yararlı olabilir. Diğer satırlardan değer çeken sütunlar oluşturmak için LAG veya LEAD’i kullanabilirsiniz; tek yapmanız gereken hangi sütundan çekeceğinizi ve çekmeyi kaç satır uzaktakinden yapmak istediğinizi belirtmektir. LAG üstteki, LEAD alttaki satırlardan çeker.

Şimdi id sırasına göre (muhtemelen işe giriş sırası) bir önceki ve bir sonraki çalışandan ne kadar farklı maaşı var onu görelim.

Spark Scala

df.withColumn("lag", F.lag("salary", 1, 0)
   .over(Window.partitionBy("dept").orderBy(F.col("id"))))
   .withColumn("lead", F.lead("salary", 1, 0)
     .over(Window.partitionBy("dept").orderBy(F.col("id"))))
   .withColumn("lag_diff", $"salary" - $"lag")
   .withColumn("lead_diff", $"salary" - $"lead")
   .show(false)

Spark SQL

spark.sql(
    """
      |SELECT b.*,
      |(b.salary - b.lag) as lag_diff,
      |(b.salary - b.lead) as lead_diff
      | FROM (
      |SELECT a.*,
      |LAG(salary, 1, 0) OVER(PARTITION BY dept ORDER BY id) as lag,
      |LEAD(salary, 1, 0) OVER(PARTITION BY dept ORDER BY id) as lead
      |FROM employee a) b
      |""".stripMargin).show(false)
  • Çıktı
+----+-----------+----------------+------+-----+-----+--------+---------+
|id  |name       |dept            |salary|lag  |lead |lag_diff|lead_diff|
+----+-----------+----------------+------+-----+-----+--------+---------+
|1003|Hüsnü      |Bilgi Sistemleri|4000  |0    |6500 |4000    |-2500    |
|1009|Cüneyt     |Bilgi Sistemleri|6500  |4000 |7000 |2500    |-500     |
|1010|Gülşen     |Bilgi Sistemleri|7000  |6500 |8000 |500     |-1000    |
|1011|Melih      |Bilgi Sistemleri|8000  |7000 |10000|1000    |-2000    |
|1012|Gülbahar   |Bilgi Sistemleri|10000 |8000 |4500 |2000    |5500     |
|1015|Şahabettin |Bilgi Sistemleri|4500  |10000|6000 |-5500   |-1500    |
|1021|Rojda      |Bilgi Sistemleri|6000  |4500 |8000 |1500    |-2000    |
|1022|İbrahim    |Bilgi Sistemleri|8000  |6000 |8000 |2000    |0        |
|1023|Davut      |Bilgi Sistemleri|8000  |8000 |11000|0       |-3000    |
|1024|Arzu       |Bilgi Sistemleri|11000 |8000 |0    |3000    |11000    |
|1004|Menşure    |Muhasebe        |6500  |0    |5000 |6500    |1500     |
|1006|Şilan      |Muhasebe        |5000  |6500 |6500 |-1500   |-1500    |
|1016|Elmas      |Muhasebe        |6500  |5000 |5500 |1500    |1000     |
|1018|Leyla      |Muhasebe        |5500  |6500 |0    |-1000   |5500     |
|1002|Özge       |Personel        |3000  |0    |3000 |3000    |0        |
|1005|Doruk      |Personel        |3000  |3000 |7000 |0       |-4000    |
|1007|Baran      |Personel        |7000  |3000 |3000 |4000    |4000     |
|1014|Raşel      |Personel        |3000  |7000 |3500 |-4000   |-500     |
|1017|Ahmet Hamdi|Personel        |3500  |3000 |8000 |500     |-4500    |
|1019|Cuma       |Personel        |8000  |3500 |0    |4500    |8000     |
+----+-----------+----------------+------+-----+-----+--------+---------+

F.lag("salary", 1, 0) ve LAG(salary, 1, 0) ifadelerinde 1 kaç satır üst veya alta bakacağı, 0 ise varsayılan değeri gösterir. Eğer varsayılan değer kullanmazsak üst veya alt satır olmadığında null değer gelir.

3. Aggregate Functions MAX | MIN | COUNT | SUM | AVG | ...

Spark Scala

df.withColumn("dept_max_salary", F.max("salary")
                                            .over(Window.partitionBy("dept")))
    .withColumn("dept_min_salary", F.min("salary")
                                            .over(Window.partitionBy("dept")))
    .withColumn("dept_avg_salary", F.avg("salary")
                                            .over(Window.partitionBy("dept")))
    .show()

Spark SQL

spark.sql(
    """
      |SELECT a.*,
      |MAX(salary) over(partition by dept) as dept_max_salary,
      |MIN(salary) over(partition by dept) as dept_min_salary,
      |AVG(salary) over(partition by dept) as dept_avg_salary
      |FROM employee a
      |""".stripMargin).show(false)
  • Çıktı
+----+-----------+----------------+------+---------------+---------------+-----------------+
|  id|       name|            dept|salary|dept_max_salary|dept_min_salary|  dept_avg_salary|
+----+-----------+----------------+------+---------------+---------------+-----------------+
|1003|      Hüsnü|Bilgi Sistemleri|  4000|          11000|           4000|           7300.0|
|1009|     Cüneyt|Bilgi Sistemleri|  6500|          11000|           4000|           7300.0|
|1010|     Gülşen|Bilgi Sistemleri|  7000|          11000|           4000|           7300.0|
|1011|      Melih|Bilgi Sistemleri|  8000|          11000|           4000|           7300.0|
|1012|   Gülbahar|Bilgi Sistemleri| 10000|          11000|           4000|           7300.0|
|1015| Şahabettin|Bilgi Sistemleri|  4500|          11000|           4000|           7300.0|
|1021|      Rojda|Bilgi Sistemleri|  6000|          11000|           4000|           7300.0|
|1022|    İbrahim|Bilgi Sistemleri|  8000|          11000|           4000|           7300.0|
|1023|      Davut|Bilgi Sistemleri|  8000|          11000|           4000|           7300.0|
|1024|       Arzu|Bilgi Sistemleri| 11000|          11000|           4000|           7300.0|
|1004|    Menşure|        Muhasebe|  6500|           6500|           5000|           5875.0|
|1006|      Şilan|        Muhasebe|  5000|           6500|           5000|           5875.0|
|1016|      Elmas|        Muhasebe|  6500|           6500|           5000|           5875.0|
|1018|      Leyla|        Muhasebe|  5500|           6500|           5000|           5875.0|
|1002|       Özge|        Personel|  3000|           8000|           3000|4583.333333333333|
|1005|      Doruk|        Personel|  3000|           8000|           3000|4583.333333333333|
|1007|      Baran|        Personel|  7000|           8000|           3000|4583.333333333333|
|1014|      Raşel|        Personel|  3000|           8000|           3000|4583.333333333333|
|1017|Ahmet Hamdi|        Personel|  3500|           8000|           3000|4583.333333333333|
|1019|       Cuma|        Personel|  8000|           8000|           3000|4583.333333333333|
+----+-----------+----------------+------+---------------+---------------+-----------------+
only showing top 20 rows

3.1. Her bir departmandan en yüksek maaşa sahip 2 çalışan

Spark Scala

df.withColumn("rank", F.rank()
      .over(Window.partitionBy("dept").orderBy(F.col("salary").desc)))
      .where($"rank".lt(3))
      .show(false)

Spark SQL

spark.sql(
      """
        |SELECT * FROM(
        |SELECT a.*,
        |RANK() OVER(PARTITION BY dept ORDER BY salary DESC) as rank
        |FROM employee a) b
        |WHERE b.rank < 3
        |""".stripMargin).show(false)
  • Çıktı
+----+--------+----------------+------+----+
|id  |name    |dept            |salary|rank|
+----+--------+----------------+------+----+
|1024|Arzu    |Bilgi Sistemleri|11000 |1   |
|1012|Gülbahar|Bilgi Sistemleri|10000 |2   |
|1004|Menşure |Muhasebe        |6500  |1   |
|1016|Elmas   |Muhasebe        |6500  |1   |
|1019|Cuma    |Personel        |8000  |1   |
|1007|Baran   |Personel        |7000  |2   |
|1020|Yelda   |İdari           |5000  |1   |
|1001|Satılmış|İdari           |4000  |2   |
|1008|Ülkü    |İdari           |4000  |2   |
+----+--------+----------------+------+----+

İdari’den 3 kişi geldi. Çünkü 2’nci ve 3’üncünün maaşı aynı. Başka bir kritere bakmaksızın hangisinin gerçekten 2. olduğuna karar veremeyiz.

Başka bir yazıda daha görüşmek dileğiyle çav!

Kaynaklar

  1. Kapak görseli: Photo by Waldemar Brandt on Unsplash
  2. https://www.youtube.com/watch?v=Ww71knvhQ-s
  3. https://sparkbyexamples.com/spark/spark-sql-window-functions/
  4. https://spark.apache.org/docs/3.2.1/sql-ref-syntax-qry-select-window.html

Yazar Hakkında
Toplam 177 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara