refactor(storage): Pipeline gRPC writes. #12422
Conversation
Modify the gRPC writer to send additional data while waiting for the current chunk to flush. This is a substantial refactor.

Per the Go io.Writer interface contract, we must never modify or retain the slice that the caller provides to Write. However, that doesn't mean we have to copy every byte into a writer-controlled buffer: we can refer to the byte slice in place. Therefore, if callers call Write() with more bytes than the chunk size, we can send them to the service immediately as long as we don't return from Write() until we no longer need the caller's slice.

This is an in-place refactor which retains the existing flush behavior, so its benefits are most obvious in somewhat unlikely scenarios. For example: a 100MiB upload to a bucket in a remote region with chunk size 256KiB can see a latency reduction of ~35%.

There are two followup investigations made possible by this refactor. The first is to flush less frequently when the caller provides write slices much larger than the chunk size. This may provide an even larger throughput improvement for cases like the one above, and is straightforward to implement.

The second is to flush more frequently when the caller provides write slices much smaller than the chunk size. (E.g. split a 16MiB chunk into 2x8MiB sub-chunks, and flush each when they're full.) This can avoid pipeline stalls in more scenarios, by increasing the likelihood that part of the chunk is available to buffer data without waiting for a flush acknowledgement inline.
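To make the mechanism concrete, here is a minimal sketch of the pipelined Write() shape described above. The type and field names (pipelinedWriter, writeCommand, commands, donec, streamResult) are hypothetical stand-ins, not the PR's actual code; the point is only that Write() hands the caller's slice to a sender goroutine and blocks until the sender no longer needs it, which satisfies the io.Writer contract without copying every byte.

```go
// Minimal sketch, hypothetical names: Write enqueues the caller's slice for
// the sender goroutine and returns only after the sender signals that it no
// longer needs p (flushed or copied), so the slice is never retained.
type writeCommand struct {
	p    []byte
	done chan struct{}
}

type pipelinedWriter struct {
	commands     chan writeCommand // consumed by a sender goroutine
	donec        chan struct{}     // closed after streamResult is set on permanent failure or close
	streamResult error
}

func (w *pipelinedWriter) Write(p []byte) (int, error) {
	done := make(chan struct{})
	select {
	case w.commands <- writeCommand{p: p, done: done}:
	case <-w.donec: // the writer already failed or closed asynchronously
		return 0, w.streamResult
	}
	select {
	case <-done: // the sender has flushed (or copied) everything it needs from p
		return len(p), nil
	case <-w.donec:
		return 0, w.streamResult
	}
}
```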
tritone
left a comment
Overall this is looking pretty good to me; the only significant comment is the one about channel overhead for many small writes. Profiling will help us understand whether that contributes significant overhead.
I think a bigger question is how to test this to make sure it doesn't cause regressions. @vadlakondaswetha has already done some GCSFuse performance testing using this PR which has helped iron out some issues. We'll also need to do a close comparison of throughput, CPU and memory between the release candidate with this PR and the previous release.
Overall though this seems like a really good refactor and adds clarity to the flows around different types of writes.
	w.initializeSender()
} else {
	select {
	case <-w.donec:
Is this case intended for when you send on a writer that's already closed? If so, what is supposed to happen?
Either the writer is already closed, or the writer gets closed due to a permanent error before it can process the provided command. Basically this is how we handle asynchronous failures which the calling code may not have detected yet.
The underlying writer promises to set w.streamResult before closing w.donec
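To illustrate the ordering guarantee being relied on here, a sketch using the same hypothetical names as the earlier Write() sketch (not the PR's exact code): the sender stores the terminal error before closing donec, so any goroutine woken by the closed channel reads a settled result.

```go
// Sketch (assumed names): record the terminal result first, close donec second,
// so a goroutine unblocked by <-w.donec always observes the error that caused
// the shutdown.
func (w *pipelinedWriter) finish(err error) {
	w.streamResult = err // set the result...
	close(w.donec)       // ...then wake every pending select on donec
}
```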
	return true

	done := make(chan struct{})
	cmd := &gRPCWriterCommandWrite{p: p, done: done}
Did you profile this with many small-buffer writes? Just wanted to make sure that the extra overhead of the channel for each cmd is not too much of an issue.
Discussed this offline; sounds like it could be worth it to just preserve this outer channel between Write() calls rather than closing/recreating each time. But profiling will tell us what the actual overhead is.
See http://b/422440765#comment4 and comment 5 for more discussion. I'm inclined to merge like this and improve afterwards if necessary, if that's alright.
To summarize for any reader without Google internal bug access: there is a minor (few % on a very low base) CPU increase and therefore latency increase and IOPS decrease for 1-byte operations. Once op size goes above a de minimis value, this effect disappears and the throughput increase from pipelining results in slightly lower mean and tail latencies and higher IOPS. There is still a small % CPU increase, but since the write is now large enough to block on IO for more of the time, the CPU base is even lower.
We could trade the simplicity of a write completion channel per op for a somewhat less forgiving interface with a write completion channel per storage.Writer. This is consistent with the existing API contract based on io.PipeWriter, and gets us ~1/3 of the new CPU cost back. I think it's reasonable to do that in a focused followup PR.
Sounds good, thanks for the summary here.
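For readers following the thread, the followup described above (one completion channel per storage.Writer instead of one per Write call) would look roughly like this sketch. Names are hypothetical and it reuses the writeCommand type from the earlier sketch; the trade-off is that only one Write may be in flight at a time, mirroring io.PipeWriter semantics, in exchange for dropping the per-call channel allocation.

```go
// Sketch of the per-Writer acknowledgement channel alternative (hypothetical
// names). Write no longer allocates and closes a channel on every call, but
// callers must not issue concurrent Writes.
type ackedWriter struct {
	commands chan writeCommand
	writeAck chan error // allocated once and reused by every Write call
}

func (w *ackedWriter) Write(p []byte) (int, error) {
	w.commands <- writeCommand{p: p}
	if err := <-w.writeAck; err != nil { // the sender replies once it is done with p
		return 0, err
	}
	return len(p), nil
}
```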
storage/grpc_writer.go (outdated)
}
w.streamSender = w.pickBufferSender()

runWriteLoop := func(ctx context.Context) error {
nit: This additional wrapper seems unneeded?
w.lastErr is just to persist the prior failure across multiple run() retries, and it's convenient to get that from the writeLoop return instead of remembering it each place we break out of that loop.
But you're right it doesn't have to be named: moved these two lines into the run() call directly as an anonymous function.
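The resulting shape is roughly the following. This is a sketch assuming a retry helper with the shape run(ctx, fn); the real helper's signature may differ.

```go
// Sketch: the anonymous function records why the write loop stopped so that a
// later retry attempt (or Close) can still report the prior failure.
err := run(ctx, func(ctx context.Context) error {
	w.lastErr = w.writeLoop(ctx)
	return w.lastErr
})
```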
type gRPCWriteRequestParams struct {
	appendable   bool
	bucket       string
	routingToken *string
Is having an empty string routing token valid?
No it is not. Do you suggest switching this from *string to string and copying from e.RoutingToken in maybeHandleRedirectionError?
Review comments
Don't retain user write buffers after returning from Write
}
requests := make(chan gRPCBidiWriteRequest, w.sendableUnits)
// only one request will be outstanding at a time.
requestAcks := make(chan struct{}, 1)
nice fix!
Correct issues with reduced ack tracking.
Update attr size during completions
@tritone IIUC this is ready to merge now
Oneshot writes did not report progress prior to #12422. This fixes them so that they also don't report progress after that. Also add an emulator test, since it turns out our first test of that behavior was in the integration tests!
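The shape of such an emulator test is roughly the following. This is a sketch, not the PR's actual test: the client and bucket setup are placeholders, and it assumes STORAGE_EMULATOR_HOST points at a running testbench with the bucket already created.

```go
package storage_test

import (
	"context"
	"testing"

	"cloud.google.com/go/storage"
)

// Sketch of the behavior under test: a oneshot write (ChunkSize 0) should
// complete without ever invoking ProgressFunc.
func TestOneshotWriteReportsNoProgress(t *testing.T) {
	ctx := context.Background()
	client, err := storage.NewClient(ctx) // assumes STORAGE_EMULATOR_HOST is set
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	w := client.Bucket("test-bucket").Object("oneshot-obj").NewWriter(ctx)
	w.ChunkSize = 0 // oneshot: the whole payload is sent in a single request
	progressCalls := 0
	w.ProgressFunc = func(int64) { progressCalls++ }

	if _, err := w.Write([]byte("hello world")); err != nil {
		t.Fatal(err)
	}
	if err := w.Close(); err != nil {
		t.Fatal(err)
	}
	if progressCalls != 0 {
		t.Errorf("oneshot write reported progress %d times, want 0", progressCalls)
	}
}
```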
storage.Writer took an assumption that CloseWithError() could be called more than once, and was thread-safe with respect to concurrent Write(), Flush(), and Close() calls. This was not honored in the refactor in #12422. Modify Writer so that it is thread-safe to provide these behaviors, and support repeated Close() and CloseWithError() calls. To address this, we start the sender goroutine earlier, and gather the first buffer in that goroutine. It's possible that some workloads which gather less than one buffer worth of data with a sequence of small writes will observe a performance hit here, since those writes used to be direct copies but will now be a channel ping-pong. If that's an issue, it could be improved by wrapping the buffer in a mutex and doing more explicit concurrency control.
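For reference, the mutex-based alternative mentioned at the end would look roughly like this. It is a simplified sketch with hypothetical names (and would need a sync import); a production version would also coordinate Flush/Close and error propagation.

```go
// Simplified sketch: small Writes append into a shared buffer under a mutex
// instead of ping-ponging through the sender goroutine's channel.
type lockedBufferWriter struct {
	mu        sync.Mutex
	buf       []byte
	chunkSize int
	flushes   chan []byte // full chunks handed to the sender goroutine
}

func (w *lockedBufferWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	w.buf = append(w.buf, p...) // copy, so the caller's slice is never retained
	var full []byte
	if len(w.buf) >= w.chunkSize {
		full, w.buf = w.buf, nil
	}
	w.mu.Unlock()
	if full != nil {
		w.flushes <- full // hand off outside the lock to avoid blocking other writers
	}
	return len(p), nil
}
```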
🤖 I have created a release *beep* *boop*

## [1.57.0](https://togithub.com/googleapis/google-cloud-go/compare/storage/v1.56.1...storage/v1.57.0) (2025-09-23)

### Features

* **storage/control:** Add new GetIamPolicy, SetIamPolicy, and TestIamPermissions RPCs ([d73f912](https://togithub.com/googleapis/google-cloud-go/commit/d73f9123be77bb3278f48d510cd0fb22feb605bc))
* **storage:** Post support dynamic key name ([#12677](https://togithub.com/googleapis/google-cloud-go/issues/12677)) ([9e761f9](https://togithub.com/googleapis/google-cloud-go/commit/9e761f961a2c4351b3e0793ed655314ac5853903))
* **storage:** WithMeterProvider allows custom meter provider configuration ([#12668](https://togithub.com/googleapis/google-cloud-go/issues/12668)) ([7f574b0](https://togithub.com/googleapis/google-cloud-go/commit/7f574b01e0b454c1ef5c13e6a58075e394ee990d))

### Bug Fixes

* **storage:** Free buffers in Bidi Reader ([#12839](https://togithub.com/googleapis/google-cloud-go/issues/12839)) ([bc247fd](https://togithub.com/googleapis/google-cloud-go/commit/bc247fdc3f5234a8bd6934e58d5b0b578f1335cb))
* **storage:** Make Writer thread-safe. ([#12753](https://togithub.com/googleapis/google-cloud-go/issues/12753)) ([9ea380b](https://togithub.com/googleapis/google-cloud-go/commit/9ea380bea5b980a9054d201be4f315a195da2182))
* **storage:** No progress report for oneshot write ([#12746](https://togithub.com/googleapis/google-cloud-go/issues/12746)) ([b97c286](https://togithub.com/googleapis/google-cloud-go/commit/b97c286ec369a10a81b1a8a3a1aae18b46d2dfbc))

### Performance Improvements

* **storage:** Pipeline gRPC writes ([#12422](https://togithub.com/googleapis/google-cloud-go/issues/12422)) ([1f2c5fe](https://togithub.com/googleapis/google-cloud-go/commit/1f2c5fe2843724302086fe04cb8dab8b515969c5))

This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
I'd like to pick up googleapis/google-cloud-go#12839 (a bug fix) and googleapis/google-cloud-go#12422 (a write performance improvement). This PR is the result of running `bazel run go get cloud.google.com/go/storage`