Source concurrent dispatch #522
Conversation
Force-pushed from 79ed991 to 3612c2f (compare)
Codecov Report
```diff
@@           Coverage Diff            @@
##             main     #522      +/- ##
==========================================
- Coverage   75.86%   75.78%   -0.09%
==========================================
  Files          46       47       +1
  Lines        2760     2783      +23
==========================================
+ Hits         2094     2109      +15
- Misses        541      549       +8
  Partials      125      125
==========================================
```
Continue to review full report at Codecov.
Force-pushed from 657c7f3 to ad57017 (compare)
pkg/adapter/adapter.go (Outdated)

```diff
@@ -52,8 +51,8 @@ type ExchangeConfig struct {
 }

 type ChannelConfig struct {
-	PrefetchCount int
-	GlobalQos     bool
+	PrefetchCount int `envconfig:"RABBITMQ_CHANNEL_CONFIG_PREFETCH_COUNT" default:"10" required:"false"`
```
This environment name feels too verbose and implementation-specific. I think that `RABBITMQ_QOS_PREFETCH_COUNT` is sufficiently descriptive - wdyt?

Since `PrefetchCount` and `GlobalQos` are both used in the context of `Qos`, I think that it would make sense to rename them to `QosPrefetchCount` and `QosGlobal`.
Same as in the `globalQos`
```diff
-	PrefetchCount int
-	GlobalQos     bool
+	PrefetchCount int  `envconfig:"RABBITMQ_CHANNEL_CONFIG_PREFETCH_COUNT" default:"10" required:"false"`
+	GlobalQos     bool `envconfig:"RABBITMQ_CHANNEL_CONFIG_QOS_GLOBAL" required:"false"`
```
Similar to my previous comment, `RABBITMQ_QOS_GLOBAL` feels sufficient to me.
Let's see what everyone thinks about this. This is following the previous convention of env var names.
```diff
@@ -281,31 +282,51 @@ func (a *Adapter) PollForMessages(channel *wabbit.Channel,

 	msgs, _ := a.ConsumeMessages(channel, queue, logger)

+	wg := &sync.WaitGroup{}
+	workerCount := a.config.ChannelConfig.PrefetchCount
```
Prefetch counts of 100 or even 1000 are common recommendations for high-throughput scenarios. Are we sure that we want to start that many goroutines?

Is there going to be a single source per Go process? If we are likely to have many sources per Go process, this may cause concurrency problems - locking & context switching seem too high to me.
What do you mean by "sources"? In general, Goroutines are lightweight and having thousands of them does not necessarily impact (thread) context switching. Not sure where we'd have locking bottlenecks in this code path.
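To make the "goroutines are lightweight" point concrete, a small self-contained experiment (the count of 50,000 is illustrative): goroutines start with stacks of only a few kilobytes and are multiplexed onto a small number of OS threads, so spawning tens of thousands completes quickly:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// spawn launches n goroutines that each do a trivial unit of work and
// waits for all of them. Unlike OS threads, goroutines in the tens of
// thousands are cheap to create and schedule.
func spawn(n int) int64 {
	var done int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&done, 1)
		}()
	}
	wg.Wait()
	return done
}

func main() {
	fmt.Println(spawn(50000)) // 50000
}
```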
source/README.adoc (Outdated)

```diff
@@ -88,6 +88,8 @@ Sources are Kubernetes objects. In addition to the standard Kubernetes
 | `queue_config.delete_when_unused` {optional} | Boolean
 | `queue_config.exclusive` {optional} | Boolean
 | `queue_config.nowait` {optional} | Boolean
+| `channel_config.prefetch_count` {optional} | Int that limits the https://www.rabbitmq.com/consumer-prefetch.html[Consumer Prefetch Value].
```
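For readers following along, a hedged sketch of how this option might appear in a source manifest; the `apiVersion`, metadata, and field layout are assumptions inferred from the README table, not copied from the repo:

```yaml
apiVersion: sources.knative.dev/v1alpha1  # assumed version, check the repo's CRD
kind: RabbitmqSource
metadata:
  name: rabbitmq-source
spec:
  channel_config:
    # Limits the consumer prefetch value, which (per this PR) also
    # bounds the number of concurrent dispatch workers in the adapter.
    prefetch_count: 10
```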
This explains the environment variable name chosen here https://github.com/knative-sandbox/eventing-rabbitmq/pull/522/files#r754456187
I am still not convinced that exposing this much RabbitMQ-specific detail is a good idea. If these broker types are meant to be easily interchangeable, then the current implementation-specific approach is going against that principle. How are you thinking about this @benmoss @salaboy ?
Sources aren't implementations of a generic type, they're unified by the contract that they emit messages to some sink. Each one has its own CRD, unlike Broker/Trigger where the CRD is defined in Eventing core.
Taking this out for a QA.
I had to jump through the following hoops before I could understand the improvement that this PR brings:
It would have been easier for me if this PR already included 1. & 2. I think that building out the samples in this repository can only be a good thing, since anyone can try out various features by following the respective READMEs and running a few
I was not expecting the following:
I would expect the admission webhook to disallow values below
We have talked to @gabo1208 about this and concluded that:
The reason why we do not allow a prefetch of
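The truncated comments above discuss webhook validation of the prefetch value. As a hedged sketch of such a check (the function name, the bounds, and the error messages are assumptions for illustration, not the repo's actual webhook code; the upper bound echoes the quorum-queue guidance cited later in the thread):

```go
package main

import (
	"errors"
	"fmt"
)

// validatePrefetchCount is a hypothetical admission-webhook style check.
// A prefetch below 1 would disable dispatch entirely, and values far
// above 1000 reportedly give little additional throughput.
func validatePrefetchCount(n int) error {
	if n < 1 {
		return errors.New("channel_config.prefetch_count must be at least 1")
	}
	if n > 1000 {
		return errors.New("channel_config.prefetch_count above 1000 gives little benefit")
	}
	return nil
}

func main() {
	fmt.Println(validatePrefetchCount(0) != nil)  // true: rejected
	fmt.Println(validatePrefetchCount(10) == nil) // true: accepted
}
```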
+1
+1, perhaps we can indicate this with a (debug) log line or logger field during start (if not already)
A bit on the fence here in terms of breaking change and delivery guarantees. Before that change, we basically had basic FIFO semantics (at the expense of low throughput). Not sure, but IMHO we still don't have extensive docs around the delivery behavior of the whole chain (ingress/broker/dispatcher), so this could easily trip people up after the fact, especially when doing upgrades.
@gvmw it's just a "feeling" but
That is a great point, let's default prefetch to
I'm glad that you brought it up because
While testing this, we (+@gabo1208) have noticed that the queue does not get deleted when the source is deleted. We are setting the
Aha, I remembered that we even have a blog article stating there's not much benefit to a prefetch > 1000, at least with quorum queues: https://blog.rabbitmq.com/posts/2020/05/quorum-queues-and-flow-control-single-queue-benchmarks
Force-pushed from ad57017 to a17e870 (compare)
Force-pushed from 7a5049e to 44ebb05 (compare)
This is almost there @gabo1208, the only thing missing is:

```diff
diff --git a/samples/source/concurrent-dispatch/200-rabbitmq.yaml b/samples/source/concurrent-dispatch/200-rabbitmq.yaml
new file mode 100644
index 00000000..eb6412d7
--- /dev/null
+++ b/samples/source/concurrent-dispatch/200-rabbitmq.yaml
@@ -0,0 +1,8 @@
+---
+apiVersion: rabbitmq.com/v1beta1
+kind: RabbitmqCluster
+metadata:
+  name: rabbitmq
+  namespace: concurrent-dispatch
+spec:
+  replicas: 1
```
Commits:

- fixed dispatcher concurrency test format to match the rest; reduced test pre-req times
- added prefetch count to source resource adapter tests
- added better description for new properties in the source docs
- …n values and the reconciler
- …s source examples
- modified sample files structure and added comments + defaults; set channel prefetch_count to 1 when it is zero to avoid Golang defaults' complicated comparisons
- … instead of int; fixed validation error value
- …escription: modified files distribution on source sample and fixed readme table description
- …example promoting the use of internal tools and with all the components needed to do a basic topology + set the prefetch count default value to one, but added a comment on the file for the user to be able to modify it and understand what's going on
Force-pushed from 817f0e5 to b2e68eb (compare)
Taking this for another spin on GKE - hopefully the last one!
If the RabbitMQ that the source is pointing to is not available, the source will crash, which I think is OK because the error message is descriptive even though it looks scary: As soon as RabbitMQ is running and the source pod gets re-scheduled, everything looks good again: Sanity check:
This last point could be improved. When the prefetch changes, the source client should reconnect (re-scheduling the pod is enough) so that the new value is used in the channel that is created on connecting. With the current implementation, the source needs to be deleted and re-declared with the new value. This workaround is OK for now, since the queue backing the source does not need to be deleted.

/lgtm
/approve
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: benmoss, gabo1208, gvmw. The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing
ACK
Update: missed that this PR is for the RabbitMQ
Changes

- moved the `converToCloudEvent` function to its corresponding `message.go` file/kind
- performance

Fixes #508

Release Note