Make KAFKA_RESCHEDULE_MS a Kafka table setting #90112
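In short, the previously hard-coded `KAFKA_RESCHEDULE_MS` constant becomes a per-table setting, `kafka_consumer_reschedule_ms` (the name used in the tests below). As a minimal sketch — not taken from this PR — the resulting DDL might look roughly like the following; the broker, topic, and group names are placeholders, and the actual queries in the tests are built by helper generators:

```python
# Hypothetical standalone DDL, roughly equivalent to what
# create_query_generator emits in the tests below.
# Broker, topic, and group names are placeholders.
CREATE_KAFKA_TABLE = """
CREATE TABLE test.kafka_queue (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
         kafka_topic_list = 'my_topic',
         kafka_group_name = 'my_group',
         kafka_format = 'JSONEachRow',
         kafka_consumer_reschedule_ms = 250
"""
```

With the interval exposed as a table setting, each Kafka table can tune how quickly an idle consumer is rescheduled instead of relying on one compile-time constant.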
New test file (`@@ -0,0 +1,164 @@`):

```python
"""Long running tests, longer than 30 seconds"""

from helpers.kafka.common_direct import *
import helpers.kafka.common as k

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    "instance",
    main_configs=["configs/kafka.xml", "configs/named_collection.xml"],
    user_configs=["configs/users.xml"],
    with_kafka=True,
    with_zookeeper=True,  # For Replicated Table
    macros={
        "kafka_broker": "kafka1",
        "kafka_topic_old": k.KAFKA_TOPIC_OLD,
        "kafka_group_name_old": k.KAFKA_CONSUMER_GROUP_OLD,
        "kafka_topic_new": k.KAFKA_TOPIC_NEW,
        "kafka_group_name_new": k.KAFKA_CONSUMER_GROUP_NEW,
        "kafka_client_id": "instance",
        "kafka_format_json_each_row": "JSONEachRow",
    },
    clickhouse_path_dir="clickhouse_path",
)


# Fixtures
@pytest.fixture(scope="module")
def kafka_cluster():
    try:
        cluster.start()
        kafka_id = instance.cluster.kafka_docker_id
        print("kafka_id is {}".format(kafka_id))
        yield cluster
    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    instance.query("DROP DATABASE IF EXISTS test SYNC; CREATE DATABASE test;")
    admin_client = k.get_admin_client(cluster)

    def get_topics_to_delete():
        return [t for t in admin_client.list_topics() if not t.startswith("_")]

    topics = get_topics_to_delete()
    logging.debug(f"Deleting topics: {topics}")
    result = admin_client.delete_topics(topics)
    for topic, error in result.topic_error_codes:
        if error != 0:
            logging.warning(f"Received error {error} while deleting topic {topic}")
        else:
            logging.info(f"Deleted topic {topic}")

    retries = 0
    topics = get_topics_to_delete()
    while len(topics) != 0:
        logging.info(f"Existing topics: {topics}")
        if retries >= 5:
            raise Exception(f"Failed to delete topics {topics}")
        retries += 1
        time.sleep(0.5)
        topics = get_topics_to_delete()  # refresh, otherwise the loop never observes deletions
    yield  # run test


# Tests
@pytest.mark.parametrize(
    "create_query_generator",
    [k.generate_old_create_table_query, k.generate_new_create_table_query],
)
def test_bad_reschedule(kafka_cluster, create_query_generator):
    suffix = k.random_string(6)
    kafka_table = f"kafka_{suffix}"

    topic_name = "test_bad_reschedule" + k.get_topic_postfix(create_query_generator)

    messages = [json.dumps({"key": j + 1, "value": j + 1}) for j in range(20000)]
    k.kafka_produce(kafka_cluster, topic_name, messages)

    create_query = create_query_generator(
        kafka_table,
        "key UInt64, value UInt64",
        topic_list=topic_name,
        consumer_group=topic_name,
        settings={
            "kafka_max_block_size": 1000,
            "kafka_flush_interval_ms": 1000,
        },
    )
    instance.query(
        f"""
        {create_query};
        CREATE MATERIALIZED VIEW test.{kafka_table}_destination ENGINE=MergeTree ORDER BY tuple() AS
        SELECT
            key,
            now() as consume_ts,
            value,
            _topic,
            _key,
            _offset,
            _partition,
            _timestamp
        FROM test.{kafka_table};
        """
    )

    instance.wait_for_log_line(f"{kafka_table}.*Committed offset 20000")

    logging.debug(
        "Timestamps: %s",
        instance.query(
            f"SELECT max(consume_ts), min(consume_ts) FROM test.{kafka_table}_destination"
        ),
    )
    assert (
        int(
            instance.query(
                f"SELECT max(consume_ts) - min(consume_ts) FROM test.{kafka_table}_destination"
            )
        )
        < 8
    )
```

Member
What is this test about? Why 8?

Author
Ah, this should be removed. I used …

```python
@pytest.mark.parametrize(
    "create_query_generator",
    [k.generate_old_create_table_query, k.generate_new_create_table_query],
)
def test_kafka_consumer_reschedule_logging(kafka_cluster, create_query_generator):
    """Check that the kafka_consumer_reschedule_ms setting controls the logged reschedule interval."""
    suffix = k.random_string(6)
    kafka_table = f"kafka_reschedule_log_{suffix}"
    topic_name = f"test_reschedule_logging_{suffix}"

    test_reschedule_ms = 250

    create_query = create_query_generator(
        kafka_table,
        "key UInt64, value UInt64",
        topic_list=topic_name,
        consumer_group=topic_name,
        settings={
            "kafka_max_block_size": 100,
            "kafka_flush_interval_ms": 100,
            "kafka_consumer_reschedule_ms": test_reschedule_ms,
        },
    )

    instance.query(
        f"""
        {create_query};
        CREATE MATERIALIZED VIEW test.{kafka_table}_destination
        ENGINE=MergeTree ORDER BY tuple() AS
        SELECT key, value FROM test.{kafka_table};
        """
    )

    messages = [json.dumps({"key": j, "value": j}) for j in range(50)]
    k.kafka_produce(kafka_cluster, topic_name, messages)

    # Wait for consumption and stall
    instance.wait_for_log_line(f"{kafka_table}.*Committed offset 50")

    # Wait a bit for the stream to stall and log the reschedule message
    time.sleep(2.0)
```

Member
I guess we can remove this, since …

```python
    # Check that the log shows the correct reschedule interval
    instance.wait_for_log_line(
        f"{kafka_table}.*Rescheduling for {test_reschedule_ms} ms"
    )

    # Verify messages consumed
    result = int(instance.query(f"SELECT count() FROM test.{kafka_table}_destination"))
    assert result == 50
```
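
As an aside, if one wanted to assert on the actual observed interval rather than just the log text, a self-contained sketch along these lines could measure the gap between consecutive reschedule lines. The log excerpt and timestamp layout here are invented for illustration; they are not ClickHouse's exact server log format:

```python
import re
from datetime import datetime

# Invented log excerpt: two consecutive reschedule lines 252 ms apart.
log = """\
2024.01.01 12:00:00.100000 StorageKafka (kafka_x): Rescheduling for 250 ms
2024.01.01 12:00:00.352000 StorageKafka (kafka_x): Rescheduling for 250 ms
"""

pattern = re.compile(
    r"^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d{6}).*Rescheduling for (\d+) ms",
    re.MULTILINE,
)
matches = pattern.findall(log)
stamps = [datetime.strptime(ts, "%Y.%m.%d %H:%M:%S.%f") for ts, _ in matches]
configured_ms = int(matches[0][1])

gap_ms = (stamps[1] - stamps[0]).total_seconds() * 1000
# Allow generous slack: exact equality is unrealistic under scheduling jitter.
assert abs(gap_ms - configured_ms) < 100, f"{gap_ms} ms vs {configured_ms} ms"
print(f"observed reschedule gap: {gap_ms:.0f} ms (configured {configured_ms} ms)")
```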