GCSToBigQueryOperator error on create_table arguments when table is of type external #55497

@FVidalCarneiro

Description

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==17.1.0

Apache Airflow version

2.10.5

Operating System

Debian 11

Deployment

Google Cloud Composer

Deployment details

I have just upgraded to the newest cloud-composer version composer-2.14.1-airflow-2.10.5

What happened

After the upgrade, some of my DAGs that use the GCSToBigQueryOperator with the option to create an external table failed with the error:

TypeError: BigQueryHook.create_table() missing 2 required positional arguments: 'dataset_id' and 'table_id'

I then investigated the code and found:

In the GCSToBigQueryOperator class, version 17.1.0, there is a method called _create_external_table that is invoked when the external_table=True parameter is set on the operator. This method in turn calls the BigQuery hook like so:

        self.hook.create_table(
            table_resource=table_obj_api_repr,
            project_id=self.project_id or self.hook.project_id,
            location=self.location,
            exists_ok=True,
        )

In the most recent versions of the Google provider, BigQueryHook.create_table() also requires the arguments dataset_id and table_id (even though these are already available in the table_resource, I suppose). The call above therefore fails with:

TypeError: BigQueryHook.create_table() missing 2 required positional arguments: 'dataset_id' and 'table_id'
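A quick way to confirm the mismatch on an affected environment is to inspect the hook's signature (a minimal sketch; the exact parameter list printed depends on the installed provider version):

import inspect
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

# On apache-airflow-providers-google==17.1.0 this lists dataset_id and table_id
# among the required parameters, which the operator's _create_external_table
# call shown above does not pass.
print(inspect.signature(BigQueryHook.create_table))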

This looks like a simple correction that needs to be made to the operator code.

What you think should happen instead

This error should not occur. The operator's call to create_table should be updated to match the current BigQueryHook.create_table() signature.
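
For illustration, a sketch of what the corrected call could look like, assuming the dataset and table names can simply be taken from the tableReference that the operator already builds into table_obj_api_repr (names taken from the snippet above; the actual fix may differ):

        # Sketch only: dataset_id/table_id are assumed to be available from the
        # tableReference the operator already constructs for table_obj_api_repr.
        table_ref = table_obj_api_repr["tableReference"]
        self.hook.create_table(
            dataset_id=table_ref["datasetId"],
            table_id=table_ref["tableId"],
            table_resource=table_obj_api_repr,
            project_id=self.project_id or self.hook.project_id,
            location=self.location,
            exists_ok=True,
        )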

How to reproduce

You can run this test dag:

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.dates import days_ago

# Fixed schema (id + value, both STRINGs)
SCHEMA_FIELDS = [
    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
    {"name": "value", "type": "STRING", "mode": "NULLABLE"},
]

with DAG(
    dag_id="test_original_gcs_to_bq",
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
    tags=["test", "bigquery"],
) as dag:

    load_test_full = GCSToBigQueryOperator(
        task_id="load_test_full",
        bucket="composer_upgrade_quick_test",
        source_objects=["data/test_data.json"],
        destination_project_dataset_table="my-project.composer.new_test_table_full",
        skip_leading_rows=0,
        write_disposition="WRITE_TRUNCATE",
        create_disposition="CREATE_IF_NEEDED",
        source_format="NEWLINE_DELIMITED_JSON",
        schema_fields=SCHEMA_FIELDS,
        ignore_unknown_values=True,
        allow_jagged_rows=True,
        autodetect=False,
        external_table=True,
    )
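
Until the operator is fixed, a possible temporary workaround is to create the external table by calling the hook directly with the arguments it now requires. This is only a sketch based on the TypeError above and the operator's existing call; the keyword names and the acceptance of this table_resource shape are assumptions, and the bucket/dataset/table values reuse the DAG above:

from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


@task
def create_external_table_workaround():
    # Build roughly the same external table definition the operator would have built.
    hook = BigQueryHook(gcp_conn_id="google_cloud_default")
    table_resource = {
        "tableReference": {
            "projectId": "my-project",
            "datasetId": "composer",
            "tableId": "new_test_table_full",
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "sourceUris": ["gs://composer_upgrade_quick_test/data/test_data.json"],
            "schema": {"fields": SCHEMA_FIELDS},
        },
    }
    # dataset_id/table_id keyword names are assumed from the TypeError above.
    hook.create_table(
        dataset_id="composer",
        table_id="new_test_table_full",
        table_resource=table_resource,
        project_id="my-project",
        exists_ok=True,
    )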

Anything else

No response

Are you willing to submit PR?

Code of Conduct
