这是indexloc提供的服务,不要输入任何密码
Skip to content

fix: fix Journaling BlobWriteSessionConfig to properly handle multiple consecutive retries #3166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private GatheringByteChannel openSync() throws IOException {
return sync;
}

private WriteObjectRequest processSegment(ChunkSegment segment) {
private WriteObjectRequest processSegment(ChunkSegment segment, boolean updateCumulativeCrc32c) {
WriteObjectRequest.Builder builder = writeCtx.newRequestBuilder();
if (!first) {
builder.clearUploadId().clearWriteObjectSpec().clearObjectChecksums();
Expand All @@ -162,9 +162,11 @@ private WriteObjectRequest processSegment(ChunkSegment segment) {
int contentSize = b.size();

// update ctx state that tracks overall progress
writeCtx
.getCumulativeCrc32c()
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
if (updateCumulativeCrc32c) {
writeCtx
.getCumulativeCrc32c()
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
}
// resolve current offset and set next
long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize);

Expand Down Expand Up @@ -202,6 +204,7 @@ private WriteObjectRequest.Builder finishMessage(WriteObjectRequest.Builder b) {
return b;
}

@SuppressWarnings("ConstantValue")
private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) {
AtomicBoolean recover = new AtomicBoolean(false);
retrier.run(
Expand All @@ -211,9 +214,16 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) {
sync.close();
}
boolean shouldRecover = recover.getAndSet(true);
// Each ChunkSegment always has its own checksum computed. However, if a retry happens and
// we need to rewind and build a new ChunkSegment, we don't want to add it to the
// cumulativeCrc32c value because that would make it appear as if the bytes were duplicated.
// For example, if we send "ABCD", get an error, and find only "AB" to have been persisted,
// we don't want to add "CD" to the cumulative crc32c, as that would be equivalent to "ABCDCD".
boolean updateCumulativeCrc32c = !shouldRecover;
if (!shouldRecover) {
for (ChunkSegment segment : segments) {
WriteObjectRequest writeObjectRequest = processSegment(segment);
WriteObjectRequest writeObjectRequest =
processSegment(segment, updateCumulativeCrc32c);
stream.onNext(writeObjectRequest);
}

Expand Down Expand Up @@ -247,17 +257,22 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) {
first = true;
writeCtx.getTotalSentBytes().set(persistedSize);
writeCtx.getConfirmedBytes().set(persistedSize);
writeCtx.getCumulativeCrc32c().set(null); // todo: can we rewind checksum?
// intentionally do not modify the cumulativeCrc32c value
// this will stay in the state in sync with what has been written to disk
// when we recover, checksum the individual message but not the cumulative

try (SeekableByteChannel reader = rf.reader()) {
reader.position(persistedSize);
ByteBuffer buf = copyBuffer.get();
// clear before read, in case an error was thrown before
buf.clear();
while (Buffers.fillFrom(buf, reader) != -1) {
buf.flip();
while (buf.hasRemaining()) {
ChunkSegment[] recoverySegments = chunkSegmenter.segmentBuffer(buf);
for (ChunkSegment segment : recoverySegments) {
WriteObjectRequest writeObjectRequest = processSegment(segment);
WriteObjectRequest writeObjectRequest =
processSegment(segment, updateCumulativeCrc32c);
stream.onNext(writeObjectRequest);
}
}
Expand All @@ -280,7 +295,8 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) {
}
}
long newWritten = writeCtx.getTotalSentBytes().get();
Preconditions.checkState(newWritten == goalSize, "%s == %s", newWritten, goalSize);
Preconditions.checkState(
newWritten == goalSize, "newWritten == goalSize (%s == %s)", newWritten, goalSize);
return null;
},
Decoder.identity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.TestUtils.apiException;
import static com.google.cloud.storage.TestUtils.assertAll;
import static com.google.cloud.storage.TestUtils.defaultRetryingDeps;
import static com.google.cloud.storage.TestUtils.xxd;
Expand All @@ -31,17 +32,23 @@
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.Retrying.DefaultRetrier;
import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.Alg;
import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.RequestStream;
import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.ResponseStream;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.it.ChecksummedTestContent;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import com.google.storage.v2.Object;
import com.google.storage.v2.ObjectChecksums;
import com.google.storage.v2.QueryWriteStatusRequest;
import com.google.storage.v2.QueryWriteStatusResponse;
import com.google.storage.v2.StartResumableWriteRequest;
Expand All @@ -50,10 +57,12 @@
import com.google.storage.v2.StorageGrpc.StorageImplBase;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import com.google.storage.v2.WriteObjectSpec;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -76,6 +85,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import net.jqwik.api.Arbitraries;
Expand Down Expand Up @@ -372,6 +382,204 @@ void testUploads(@ForAll("scenario") Scenario s) throws Exception {
}
}

/**
 * Uploads 17 bytes through a {@code SyncAndUploadUnbufferedWritableByteChannel} against a fake
 * gRPC server that answers the write at offset 10 with UNAVAILABLE until it has seen the
 * offset-8 message carrying the upload id twice. The test then asserts that the resulting
 * object reports the full size and the crc32c of the complete content.
 */
@Example
void multipleRetriesAgainstFakeServer() throws Exception {
// 17 bytes of content: eight 2-byte slices plus one trailing 1-byte slice
ChecksummedTestContent content =
ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(17));

// request/response pair the fake server accepts for starting the resumable upload
String uploadId = UUID.randomUUID().toString();
StartResumableWriteRequest reqStart =
StartResumableWriteRequest.newBuilder()
.setWriteObjectSpec(
WriteObjectSpec.newBuilder()
.setResource(
Object.newBuilder().setBucket("projects/_/buckets/b").setName("o").build())
.build())
.build();
StartResumableWriteResponse resStart =
StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build();
// every status query for this upload id is answered with a persisted size of 8
QueryWriteStatusRequest reqQuery =
QueryWriteStatusRequest.newBuilder().setUploadId(uploadId).build();
QueryWriteStatusResponse resQuery =
QueryWriteStatusResponse.newBuilder().setPersistedSize(8).build();
// expected data messages, one per 2-byte slice; only the offset-0 message carries the upload id
WriteObjectRequest reqWrite0 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(0)
.setChecksummedData(content.slice(0, 2).asChecksummedData())
.build();
WriteObjectRequest reqWrite2 =
WriteObjectRequest.newBuilder()
.setWriteOffset(2)
.setChecksummedData(content.slice(2, 2).asChecksummedData())
.build();
WriteObjectRequest reqWrite4 =
WriteObjectRequest.newBuilder()
.setWriteOffset(4)
.setChecksummedData(content.slice(4, 2).asChecksummedData())
.build();
WriteObjectRequest reqWrite6 =
WriteObjectRequest.newBuilder()
.setWriteOffset(6)
.setChecksummedData(content.slice(6, 2).asChecksummedData())
.build();
WriteObjectRequest reqWrite8 =
WriteObjectRequest.newBuilder()
.setWriteOffset(8)
.setChecksummedData(content.slice(8, 2).asChecksummedData())
.build();
// same as reqWrite8 but with the upload id set; the fake server counts these messages
// NOTE(review): presumably this is the first message sent after rewinding to the persisted
// offset of 8 — confirm against the channel's recovery path
WriteObjectRequest reqWrite8WithUploadId = reqWrite8.toBuilder().setUploadId(uploadId).build();
// the fake server fails this message while fewer than two offset-8 restarts have been seen
WriteObjectRequest reqWrite10 =
WriteObjectRequest.newBuilder()
.setWriteOffset(10)
.setChecksummedData(content.slice(10, 2).asChecksummedData())
.build();
WriteObjectRequest reqWrite12 =
WriteObjectRequest.newBuilder()
.setWriteOffset(12)
.setChecksummedData(content.slice(12, 2).asChecksummedData())
.build();
WriteObjectRequest reqWrite14 =
WriteObjectRequest.newBuilder()
.setWriteOffset(14)
.setChecksummedData(content.slice(14, 2).asChecksummedData())
.build();
// last slice is a single byte at offset 16
WriteObjectRequest reqWrite16 =
WriteObjectRequest.newBuilder()
.setWriteOffset(16)
.setChecksummedData(content.slice(16, 1).asChecksummedData())
.build();
// final message: reqWrite16 merged with finish_write and the crc32c of the whole content
WriteObjectRequest reqFinish =
WriteObjectRequest.newBuilder()
.setFinishWrite(true)
.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(content.getCrc32c()).build())
.mergeFrom(reqWrite16)
.build();
// response sent for reqFinish: the resource from reqStart plus generation, size and checksums
WriteObjectResponse resFinish =
WriteObjectResponse.newBuilder()
.setResource(
reqStart.getWriteObjectSpec().getResource().toBuilder()
.setGeneration(1)
.setSize(17)
.setChecksums(
ObjectChecksums.newBuilder()
.setCrc32C(content.getCrc32c())
.setMd5Hash(content.getMd5Bytes())
.build())
.build())
.build();
// data messages the fake server accepts without sending any response
ImmutableSet<WriteObjectRequest> allReqWrite =
ImmutableSet.of(
reqWrite0,
reqWrite2,
reqWrite4,
reqWrite6,
reqWrite8,
reqWrite10,
reqWrite12,
reqWrite14,
reqWrite16);

// number of times the fake server has received reqWrite8WithUploadId
AtomicInteger retryCount = new AtomicInteger(0);
StorageImplBase service =
new StorageImplBase() {
// accepts only reqStart; anything else is reported via unexpected(...)
@Override
public void startResumableWrite(
StartResumableWriteRequest req, StreamObserver<StartResumableWriteResponse> respond) {
if (req.equals(reqStart)) {
respond.onNext(resStart);
respond.onCompleted();
} else {
unexpected(respond, req);
}
}

// accepts only reqQuery and always answers with resQuery (persisted size 8)
@Override
public void queryWriteStatus(
QueryWriteStatusRequest req, StreamObserver<QueryWriteStatusResponse> respond) {
if (req.equals(reqQuery)) {
respond.onNext(resQuery);
respond.onCompleted();
} else {
unexpected(respond, req);
}
}

@Override
public StreamObserver<WriteObjectRequest> writeObject(
StreamObserver<WriteObjectResponse> respond) {
return new StreamObserver<WriteObjectRequest>() {
// branch order matters: reqWrite10 is also in allReqWrite, so it is matched first here
@Override
public void onNext(WriteObjectRequest value) {
if (value.equals(reqFinish)) {
// upload complete: reply with the finished resource and close the stream
respond.onNext(resFinish);
respond.onCompleted();
} else if (value.equals(reqWrite10)) {
// fail with UNAVAILABLE until reqWrite8WithUploadId has been seen twice;
// afterwards the message is accepted without a response
int i = retryCount.get();
if (i < 2) {
respond.onError(apiException(Code.UNAVAILABLE, "{Unavailable}"));
}
} else if (value.equals(reqWrite8WithUploadId)) {
retryCount.incrementAndGet();
} else if (allReqWrite.contains(value)) {
// expected data message; accepted without sending a response
} else {
unexpected(respond, value);
}
}

@Override
public void onError(Throwable t) {}

@Override
public void onCompleted() {}
};
}

// fails the call with UNIMPLEMENTED, embedding the unexpected message in the description
private void unexpected(StreamObserver<?> respond, Message msg) {
respond.onError(
apiException(
Code.UNIMPLEMENTED,
"Unexpected request { " + TextFormat.printer().shortDebugString(msg) + " }"));
}
};
try (FakeServer fakeServer = FakeServer.of(service);
GrpcStorageImpl storage =
(GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) {

BlobInfo info = BlobInfo.newBuilder("b", "o").build();
SettableApiFuture<WriteObjectResponse> resultFuture = SettableApiFuture.create();
// 2-byte buffer handle passed to both the sync-and-upload channel and the buffered wrapper
BufferHandle recoverBufferHandle = BufferHandle.allocate(2);
// NOTE(review): the trailing (2, 2) ChunkSegmenter arguments presumably produce the 2-byte
// segments the fake server expects — confirm against ChunkSegmenter's constructor
SyncAndUploadUnbufferedWritableByteChannel syncAndUpload =
new SyncAndUploadUnbufferedWritableByteChannel(
storage.storageClient.writeObjectCallable(),
storage.storageClient.queryWriteStatusCallable(),
resultFuture,
new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 2, 2),
new DefaultRetrier(UnaryOperator.identity(), storage.getOptions()),
StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler(),
new WriteCtx<>(
new ResumableWrite(
reqStart,
resStart,
id -> reqWrite0.toBuilder().clearWriteObjectSpec().setUploadId(id).build())),
recoveryFileManager.newRecoveryFile(info),
recoverBufferHandle);
// write the whole content in one call; the channel is closed when this try block exits
try (BufferedWritableByteChannel w =
StorageByteChannels.writable()
.createSynchronized(
new DefaultBufferedWritableByteChannel(recoverBufferHandle, syncAndUpload))) {
w.write(ByteBuffer.wrap(content.getBytes()));
}

// decode the response's resource into a BlobInfo; wait at most 3 seconds for the result
Decoder<WriteObjectResponse, BlobInfo> decoder =
Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource);
BlobInfo actual = decoder.decode(resultFuture.get(3, TimeUnit.SECONDS));
assertThat(actual.getSize()).isEqualTo(content.getBytes().length);
assertThat(actual.getCrc32c()).isEqualTo(content.getCrc32cBase64());
}
}

static List<ByteString> dataFrames(long length, int segmentLength) {
// todo: rethink this
Random rand = new Random(length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public ChecksummedData asChecksummedData() {
.build();
}

/**
 * Creates a new {@code ChecksummedTestContent} by delegating to {@code of(bytes, begin, n)},
 * where {@code n} is {@code length} capped at {@code bytes.length - begin}.
 *
 * @param begin index passed through to {@code of} as the starting position
 * @param length requested number of bytes; capped so it does not exceed what remains after
 *     {@code begin}
 * @return the value returned by {@code of} for the capped range
 */
public ChecksummedTestContent slice(int begin, int length) {
  int remaining = bytes.length - begin;
  int cappedLength = Math.min(length, remaining);
  return of(bytes, begin, cappedLength);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down