Veri mühendisliği bir çok farklı domainde ki veriyi; kontrol etme, depolama, manipüle etme gibi bir çok işlevselliğe sahip olmayı gerektiren bir meslek dalıdır. Bu çalışmada büyük operasyonlarda görev alan veri mühendislerinin operasyonun devamlılığı ve çıkabilecek sorunlardan sadece üçünü otomatize etmesi adına gerçekleştirilmiş bir hands-on projedir.
Peki bu sorunlar nelerdir?
- Bir prod ortamında bir çok geliştiricinin core kullanımlarının kontrol altında tutulması gerekmektedir. Bu olmadığı takdirde bir çok başka core tüketmesi gereken iş aksayacaktır veya core / container oranlarının bire yakın olması ile devam eden süreçte verimsiz kaynak kullanımları gerçekleşebilmektedir.
- Airflow DAG’leri scheduler çalışan ve birbirine çeşitli sensörler aracılığı ile bağlı olarak birbirini trigger edebilmektedir. Bu da herhangi bir sensörde oluşan sıkıntının tüm dagleri domino etkisi ile etkilemesi demektir.
- Her şirketin belirli bir disk kapasitesi bulunmaktadır. Günümüz de şirketlerin veri akışları çok astronomik boyutlara doğru ilerlemektedir. Bu süreçte hangi verilerin tutulması gerektiği hangi verilerin silinmesi gerektiği kontrol altında tutulmazsa şirket artık yeni veri girişleri sağlayamacak ve operasyonelliği sekteye uğrayacaktır.
Yukarıda oluşan günlük operasyonların kontrol altında tutulması ve çeşitli tablolara yazılarak takibi sağlanması gerekmektedir gerekirse alarm mailleri oluşturularak ilgili kişilere bu mailin scheduler şekilde ulaştırılması elzemdir. Bu süreçleri de yöneten ve kontrol altında tutulmasını sağlayacak hands-on bir proje gerçeklenmiştir. Bu çalışmalar Airflow Dagine takılarak scheduler çalışması sağlanmıştır.
Çalışmalar Bünyesinde kullanılan teknolojiler:
Apache Hadoop, Apache Hive, YARN, Apache Airlfow ,SQLite, Bash, Python, SQL
Çalışmalar:
- Yarn Loglarının Hive Tablosuna Yazılması
- Airflow DAG Loglarının Okunması Son Fail Taskların Tespiti Ve Task Sürelerinin Çıkarılması
- Hadoop Dosyalarının Bulunma Sürelerinin Hive Tablosuna Yazılması
Yarn Loglarının Hive Tablosuna Yazılması
YARN, kaynak yönetimini uygulama zamanlamadan/izlemeden ayrıştırarak birden çok programlama modelini (Apache Hadoop MapReduce bunlardan biri) destekler. YARN genel resource manager(RM), çalışan düğümü başına Node Manager (NM) ve uygulama başına Application Master(AM) kullanır. Kısacası YARN tam olarak kaynak yönetimini sağlar.
Bu süreçte YARN verilerini alabilimemiz için gerekli pip kütüphanesi airflow venv ortamına install edilir.
Ardından ilgili Rest API’ den verinin çekilebilmesi için gerekli script aşağıda verilmiştir.
Gelen veri ise bir airflow Xcom’una yollanmıştır bu xcom’dan da json verisi json->dataframe dönüşümü sağlanıcak, task fonksiyonunda tutulacaktır(pulling) edilecektir.
Json verisi ilk önce dataframe’e çevrilerek pandas ile manipüle edilebilir hale getirilmiş ve aynı zamanda bir iniş dosyasına(landing file) da “.csv” formatında indirilmiştir.
İlgili fonksiyonlar airflow dagine takılarak task olarak eklenmiştir.
Airflow DAG Loglarının Okunması Son Fail Taskların Tespiti Ve Task Sürelerinin Çıkarılması
Airflow DAG’lerinin status durumları, execution zamanları vb metrikler airflow metadb’sinde depolanır. Bu metadb sayesinde ise DAG’lerin genel durumları hakkında çeşitli dökümler, veya BI dashboardları da çıkarılabilmektedir. Bu çalışmada ise kıstas alınan iki temel metrik bulunmaktadır. Bunlardan birincisi fail olan daglerin tespit edilmesi ve tasklerin çalışma süreleri; tasklerin çalışma sürelerinin zaman zaman uzaması anormal durumların erken fark edilmesi açısında önemlidir.
Çalışma bünyesinde SQLite üzerinde connection oluşturulmuş olup ilgili veriler Hadoop bünyesine iniş dosyası(landing file) üzerinden geçirilmiştir ilgili fonksiyon dag’e takılmış ve hadoop’a ilgili veri uçurularak task şeması tamamlanmıştır.
Hadoop Dosyalarının Bulunma Sürelerinin Hive Tablosuna Yazılması
Bash script kullanılarak hadoop’da bulunan dosyaların bulunma sürelerini airflow dag’i üzerinden scheduler bir biçimde hive tablosuna yazarak, hadoop dosya kontrolünün daha düzenli ve kontrol edilebilir boyuta getirilmesi ve ölçeklenebilir bir çalışma düzlemi oluşturulması amaçlanmıştır. Bu süreçte ilgili bash script “.sh” uzantısı ile yazılmış olup aşağıda verildiği gibi hadoop içerisinde ki dosyaları tarayarak içlerinden 10 gün ve daha fazla gün bulunan dosyaların airflow logunda çıkarılması, ve dosyaların yaşam sürelerinin hive tablosuna yazılması ile sonuçlandırılmıştır.
Burada kullanılan date alma yöntemi awk’ler ile dosya ismi üzerinden default alınmıştır ancak direkt olarak hdfs fs -stat “%Y” {path_dir} üzerinden hdfs dfs -ls -C(sadece dosyanın ismi alınır) şeklinde de alınabilirdi ancak burada hive tablosuna yazılan verinin özelliği silinecek verinin yetkilerinin de kimde olduğunun gösterilmesidir. Sonuç olarak ilgili bash file’da airflow dagine takılarak sonuçlandırılmıştır.
Burada bash ve “.sh” dosya uzantısı sonrası bir boşluk bırakmak gerekmektedir.
Açıklama için: https://stackoverflow.com/questions/66052458/how-to-run-a-shell-script-through-apache-airflow
Sonuç olarak bir operasyonel süreci kontrol altında tutmak ve bu süreçlerden devamlı olarak bilgi almak adına çıkabilecek üç adet problem için çeşitli bilgi kaynakları yaratıldı.