Apache iceberg, petabyte boyutundaki tablolar için tasarlanmış açık kaynak kodlu bir tablo formatıdır. Tablo formatını, bir tabloyu oluşturan tüm dosyaların düzenlenmesini, yönetilmesini ve izlenmesini en iyi şekilde gerçekleştirtiren bir katman olarak düşünebiliriz. 2017 yılında Netflix tarafından ortaya çıkarılan proje daha sonra açık kaynaklı olarak Apache Software Foundation tarafından geliştirilmiştir.

Iceberg, asıl olarak bulut tabanlı depolama sistemlerinde kullanılmak amacıyla tasarlanmıştır. Hive’ın Amazon Depolama Servisinde (Amazon S3) kullanımında karşılaşılan veri tutarlılığı ve performans sorunlarının çözümü için geliştirilmiştir. Hive tablo verilerinin kaydını klasör seviyesinde (folder level) tuttuğu için verilerde yapılacak işlemlerde dosya listeleme işlemleri gerçekleştirmek zorundadır. Listeleme işlemlerinin artması bulut sistemlerinde performansı etkilediği için istenmeyen bir durumdur. Aynı zamanda bulut depolama sistemlerinde çok fazla listeleme işlemi yapıldığında bazı verilerin kaçırılabileceği söylenmektedir. Iceberg ise bir tablonun tüm dosyalarını bir ağaç yapısında (tree structre) takip ederek bir diğer deyişle verilerin takibini dosya seviyesinde (file level) yaparak bu sorunların önüne geçmiştir.

Tabloda yapılan bir değişiklik tüm dosyaların yollarını (file paths) içeren yeni bir metadata dosyasını oluşturur. Bir diğer avantajı, maliyetli dosya listeleme işlemlerinin azaltılması sonucu sorguların daha hızlı çalışır hale gelmesidir.

Iceberg Tablo Yapısı

Iceberg, her bir write ve commit işlemi için bir snapshot (anlık görüntü) tutmaktadır. Snapshotları anlık bir zamandaki tablo durumu olarak düşünebiliriz. Her snapshot, o anki tablonun içeriğini oluşturan tüm veri dosyalarını listeler. Her bir write işleminden sonra aşağıdaki şekildeki yeni bir snapshot oluşmaktadır. Okuyucular (readers) o anki write işlemlerinden etkilenmezler. Aşağıdaki şekilde S2 anında bir tablo okunduğu anda write işlemi başlamış ve S3 snapshotu oluşturulmuştur. Okuyucu S2’yi okurken izole bir şekilde write işlemi yapılmış ve yeni S3 snapshotu oluşturulmuştur. Böylelikle tablolarda olası bir kilitlenmenin (lock) önüne geçilir.

Şekil 1: Iceberg snapshots

Snapshotların detaylı yapısını Şekil 2’deki gibi inceleyebiliriz. Tree structure kullandığından bahsetmiştik. Her bir nodun hangi verileri tuttuğunu kısaca özetleyebiliriz:

  • Snapshot: Tüm veri dosyaları dahil olmak üzere, anlık tablonun durumu.
  • Manifest list: Manifest dosyalarını listeleyen bir dosya, her snapshot için bir adet oluşur.
  • Manifest: Dosyalara ulaşırken bölümlendirilmiş alanların bilgisi tutulur. Partititon olarak düşünebiliriz.
  • Data file: Bir tablonun satırlarını içeren veri dosyası.

Şekil 2: Iceberg tablo formatı

Iceberg parquet, avro, orc dosya türlerini desteklemektedir. Şemada değişikliği yapılırken sadece meta veriler etkilendiği için dosyaların tekrardan yazılmasına gerek yoktur. Iceberg şema güncellemelerinde aşağıdaki değişiklikleri yapabilmektedir:

  • Add: Tabloya yeni bir sütun eklenmesi
  • Drop: Mevcut bir sütunun tablodan kaldırılması
  • Rename: Sütunların yeniden adlandırılması
  • Update: Sütunların veri tiplerinin güncellenmesi

Partition Yapısı

Iceberg hidden partition yapısını kullanmaktadır. Yani partition yapısı kurulurken tablodaki sütunlar ilişkilendirilir. Partition için tabloda yeni bir alan yaratılmaz. Bu yapı sayesinde kullanıcıların partition yapısını bilmelerine gerek yoktur. Tablo yaratılırken belirlenen “partition spec” inin sorgulama sırasında belirtilmesine gerek yoktur, çünkü bunu Iceberg kendi sorgu planında gerçekleştirir. Bu sayede sorguların hızlı çalışmasını sağlar ve sorgulardaki kullanıcı hataları minimuma indirilir.

Yeni bir partition yaratıldığı zaman eski datalar eski partitionlarda kalırken yeni partiitonlara sadece yeni veriler yazılır. Ayrıca yeni partition yaratılırken tablonun yeniden oluşturulması gerekmez, dinamik bir yapı mevcuttur.

Şekil 3: Partition değişimini gösteren bir örnek

Yukarıdaki şekilde görüldüğü gibi başlangıçta ay (month) olarak belirlenen partition yapısı, 2009-01-01 tarihinden itibaren gün (day) olarak değiştiirlmiştir. Peki Iceberg sorgu yazıldığında nasıl davranacak? Örnekteki sorgu çalıştırıldığında Iceberg 2 ayrı sorgu planı oluşturur. Sorgudaki filtre hem eski partiiton’ı hem de yeni partition’ı kapsadığı için yukarıdaki yeşil alanlar sorgu planına dahil edilecektir.

Iceberg Java API

Iceberg’ın Java ve Python API’ları mevcut olup, kurulumu ve kullanımları kolaydır. Bu yazıda Java API üzerinden örnekler göstereceğim. Iceberg tablosu yaratmak için aşağıdaki kodu kullanabiliriz. Şema ve  partition bilgileri elimde olan “customer_db”  tablosuna göre oluşturuldu.

   Schema schema = new Schema(
            optional(1, "customerId", Types.LongType.get()),
            optional(2, "customerFName", Types.StringType.get()),
            optional(3, "customerLName", Types.StringType.get()),
            optional(4, "customerEmail", Types.StringType.get()),
            optional(5, "customerPassword", Types.StringType.get()),
            optional(6, "customerStreet", Types.StringType.get()),
            optional(7, "customerCity", Types.StringType.get()),
            optional(8, "customerState", Types.StringType.get()),
            optional(9, "customerZipcode", Types.LongType.get())
    );
    PartitionSpec spec = PartitionSpec.builderFor(schema)
            .identity("customerCity")
            .build();

    TableIdentifier id = TableIdentifier.parse("retail_db.customers");

Daha sonra tabloyu yaratmak için HDFS’de depolanan tabloları destekleyen HadoopCatalog ile tablomuzu yaratıyoruz.

Configuration conf = new Configuration();
String warehousePath = "hdfs://localhost:9000/user/mehmet/test";

HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
Table table = catalog.createTable(id, schema, spec);

Tablomuza verileri spark kullanarak insert edebiliriz. Bunun için ilk önce verileri spark dataframe ile okuyup daha sonra da Iceberg tablosuna ekliyoruz.

//customers tablosundaki verileri okumak için
Dataset df;
df = spark.read().option("header", true).option("inferSchema", true)
.csv("/home/train/datasets/retail_db/customers.csv");


//Dataframe’i belirlediğimiz partition spec’ine göre ilk önce sort edip sonra append ediyoruz
    df.sortWithinPartitions("customerState").write()
            .format("iceberg").option("write-format", "avro")
            .mode("append").save("hdfs://localhost:9000/user/mehmet/test/retail_db/customers");

Tablodan verileri okumak için:

 Dataset<Row> df;
 df = spark.read().format("iceberg").load("hdfs://localhost:9000/user/mehmet/test/retail_db/customers");

Snapshot ve dosyaların bilgilerini görmek için:

//snapshots
Dataset df;
df = spark.read().format("iceberg").load("hdfs://localhost:9000/user/mehmet/test/retail_db/customers#snapshots");

//Files
Dataset df;
df = spark.read().format("iceberg").load("hdfs://localhost:9000/user/mehmet/test/retail_db/customers#files");

Mevcut partition’ın güncellenmesi:

Configuration conf = new Configuration();
String warehousePath = "hdfs://localhost:9000/user/mehmet/test";
HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);

TableIdentifier id = TableIdentifier.parse("retail_db.customers");
Table table = catalog.loadTable(id);
table.updateSpec().addField("customerState").removeField("customerCity").commit();

Zamanda Yolculuk (Time Travel)

Iceberg her write işleminde yeni bir snapshot oluşturduğu için yapılan tüm değişiklikler istenilen snapshotlara gidilerek gözlenebilir. Snapshot verilerine erişmek için:

//snapshots
Dataset df;
df = spark.read()
          .format("iceberg")
          .load("hdfs://localhost:9000/user/mehmet/test/retail_db/customers#snapshots");
System.out.println("Snapshots");
df.show(false);

Size şu sonuçları verecektir:

Şekil 4: Snapshot listesi.

Belirli bir snapshota gitmek için snapshot-id ‘yi kullanabiliriz:

Dataset df;
df = spark.read()
          .format("iceberg")
          .option("snapshot-id", “6335699880622112640”) #Snapshot id’si
          .load("hdfs://localhost:9000/user/mehmet/test/retail_db/customers");

Tabloyu önceki bir sürüme döndürmek için (rollback):

table.rollback()
     .toSnapshotId(“6335699880622112640”)
     .commit();

Hive vs Iceberg

Hem kısa bir özet olması açısından hem de Hive ile olan benzerlik ve farklılıkları gösteren aşağıdaki tabloyu inceleyebilirsiniz:

  Hive Iceberg
Dosya Formatı Text File, SequenceFile , RCFile , Avro, ORC, Parquet Parquet, avro, orc
Zaman Yolculuğu (Time Travel) Yok Var
Dosyaların Takibi Klasör seviyesinde ( O(n) ) Dosya seviyesinde ( O(1) )
Rollbacks Yok Var
Hidden partitioning * Yok Var
In-place partition evolution** Yok Var

(*) Hive, bir partition sütunu ile kaynak sütunu arasındaki ilişkiyi takip etmez.

(**) Hive’da, bir parititon’ı değiştirmek için tüm tablonun yeni partition sütunu ile tamamen yeniden yazılması gerekir.