
producer: Integrate context.Context with publishing #365


Open
wants to merge 4 commits into
base: master

Conversation

Ulminator

This PR sets out to resolve #360 by integrating context.Context into nsq.Producer. To maintain backwards compatibility, new methods were added whose names carry the suffix WithContext. These mostly correspond to the methods a client would use to publish messages, and they honor context timeouts/deadlines. Once the deadline has been reached, an error is returned to the client and the given ProducerTransaction is dropped (not published), all while keeping the connection to nsqd alive.

An overview of the new methods is below:

nsq.Conn

  • ConnectWithContext
  • WriteCommandWithContext

nsq.Producer

  • PingWithContext
  • PublishWithContext
  • MultiPublishWithContext
  • DeferredPublishWithContext
  • PublishAsyncWithContext
  • MultiPublishAsyncWithContext
  • DeferredPublishAsyncWithContext

The idea is that these WithContext methods would eventually replace the existing ones, which would be a breaking (major version) change for go-nsq.

Member

@mreiferson mreiferson left a comment


Thanks for this, a few minor comments. I don't have any better ideas on naming functions in a backwards-compatible way.

producer.go Outdated
// keep the connection alive if related to context timeout
// need to do some stuff that's in Producer.popTransaction here
w.transactions = w.transactions[1:]
t.Error = err
Member


I don't love this code we've copied into this context-specific edge case; can't we consolidate it into popTransaction? Do we need to define a context-specific error code for the protocol?

Author


Yeah I wasn't a huge fan of how that was being handled...

I updated the code to invoke popTransaction without modifying the function signature, meaning I seemingly need at least one new "frameType" in protocol.go to handle the context errors. I opted to be explicit and add two "frameTypes" (for context.Canceled and context.DeadlineExceeded respectively) so that we would not need to infer the context error from the byte slice or handle a default case like so:

if frameType == FrameTypeContextError {
    switch string(data) {
    case context.Canceled.Error():
        t.Error = context.Canceled
    case context.DeadlineExceeded.Error():
        t.Error = context.DeadlineExceeded
    default:
        t.Error = ???
    }
}

The FrameTypeContextCanceled and FrameTypeContextDeadlineExceeded are not really "frameTypes" per se, so I also don't fully love this approach, but it feels better than what it was.

I'm curious to hear your thoughts on this approach and any suggestions you may have on how to improve upon this.

Member


Can popTransaction take a 3rd error argument so we don't use magic values for some errors?

@Ulminator
Author

@mreiferson thanks for the initial review!

Apologies it took so long for me to circle back around to this, but it should be good for another look 👍

@martin-sucha
Contributor

Seems exactly like what I need, thanks!

I've reviewed the pull request and it looks good to me!

martin-sucha added a commit to kiwicom/go-nsq that referenced this pull request May 28, 2025
Merging current content of nsqio#365

return w.PingWithContext(ctx)
}

func (w *Producer) PingWithContext(ctx context.Context) error {
Member


We need to duplicate all of the docstrings onto the new methods


Development

Successfully merging this pull request may close these issues.

*: context.Context and timeouts
4 participants