-
Notifications
You must be signed in to change notification settings - Fork 7.8k
Make KAFKA_RESCHEDULE_MS a Kafka table setting #90112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
JerAguilon
wants to merge
5
commits into
ClickHouse:master
Choose a base branch
from
JerAguilon:reschedule-setting
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
+193
−15
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
a4cef84 to
080a0df
Compare
080a0df to
61442a5
Compare
61442a5 to
1bd2636
Compare
azat
reviewed
Nov 15, 2025
* Noticed that `StorageKafka2.cpp` wasn't calling the sanityCheck function on KafkaSetting * Make tests cleaner * I am running tests on an Ubuntu VM and tests were failing with this error described here:ClickHouse#15611. To fix, add additional filtering to not raise an error on this warning.
Author
|
Ready for another review. TYSM for the prompt feedback @azat. |
JerAguilon
commented
Nov 16, 2025
| UInt64 ttl_usec = (*kafka_settings)[KafkaSetting::kafka_consumers_pool_ttl_ms] * 1'000; | ||
|
|
||
| std::unique_lock lock(mutex); | ||
| std::chrono::milliseconds timeout(KAFKA_RESCHEDULE_MS); |
Author
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to be explicit - this cleaning loop has nothing to do with consumer rescheduling as far as I can tell. So I am keeping this constant at 500ms.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Changelog category (leave one):
Changelog entry (a [user-readable short description]
Add
kafka_consumer_reschedule_msas a tunableKafkatable engine setting in order to adjust how long consumers sleep for new data.Documentation entry for user-facing changes
This pull request is to address this issue that I reported: #89204
Essentially, if one were to set
kafka_flush_interval_msto be quite low (say, 250ms), one would expect that data would make its way to the downstream table in ~250ms. However, this is not the case, even if ingestion is happening very quickly. The reason is because there is a hardcoded 500ms stall that happens (a) when a consumer sees no messages or (b) when 1 minute passes.This means that ingestion can happen much later than expected, as noted in my investigation. To fix, allow end users to tune this param while keeping the 500ms default.