Airflow’da veri akışlarımızı tasarlarken, çoğu zaman dış kaynaktan aldığımız verilere ihtiyaç duyarız. Bu veriler veritabanındaki bir tablo veya dosya sistemindeki bir dosya gibi çeşitli kaynaklar olabilir. Task’larımızın hata almadan ve düzgün çalışabilmesi için akışa başlamadan önce bu verileri sensörler sayesinde kontrol edebilir ve hazır olduğu anda akışın başlamasını sağlayabiliriz.

Örnek

‘mytable’ tablosundaki verileri kullanan bir taskımız olduğunu varsayalım. Bu tabloya verileri doldurmak bizim kapsamımızda olmasın. Dolayısıyla ‘mytable’ tablosunda verilerin hazır olduğundan emin olamayız. Eğer ‘mytable’ tablosunda verilerimiz hazır değil ise taskımız hata alacaktır veya yanlış çalışaktır. Bunu engellemek için aşağıdaki gibi bir sensör hazırlayabiliriz.

sensor = SqlSensor(task_id='check_data_is_ready',
                   dag=dag,
                   conn_id='my_conn_id',
                   sql='select * from mytable',
                   poke_interval=600,
                   timeout=3600
                   )

Her sensörün kendine özgü ‘success’ kriteri vardır. Bunları dökümanların veya kaynak kodlarından inceleyerek öğrenebilirsiniz.

SqlSensör’de sql sonucu dönen değer (0, ’0’, ’’, None) değil ise success durumuna geçecektir. [1]

Parametreler

connection_id : sorgunun koşulurken kullanacağı bağlantı id’si. Web arayüzde Admin -> Connections sekmesinden tanımlayabilirsiniz.

sql: Koşulacak sorgu. Sensörün başarılı durumuna geçebilmesi için, sorgu sonucu dönen datadaki 1. Satır 1.sutün değerin (0, ’0’, ’’, None) harici bir değer olması gerekir.

poke_interval: sql parametresinin ne kadar sıklıkla koşulacağını belirtmek için kullanılınır. Saniye cinsinden bir değer verilmelidir. Örneğin 600 ise her 10 dakikada bir sql sorgusu koşulur, success kritilerlerine uygun değil ise 600 saniye sonra tekrar eder.

timeout: Sensörün ne kadar saniye sonra fail durumuna geçeceğini belirtir. Mantıken poke_intervalden daha büyük bir değer verilmelidir.

Sensor çeşitleri

airflow.contrib.sensors.file_sensor
airflow.contrib.sensors.ftp_sensor
airflow.contrib.sensors.sftp_sensor
airflow.contrib.sensors.hdfs_sensor
airflow.contrib.sensors.python_sensor
airflow.contrib.sensors.weekday_sensor
airflow.sensors.hdfs_sensor
airflow.sensors.hive_partition_sensor
airflow.sensors.http_sensor
airflow.sensors.sql_sensor

Yukarıda sensorlerden bazılarını listeledim. [2][3] Bunun gibi birçok daha hazır sensörü kullanabilir veya plugins’ler sayesinde kendi sensörünüzü geliştirip kullanabilirsiniz. [4]

Avantajları

Sensörlerin sağladığı faydaları Operatorlerin retry parametresini kullanarak veya çeşitli operatörlerle de yapabileceğini anlayanlar olmuştur. Sensörleri kullanmanın en büyük avantajı kullanımı basit olması ve akışları daha basit, anlaşılır hale getirmesidir.

Bazen tasklarımız cpu, memory gibi çok fazla kaynağa ihtiyaç duyar. Bu gibi tasklarda sensör yerine retry mekanızmasını cpu, memory gibi kısıtlı kaynakları çok fazla harcamasına sebep olacaktır.

Resimde açıkça görüldüğü gibi, sensor3’ün hazır olmamasından dolayı my_heavy_task  başlayamamıştır. Sensörleri kullanmamış olsaydık, bunu tespit edebilmek için my_heavy_task   taşkının loglarını incelemeden bunu anlamak mümkün olmayacaktır.

Dezavantajları

Şu ana kadar gözüme çarpan tek dezavantajı versiyon 1.10.10 ve öncesinde, sensör instance’ları timeout süresini doldurana kadar running state’inde kalmaya devam ediyor. [5] Örnekte poke_interval=600 ve timeout=3600 vermiştik. Burada sensörümüz her 10 dakikada bir sorgu atacak olmasına rağmen eğer success statine geçemez ise 1 saat boyunca running state’ınde kalacak. Bu da cpu’nun gereksiz kullanılmasına sebep olacaktır.

Sonuç

Sensorlerin temel amacı akışınız için gerekli tüm metaryerlerin hazır olduğundan emin olmak. Bu sayede daha hatasız, tam zamanında çalışan akışlar tasarlamamıza yardımcı olurlar.

Kaynakça

[1] https://airflow.apache.org/docs/stable/_modules/airflow/sensors/sql_sensor.html
[2] https://airflow.apache.org/docs/stable/_api/airflow/sensors/index.html?highlight=sensor#module-airflow.sensors
[3] https://airflow.apache.org/docs/stable/_api/airflow/contrib/sensors/index.html?highlight=sensor#module-airflow.contrib.sensors
[4] https://airflow.apache.org/docs/stable/plugins.html
[5] https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization