
Conversation

Contributor

@cjc25 cjc25 commented Jun 7, 2025

Modify the gRPC writer to send additional data while waiting for the current chunk to flush. This is a substantial refactor.

Per the Go io.Writer interface contract, we must never modify or retain the slice that the caller provides to Write. However, that doesn't mean we have to copy every byte into a writer-controlled buffer: we can refer to the byte slice in place. Therefore, if callers call Write() with more bytes than the chunk size, we can send them to the service immediately as long as we don't return from Write() until we no longer need the caller's slice.
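
A minimal sketch of that contract, assuming a command-channel design. The names w.donec and w.streamResult echo fields discussed in the review below, but the types and structure here are illustrative stand-ins, not the PR's actual code:

```go
// Illustrative stand-ins, not the actual PR types.
type writeCommand struct {
	p    []byte        // the caller's slice, referenced in place
	done chan struct{} // closed by the sender once p is no longer needed
}

type writer struct {
	commands     chan writeCommand
	donec        chan struct{} // closed when the sender goroutine exits
	streamResult error         // set before donec is closed
}

// Write never copies p: it hands the slice to the sender goroutine and
// blocks until the sender is finished with it, satisfying the io.Writer
// rule that the slice is not retained after Write returns.
func (w *writer) Write(p []byte) (int, error) {
	done := make(chan struct{})
	select {
	case w.commands <- writeCommand{p: p, done: done}:
	case <-w.donec: // the writer already failed or was closed
		return 0, w.streamResult
	}
	select {
	case <-done: // sender has flushed everything that references p
		return len(p), nil
	case <-w.donec:
		return 0, w.streamResult
	}
}
```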

By sending data as soon as callers provide it, we get a substantial single-stream throughput increase for large objects, especially when callers provide large buffers to Write() calls. Because this is an in-place refactor that retains the existing flush behavior, the benefits are most obvious in somewhat unlikely scenarios: for example, a 100MiB upload to a bucket in a remote region with a 256KiB chunk size can see a latency reduction of ~35%.

There are two followup investigations made possible by this refactor. The first is to flush less frequently when the caller provides write slices much larger than the chunk size. This may provide an even larger throughput improvement when Write() is called with a large buffer, and is straightforward to implement.

The second is to flush more frequently when the caller provides write slices much smaller than the chunk size. (E.g. split a 16MiB chunk into 2x8MiB sub-chunks, and flush each when they're full.) This can avoid pipeline stalls in more scenarios, by increasing the likelihood that part of the chunk is available to buffer data without waiting for a flush acknowledgement.
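
A toy sketch of that second idea, using the sizes from the example; fill and flush are hypothetical helpers, not code from this PR:

```go
const (
	chunkSize    = 16 << 20      // the 16MiB chunk from the example
	subChunkSize = chunkSize / 2 // split into 2x8MiB sub-chunks
)

// fill appends p to the staging buffer and flushes each sub-chunk as soon
// as it is full, rather than waiting for the entire chunk to fill.
func fill(buf, p []byte, flush func([]byte)) []byte {
	buf = append(buf, p...)
	for len(buf) >= subChunkSize {
		flush(buf[:subChunkSize])
		buf = buf[subChunkSize:]
	}
	return buf
}
```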

@cjc25 cjc25 requested review from a team as code owners June 7, 2025 03:43
@product-auto-label bot added the api: storage label Jun 7, 2025
Contributor

@tritone tritone left a comment


Overall this is looking pretty good to me; my only significant comment is the one about channel overhead for many small writes. Profiling will help us understand whether this contributes significant overhead.

I think a bigger question is how to test this to make sure it doesn't cause regressions. @vadlakondaswetha has already done some GCSFuse performance testing using this PR which has helped iron out some issues. We'll also need to do a close comparison of throughput, CPU and memory between the release candidate with this PR and the previous release.

Overall though this seems like a really good refactor and adds clarity to the flows around different types of writes.

w.initializeSender()
} else {
select {
case <-w.donec:
Contributor


Is this case intended for when you send on a writer that's already closed? If so, what is supposed to happen?

Contributor Author


Either the writer is already closed, or the writer gets closed due to a permanent error before it can process the provided command. Basically, this is how we handle asynchronous failures that the calling code may not have detected yet.

The underlying writer promises to set w.streamResult before closing w.donec.
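
That ordering promise is what makes the select on w.donec safe. Reusing the hypothetical writer type sketched in the description above, the sender's exit path looks roughly like this; only the ordering guarantee is from the comment, the structure is assumed:

```go
// senderExit records the terminal error before closing donec. A receive
// that observes the close happens-after the close itself, so any goroutine
// that sees <-w.donec is guaranteed to see the streamResult write too.
func (w *writer) senderExit(loopErr error) {
	w.streamResult = loopErr
	close(w.donec)
}
```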

return true

done := make(chan struct{})
cmd := &gRPCWriterCommandWrite{p: p, done: done}
Contributor


Did you profile this with many small-buffer writes? Just wanted to make sure that the extra overhead of the channel for each cmd is not too much of an issue.

Contributor


Discussed this offline; sounds like it could be worth it to just preserve this outer channel between Write() calls rather than closing/recreating each time. But profiling will tell us what the actual overhead is.

Contributor Author


See http://b/422440765#comment4 and comment 5 for more discussion. I'm inclined to merge like this and improve afterwards if necessary, if that's alright.

To summarize for any reader without Google internal bug access: there is a minor (few % on a very low base) CPU increase and therefore latency increase and IOPS decrease for 1-byte operations. Once op size goes above a de minimis value, this effect disappears and the throughput increase from pipelining results in slightly lower mean and tail latencies and higher IOPS. There is still a small % CPU increase, but since the write is now large enough to block on IO for more of the time, the CPU base is even lower.

We could trade the simplicity of a write completion channel per op for a somewhat less forgiving interface with a write completion channel per storage.Writer. This is consistent with the existing API contract based on io.PipeWriter, and gets us ~1/3 of the new CPU cost back. I think it's reasonable to do that in a focused followup PR.
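
A rough sketch of that per-Writer-channel followup, with hypothetical types; it leans on the io.PipeWriter-style contract that Write is never called concurrently:

```go
type pipelinedWriter struct {
	commands chan []byte
	acks     chan error // one reusable ack channel for the Writer's lifetime
}

// Write avoids the per-op channel allocation: the sender goroutine sends
// exactly one ack per command, in order, so a single channel suffices as
// long as callers never issue concurrent Writes.
func (w *pipelinedWriter) Write(p []byte) (int, error) {
	w.commands <- p
	if err := <-w.acks; err != nil {
		return 0, err
	}
	return len(p), nil
}
```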

Contributor


Sounds good, thanks for the summary here.

}
w.streamSender = w.pickBufferSender()

runWriteLoop := func(ctx context.Context) error {
Contributor


nit: This additional wrapper seems unneeded?

Contributor Author


w.lastErr is just there to persist the prior failure across multiple run() retries, and it's convenient to get that from the writeLoop return instead of remembering to set it at each place we break out of that loop.

But you're right it doesn't have to be named: moved these two lines into the run() call directly as an anonymous function.

type gRPCWriteRequestParams struct {
appendable bool
bucket string
routingToken *string
Contributor


Is having an empty string routing token valid?

Contributor Author


No, it is not. Do you suggest switching this from *string to string and copying from e.RoutingToken in maybeHandleRedirectionError?
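
For context, the tradeoff in question is the usual nil-pointer-versus-zero-value one. A sketch using the field names from the snippet above; the setter is hypothetical:

```go
type gRPCWriteRequestParams struct {
	appendable   bool
	bucket       string
	routingToken *string // nil means "no token yet"; "" is never a valid token
}

// setRoutingToken copies the token value, e.g. from a redirect error,
// rather than retaining a pointer into the error.
func (p *gRPCWriteRequestParams) setRoutingToken(tok string) {
	p.routingToken = &tok
}
```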

@cjc25 cjc25 force-pushed the grpc-writer-pipelining branch from 76def25 to d5e2d6e June 30, 2025 17:28
}
requests := make(chan gRPCBidiWriteRequest, w.sendableUnits)
// only one request will be outstanding at a time.
requestAcks := make(chan struct{}, 1)
Contributor


nice fix!

Contributor Author

cjc25 commented Aug 14, 2025

@tritone IIUC this is ready to merge now

@tritone tritone merged commit 1f2c5fe into googleapis:main Aug 19, 2025
9 checks passed
@cjc25 cjc25 deleted the grpc-writer-pipelining branch August 20, 2025 00:11
cjc25 added a commit to cjc25/google-cloud-go that referenced this pull request Aug 21, 2025
Oneshot writes did not report progress prior to
googleapis#12422. This fixes them so
that they also don't report progress after that change.

Also add an emulator test, since it turns out our first test of that
behavior was in the integration tests!
BrennaEpp pushed a commit that referenced this pull request Aug 21, 2025
cjc25 added a commit to cjc25/google-cloud-go that referenced this pull request Aug 22, 2025
storage.Writer assumed that CloseWithError() could be called more than
once and was thread-safe with respect to concurrent Write(), Flush(),
and Close() calls. This assumption was not honored in the refactor in
googleapis#12422.

Modify Writer so that it is thread-safe to provide these behaviors, and
support repeated Close() and CloseWithError() calls.

To address this, we start the sender goroutine earlier, and gather the
first buffer in that goroutine. It's possible that some workloads which
gather less than one buffer worth of data with a sequence of small
writes will observe a performance hit here, since those writes used to
be direct copies but will now be a channel ping-pong. If that's an
issue, it could be improved by wrapping the buffer in a mutex and doing
more explicit concurrency control.
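
A minimal sketch of that mutex alternative, with hypothetical names and an assumed fixed chunk size; this is not the committed fix:

```go
import "sync"

const stagingChunkSize = 16 << 20 // assumed; the real chunk size is configurable

// mutexBuffer lets small writes append directly under a lock instead of
// round-tripping each slice through a channel to the sender goroutine.
type mutexBuffer struct {
	mu  sync.Mutex
	buf []byte
}

// append copies p into the staging buffer and reports whether a full
// chunk is ready for the sender to drain (under the same lock).
func (b *mutexBuffer) append(p []byte) (full bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.buf = append(b.buf, p...)
	return len(b.buf) >= stagingChunkSize
}
```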
tritone pushed a commit that referenced this pull request Aug 22, 2025
gcf-merge-on-green bot pushed a commit that referenced this pull request Sep 23, 2025
🤖 I have created a release *beep* *boop*
---


## [1.57.0](https://togithub.com/googleapis/google-cloud-go/compare/storage/v1.56.1...storage/v1.57.0) (2025-09-23)


### Features

* **storage/control:** Add new GetIamPolicy, SetIamPolicy, and TestIamPermissions RPCs ([d73f912](https://togithub.com/googleapis/google-cloud-go/commit/d73f9123be77bb3278f48d510cd0fb22feb605bc))
* **storage:** Post support dynamic key name ([#12677](https://togithub.com/googleapis/google-cloud-go/issues/12677)) ([9e761f9](https://togithub.com/googleapis/google-cloud-go/commit/9e761f961a2c4351b3e0793ed655314ac5853903))
* **storage:** WithMeterProvider allows custom meter provider configuration ([#12668](https://togithub.com/googleapis/google-cloud-go/issues/12668)) ([7f574b0](https://togithub.com/googleapis/google-cloud-go/commit/7f574b01e0b454c1ef5c13e6a58075e394ee990d))


### Bug Fixes

* **storage:** Free buffers in Bidi Reader ([#12839](https://togithub.com/googleapis/google-cloud-go/issues/12839)) ([bc247fd](https://togithub.com/googleapis/google-cloud-go/commit/bc247fdc3f5234a8bd6934e58d5b0b578f1335cb))
* **storage:** Make Writer thread-safe. ([#12753](https://togithub.com/googleapis/google-cloud-go/issues/12753)) ([9ea380b](https://togithub.com/googleapis/google-cloud-go/commit/9ea380bea5b980a9054d201be4f315a195da2182))
* **storage:** No progress report for oneshot write ([#12746](https://togithub.com/googleapis/google-cloud-go/issues/12746)) ([b97c286](https://togithub.com/googleapis/google-cloud-go/commit/b97c286ec369a10a81b1a8a3a1aae18b46d2dfbc))


### Performance Improvements

* **storage:** Pipeline gRPC writes ([#12422](https://togithub.com/googleapis/google-cloud-go/issues/12422)) ([1f2c5fe](https://togithub.com/googleapis/google-cloud-go/commit/1f2c5fe2843724302086fe04cb8dab8b515969c5))

---
This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
vanja-p added a commit to buildbuddy-io/buildbuddy that referenced this pull request Oct 10, 2025
I'd like to pick up a [bug fix](googleapis/google-cloud-go#12839) and a [write performance improvement](googleapis/google-cloud-go#12422)
vanja-p added a commit to buildbuddy-io/buildbuddy that referenced this pull request Oct 13, 2025
I'd like to pick up
googleapis/google-cloud-go#12839 and
googleapis/google-cloud-go#12422

This PR is the result of running `bazel run go get
cloud.google.com/go/storage`
