
Conversation


@JerAguilon JerAguilon commented Nov 14, 2025

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description):

Add kafka_consumer_reschedule_ms as a tunable Kafka table engine setting in order to adjust how long consumers sleep for new data.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

This pull request is to address this issue that I reported: #89204

Essentially, if one sets kafka_flush_interval_ms quite low (say, 250 ms), one would expect data to reach the downstream table in roughly 250 ms. However, this is not the case, even when ingestion is happening very quickly. The reason is that there is a hardcoded 500 ms stall that occurs (a) when a consumer sees no messages or (b) when one minute passes.

This means that ingestion can happen much later than expected, as noted in my investigation. To fix this, allow end users to tune this parameter while keeping the 500 ms default.
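A sketch of how the new setting might be used together with a low flush interval. The table name, broker, topic, and consumer group here are illustrative placeholders; only kafka_consumer_reschedule_ms is the setting introduced by this PR:

```sql
-- Hypothetical example: flush every 250 ms and re-poll stalled
-- consumers after 100 ms instead of the previous hardcoded 500 ms.
CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'topic',
         kafka_group_name = 'group1',
         kafka_format = 'JSONEachRow',
         kafka_flush_interval_ms = 250,
         kafka_consumer_reschedule_ms = 100;
```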


CLAassistant commented Nov 14, 2025

CLA assistant check
All committers have signed the CLA.

@JerAguilon JerAguilon changed the title Reschedule setting Make KAFKA_RESCHEDULE_MS a Kafka table setting Nov 15, 2025
@JerAguilon JerAguilon marked this pull request as ready for review November 15, 2025 01:35
@azat azat self-assigned this Nov 15, 2025
```sql
SELECT level, sum(total) FROM daily GROUP BY level;
```
To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](/operations/settings/settings#stream_flush_interval_ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.

main_configs=["configs/kafka.xml", "configs/named_collection.xml"],
user_configs=["configs/users.xml"],
with_kafka=True,
with_zookeeper=True, # For Replicated Table
Member


Suggested change
with_zookeeper=True, # For Replicated Table
with_zookeeper=True, # For Kafka2

instance.wait_for_log_line(f"{kafka_table}.*Committed offset 20000")

logging.debug("Timestamps: %s", instance.query(f"SELECT max(consume_ts), min(consume_ts) FROM test.{kafka_table}_destination"))
assert int(instance.query(f"SELECT max(consume_ts) - min(consume_ts) FROM test.{kafka_table}_destination")) < 8
Member


What is this test about? Why 8?

Author


Ah, this should be removed. I used test_batch_slow_1.py as a template, and this test came from there. Sorry for the confusion. I will address it on Monday alongside your suggestions.

instance.wait_for_log_line(f"{kafka_table}.*Committed offset 50")

# Wait a bit for stream to stall and log the reschedule message
time.sleep(2.0)
Member


I guess we can remove this, since wait_for_log_line should have retries that total more than 2 seconds.

