Açık Kaynaklı Araçlarla LakeHouse: Apache Iceberg Case Study

Günümüz veri platformlarında güncel ihtiyaçlardan biri, veriyi güvenilir şekilde saklamak, gerçek zamanlı değişiklikleri işlemek ve analitik için tutarlı tablolar sunmak. Bu noktada açık kaynak çözümler bütçe açısından sunduğu rahatlık ile gündeme geliyor. Bu yazıda MinIO, Apache Iceberg, Spark, Debezium, Kafka, Airflow ve Trino gibi açık kaynaklı araçları bir araya getirerek uçtan uca bir veri gölü senaryosu kurgulayacağım.

Neden Bu Ürünler?

  • MinIO: Açık kaynaklı S3 uyumlu nesne depolama, kolay kurulum ve ölçeklenebilirlik.
  • Apache Iceberg: Açık tablo formatı, schema evolution, snapshot & time-travel desteği.
  • Apache Spark: Büyük veri işleme, batch ve streaming için esneklik.
  • Apache Kafka: Gerçek zamanlı veri taşıma için endüstri standardı.
  • Debezium: Veritabanlarındaki değişiklikleri yakalayan CDC (Change Data Capture) çözümü.
  • Apache Airflow: ETL orkestrasyonu, DAG tabanlı akış yönetimi.
  • Trino: Dağıtık SQL motoru, Iceberg üzerinde hızlı sorgulama.

Neden Iceberg?

Iceberg, klasik Hive tabanlı tablolara göre birçok avantaj sağlar:

  • ACID (Atomicity, Consistency, Isolation, Durability): Iceberg, transaction tabanlı yapısı sayesinde güvenli okuma/yazma işlemleri sunar. Aynı anda birden fazla işlemin veri tutarlılığını bozmadan çalışmasını sağlar ve partial write problemlerini ortadan kaldırır.
  • Schema Evolution: Tablo yapısı değişse bile veri kaybı olmadan devam edilebilir.
  • Time Travel: Eski snapshot’lara geri dönüp sorgulama yapılabilir.
  • Partition Evolution: Partition stratejisi değiştirilebilir.
  • Performans: Metadata layer sayesinde milyonlarca dosya bile hızlı yönetilir.

Kısacası uzun vadeli, güvenilir ve analitik odaklı bir veri gölü için Iceberg en mantıklı tercihlerden biridir.

Senaryolar

Senaryo A — Spark Structured Streaming

MSSQL → Debezium → Kafka → Spark Structured Streaming → Iceberg

Debezium Kurulumu ve Kafka Entegrasyonu

Debezium, veritabanındaki değişiklikleri Kafka’ya CDC event’leri olarak aktarır.

Connector config örneği (MSSQL için):

{
"name": "mssql-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "mssql_host",
"database.port": "1433",
"database.user": "user",
"database.password": "pass",
"database.dbname": "mydb",
"database.server.name": "mssql_server",
"table.include.list": "dbo.customers",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.mssql"
}
}

Kafka’ya düşen veriler genelde şu formatta olur:

{
"before": {"id": 1, "name": "Ali"},
"after": {"id": 1, "name": "Ali Veli"},
"op": "u",
"ts_ms": 1690000000000
}

op: işlem tipi (c=create, u=update, d=delete).

Spark Structured Streaming İşlemi

Structured Streaming job’u, Kafka topic’ini okuyup Iceberg tablosuna yazar.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = (SparkSession.builder
.appName("cdc-to-iceberg")
.config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.my_catalog.type", "hadoop")
.config("spark.sql.catalog.my_catalog.warehouse", "s3a://warehouse/iceberg")
.getOrCreate())schema = StructType([
StructField("before", StringType()),
StructField("after", StringType()),
StructField("op", StringType()),
StructField("ts_ms", StringType())
])kafka_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "mssql_server.dbo.customers")
.load())parsed_df = kafka_df.selectExpr("CAST(value AS STRING) as json") \
.select(from_json(col("json"), schema).alias("data"))# Örneğin sadece insert/update olan kayıtları after’dan alıp yazıyoruz
final_df = parsed_df.select("data.after.*")(final_df.writeStream
.format("iceberg")
.option("checkpointLocation", "s3a://chkpt/customers/")
.toTable("my_catalog.db.customers"))

Bu job sürekli çalışarak gerçek zamanlı güncellenmiş bir Iceberg tablosu sağlar.

Senaryo B — Kafka + Airflow + Spark Batch

MSSQL → Debezium → Kafka → Airflow DAG → Spark Batch → Iceberg

Airflow DAG (Kafka → Spark Batch)

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago
default_args = {"owner": "airflow", "start_date": days_ago(1)}dag = DAG(
dag_id="kafka_to_iceberg_batch",
default_args=default_args,
schedule_interval="@hourly",
catchup=False
)
spark_job = SparkSubmitOperator(
task_id="spark_batch_load",
application="/opt/spark_jobs/kafka_batch_to_iceberg.py",
conn_id="spark_default",
dag=dag
)

Spark Batch Job (kafka_batch_to_iceberg.py)

kafka_df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "mssql_server.dbo.customers")
.load())
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)")
parsed_df.writeTo("my_catalog.db.customers").append()

Burada batch olarak Kafka’dan alınan veri Iceberg’e yazılır. Saatlik, günlük veya custom aralıklarla çalıştırılabilir.

Senaryo C — Kafka Connect → Iceberg Sink (Spark’sız)

MSSQL → Debezium → Kafka → Kafka Connect Iceberg Sink → Iceberg

Avantaj: Spark bağımlılığı yok, sade mimari.
Kullanım Alanı: Daha basit kurulumlar.

Debezium MSSQL Source Connector (connector-mssql.json)

{
"name": "mssql-debezium-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "mssql-host",
"database.port": "1433",
"database.user": "sa",
"database.password": "StrongPassword!",
"database.dbname": "mydb",
    "database.server.name": "mssql_server1",
"table.include.list": "dbo.users,dbo.orders",
"database.encrypt": "false",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.mydb",
"include.schema.changes": "true",
"tombstones.on.delete": "false",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}

Bu konfigürasyon MSSQL’de CDC aktif tabloları dinler ve Kafka’da şu formatta topic oluşturur:

mssql_server1.dbo.users

mssql_server1.dbo.orders

Iceberg Sink Connector (connector-iceberg.json)

{
"name": "iceberg-sink-connector",
"config": {
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "1",
    "topics": "mssql_server1.dbo.users",
"iceberg.catalog.type": "hadoop",
"iceberg.catalog.warehouse": "s3a://datalake/warehouse",
"iceberg.catalog.hadoop.fs.s3a.endpoint": "http://minio:9000",
"iceberg.catalog.hadoop.fs.s3a.access.key": "minioadmin",
"iceberg.catalog.hadoop.fs.s3a.secret.key": "minioadmin",
"iceberg.catalog.hadoop.fs.s3a.path.style.access": "true",
"iceberg.catalog.name": "hive_prod",
"iceberg.table.auto-create": "true",
"iceberg.table.namespace": "db1",
"iceberg.table.name": "users",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
}

Bu ayarlarda:

  • Kafka’daki mssql_server1.dbo.users topic’inden gelen event’ler MinIO üzerindeki Iceberg tablosuna (hive_prod.db1.users) yazılır.
  • auto-create parametresi tablonun Iceberg tarafında yoksa otomatik oluşturulmasını sağlar.

Senaryo D — Kafka Sensor DAG

Airflow, Kafka topic’ini dinleyerek yeni veri geldiğinde Spark job tetikleyebilir.

Örnek DAG

from airflow import DAG
from airflow.providers.apache.kafka.sensors.kafka import KafkaSensor
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago
default_args = {"owner": "airflow", "start_date": days_ago(1)}dag = DAG("kafka_sensor_to_spark", default_args=default_args, schedule_interval=None)wait_for_kafka = KafkaSensor(
task_id="wait_for_kafka_msg",
topic="mssql_server.dbo.customers",
kafka_conn_id="kafka_default",
dag=dag
)
spark_task = SparkSubmitOperator(
task_id="spark_job",
application="/opt/spark_jobs/kafka_to_iceberg.py",
conn_id="spark_default",
dag=dag
)wait_for_kafka >> spark_task

Burada Spark job’u ancak Kafka’ya veri düştüğünde çalışır.

MSSQL Tarafında CDC Ayarları

ALTER DATABASE TestDB SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
ALTER TABLE dbo.Customers ENABLE CHANGE_TRACKING;

Iceberg Partitioning Stratejisi

CREATE TABLE my_catalog.db.customers (
id BIGINT,
name STRING,
country STRING,
updated_at TIMESTAMP
) USING iceberg
PARTITIONED BY (days(updated_at));

Trino ile Sorgulama

etc/catalog/iceberg.properties:

connector.name=iceberg
warehouse=s3a://datalake/warehouse

Sorgu örneği:

SELECT country, COUNT(*) FROM iceberg.db.customers
WHERE updated_at >= DATE '2025–01–01' GROUP BY country;

Incremental ve Full SQL Load Örnekleri

Full Load (Overwrite)

df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", "dbo.customers").load()
df.writeTo("my_catalog.db.customers").overwritePartitions()

Incremental Load (Append)

new_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", "dbo.customers_delta").load()
new_df.writeTo("my_catalog.db.customers").append()

SQL API ile

INSERT INTO my_catalog.db.customers SELECT * FROM temp_view;

Özetle, bu yazıda MSSQL → Iceberg entegrasyonunu birden fazla yöntem üzerinden ele alarak farklı mimari tercihlere göre uygulanabilecek temel yaklaşımlara yer verdim.

Buradaki değerlendirmelerin, kurumsal veri platformlarını modernleştirmek ve analitik ihtiyaçlar için daha esnek bir altyapı oluşturmak isteyen ekipler için yol gösterici olacağını düşünüyorum.

  • Çözümlerimiz
  • Teknolojiler
  • Başarı Hikayelerimiz
  • Şirket
  • Takım