
dag_processing code needs to handle OSError("handle is closed") in poll() and recv() calls #22191

Closed

@dlesco

Description

Apache Airflow version

2.1.4

What happened

The problem also exists in the latest version of the Airflow code, but I experienced it in 2.1.4.

This is the root cause of the problems experienced in issue #13542.

I'll provide a stack trace below. The problem is in airflow/dag_processing/processor.py (and manager.py): every poll() and recv() call on the multiprocessing communication channels needs to be wrapped in an exception handler that handles OSError("handle is closed"). If one looks at the Python multiprocessing source code, it raises this exception whenever the channel's handle has already been closed.

This occurs in Airflow when a DAG file processor has been killed or terminated; the Airflow code closes the communication channel when it kills or terminates a DAG file processor process (for example, when a dag_file_processor_timeout occurs). The killing or terminating happens asynchronously, in a different process from the one calling poll() or recv() on the channel, which is why the exception must be handled. A pre-check that the handle is open is not good enough, because the process doing the kill or terminate may close the handle between the pre-check and the actual poll() or recv() call (a race condition).
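As a minimal, self-contained sketch of the guard pattern I have in mind (plain multiprocessing code reproducing the failure mode, not the actual Airflow patch):

import multiprocessing

# poll() on a Connection whose handle has already been closed raises
# OSError("handle is closed"). A pre-check cannot prevent this, so the
# caller has to catch the exception and treat the channel as gone.
parent_conn, child_conn = multiprocessing.Pipe()
parent_conn.close()  # simulates the kill/terminate path closing the channel

result = None
try:
    if parent_conn.poll():
        result = parent_conn.recv()
except OSError:
    # Reached when another process closed the handle between any pre-check
    # and the poll()/recv() call; treat the processor as done instead of
    # letting the exception escape and hang the parsing loop.
    pass

In processor.py and manager.py, the same try/except would need to wrap the poll() and recv() calls that appear in the stack trace below.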

What you expected to happen

Here is the stack trace of the occurrence I saw:

[2022-03-08 17:41:06,101] {taskinstance.py:914} DEBUG - <TaskInstance: staq_report_daily.gs.wait_staq_csv_file 2022-03-06 17:15:00+00:00 [running]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2022-03-08 17:41:06,101] {taskinstance.py:904} DEBUG - Dependencies all met for <TaskInstance: staq_report_daily.gs.wait_staq_csv_file 2022-03-06 17:15:00+00:00 [running]>
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: gdai_gcs_sync> because no tasks in DAG have SLAs
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: unity_creative_import_process> because no tasks in DAG have SLAs
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: sales_dm_to_bq> because no tasks in DAG have SLAs
[2022-03-08 17:44:50,454] {settings.py:302} DEBUG - Disposing DB connection pool (PID 1902)
Process ForkProcess-1:
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/opt/python3.8/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/opt/python3.8/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed

This corresponded in time to the following log entries:

% kubectl logs airflow-scheduler-58c997dd98-n8xr8 -c airflow-scheduler --previous | egrep 'Ran scheduling loop in|[[]heartbeat[]]'
[2022-03-08 17:40:47,586] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.56 seconds
[2022-03-08 17:40:49,146] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.56 seconds
[2022-03-08 17:40:50,675] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:40:50,687] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.54 seconds
[2022-03-08 17:40:52,144] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.46 seconds
[2022-03-08 17:40:53,620] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.47 seconds
[2022-03-08 17:40:55,085] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.46 seconds
[2022-03-08 17:40:56,169] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:40:56,180] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.49 seconds
[2022-03-08 17:40:57,667] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.49 seconds
[2022-03-08 17:40:59,148] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.48 seconds
[2022-03-08 17:41:00,618] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.47 seconds
[2022-03-08 17:41:01,742] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:41:01,757] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.58 seconds
[2022-03-08 17:41:03,133] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.55 seconds
[2022-03-08 17:41:04,664] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.53 seconds
[2022-03-08 17:44:50,649] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:44:50,814] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 225.15 seconds

You can see that when this exception occurred, the scheduler hung for almost four minutes, with no scheduling loops and no scheduler_job heartbeats.

This hang probably also caused the stuck queued jobs that issue #13542 describes.

How to reproduce

This is hard to reproduce because it is a race condition. But you might be able to reproduce it with a DAG file containing top-level code that calls sleep, so that parsing takes longer than the [core] dag_file_processor_timeout setting. That would cause the parsing processes to be terminated, creating the conditions for this bug to occur.
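For example, a throwaway DAG file along these lines (the DAG id, sleep duration, and operator are arbitrary; the sleep just needs to exceed the configured timeout) should get the DAG file processor killed during parsing:

# Hypothetical DAG file used only to force a parse timeout: the top-level
# sleep runs while the file is parsed, so the DAG file processor exceeds
# [core] dag_file_processor_timeout and is terminated, which closes its
# communication channel.
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

time.sleep(300)  # longer than dag_file_processor_timeout

with DAG(
    dag_id="slow_parse_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:
    DummyOperator(task_id="noop")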

Operating System

NAME="Ubuntu" VERSION="18.04.6 LTS (Bionic Beaver)" ID=ubuntu ID_LIKE=debian PRETTY_NAME="Ubuntu 18.04.6 LTS" VERSION_ID="18.04" HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="https://help.ubuntu.com/" BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/" PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy" VERSION_CODENAME=bionic UBUNTU_CODENAME=bionic

Versions of Apache Airflow Providers

Not relevant; this is a core dag_processing issue.

Deployment

Composer

Deployment details

"composer-1.17.6-airflow-2.1.4"

In order to isolate the scheduler on its own machine, so that other processes such as airflow-workers on the same node could not interfere with it, we created an additional node-pool for the scheduler and applied the following k8s patches to move it there.

New node pool definition:

    {
      name              = "scheduler-pool"
      machine_type      = "n1-highcpu-8"
      autoscaling       = false
      node_count        = 1
      disk_type         = "pd-balanced"
      disk_size         = 64
      image_type        = "COS"
      auto_repair       = true
      auto_upgrade      = true
      max_pods_per_node = 32
    },

patch.sh

#!/bin/bash
if [ $# -lt 1 ]; then
  echo "Usage: $0 namespace"
  echo "Description: Isolate airflow-scheduler onto it's own node-pool (scheduler-pool)."
  echo "Options:"
  echo "  namespace: kubernetes namespace used by Composer"
  exit 1
fi

namespace=$1

set -eu
set -o pipefail

scheduler_patch="$(cat airflow-scheduler-patch.yaml)"
fluentd_patch="$(cat composer-fluentd-daemon-patch.yaml)"

set -x

kubectl -n default patch daemonset composer-fluentd-daemon -p "${fluentd_patch}"
kubectl -n ${namespace} patch deployment airflow-scheduler -p "${scheduler_patch}"

composer-fluentd-daemon-patch.yaml

spec:
  template:
    spec:
      nodeSelector: null
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: cloud.google.com/gke-nodepool
                operator: In
                values:
                - default-pool
                - scheduler-pool

airflow-scheduler-patch.yaml

spec:
  template:
    spec:
      nodeSelector:
        cloud.google.com/gke-nodepool: scheduler-pool
      containers:
      - name: gcs-syncd
        resources:
          limits:
            memory: 2Gi

Anything else

Regarding the checkbox below about submitting a PR: I could submit one, but it would be untested code; I don't really have an environment set up to test the patch.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
