PySpark Dataframe İşlemleri

Bölüm 3

Pyspark ile Spark Dataframe işlemleri yazı dizimize devam ediyoruz.

from pyspark.sql import Row
liste = [
 (1,'Cemal',35,'Isci','Ankara'),
 (2,'Ceyda',42,'Memur','Kayseri'),
 (3,'Timur',30,'Issiz','Istanbul'),
 (4,'Burcu',29,'Pazarlamaci','Ankara'),
 (5,'Yasemin',23,'Pazarlamaci','Bursa'),
 (6,'Ali',33,None,None),
 (7,'Burcu',29,'Pazarlamaci','Ankara'),
 (8,None,31,None,None),
 (9,'Ahmet',33,'Doktor','Ankara')
]
rdd = sc.parallelize(liste)
insanlar = rdd.map(lambda x: Row(record_id=int(x[0]),isim=x[1], yas=int(x[2]), meslek=x[3], ikamet=x[4]))
insanlarDF = sqlContext.createDataFrame(insanlar)

insanlarDF = insanlarDF.selectExpr("record_id as RecordId","isim as Adi","yas as Yas","meslek as Meslek","ikamet as Sehir")

PySpark Dataframe Örneklem Seçmek (Sampling)

Dataframe’den örneklem alabiliriz. Kullanılacak metod sample(). Bu metod üç parametre alır: 1. withReplacement (True ya da False), 2. fraction örneğin .3 demek %30’unu al demek, 3. seed değeri rastgele seçim değeri.

insanlarDF.sample(False, 0.2, 3).show()
+--------+-----+---+------+--------+
|RecordId|  Adi|Yas|Meslek|   Sehir|
+--------+-----+---+------+--------+
|       1|Cemal| 35|  Isci|  Ankara|
|       3|Timur| 30| Issiz|Istanbul|
|       9|Ahmet| 33|Doktor|  Ankara|
+--------+-----+---+------+--------+

PySpark Dataframe Map İşlemi

Yukarıda ve daha önce gördüğümüz birçok işlem sonuç olarak yine dataframe döndürürken map() metodu sonuç olarak rdd döndürmektedir.

yasRDD = insanlarDF.map(lambda x: Row(Yas= (x[2] + 10)))
yasRDD.take(9)
[Row(Yas=45),
 Row(Yas=52),
 Row(Yas=40),
 Row(Yas=39),
 Row(Yas=33),
 Row(Yas=43),
 Row(Yas=39),
 Row(Yas=41),
 Row(Yas=43)]

Yukarıda Row(Yas = x[2]…) komutunda 2 sütun sıralamasında Yas sütununun index değerini verir. RecordId 0, Adi 1 ve Yas 2. Özetle yukarıda yaptığımız iş şu: Yaş sütunundaki her bir değere 10 ekledik ve  yasRDD adında yeni bir RDD’ye atadık ve bu rdd’yi yazdırdık.

PySpark Dataframe Sıralama (Sorting, OrderBy)

Şimdi dataframe’i büyükten küçüğe doğru yaş sıralamasına koyalım:

insanlarDF.orderBy(insanlarDF.Yas.desc()).show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       2|  Ceyda| 42|      Memur| Kayseri|
|       1|  Cemal| 35|       Isci|  Ankara|
|       6|    Ali| 33|       null|    null|
|       9|  Ahmet| 33|     Doktor|  Ankara|
|       8|   null| 31|       null|    null|
|       3|  Timur| 30|      Issiz|Istanbul|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
+--------+-------+---+-----------+--------+

İsterseniz bir de isimlere göre sıralayalım:

insanlarDF.orderBy(insanlarDF.Adi).show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       8|   null| 31|       null|    null|
|       9|  Ahmet| 33|     Doktor|  Ankara|
|       6|    Ali| 33|       null|    null|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       1|  Cemal| 35|       Isci|  Ankara|
|       2|  Ceyda| 42|      Memur| Kayseri|
|       3|  Timur| 30|      Issiz|Istanbul|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
+--------+-------+---+-----------+--------+

Yukarıda parantez içindeki insanlarDF.Adi sonuna küçükten büyüğe sıralama yapan asc() fonksiyonu eklesek de olurdu. Eklemedik, demek ki varsayılan sıralama küçükten büyüğe.

PySpark Dataframe Yeni Sütun Eklemek

Bu işlemi yapmak için withColumn() metodunu kullanıyoruz. Yeni bir sütun ekleyebileceğimiz gibi mevcut olanın yerine de koyabiliriz. Şimdi yaşımızı 2 ile çarpalım ve Yas_carpi_iki isminde yeni sütun olarak ekleyelim:

insanlarDF.withColumn('Yas_carpi_iki', insanlarDF.Yas * 2).show()
+--------+-------+---+-----------+--------+-------------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|Yas_carpi_iki|
+--------+-------+---+-----------+--------+-------------+
|       1|  Cemal| 35|       Isci|  Ankara|           70|
|       2|  Ceyda| 42|      Memur| Kayseri|           84|
|       3|  Timur| 30|      Issiz|Istanbul|           60|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|           58|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|           46|
|       6|    Ali| 33|       null|    null|           66|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|           58|
|       8|   null| 31|       null|    null|           62|
|       9|  Ahmet| 33|     Doktor|  Ankara|           66|
+--------+-------+---+-----------+--------+-------------+

Yazar Hakkında
Toplam 146 yazı
Erkan ŞİRİN
Erkan ŞİRİN
2014'ten beri hem akademik alanda hem de sektörde pratik anlamda büyük veri ve veri bilimi ile ilgili çalışmalar yürütmektedir. Büyük veri ve veri bilimi ile ilgili birçok kurum ve şirkete danışmanlık ve eğitimler vermekte, projeler icra etmektedir. Çalışma alanları: büyük veri platformlarının kurulum ve yönetimi, büyük veri üzerinde makine öğrenmesi, olağan dışılık ve sahtecilik tespiti, akan veri işleme ve veri hazırlama sürecidir.
Yorumlar (2 yorum)
Tugba
Tugba Cevapla
- 14:03

Güzel bir Türkçe kaynak olmuş, elinize sağlık

    Erkan ŞİRİN
    Erkan ŞİRİN Cevapla
    - 19:16

    Çok teşekkürler Tuğba Hanım. Tekrar bekleriz 🙂

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara