
SerializedDagNotFound: DAG not found in serialized_dag table #18843

Closed
@KulykDmytro

Description


Apache Airflow version

2.1.4 (latest released)

Operating System

Linux 5.4.149-73.259.amzn2.x86_64

Versions of Apache Airflow Providers

No response

Deployment

Other 3rd-party Helm chart

Deployment details

AWS EKS, deployed with our own Helm chart

What happened

We are hitting an issue that dates back to 2.0.x (#13504).
Each time the scheduler is restarted, it deletes all DAGs from the serialized_dag table and tries to serialize them again from scratch. Afterwards, the scheduler pod fails with the following error:

[2021-10-08 20:19:40,683] {kubernetes_executor.py:761} INFO - Shutting down Kubernetes executor
[2021-10-08 20:19:41,705] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 32
[2021-10-08 20:19:42,207] {process_utils.py:207} INFO - Waiting up to 5 seconds for processes to exit...
[2021-10-08 20:19:42,223] {process_utils.py:66} INFO - Process psutil.Process(pid=32, status='terminated', exitcode=0, started='20:19:40') (32) terminated with exit code 0
[2021-10-08 20:19:42,225] {process_utils.py:66} INFO - Process psutil.Process(pid=40, status='terminated', started='20:19:40') (40) terminated with exit code None
[2021-10-08 20:19:42,226] {process_utils.py:66} INFO - Process psutil.Process(pid=36, status='terminated', started='20:19:40') (36) terminated with exit code None
[2021-10-08 20:19:42,226] {scheduler_job.py:722} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 70, in scheduler
    job.run()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 695, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 788, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 927, in _do_scheduling
    num_queued_tis = self._critical_section_execute_task_instances(session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 551, in _critical_section_execute_task_instances
    queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 431, in _executable_task_instances_to_queued
    serialized_dag = self.dagbag.get_dag(dag_id, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'aws_transforms_player_hourly' not found in serialized_dag table

As a result, all DAGs are missing from the serialized_dag table, and the webserver fails when a DAG is opened:

Python version: 3.9.7
Airflow version: 2.1.4
Node: airflow-webserver-7b45758f99-rk8dg
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/auth.py", line 49, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 97, in view_func
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/views.py", line 2027, in tree
    dag = current_app.dag_bag.get_dag(dag_id)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'canary_dag' not found in serialized_dag table
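To make the failure sequence concrete, here is a toy, pure-Python model of what this report describes; it is not Airflow's code. `FakeSerializedDagTable` and `restart_scheduler` are hypothetical names, and the local `SerializedDagNotFound` class only mimics the message seen in both tracebacks. The point is the ordering: if the table is emptied on restart and a lookup happens before the DAG is written back, the lookup raises.

```python
class SerializedDagNotFound(Exception):
    """Hypothetical stand-in for airflow.exceptions.SerializedDagNotFound (toy model only)."""


class FakeSerializedDagTable:
    """Hypothetical in-memory stand-in for the serialized_dag table."""

    def __init__(self):
        self.rows = {}  # dag_id -> serialized payload

    def write_dag(self, dag_id, payload):
        self.rows[dag_id] = payload

    def get_dag(self, dag_id):
        # Mirrors the error path in the tracebacks: a missing row raises.
        if dag_id not in self.rows:
            raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
        return self.rows[dag_id]


def restart_scheduler(table):
    """Models the reported behaviour: a restart empties the table before re-serializing."""
    table.rows.clear()


table = FakeSerializedDagTable()
table.write_dag("canary_dag", {"fileloc": "canary.py"})
restart_scheduler(table)

try:
    table.get_dag("canary_dag")  # lookup happens before the DAG is written back
except SerializedDagNotFound as exc:
    print(exc)  # DAG 'canary_dag' not found in serialized_dag table
```

In this model the fix is purely a matter of ordering: any reader that queries the table between the clear and the rewrite sees the exception.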

What you expected to happen

The scheduler should not fail after a restart.

How to reproduce

1. Restart the scheduler pod.
2. Observe that it fails.
3. Open a DAG in the webserver.
4. Observe the error.

Anything else

The issue goes away temporarily, until the next scheduler restart, after running the following "serialize" script from the webserver pod:

from airflow.models import DagBag
from airflow.models.serialized_dag import SerializedDagModel

# Parse every DAG file in the configured dags folder
dag_bag = DagBag()

# Check DB for missing serialized DAGs, and add them if missing
for dag_id in dag_bag.dag_ids:
    if not SerializedDagModel.get(dag_id):
        dag = dag_bag.get_dag(dag_id)
        # Serialize the parsed DAG and write it to the serialized_dag table
        SerializedDagModel.write_dag(dag)
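The script above is a reconcile step: compare the DAG ids parsed from files with the ids present in the table, and write only the missing ones. Below is a minimal pure-Python sketch of that logic, assuming plain dicts stand in for `DagBag.dags` and the serialized_dag table; the function name `backfill_missing` is hypothetical and nothing here calls Airflow. It also shows why this is only a temporary fix: it restores the rows once, but nothing stops the next scheduler restart from removing them again.

```python
def backfill_missing(parsed_dags, serialized_rows):
    """Write every parsed DAG that has no serialized row; return the ids written.

    parsed_dags:     dict of dag_id -> DAG object (hypothetical stand-in for DagBag.dags)
    serialized_rows: dict of dag_id -> payload (hypothetical stand-in for serialized_dag)
    """
    written = []
    for dag_id in sorted(parsed_dags):
        if dag_id not in serialized_rows:
            # Toy "serialization": store the object as-is.
            serialized_rows[dag_id] = parsed_dags[dag_id]
            written.append(dag_id)
    return written


parsed = {"aws_transforms_player_hourly": object(), "canary_dag": object()}
table = {"canary_dag": "already serialized"}
print(backfill_missing(parsed, table))  # ['aws_transforms_player_hourly']
print(backfill_missing(parsed, table))  # [] (nothing left to write)
```

Because existing rows are skipped, running the step repeatedly is harmless; it only ever fills gaps.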

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

