这是indexloc提供的服务,不要输入任何密码
Skip to content

feat(spanner): add option for how to call BeginTransaction #12436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,13 +1114,34 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
// Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true
t.setSessionEligibilityForLongRunning(sh)
}
if t.shouldExplicitBegin(attempt) {
// initTx populates t from the enclosing client c and the per-call options.
// It copies c.idleSessions, c.qo, c.ro, c.disableRouteToLeader, c.ct and
// c.otConfig onto the transaction, points txReadEnv back at t itself,
// replaces the write buffer with a new empty slice, and stores c.txo merged
// with options. Keeping this in one closure lets every code path that
// prepares a transaction run the same initialization.
initTx := func(t *ReadWriteTransaction) {
t.txReadOnly.sp = c.idleSessions
// The transaction acts as its own read environment.
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.disableRouteToLeader = c.disableRouteToLeader
// Start with an empty (non-nil) mutation buffer.
t.wb = []*Mutation{}
// Client-level transaction options merged with the options passed for this call.
t.txOpts = c.txo.merge(options)
t.ct = c.ct
t.otConfig = c.otConfig
}
if t.shouldExplicitBegin(attempt, options) {
if t == nil {
t = &ReadWriteTransaction{
txReadyOrClosed: make(chan struct{}),
}
}
initTx(t)
// Make sure we set the current session handle before calling BeginTransaction.
// Note that the t.begin(ctx) call could change the session that is being used by the transaction, as the
// BeginTransaction RPC invocation will be retried on a new session if it returns SessionNotFound.
t.txReadOnly.sh = sh
if err = t.begin(ctx, nil); err != nil {
trace.TracePrintf(ctx, nil, "Error while BeginTransaction during retrying a ReadWrite transaction: %v", ToSpannerError(err))
if attempt > 0 {
trace.TracePrintf(ctx, nil, "Error while BeginTransaction during retrying a ReadWrite transaction: %v", ToSpannerError(err))
} else {
trace.TracePrintf(ctx, nil, "Error during the initial BeginTransaction for a ReadWrite transaction: %v", ToSpannerError(err))
}
return ToSpannerError(err)
}
} else {
Expand All @@ -1133,17 +1154,9 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
previousTx: previousTx,
}
t.txReadOnly.sh = sh
initTx(t)
}
attempt++
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.disableRouteToLeader = c.disableRouteToLeader
t.wb = []*Mutation{}
t.txOpts = c.txo.merge(options)
t.ct = c.ct
t.otConfig = c.otConfig

trace.TracePrintf(ctx, map[string]interface{}{"transactionSelector": t.getTransactionSelector().String()},
"Starting transaction attempt")
Expand Down
94 changes: 75 additions & 19 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1801,18 +1801,24 @@ func TestClient_ReadWriteTransaction(t *testing.T) {
}
}

func validateIsolationLevelForRWTransactions(t *testing.T, server *MockedSpannerInMemTestServer, expected sppb.TransactionOptions_IsolationLevel) {
func validateIsolationLevelForRWTransactions(t *testing.T, server *MockedSpannerInMemTestServer, expected sppb.TransactionOptions_IsolationLevel, beginTransactionOption BeginTransactionOption) {
found := false
requests := drainRequestsFromServer(server.TestSpanner)
for _, req := range requests {
switch sqlReq := req.(type) {
case *sppb.ExecuteSqlRequest:
if beginTransactionOption == ExplicitBeginTransaction {
t.Fatalf("got TransactionOptions on ExecuteSqlRequest in combination with ExplicitBeginTransaction")
}
found = true
if sqlReq.GetTransaction().GetBegin().GetIsolationLevel() != expected {
t.Fatalf("Invalid IsolationLevel\n Expected: %v\n Got: %v\n", expected, sqlReq.GetTransaction().GetBegin().GetIsolationLevel())
}
break
case *sppb.BeginTransactionRequest:
if beginTransactionOption == InlinedBeginTransaction {
t.Fatalf("got BeginTransaction RPC in combination with InlinedBeginTransaction")
}
found = true
if sqlReq.GetOptions().GetIsolationLevel() != expected {
t.Fatalf("Invalid IsolationLevel\n Expected: %v\n Got: %v\n", expected, sqlReq.GetOptions().GetIsolationLevel())
Expand Down Expand Up @@ -1843,7 +1849,7 @@ func TestClient_ReadWriteTransactionWithNoIsolationLevelForRWTransactionAtClient
t.Fatal(err)
}
defer teardown()
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED)
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED, InlinedBeginTransaction)
}

func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtClientConfig(t *testing.T) {
Expand All @@ -1853,7 +1859,7 @@ func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtClientCo
t.Fatal(err)
}
defer teardown()
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, InlinedBeginTransaction)
}

func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtTransactionLevel(t *testing.T) {
Expand All @@ -1863,7 +1869,7 @@ func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtTransact
t.Fatal(err)
}
defer teardown()
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, InlinedBeginTransaction)
}

func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtTransactionLevelWithAbort(t *testing.T) {
Expand All @@ -1882,7 +1888,7 @@ func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtTransact
if err != nil {
t.Fatal(err)
}
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, InlinedBeginTransaction)
}

func TestClient_ApplyMutationsWithAtLeastOnceIsolationLevel(t *testing.T) {
Expand All @@ -1897,7 +1903,7 @@ func TestClient_ApplyMutationsWithAtLeastOnceIsolationLevel(t *testing.T) {
if err != nil {
t.Fatal(err)
}
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, ExplicitBeginTransaction)
}

func TestClient_ApplyMutationsWithIsolationLevel(t *testing.T) {
Expand All @@ -1912,61 +1918,111 @@ func TestClient_ApplyMutationsWithIsolationLevel(t *testing.T) {
if err != nil {
t.Fatal(err)
}
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_SERIALIZABLE)
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_SERIALIZABLE, ExplicitBeginTransaction)
}

func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel(t *testing.T) {
// consumeIterator reads rows from iter until it is exhausted, discarding
// every row. It returns nil once Next reports iterator.Done and returns the
// first other error from Next unchanged. The iterator is always stopped
// before the function returns.
func consumeIterator(iter *RowIterator) error {
	defer iter.Stop()
	for {
		switch _, nextErr := iter.Next(); {
		case errors.Is(nextErr, iterator.Done):
			// End of the result set: everything was consumed successfully.
			return nil
		case nextErr != nil:
			return nextErr
		}
	}
}

// TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevelWithExplicitBegin
// runs testClientReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel
// with the ExplicitBeginTransaction option.
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevelWithExplicitBegin(t *testing.T) {
// Allow this test to run concurrently with other parallel tests.
t.Parallel()
testClientReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel(t, ExplicitBeginTransaction)
}

// TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevelWithInlineBegin
// runs testClientReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel
// with the InlinedBeginTransaction option.
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevelWithInlineBegin(t *testing.T) {
// Allow this test to run concurrently with other parallel tests.
t.Parallel()
testClientReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel(t, InlinedBeginTransaction)
}

func testClientReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel(t *testing.T, beginTransactionOption BeginTransactionOption) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()
tx, err := NewReadWriteStmtBasedTransactionWithOptions(
ctx,
client,
TransactionOptions{IsolationLevel: sppb.TransactionOptions_REPEATABLE_READ})
TransactionOptions{IsolationLevel: sppb.TransactionOptions_REPEATABLE_READ, BeginTransactionOption: beginTransactionOption})
if err != nil {
t.Fatalf("Unexpected error when creating transaction: %v", err)
}

iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
if err := consumeIterator(iter); err != nil {
t.Fatal(err)
}

validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, beginTransactionOption)
}

// TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevelWithExplicitBegin
// runs testClientReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel
// with the ExplicitBeginTransaction option.
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevelWithExplicitBegin(t *testing.T) {
// Allow this test to run concurrently with other parallel tests.
t.Parallel()
testClientReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel(t, ExplicitBeginTransaction)
}

validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
// TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevelWithInlineBegin
// runs testClientReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel
// with the InlinedBeginTransaction option.
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevelWithInlineBegin(t *testing.T) {
// Allow this test to run concurrently with other parallel tests.
t.Parallel()
testClientReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel(t, InlinedBeginTransaction)
}

func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel(t *testing.T) {
func testClientReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel(t *testing.T, beginTransactionOption BeginTransactionOption) {
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{TransactionOptions: TransactionOptions{IsolationLevel: sppb.TransactionOptions_SERIALIZABLE}})
defer teardown()
ctx := context.Background()
tx, err := NewReadWriteStmtBasedTransactionWithOptions(
ctx,
client,
TransactionOptions{})
TransactionOptions{BeginTransactionOption: beginTransactionOption})
if err != nil {
t.Fatalf("Unexpected error when creating transaction: %v", err)
}

iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
if err := consumeIterator(iter); err != nil {
t.Fatal(err)
}

validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_SERIALIZABLE, beginTransactionOption)
}

// TestClient_ReadWriteStmtBasedTransactionWithNoIsolationLevelWithExplicitBegin
// runs testClientReadWriteStmtBasedTransactionWithNoIsolationLevel with the
// ExplicitBeginTransaction option.
func TestClient_ReadWriteStmtBasedTransactionWithNoIsolationLevelWithExplicitBegin(t *testing.T) {
// Allow this test to run concurrently with other parallel tests.
t.Parallel()
testClientReadWriteStmtBasedTransactionWithNoIsolationLevel(t, ExplicitBeginTransaction)
}

validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_SERIALIZABLE)
// TestClient_ReadWriteStmtBasedTransactionWithNoIsolationLevelWithInlineBegin
// runs testClientReadWriteStmtBasedTransactionWithNoIsolationLevel with the
// InlinedBeginTransaction option.
func TestClient_ReadWriteStmtBasedTransactionWithNoIsolationLevelWithInlineBegin(t *testing.T) {
// Allow this test to run concurrently with other parallel tests.
t.Parallel()
testClientReadWriteStmtBasedTransactionWithNoIsolationLevel(t, InlinedBeginTransaction)
}

func TestClient_ReadWriteStmtBasedTransactionWithNoIsolationLevel(t *testing.T) {
func testClientReadWriteStmtBasedTransactionWithNoIsolationLevel(t *testing.T, beginTransactionOption BeginTransactionOption) {
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{TransactionOptions: TransactionOptions{}})
defer teardown()
ctx := context.Background()
tx, err := NewReadWriteStmtBasedTransactionWithOptions(
ctx,
client,
TransactionOptions{})
TransactionOptions{BeginTransactionOption: beginTransactionOption})
if err != nil {
t.Fatalf("Unexpected error when creating transaction: %v", err)
}

iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
if err := consumeIterator(iter); err != nil {
t.Fatal(err)
}

validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED)
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED, beginTransactionOption)
}

func TestClient_ReadWriteTransactionCommitAborted(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion spanner/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *RowIterator) Next() (*Row, error) {
// if request contains TransactionSelector::Begin option, this is here as fallback to retry with
// explicit transactionID after a retry.
r.setTransactionID(nil)
r.err = errInlineBeginTransactionFailed()
r.err = r.updateTxState(errInlineBeginTransactionFailed(nil))
return nil, r.err
}
r.setTransactionID = nil
Expand Down
2 changes: 1 addition & 1 deletion spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2006,7 +2006,7 @@ func isFailedInlineBeginTransaction(err error) bool {
if err == nil {
return false
}
return ErrCode(err) == codes.Internal && strings.Contains(err.Error(), errInlineBeginTransactionFailed().Error())
return ErrCode(err) == codes.Internal && strings.Contains(err.Error(), errInlineBeginTransactionFailedMsg)
}

// isClientClosing returns true if the given error is a
Expand Down
Loading
Loading