Description
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google==17.1.0
Apache Airflow version
2.10.5
Operating System
Debian 11
Deployment
Google Cloud Composer
Deployment details
I have just upgraded to the newest Cloud Composer version, composer-2.14.1-airflow-2.10.5.
What happened
After the upgrade, some of my DAGs using the GCSToBigQueryOperator with the option to create an external table failed with the error:
TypeError: BigQueryHook.create_table() missing 2 required positional arguments: 'dataset_id' and 'table_id'
I then investigated the code and found:
In the GCSToBigQueryOperator class (provider version 17.1.0) there is a method called _create_external_table, which is invoked when the external_table=True parameter is set on the operator. This method in turn calls the BigQuery hook like so:
self.hook.create_table(
    table_resource=table_obj_api_repr,
    project_id=self.project_id or self.hook.project_id,
    location=self.location,
    exists_ok=True,
)
In the most recent versions of the provider, BigQueryHook.create_table also requires the arguments dataset_id and table_id (even though these are already available in the table_resource, I suppose). The call above therefore fails with:
TypeError: BigQueryHook.create_table() missing 2 required positional arguments: 'dataset_id' and 'table_id'
This looks like a simple correction that needs to be made in the operator code.
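For reference, the mismatch can also be demonstrated outside a DAG. The following minimal sketch (project, dataset, and location values are placeholders) mirrors the operator's call and raises the same TypeError before any API request is made, because Python rejects the missing positional arguments at call time:

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook()
# Same keyword-only call shape as GCSToBigQueryOperator._create_external_table;
# against provider 17.1.0 this raises the TypeError shown above.
hook.create_table(
    table_resource={
        "tableReference": {
            "projectId": "my-project",       # placeholder
            "datasetId": "composer",         # placeholder
            "tableId": "new_test_table_full",
        }
    },
    project_id="my-project",
    location="US",  # placeholder
    exists_ok=True,
)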
What you think should happen instead
This error should not occur. The call to create_table in this operator should be updated to pass the dataset_id and table_id arguments.
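As a rough illustration (not necessarily how PR #55501 implements it), the operator already builds the full table resource, so the missing arguments could be taken from its tableReference:

# Sketch only: assumes table_obj_api_repr always carries a tableReference,
# which Table.to_api_repr() produces for the external table definition.
table_ref = table_obj_api_repr["tableReference"]
self.hook.create_table(
    dataset_id=table_ref["datasetId"],
    table_id=table_ref["tableId"],
    table_resource=table_obj_api_repr,
    project_id=self.project_id or self.hook.project_id,
    location=self.location,
    exists_ok=True,
)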
How to reproduce
You can run this test DAG:
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.dates import days_ago

# Fixed schema (id + value, both STRINGs)
SCHEMA_FIELDS = [
    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
    {"name": "value", "type": "STRING", "mode": "NULLABLE"},
]

with DAG(
    dag_id="test_original_gcs_to_bq",
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
    tags=["test", "bigquery"],
) as dag:
    load_test_full = GCSToBigQueryOperator(
        task_id="load_test_full",
        bucket="composer_upgrade_quick_test",
        source_objects=["data/test_data.json"],
        destination_project_dataset_table="my-project.composer.new_test_table_full",
        skip_leading_rows=0,
        write_disposition="WRITE_TRUNCATE",
        create_disposition="CREATE_IF_NEEDED",
        source_format="NEWLINE_DELIMITED_JSON",
        schema_fields=SCHEMA_FIELDS,
        ignore_unknown_values=True,
        allow_jagged_rows=True,
        autodetect=False,
        external_table=True,
    )
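The DAG expects a newline-delimited JSON file at gs://composer_upgrade_quick_test/data/test_data.json. The TypeError itself is raised before the file is ever read, but staging the file lets the task succeed once the fix is in place. One way to stage it (file contents are made up, matching the two-field schema) is, for example:

from google.cloud import storage

# Hypothetical staging step: upload two NDJSON rows that match SCHEMA_FIELDS.
client = storage.Client(project="my-project")
blob = client.bucket("composer_upgrade_quick_test").blob("data/test_data.json")
blob.upload_from_string(
    '{"id": "1", "value": "foo"}\n{"id": "2", "value": "bar"}\n',
    content_type="application/json",
)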
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR! (fix: Adding two required arguments on BigQuery hook create_table #55501)
Code of Conduct
- I agree to follow this project's Code of Conduct