Debezium (CDC) nedir?
Change Data Capture (CDC) Nedir?
Veri tabanında yapılan Insert, Update ya da Delete gibi işlemlerin sonrasında değişen verilerin ilk halinin ve son halinin CDC desteği sağlayan veri tabanı tarafından izlenmesi ve değişikliklerin kayıt altına alınması işlemidir.
Debezium’u aşağıdaki veritabanı sistemlerindeki olay akışlarını yakalaması için kullanabiliriz.
- Debezium connector for Cassandra
- Debezium connector for Db2
- Debezium connector for MongoDB
- Debezium connector for MySQL
- Debezium connector for Oracle Database
- Debezium connector for PostgreSQL
- Debezium connector for SQL Server
- Debezium connector for Vitess
Debezium Server kullanarak PostgreSQL olay akışlarını yakalayan bir örnek yapalım.
2- Debezium server dosyalarını indiriyoruz ve klasöre çıkartıyoruz.
https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/2.1.1.Final/debezium-server-dist-2.1.1.Final.tar.gz
3- PostgreSQL’de olay akışını takip edeceğimiz için postgres connector kullanmalıyız.
4- debezium-server/conf altındaki application.properties dosyasını güncelleyelim.
5- Aşağıdaki properties dosyasına göre 127.0.0.1 sunucundaki postgre sql’e debezium user ile order veritabanına bağlanıyor. Olay akışını takip etmesini istediğimiz tablolar ise transaction,customer ve product tablolarıdır.
6- İsteğe ve amaça bağlı application.properties dosyasında aşağıdaki linkteki propertyleri uygulayabilirsiniz.
https://debezium.io/documentation/reference/stable/operations/debezium-server.html
application.properties
debezium.sink.type=kafkadebezium.sink.kafka.producer.bootstrap.servers=testserver1:9092,testserver2:9092debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializerdebezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializerdebezium.sink.kafka.topic.creation.enable = truedebezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnectordebezium.source.offset.storage.file.filename=/debezium-server/offsets.datdebezium.source.offset.flush.interval.ms=0debezium.source.database.hostname=127.0.0.1debezium.source.database.port=5432debezium.source.database.user=debeziumdebezium.source.database.password=debeziumdebezium.source.database.dbname=orderdebezium.source.database.server.name=postgresorderdebezium.source.database.encrypt=falsedebezium.source.database.history.producer.security.protocol=SASL_PLAINTEXTdebezium.source.database.history.kafka.bootstrap.servers=testserver1:9092,testserver2:9093debezium.source.database.history.kafka.topic=debeziumdebezium.source.table.include.list=transactions,customer,productdebezium.source.snapshot.mode=neverdebezium.source.plugin.name=pgoutputquarkus.log.console.json=falsedebezium.source.time.precision.mode=connect
7- /debezium-server klasörü altındaki run.sh‘ı çalıştırarak debeziuma start verelim.
8- order.transaction tablosuna yeni bir kayıt geldiği varsayalım. Debezium çalıştır durumda olduğu için, bu değişikliği yakalayıp postgresorder.transaction topic’e yazar. Böylelikle Postgres SQL’de olan değişikliği yakalamış olduk.
Özet
Kafka Connect SOURCE connector’ü aslında DEBEZIUM (CDC) olmaktadır. Örnekteki gibi PostgreSQL’den aldığımız verileri brokerlar aracılığı ile SINK connectorlere ulaştırıyoruz. SINK connectorler ile ister kafka ister çeşitli uygulamalarda verimizi saklayabiliriz.