6 changes: 4 additions & 2 deletions docs/en/engines/table-engines/integrations/kafka.md
@@ -49,6 +49,7 @@ SETTINGS
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_consumer_reschedule_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
@@ -79,6 +80,7 @@ Optional parameters:
- `kafka_poll_timeout_ms` — Timeout for single poll from Kafka. Default: [stream_poll_timeout_ms](../../../operations/settings/settings.md#stream_poll_timeout_ms).
- `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](/operations/settings/settings#max_block_size).
- `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](/operations/settings/settings#stream_flush_interval_ms).
- `kafka_consumer_reschedule_ms` — Reschedule interval when Kafka stream processing is stalled (e.g., when no messages are available to consume). This setting controls the delay before the consumer retries polling. Must not exceed `kafka_consumers_pool_ttl_ms`. Default: `500` milliseconds.
- `kafka_thread_per_consumer` — Provide an independent thread for each consumer. When enabled, every consumer flushes its data independently, in parallel (otherwise, rows from several consumers are squashed to form one block). Default: `0`.
- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`), dead_letter_queue (error related data will be saved in system.dead_letter_queue).
- `kafka_commit_on_select` — Commit messages when select query is made. Default: `false`.
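
For illustration, the new setting goes in the `SETTINGS` clause next to the other optional parameters. The Python sketch below only renders such a statement as text. The helper name `kafka_create_table_sql`, the broker address, and the table, topic and group names are hypothetical and not part of this PR.

```python
def kafka_create_table_sql(table, columns, **settings):
    """Render a CREATE TABLE statement for the Kafka engine (illustrative helper)."""

    def render(value):
        # Quote strings; emit numbers unchanged.
        return f"'{value}'" if isinstance(value, str) else str(value)

    body = ",\n".join(
        f"    {name} = {render(value)}" for name, value in settings.items()
    )
    return f"CREATE TABLE {table} ({columns})\nENGINE = Kafka\nSETTINGS\n{body};"


sql = kafka_create_table_sql(
    "test.queue",                          # hypothetical table name
    "key UInt64, value UInt64",
    kafka_broker_list="kafka1:19092",      # hypothetical broker address
    kafka_topic_list="topic",              # hypothetical topic
    kafka_group_name="group1",             # hypothetical consumer group
    kafka_format="JSONEachRow",
    kafka_consumers_pool_ttl_ms=10000,
    kafka_consumer_reschedule_ms=250,      # must not exceed the pool TTL above
)
print(sql)
```

The two timing settings are passed together here because the engine checks one against the other when the table is created.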
@@ -126,7 +128,7 @@ Do not use this method in new projects. If possible, switch old projects to the

```sql
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_consumer_reschedule_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
```

</details>
@@ -173,7 +175,7 @@ Example:

SELECT level, sum(total) FROM daily GROUP BY level;
```
To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](/operations/settings/settings#stream_flush_interval_ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](/operations/settings/settings#stream_flush_interval_ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
Member

Suggested change
To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](/operations/settings/settings#stream_flush_interval_ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](/operations/settings/settings#stream_flush_interval_ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.


To stop receiving topic data or to change the conversion logic, detach the materialized view:

7 changes: 5 additions & 2 deletions src/Storages/Kafka/KafkaSettings.cpp
@@ -37,6 +37,7 @@ namespace ErrorCodes
DECLARE(UInt64, kafka_consumers_pool_ttl_ms, 60'000, "TTL for Kafka consumers (in milliseconds)", 0) \
/* default is stream_flush_interval_ms */ \
DECLARE(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \
DECLARE(Milliseconds, kafka_consumer_reschedule_ms, 500, "Interval for rescheduling Kafka consumer tasks when they stall.", 0) \
DECLARE(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
DECLARE(StreamingHandleErrorMode, kafka_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after kafka_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error), dead_letter_queue (error related data will be saved in system.dead_letter).", 0) \
DECLARE(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \
@@ -130,12 +131,14 @@ void KafkaSettings::loadFromNamedCollection(const MutableNamedCollectionPtr & na

void KafkaSettings::sanityCheck() const
{
if (impl->kafka_consumers_pool_ttl_ms < KAFKA_RESCHEDULE_MS)
UInt64 kafka_consumer_reschedule_ms = impl->kafka_consumer_reschedule_ms.totalMilliseconds();

if (impl->kafka_consumers_pool_ttl_ms < kafka_consumer_reschedule_ms)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be less than the reschedule interval ({})",
impl->kafka_consumers_pool_ttl_ms.value,
KAFKA_RESCHEDULE_MS);
kafka_consumer_reschedule_ms);

if (impl->kafka_consumers_pool_ttl_ms > KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
throw Exception(
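
To make the lower-bound check in `sanityCheck()` easier to reason about, here is a minimal Python model of it. It is a sketch, not the ClickHouse implementation: the function name `sanity_check` and the use of `ValueError` are assumptions, and the upper-bound check against `KAFKA_CONSUMERS_POOL_TTL_MS_MAX` is left out because its value is not visible in this diff.

```python
def sanity_check(pool_ttl_ms: int, reschedule_ms: int) -> None:
    """Model of the lower-bound check: the pool TTL must not be below the reschedule interval."""
    if pool_ttl_ms < reschedule_ms:
        # Stand-in for the BAD_ARGUMENTS exception thrown by the C++ code.
        raise ValueError(
            f"The value of 'kafka_consumers_pool_ttl_ms' ({pool_ttl_ms}) "
            f"cannot be less than the reschedule interval ({reschedule_ms})"
        )


def is_valid(pool_ttl_ms: int, reschedule_ms: int) -> bool:
    """Convenience wrapper that reports the outcome instead of raising."""
    try:
        sanity_check(pool_ttl_ms, reschedule_ms)
        return True
    except ValueError:
        return False


print(is_valid(10000, 100))   # values from the "valid" case in the new test
print(is_valid(1000, 2000))   # values from the "invalid" case in the new test
print(is_valid(500, 500))     # equal values pass, because the comparison is strict
```

Since the comparison is a strict `<`, a reschedule interval equal to the pool TTL is accepted, which matches the documented "must not exceed" wording.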
3 changes: 2 additions & 1 deletion src/Storages/Kafka/KafkaSettings.h
@@ -11,7 +11,8 @@ namespace DB
class ASTStorage;
struct KafkaSettingsImpl;

const auto KAFKA_RESCHEDULE_MS = 500;
// How often we check for expired consumers in the pool
const auto KAFKA_CONSUMERS_CLEANUP_CHECK_INTERVAL_MS = 500;
// once per minute, leave the loop and reschedule (we can't lock threads in the pool forever)
const auto KAFKA_MAX_THREAD_WORK_DURATION_MS = 60000;
// 10min
9 changes: 5 additions & 4 deletions src/Storages/Kafka/StorageKafka.cpp
@@ -79,6 +79,7 @@ namespace KafkaSetting
extern const KafkaSettingsBool kafka_commit_on_select;
extern const KafkaSettingsUInt64 kafka_consumers_pool_ttl_ms;
extern const KafkaSettingsMilliseconds kafka_flush_interval_ms;
extern const KafkaSettingsMilliseconds kafka_consumer_reschedule_ms;
extern const KafkaSettingsString kafka_format;
extern const KafkaSettingsString kafka_group_name;
extern const KafkaSettingsStreamingHandleErrorMode kafka_handle_error_mode;
@@ -485,7 +486,7 @@ void StorageKafka::cleanConsumersByTTL()
UInt64 ttl_usec = (*kafka_settings)[KafkaSetting::kafka_consumers_pool_ttl_ms] * 1'000;

std::unique_lock lock(mutex);
std::chrono::milliseconds timeout(KAFKA_RESCHEDULE_MS);
std::chrono::milliseconds timeout(KAFKA_CONSUMERS_CLEANUP_CHECK_INTERVAL_MS);
while (!cleanup_cv.wait_for(lock, timeout, [this]() { return shutdown_called == true; }))
{
/// Copy consumers for closing to a new vector to close them without a lock
@@ -581,15 +582,15 @@ void StorageKafka::threadFunc(size_t idx)
auto some_stream_is_stalled = streamToViews();
if (some_stream_is_stalled)
{
LOG_TRACE(log, "Stream(s) stalled. Reschedule.");
LOG_TRACE(log, "Stream(s) stalled. Rescheduling in {} ms.", (*kafka_settings)[KafkaSetting::kafka_consumer_reschedule_ms].totalMilliseconds());
break;
}

auto ts = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time);
if (duration.count() > KAFKA_MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
LOG_TRACE(log, "Thread work duration limit exceeded. Rescheduling in {} ms.", (*kafka_settings)[KafkaSetting::kafka_consumer_reschedule_ms].totalMilliseconds());
break;
}
}
@@ -621,7 +622,7 @@ void StorageKafka::threadFunc(size_t idx)

// Wait for attached views
if (!task->stream_cancelled)
task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);
task->holder->scheduleAfter((*kafka_settings)[KafkaSetting::kafka_consumer_reschedule_ms].totalMilliseconds());
}


10 changes: 6 additions & 4 deletions src/Storages/Kafka/StorageKafka2.cpp
@@ -93,6 +93,7 @@ namespace KafkaSetting
extern const KafkaSettingsString kafka_broker_list;
extern const KafkaSettingsString kafka_client_id;
extern const KafkaSettingsMilliseconds kafka_flush_interval_ms;
extern const KafkaSettingsMilliseconds kafka_consumer_reschedule_ms;
extern const KafkaSettingsString kafka_format;
extern const KafkaSettingsString kafka_group_name;
extern const KafkaSettingsStreamingHandleErrorMode kafka_handle_error_mode;
@@ -1009,15 +1010,15 @@ void StorageKafka2::threadFunc(size_t idx)
// Exit the loop & reschedule if some stream stalled
if (maybe_stall_reason = streamToViews(idx); maybe_stall_reason.has_value())
{
LOG_TRACE(log, "Stream stalled.");
LOG_TRACE(log, "Stream stalled. Rescheduling in {} ms", (*kafka_settings)[KafkaSetting::kafka_consumer_reschedule_ms].totalMilliseconds());
break;
}

auto ts = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts - start_time);
if (duration.count() > KAFKA_MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
LOG_TRACE(log, "Thread work duration limit exceeded. Rescheduling in {} ms", (*kafka_settings)[KafkaSetting::kafka_consumer_reschedule_ms].totalMilliseconds());
break;
}
}
@@ -1030,10 +1031,11 @@ void StorageKafka2::threadFunc(size_t idx)

if (!task->stream_cancelled)
{
UInt64 kafka_consumer_reschedule_ms = (*kafka_settings)[KafkaSetting::kafka_consumer_reschedule_ms].totalMilliseconds();
if (maybe_stall_reason.has_value() && *maybe_stall_reason == StallKind::ShortStall)
task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS / 10);
task->holder->scheduleAfter(kafka_consumer_reschedule_ms / 10);
else
task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);
task->holder->scheduleAfter(kafka_consumer_reschedule_ms);
}
}
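
The delay selection at the end of `StorageKafka2::threadFunc` can be modelled in a few lines of Python. This is a sketch under assumptions: only `StallKind::ShortStall` appears in the diff, so the `LONG_STALL` member and the function name `next_delay_ms` are hypothetical. Floor division stands in for the C++ unsigned integer division.

```python
from enum import Enum, auto
from typing import Optional


class StallKind(Enum):
    SHORT_STALL = auto()  # corresponds to StallKind::ShortStall in the diff
    LONG_STALL = auto()   # hypothetical: any stall kind other than ShortStall


def next_delay_ms(reschedule_ms: int, stall: Optional[StallKind]) -> int:
    """Return the delay before the consumer task runs again."""
    if stall is StallKind.SHORT_STALL:
        # A short stall retries after a tenth of the configured interval.
        return reschedule_ms // 10
    # No stall reason, or any other stall kind: use the full interval.
    return reschedule_ms


print(next_delay_ms(500, StallKind.SHORT_STALL))
print(next_delay_ms(500, None))
print(next_delay_ms(5, StallKind.SHORT_STALL))
```

One consequence of the integer division is that a configured interval below 10 ms gives a short-stall delay of 0 ms. If that behaviour is not intended, a minimum value might be worth considering.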

49 changes: 48 additions & 1 deletion tests/integration/test_storage_kafka/test_batch_fast.py
@@ -3628,8 +3628,55 @@ def test_kafka_assigned_partitions(kafka_cluster):
metrics_before,
)

@pytest.mark.parametrize(
"create_query_generator",
[k.generate_old_create_table_query, k.generate_new_create_table_query],
)
def test_kafka_consumer_reschedule_validation(kafka_cluster, create_query_generator):
"""
Test that kafka_consumer_reschedule_ms is properly validated against
kafka_consumers_pool_ttl_ms (reschedule_ms < pool_ttl_ms).
"""
suffix = k.random_string(6)
kafka_table = f"kafka_reschedule_invalid_{suffix}"
topic_name = f"test_reschedule_validation_{suffix}"

# Should fail: reschedule_ms > pool_ttl_ms
create_query = create_query_generator(
kafka_table,
"key UInt64, value UInt64",
topic_list=topic_name,
consumer_group=topic_name,
settings={
"kafka_consumers_pool_ttl_ms": 1000,
"kafka_consumer_reschedule_ms": 2000, # Invalid: greater than TTL
},
)

with pytest.raises(QueryRuntimeException) as exc:
instance.query(create_query)

assert "cannot be less than" in str(exc.value).lower()

# Should succeed: reschedule_ms < pool_ttl_ms
kafka_table_valid = f"kafka_reschedule_valid_{suffix}"
create_query = create_query_generator(
kafka_table_valid,
"key UInt64, value UInt64",
topic_list=topic_name,
consumer_group=f"{topic_name}_valid",
settings={
"kafka_consumers_pool_ttl_ms": 10000,
"kafka_consumer_reschedule_ms": 100, # Valid
},
)

# This should not raise any exception
instance.query(create_query)



if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()
cluster.shutdown()
164 changes: 164 additions & 0 deletions tests/integration/test_storage_kafka/test_batch_slow_7.py
@@ -0,0 +1,164 @@
"""Long running tests, longer than 30 seconds"""

from helpers.kafka.common_direct import *
import helpers.kafka.common as k

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
main_configs=["configs/kafka.xml", "configs/named_collection.xml"],
user_configs=["configs/users.xml"],
with_kafka=True,
with_zookeeper=True, # For Replicated Table
Member

Suggested change
with_zookeeper=True, # For Replicated Table
with_zookeeper=True, # For Kafka2

macros={
"kafka_broker": "kafka1",
"kafka_topic_old": k.KAFKA_TOPIC_OLD,
"kafka_group_name_old": k.KAFKA_CONSUMER_GROUP_OLD,
"kafka_topic_new": k.KAFKA_TOPIC_NEW,
"kafka_group_name_new": k.KAFKA_CONSUMER_GROUP_NEW,
"kafka_client_id": "instance",
"kafka_format_json_each_row": "JSONEachRow",
},
clickhouse_path_dir="clickhouse_path",
)


# Fixtures
@pytest.fixture(scope="module")
def kafka_cluster():
try:
cluster.start()
kafka_id = instance.cluster.kafka_docker_id
print(("kafka_id is {}".format(kafka_id)))
yield cluster
finally:
cluster.shutdown()


@pytest.fixture(autouse=True)
def kafka_setup_teardown():
instance.query("DROP DATABASE IF EXISTS test SYNC; CREATE DATABASE test;")
admin_client = k.get_admin_client(cluster)

def get_topics_to_delete():
return [t for t in admin_client.list_topics() if not t.startswith("_")]

topics = get_topics_to_delete()
logging.debug(f"Deleting topics: {topics}")
result = admin_client.delete_topics(topics)
for topic, error in result.topic_error_codes:
if error != 0:
logging.warning(f"Received error {error} while deleting topic {topic}")
else:
logging.info(f"Deleted topic {topic}")

retries = 0
topics = get_topics_to_delete()
while len(topics) != 0:
logging.info(f"Existing topics: {topics}")
if retries >= 5:
raise Exception(f"Failed to delete topics {topics}")
retries += 1
time.sleep(0.5)
topics = get_topics_to_delete()  # re-check, otherwise the loop never sees the deletion finish
yield # run test


# Tests


@pytest.mark.parametrize(
"create_query_generator",
[k.generate_old_create_table_query, k.generate_new_create_table_query],
)
def test_bad_reschedule(kafka_cluster, create_query_generator):
suffix = k.random_string(6)
kafka_table = f"kafka_{suffix}"

topic_name = "test_bad_reschedule" + k.get_topic_postfix(create_query_generator)

messages = [json.dumps({"key": j + 1, "value": j + 1}) for j in range(20000)]
k.kafka_produce(kafka_cluster, topic_name, messages)

create_query = create_query_generator(
kafka_table,
"key UInt64, value UInt64",
topic_list=topic_name,
consumer_group=topic_name,
settings={
"kafka_max_block_size": 1000,
"kafka_flush_interval_ms": 1000,
},
)
instance.query(
f"""
{create_query};
CREATE MATERIALIZED VIEW test.{kafka_table}_destination ENGINE=MergeTree ORDER BY tuple() AS
SELECT
key,
now() as consume_ts,
value,
_topic,
_key,
_offset,
_partition,
_timestamp
FROM test.{kafka_table};
"""
)

instance.wait_for_log_line(f"{kafka_table}.*Committed offset 20000")

logging.debug("Timestamps: %s", instance.query(f"SELECT max(consume_ts), min(consume_ts) FROM test.{kafka_table}_destination"))
assert int(instance.query(f"SELECT max(consume_ts) - min(consume_ts) FROM test.{kafka_table}_destination")) < 8
Member

What is this test about? Why 8?

Author

Ah, this should be removed. I used test_batch_slow_1.py as a template, and this test comes from there. Sorry for the confusion. I will address this on Monday, alongside your suggestions.



@pytest.mark.parametrize(
"create_query_generator",
[k.generate_old_create_table_query, k.generate_new_create_table_query],
)
def test_kafka_consumer_reschedule_logging(kafka_cluster, create_query_generator):
"""
Check that the configured kafka_consumer_reschedule_ms value appears in the reschedule log message.
"""
suffix = k.random_string(6)
kafka_table = f"kafka_reschedule_log_{suffix}"
topic_name = f"test_reschedule_logging_{suffix}"

test_reschedule_ms = 250

create_query = create_query_generator(
kafka_table,
"key UInt64, value UInt64",
topic_list=topic_name,
consumer_group=topic_name,
settings={
"kafka_max_block_size": 100,
"kafka_flush_interval_ms": 100,
"kafka_consumer_reschedule_ms": test_reschedule_ms,
},
)

instance.query(f"""
{create_query};
CREATE MATERIALIZED VIEW test.{kafka_table}_destination
ENGINE=MergeTree ORDER BY tuple() AS
SELECT key, value FROM test.{kafka_table};
""")

messages = [json.dumps({"key": j, "value": j}) for j in range(50)]
k.kafka_produce(kafka_cluster, topic_name, messages)

# Wait for consumption and stall
instance.wait_for_log_line(f"{kafka_table}.*Committed offset 50")

# Wait a bit for stream to stall and log the reschedule message
time.sleep(2.0)
Member

I guess we can remove this, since wait_for_log_line should have retries that add up to more than 2 seconds in total.


# Check that the log shows the correct reschedule interval
instance.wait_for_log_line(f"{kafka_table}.*Rescheduling in {test_reschedule_ms} ms")

# Verify messages consumed
result = int(instance.query(f"SELECT count() FROM test.{kafka_table}_destination"))
assert result == 50
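
The pattern the test waits for has to use the same wording as the `LOG_TRACE` calls in this PR, which say "Rescheduling in {} ms". The Python sketch below checks that with the standard `re` module. The sample log line and the helper name `reschedule_log_pattern` are hypothetical, and it assumes the pattern is applied as a regular expression to each log line.

```python
import re


def reschedule_log_pattern(table: str, reschedule_ms: int) -> "re.Pattern[str]":
    """Build a pattern for the reschedule trace message of a given table."""
    return re.compile(rf"{re.escape(table)}.*Rescheduling in {reschedule_ms} ms")


# Hypothetical log line; the real prefix and layout may differ.
sample_line = (
    "StorageKafka (kafka_reschedule_log_abc123): "
    "Stream(s) stalled. Rescheduling in 250 ms."
)

matches_in = reschedule_log_pattern("kafka_reschedule_log_abc123", 250).search(sample_line)
matches_for = re.search(
    r"kafka_reschedule_log_abc123.*Rescheduling for 250 ms", sample_line
)

print(matches_in is not None)   # wording taken from the LOG_TRACE format string
print(matches_for is not None)  # a pattern using "for" instead of "in"
```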