Description
Apache Airflow version
2.1.4
What happened
The problem also exists in the latest version of the Airflow code, but I experienced it in 2.1.4.
This is the root cause of the problems reported in issue #13542.
I'll provide a stack trace below. The problem is in the code of airflow/dag_processing/processor.py (and manager.py): all poll() and recv() calls on the multiprocessing communication channels need to be wrapped in exception handlers that handle OSError("handle is closed"). If you look at the Python multiprocessing source code, it raises this exception when the channel's handle has been closed.
This occurs in Airflow when a DAG File Processor has been killed or terminated; the Airflow code closes the communication channel when it kills or terminates a DAG File Processor process (for example, when a dag_file_processor_timeout occurs). This killing or terminating happens asynchronously (in another process) from the process calling poll() or recv() on the communication channel, which is why the exception must be handled. A pre-check that the handle is still open is not good enough, because the other process doing the kill or terminate may close the handle between the pre-check and the actual poll() or recv() call (a race condition).
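To make the failure mode concrete, here is a minimal, self-contained sketch (plain Python, not Airflow code) showing that poll() on a connection whose handle was closed raises exactly this OSError, which is why the call itself must be wrapped:

```python
from multiprocessing import Pipe

# Create a channel like the one between the DagFileProcessorManager
# and a DAG file processor process.
parent_conn, child_conn = Pipe()

# Simulate the kill/terminate path closing the channel. In Airflow this
# close happens in another process, racing with any "is it open?" pre-check.
parent_conn.close()

try:
    parent_conn.poll()  # the same call as processor.py's self._parent_channel.poll()
except OSError as err:
    print(f"caught: {err}")  # -> caught: handle is closed
```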
What you expected to happen
Here is the stack trace of the occurrence I saw:
[2022-03-08 17:41:06,101] {taskinstance.py:914} DEBUG - <TaskInstance: staq_report_daily.gs.wait_staq_csv_file 2022-03-06 17:15:00+00:00 [running]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2022-03-08 17:41:06,101] {taskinstance.py:904} DEBUG - Dependencies all met for <TaskInstance: staq_report_daily.gs.wait_staq_csv_file 2022-03-06 17:15:00+00:00 [running]>
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: gdai_gcs_sync> because no tasks in DAG have SLAs
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: unity_creative_import_process> because no tasks in DAG have SLAs
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: sales_dm_to_bq> because no tasks in DAG have SLAs
[2022-03-08 17:44:50,454] {settings.py:302} DEBUG - Disposing DB connection pool (PID 1902)
Process ForkProcess-1:
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/opt/python3.8/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/opt/python3.8/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
This corresponded in time to the following log entries:
% kubectl logs airflow-scheduler-58c997dd98-n8xr8 -c airflow-scheduler --previous | egrep 'Ran scheduling loop in|[[]heartbeat[]]'
[2022-03-08 17:40:47,586] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.56 seconds
[2022-03-08 17:40:49,146] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.56 seconds
[2022-03-08 17:40:50,675] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:40:50,687] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.54 seconds
[2022-03-08 17:40:52,144] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.46 seconds
[2022-03-08 17:40:53,620] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.47 seconds
[2022-03-08 17:40:55,085] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.46 seconds
[2022-03-08 17:40:56,169] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:40:56,180] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.49 seconds
[2022-03-08 17:40:57,667] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.49 seconds
[2022-03-08 17:40:59,148] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.48 seconds
[2022-03-08 17:41:00,618] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.47 seconds
[2022-03-08 17:41:01,742] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:41:01,757] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.58 seconds
[2022-03-08 17:41:03,133] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.55 seconds
[2022-03-08 17:41:04,664] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.53 seconds
[2022-03-08 17:44:50,649] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:44:50,814] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 225.15 seconds
You can see that when this exception occurred, the scheduler hung for almost 4 minutes: no scheduling loops ran and no scheduler_job heartbeats were emitted.
This hang probably also caused the stuck queued jobs that issue #13542 describes.
How to reproduce
This is hard to reproduce because it is a race condition. But you might be able to reproduce it by putting top-level code in a DAG file that calls sleep, so that parsing the file takes longer than the [core] dag_file_processor_timeout setting. That causes the parsing processes to be terminated, creating the conditions for this bug to occur.
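For example, a hypothetical DAG file along these lines (the file name, dag_id, and sleep duration are made up; the sleep only needs to exceed your [core] dag_file_processor_timeout, which defaults to 50 seconds):

```python
# dags/slow_to_parse.py -- hypothetical reproducer: the top-level sleep
# makes parsing exceed [core] dag_file_processor_timeout, so the DAG
# file processor gets terminated and its channel closed mid-flight.
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

time.sleep(120)  # longer than dag_file_processor_timeout (default 50s)

with DAG(dag_id="slow_to_parse", start_date=datetime(2022, 3, 1), schedule_interval=None) as dag:
    DummyOperator(task_id="noop")
```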
Operating System
NAME="Ubuntu" VERSION="18.04.6 LTS (Bionic Beaver)" ID=ubuntu ID_LIKE=debian PRETTY_NAME="Ubuntu 18.04.6 LTS" VERSION_ID="18.04" HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="https://help.ubuntu.com/" BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/" PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy" VERSION_CODENAME=bionic UBUNTU_CODENAME=bionic
Versions of Apache Airflow Providers
Not relevant, this is a core dag_processing issue.
Deployment
Composer
Deployment details
"composer-1.17.6-airflow-2.1.4"
In order to isolate the scheduler onto a separate machine, so that it would not suffer interference from other processes such as airflow-workers running on the same machine, we created an additional node pool for the scheduler and ran the following k8s patches to move the scheduler there.
New node pool definition:
{
  name              = "scheduler-pool"
  machine_type      = "n1-highcpu-8"
  autoscaling       = false
  node_count        = 1
  disk_type         = "pd-balanced"
  disk_size         = 64
  image_type        = "COS"
  auto_repair       = true
  auto_upgrade      = true
  max_pods_per_node = 32
},
patch.sh
#!/bin/bash
if [ $# -lt 1 ]; then
  echo "Usage: $0 namespace"
  echo "Description: Isolate airflow-scheduler onto its own node-pool (scheduler-pool)."
  echo "Options:"
  echo " namespace: kubernetes namespace used by Composer"
  exit 1
fi
namespace=$1
set -eu
set -o pipefail
scheduler_patch="$(cat airflow-scheduler-patch.yaml)"
fluentd_patch="$(cat composer-fluentd-daemon-patch.yaml)"
set -x
kubectl -n default patch daemonset composer-fluentd-daemon -p "${fluentd_patch}"
kubectl -n "${namespace}" patch deployment airflow-scheduler -p "${scheduler_patch}"
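The script is invoked with the Composer namespace, e.g. `./patch.sh <composer-namespace>` (the namespace value is environment-specific).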
composer-fluentd-daemon-patch.yaml
spec:
  template:
    spec:
      nodeSelector: null
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: cloud.google.com/gke-nodepool
                operator: In
                values:
                - default-pool
                - scheduler-pool
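(The intent of this patch, as far as the affinity terms show, is to let the log-collecting composer-fluentd-daemon daemonset schedule onto both default-pool and the new scheduler-pool, rather than only the default nodes.)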
airflow-scheduler-patch.yaml
spec:
  template:
    spec:
      nodeSelector:
        cloud.google.com/gke-nodepool: scheduler-pool
      containers:
      - name: gcs-syncd
        resources:
          limits:
            memory: 2Gi
Anything else
Regarding the checkbox below about submitting a PR: I could submit one, but it would be untested code; I don't really have an environment set up to test the patch.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct