Anasayfa / Büyük Veri / IntelliJ IDEA ile Apache Spark Projesini Uzak YARN Cluster Üzerinde Çalıştırmak-2/2

IntelliJ IDEA ile Apache Spark Projesini Uzak YARN Cluster Üzerinde Çalıştırmak-2/2

Merhaba. Yazı serimizin ikincisine devam ediyoruz. Bir önceki ilk yazımızda konuya giriş yapmıştık. Hatırlayalım, amacımız Spark uygulamasını Windows bilgisayarımızda kurulu IntelliJ ile uzak hadoop cluster üzerinde geliştirmekti. IntelliJ ile devam ediyoruz.

4. IntelliJ Spark Uygulaması: Maven Projesi Oluşturmak

Şimdi uygulamamızı yazmaya başlayacağız. Bunun için IntelliJ’i başlatalım.

Intellij üzerinden yeni proje (Create New Project) diyoruz. Bir sonraki ekranda Maven seçiyoruz.

Bir sonraki pencerede GroupId ve ArtifactId belirliyoruz.

Projenin adını ve nereye kaydedileceğini seçiyoruz. Finish diyoruz. Eğer yeni bir klasör yaratılması gerekiyorsa yaratayım mı diye soracaktır. Ok deyip devam edelim.

sparkremotecluster‘a sağ tıklayıp Add Framework Support‘tan resimde görüldüğü gibi Scala dilini seçiyoruz. Daha sonra main ve test kısmında java olan yerleri (Shift+F6) ile scala yapıyoruz. En son hem main hem test altında com.veribilimiokulu paketini ekliyoruz (scala üzerine sağ tıkla -> New -> Package).

pom.xml dosyasına aşağıdaki dependencies ekleniyor.

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

    </dependencies>

Yukarıda Spark versiyonunu 2.4.0 yapmamız dikkatinizi çekmiş olmalı zira Cloudera 6.2.0 paketindeki Spark sürümü 2.4.0. Aksi halde java.io.InvalidClassException: org.apache.spark.sql.execution.datasources.FilePartition; local class incompatible: stream classdesc serialVersionUID hatasını alabiliriz.

com.veribilimiokulu paketi altında bir object oluşturuyoruz.

5. SparkConf Ayarları

Spark uygulamamıza ait object oluştu. Şimdi basit bir uygulama yazabiliriz. Ancak SparkSession oluştururken tekrar bazı konfigürasyon değerlerini SparkConf ile girmemiz gerekir. Bu ayarların ne olduğu ve değerlerin nereden öğrenileceği yorum satırlarında belirtilmiştir.

val sparkConf = new SparkConf()
    .setMaster("yarn")
    .setAppName("SparkDeneme")

    // HDFS konfigürasyonundan "fs.defaultFS" ile aranır. Sunucu adresi HDFS -> Instances -> NameNode'dan öğrenilir.
    .set("spark.hadoop.fs.defaultFS","hdfs://cdh1.impektra.com:8020")

    // YARN konfigürasyonundan "resourcemanager.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir.
    .set("spark.hadoop.yarn.resourcemanager.address","cdh2.impektra.com:8032")

    // YARN konfigürasyonundan "scheduler.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir.
    .set("spark.hadoop.yarn.resourcemanager.scheduler.address","cdh2.impektra.com:8030")

    // HDFS -> Instances -> NameNode'dan öğrenilir.
    .set("spark.yarn.jars","hdfs://cdh1.impektra.com:8020/tmp/spark_jars/*.jar")

Şayet uygulama çalışırken dirver ile bağlantı hatası verir ise hadoop cluster’ın bizim makinemize dönerken kullandığı port numarası Windows güvenlik duvarından kapalı olabilir. Bunu açmalıyız. Bunu burada detaylı yazmıyorum çünkü port açma ile ilgili bir çok kaynak mevcut. Böyle bir hata alırsanız açarsınız.

6. Kodları Çalıştırma

Tüm kodlarımız ise aşağıdadır:

package com.veribilimiokulu

import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.{functions => F}
import org.apache.log4j.{Logger, Level}
import org.apache.spark.sql.{functions => F}
import org.apache.spark.SparkConf

object SparkDeneme extends App {

  Logger.getLogger("org").setLevel(Level.INFO) // ilk etapta INFI yaparak alınacak hataların sebepleri öğrenilir.
  // Bütün hatalar ayıklandıktan sonra ERROR'a getirilir

  val sparkConf = new SparkConf()
    .setMaster("yarn")
    .setAppName("SparkDeneme")

    // HDFS konfigürasyonundan "fs.defaultFS" ile aranır. Sunucu adresi HDFS -> Instances -> NameNode'dan öğrenilir.
    .set("spark.hadoop.fs.defaultFS","hdfs://cdh1.impektra.com:8020")

    // YARN konfigürasyonundan "resourcemanager.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir.
    .set("spark.hadoop.yarn.resourcemanager.address","cdh2.impektra.com:8032")

    // YARN konfigürasyonundan "scheduler.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir.
    .set("spark.hadoop.yarn.resourcemanager.scheduler.address","cdh2.impektra.com:8030")

    // HDFS -> Instances -> NameNode'dan öğrenilir.
    .set("spark.yarn.jars","hdfs://cdh1.impektra.com:8020/tmp/spark_jars/*.jar")


  val spark = SparkSession.builder()
    .config(sparkConf)
    .getOrCreate()

  import spark.implicits._


  val df = spark.read.format("csv")
    .option("header",true)
    .option("inferSchema", true)
    .option("sep",",")
    .load("/user/admin/data/iris.csv")

  df.show()


}

Sonuç:

+-------------+------------+-------------+------------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+-------------+------------+-------------+------------+-----------+
|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
|          5.4|         3.7|          1.5|         0.2|Iris-setosa|
|          4.8|         3.4|          1.6|         0.2|Iris-setosa|
|          4.8|         3.0|          1.4|         0.1|Iris-setosa|
|          4.3|         3.0|          1.1|         0.1|Iris-setosa|
|          5.8|         4.0|          1.2|         0.2|Iris-setosa|
|          5.7|         4.4|          1.5|         0.4|Iris-setosa|
|          5.4|         3.9|          1.3|         0.4|Iris-setosa|
|          5.1|         3.5|          1.4|         0.3|Iris-setosa|
|          5.7|         3.8|          1.7|         0.3|Iris-setosa|
|          5.1|         3.8|          1.5|         0.3|Iris-setosa|
+-------------+------------+-------------+------------+-----------+
only showing top 20 rows

Elbette yukarıdaki sonuç bir çok INFO logunun arasındadır. Şayet hata ayıklama bitmiş ise log seviyesini ERROR’a çekebilirsiniz.

Kodlar çalışırken ResourceManager üzerinden takip edebilirsiniz. Örnek ekran görüntüsü aşağıdadır. (Sizin adresiniz farklı olacaktır)

http://cdh2.impektra.com:8088/cluster/apps/RUNNING

Evet böylelikle Windows makinemizden hiç rahatımızı bozmadan uzak hadoop cluster üzerinde kod geliştirme yapabildik ve windows makinemizi Spark driver olarak kullandık.

Büyük veriyle kalın…

Hakkında Erkan ŞİRİN

2014'ten beri hem akademik alanda hem de sektörde pratik anlamda büyük veri ve veri bilimi ile ilgili çalışmalar yürütmektedir. Halihazırda İmpektra Bilişim A.Ş.'de büyük veri yöneticisi olarak çalışmakta olup aynı zamanda Gazi Üniversitesi Yönetim Bilişim Sistemleri doktora öğrencisidir. Büyük veri ve veri bilimi ile ilgili birçok kurum ve şirkete eğitimler vermekte ve projeler icra etmektedir. Çalışma alanları: büyük veri platformlarının kurulum ve yönetimi, büyük veri üzerinde makine öğrenmesi, olağan dışılık tespiti, sahtecilik tespiti, veri hazırlama sürecidir.

GÖZ ATMAK İSTEYEBİLİRSİNİZ

Python ile Veri Bilimi Çalışma Ortamı Kurmak (Jupyter ve Paket Kurulumu)

Merhaba bu yazımızda veri bilimine yeni başlayanlar için başlangıç çalışma ortamını oluşturacağız. Amacımız jupyter notebook …

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir