Make KAFKA_RESCHEDULE_MS a Kafka table setting #90112
base: master
Conversation
Force-pushed: a4cef84 → 080a0df → 61442a5 → 1bd2636
```sql
SELECT level, sum(total) FROM daily GROUP BY level;
```

To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](/operations/settings/settings#stream_flush_interval_ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
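The flush rule described in the docs paragraph above can be sketched as a small predicate. This is an illustrative model only, not ClickHouse's actual implementation; the function name and the default values are assumptions for the sketch:

```python
def should_flush(rows_in_block, block_started_at_s, now_s,
                 max_insert_block_size=1_048_576,
                 stream_flush_interval_ms=7_500):
    """Hypothetical model of the documented behavior: flush when the
    block is full, or when the flush interval has elapsed since the
    block was started, even if the block is incomplete."""
    if rows_in_block >= max_insert_block_size:
        return True
    elapsed_ms = (now_s - block_started_at_s) * 1000
    return elapsed_ms >= stream_flush_interval_ms
```

Under this model, a nearly empty block still gets flushed once the interval passes, which is the trade-off the docs describe.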
```python
main_configs=["configs/kafka.xml", "configs/named_collection.xml"],
user_configs=["configs/users.xml"],
with_kafka=True,
with_zookeeper=True,  # For Replicated Table
```
Suggested change:

```diff
-with_zookeeper=True,  # For Replicated Table
+with_zookeeper=True,  # For Kafka2
```
```python
instance.wait_for_log_line(f"{kafka_table}.*Committed offset 20000")

logging.debug("Timestamps: %s", instance.query(f"SELECT max(consume_ts), min(consume_ts) FROM test.{kafka_table}_destination"))
assert int(instance.query(f"SELECT max(consume_ts) - min(consume_ts) FROM test.{kafka_table}_destination")) < 8
```
What is this test about? Why 8?
Ah, this should be removed. I used test_batch_slow_1.py as a template, and this check comes from there. Sorry for the confusion. I will address it on Monday alongside your suggestions.
```python
instance.wait_for_log_line(f"{kafka_table}.*Committed offset 50")

# Wait a bit for stream to stall and log the reschedule message
time.sleep(2.0)
```
I guess we can remove this, since wait_for_log_line should have retries that add up to more than 2 seconds in total.
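For context, a retrying wait like `wait_for_log_line` can be modeled as a simple poll loop. This is a hypothetical sketch, not the actual test-framework code (the function name, predicate, and defaults are assumptions), illustrating why a fixed `time.sleep()` before such a call is redundant:

```python
import time

def wait_until(predicate, timeout_s=30.0, poll_s=0.5):
    """Keep polling until the predicate returns True or the timeout
    expires. Because the loop itself retries, callers don't need to
    sleep before invoking it."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(poll_s)
    return False
```

Any log line that appears within the timeout is caught by a later retry, so the preceding `time.sleep(2.0)` only adds wall-clock time to the test.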
Changelog category (leave one):
Changelog entry (a user-readable short description):
Add `kafka_consumer_reschedule_ms` as a tunable `Kafka` table engine setting in order to adjust how long consumers sleep waiting for new data.
Documentation entry for user-facing changes
This pull request addresses the issue that I reported: #89204

Essentially, if one were to set `kafka_flush_interval_ms` quite low (say, 250 ms), one would expect data to make its way to the downstream table in ~250 ms. However, this is not the case, even if ingestion is happening very quickly. The reason is that there is a hardcoded 500 ms stall that happens (a) when a consumer sees no messages or (b) when 1 minute passes. This means that ingestion can happen much later than expected, as noted in my investigation. The fix is to allow end users to tune this parameter while keeping the 500 ms default.
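The latency effect described above can be shown with a back-of-the-envelope model. This is not the engine's actual scheduling code; the function name and the additive worst-case formula are assumptions made for illustration:

```python
def worst_case_latency_ms(flush_interval_ms, reschedule_ms):
    """Rough model: when a poll finds no messages, the consumer sleeps
    for reschedule_ms before polling again. A message that arrives just
    after an empty poll waits out that whole sleep, then up to one
    flush interval before its block reaches the downstream table."""
    return reschedule_ms + flush_interval_ms

# With the hardcoded 500 ms reschedule, a 250 ms flush interval can
# still see roughly 750 ms of end-to-end delay; lowering the new
# kafka_consumer_reschedule_ms setting shrinks that bound.
print(worst_case_latency_ms(250, 500))  # 750
print(worst_case_latency_ms(250, 100))  # 350
```

This is why tuning the reschedule interval, rather than only the flush interval, is needed to actually reach low delivery latency.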