Veri mühendisliği bir çok farklı domainde ki veriyi; kontrol etme, depolama, manipüle etme gibi bir çok işlevselliğe sahip olmayı gerektiren bir meslek dalıdır. Bu çalışmada büyük operasyonlarda görev alan veri mühendislerinin operasyonun devamlılığı ve çıkabilecek sorunlardan sadece üçünü otomatize etmesi adına gerçekleştirilmiş bir hands-on projedir.

Peki bu sorunlar nelerdir?

  • Bir prod ortamında bir çok geliştiricinin core kullanımlarının kontrol altında tutulması gerekmektedir. Bu olmadığı takdirde bir çok başka core tüketmesi gereken iş aksayacaktır veya core / container oranlarının bire yakın olması ile devam eden süreçte verimsiz kaynak kullanımları gerçekleşebilmektedir.
  • Airflow DAG’leri scheduler çalışan ve birbirine çeşitli sensörler aracılığı ile bağlı olarak birbirini trigger edebilmektedir. Bu da herhangi bir sensörde oluşan sıkıntının tüm dagleri domino etkisi ile etkilemesi demektir.
  • Her şirketin belirli bir disk kapasitesi bulunmaktadır. Günümüz de şirketlerin veri akışları çok astronomik boyutlara doğru ilerlemektedir. Bu süreçte hangi verilerin tutulması gerektiği hangi verilerin silinmesi gerektiği kontrol altında tutulmazsa şirket artık yeni veri girişleri sağlayamacak ve operasyonelliği sekteye uğrayacaktır.

Yukarıda oluşan günlük operasyonların kontrol altında tutulması ve çeşitli tablolara yazılarak takibi sağlanması gerekmektedir gerekirse alarm mailleri oluşturularak ilgili kişilere bu mailin scheduler şekilde ulaştırılması elzemdir. Bu süreçleri de yöneten ve kontrol altında tutulmasını sağlayacak hands-on bir proje gerçeklenmiştir. Bu çalışmalar Airflow Dagine takılarak scheduler çalışması sağlanmıştır.

Çalışmalar Bünyesinde kullanılan teknolojiler:

Apache Hadoop, Apache Hive, YARN, Apache Airlfow ,SQLite, Bash, Python, SQL

Çalışmalar:

  • Yarn Loglarının Hive Tablosuna Yazılması
  • Airflow DAG Loglarının Okunması Son Fail Taskların Tespiti Ve Task Sürelerinin Çıkarılması
  • Hadoop Dosyalarının Bulunma Sürelerinin Hive Tablosuna Yazılması

Yarn Loglarının Hive Tablosuna Yazılması

YARN, kaynak yönetimini uygulama zamanlamadan/izlemeden ayrıştırarak birden çok programlama modelini (Apache Hadoop MapReduce bunlardan biri) destekler. YARN genel resource manager(RM), çalışan düğümü başına Node Manager (NM) ve uygulama başına Application Master(AM) kullanır. Kısacası YARN tam olarak kaynak yönetimini sağlar.

Bu süreçte YARN verilerini alabilimemiz için gerekli pip kütüphanesi airflow venv ortamına install edilir.

Copy to Clipboard

Ardından ilgili Rest API’ den verinin çekilebilmesi için gerekli script aşağıda verilmiştir.

Copy to Clipboard

Gelen veri ise bir airflow Xcom’una yollanmıştır bu xcom’dan da json verisi json->dataframe dönüşümü sağlanıcak, task fonksiyonunda tutulacaktır(pulling) edilecektir.

Json verisi ilk önce dataframe’e çevrilerek pandas ile manipüle edilebilir hale getirilmiş ve aynı zamanda bir iniş dosyasına(landing file) da “.csv” formatında indirilmiştir.

Copy to Clipboard

İlgili fonksiyonlar airflow dagine takılarak task olarak eklenmiştir.

Copy to Clipboard

Bu süreçte hedeflenen şema ise iniş dosyasına(landing file) indirilen “.csv” formatında ki dosyaların hadoop ortamına uçurulmasıdır. Aşağıda bulunan scriptte formatlama işlemin de atanan değişkenler ise, airflow aynı zamanda bünyesinde değişken saklama özelliklerine sahiptir. Bunu json’lar aracılığı ile yapmak mümkündür. Bu da projenin dinamik ve bir json keyine bağlı olarak farklı parametreler, dosya yolları gibi bir çok farklı değişkene json key’i aracılığı ile adapte olması sağlanmıştır.
Copy to Clipboard
Hadoop’a ilgili veri lokal üzerinden yollandıktan sonra hive üzerinde bir ddl schema oluşturulup (ilgili database içerisinde -log_resources-) ardından hive ile insert ve load işlemi gerçekleştirilir. Bir internal tablo olarak oluşturulmuştur.
Copy to Clipboard
İlgili string operasyonellikleri daglere takılarak(hive ve bash operatorleri aracılığı ile) task şeması sonuçlandırılmıştır.
Copy to Clipboard

Airflow DAG Loglarının Okunması Son Fail Taskların Tespiti Ve Task Sürelerinin Çıkarılması

Airflow DAG’lerinin status durumları, execution zamanları vb metrikler airflow metadb’sinde depolanır. Bu metadb sayesinde ise DAG’lerin genel durumları hakkında çeşitli dökümler, veya BI dashboardları da çıkarılabilmektedir. Bu çalışmada ise kıstas alınan iki temel metrik bulunmaktadır. Bunlardan birincisi fail olan daglerin tespit edilmesi ve tasklerin çalışma süreleri; tasklerin çalışma sürelerinin zaman zaman uzaması anormal durumların erken fark edilmesi açısında önemlidir.

Copy to Clipboard

Çalışma bünyesinde SQLite üzerinde connection oluşturulmuş olup ilgili veriler Hadoop bünyesine iniş dosyası(landing file) üzerinden geçirilmiştir ilgili fonksiyon dag’e takılmış ve hadoop’a ilgili veri uçurularak task şeması tamamlanmıştır.

Copy to Clipboard

Hadoop Dosyalarının Bulunma Sürelerinin Hive Tablosuna Yazılması

Bash script kullanılarak hadoop’da bulunan dosyaların bulunma sürelerini airflow dag’i üzerinden scheduler bir biçimde hive tablosuna yazarak, hadoop dosya kontrolünün daha düzenli ve kontrol edilebilir boyuta getirilmesi ve ölçeklenebilir bir çalışma düzlemi oluşturulması amaçlanmıştır. Bu süreçte ilgili bash script “.sh” uzantısı ile yazılmış olup aşağıda verildiği gibi hadoop içerisinde ki dosyaları tarayarak içlerinden 10 gün ve daha fazla gün bulunan dosyaların airflow logunda çıkarılması, ve dosyaların yaşam sürelerinin hive tablosuna yazılması ile sonuçlandırılmıştır.

Copy to Clipboard

Burada kullanılan date alma yöntemi awk’ler ile dosya ismi üzerinden default alınmıştır ancak direkt olarak hdfs fs -stat “%Y” {path_dir} üzerinden hdfs dfs -ls -C(sadece dosyanın ismi alınır) şeklinde de alınabilirdi ancak burada hive tablosuna yazılan verinin özelliği silinecek verinin yetkilerinin de kimde olduğunun gösterilmesidir. Sonuç olarak ilgili bash file’da airflow dagine takılarak sonuçlandırılmıştır.

Burada bash ve “.sh” dosya uzantısı sonrası bir boşluk bırakmak gerekmektedir.

Açıklama için: https://stackoverflow.com/questions/66052458/how-to-run-a-shell-script-through-apache-airflow

Copy to Clipboard

Sonuç olarak bir operasyonel süreci kontrol altında tutmak ve bu süreçlerden devamlı olarak bilgi almak adına çıkabilecek üç adet problem için çeşitli bilgi kaynakları yaratıldı.