Apache Airflow is a job orchestration framework that allows developers to programmatically author, schedule, and monitor data pipelines. Both Airflow itself and the workflows it runs are written in Python, so workflows are easy to code, maintain, version, test, and collaborate on.

Concepts of Apache Airflow

Dag (Directed Acyclic Graph)

A DAG is a workflow definition: a collection of all the tasks in a job and the dependencies between them. For example, a simple ETL DAG might look like this:
Extract (Task) -> Transform (Task) -> Load (Task) -> Check Data Consistency (Task) -> Send ETL Status Mail (Task)
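As a rough sketch of the chain above (using Airflow 1.10 import paths and hypothetical placeholder callables, not the article's own code), the first three tasks could be wired together like this:

# A minimal sketch only: the extract/transform/load functions are placeholders.
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def extract():
    pass  # pull data from the source system

def transform():
    pass  # clean / reshape the extracted data

def load():
    pass  # write the result to the target system

dag = DAG(dag_id="simple_etl", start_date=datetime(2020, 1, 1), schedule_interval="@daily")

extract_task = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id="transform", python_callable=transform, dag=dag)
load_task = PythonOperator(task_id="load", python_callable=load, dag=dag)

# ">>" defines the dependencies: extract -> transform -> load
extract_task >> transform_task >> load_task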

Task

A task is a runnable unit of work; it is instantiated from an operator.

Operator

An operator is a Python class that acts as a template for a type of job. Each operator describes a single task in a workflow/DAG. Airflow provides various operator templates:

  • BashOperator: Executes a bash command.
  • PythonOperator: Calls an arbitrary Python function.
  • EmailOperator: Sends an email.
  • OracleOperator, PostgresOperator, MySqlOperator, etc.: Execute SQL commands.
  • SqoopOperator: Executes Sqoop jobs.
  • HiveOperator: Executes Hive commands.
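As a hedged illustration of how these operator classes are used (Airflow 1.10 import paths; the connection id and SQL below are made-up examples), two operators could be instantiated inside a DAG like this:

# A sketch only: "my_postgres" and the SQL statement are hypothetical.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator

with DAG(dag_id="operator_examples", start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
    list_files = BashOperator(
        task_id="list_files",
        bash_command="ls -l /tmp",        # BashOperator runs a shell command
    )
    refresh_table = PostgresOperator(
        task_id="refresh_table",
        postgres_conn_id="my_postgres",   # hypothetical Airflow connection id
        sql="TRUNCATE TABLE staging.events;",
    )
    list_files >> refresh_table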

Web Server

Apache Airflow’s Web UI displays the status of DAGs. It also provides an interface to read logs and to trigger entire DAGs or individual tasks.

Scheduler

The Scheduler is the mechanism that decides which tasks need to be scheduled, queued, run, or kept waiting.

Executor

Executors are the mechanism by which tasks get run. There are a few different varieties of executors.

Apache Airflow Executors

Currently, Apache Airflow has 6 types of executors. 

  • Sequential Executor (Single Node)
  • Local Executor (Single Node)
  • Celery Executor (Multi-Node)
  • Dask Executor (Multi-Node) 
  • Kubernetes Executor (Multi-Node) 
  • Scaling Out with Mesos (Multi-Node) 

Single Node Architecture

Figure 1 – Airflow Single Node Architecture

In Single Node Architecture, all Airflow components (Scheduler, Web Server, Worker) are located on one node. In this architecture, the Local Executor is the best choice for production as long as your resources are sufficient, and you can scale it vertically. The Sequential Executor should be used only for testing.
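For reference, the executor for a single-node setup is selected in airflow.cfg; a minimal excerpt might look like this (a sketch: SequentialExecutor is the default, and parallelism is one of the knobs typically tuned when scaling vertically):

executor = LocalExecutor
parallelism = 32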

Figure 2 – Vertical vs Horizontal Scaling

Unfortunately, Single Node deployments have a SPOF (Single Point of Failure): when the node that contains all Airflow components crashes, the tasks stop, and each component must be started one by one after the node comes back up.

One of the good options to go to production with a Multi-Node architecture is the Celery Executor.

Multi-Node architecture allows you to distribute each Airflow component across several machines, and it enables Horizontal scaling as well as Vertical scaling for the Airflow cluster. This architecture therefore prevents a SPOF (Single Point of Failure) and ensures High Availability for the Airflow cluster.

The Celery Executor is the mature option for switching to a Multi-Node architecture. With the Celery Executor, each component can be located on a single node or spread across several nodes.
You can scale out the Workers according to your resource requirements or your Load Balancing / High Availability needs.
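For example, each worker's capacity can be tuned when it is started (flag names as in the Airflow 1.10 CLI; the queue name and concurrency value are only examples):

airflow worker -q default -c 16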

Celery itself is a Distributed Task Queue for processing vast amounts of messages in Python.
Therefore, you must set up a message broker for the Celery Executor. The broker is the message (task) transport layer in Celery, and the Workers pull tasks from it. You can choose one of the alternatives below [1]. I’m going to use Redis in this article.

Name          Status        Monitoring    Remote Control
RabbitMQ      Stable        Yes           Yes
Redis         Stable        Yes           Yes
Amazon SQS    Stable        No            No
Zookeeper     Experimental  No            No

Flower UI

Flower is a monitoring tool with which you can view task statuses. It provides real-time monitoring of the following:

  • Statuses and statistics of tasks distributed among the worker nodes
    • Start time, run time, due date, etc.
    • Information about the server where the task is running/has failed/is scheduled, etc.
  • Currently running tasks
  • Currently scheduled tasks
  • STARTED, RUNNING, FINISHED, FAILED, etc. task statuses
  • Configuration Viewer 
    • worker_concurrency, message_broker, queues in the brokers

Figure 3 – Airflow – Multi-Node Celery Executor Architecture

I’m going to install Miniconda py37_4.8.2 and Apache Airflow 1.10.9 on Oracle Linux 7 servers.

Pre-Installation 

Miniconda installation and preparing Airflow virtual environment

I recommend using a Conda virtual environment to install Apache Airflow, because pip alone cannot install the OS-level dependencies of Python libraries such as gcc, gcc-c++, MKL, etc. Conda can handle these non-Python dependencies and keep them isolated from the OS. You should use Miniconda because it’s lighter than Anaconda.

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh

chmod +x Miniconda3-latest-Linux-x86_64.sh
sudo ./Miniconda3-latest-Linux-x86_64.sh
  • You must choose an installation location for Miniconda; I chose /opt/miniconda.
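If you prefer a non-interactive installation, the Miniconda installer also accepts batch-mode flags (a sketch; -b runs silently and -p sets the installation prefix):

sudo ./Miniconda3-latest-Linux-x86_64.sh -b -p /opt/miniconda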

 

Redis installation

  • I’m going to use Redis as the message broker. To install Redis, run the command below:
sudo yum install redis
  • Start the Redis server:
sudo service redis start
  • Check the service status and make sure redis-server is running. It will run on port 6379.
sudo service redis status
  • Other nodes must be able to access port 6379.
  • Set “bind 0.0.0.0” and “protected-mode no” in the redis.conf file and restart the service:
vim /etc/redis.conf
sudo service redis restart
netstat -nltp | grep 6379
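After editing, the relevant lines in /etc/redis.conf should look like this:

bind 0.0.0.0
protected-mode no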

Installation

  • After installing the prerequisites, create a Python 3.x conda environment in the location you want:
conda create --prefix=/opt/airflowenv python=3.7
  • Activate the environment
conda activate /opt/airflowenv
  • Install Airflow!
conda install --channel conda-forge airflow
  • Install Celery
conda install --channel conda-forge celery
  • You must specify the AIRFLOW_HOME location; otherwise Airflow will use the default ~/airflow folder.
export AIRFLOW_HOME=/opt/airflow

Note: You can set specific environment variables that apply only while the conda environment is activated. To do this, first create the following folders and files, then reactivate the environment:

/<conda_env_path>/etc/conda/activate.d/myenvs.sh

/<conda_env_path>/etc/conda/deactivate.d/unsetmyenvs.sh
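A minimal sketch of what these two files could contain, assuming AIRFLOW_HOME=/opt/airflow as used below (the file names come from the paths above):

# /<conda_env_path>/etc/conda/activate.d/myenvs.sh
export AIRFLOW_HOME=/opt/airflow

# /<conda_env_path>/etc/conda/deactivate.d/unsetmyenvs.sh
unset AIRFLOW_HOME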

  • Initialize the Airflow SQLite repository database. (We will change this in the next steps.)
airflow initdb
  • To modify Airflow configurations, open the $AIRFLOW_HOME/airflow.cfg file
vim $AIRFLOW_HOME/airflow.cfg
  • Set the executor to CeleryExecutor
executor = CeleryExecutor
  • Set the metadata database (the official Airflow documentation recommends MySQL or PostgreSQL)
sql_alchemy_conn = mysql://<user>:<pw>@<hostname>:3306/airflow

or

sql_alchemy_conn = postgresql+psycopg2://<user>:<pw>@<hostname>:5432/airflow
  • Set result_backend
result_backend = db+postgresql://<user>:<pw>@<hostname>:5432/airflow

or

result_backend = db+mysql://<user>:<pw>@<hostname>:3306/airflow
  • Set the message broker
broker_url = redis://<user>@<hostname>:6379/0
  • Initialize the repository database again. Airflow will now create the metadata database in Postgres/MySQL with the CeleryExecutor configuration instead of SQLite.
airflow initdb
  • After completing the steps above, you must copy the Airflow conda environment and the Airflow home folder to the other Airflow role/component hosts (master1, worker1, workerN, etc.) using scp or a similar tool (see the sketch after the commands below).
  • Then you can start all the main Airflow components, after activating the environment, on whichever machines you prefer. For example:
[master1]$ conda activate /env_path/ && airflow webserver -p 8080
[master1]$ conda activate /env_path/ && airflow flower
[master2]$ conda activate /env_path/ && airflow scheduler
[worker1]$ conda activate /env_path/ && airflow worker
[worker2]$ conda activate /env_path/ && airflow worker 
[worker3]$ conda activate /env_path/ && airflow worker
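For the copy step mentioned above, a minimal scp sketch (assuming the same /opt paths and passwordless SSH on every host):

scp -r /opt/airflowenv /opt/airflow worker1:/opt/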

Web UI

You can open the Web UI on master1:8080

Figure 4 – Airflow Web UI

Flower UI

You can open the Flower UI on master1:5555

Figure 5 – Airflow Flower UI

Finally, the installation is complete.

References