这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package com.google.cloud.storage;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.Retrying.RetrierWithAlg;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.BidiWriteObjectRequest;
import com.google.storage.v2.BidiWriteObjectResponse;
import com.google.storage.v2.StartResumableWriteRequest;
Expand Down Expand Up @@ -53,7 +54,8 @@ GapicBidiWritableByteChannelSessionBuilder bidiByteChannel(
ApiFuture<ResumableWrite> resumableWrite(
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> callable,
WriteObjectRequest writeObjectRequest,
Opts<ObjectTargetOpt> opts) {
Opts<ObjectTargetOpt> opts,
RetrierWithAlg retrier) {
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
if (writeObjectRequest.hasWriteObjectSpec()) {
b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec());
Expand All @@ -68,23 +70,27 @@ ApiFuture<ResumableWrite> resumableWrite(
Function<String, WriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
ApiFuture<ResumableWrite> futureResumableWrite =
ApiFutures.transform(
callable.futureCall(req),
(resp) -> new ResumableWrite(req, resp, f),
MoreExecutors.directExecutor());
// make sure we wrap any failure as a storage exception
return ApiFutures.catchingAsync(
futureResumableWrite,
Throwable.class,
throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)),
MoreExecutors.directExecutor());
SettableApiFuture<ResumableWrite> future = SettableApiFuture.create();
try {
ResumableWrite resumableWrite =
retrier.run(
() -> {
StartResumableWriteResponse resp = callable.call(req);
return new ResumableWrite(req, resp, f);
},
Decoder.identity());
future.set(resumableWrite);
} catch (StorageException e) {
future.setException(e);
}
return future;
}

ApiFuture<BidiResumableWrite> bidiResumableWrite(
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> x,
BidiWriteObjectRequest writeObjectRequest,
Opts<ObjectTargetOpt> opts) {
Opts<ObjectTargetOpt> opts,
RetrierWithAlg retrier) {
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
if (writeObjectRequest.hasWriteObjectSpec()) {
b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec());
Expand All @@ -99,9 +105,19 @@ ApiFuture<BidiResumableWrite> bidiResumableWrite(
Function<String, BidiWriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
return ApiFutures.transform(
x.futureCall(req),
(resp) -> new BidiResumableWrite(req, resp, f),
MoreExecutors.directExecutor());
SettableApiFuture<BidiResumableWrite> future = SettableApiFuture.create();
try {
BidiResumableWrite resumableWrite =
retrier.run(
() -> {
StartResumableWriteResponse resp = x.call(req);
return new BidiResumableWrite(req, resp, f);
},
Decoder.identity());
future.set(resumableWrite);
} catch (StorageException e) {
future.setException(e);
}
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.cloud.storage.ByteSizeConstants._256KiB;
import static com.google.cloud.storage.CrossTransportUtils.fmtMethodName;
import static com.google.cloud.storage.CrossTransportUtils.throwHttpJsonOnly;
import static com.google.cloud.storage.GrpcToHttpStatusCodeTranslation.resultRetryAlgorithmToCodes;
import static com.google.cloud.storage.StorageV2ProtoUtils.bucketAclEntityOrAltEq;
import static com.google.cloud.storage.StorageV2ProtoUtils.objectAclEntityOrAltEq;
import static com.google.cloud.storage.Utils.bucketNameCodec;
Expand All @@ -42,7 +41,6 @@
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.BaseService;
import com.google.cloud.Policy;
Expand Down Expand Up @@ -1831,30 +1829,26 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
@VisibleForTesting
ApiFuture<ResumableWrite> startResumableWrite(
GrpcCallContext grpcCallContext, WriteObjectRequest req, Opts<ObjectTargetOpt> opts) {
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req));
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return ResumableMedia.gapic()
.write()
.resumableWrite(
storageClient
.startResumableWriteCallable()
.withDefaultCallContext(merge.withRetryableCodes(codes)),
storageClient.startResumableWriteCallable().withDefaultCallContext(merge),
req,
opts);
opts,
retrier.withAlg(retryAlgorithmManager.getFor(req)));
}

ApiFuture<BidiResumableWrite> startResumableWrite(
GrpcCallContext grpcCallContext, BidiWriteObjectRequest req, Opts<ObjectTargetOpt> opts) {
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req));
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return ResumableMedia.gapic()
.write()
.bidiResumableWrite(
storageClient
.startResumableWriteCallable()
.withDefaultCallContext(merge.withRetryableCodes(codes)),
storageClient.startResumableWriteCallable().withDefaultCallContext(merge),
req,
opts);
opts,
retrier.withAlg(retryAlgorithmManager.getFor(req)));
}

private SourceObject sourceObjectEncode(SourceBlob from) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import com.google.api.gax.rpc.RequestParamsBuilder;
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.internal.QuotaProjectIdHidingCredentials;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.pathtemplate.PathTemplate;
Expand Down Expand Up @@ -109,6 +108,7 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -360,8 +360,6 @@ private Tuple<StorageSettings, Opts<UserProject>> resolveSettingsAndOpts() throw
.setLogicalTimeout(java.time.Duration.ofDays(28))
.build();
java.time.Duration totalTimeout = baseRetrySettings.getTotalTimeoutDuration();
Set<Code> startResumableWriteRetryableCodes =
builder.startResumableWriteSettings().getRetryableCodes();

// retries for unary methods are generally handled at a different level, except
// StartResumableWrite
Expand All @@ -372,10 +370,22 @@ private Tuple<StorageSettings, Opts<UserProject>> resolveSettingsAndOpts() throw
});

// configure the settings for StartResumableWrite
Duration startResumableTimeoutDuration;
// the default for initialRpcTimeout is the same as totalTimeout. This is not good, because it
// will prevent our retries from even happening.
// If the default values is used, set our per-rpc timeout to 20 seconds to allow our retries
// a chance.
if (baseRetrySettings
.getInitialRpcTimeoutDuration()
.equals(getDefaultRetrySettings().getInitialRpcTimeoutDuration())) {
startResumableTimeoutDuration = Duration.ofSeconds(20);
} else {
startResumableTimeoutDuration = baseRetrySettings.getInitialRpcTimeoutDuration();
}
builder
.startResumableWriteSettings()
.setRetrySettings(baseRetrySettings)
.setRetryableCodes(startResumableWriteRetryableCodes);
// set this lower, to allow our retries a chance instead of it being totalTimeout
.setSimpleTimeoutNoRetriesDuration(startResumableTimeoutDuration);
// for ReadObject disable retries and move the total timeout to the idle timeout
builder
.readObjectSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
.setRetryDelayMultiplier(1.2)
.setMaxRetryDelayDuration(Duration.ofSeconds(16))
.setMaxAttempts(6)
.setInitialRpcTimeoutDuration(Duration.ofSeconds(25))
.setInitialRpcTimeoutDuration(Duration.ofSeconds(1))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeoutDuration(Duration.ofSeconds(25))
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.Retrying.RetrierWithAlg;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.storage.v2.StartResumableWriteRequest;
import com.google.storage.v2.StartResumableWriteResponse;
Expand Down Expand Up @@ -60,8 +60,8 @@ public final class GapicUploadSessionBuilderSyntaxTest {

@Before
public void setUp() throws Exception {
when(startResumableWrite.futureCall(any()))
.thenReturn(ApiFutures.immediateFuture(StartResumableWriteResponse.getDefaultInstance()));
when(startResumableWrite.call(any()))
.thenReturn(StartResumableWriteResponse.getDefaultInstance());
}

@Test
Expand Down Expand Up @@ -95,7 +95,9 @@ public void syntax_directBuffered_fluent() {
@Test
public void syntax_resumableUnbuffered_fluent() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
ResumableMedia.gapic()
.write()
.resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce());
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
Expand All @@ -111,7 +113,9 @@ public void syntax_resumableUnbuffered_fluent() {
@Test
public void syntax_resumableBuffered_fluent() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
ResumableMedia.gapic()
.write()
.resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce());
BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
Expand Down Expand Up @@ -151,7 +155,9 @@ public void syntax_directBuffered_incremental() {
@Test
public void syntax_resumableUnbuffered_incremental() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
ResumableMedia.gapic()
.write()
.resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce());
GapicWritableByteChannelSessionBuilder b1 =
ResumableMedia.gapic()
.write()
Expand All @@ -165,7 +171,9 @@ public void syntax_resumableUnbuffered_incremental() {
@Test
public void syntax_resumableBuffered_incremental() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
ResumableMedia.gapic()
.write()
.resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce());
GapicWritableByteChannelSessionBuilder b1 =
ResumableMedia.gapic()
.write()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import static com.google.cloud.storage.TestUtils.getChecksummedData;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.PermissionDeniedException;
import com.google.cloud.storage.Retrying.RetrierWithAlg;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.WriteCtx.SimpleWriteObjectRequestBuilderFactory;
import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory;
import com.google.common.collect.ImmutableList;
Expand All @@ -47,7 +50,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -210,6 +216,45 @@ public void resumableUpload() throws IOException, InterruptedException, Executio
}
}

@Test
public void startResumableUpload_deadlineExceeded_isRetried()
throws IOException, InterruptedException, ExecutionException, TimeoutException {

String uploadId = UUID.randomUUID().toString();
AtomicInteger callCount = new AtomicInteger(0);
StorageImplBase service =
new StorageImplBase() {
@Override
public void startResumableWrite(
StartResumableWriteRequest req, StreamObserver<StartResumableWriteResponse> respond) {
if (callCount.getAndIncrement() > 0) {
respond.onNext(
StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build());
respond.onCompleted();
}
}
};
try (FakeServer fake = FakeServer.of(service)) {
GrpcStorageImpl gsi = (GrpcStorageImpl) fake.getGrpcStorageOptions().getService();
ApiFuture<ResumableWrite> f =
gsi.startResumableWrite(
GrpcCallContext.createDefault(),
WriteObjectRequest.newBuilder()
.setWriteObjectSpec(
WriteObjectSpec.newBuilder()
.setResource(
Object.newBuilder().setBucket("bucket").setName("name").build())
.setIfGenerationMatch(0)
.build())
.build(),
Opts.empty());

ResumableWrite resumableWrite = f.get(2, TimeUnit.MINUTES);
assertThat(callCount.get()).isEqualTo(2);
assertThat(resumableWrite.newBuilder().build().getUploadId()).isEqualTo(uploadId);
}
}

@Test
public void resumableUpload_chunkAutomaticRetry()
throws IOException, InterruptedException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.ITUnbufferedResumableUploadTest.ObjectSizes;
import com.google.cloud.storage.Retrying.Retrier;
import com.google.cloud.storage.Retrying.RetrierWithAlg;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
Expand Down Expand Up @@ -241,7 +242,8 @@ private UnbufferedWritableByteChannelSession<WriteObjectResponse> grpcSession()
.resumableWrite(
storageClient.startResumableWriteCallable().withDefaultCallContext(merge),
request,
opts);
opts,
RetrierWithAlg.attemptOnce());

return ResumableMedia.gapic()
.write()
Expand Down