How to Setup Airflow Multi-Node Cluster with Celery & RabbitMQ
What is Airflow?
Airflow lets you programmatically author, schedule, and monitor workflows. A workflow is expressed as a DAG (Directed Acyclic Graph) of idempotent tasks, which Airflow executes at scheduled intervals.
Airflow Single Node Cluster
In a single-node Airflow cluster, all the components (worker, scheduler, webserver) are installed on the same node, known as the "Master Node". To scale a single-node cluster, Airflow has to be configured with the LocalExecutor mode. The worker pulls tasks to run from an IPC (inter-process communication) queue; this scales well up to the limit of the resources available on the master node. To scale Airflow across multiple nodes, the CeleryExecutor has to be enabled.
Airflow Multi-Node Cluster
In the multi-node Airflow architecture, daemon processes are distributed across all worker nodes. The webserver and scheduler are installed on the master node, while workers are installed on separate worker nodes, so the cluster can scale well both horizontally and vertically. To use this architecture, Airflow has to be configured with the CeleryExecutor.
A Celery backend has to be configured to enable CeleryExecutor mode in the Airflow architecture. Popular choices for the Celery backend are Redis and RabbitMQ. RabbitMQ is a message broker: its job is to manage communication between multiple task services by operating message queues. In place of the IPC channel used in the single-node architecture, RabbitMQ provides a publish-subscribe model for exchanging messages across different queues. Task commands are published to the queues as messages; Celery workers retrieve the task commands from each queue and execute them in a truly distributed, concurrent way, which accelerates parallel task execution across the cluster.
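The mechanics can be sketched with Python's standard library: several workers drain task commands from a shared queue, loosely analogous to (though far simpler than) Celery workers consuming messages from RabbitMQ. All names here are illustrative.

```python
import queue
import threading

# A shared queue standing in for a RabbitMQ queue of task commands.
task_queue = queue.Queue()
results = []
results_lock = threading.Lock()

def worker():
    # Each worker pulls task commands until the queue is drained,
    # the way Celery workers consume messages from the broker.
    while True:
        try:
            command = task_queue.get_nowait()
        except queue.Empty:
            return
        with results_lock:
            results.append(f"ran {command}")
        task_queue.task_done()

# "Publish" six task commands, then let three workers consume them concurrently.
for i in range(6):
    task_queue.put(f"task_{i}")

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results))  # → 6: every task command was executed exactly once
```

The point of the sketch is only the distribution pattern: producers and consumers never call each other directly, they only share a queue.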
Celery
Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. Airflow uses it to run tasks concurrently on several worker nodes via multiprocessing. The multi-node Airflow architecture lets you scale Airflow up simply by adding new workers.
Airflow Multi-Node Cluster with Celery Installation and Configuration steps
Note: We are using CentOS 7 Linux operating system.
- Install RabbitMQ
yum install epel-release
yum install rabbitmq-server
- Enable and start RabbitMQ Server
systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
- Enable RabbitMQ Web Management Console Interface
rabbitmq-plugins enable rabbitmq_management
The RabbitMQ web management console listens on port 15672 (the broker itself listens on 5672). The default username and password are guest/guest, and the guest account can only log in from localhost.
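Since guest/guest only works from localhost, you would typically create your own management user for remote access; the username and password below are placeholders to replace with your own:

```shell
# Create a management user for the web console (placeholder credentials).
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
```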
- Install pyamqp transport protocol for RabbitMQ and PostgreSQL adaptor
pip install pyamqp
amqp:// is an alias that uses librabbitmq if available, or py-amqp if it’s not. You’d use pyamqp:// or librabbitmq:// if you want to specify exactly what transport to use. The pyamqp:// transport uses the ‘amqp’ library (http://github.com/celery/py-amqp)
Install PostgreSQL adaptor: psycopg2
Psycopg is a PostgreSQL adapter for the Python programming language.
pip install psycopg2
- Install Airflow
pip install 'apache-airflow[all]'
Check version of airflow
airflow version
We are using Airflow version v1.10.0, the recommended and stable release at the time of writing.
- Initialize Database
After installation and configuration, you need to initialize the database before you can run DAGs and their tasks, so that the latest configuration changes are reflected in the Airflow metadata database.
airflow initdb
- Celery Installation
Celery should be installed on master node and all the worker nodes.
pip install celery==4.3.0
Check the version of Celery
celery --version
4.3.0 (rhubarb)
- Change in airflow.cfg file for Celery Executor
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
dags_are_paused_at_creation = True
load_examples = False
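As a quick sanity check that the connection strings are well-formed, they can be parsed with Python's standard library. The hostnames below are placeholders standing in for `{HOSTNAME}` and `{RabbitMQ-HOSTNAME}`:

```python
from urllib.parse import urlsplit

# Placeholder connection strings matching the airflow.cfg entries above.
sql_alchemy_conn = "postgresql+psycopg2://airflow:airflow@db-host/airflow"
broker_url = "pyamqp://guest:guest@rabbitmq-host:5672/"
result_backend = "db+postgresql://airflow:airflow@db-host/airflow"

for uri in (sql_alchemy_conn, broker_url, result_backend):
    parts = urlsplit(uri)
    # Every URI should carry a scheme, credentials, and a host.
    print(parts.scheme, parts.username, parts.hostname)
```

A typo in the scheme or a missing `@` shows up immediately as a wrong or empty field in the parsed result.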
Once you have made these changes in the configuration file airflow.cfg, you have to update the Airflow metadata database with the command airflow initdb and then restart Airflow.
You can now start the Airflow webserver with the command below
# default port is 8080
airflow webserver -p 8000
You can start the scheduler
# start the scheduler
airflow scheduler
You also have to start an Airflow worker on each worker node.
airflow worker
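In Airflow 1.10, the worker command also accepts flags to tune how much each node pulls from the broker; the values below are examples to adjust per node:

```shell
# Run a worker with 4 concurrent task slots, listening on the default queue.
airflow worker -c 4 -q default
```

Pointing different workers at different queues lets you dedicate nodes to particular kinds of tasks.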
Once you’re done starting the various Airflow services, you can check out the fantastic Airflow UI at
http://<IP-ADDRESS/HOSTNAME>:8000
since we passed port 8000 in our webserver start command (otherwise the default port is 8080). Yes! We are done building the multi-node Airflow cluster. :)