Anasayfa / Büyük Veri / Hadoop Cluster Kurulumu / Apache Spark2 CSV Okuma, Şema Oluşturma, Dataframe Üzerinde SQL Sorguları (Scala)

Apache Spark2 CSV Okuma, Şema Oluşturma, Dataframe Üzerinde SQL Sorguları (Scala)

Merhaba, bu yazımızda Spark Dataframe oluştururken ve dataframe üzerinde veri keşfi yaparken kullandığım bazı işlemleri paylaşacağım. Basit bir yazı olacak ancak bunu önemsiyorum çünkü birçok insan veri yükleme esnasında sorun yaşıyor ve bir sürü zaman kaybediyor. Daha önce de buna benzer yazı yazmıştım ancak o zaman Spark1 kullanmıştım. Artık Spark2 var. Spark 2.0 sürümüyle birlikte dataframe API yönünde kanat kırdı. Sizlere tavsiyem Spark RDD API ile hiç uğraşmayın. Yüzünüzü Spark2 Dataframe API’ye çevirin. Aslında bu Spark için de güzel birşey, gelecek veri tiplerinden emin olarak iş yapıyor, içi rahat oluyor yani 🙂

Bu yazıda kullanacağım araçlar şunlardır:

  • Spark Sürümü: Spark 2.1.1
  • Geliştirme Ortamı: Apache Zeppelin Notebook
  • Kaynak Yönetimi: Apache YARN (Yani Spark’ı YARN modunda çalıştırıyorum)
  • Veri Depolama: Hadoop HDFS
  • Programlama Dili: Scala

Zeppelin kullandığım için ayrıca SparkContext() yaratmayacağım, çünkü Zeppelin bu işi bizim içi yapıyor. Ayrıca interpreter ayarlarımda Spark2 varsayılan interpereter olduğundan her paragrafta %Spark2 diye belirtmeme gerek yok. Kodların olduğu kutucuklarda açık yeşil ile seçilmiş satırlar sorgu sonuçlarını gösterir, kod değildir. Veri seti SanFransico itfaiye teşkikatının olaylara müdahale ile ilegili bilgilerin bulunduğu bir veridir. Kullandığım veri setini buradan indirebilir ve detaylı bilgiler edinebilirsiniz. Bu Youtube videosunda Pyspark ile yapılmış benzer işlemleri görebilirsiniz. Umarım biz de de bu tür veriler tutulmaya ve paylaşılmaya başlanır. Hadoop clusterımda node3 adında bir EdgeNode’um var. Veri setini buradaki /home/erkan/veri_setlerim/ dizinine indirdim. Buradan örnek çalışmalar için kullandığım  HDFS dizni olan /user/erkan/veri_setlerim/‘e aşağıdaki kodlarla kopyaladım.

[erkan@node3 ~]$ hdfs dfs -put /home/erkan/veri_setlerim/Fire_Department_Calls_for_Service.csv /user/erkan/veri_setlerim/

Evet artık HDFS’de veri setimiz hazır.

Spark Versiyonunu Öğrenme

Spark versiyonunu basit bir kod ile öğrenebiliriz.

spark.version
res7: String = 2.1.1.2.6.2.0-205
Spark ile CSV Uzantılı Dosyaları Okumak

HDFS’de bulunan dosyamızı okuyalım ve bir dataframe oluşturalım

val itfaiyeDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/user/erkan/veri_setlerim/Fire_Department_Calls_for_Service.csv")
df: org.apache.spark.sql.DataFrame = [Call Number: int, Unit ID: string ... 32 more fields]
Took 23 sec. Last updated by admin at October 29 2017, 7:56:51 AM

Spark’da csv uzantılı dosyaları okurken bir çok seçenek var biz bunlardan sadece iki tanesini kullandık; header ve inferSchema. inferSchema veriden örneklem alarak sütunların veri türlerini belirlemeye çalışır. Böylelikle şema oluşturmak için uğraşmayız. Ancak bunun bir yan etkisi de zaman alması ve sütun isimlerinde  boşluk bırakmak gibi bazı yan etkilerinin olmasıdır. Şimdi Yüklediğimiz veri setini printSchema() ile görelim.

itfaiyeDF.printSchema()
root
 |-- Call Number: integer (nullable = true)
 |-- Unit ID: string (nullable = true)
 |-- Incident Number: integer (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Call Date: string (nullable = true)
 |-- Watch Date: string (nullable = true)
 |-- Received DtTm: string (nullable = true)
 |-- Entry DtTm: string (nullable = true)
 |-- Dispatch DtTm: string (nullable = true)
 |-- Response DtTm: string (nullable = true)
 |-- On Scene DtTm: string (nullable = true)
 |-- Transport DtTm: string (nullable = true)
 |-- Hospital DtTm: string (nullable = true)
 |-- Call Final Disposition: string (nullable = true)
 |-- Available DtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode of Incident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original Priority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- Final Priority: integer (nullable = true)
 |-- ALS Unit: boolean (nullable = true)
 |-- Call Type Group: string (nullable = true)
 |-- Number of Alarms: integer (nullable = true)
 |-- Unit Type: string (nullable = true)
 |-- Unit sequence in call dispatch: integer (nullable = true)
 |-- Fire Prevention District: string (nullable = true)
 |-- Supervisor District: string (nullable = true)
 |-- Neighborhooods - Analysis Boundaries: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- RowID: string (nullable = true)

Okuma süresini azaltmak ve sütun isimlerini değiştirerek okumak istiyorsak kendimiz bir şema hazırlayarak okuma esnasında schema() kullanmalıyız.

Spark Dataframe için Şema Hazırlamak

Şema hazırlamak için gerekli kütüphaneleri indirelim ve yukarıda yazdırdığımız şemanın aynısını kendimiz hazırlayalım. Ancak sütun isimlerini birleştirelim.

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, BooleanType, DateType, TimestampType}
val itfaiyeDFSchema = StructType(Array(
 StructField("CallNumber", IntegerType, true),
 StructField("UnitID", StringType, true),
 StructField("IncidentNumber", IntegerType, true),
 StructField("CallType", StringType, true),
 StructField("CallDate", StringType, true),
 StructField("WatchDate", StringType, true),
 StructField("ReceivedDtTm", StringType, true),
 StructField("EntryDtTm", StringType, true),
 StructField("DispatchDtTm", StringType, true),
 StructField("ResponseDtTm", StringType, true),
 StructField("OnSceneDtTm", StringType, true),
 StructField("TransportDtTm", StringType, true),
 StructField("HospitalDtTm", StringType, true),
 StructField("CallFinalDisposition", StringType, true),
 StructField("AvailableDtTm", StringType, true),
 StructField("Address", StringType, true),
 StructField("City", StringType, true),
 StructField("ZipcodeOfIncident", IntegerType, true),
 StructField("Battalion", StringType, true),
 StructField("StationArea", StringType, true),
 StructField("Box", StringType, true),
 StructField("OriginalPriority", StringType, true),
 StructField("Priority", StringType, true),
 StructField("FinalPriority", IntegerType, true),
 StructField("ALSUnit", StringType, true),
 StructField("CallTypeGroup", StringType, true),
 StructField("NumberOfAlarms", StringType, true),
 StructField("UnitType", StringType, true),
 StructField("UnitSequenceInCallDispatch", IntegerType, true),
 StructField("FirePreventionDistrict", StringType, true),
 StructField("SupervisorDistrict", StringType, true),
 StructField("NeighborhooodsAnalysisBoundaries", StringType, true),
 StructField("Location", StringType, true),
 StructField("RowID", StringType, true)
 ))
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, BooleanType, DateType, TimestampType}itfaiyeDFSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,IntegerType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(ReceivedDtTm,StringType,true), StructField(EntryDtTm,StringType,true), StructField(DispatchDtTm,StringType,true), StructField(ResponseDtTm,StringType,true), StructField(OnSceneDtTm,StringType,true), StructField(TransportDtTm,StringType,true), StructField(HospitalDtTm,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(Available DtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipco...

şimdi verimizi tekrar okuyalım ancak inferSchema() yerine kendi şemamızı kullanalım. Bu sefer ikinci option() yerine schema() fonksiyonunu kullanacağız.

val itfaiyeDF = spark.read.format("csv").option("header","true").schema(itfaiyeDFSchema).load("/user/erkan/veri_setlerim/Fire_Department_Calls_for_Service.csv")
itfaiyeDF.printSchema()
itfaiyeDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 32 more fields]
root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- ReceivedDtTm: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- DispatchDtTm: string (nullable = true)
 |-- ResponseDtTm: string (nullable = true)
 |-- OnSceneDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeOfIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumberOfAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- NeighborhooodsAnalysisBoundaries: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- RowID: string (nullable = true)
 
Took 0 sec. Last updated by admin at October 29 2017, 11:28:31 AM.
Spark Dataframe’i SQL Tablosu Olarak Kaydetme ve Sorgu Çalıştırma

SparkSQL doğrudan SQL sorgularını çalıştırmamıza olanak veriyor. Üsetlik bunu kullanmak çok kolay. Dataframe’i geçici bir SQL tablosu olarak kaydediyoruz arkasındanda bu tablo sanki gerçek bir SQL tablosuymuş gibi SQL sogularımızı çalıştıryoruz. Önce tablomuzu kaydedelim. Bunun için createOrReplaceTempView("KendimizBelirliyoruz") kullanıyoruz. Ben tabloya itfaiyeTable dedim siz başka bir isim verebilirsiniz.

itfaiyeDF.createOrReplaceTempView("itfaiyeTable")

Şimdi de basit bir SQL sorgusu çalıştıralım. Ben Zeppelin kullandığım için paragrafa %sql ile başlamak zorundayım.

%sql
SELECT * FROM itfaiyeTable LIMIT 10

Sorgu sonucu:

Sütun sayısı çok olduğu için diğer veriyi görmek için sağa kaydırmak gerekir.
Hoşçakalın…

Hakkında Erkan ŞİRİN

2014'ten beri hem akademik alanda hem de sektörde pratik anlamda büyük veri ve veri bilimi ile ilgili çalışmalar yürütmektedir. Halihazırda İmpektra Bilişim A.Ş.'de büyük veri yöneticisi olarak çalışmakta olup aynı zamanda Gazi Üniversitesi Yönetim Bilişim Sistemleri doktora öğrencisidir. Büyük veri ve veri bilimi ile ilgili birçok kurum ve şirkete eğitimler vermekte ve projeler icra etmektedir. Çalışma alanları: büyük veri platformlarının kurulum ve yönetimi, büyük veri üzerinde makine öğrenmesi, olağan dışılık tespiti, sahtecilik tespiti, veri hazırlama sürecidir.

GÖZ ATMAK İSTEYEBİLİRSİNİZ

IntelliJ IDEA ile Apache Spark Projesini Uzak YARN Cluster Üzerinde Çalıştırmak-2/2

Merhaba. Yazı serimizin ikincisine devam ediyoruz. Bir önceki ilk yazımızda konuya giriş yapmıştık. Hatırlayalım, amacımız …

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir