What Are the Challenges When Setting Up a Multi-Node Airflow Cluster?

My last blog post about Apache Airflow was "Setup and Configure Multi Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines". The question is: did it really go smoothly, or were there struggles? Yes, there were some challenges.

In this post, I will talk about the issues I faced on my journey of setting up a multi-node Airflow cluster.

Issue 1: After changing LocalExecutor to CeleryExecutor, the DAG was in the Running state but none of its tasks actually ran.

The worker was not able to communicate with the scheduler when using the Celery executor.

Error:

AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:112} ERROR - Error syncing the celery executor, ignoring it:
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:113} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

I started looking into the Airflow code at the line numbers where the error was thrown, but found no clue about what was going on. One thing was clear, though: Celery was not able to publish or subscribe to messages, so the communication channel was never established.

Solution:

The installed version of Celery was 3.3.5, which is too old and incompatible with Airflow 1.10 (the currently installed version).

pip install --upgrade celery
3.3.5 => 4.3
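
A version mismatch like this can be caught before starting the workers. The snippet below is a minimal sketch of such a check. The helper names (parse_version, celery_is_new_enough) are hypothetical, and the minimum of 4.0 is an assumption taken from the upgrade described above, not a documented Airflow constraint, so check the celery extra pinned by your own Airflow release.

```python
import re


def parse_version(version):
    """Turn a version string such as '4.3.0' into a tuple of integers."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break  # stop at the first non-numeric component
        parts.append(int(match.group()))
    return tuple(parts)


def celery_is_new_enough(version, minimum=(4, 0)):
    """Return True if the version is at least `minimum` (assumed threshold)."""
    return parse_version(version) >= minimum


if __name__ == "__main__":
    try:
        import celery
    except ImportError:
        print("celery is not installed in this environment")
    else:
        ok = celery_is_new_enough(celery.__version__)
        print("celery", celery.__version__, "OK" if ok else "too old, upgrade it")
```

Running this on every node (master and workers) helps confirm that they all ended up on the same Celery major version after the upgrade.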

Issue 2: After running the DAG on CeleryExecutor, the DAG failed with an error that looked weird, at least to me.

Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found
Apr 11 14:13:13 charlie-prod airflow_control.sh: [2019-04-11 14:13:13,847: ERROR/ForkPoolWorker-6285] Pool process <celery.concurrency.asynpool.Worker object at 0x7f3a88b7b250> error: TypeError("Required argument 'object' (pos 1) not found",)
Apr 11 14:13:13 charlie-prod airflow_control.sh: Traceback (most recent call last):
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 289, in __call__
Apr 11 14:13:13 charlie-prod airflow_control.sh: sys.exit(self.workloop(pid=pid))
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 347, in workloop
Apr 11 14:13:13 charlie-prod airflow_control.sh: req = wait_for_job()
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 447, in receive
Apr 11 14:13:13 charlie-prod airflow_control.sh: ready, req = _receive(1.0)
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 419, in _recv
Apr 11 14:13:13 charlie-prod airflow_control.sh: return True, loads(get_payload())
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/common.py", line 101, in pickle_loads
Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found

Solution:

I was not able to figure out anything about this error; I really had no clue.

I came across a post about this Airflow issue written in Chinese. Ref. https://blog.csdn.net/u013492463/article/details/80881260

I didn't understand much of it, but at least it gave me a hint about the possible cause of this error.

Earlier setting:

broker_url= amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/

I realised that pyamqp would be the best choice, since many people use it and the post mentioned above arrives at much the same resolution.

amqp:// is an alias that uses librabbitmq if available, or py-amqp if it's not. You'd use pyamqp:// or librabbitmq:// if you want to specify exactly what transport to use. The pyamqp:// transport uses the 'amqp' library (http://github.com/celery/py-amqp)

Later setting with resolution:

broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/

Changing amqp to pyamqp resolved the above error.

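Installation (the py-amqp library is published on PyPI under the name amqp, and is normally pulled in as a Celery dependency):

pip install amqp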
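
If the broker URL is generated or shared across several config files, the scheme switch can also be done programmatically. The following is a minimal sketch using only the standard library. The function name force_pyamqp and the host name rabbit-host are hypothetical placeholders, not part of Airflow or Celery.

```python
from urllib.parse import urlsplit, urlunsplit


def force_pyamqp(broker_url):
    """Rewrite an amqp:// broker URL to pyamqp://, leaving other schemes alone."""
    parts = urlsplit(broker_url)
    if parts.scheme == "amqp":
        # amqp:// may resolve to librabbitmq; pyamqp:// pins the pure-Python transport
        parts = parts._replace(scheme="pyamqp")
    return urlunsplit(parts)


if __name__ == "__main__":
    # "rabbit-host" is a placeholder for the real RabbitMQ host name
    print(force_pyamqp("amqp://guest:guest@rabbit-host:5672/"))
```

Only the scheme changes; credentials, host, port and virtual host are passed through untouched, so the result can be written straight back to broker_url in airflow.cfg.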

Issue 3: SQLAlchemy connection failure

Earlier configuration:

SQLAlchemy connection

sql_alchemy_conn = postgresql://airflow:airflow@{HOST_NAME}:5432/airflow

Resolved:

Later configuration:

sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow

The psycopg2 driver named in this connection string has to be installed with pip.

Install the PostgreSQL adapter, psycopg2:

Psycopg is a PostgreSQL adapter for the Python programming language.

pip install psycopg2
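
SQLAlchemy connection strings follow the pattern dialect+driver://user:password@host/database, so the later configuration explicitly names psycopg2 as the driver. The sketch below shows one way to pull the driver name out of a connection string and check that the module is importable on a node. The helper names and the host name db-host are hypothetical, and the sketch assumes the driver name matches its import name, which holds for psycopg2.

```python
import importlib.util
from urllib.parse import urlsplit


def split_dialect_driver(conn):
    """Split 'postgresql+psycopg2://...' into ('postgresql', 'psycopg2')."""
    scheme = urlsplit(conn).scheme
    dialect, _, driver = scheme.partition("+")
    return dialect, driver or None  # driver is None when not given explicitly


def driver_is_installed(driver):
    """Return True if a module with this name can be imported on this machine."""
    return importlib.util.find_spec(driver) is not None


if __name__ == "__main__":
    # "db-host" is a placeholder for the real PostgreSQL host name
    conn = "postgresql+psycopg2://airflow:airflow@db-host/airflow"
    dialect, driver = split_dialect_driver(conn)
    if driver and not driver_is_installed(driver):
        print("missing driver, try: pip install " + driver)
    else:
        print("dialect:", dialect, "driver:", driver)
```

In a multi-node setup this check is worth running on every worker, since each one opens its own connection to the metadata database.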

Issue 4: HDP v2.6.2 with Ambari: worker installation failed on multiple hosts.

After successfully installing the webserver and scheduler on the master node (i.e. the Name Node), the goal was to install Celery workers on all Data Nodes, so that DAGs could run truly in parallel and scale horizontally and vertically. But Ambari pulled a wacky face :) and returned the error below.

by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Could not fetch URL https://pypi.org/simple/apache-airflow/: There was a problem confirming the ssl certificate: HTTPSConnectionPool(host='pypi.org', port=443): Max retries exceeded with url: /simple/apache-airflow/ (Caused by SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)) - skipping

Resolution:

This means that pip was not able to download and install the packages on the machine when I tried to install the worker on the node through the Ambari UI. From a terminal, however, I could run the same pip commands successfully. The common solution for this error is to run pip with the --trusted-host argument, or to change the repository from which pip downloads the packages.

pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0

I tried the --trusted-host variant through Ambari, but it failed again, with a different stack trace of similar meaning.
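
When the same install has to be repeated on many nodes, it can help to build the pip command in one place instead of retyping it. The following is a minimal sketch under a few assumptions: the helper names build_pip_command and run_install are hypothetical, and the SLUGIFY_USES_TEXT_UNIDECODE variable is set only because the Ambari script exports it before installing Airflow 1.10.0.

```python
import os
import subprocess
import sys

# The three hosts passed to --trusted-host in the command above
TRUSTED_HOSTS = ("pypi.python.org", "pypi.org", "files.pythonhosted.org")


def build_pip_command(requirement, trusted_hosts=TRUSTED_HOSTS):
    """Return the pip install command as an argument list."""
    cmd = [sys.executable, "-m", "pip", "install"]
    for host in trusted_hosts:
        cmd += ["--trusted-host", host]
    cmd += ["--upgrade", "--ignore-installed", requirement]
    return cmd


def run_install(requirement):
    """Run pip with the environment variable the Ambari script exports."""
    env = dict(os.environ, SLUGIFY_USES_TEXT_UNIDECODE="yes")
    return subprocess.call(build_pip_command(requirement), env=env)


if __name__ == "__main__":
    # Only prints the command; call run_install(...) to actually execute it
    print(" ".join(build_pip_command("apache-airflow[celery]==1.10.0")))
```

Passing the command as a list avoids shell quoting problems with the square brackets in apache-airflow[celery].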

resource_management.core.exceptions.ExecutionFailed: Execution of 'export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
 Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Could not find a version that satisfies the requirement apache-airflow[celery]==1.10.0 (from versions: )
No matching distribution found for apache-airflow[celery]==1.10.0
You are using pip version 8.1.2, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

I did upgrade pip, but with no success.

What finally worked was a hack rather than a proper resolution. I manually installed Celery and all the necessary pip packages which are listed here.

But it still gave the same error. Ideally, with these packages already installed, the install step should have been skipped, but it was not. As per the code, Ref. https://github.com/miho120/ambari-airflow-mpack/blob/e1c9ca004adaa3320e35ab7baa7fdb9b9695b635/airflow-service-mpack/common-services/AIRFLOW/1.10.0/package/scripts/airflow_worker_control.py

On the cluster, I temporarily commented out those lines by hand (and reverted the change once the worker installation was successful), then added the worker from Ambari, which worked like a charm :) This hack made my day.

After installing a worker on another node, you might need to restart the Airflow service from Ambari. You can learn more from my previous blog post: Setup and Configure Multi Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines
