Debezium (CDC) nedir?

Debezium, Apache Kafka üzerine inşa edilmiştir ve bir dizi Kafka Connect uyumlu source connector’dur. Debezium, belirli bir veritabanı yönetim sistemi (DBMS) ile çalışır ve meydana gelen değişiklikleri algılayarak ve her değişiklik olayının kaydını bir Kafka topic‘e aktararak DBMS’deki veri değişikliklerinin geçmişini kaydeder. Debeizum’a CDC de diyebiliriz.

Change Data Capture (CDC) Nedir?

Veri tabanında yapılan InsertUpdate ya da Delete gibi işlemlerin sonrasında değişen verilerin ilk halinin ve son halinin CDC desteği sağlayan veri tabanı tarafından izlenmesi ve değişikliklerin kayıt altına alınması işlemidir.

Debezium’u aşağıdaki veritabanı sistemlerindeki olay akışlarını yakalaması için kullanabiliriz.

  • Debezium connector for Cassandra
  • Debezium connector for Db2
  • Debezium connector for MongoDB
  • Debezium connector for MySQL
  • Debezium connector for Oracle Database
  • Debezium connector for PostgreSQL
  • Debezium connector for SQL Server
  • Debezium connector for Vitess

Debezium Server kullanarak PostgreSQL olay akışlarını yakalayan bir örnek yapalım. 

1- Sunucumuzda veya clusterımızda zookeeper ve kafka çalışır durumda olması gerek.
2- Debezium server dosyalarını indiriyoruz ve klasöre çıkartıyoruz.
https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/2.1.1.Final/debezium-server-dist-2.1.1.Final.tar.gz
3- PostgreSQL’de olay akışını takip edeceğimiz için  postgres connector kullanmalıyız.
4- debezium-server/conf altındaki application.properties dosyasını güncelleyelim.
5- Aşağıdaki properties dosyasına göre 127.0.0.1 sunucundaki postgre sql’e debezium user ile  order veritabanına bağlanıyor. Olay akışını takip etmesini istediğimiz tablolar ise transaction,customer ve product tablolarıdır.
6- İsteğe ve amaça bağlı application.properties dosyasında aşağıdaki linkteki  propertyleri uygulayabilirsiniz.
https://debezium.io/documentation/reference/stable/operations/debezium-server.html
application.properties
debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=testserver1:9092,testserver2:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.topic.creation.enable = true
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/debezium-server/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=5432
debezium.source.database.user=debezium
debezium.source.database.password=debezium
debezium.source.database.dbname=order
debezium.source.database.server.name=postgresorder
debezium.source.database.encrypt=false
debezium.source.database.history.producer.security.protocol=SASL_PLAINTEXT
debezium.source.database.history.kafka.bootstrap.servers=testserver1:9092,testserver2:9093
debezium.source.database.history.kafka.topic=debezium
debezium.source.table.include.list=transactions,customer,product
debezium.source.snapshot.mode=never
debezium.source.plugin.name=pgoutput
quarkus.log.console.json=false
debezium.source.time.precision.mode=connect

7- /debezium-server klasörü altındaki run.sh‘ı çalıştırarak debeziuma start verelim.
8- order.transaction tablosuna yeni bir kayıt geldiği varsayalım. Debezium çalıştır durumda olduğu için, bu değişikliği yakalayıp postgresorder.transaction topic’e yazar.  Böylelikle Postgres SQL’de olan değişikliği yakalamış olduk.

Özet

Kafka Connect SOURCE connector’ü aslında DEBEZIUM (CDC) olmaktadır. Örnekteki gibi PostgreSQL’den aldığımız verileri brokerlar aracılığı ile SINK connectorlere ulaştırıyoruz. SINK connectorler ile ister kafka ister çeşitli uygulamalarda verimizi saklayabiliriz.