这是indexloc提供的服务,不要输入任何密码
Skip to content

feat(bigquery): add performance insights #12101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions bigquery/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ type QueryStatistics struct {

// Statistics for the EXPORT DATA statement as part of Query Job.
ExportDataStatistics *ExportDataStatistics

// Performance insights.
PerformanceInsights *PerformanceInsights
}

// ExportDataStatistics represents statistics for
Expand Down Expand Up @@ -842,6 +845,194 @@ func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics {
}
}

// PerformanceInsights contains performance insights for the job.
//
// It is the package-level counterpart of the API's bq.PerformanceInsights
// message.
type PerformanceInsights struct {
// AvgPreviousExecution is the average execution time of previous runs.
// The API reports this value in milliseconds; it is exposed here as a
// time.Duration.
AvgPreviousExecution time.Duration

// StagePerformanceStandaloneInsights holds standalone query stage
// performance insights, for exploring potential improvements.
StagePerformanceStandaloneInsights []*StagePerformanceStandaloneInsight

// StagePerformanceChangeInsights holds jobs.query stage performance
// insights compared to previous runs, for diagnosing performance regression.
StagePerformanceChangeInsights []*StagePerformanceChangeInsight
}

// bqToPerformanceInsights converts the API representation of performance
// insights into the package's PerformanceInsights type.
//
// A nil input yields a nil result. The insight slices are left nil when the
// corresponding API slice is empty.
func bqToPerformanceInsights(in *bq.PerformanceInsights) *PerformanceInsights {
	if in == nil {
		return nil
	}

	out := new(PerformanceInsights)

	// Convert each standalone insight in place into a slice sized up front.
	if n := len(in.StagePerformanceStandaloneInsights); n > 0 {
		out.StagePerformanceStandaloneInsights = make([]*StagePerformanceStandaloneInsight, n)
		for i, src := range in.StagePerformanceStandaloneInsights {
			out.StagePerformanceStandaloneInsights[i] = bqToStagePerformanceStandaloneInsight(src)
		}
	}

	// Same treatment for the insights that compare against previous runs.
	if n := len(in.StagePerformanceChangeInsights); n > 0 {
		out.StagePerformanceChangeInsights = make([]*StagePerformanceChangeInsight, n)
		for i, src := range in.StagePerformanceChangeInsights {
			out.StagePerformanceChangeInsights[i] = bqToStagePerformanceChangeInsight(src)
		}
	}

	// The API value is a millisecond count; scale it into a time.Duration.
	out.AvgPreviousExecution = time.Duration(in.AvgPreviousExecutionMs) * time.Millisecond
	return out
}

// StagePerformanceStandaloneInsight describes standalone performance insights
// for a specific stage.
type StagePerformanceStandaloneInsight struct {
// StageID is the ID of the stage that the insight is mapped to.
StageID int64

// BIEngineReasons, if present, lists the reasons the stage was
// disqualified from BI Engine execution.
BIEngineReasons []*BIEngineReason

// HighCardinalityJoins lists the high cardinality joins in the stage.
HighCardinalityJoins []*HighCardinalityJoin

// SlotContention is true if the stage has a slot contention issue.
SlotContention bool

// InsufficientShuffleQuota is true if the stage has insufficient shuffle
// quota.
InsufficientShuffleQuota bool

// PartitionSkew describes partition skew in the stage.
PartitionSkew *PartitionSkew
}

// bqToStagePerformanceStandaloneInsight converts the API representation of a
// standalone stage insight into the package's own type.
//
// A nil input yields a nil result. The BI Engine reason and high cardinality
// join slices are left nil when the API reports none.
func bqToStagePerformanceStandaloneInsight(in *bq.StagePerformanceStandaloneInsight) *StagePerformanceStandaloneInsight {
	if in == nil {
		return nil
	}

	out := &StagePerformanceStandaloneInsight{
		StageID:                  in.StageId,
		SlotContention:           in.SlotContention,
		InsufficientShuffleQuota: in.InsufficientShuffleQuota,
	}

	if n := len(in.BiEngineReasons); n > 0 {
		out.BIEngineReasons = make([]*BIEngineReason, n)
		for i, reason := range in.BiEngineReasons {
			out.BIEngineReasons[i] = bqToBIEngineReason(reason)
		}
	}

	if n := len(in.HighCardinalityJoins); n > 0 {
		out.HighCardinalityJoins = make([]*HighCardinalityJoin, n)
		for i, join := range in.HighCardinalityJoins {
			out.HighCardinalityJoins[i] = bqToHighCardinalityJoin(join)
		}
	}

	out.PartitionSkew = bqToPartitionSkew(in.PartitionSkew)
	return out
}

// StagePerformanceChangeInsight contains performance insights compared to the
// previous executions for a specific stage.
type StagePerformanceChangeInsight struct {
// StageID is the ID of the stage that the insight is mapped to.
StageID int64

// InputDataChange holds details about the input data change insight for
// the stage. It is nil when the API response carries no such insight.
InputDataChange *InputDataChange
}

// bqToStagePerformanceChangeInsight converts the API representation of a
// stage change insight into the package's own type. A nil input yields a nil
// result.
func bqToStagePerformanceChangeInsight(in *bq.StagePerformanceChangeInsight) *StagePerformanceChangeInsight {
	if in == nil {
		return nil
	}

	out := new(StagePerformanceChangeInsight)
	out.StageID = in.StageId
	out.InputDataChange = bqToInputDataChange(in.InputDataChange)
	return out
}

// HighCardinalityJoin contains detailed information about a high cardinality
// join.
type HighCardinalityJoin struct {
// LeftRows is the count of left input rows.
LeftRows int64

// RightRows is the count of right input rows.
RightRows int64

// OutputRows is the count of output rows.
OutputRows int64

// StepIndex is the index of the join operator in the ExplainQueryStep
// lists.
StepIndex int64
}

// bqToHighCardinalityJoin copies the row counts and step index from the API
// representation into the package's HighCardinalityJoin type. A nil input
// yields a nil result.
func bqToHighCardinalityJoin(in *bq.HighCardinalityJoin) *HighCardinalityJoin {
	if in == nil {
		return nil
	}

	out := new(HighCardinalityJoin)
	out.LeftRows = in.LeftRows
	out.RightRows = in.RightRows
	out.OutputRows = in.OutputRows
	out.StepIndex = in.StepIndex
	return out
}

// PartitionSkew contains detailed information about partition skew.
type PartitionSkew struct {
// SkewSources lists the source stages which produce skewed data.
SkewSources []*SkewSource
}

// bqToPartitionSkew converts the API representation of partition skew into
// the package's PartitionSkew type.
//
// A nil input yields a nil result. SkewSources is left nil when the API
// reports no skew sources.
func bqToPartitionSkew(in *bq.PartitionSkew) *PartitionSkew {
	if in == nil {
		return nil
	}

	out := new(PartitionSkew)
	if n := len(in.SkewSources); n > 0 {
		out.SkewSources = make([]*SkewSource, n)
		for i, src := range in.SkewSources {
			out.SkewSources[i] = bqToSkewSource(src)
		}
	}
	return out
}

// SkewSource contains details about a source stage which produces skewed
// data.
type SkewSource struct {
// StageID is the stage ID of the skew source stage.
StageID int64
}

// bqToSkewSource converts the API representation of a skew source into the
// package's SkewSource type. A nil input yields a nil result.
func bqToSkewSource(in *bq.SkewSource) *SkewSource {
	if in == nil {
		return nil
	}

	out := new(SkewSource)
	out.StageID = in.StageId
	return out
}

// InputDataChange contains details about the input data change insight.
type InputDataChange struct {
// RecordsReadDiffPercentage is the records read difference percentage
// compared to a previous run.
RecordsReadDiffPercentage float64
}

// bqToInputDataChange converts the API representation of an input data change
// insight into the package's InputDataChange type. A nil input yields a nil
// result.
func bqToInputDataChange(in *bq.InputDataChange) *InputDataChange {
	if in == nil {
		return nil
	}

	out := new(InputDataChange)
	out.RecordsReadDiffPercentage = in.RecordsReadDiffPercentage
	return out
}

// implementsStatistics is a no-op marker method declared on each job
// statistics type.
// NOTE(review): presumably these satisfy an unexported method of a Statistics
// interface declared elsewhere in this file — confirm against that definition.
func (*ExtractStatistics) implementsStatistics() {}
func (*LoadStatistics) implementsStatistics() {}
func (*QueryStatistics) implementsStatistics() {}
Expand Down Expand Up @@ -1115,6 +1306,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
Timeline: timelineFromProto(s.Query.Timeline),
ReferencedTables: tables,
UndeclaredQueryParameterNames: names,
PerformanceInsights: bqToPerformanceInsights(s.Query.PerformanceInsights),
}
}
j.lastStatus.Statistics = js
Expand Down
101 changes: 101 additions & 0 deletions bigquery/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package bigquery

import (
"reflect"
"testing"
"time"

"cloud.google.com/go/internal/testutil"
bq "google.golang.org/api/bigquery/v2"
Expand Down Expand Up @@ -79,6 +81,105 @@ func TestCreateJobRef(t *testing.T) {
}
}

// Test_JobPerformanceInsights checks that bqToPerformanceInsights converts the
// API representation of performance insights into the package's own types,
// covering a nil input, a duration-only input and a fully populated input.
//
// Ideally this would be covered by an integration test, but simulating
// performance issues in a dummy project is difficult and requires a lot of
// setup.
func Test_JobPerformanceInsights(t *testing.T) {
	for _, test := range []struct {
		name string
		in   *bq.PerformanceInsights
		want *PerformanceInsights
	}{
		{
			// Both in and want are nil: a nil input must convert to nil.
			name: "nil",
		},
		{
			name: "time only",
			in:   &bq.PerformanceInsights{AvgPreviousExecutionMs: 128},
			want: &PerformanceInsights{AvgPreviousExecution: 128 * time.Millisecond},
		},
		{
			name: "full",
			in: &bq.PerformanceInsights{
				AvgPreviousExecutionMs: 128,
				StagePerformanceChangeInsights: []*bq.StagePerformanceChangeInsight{
					{InputDataChange: &bq.InputDataChange{RecordsReadDiffPercentage: 1.23}, StageId: 123},
					{InputDataChange: &bq.InputDataChange{RecordsReadDiffPercentage: 4.56}, StageId: 456},
				},
				StagePerformanceStandaloneInsights: []*bq.StagePerformanceStandaloneInsight{
					{
						BiEngineReasons: []*bq.BiEngineReason{
							{Code: "bi-code-1", Message: "bi-message-1"},
						},
						HighCardinalityJoins: []*bq.HighCardinalityJoin{
							{LeftRows: 11, OutputRows: 22, RightRows: 33, StepIndex: 112233},
							{LeftRows: 44, OutputRows: 55, RightRows: 66, StepIndex: 445566},
						},
						InsufficientShuffleQuota: true,
						PartitionSkew: &bq.PartitionSkew{SkewSources: []*bq.SkewSource{
							{StageId: 321},
							{StageId: 654},
						}},
						StageId: 123456,
					},
					{
						BiEngineReasons: []*bq.BiEngineReason{
							{Code: "bi-code-2", Message: "bi-message-2"},
							{Code: "bi-code-3", Message: "bi-message-3"},
						},
						HighCardinalityJoins: []*bq.HighCardinalityJoin{
							{LeftRows: 77, OutputRows: 88, RightRows: 99, StepIndex: 778899},
						},
						PartitionSkew:  &bq.PartitionSkew{SkewSources: []*bq.SkewSource{{StageId: 987}}},
						SlotContention: true,
						StageId:        654321,
					},
				},
			},
			want: &PerformanceInsights{
				AvgPreviousExecution: 128 * time.Millisecond,
				StagePerformanceChangeInsights: []*StagePerformanceChangeInsight{
					{InputDataChange: &InputDataChange{RecordsReadDiffPercentage: 1.23}, StageID: 123},
					{InputDataChange: &InputDataChange{RecordsReadDiffPercentage: 4.56}, StageID: 456},
				},
				StagePerformanceStandaloneInsights: []*StagePerformanceStandaloneInsight{
					{
						BIEngineReasons: []*BIEngineReason{
							{Code: "bi-code-1", Message: "bi-message-1"},
						},
						HighCardinalityJoins: []*HighCardinalityJoin{
							{LeftRows: 11, OutputRows: 22, RightRows: 33, StepIndex: 112233},
							{LeftRows: 44, OutputRows: 55, RightRows: 66, StepIndex: 445566},
						},
						InsufficientShuffleQuota: true,
						PartitionSkew:            &PartitionSkew{SkewSources: []*SkewSource{{StageID: 321}, {StageID: 654}}},
						StageID:                  123456,
					},
					{
						BIEngineReasons: []*BIEngineReason{
							{Code: "bi-code-2", Message: "bi-message-2"},
							{Code: "bi-code-3", Message: "bi-message-3"},
						},
						HighCardinalityJoins: []*HighCardinalityJoin{
							{LeftRows: 77, OutputRows: 88, RightRows: 99, StepIndex: 778899},
						},
						PartitionSkew:  &PartitionSkew{SkewSources: []*SkewSource{{StageID: 987}}},
						SlotContention: true,
						StageID:        654321,
					},
				},
			},
		},
	} {
		t.Run(test.name, func(t *testing.T) {
			out := bqToPerformanceInsights(test.in)
			if !reflect.DeepEqual(test.want, out) {
				// The values are deeply nested pointer structures, so print a
				// structural diff rather than a bare "not equal" message.
				t.Errorf("bqToPerformanceInsights() mismatch (-got +want):\n%s", testutil.Diff(out, test.want))
			}
		})
	}
}

func fixRandomID(s string) func() {
prev := randomIDFn
randomIDFn = func() string { return s }
Expand Down
Loading