
Source concurrent dispatch #522


Merged

Conversation

gabo1208
Contributor

@gabo1208 gabo1208 commented Nov 19, 2021

Changes

  • 🎁 Concurrent message processing on the source (message translation to CloudEvent and dispatching)
  • 🧹 Cleaned up concurrency testing for the Adapter and the Dispatcher
  • 🧹 Moved the converToCloudEvent function to its corresponding file, message.go

/kind performance

Fixes #508

Release Note

The source Adapter now processes messages concurrently, controlled by the channel_config.prefetch_count argument (defaults to 1)

@google-cla google-cla bot added the cla: yes Indicates the PR's author has signed the CLA. label Nov 19, 2021
@knative-prow-robot knative-prow-robot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Nov 19, 2021
@gabo1208 gabo1208 force-pushed the source-concurrent-dispatch branch from 79ed991 to 3612c2f Compare November 19, 2021 17:00
@codecov

codecov bot commented Nov 19, 2021

Codecov Report

Merging #522 (e178b88) into main (4cf1cf0) will decrease coverage by 0.08%.
The diff coverage is 50.98%.


@@            Coverage Diff             @@
##             main     #522      +/-   ##
==========================================
- Coverage   75.86%   75.78%   -0.09%     
==========================================
  Files          46       47       +1     
  Lines        2760     2783      +23     
==========================================
+ Hits         2094     2109      +15     
- Misses        541      549       +8     
  Partials      125      125              
Impacted Files Coverage Δ
pkg/apis/sources/v1alpha1/rabbitmq_defaults.go 0.00% <0.00%> (ø)
pkg/apis/sources/v1alpha1/rabbitmq_types.go 50.00% <ø> (ø)
pkg/adapter/adapter.go 66.50% <44.11%> (-0.17%) ⬇️
pkg/apis/sources/v1alpha1/rabbitmq_validation.go 55.55% <60.00%> (+1.70%) ⬆️
pkg/adapter/message.go 91.66% <100.00%> (+5.00%) ⬆️
pkg/reconciler/source/resources/receive_adapter.go 100.00% <100.00%> (ø)

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4cf1cf0...e178b88. Read the comment docs.

@gabo1208 gabo1208 force-pushed the source-concurrent-dispatch branch 2 times, most recently from 657c7f3 to ad57017 Compare November 20, 2021 16:14
@@ -52,8 +51,8 @@ type ExchangeConfig struct {
}

type ChannelConfig struct {
PrefetchCount int
GlobalQos bool
PrefetchCount int `envconfig:"RABBITMQ_CHANNEL_CONFIG_PREFETCH_COUNT" default:"10" required:"false"`
Contributor

@gvmw gvmw Nov 22, 2021

This environment name feels too verbose and implementation-specific. I think that RABBITMQ_QOS_PREFETCH_COUNT is sufficiently descriptive - wdyt?

Since PrefetchCount and GlobalQos are both used in the context of Qos, I think that it would make sense to rename them to QosPrefetchCount and QosGlobal.

Contributor Author

same as in the globalQos

PrefetchCount int
GlobalQos bool
PrefetchCount int `envconfig:"RABBITMQ_CHANNEL_CONFIG_PREFETCH_COUNT" default:"10" required:"false"`
GlobalQos bool `envconfig:"RABBITMQ_CHANNEL_CONFIG_QOS_GLOBAL" required:"false"`
Contributor

Similar to my previous comment, RABBITMQ_QOS_GLOBAL feels sufficient to me.

Contributor Author

@gabo1208 gabo1208 Nov 22, 2021

Let's see what everyone thinks about this. This is following the previous convention of env var names.

@@ -281,31 +282,51 @@ func (a *Adapter) PollForMessages(channel *wabbit.Channel,

msgs, _ := a.ConsumeMessages(channel, queue, logger)

wg := &sync.WaitGroup{}
workerCount := a.config.ChannelConfig.PrefetchCount
Contributor

Prefetch counts of 100 or even 1000 are common recommendations for high-throughput scenarios. Are we sure that we want to start that many goroutines?

Is there going to be a single source per go process? If we are likely to have many sources per go process, this may cause concurrency problems - locking & context switching seem too high to me.

Contributor

What do you mean by "sources"? In general, Goroutines are lightweight and having thousands of them does not necessarily impact (thread) context switching. Not sure where we'd have locking bottlenecks in this code path.

@@ -88,6 +88,8 @@ Sources are Kubernetes objects. In addition to the standard Kubernetes
| `queue_config.delete_when_unused` {optional} | Boolean
| `queue_config.exclusive` {optional} | Boolean
| `queue_config.nowait` {optional} | Boolean
| `channel_config.prefetch_count` {optional} | Int that limits the https://www.rabbitmq.com/consumer-prefetch.html[Consumer Prefetch Value].
Contributor

This explains the environment variable name chosen here https://github.com/knative-sandbox/eventing-rabbitmq/pull/522/files#r754456187

I am still not convinced that exposing this much RabbitMQ-specific detail is a good idea. If these broker types are meant to be easily interchangeable, then the current implementation-specific approach is going against that principle. How are you thinking about this @benmoss @salaboy ?

Contributor

Sources aren't implementations of a generic type, they're unified by the contract that they emit messages to some sink. Each one has its own CRD, unlike Broker/Trigger where the CRD is defined in Eventing core.

@gvmw
Contributor

gvmw commented Nov 25, 2021

Taking this out for a QA.

@gvmw
Contributor

gvmw commented Nov 29, 2021

I had to jump through the following hoops before I could understand the improvement that this PR brings:

  1. mv config/source/{config-observability,400-config-observability}.yaml to fix ko apply -f config/source
  2. add a new example to samples/source that is ready to go - this example from @benmoss came in handy
  3. revert the source change to see the improvement

It would have been easier for me if this PR already included 1. & 2. I think that building out the samples in this repository can only be a good thing, since anyone can try out various features by following the respective READMEs and running a few ko commands. This reminds me that I could have done a better job myself in #492 by adding a README to the multi-namespaces example.

@gvmw
Contributor

gvmw commented Nov 29, 2021

I was not expecting the following:

  • Setting the prefetch to 0 resulted in a prefetch of 10
  • Setting a prefetch of -1 was also allowed and resulted in it being set to 10
  • Setting a prefetch of 1000000000 was allowed, and the container kept getting OOMKilled

I would expect the admission webhook to disallow values below 1 and above 10000. The implication is that RabbitMQ will never be allowed to send this client an unlimited number of unacknowledged messages (that is what 0 means), and will stop sending more once 10000 are unacknowledged.

@knative-prow-robot knative-prow-robot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Nov 29, 2021
@gvmw
Contributor

gvmw commented Nov 29, 2021

We have talked to @gabo1208 about this and concluded that:

  • The admission webhook will disallow any prefetch values below 1 or above 10000
  • If prefetch is set to any value between 1 and 10000, the actual value will be respected and the number of worker goroutines will match it
  • The prefetch value defaults to 10, and the goroutines default to 10 too.

The reason why we do not allow a prefetch of 0 is so that this client will not be able to receive an infinite number of unacknowledged messages. We don't want to allow users to shoot themselves in the foot.

WDYT @michaelklishin @lukebakken @MarcialRosales @embano1?

@embano1
Contributor

embano1 commented Nov 29, 2021

The admission webhook will disallow any prefetch values below 1 and above 10000

+1

If prefetch is set to any value between 1 to 10000 the actual value will be respected and the goroutines will be set to the prefetch value too

+1, perhaps we can indicate this with a (debug) log line or logger field during start (if not already)

Prefetch value defaults to 10 and goroutines default to 10 too

A bit on the fence here in terms of breaking change and delivery guarantees. Before that change, we basically had basic FIFO semantics (at the expense of low throughput). Not sure, but IMHO we still don't have extensive docs around the delivery behavior of the whole chain (ingress/broker/dispatcher) so this could easily trip off people after the fact, especially when doing upgrades.

@lukebakken

lukebakken commented Nov 29, 2021

@gvmw it's just a "feeling" but 10000 feels high. But, if that value has been tested and works A-OK, then :shipit: . I am thrilled that 0 is disabled 👍 👍 👍

@gvmw
Contributor

gvmw commented Nov 29, 2021

@embano1 A bit on the fence here in terms of breaking change and delivery guarantees. Before that change, we basically had basic FIFO semantics (at the expense of low throughput). Not sure, but IMHO we still don't have extensive docs around the delivery behavior of the whole chain (ingress/broker/dispatcher) so this could easily trip off people after the fact, especially when doing upgrades.

That is a great point, let's default prefetch to 1 and goroutines to 1 as well. We should definitely log this when the source starts because the user did not choose a value and communicating defaults is valuable (PoLA - Principle of Least Astonishment).

@lukebakken it's just a "feeling" but 10000 feels high

I'm glad that you brought it up because 10000 felt high to me too. What would make a more reasonable maximum @lukebakken? I'm thinking 1000.

@gvmw
Contributor

gvmw commented Nov 29, 2021

While testing this, we (+@gabo1208) noticed that the queue does not get deleted when the source is deleted, even though we are setting delete_when_unused: true. We are investigating why this happens.

@lukebakken

Aha, I remembered that we even have a blog post stating there's not much benefit to a prefetch > 1000, at least with quorum queues.

https://blog.rabbitmq.com/posts/2020/05/quorum-queues-and-flow-control-single-queue-benchmarks

@gabo1208 gabo1208 force-pushed the source-concurrent-dispatch branch from ad57017 to a17e870 Compare November 29, 2021 23:21
@knative-prow-robot knative-prow-robot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Nov 29, 2021
@gabo1208 gabo1208 force-pushed the source-concurrent-dispatch branch from 7a5049e to 44ebb05 Compare November 30, 2021 17:10
@gvmw
Contributor

gvmw commented Dec 1, 2021

This is almost there @gabo1208, the only thing missing is RabbitmqCluster. This ensures that the sample is self-contained, and kubectl apply -f samples/source/concurrent-dispatch/ is enough to reproduce everything.

diff --git a/samples/source/concurrent-dispatch/200-rabbitmq.yaml b/samples/source/concurrent-dispatch/200-rabbitmq.yaml
new file mode 100644
index 00000000..eb6412d7
--- /dev/null
+++ b/samples/source/concurrent-dispatch/200-rabbitmq.yaml
@@ -0,0 +1,8 @@
+---
+apiVersion: rabbitmq.com/v1beta1
+kind: RabbitmqCluster
+metadata:
+  name: rabbitmq
+  namespace: concurrent-dispatch
+spec:
+  replicas: 1

Commit messages from the force-pushed branch:

  • fixed dispatcher concurrency test format to match the rest
  • reduced test pre-req times
  • added prefetch count to source resource adapter tests
  • added better description for new properties in the source docs
  • modified sample files structure and added comments + defaults: set channel prefetch_count to 1 when it is zero, to avoid complicated comparisons against Go zero values … instead of int
  • fixed validation error value …escription
  • modified files distribution on source sample and fixed readme table description …example promoting the use of internal tools, with all the components needed to build a basic topology + set the prefetch count default to one, with a comment in the file so users can modify it and understand what's going on
@gabo1208 gabo1208 force-pushed the source-concurrent-dispatch branch from 817f0e5 to b2e68eb Compare December 2, 2021 17:50
@knative-prow-robot knative-prow-robot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Dec 2, 2021
@gvmw
Contributor

gvmw commented Dec 3, 2021

Taking this for another spin on GKE - hopefully the last one!

@gvmw
Contributor

gvmw commented Dec 6, 2021

If the Rabbitmq that the source is pointing to is not available, the source will crash, which I think is OK because the error message is descriptive even though it looks scary:

[screenshots of the descriptive error logs omitted]

As soon as RabbitMQ is running and the source pod gets re-scheduled, everything looks good again:

[screenshot omitted]

Sanity check:

  • the prefetch on the channel used by the source defaults to 1, so the behaviour does not change @embano1.
  • prefetch can be set to any value between 1 and 1000 (any other values are not allowed by the admission webhook)
  • modifying the prefetch is not allowed, it's an immutable field (same behaviour as before)

This last point could be improved. When the prefetch changes, the source client should reconnect (re-scheduling the pod is enough) so that the new value is used in the channel that is created on connecting. With the current implementation, the source needs to be deleted and re-declared with the new value. This workaround is OK for now, since the queue backing the source does not need to be deleted.

/lgtm
/approve

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Dec 6, 2021
@benmoss
Contributor

benmoss commented Dec 6, 2021

/approve

@knative-prow-robot

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: benmoss, gabo1208, gvmw

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow-robot knative-prow-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Dec 6, 2021
@knative-prow-robot knative-prow-robot merged commit f292f36 into knative-extensions:main Dec 6, 2021
@embano1
Contributor

embano1 commented Dec 6, 2021

If the Rabbitmq that the source is pointing to is not available, the source will crash, which I think is OK

ACK

the channel used by the source channel defaults to 1 [...] When the prefetch changes, the source client should reconnect

Not sure I follow what source is meant here. Isn't prefetch relevant to the worker/dispatcher (trigger) only?

Update: missed that this PR is for the RabbitMQ source

@gvmw
Contributor

gvmw commented Dec 14, 2021

@embano1 missed that this PR is for the RabbitMQ source

Here is the Trigger / Dispatcher equivalent #536

Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. cla: yes Indicates the PR's author has signed the CLA. kind/performance lgtm Indicates that a PR is ready to be merged. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
Development

Successfully merging this pull request may close these issues.

Event source should support concurrent dispatch
7 participants