Veri Bilimi Okulu

Debezium ile MySQL veri tabanı değişikliklerinin yakalanması
Debezium ile MySQL veri tabanı değişikliklerinin yakalanması
debezium-chris-lawton-5IHz5WhosQE-unsplash

Loading

Herkese merhaba!

Bugünkü yazımızda yine keyifli bir konuya değineceğiz.  Diyelim ki veri tabanımızdaki değişiklikleri yakalamak ve başka bir sunucuya/buluta kesintisiz şekilde taşımak istiyoruz veya analiz/raporlama yapmak için gerçek zamanlı veri akışı sağlamak istiyoruz.  Peki bunu nasıl gerçekleştirebiliriz?  Gelin cevaplamak için hem veri değişikliklerinin yakalanması (Change Data Capture) işlemini hem de bunun için özelleşmiş bir araç olan Debezium‘u tanıyalım ve örnek bir uygulama ile MySQL veri tabanında bu işlemi gözlemleyelim😊

Veri değişikliklerinin yakalanması (Change Data Capture) nedir?

CDC, organizasyonların veri tabanı değişikliklerini otomatik olarak tanımlamasını ve yakalamasını sağlayan bir işlemdir.  Bir veri tabanı olayı meydana gelir gelmez verileri sürekli olarak yakalayıp işleyerek gerçek zamanlı veri hareketleri sağlar.  CDC işlemini gerçekleştirmenin farklı yöntemleri mevcuttur, ancak temelde aşağıdaki iki kategoriye ayırabiliriz:

  • Log-based (Veri tabanı işlem loglarının takibi ile)
  • Query-based (Trigger, Timestamp, Snapshot ile)

Yaygın ve etkili kullanım veri tabanı işlem loglarının kullanılmasıdır.  Diğer yöntemler veri tabanına ek bir yük getirirken (örneğin “timestamp” yönteminde düzenli bir sorgu gerekmesi gibi),  mevcut işlem loglarının okunmasında böyle bir durum yoktur.

Debezium nedir ve CDC işleminde nerede konumlanmaktadır?

En yaygın kullanımı temel alındığında, Debezium Kafka üzerine inşa edilmiş ve bir grup Kafka Connect uyumlu konnektör sağlayan bir araçtır.  Mevcut veri tabanlarınızdaki bilgileri olay akışlarına dönüştürerek uygulamaların veri tabanlarındaki satır düzeyindeki değişiklikleri algılamasını ve bunlara anında yanıt vermesini sağlar.  Bunun için, Debezium konnektörlerinin Kafka Connect kümesinde bulunması gerekir.

Kafka Connect’e de buradaki önemli rolü gereği kısaca değinecek olursak,  Apache Kafka ve diğer sistemler arasında ölçeklenebilir ve güvenilir bir şekilde veri akışı sağlayan, Kafka’ya bağımlı bir araçtır.  REST API arayüzü aracılığıyla konnektörleri Kafka Connect kümesine kolayca tanımlamak ve yönetmek mümkündür.

Şekil-1: Debezium Mimarisi (Konnektör rolü)

Debezium, Cassandra, Db2, MongoDB, MySQL, Oracle Database, Postgresql, SQL Server, Vitess veri tabanları için konnektör sunmaktadır.  Bunlara ek olarak, özel Java uygulamalarında gömülü bir kütüphane veya tamamen bağımsız bir sunucu olarak kullanılabilmektedir.

Hazırsak uygulamaya başlayalım! 👇

Debezium MySQL Connector ile uygulama örneği

Hepimiz biliyoruz ki bir konuyu kavramanın en iyi yolu o konu hakkında pratik yapmaktır.  Şimdi biz de bir uygulama ile yukarıda konuştuklarımız nasıl gerçekleşiyor bir görelim.

(Not: Örnek çalışmada CentOS7 kullanılmıştır, işlemler terminalde gerçekleştirilmiştir.)

Şekil-2: Örnek proje şeması
  • Yukarıdaki şemada görülen servislerimizi çalıştırmak için Debezium’un dokümanlarında tanımladığı adımlara uygun olarak sunduğu docker-compose-mysql.yaml dosyasını kullanalım.  Burada önemli noktalardan biri “connect” için tanımlanan ortam değişkenleridir.  Bu değişkenlere Debezium ihtiyaç duymaktadır.  (Değişkenlerin açıklamaları için docker hub’da debezium/connect-base‘i  ziyaret edebilirsiniz.)
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.0
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:2.0
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
mysql:
image: quay.io/debezium/example-mysql:2.0
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
connect:
image: quay.io/debezium/connect:2.0
ports:
- 8083:8083
links:
- kafka
- mysql
environment:
# Debezium image needs the topics below (and more for advanced settings)
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1 # group id of kafka connect cluster
- CONFIG_STORAGE_TOPIC=my_connect_configs # store connector configurations.
- OFFSET_STORAGE_TOPIC=my_connect_offsets # store connector offsets.
- STATUS_STORAGE_TOPIC=my_connect_statuses # store connector status.
version: '2' services: zookeeper: image: quay.io/debezium/zookeeper:2.0 ports: - 2181:2181 - 2888:2888 - 3888:3888 kafka: image: quay.io/debezium/kafka:2.0 ports: - 9092:9092 links: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 mysql: image: quay.io/debezium/example-mysql:2.0 ports: - 3306:3306 environment: - MYSQL_ROOT_PASSWORD=debezium - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw connect: image: quay.io/debezium/connect:2.0 ports: - 8083:8083 links: - kafka - mysql environment: # Debezium image needs the topics below (and more for advanced settings) - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 # group id of kafka connect cluster - CONFIG_STORAGE_TOPIC=my_connect_configs # store connector configurations. - OFFSET_STORAGE_TOPIC=my_connect_offsets # store connector offsets. - STATUS_STORAGE_TOPIC=my_connect_statuses # store connector status.
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.0
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.0
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:2.0
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:2.0
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     # Debezium image needs the topics below (and more for advanced settings)
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1 # group id of kafka connect cluster
     - CONFIG_STORAGE_TOPIC=my_connect_configs # store connector configurations.
     - OFFSET_STORAGE_TOPIC=my_connect_offsets # store connector offsets.
     - STATUS_STORAGE_TOPIC=my_connect_statuses # store connector status.
  • Servislerimizi çalıştıralım ve durumlarını kontrol edelim.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
docker-compose -f docker-compose-mysql.yaml up -d
docker-compose -f docker-compose-mysql.yaml up -d
docker-compose -f docker-compose-mysql.yaml up -d
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
docker-compose -f docker-compose-mysql.yaml ps
docker-compose -f docker-compose-mysql.yaml ps
docker-compose -f docker-compose-mysql.yaml ps
Şekil-3: Docker konteyner durum bilgisi
  • Root kullanıcı ile MySQL veri tabanına bağlanalım ve mevcut kullanıcıları görüntüleyelim.  Aşağıdaki görselde görülen mysqluser, konteyner oluştururken tanımlanan kullanıcıdır. Biz daha sonra debezium kullanıcısı ile işlemlere devam edeceğiz.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u root -pdebezium'
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u root -pdebezium'
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u root -pdebezium'
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
SELECT user,host FROM mysql.user;
SELECT user,host FROM mysql.user;
SELECT user,host FROM mysql.user;
Şekil-4: MySQL kullanıcı listesi
  • Kullandığımız MySQL docker imajında yine mevcut veri tabanları bulunmaktadır.  Biz “dataops” isminde yeni bir tane veri tabanı oluşturup, debezium kullanıcısına yetki tanımlayalım.  Veri tabanlarını listeleyelim.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
CREATE DATABASE dataops;
CREATE DATABASE dataops;
CREATE DATABASE dataops;
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
GRANT ALL ON dataops.* TO 'debezium'@'%';
GRANT ALL ON dataops.* TO 'debezium'@'%';
GRANT ALL ON dataops.* TO 'debezium'@'%';
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
FLUSH PRIVILEGES;
FLUSH PRIVILEGES;
FLUSH PRIVILEGES;
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
SHOW databases;
SHOW databases;
SHOW databases;
Şekil-5: Veri tabanları listesi
  • Ctrl+D ile root kullanıcısından çıkıp, debezium kullanıcısı ile bağlanalım ve “example” isimli bir tablo oluşturalım.  Tablolarımızı listeleyelim.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u debezium -pdbz'
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u debezium -pdbz'
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u debezium -pdbz'
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
CREATE TABLE dataops.example(
customerId int,
customerFName varchar(255),
customerLName varchar(255),
customerCity varchar(255)
);
CREATE TABLE dataops.example( customerId int, customerFName varchar(255), customerLName varchar(255), customerCity varchar(255) );
CREATE TABLE dataops.example(
    customerId int,
    customerFName varchar(255),
    customerLName varchar(255),
    customerCity varchar(255)
);
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
USE dataops;
USE dataops;
USE dataops;
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
SHOW tables;
SHOW tables;
SHOW tables;
Şekil-6: Tablo listesi

Evet artık veri tabanımız ve tablomuz hazır.  Kafka ve Kafka Connect ile ilgili adımlarımıza geçelim.

  • Kafka Connect’e konnektör tanımlamak için bilgilerimizi bir json dosyasında tanımlamamız gerekiyor.  Bunun için yine Debezium bir kaynak sunuyor.  Biz de bunun üzerinden küçük değişiklikler yaparak json dosyamızı aşağıdaki şekilde ayarlayalım ve kısaca bir kaç konfigürasyon ayarını birlikte tanıyalım.  (Tüm konfigürasyon ayarları için Debezium’un web sitesinden  inceleme yapmanız faydalı olacaktır.)
    • name : Konnektörün spesifik adı. Biz burada yeni bir isim verdik.
    • topic.prefix : MySQL sunucusu veya kümesi için konu öneki.
    • database.include.list : Belirtilen sunucu tarafından barındırılan veri tabanlarının listesi.  Yeni oluşturduğumuz veri tabanını burada belirttik. (Değişimi takip edilecek veri tabanları buraya dahil edilecek)
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
{
"name": "dataops-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"topic.prefix": "dbserver1",
"database.server.id": "184054",
"database.include.list": "dataops",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.dataops"
}
}
{ "name": "dataops-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "topic.prefix": "dbserver1", "database.server.id": "184054", "database.include.list": "dataops", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.dataops" } }
{
    "name": "dataops-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "topic.prefix": "dbserver1",
        "database.server.id": "184054",
        "database.include.list": "dataops",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.dataops"
    }
}
  • Terminalde yeni bir sekme açıp konnektör bilgilerimizi Kafka Connect’e iletelim.  Şekil-7’de görüleceği şekilde konnektörümüz oluşturuldu.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
Şekil-7: Konnektör tanımlama
  • Kafka topic listesine bir bakalım, dbserver1 ön eki ile topic oluştuysa artık dinlemeye geçebiliriz.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \ --bootstrap-server kafka:9092 \ --list
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
Şekil-8: Kafka topic listesi
  • Yukarıda topicleri gördüğümüze göre, yeni bir terminal daha açalım ve topic olarak dbserver1.dataops.example’ı dinleyecek bir kafka-console-consumer oluşturalım.  Ve bundan sonra artık pür dikkat MySQL veri tabanımızdaki değişiklikleri bu terminalden takip ediyor olacağız. Harika👌
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.dataops.example
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \ --bootstrap-server kafka:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.dataops.example
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.dataops.example

Peki, değişiklik yapmaya hazır mıyız?😊

  • Önce MySQL in aktif olduğu terminalde birkaç veri/kayıt ekleme (insert) işlemi yapalım.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
INSERT INTO example VALUES (1,"Richard","Hernandez","Brownsville"),\
(2,"Mary","Barrett","Littleton"),\
(3,"Ann","Smith","Caguas"),\
(4,"Mary","Jones","San Marcos"),\
(5,"Robert","Hudson","Caguas");
INSERT INTO example VALUES (1,"Richard","Hernandez","Brownsville"),\ (2,"Mary","Barrett","Littleton"),\ (3,"Ann","Smith","Caguas"),\ (4,"Mary","Jones","San Marcos"),\ (5,"Robert","Hudson","Caguas");
INSERT INTO example VALUES (1,"Richard","Hernandez","Brownsville"),\
(2,"Mary","Barrett","Littleton"),\
(3,"Ann","Smith","Caguas"),\
(4,"Mary","Jones","San Marcos"),\
(5,"Robert","Hudson","Caguas");
  • Ve kafka-console-consumer’ın aktif olduğu terminale gidip neler olduğuna bakalım.👀  Aşağıda görebileceğimiz gibi, değişikliğe ait mesaj key-value olacak şekilde consumer tarafından alındı.  Eklediğimiz 5 kayıt için de mesaj geldiğini göreceksinizdir.
Şekil-9: Kafka-console-consumer
  • JSON Crack ile yukarıdaki mesajı (value) bir inceleyelim.  Burada bizi asıl ilgilendiren payload’un altındaki bilgilerdir.  Şekil-10’da görebileceğimiz gibi before-after olacak şekilde veri tabanındaki değişikliği yakalamış olduk💕.
    • before : Kaydın mevcut durumu
    • after : Kaydın değişiklikten sonraki durumu
    • op : Yapılan işlemin çeşidi (c: insert, u: update, d: delete)
    • ts_ms : Değişikliğe ait zaman bilgisi
Şekil-10: Kayıt ekleme (Insert) – JSON
  • Veriyi güncelleme (update) işlemi deneyelim, örneğin 1 numaralı müşterimizin il bilgisi değişmiş olsun.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
UPDATE example SET customerCity = 'Ocean Drive' WHERE customerId = 1;
UPDATE example SET customerCity = 'Ocean Drive' WHERE customerId = 1;
UPDATE example SET customerCity = 'Ocean Drive' WHERE customerId = 1;
  • Kafka-console-consumer’a düşen son mesajı yine incelediğimizde güncelleme işleminin before-after şeklinde yansıdığını görebiliriz.
Şekil-11: Kayıt güncelleme (Update) – JSON
  • Son olarak bir de silme (delete) işlemi yapalım.  Burada dikkat etmemiz gereken konu şu, bir kayıt silindiğinde aynı “key”‘e sahip iki adet mesaj geliyor ve ikinci mesajın “value” kısmı “null” değerine sahip oluyor.  Bu kayıt Kafka için bir belirteç görevi görüyor. (Bu anahtara sahip verilerin “log compaction” tarafından silinebileceği anlamına geliyor)
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
DELETE FROM example WHERE customerId = 1;
DELETE FROM example WHERE customerId = 1;
DELETE FROM example WHERE customerId = 1;
Şekil-12: Kayıt silme (Delete) – JSON

Yazımızın sonuna gelirken, Kafka’ya ve CDC konusunda gerçekten etkili bulduğum Debezium’a hayranlığımı buradan da dile getirmek istiyorum.  Yukarıda birlikte de gördüğümüz gibi,  bu araçlar sayesinde veri tabanındaki değişiklikleri yakalayıp önceki-sonraki durumlarını ve gerekli olabilecek diğer meta verileri eş zamanlı olacak şekilde almış olduk.  Tabi bu özelliği ne şekilde kullanacağımız artık bizim iş ihtiyacımıza bağlı olarak değişecektir.

Umarım siz de bu uygulamadan benim kadar keyif almışsınızdır.  Yeni paylaşımlarda görüşmek üzere.✌️

Kaynaklar:

https://www.qlik.com/us/change-data-capture/cdc-change-data-capture

https://www.linkedin.com/pulse/cdc-extraction-method-comparison-query-based-vs-log-based-iarkdata/?trk=organization-update-content_share-article

https://debezium.io/documentation/reference/stable/tutorial.html

https://github.com/debezium/debezium-examples/tree/main/tutorial

https://kafka.apache.org/documentation.html#connect

https://debezium.io/documentation/reference/stable/connectors/mysql.html

0

Bir yanıt yazın

Password Requirements:

  • At least 8 characters
  • At least 1 lowercase letter
  • At least 1 uppercase letter
  • At least 1 numerical number
  • At least 1 special character