Closed
Description
Apache Airflow version
2.1.4 (latest released)
Operating System
Linux 5.4.149-73.259.amzn2.x86_64
Versions of Apache Airflow Providers
No response
Deployment
Other 3rd-party Helm chart
Deployment details
AWS EKS over own helm chart
What happened
We are hitting an issue previously reported against 2.0.x in #13504.
Each time the scheduler is restarted, it deletes all DAGs from the serialized_dag table and tries to serialize them again from scratch. The scheduler pod then fails with this error:
[2021-10-08 20:19:40,683] {kubernetes_executor.py:761} INFO - Shutting down Kubernetes executor
[2021-10-08 20:19:41,705] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 32
[2021-10-08 20:19:42,207] {process_utils.py:207} INFO - Waiting up to 5 seconds for processes to exit...
[2021-10-08 20:19:42,223] {process_utils.py:66} INFO - Process psutil.Process(pid=32, status='terminated', exitcode=0, started='20:19:40') (32) terminated with exit code 0
[2021-10-08 20:19:42,225] {process_utils.py:66} INFO - Process psutil.Process(pid=40, status='terminated', started='20:19:40') (40) terminated with exit code None
[2021-10-08 20:19:42,226] {process_utils.py:66} INFO - Process psutil.Process(pid=36, status='terminated', started='20:19:40') (36) terminated with exit code None
[2021-10-08 20:19:42,226] {scheduler_job.py:722} INFO - Exited execute loop
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 70, in scheduler
job.run()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
self._execute()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 695, in _execute
self._run_scheduler_loop()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 788, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 927, in _do_scheduling
num_queued_tis = self._critical_section_execute_task_instances(session=session)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 551, in _critical_section_execute_task_instances
queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 431, in _executable_task_instances_to_queued
serialized_dag = self.dagbag.get_dag(dag_id, session=session)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
self._add_dag_from_db(dag_id=dag_id, session=session)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'aws_transforms_player_hourly' not found in serialized_dag table
This causes all DAGs to be absent from the serialized_dag table, so the webserver fails as well:
Python version: 3.9.7
Airflow version: 2.1.4
Node: airflow-webserver-7b45758f99-rk8dg
-------------------------------------------------------------------------------
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/airflow/.local/lib/python3.9/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/auth.py", line 49, in decorated
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 97, in view_func
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 60, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/views.py", line 2027, in tree
dag = current_app.dag_bag.get_dag(dag_id)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
self._add_dag_from_db(dag_id=dag_id, session=session)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'canary_dag' not found in serialized_dag table
What you expected to happen
The scheduler shouldn't fail after a restart, and serialized DAGs should remain available to the webserver.
How to reproduce
1. Restart the scheduler pod.
2. Observe that it fails.
3. Open a DAG in the webserver.
4. Observe the error.
Anything else
The issue temporarily goes away, until the next scheduler restart, after running the following "serialize" script from the webserver pod:
```python
from airflow.models import DagBag
from airflow.models.serialized_dag import SerializedDagModel

dag_bag = DagBag()

# Check the DB for missing serialized DAGs, and add them if missing
for dag_id in dag_bag.dag_ids:
    if not SerializedDagModel.get(dag_id):
        dag = dag_bag.get_dag(dag_id)
        SerializedDagModel.write_dag(dag)
```
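For context, here is a minimal sketch of why the failure window seems to exist. This is plain Python with no Airflow dependency: the serialized_dag table is stubbed as a dict, and all names besides the exception are illustrative. If rows are deleted first and only re-serialized afterwards, any reader querying in between sees a missing DAG:

```python
# Illustrative sketch: the serialized_dag "table" stubbed as a dict.
# Deleting everything and only then re-serializing leaves a window
# in which concurrent readers (scheduler loop, webserver) fail.

class SerializedDagNotFound(Exception):
    pass

table = {"canary_dag": "<serialized payload>"}

def get_dag(dag_id):
    # Mimics DagBag.get_dag() backed by the serialized_dag table
    if dag_id not in table:
        raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
    return table[dag_id]

# Non-atomic restart: all rows are deleted first...
table.clear()

# ...so a reader that queries in this window fails:
try:
    get_dag("canary_dag")
except SerializedDagNotFound as exc:
    print(exc)  # DAG 'canary_dag' not found in serialized_dag table

# Only after re-serialization completes does the lookup succeed again:
table["canary_dag"] = "<serialized payload>"
print(get_dag("canary_dag"))
```

The workaround script above closes that window after the fact by re-serializing whatever is missing; an atomic update (replace rows instead of delete-then-insert) would avoid it entirely.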
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct