Zookeeper-less Kafka Cluster Kurulumu

Bildiğimiz gibi Kafka yakın zamanda Zookeeper bağımlılığına tamamen son verdi (KIP-500 Kafka 2.8.0). Her ne kadar mevcut Kafka Cluster’lar Zookeeper kullanıyor olsa da yavaş yavaş bu kullanım azalacak ve gelecekte Zookeeper-less Kafka kullanımı yaygınlaşacak. Bu yazımızda Zookeeper olmadan Kafka cluster kurulumunu Docker compose yardımıyla yapacağız ve böylelikle 3-node Zookeeper-less Kafka’yı deneme şansı bulacağız.

Zookeeper ne iş yapıyordu?

Zookeeper koordinasyon ve konfigürasyon servisi. Dağıtık sistemler, node’lar arası sürekli bir koordinasyon ve güncel metadatanın anlık paylaşımı ihtiyacını duyar. RabbitMQ, Elasticsearch, Cassandra gibi dağıtık sistemler bunu Zookeeper olmaksızın çözmüşler. Ancak Kafka Zookeeper’ı tercih etmiş.

Zookeeper öncelikle Kafka Cluster’da node durumunu izlemek, topic ve mesajların listesini tutmak için kullanılır. Temel beş görevini şu şekilde sıralayabiliriz[1]:

  1. Controller Seçimi. Controller tüm parçalar (partitions) için leader/follower ilişkisini takip eden Broker’dır. Cluster içinde bir Broker’ın controller görevini üstlenmesini Zookeeper takip eder.
  2. Cluster üyeliği: Cluster’da çalışan Broker’ların güncel listesini tutar. Cluster’a kabul eder.
  3. Topic konfigürasyonu: Zookeeper, mevcut topic’lerin listesi, her topic için partition sayısı, replicaların konumu, tercih edilen leader node gibi işlevleri görür.
  4. Access Control List (Erişim Kontrol Listeleri – ACLs): Zookeeper ayrıca tüm topic’ler için ACL’leri korur. Bu, her topic’i kimin veya neyin okumasına/yazmasına izin verildiğini, consumer group listesini, grup üyelerini ve her bir consumer group’un her bir partition’dan aldığı en son offset’i içerir.
  5. Kotalar (Quotas): Zookeeper, her istemcinin ne kadar veri okumasına/yazmasına izin verildiğini bilir.

Zookeeper’ı niye kaldırdılar?

Bir Kafka cluster için en az 3 tane Zookeeper ihtiyacınız oluyordu. Bu her yönüyle ciddi bir maliyet. Güvenliği, kaynakları, diski, yedeklemesi, bakımı, güncellemesi, izlemesi vs vs. bir sürü ilave dert. Dolayısıyla hem bunları sağlamak, hem performans artışı sağlamak, her şeyden önemlisi tam olarak bağımsız ve kendi kendine yetebilen olmak için kaldırdılar.

Zookeeper’ın yaptığı işi şimdi kim yapıyor?

Zookeeper’ın yaptığı işi kendi Controller rolüne sahip node’ları ile  KRaft konsensus ptotokolünü kullanarak yapıyor. Detaylı bilgi için buraya bakabilirsiniz[2].

ZK’sız Kafka dünyasında, node’lar için iki farklı rol var: Controller ve Broker. Cluster içindeki her node, bir veya her iki role sahip olabilir. Tüm Controller node’ları cluster’ı koordine etmekten sorumlu olan aktif Controller’ı seçer ve diğer Controller node’ları etkin yedek kopyalar olarak hareket eder. Tüm Broker’lar daha önce Zookeeper’da olduğu gibi istemcilerden (consumer/producer) istek kabul eder ve işlerler.

Kafka 2.8.0 sonrası sürümlerde Zookeeper kullanılır mı?

Evet kullanılır. Şuan son sürüm 3.2.0 ve Zookeeper kullanılıyor. Canlı ortam (production) için Zookeeper-less Kafka henüz hazır değil. 2.8.0 Zookeeper olmadan kullanılabilecek ilk sürüm. Zookeeper kalktı anlamına gelmiyor.

Ortam Bilgileri

İşletim sistemi: CentOS7

Docker: Docker Engine – Community  Version: 20.10.15

Kafka: 3.2.0 – Scala: 2.12

Node sayısı: 3

Kafka Client ve Cluster UUID

Ana makinemizi (CentOS7) Kafka client olarak kullacağız. Bu sebeple buraya kafka binary dosyalarını indirip açacağız. Tüm kafka komutlarını /tmp/kafka/kafka_2.12-3.2.0 içinden çalıştırıyor olacağız.

mkdir /tmp/kafka
curl "https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz" -o /tmp/kafka/kafka.tgz
cd /tmp/kafka
tar -xzf kafka.tgz
cd kafka_2.12-3.2.0

Şimdi bir uuid elde edelim.

./bin/kafka-storage.sh random-uuid

Sonuç aşağıdakine benzer birşey olacaktır

EP6hyiddQNW5FPrAvR9kWw

Bu değeri start-kafka.sh içinde kullanacağız. Cluster id numarası olacak.

Kodlar

Bu yazıda bahsedilen kodların tamamına buradan ulaşabilirsiniz.

Proje dizini ağaç yapısı

tree -L 3
.
├── config
│   ├── kafka1
│   │   └── server.properties
│   ├── kafka2
│   │   └── server.properties
│   └── kafka3
│       └── server.properties
├── data
│   ├── kafka1
│   ├── kafka2
│   └── kafka3
├── docker-compose.yaml
├── Dockerfile
└── start-kafka.sh

Docker compose dosyası

version: "3.8"
networks:
  kafka-net:
    ipam:
      config:
        - subnet: 172.18.0.0/16
services:
  kafka1:
    container_name: kafka1
    image: erkansirin78/kafka:3.2.0
    build: 
      context: .
    ports:
        - "9092:9092"
    networks:
      kafka-net:
        ipv4_address: 172.18.0.11
    volumes:
    - ./config/kafka1/server.properties:/kafka/config/server.properties
    - ./data/kafka1/:/data/kafka/
  kafka2:
    container_name: kafka2
    image: erkansirin78/kafka:3.2.0
    build: 
      context: .
    ports:
        - "9292:9092"
    networks:
      kafka-net:
        ipv4_address: 172.18.0.12
    volumes:
    - ./config/kafka2/server.properties:/kafka/config/server.properties
    - ./data/kafka2/:/data/kafka/
  kafka3:
    container_name: kafka3
    image: erkansirin78/kafka:3.2.0
    build: 
      context: .
    ports:
        - "9392:9092"
    networks:
      kafka-net:
        ipv4_address: 172.18.0.13
    volumes:
    - ./config/kafka3/server.properties:/kafka/config/server.properties
    - ./data/kafka3/:/data/kafka/

server.properties içinde ip numarası kullanabilmek için network oluşturduk ve her bir konteyner için sabit ip numarası verdik. Properties dosyası içeriğine aşağıda değineceğiz.

Dockerfile

Kendi imajımızı kendimiz oluşturalım. Bunun için aşağıdaki Dockerfile dosyasını kullanacağız.

FROM openjdk:11.0.10-jre-buster

RUN apt-get update && \
    apt-get install -y curl

ENV KAFKA_VERSION 3.2.0
ENV SCALA_VERSION 2.12

RUN  mkdir /tmp/kafka && \
    curl "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \
    -o /tmp/kafka/kafka.tgz && \
    mkdir /kafka && cd /kafka && \
    tar -xvzf /tmp/kafka/kafka.tgz --strip 1

RUN mkdir -p /data/kafka

COPY start-kafka.sh  /usr/bin

RUN chmod +x  /usr/bin/start-kafka.sh

CMD ["start-kafka.sh"]

Satır-1: Baz imajı Java11 kullanıyoruz.

Satır-3: Baz imaj içindeki işletim sistemi paketlerini güncelliyoruz.

Satır-6,7: Burada kullanmak istediğimiz Kafka ve Scala sürümlerini belirtiyoruz.

Satır-9: Belirttiğimiz sürüme uygun olarak Kafka’yı indirip açıyoruz.

Satır-15: Kafka commit log için dizin yaratıyoruz. Kafka verisini burada saklayacak. Uygulama logu değil genelde karışır.

Satır-17: Kafka’yı başlatmak için oluşturduğumuz scripti içeri kopyalıyoruz.

Satır-19: Scripte çalışma (execute) yetkisi veriyoruz.

Satır-21: Kafka çalışsın.

Kafka çalıştırma scripti

start-kafka.sh

#!/bin/bash

/kafka/bin/kafka-storage.sh format --config /kafka/config/server.properties --cluster-id 'EP6hyiddQNW5FPrAvR9kWw' --ignore-formatted

/kafka/bin/kafka-server-start.sh /kafka/config/server.properties

Satır-3: Verinin saklanacağı dizinin formatlanması. –ignore-formatted seçeneği müteakip çalışmalarda daha önce formatlanmışsa hata almamasını, görmezden gelmesini sağlıyor.

Satır-5: Kafka’nın çalıştırılması.

Konfigürasyon dosyası

Her broker için farklı bir konfigürasyon dosyası var. Önemlilerini, varsayılan ayarlardan ve her bir broker için farklı olanlarını açıklayalım.

kafka1 server.properties

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9192,CONTROLLER://:9093,LISTENER_DOCKER_EXTERNAL://:9092
advertised.listeners=PLAINTEXT://kafka1:9092,LISTENER_DOCKER_EXTERNAL://172.18.0.11:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
log.dirs=/data/kafka 
num.partitions=3
delete.topic.enable=true

process.roles: Node görevi ne olsun hem broker hem controller. Zookeeper olmadan KRaft modunda çalıştıran ayar. Hem broker hem controller değerini 3 node için de kullandık çünkü elimizde sadece 3 node var. Daha büyük clusterlar için bazı nodelar sadece controller olabilir.

node.id=1: Her bir broker farklı id numarasına sahip olmalıdır.

controller.quorum.voters: Konsensus için oylamaya katılacak nodeid@sunucuadı:port formatında adresleri. Controller işleri 9093 portundan döndüğü için burada port 9093. Bu ayar eski zookeeper.connect ayarının işlevini görüyor.

listeners: LISTENER_DOCKER_EXTERNAL://:9092 kaydını ekledik. Aşağıda bunu kullanacağız çünkü.

advertised.listeners: Burada LISTENER_DOCKER_EXTERNAL://172.18.0.11:9092 kaydını biz ekledik. Bunun sebebi Docker çalışan makineden client erişimleri olursa hata almasın diye. Buradaki kafka1 sunucu adı sadece docker network içinde konteynırların birbirlerine erişiminde çalışır. Konteynır dışından bu isimle brokerlara erişemeyiz. Ayrıca PLAINTEXT için de node1’e ait ip adresi eklendi.

listener.security.protocol.map: LISTENER_DOCKER_EXTERNAL:PLAINTEXT kaydını ekledik. Çünkü listener içinde kullanmak istiyoruz.

log.dirs: Verinin saklanacağı dizin.

num.partitions: Topic oluşurken varsayılan partitions değeri.

delete.topic.enable: topic silmek istersek silelim diye.

kafka2 server.properties

process.roles=broker,controller
node.id=2
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9192,CONTROLLER://:9093,LISTENER_DOCKER_EXTERNAL://:9092
advertised.listeners=PLAINTEXT://kafka2:9192,LISTENER_DOCKER_EXTERNAL://172.18.0.12:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
log.dirs=/data/kafka
num.partitions=3
delete.topic.enable=true

kafka3 server.properties

process.roles=broker,controller
node.id=3
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9192,CONTROLLER://:9093,LISTENER_DOCKER_EXTERNAL://:9092
advertised.listeners=PLAINTEXT://kafka3:9192,LISTENER_DOCKER_EXTERNAL://172.18.0.13:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
log.dirs=/data/kafka
num.partitions=3
delete.topic.enable=true

Çalıştırma

Repodan indirdiğiniz dosyaları açıp içine girdikten sonra eğer data dizinlerini yukarıda yaratmışsanız tek yapmanız gereken docker-compose çalıştırmak olacaktır. Eğer data dizni yok ise yaratın. Çünkü içi boş dizinler Github repoda görünmeyecektir.

for i in {1..3}; do mkdir data/kafka${i}; done

Docker-compose up

docker-compose up --build -d

Daha sonra her bir node için log takibi yapın. Belli bir süre sonra loglar stabil hale gelecektir.

docker logs -f kafka1

Beklenen çıktı:

...
...
...
        zookeeper.ssl.ocsp.enable = false
        zookeeper.ssl.protocol = TLSv1.2
        zookeeper.ssl.truststore.location = null
        zookeeper.ssl.truststore.password = null
        zookeeper.ssl.truststore.type = null
 (kafka.server.KafkaConfig)
[2022-07-21 02:35:17,448] INFO [SocketServer listenerType=BROKER, nodeId=1] Starting socket server acceptors and processors (kafka.network.SocketServer)
[2022-07-21 02:35:17,460] INFO [SocketServer listenerType=BROKER, nodeId=1] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2022-07-21 02:35:17,464] INFO [SocketServer listenerType=BROKER, nodeId=1] Started data-plane acceptor and processor(s) for endpoint : ListenerName(LISTENER_DOCKER_EXTERNAL) (kafka.network.SocketServer)
[2022-07-21 02:35:17,465] INFO [SocketServer listenerType=BROKER, nodeId=1] Started socket server acceptors and processors (kafka.network.SocketServer)
[2022-07-21 02:35:17,466] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2022-07-21 02:35:17,469] INFO Kafka version: 3.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-07-21 02:35:17,469] INFO Kafka commitId: 38103ffaa962ef50 (org.apache.kafka.common.utils.AppInfoParser)
[2022-07-21 02:35:17,469] INFO Kafka startTimeMs: 1658370917466 (org.apache.kafka.common.utils.AppInfoParser)
[2022-07-21 02:35:17,476] INFO Kafka Server started (kafka.server.KafkaRaftServer)
[2022-07-21 02:35:17,479] INFO [BrokerLifecycleManager id=1] The broker is in RECOVERY. (kafka.server.BrokerLifecycleManager)
[2022-07-21 02:35:17,643] INFO [Controller 1] Unfenced broker: UnfenceBrokerRecord(id=3, epoch=0) (org.apache.kafka.controller.ClusterControlManager)
[2022-07-21 02:35:19,531] INFO [BrokerLifecycleManager id=1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)
[2022-07-21 02:35:20,043] INFO [Controller 1] Unfenced broker: UnfenceBrokerRecord(id=2, epoch=2) (org.apache.kafka.controller.ClusterControlManager)
[2022-07-21 02:35:20,044] INFO [Controller 1] Unfenced broker: UnfenceBrokerRecord(id=1, epoch=3) (org.apache.kafka.controller.ClusterControlManager)

Konteynırları listeleyelim.

docker-compose ps


NAME COMMAND SERVICE STATUS PORTS
kafka1 "start-kafka.sh" kafka1 running 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp
kafka2 "start-kafka.sh" kafka2 running 0.0.0.0:9292->9092/tcp, :::9292->9092/tcp
kafka3 "start-kafka.sh" kafka3 running 0.0.0.0:9392->9092/tcp, :::9392->9092/tcp

Kafka Client Örnekleri

Client dizini içine geçelim: cd /tmp/kafka/kafka_2.12-3.2.0/

Node (Broker&Controller) Listeleme

 ./bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | awk '/id/{print $1}'


172.18.0.12:9092
172.18.0.13:9092
172.18.0.11:9092

Topic Yaratma

 ./bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic test1 \
--replication-factor 3 --partitions 5

Çıktı: Created topic test1.

Topic Listeleme

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Çıktı: test1

Console-producer ile mesaj gönderme

İki farklı terminal açarak birinde consumer, diğerinde producer çalıştırarak test1’e mesajlar gönderiniz.

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
>test producer
>geliyor mu?

Console-consumer ile mesaj alma

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1
test producer
geliyor mu?

Sıfırlama

Şayet geliştirme esnasında sıfırlama ihtiyacı olursa veri tutan dizin içindekileri silmek yeterli olur.

sudo rm -rf data/kafka1/*
sudo rm -rf data/kafka2/*
sudo rm -rf data/kafka3/*

Zookeeper-less Kafka’yı denemek için docker kullanarak bir Kafka cluster kurmuş olduk. Başka bir yazıda görüşene kadar hoşça kalın…

Kaynaklar

  1. https://dattell.com/data-architecture-blog/what-is-zookeeper-how-does-it-support-kafka
  2. https://www.confluent.io/blog/kafka-without-zookeeper-a-sneak-peek/
  3. https://kafka.apache.org/documentation/
  4. https://www.youtube.com/watch?v=ncTosfaZ5cQ

Yazar Hakkında
Toplam 167 yazı
Erkan ŞİRİN
Erkan ŞİRİN
2014'ten beri hem akademik alanda hem de sektörde pratik anlamda büyük veri ve veri bilimi ile ilgili çalışmalar yürütmektedir. Büyük veri ve veri bilimi ile ilgili birçok kurum ve şirkete danışmanlık ve eğitimler vermekte, projeler icra etmektedir. Çalışma alanları: büyük veri platformlarının kurulum ve yönetimi, büyük veri üzerinde makine öğrenmesi, olağan dışılık ve sahtecilik tespiti, akan veri işleme ve veri hazırlama sürecidir.
Yorumlar (Yorum yapılmamış)

Bir cevap yazın

E-posta hesabınız yayımlanmayacak.

×

Bir Şeyler Ara