From ec1bcd54a971e487dfc73cee757867ac3d17132c Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 20 Oct 2025 15:55:02 -0400 Subject: [PATCH] fix: update retry logic for grpc start resumable upload to properly handle client side deadline_exceeded --- .../storage/GapicUploadSessionBuilder.java | 54 ++++++++++++------- .../google/cloud/storage/GrpcStorageImpl.java | 18 +++---- .../cloud/storage/GrpcStorageOptions.java | 20 +++++-- .../com/google/cloud/storage/FakeServer.java | 2 +- .../GapicUploadSessionBuilderSyntaxTest.java | 22 +++++--- ...apicUnbufferedWritableByteChannelTest.java | 45 ++++++++++++++++ .../ITUnbufferedResumableUploadTest.java | 4 +- 7 files changed, 120 insertions(+), 45 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java index 3ae9b8bc23..1e4001fcae 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java @@ -17,13 +17,14 @@ package com.google.cloud.storage; import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.Retrying.RetrierWithAlg; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; -import com.google.common.util.concurrent.MoreExecutors; import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.BidiWriteObjectResponse; import com.google.storage.v2.StartResumableWriteRequest; @@ -53,7 +54,8 @@ GapicBidiWritableByteChannelSessionBuilder bidiByteChannel( 
ApiFuture resumableWrite( UnaryCallable callable, WriteObjectRequest writeObjectRequest, - Opts opts) { + Opts opts, + RetrierWithAlg retrier) { StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder(); if (writeObjectRequest.hasWriteObjectSpec()) { b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec()); @@ -68,23 +70,27 @@ ApiFuture resumableWrite( Function f = uploadId -> writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build(); - ApiFuture futureResumableWrite = - ApiFutures.transform( - callable.futureCall(req), - (resp) -> new ResumableWrite(req, resp, f), - MoreExecutors.directExecutor()); - // make sure we wrap any failure as a storage exception - return ApiFutures.catchingAsync( - futureResumableWrite, - Throwable.class, - throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)), - MoreExecutors.directExecutor()); + SettableApiFuture future = SettableApiFuture.create(); + try { + ResumableWrite resumableWrite = + retrier.run( + () -> { + StartResumableWriteResponse resp = callable.call(req); + return new ResumableWrite(req, resp, f); + }, + Decoder.identity()); + future.set(resumableWrite); + } catch (StorageException e) { + future.setException(e); + } + return future; } ApiFuture bidiResumableWrite( UnaryCallable x, BidiWriteObjectRequest writeObjectRequest, - Opts opts) { + Opts opts, + RetrierWithAlg retrier) { StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder(); if (writeObjectRequest.hasWriteObjectSpec()) { b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec()); @@ -99,9 +105,19 @@ ApiFuture bidiResumableWrite( Function f = uploadId -> writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build(); - return ApiFutures.transform( - x.futureCall(req), - (resp) -> new BidiResumableWrite(req, resp, f), - MoreExecutors.directExecutor()); + SettableApiFuture future = SettableApiFuture.create(); + try { + BidiResumableWrite 
resumableWrite = + retrier.run( + () -> { + StartResumableWriteResponse resp = x.call(req); + return new BidiResumableWrite(req, resp, f); + }, + Decoder.identity()); + future.set(resumableWrite); + } catch (StorageException e) { + future.setException(e); + } + return future; } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index ce3e6c62d2..d79b30e1fc 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -21,7 +21,6 @@ import static com.google.cloud.storage.ByteSizeConstants._256KiB; import static com.google.cloud.storage.CrossTransportUtils.fmtMethodName; import static com.google.cloud.storage.CrossTransportUtils.throwHttpJsonOnly; -import static com.google.cloud.storage.GrpcToHttpStatusCodeTranslation.resultRetryAlgorithmToCodes; import static com.google.cloud.storage.StorageV2ProtoUtils.bucketAclEntityOrAltEq; import static com.google.cloud.storage.StorageV2ProtoUtils.objectAclEntityOrAltEq; import static com.google.cloud.storage.Utils.bucketNameCodec; @@ -42,7 +41,6 @@ import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.NotFoundException; -import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.BaseService; import com.google.cloud.Policy; @@ -1831,30 +1829,26 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( @VisibleForTesting ApiFuture startResumableWrite( GrpcCallContext grpcCallContext, WriteObjectRequest req, Opts opts) { - Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req)); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); return ResumableMedia.gapic() .write() .resumableWrite( - storageClient 
- .startResumableWriteCallable() - .withDefaultCallContext(merge.withRetryableCodes(codes)), + storageClient.startResumableWriteCallable().withDefaultCallContext(merge), req, - opts); + opts, + retrier.withAlg(retryAlgorithmManager.getFor(req))); } ApiFuture startResumableWrite( GrpcCallContext grpcCallContext, BidiWriteObjectRequest req, Opts opts) { - Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req)); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); return ResumableMedia.gapic() .write() .bidiResumableWrite( - storageClient - .startResumableWriteCallable() - .withDefaultCallContext(merge.withRetryableCodes(codes)), + storageClient.startResumableWriteCallable().withDefaultCallContext(merge), req, - opts); + opts, + retrier.withAlg(retryAlgorithmManager.getFor(req))); } private SourceObject sourceObjectEncode(SourceBlob from) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index a102125e95..54dd12519d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -45,7 +45,6 @@ import com.google.api.gax.rpc.RequestParamsBuilder; import com.google.api.gax.rpc.RequestParamsExtractor; import com.google.api.gax.rpc.ServerStreamingCallable; -import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.internal.QuotaProjectIdHidingCredentials; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.pathtemplate.PathTemplate; @@ -109,6 +108,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.time.Clock; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -360,8 +360,6 @@ private Tuple> resolveSettingsAndOpts() throw 
.setLogicalTimeout(java.time.Duration.ofDays(28)) .build(); java.time.Duration totalTimeout = baseRetrySettings.getTotalTimeoutDuration(); - Set startResumableWriteRetryableCodes = - builder.startResumableWriteSettings().getRetryableCodes(); // retries for unary methods are generally handled at a different level, except // StartResumableWrite @@ -372,10 +370,22 @@ private Tuple> resolveSettingsAndOpts() throw }); // configure the settings for StartResumableWrite + Duration startResumableTimeoutDuration; + // the default for initialRpcTimeout is the same as totalTimeout. This is not good, because it + // will prevent our retries from even happening. + // If the default value is used, set our per-rpc timeout to 20 seconds to allow our retries + // a chance. + if (baseRetrySettings + .getInitialRpcTimeoutDuration() + .equals(getDefaultRetrySettings().getInitialRpcTimeoutDuration())) { + startResumableTimeoutDuration = Duration.ofSeconds(20); + } else { + startResumableTimeoutDuration = baseRetrySettings.getInitialRpcTimeoutDuration(); + } builder .startResumableWriteSettings() - .setRetrySettings(baseRetrySettings) - .setRetryableCodes(startResumableWriteRetryableCodes); + // set this lower, to allow our retries a chance instead of it being totalTimeout + .setSimpleTimeoutNoRetriesDuration(startResumableTimeoutDuration); // for ReadObject disable retries and move the total timeout to the idle timeout builder .readObjectSettings() diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java index d4ba66360b..0481f2b062 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java @@ -75,7 +75,7 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException { .setRetryDelayMultiplier(1.2) .setMaxRetryDelayDuration(Duration.ofSeconds(16)) 
.setMaxAttempts(6) - .setInitialRpcTimeoutDuration(Duration.ofSeconds(25)) + .setInitialRpcTimeoutDuration(Duration.ofSeconds(1)) .setRpcTimeoutMultiplier(1.0) .setMaxRpcTimeoutDuration(Duration.ofSeconds(25)) .build()) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java index e35278b5b1..dddd5b8e16 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java @@ -20,9 +20,9 @@ import static org.mockito.Mockito.when; import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.storage.Retrying.RetrierWithAlg; import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.storage.v2.StartResumableWriteRequest; import com.google.storage.v2.StartResumableWriteResponse; @@ -60,8 +60,8 @@ public final class GapicUploadSessionBuilderSyntaxTest { @Before public void setUp() throws Exception { - when(startResumableWrite.futureCall(any())) - .thenReturn(ApiFutures.immediateFuture(StartResumableWriteResponse.getDefaultInstance())); + when(startResumableWrite.call(any())) + .thenReturn(StartResumableWriteResponse.getDefaultInstance()); } @Test @@ -95,7 +95,9 @@ public void syntax_directBuffered_fluent() { @Test public void syntax_resumableUnbuffered_fluent() { ApiFuture startAsync = - ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty()); + ResumableMedia.gapic() + .write() + .resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce()); UnbufferedWritableByteChannelSession session = ResumableMedia.gapic() .write() @@ -111,7 +113,9 @@ public void 
syntax_resumableUnbuffered_fluent() { @Test public void syntax_resumableBuffered_fluent() { ApiFuture startAsync = - ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty()); + ResumableMedia.gapic() + .write() + .resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce()); BufferedWritableByteChannelSession session = ResumableMedia.gapic() .write() @@ -151,7 +155,9 @@ public void syntax_directBuffered_incremental() { @Test public void syntax_resumableUnbuffered_incremental() { ApiFuture startAsync = - ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty()); + ResumableMedia.gapic() + .write() + .resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce()); GapicWritableByteChannelSessionBuilder b1 = ResumableMedia.gapic() .write() @@ -165,7 +171,9 @@ public void syntax_resumableUnbuffered_incremental() { @Test public void syntax_resumableBuffered_incremental() { ApiFuture startAsync = - ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty()); + ResumableMedia.gapic() + .write() + .resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce()); GapicWritableByteChannelSessionBuilder b1 = ResumableMedia.gapic() .write() diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java index e23b2a57f7..3aa567a4d6 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java @@ -20,9 +20,12 @@ import static com.google.cloud.storage.TestUtils.getChecksummedData; import static com.google.common.truth.Truth.assertThat; +import com.google.api.core.ApiFuture; import 
com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.PermissionDeniedException; import com.google.cloud.storage.Retrying.RetrierWithAlg; +import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.cloud.storage.WriteCtx.SimpleWriteObjectRequestBuilderFactory; import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory; import com.google.common.collect.ImmutableList; @@ -47,7 +50,10 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -210,6 +216,45 @@ public void resumableUpload() throws IOException, InterruptedException, Executio } } + @Test + public void startResumableUpload_deadlineExceeded_isRetried() + throws IOException, InterruptedException, ExecutionException, TimeoutException { + + String uploadId = UUID.randomUUID().toString(); + AtomicInteger callCount = new AtomicInteger(0); + StorageImplBase service = + new StorageImplBase() { + @Override + public void startResumableWrite( + StartResumableWriteRequest req, StreamObserver respond) { + if (callCount.getAndIncrement() > 0) { + respond.onNext( + StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build()); + respond.onCompleted(); + } + } + }; + try (FakeServer fake = FakeServer.of(service)) { + GrpcStorageImpl gsi = (GrpcStorageImpl) fake.getGrpcStorageOptions().getService(); + ApiFuture f = + gsi.startResumableWrite( + GrpcCallContext.createDefault(), + WriteObjectRequest.newBuilder() + .setWriteObjectSpec( + WriteObjectSpec.newBuilder() + .setResource( + Object.newBuilder().setBucket("bucket").setName("name").build()) + .setIfGenerationMatch(0) + .build()) + .build(), + Opts.empty()); + + 
ResumableWrite resumableWrite = f.get(2, TimeUnit.MINUTES); + assertThat(callCount.get()).isEqualTo(2); + assertThat(resumableWrite.newBuilder().build().getUploadId()).isEqualTo(uploadId); + } + } + @Test public void resumableUpload_chunkAutomaticRetry() throws IOException, InterruptedException, ExecutionException { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java index 69b876ad35..59f694fe30 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java @@ -24,6 +24,7 @@ import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.ITUnbufferedResumableUploadTest.ObjectSizes; import com.google.cloud.storage.Retrying.Retrier; +import com.google.cloud.storage.Retrying.RetrierWithAlg; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; @@ -241,7 +242,8 @@ private UnbufferedWritableByteChannelSession grpcSession() .resumableWrite( storageClient.startResumableWriteCallable().withDefaultCallContext(merge), request, - opts); + opts, + RetrierWithAlg.attemptOnce()); return ResumableMedia.gapic() .write()