这是indexloc提供的服务,不要输入任何密码
Skip to content

feat: support per-job reservation assignment #1477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
jobPrefix?: string;
location?: string;
projectId?: string;
reservation?: string;
};

export type PagedRequest<P> = P & {
Expand Down Expand Up @@ -114,6 +115,7 @@
job?: Job;
maxResults?: number;
jobTimeoutMs?: number;
reservation?: string;
pageToken?: string;
wrapIntegers?: boolean | IntegerTypeCastOptions;
parseJSON?: boolean;
Expand Down Expand Up @@ -362,17 +364,17 @@
private _universeDomain: string;
private _enableQueryPreview: boolean;

createQueryStream(options?: Query | string): ResourceStream<RowMetadata> {

Check warning on line 367 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<RowMetadata>({}, () => {});
}

getDatasetsStream(options?: GetDatasetsOptions): ResourceStream<Dataset> {

Check warning on line 372 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<Dataset>({}, () => {});
}

getJobsStream(options?: GetJobsOptions): ResourceStream<Job> {

Check warning on line 377 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<Job>({}, () => {});
}
Expand Down Expand Up @@ -1562,6 +1564,11 @@
delete query.jobId;
}

if (query.reservation) {
reqOpts.configuration.reservation = query.reservation;
delete query.reservation;
}

this.createJob(reqOpts, callback!);
}

Expand All @@ -1581,7 +1588,7 @@
const parameterMode = is.array(params) ? 'positional' : 'named';
const queryParameters: bigquery.IQueryParameter[] = [];
if (parameterMode === 'named') {
const namedParams = params as {[param: string]: any};

Check warning on line 1591 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
for (const namedParameter of Object.getOwnPropertyNames(namedParams)) {
const value = namedParams[namedParameter];
let queryParameter;
Expand Down Expand Up @@ -1731,11 +1738,16 @@
location: this.location,
};

if (options.location) {
reqOpts.jobReference.location = options.location;
if (reqOpts.location) {
reqOpts.jobReference.location = reqOpts.location;
delete reqOpts.location;
}

if (reqOpts.configuration && reqOpts.reservation) {
reqOpts.configuration.reservation = reqOpts.reservation;
delete reqOpts.reservation;
}

const job = this.job(jobId!, {
location: reqOpts.jobReference.location,
});
Expand Down Expand Up @@ -2224,7 +2236,7 @@

options = extend({job}, queryOpts, options);
if (res && res.jobComplete) {
let rows: any = [];

Check warning on line 2239 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
if (res.schema && res.rows) {
rows = BigQuery.mergeSchemaWithRows_(res.schema, res.rows, {
wrapIntegers: options.wrapIntegers || false,
Expand Down Expand Up @@ -2327,6 +2339,13 @@
useLegacySql: false,
requestId: randomUUID(),
jobCreationMode: 'JOB_CREATION_OPTIONAL',
reservation: queryObj.reservation,
continuous: queryObj.continuous,
destinationEncryptionConfiguration:
queryObj.destinationEncryptionConfiguration,
writeIncrementalResults: queryObj.writeIncrementalResults,
connectionProperties: queryObj.connectionProperties,
preserveNulls: queryObj.preserveNulls,
};
if (!this._enableQueryPreview) {
delete req.jobCreationMode;
Expand Down
10 changes: 7 additions & 3 deletions src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
RequestCallback,
JobRequest,
} from '.';
import {JobMetadata} from './job';
import {JobMetadata, JobOptions} from './job';
import bigquery from './types';

// This is supposed to be a @google-cloud/storage `File` type. The storage npm
Expand Down Expand Up @@ -424,8 +424,7 @@ class Model extends ServiceObject {
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const body: any = {
const body: JobOptions = {
configuration: {
extract: extend(true, options, {
sourceModel: {
Expand All @@ -447,6 +446,11 @@ class Model extends ServiceObject {
delete options.jobId;
}

if (body.configuration && options.reservation) {
body.configuration.reservation = options.reservation;
delete options.reservation;
}

this.bigQuery.createJob(body, callback!);
}

Expand Down
43 changes: 32 additions & 11 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import {
} from '.';
import {GoogleErrorBody} from '@google-cloud/common/build/src/util';
import {Duplex, Writable} from 'stream';
import {JobMetadata} from './job';
import {JobMetadata, JobOptions} from './job';
import bigquery from './types';
import {IntegerTypeCastOptions} from './bigquery';
import {RowQueue} from './rowQueue';
Expand Down Expand Up @@ -923,8 +923,7 @@ class Table extends ServiceObject {
const callback =
typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const body: any = {
const body: JobOptions = {
configuration: {
copy: extend(true, metadata, {
destinationTable: {
Expand Down Expand Up @@ -955,6 +954,11 @@ class Table extends ServiceObject {
delete metadata.jobId;
}

if (body.configuration && metadata.reservation) {
body.configuration.reservation = metadata.reservation;
delete metadata.reservation;
}

this.bigQuery.createJob(body, callback!);
}

Expand Down Expand Up @@ -1045,8 +1049,7 @@ class Table extends ServiceObject {
const callback =
typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const body: any = {
const body: JobOptions = {
configuration: {
copy: extend(true, metadata, {
destinationTable: {
Expand Down Expand Up @@ -1080,6 +1083,11 @@ class Table extends ServiceObject {
delete metadata.jobId;
}

if (body.configuration && metadata.reservation) {
body.configuration.reservation = metadata.reservation;
delete metadata.reservation;
}

this.bigQuery.createJob(body, callback!);
}

Expand Down Expand Up @@ -1218,8 +1226,7 @@ class Table extends ServiceObject {
delete options.gzip;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const body: any = {
const body: JobOptions = {
configuration: {
extract: extend(true, options, {
sourceTable: {
Expand All @@ -1245,6 +1252,11 @@ class Table extends ServiceObject {
delete options.jobId;
}

if (body.configuration && options.reservation) {
body.configuration.reservation = options.reservation;
delete options.reservation;
}

this.bigQuery.createJob(body, callback!);
}

Expand Down Expand Up @@ -1399,8 +1411,7 @@ class Table extends ServiceObject {
return [jobResponse, jobResponse.metadata];
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const body: any = {
const body: JobOptions = {
configuration: {
load: {
destinationTable: {
Expand All @@ -1427,7 +1438,12 @@ class Table extends ServiceObject {
delete metadata.jobId;
}

extend(true, body.configuration.load, metadata, {
if (body.configuration && metadata.reservation) {
body.configuration.reservation = metadata.reservation;
delete metadata.reservation;
}

extend(true, body.configuration?.load, metadata, {
sourceUris: toArray(source).map(src => {
if (!util.isCustomType(src, 'storage/file')) {
throw new Error('Source must be a File object.');
Expand All @@ -1437,7 +1453,12 @@ class Table extends ServiceObject {
// the file's extension. If no match, don't set, and default upstream
// to CSV.
const format = FORMATS[path.extname(src.name).substr(1).toLowerCase()];
if (!metadata.sourceFormat && format) {
if (
!metadata.sourceFormat &&
format &&
body.configuration &&
body.configuration.load
) {
body.configuration.load.sourceFormat = format;
}
return 'gs://' + src.bucket.name + '/' + src.name;
Expand Down
38 changes: 31 additions & 7 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

/**
* Discovery Revision: 20250313
* Discovery Revision: 20250511
*/

/**
Expand Down Expand Up @@ -417,7 +417,7 @@ declare namespace bigquery {
};

/**
* Configuration for BigLake managed tables.
* Configuration for BigQuery tables for Apache Iceberg (formerly BigLake managed tables.)
*/
type IBigLakeConfiguration = {
/**
Expand Down Expand Up @@ -1093,6 +1093,10 @@ declare namespace bigquery {
* The dataset reference. Use this property to access specific parts of the dataset's ID, such as project ID or dataset ID.
*/
datasetReference?: IDatasetReference;
/**
* Output only. Reference to a read-only external dataset defined in data catalogs outside of BigQuery. Filled out when the dataset type is EXTERNAL.
*/
externalDatasetReference?: IExternalDatasetReference;
/**
* An alternate name for the dataset. The friendly name is purely decorative in nature.
*/
Expand Down Expand Up @@ -2549,7 +2553,7 @@ declare namespace bigquery {
*/
timePartitioning?: ITimePartitioning;
/**
* Optional. [Experimental] Default time zone that will apply when parsing timestamp values that have no specific time zone.
* Optional. Default time zone that will apply when parsing timestamp values that have no specific time zone.
*/
timeZone?: string;
/**
Expand All @@ -2561,7 +2565,7 @@ declare namespace bigquery {
*/
useAvroLogicalTypes?: boolean;
/**
* Optional. Specifies the action that occurs if the destination table already exists. The following values are supported: * WRITE_TRUNCATE: If the table already exists, BigQuery overwrites the data, removes the constraints and uses the schema from the load job. * WRITE_APPEND: If the table already exists, BigQuery appends the data to the table. * WRITE_EMPTY: If the table already exists and contains data, a 'duplicate' error is returned in the job result. The default value is WRITE_APPEND. Each action is atomic and only occurs if BigQuery is able to complete the job successfully. Creation, truncation and append actions occur as one atomic update upon job completion.
* Optional. Specifies the action that occurs if the destination table already exists. The following values are supported: * WRITE_TRUNCATE: If the table already exists, BigQuery overwrites the data, removes the constraints and uses the schema from the load job. * WRITE_TRUNCATE_DATA: If the table already exists, BigQuery overwrites the data, but keeps the constraints and schema of the existing table. * WRITE_APPEND: If the table already exists, BigQuery appends the data to the table. * WRITE_EMPTY: If the table already exists and contains data, a 'duplicate' error is returned in the job result. The default value is WRITE_APPEND. Each action is atomic and only occurs if BigQuery is able to complete the job successfully. Creation, truncation and append actions occur as one atomic update upon job completion.
*/
writeDisposition?: string;
};
Expand Down Expand Up @@ -2675,7 +2679,7 @@ declare namespace bigquery {
*/
userDefinedFunctionResources?: Array<IUserDefinedFunctionResource>;
/**
* Optional. Specifies the action that occurs if the destination table already exists. The following values are supported: * WRITE_TRUNCATE: If the table already exists, BigQuery overwrites the data, removes the constraints, and uses the schema from the query result. * WRITE_APPEND: If the table already exists, BigQuery appends the data to the table. * WRITE_EMPTY: If the table already exists and contains data, a 'duplicate' error is returned in the job result. The default value is WRITE_EMPTY. Each action is atomic and only occurs if BigQuery is able to complete the job successfully. Creation, truncation and append actions occur as one atomic update upon job completion.
* Optional. Specifies the action that occurs if the destination table already exists. The following values are supported: * WRITE_TRUNCATE: If the table already exists, BigQuery overwrites the data, removes the constraints, and uses the schema from the query result. * WRITE_TRUNCATE_DATA: If the table already exists, BigQuery overwrites the data, but keeps the constraints and schema of the existing table. * WRITE_APPEND: If the table already exists, BigQuery appends the data to the table. * WRITE_EMPTY: If the table already exists and contains data, a 'duplicate' error is returned in the job result. The default value is WRITE_EMPTY. Each action is atomic and only occurs if BigQuery is able to complete the job successfully. Creation, truncation and append actions occur as one atomic update upon job completion.
*/
writeDisposition?: string;
/**
Expand Down Expand Up @@ -3055,7 +3059,7 @@ declare namespace bigquery {
*/
referencedRoutines?: Array<IRoutineReference>;
/**
* Output only. Referenced tables for the job. Queries that reference more than 50 tables will not have a complete list.
* Output only. Referenced tables for the job.
*/
referencedTables?: Array<ITableReference>;
/**
Expand Down Expand Up @@ -4149,6 +4153,10 @@ declare namespace bigquery {
* Total units of work remaining for the query. This number can be revised (increased or decreased) while the query is running.
*/
pendingUnits?: string;
/**
* Total shuffle usage ratio in shuffle RAM per reservation of this query. This will be provided for reservation customers only.
*/
shuffleRamUsageRatio?: number;
/**
* Cumulative slot-ms consumed by the query.
*/
Expand Down Expand Up @@ -4954,7 +4962,7 @@ declare namespace bigquery {

type ITable = {
/**
* Optional. Specifies the configuration of a BigLake managed table.
* Optional. Specifies the configuration of a BigQuery table for Apache Iceberg.
*/
biglakeConfiguration?: IBigLakeConfiguration;
/**
Expand Down Expand Up @@ -6390,6 +6398,14 @@ declare namespace bigquery {
* Optional. The version of the provided access policy schema. Valid values are 0, 1, and 3. Requests specifying an invalid value will be rejected. This version refers to the schema version of the access policy and not the version of access policy. This field's value can be equal or more than the access policy schema provided in the request. For example, * Operations updating conditional access policy binding in datasets must specify version 3. Some of the operations are : - Adding a new access policy entry with condition. - Removing an access policy entry with condition. - Updating an access policy entry with condition. * But dataset with no conditional role bindings in access policy may specify any valid value or leave the field unset. If unset or if 0 or 1 value is used for dataset with conditional bindings, request will be rejected. This field will be mapped to IAM Policy version (https://cloud.google.com/iam/docs/policies#versions) and will be used to set policy in IAM.
*/
accessPolicyVersion?: number;
/**
* Optional. Specifies the fields of dataset that update/patch operation is targeting By default, both metadata and ACL fields are updated.
*/
updateMode?:
| 'UPDATE_MODE_UNSPECIFIED'
| 'UPDATE_METADATA'
| 'UPDATE_ACL'
| 'UPDATE_FULL';
};

/**
Expand All @@ -6400,6 +6416,14 @@ declare namespace bigquery {
* Optional. The version of the provided access policy schema. Valid values are 0, 1, and 3. Requests specifying an invalid value will be rejected. This version refers to the schema version of the access policy and not the version of access policy. This field's value can be equal or more than the access policy schema provided in the request. For example, * Operations updating conditional access policy binding in datasets must specify version 3. Some of the operations are : - Adding a new access policy entry with condition. - Removing an access policy entry with condition. - Updating an access policy entry with condition. * But dataset with no conditional role bindings in access policy may specify any valid value or leave the field unset. If unset or if 0 or 1 value is used for dataset with conditional bindings, request will be rejected. This field will be mapped to IAM Policy version (https://cloud.google.com/iam/docs/policies#versions) and will be used to set policy in IAM.
*/
accessPolicyVersion?: number;
/**
* Optional. Specifies the fields of dataset that update/patch operation is targeting By default, both metadata and ACL fields are updated.
*/
updateMode?:
| 'UPDATE_MODE_UNSPECIFIED'
| 'UPDATE_METADATA'
| 'UPDATE_ACL'
| 'UPDATE_FULL';
};
}

Expand Down
37 changes: 37 additions & 0 deletions test/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2644,6 +2644,20 @@ describe('BigQuery', () => {
bq.createQueryJob(options, assert.ifError);
});

it('should accept a reservation id', done => {
  // A reservation passed on the query options should be moved into
  // the job configuration before the job is created.
  const options = {
    reservation: 'reservation/1',
    query: QUERY_STRING,
  };

  bq.createJob = (reqOpts: JobOptions) => {
    const assignedReservation = reqOpts.configuration?.reservation;
    assert.strictEqual(assignedReservation, 'reservation/1');
    done();
  };

  bq.createQueryJob(options, assert.ifError);
});

it('should accept a location', done => {
const options = {
query: QUERY_STRING,
Expand Down Expand Up @@ -3300,6 +3314,29 @@ describe('BigQuery', () => {

bq.query(QUERY_STRING, fakeOptions, assert.ifError);
});

it('should accept a reservation id', done => {
  // The reservation supplied on the Query object must be copied into
  // the job configuration sent to createJob.
  const query: Query = {
    query: QUERY_STRING,
    reservation: 'reservation/1',
  };
  const fakeJob = {
    getQueryResults: () => {
      done();
    },
  };

  bq.createJob = (reqOpts: JobOptions, callback: Function) => {
    // Use assert.strictEqual, not assert(value, message): the bare
    // assert form only checks truthiness of its first argument (the
    // second argument is the failure message), so it would pass for
    // ANY non-empty reservation string.
    assert.strictEqual(reqOpts.configuration?.reservation, 'reservation/1');
    callback(null, fakeJob, FAKE_RESPONSE);
  };

  // Force the legacy jobs.insert path so createJob is exercised.
  bq.buildQueryRequest_ = () => {
    return undefined;
  };

  bq.query(query, assert.ifError);
});
});

describe('buildQueryRequest_', () => {
Expand Down
19 changes: 19 additions & 0 deletions test/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,25 @@ describe('BigQuery/Model', () => {
model.createExtractJob(URI, options, done);
});

it('should accept a reservation id', done => {
  // createExtractJob should hoist the reservation option into the job
  // configuration handed to bigQuery.createJob.
  const options = {
    reservation: 'reservation/1',
  };

  model.bigQuery.createJob = (
    reqOpts: JobOptions,
    callback: Function,
  ) => {
    const reservation = reqOpts.configuration?.reservation;
    assert.strictEqual(reservation, 'reservation/1');
    // Invoking the callback resolves the outer `done` passed below.
    callback();
  };

  model.createExtractJob(URI, options, done);
});

it('should accept a job id', done => {
const jobId = 'job-id';
const options = {jobId};
Expand Down
Loading
Loading