-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Description
Client
PubSub
Environment
Any
Go Environment
go version 1.21
Likely irrelevant
Expected behavior
Long-running processing, making repeated calls to Subscription.Receive()
does not leak resources.
Actual behavior
gRPC StreamingPull
streams remain open in the backgroud and accumulate over time.
Root cause
pullStream.cancel()
never gets called, and CloseSend()
is not enough to actually terminate the underlying stream.
Code
A minimal fix would be e.g.
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -157,6 +157,9 @@ func (it *messageIterator) stop() {
it.checkDrained()
it.mu.Unlock()
it.wg.Wait()
+ if it.ps != nil {
+ it.ps.cancel()
+ }
}
// checkDrained closes the drained channel if the iterator has been stopped and all
@@ -243,6 +246,9 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
}
// Any error here is fatal.
if err != nil {
+ if status.Code(err) == codes.Canceled {
+ err = io.EOF
+ }
return nil, it.fail(err)
}
recordStat(it.ctx, PullCount, int64(len(rmsgs)))
Or this could be done either further up or down the call chain. For instance get Subscription.Receive
to do it after everything else is done, so that the cancelation error does not appear sooner (which however may lead to unnecessary wait?).
Additional context
This could well be the same issue as #5888 and #5265, however those were closed, and I thought it would make more sense to reopen a new one with the full diagnostics.