Kafka Connect’e Giriş

Kafka Connect nedir? / ne değildir ?

Apache Kafka ekosisteminde olup kafka ile kafka dışı sistemler(dosya sistemleri, veritabanları vb.) arasında güvenli ve ölçeklenebilir veri akışlarını otomatikleştirmek için tasarlanmış ücretsiz ve açık kaynaklı bir framework’tür.

ETL/ELT aracı değildir. Temel amaç veriyi taşımaktır. Karmaşık ve durum bilgisi gerektiren işler için uygun değildir.

Veritabanı yedeği değildir. Veriyi taşır ancak veri yedekleme işi için uygun değildir.

Kafka’nın yerini alabilecek bir ürün değildir, Kafka’nın üzerine konumlanmıştır.

2. Neden Kafka Connect?

Ölçeklenebilirlik: Dağıtık yapısı sayesinde büyük veri hacimlerini işleyebilir, task sayısını arttırarak.

Konfigürasyon odaklılık: Kod yazma mantığı yerine JSON konfigürasyon dosyaları ile oluşturulur. Yapılacak her değişiklik json dosyası üzerinde yapılır ve bu jsonlar çok kolay bir şekilde import, export edilerek başka connectorlerde de kullanılabilir.

Standartlaştırma: Tüm veri kaynakları için ortak bir standartlaştırılmış yapı oluşturup hızlıca çoğaltılabilen bir yöntem sağlar.

3. Temel Kavramlar

Kafka Connect kullanırken karşılaşacağımız temel kavramlardan bazıları aşağıdaki gibidir.

Source Connector

Dış sistemden veriyi okuyup kafka topicine yazar.

PostgresConnector, MySQLConnector, SFTP Source örnek olarak verilebilir.

Sink Connector

Kafkadan veri okuyarak okuduğu veriyi bir hedefe yazar.

HDFS Sink, JDBC Sink, S3 Sink örnek olarak verilebilir.

Task

Bir connectore atanmış paralelleştirilebilen iş birimidir. Task sayısı genellikle partition sayısı, hedef veya kaynak sistemin concurrency limiti, veri hacmi ile belirlenir.

Worker

Veri taşıma işini yapan taskları barından JVM processidir. Connector ve taskları çalıştıran, izleyen ve hata yönetimini sağlayan altyapıdır. İki farklı modu vardır.

Standalone

Tek node üzerinde çalışır. Tek process olduğu için tüm connector ve tasklar burada çalışır. Node düşerse tüm connectorler doğal olarak düşer. Hata toleransı ve ölçeklenebilirlik yoktur. Genellikle local ve dev ortamındaki geliştirmeler, testler için kullanılır.

Distributed

Prod ortamlar ve yüksek hacimli veri akışları için ölçeklenebilir yapı. Birden fazla worker içeren kümeler birbiriyle koordine olarak çalışır.

Connector Config Dosyası

Kaynak, hedef ve kafka ile ilgili ayarları barındıran json formatındaki dosya. Değişiklikler bu dosya üzerinden yapılarak deploy edilir.

4. Kafka Connect Mimarisi

Kafka connect mimarisi

Akış iki yönlü de çalışabilmektedir hem kafka dışı sistemlerden veri alıp kafkaya yazar, hem de kafkadan veriyi alıp farklı hedeflere yazabilir.

Connector: Kaynak veya hedefi tanımlar. Bir connector birden çok task oluşturabilir ve bu tasklar paralel çalışabilir. Tasklar veriyi taşıyan kısımdır.

Transform : Mesaj topice yazılmadan önce veya consume edilmeden önce uygulanan filtre kısmı. Buradaki yapı SMT ( Single Message Transform) şeklindedir. Örnek olarak alan/key silme, rename gibi basit işlemleri yapabilirken karmaşık işlemleri desteklemez.

Converter : Mesajın nasıl decode/encode edileceğini belirleyen kısım. En sık olarak Json format kullanılır.

5. Konfigürasyon yönetimi

Tüm bu connector oluşturma, silme ve güncelleme işlemleri Confluent Control Center, Cloudera SMM vb. arayüzlerden yapılabildiği gibi API aracılığıyla da yapılabilir. Yazının sonunda bu konu ile alakalı daha detaylı inceleyebileceğiniz bir link ekliyor olacağım.

Rest API üzerinden metod türlerine göre aşağıdaki işlemler yapılabilir.

GET: Connector durumunu, Task durumunu veya yapılandırmasını sorgulama.

Örnek :

curl https://mycluster:28085/connectors/sourcetest/status komutunu çalıştırdığımızda aşağıdaki sonuç dönmüştür.

{
"name": "sourcetest",
"connector": {
"state": "RUNNING",
"worker_id": "mycluster:28085"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "mycluster:28085"
}
],
"type": "source"
}

POST: Yeni bir connector oluşturma veya connectoru tekrar başlatma

PUT: Mevcut bir connectorün ayarlarını, parametrelerini güncelleme

DELETE: Bir connectorü silme işlemini gerçekleştirir.

Kafka connect connectorlere ait configleri 3 farklı kafka topicte tutmaktadır.

connect-configs : bütün connectorlerin json formatındaki configleri burada saklanır.

connect-status : task ve connectorlerin son durumları burada saklanır.

connect-offsets : Source connectorlerin en son kaldığı yeri tutarak tekrar başladıklarında aynı yerden devam etmeleri sağlar ve veri kaybının önüne geçer.

6. Plugin desteği

Kafka connect plugin tabanlı bir frameworktür. Bu sayede ekstra JARlar oluşturularak amaca uygun yeni yapılar kurulabilir. Kendimiz yazabileceğimiz gibi default kurulumda gelmeyen farklı lisanslar altında bulunan connectorleri de sonradan ekosistemimize dahil edebiliriz. Burada en önemli nokta bazı vendorlar (özellikle managed/cloud Kafka servisleri) üçüncü parti veya custom plugin yüklenmesine izin vermeyebilir veya desteklemeyebilir. Production sistemlerinde buna dikkat edilmelidir. Yeni bir connectore ait jar dosyaları worker sunucularında bulunan dizinlere eklendikten sonra workerlere restart atılarak sisteme dahil edilir.

Bu yazıda Kafka Connectle ilgili temel bilgileri ele aldık. Bir sonraki yazıda ise örnek bir connector üzerinden ilerleyerek, connector kurulumu ve config ayarlarını ve canlı sistemden örneklerle devam edeceğiz.

Bu konuda inceleyebileceğiniz bazı kaynaklar aşağıda belirtilmiştir

  1. Kafka Connect Rest API

https://docs.cloudera.com/runtime/7.3.1/kafka-rest-api-reference/index.html

  • Çözümlerimiz
  • Teknolojiler
  • Başarı Hikayelerimiz
  • Şirket
  • Takım