zero-copy migration script #306
Conversation
@sourcery-ai review
Reviewer's Guide

This PR integrates a resumable zero-copy migration workflow into the chadmin CLI by adding a new command group that orchestrates table detachment, ZK cleanup, server restart, and parallel replica restoration using a status file and enhanced process pool callbacks.

Sequence Diagram for Parallel Replica Restoration and Status Update

sequenceDiagram
participant CLI as "migrate command\n(via restore_replica_step)"
participant ETP as "execute_tasks_in_parallel()"
participant WT1 as "WorkerTask 1\n(restore_replica for Table A)"
participant WT2 as "WorkerTask 2\n(restore_replica for Table B)"
participant CB as "callback_update_status_file()"
participant SF as "StatusFile.yaml"
CLI ->> ETP: Submit tasks [TableA_task, TableB_task, ...], callback_update_status_file
activate ETP
ETP ->> WT1: Run target_func(restore_replica)
activate WT1
ETP ->> WT2: Run target_func(restore_replica)
activate WT2
WT1 -->> ETP: Result for Table A (success/failure)
deactivate WT1
ETP ->> CB: callback(TableA_ID)
activate CB
CB ->> SF: Update status of Table A to RESTORED
deactivate CB
WT2 -->> ETP: Result for Table B (success/failure)
deactivate WT2
ETP ->> CB: callback(TableB_ID)
activate CB
CB ->> SF: Update status of Table B to RESTORED
deactivate CB
ETP -->> CLI: All tasks completed, results returned
deactivate ETP
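
In code, the restore step sketched above could wire the per-table callback into the process pool roughly as follows. This is only a sketch based on the diagrams: WorkerTask, execute_tasks_in_parallel, STATUS and update_status_file are names from the PR, while the restore_replica helper, the identifier format and the argument names are assumptions.

# Sketch only; field and argument names follow the diagrams above, not the exact PR code.
def restore_replica_step(ctx, tables_stat, status_file_path, dry_run=True):
    def callback_update_status_file(identifier):
        # Mark the finished table as RESTORED and persist the status file.
        for meta in tables_stat:
            if f"{meta.db}.{meta.table}" == identifier:
                meta.status = STATUS.RESTORED
        update_status_file(status_file_path, tables_stat)

    tasks = [
        WorkerTask(
            identifier=f"{meta.db}.{meta.table}",
            target_func=restore_replica,  # hypothetical per-table restore helper
            args={"ctx": ctx, "table": meta, "dry_run": dry_run},
        )
        for meta in tables_stat
        if meta.status == STATUS.ZK_CLEANED
    ]
    execute_tasks_in_parallel(
        tasks, max_workers=4, keep_going=True, callback=callback_update_status_file
    )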
Class Diagram for Zero-Copy Migration Components

classDiagram
class STATUS {
<<enumeration>>
INIT
DETACHED
ZK_CLEANED
RESTORED
}
class TableMeta {
+db: str
+table: str
+status: STATUS
+zk_path: str
}
TableMeta *-- STATUS : uses
class ZeroCopyMigrationCLI {
<<CLI Group>>
+migrate(ctx, status_file_path, dry_run, do_restore)
.. internal functions ..
#generate_status_file(ctx, status_file_path)
#load_statuses(ctx, status_file_path)
#update_status_file(status_file_path, tables_stats)
#detach_tables(ctx, tables_stat, status_file_path, dry_run)
#remove_zk_nodes(ctx, tables_stat, status_file_path, dry_run)
#restart_clickhouse_server(ctx, tables_stat, dry_run)
#restore_replica_step(ctx, tables_stat, status_file_path, dry_run)
}
ZeroCopyMigrationCLI ..> TableMeta : manages
ZeroCopyMigrationCLI ..> STATUS : manages state via TableMeta
class ProcessPool {
<<module>>
+execute_tasks_in_parallel(tasks: List~WorkerTask~, max_workers: int, keep_going: bool, callback: function) : Dict~str, Any~
}
class WorkerTask {
+identifier: str
+target_func: function
+args: dict
}
ProcessPool o-- WorkerTask : executes
ZeroCopyMigrationCLI ..> ProcessPool : uses for restore_replica_step
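
In Python terms, the status model above corresponds roughly to an enum plus a small record type. A minimal sketch, assuming a dataclass-based TableMeta and arbitrary string values for the enum members:

from dataclasses import dataclass
from enum import Enum


class STATUS(Enum):
    INIT = "init"
    DETACHED = "detached"
    ZK_CLEANED = "zk_cleaned"
    RESTORED = "restored"


@dataclass
class TableMeta:
    db: str          # database name
    table: str       # table name
    status: STATUS   # current migration step for this table
    zk_path: str     # ZooKeeper path of the replicated table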
Flow Diagram for the Zero-Copy Migration Process

graph TD
Start["User executes 'migrate' command"] --> CheckStatusFile{"Status file exists?"};
CheckStatusFile -- No --> GenerateStatusFile["generate_status_file()\n(All tables: INIT)"];
GenerateStatusFile --> LoadStatusFile;
CheckStatusFile -- Yes --> LoadStatusFile["load_statuses()"];
LoadStatusFile --> CheckRestoreFlag{"--do-restore specified?"};
CheckRestoreFlag -- No --> DetachPhase["1. Detach Tables (INIT -> DETACHED)"];
DetachPhase --> UpdateStatusFile1["Update Status File"];
UpdateStatusFile1 --> ZKCleanPhase["2. Clean ZooKeeper Nodes (DETACHED -> ZK_CLEANED)"];
ZKCleanPhase --> UpdateStatusFile2["Update Status File"];
UpdateStatusFile2 --> RestartCH["3. Restart ClickHouse Server (if needed)"];
RestartCH --> EndPreRestore["Pre-Restore Steps Complete"];
CheckRestoreFlag -- Yes --> RestorePhase["4. Restore Replicas (ZK_CLEANED -> RESTORED)\n(Uses execute_tasks_in_parallel with callback)"];
RestorePhase --> UpdateStatusFile3["Update Status File (via callback)"];
UpdateStatusFile3 --> EndRestore["Restore Steps Complete"];
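
Putting the flow together, the top-level migrate command might look roughly like this. This is a sketch following the diagram: the step functions and the status transitions come from the reviewer's guide, while the exact signatures and argument order are assumptions.

from pathlib import Path


def migrate(ctx, status_file_path, dry_run, do_restore):
    # Resumability: create the status file on the first run, otherwise pick up where we left off.
    if not Path(status_file_path).exists():
        generate_status_file(ctx, status_file_path)
    tables_stat = load_statuses(ctx, status_file_path)

    if not do_restore:
        detach_tables(ctx, tables_stat, status_file_path, dry_run)    # INIT -> DETACHED
        remove_zk_nodes(ctx, tables_stat, status_file_path, dry_run)  # DETACHED -> ZK_CLEANED
        restart_clickhouse_server(ctx, tables_stat, dry_run)
    else:
        restore_replica_step(ctx, tables_stat, status_file_path, dry_run)  # ZK_CLEANED -> RESTORED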
Hey @MikhailBurdukov - I've reviewed your changes and found some issues that need to be addressed.
Blocking issues:
- time.sleep() call; did you mean to leave this in? (link)
- Escape or validate identifiers when building SQL to avoid injection (link)
Here's what I looked at during the review
- 🟡 General issues: 7 issues found
- 🔴 Security: 1 blocking issue
- 🟢 Review instructions: all looks good
- 🟢 Testing: all looks good
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
def execute_tasks_in_parallel(
-    tasks: List[WorkerTask], max_workers: int = 4, keep_going: bool = False
+    tasks: List[WorkerTask], max_workers: int = 4, keep_going: bool = False, callback = None,
) -> Dict[str, Any]:
suggestion: Add explicit type annotation for the callback parameter
Specify the callback parameter type as Optional[Callable[[str], None]] = None and adjust trailing commas and spacing for consistency.
def execute_tasks_in_parallel(
-    tasks: List[WorkerTask], max_workers: int = 4, keep_going: bool = False
+    tasks: List[WorkerTask], max_workers: int = 4, keep_going: bool = False, callback = None,
) -> Dict[str, Any]:

Suggested change:

from typing import Callable, Optional

def execute_tasks_in_parallel(
    tasks: List[WorkerTask],
    max_workers: int = 4,
    keep_going: bool = False,
    callback: Optional[Callable[[str], None]] = None,
) -> Dict[str, Any]:
if callback:
    callback(idf)
suggestion (bug_risk): Wrap callback invocation in try/except to guard against callback errors
Catching exceptions from the callback will prevent a single callback failure from interrupting the processing of other futures.
if callback:
    callback(idf)

Suggested change:

if callback:
    try:
        callback(idf)
    except Exception as cb_exc:
        logging.warning("Exception in callback for idf %r: %r", idf, cb_exc)
def migrate(ctx, status_file_path, dry_run, do_restore):
    print(Path(status_file_path).exists())
    if not Path(status_file_path).exists():
        generate_status_file(ctx, status_file_path)
suggestion: Remove debug print and use structured logging
Replace the print statement with logging.debug, or remove it after validation to avoid cluttering stdout.
def migrate(ctx, status_file_path, dry_run, do_restore):
    print(Path(status_file_path).exists())
    if not Path(status_file_path).exists():
        generate_status_file(ctx, status_file_path)

Suggested change:

import logging

def migrate(ctx, status_file_path, dry_run, do_restore):
    logging.debug("Status file exists: %s", Path(status_file_path).exists())
    if not Path(status_file_path).exists():
        generate_status_file(ctx, status_file_path)
"--status-file-path", | ||
"status_file_path", | ||
default=DEFAULT_STATUS_PATH, | ||
help="Additional check: specified COLUMN name should exists in data to be removed. Example: `initial_query_start_time_microseconds.bin` for `query_log`-table.", |
issue: Clarify help text for --status-file-path; current message is unrelated
Update the help text to accurately describe what --status-file-path does and how it should be used.
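
For example, the option's help text could be replaced with something along these lines (the wording below is only a suggestion, not the PR's actual text):

@option(
    "--status-file-path",
    "status_file_path",
    default=DEFAULT_STATUS_PATH,
    help="Path to the YAML status file that tracks per-table migration progress and allows resuming an interrupted run.",
)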
@option(
    "--not-dry-run",
    "dry_run",
    is_flag=True,
    default=True,
    help="Get merges from all hosts in the cluster.",
)
suggestion: Rename or simplify the dry-run flag to avoid double negation
Use Click's boolean flag syntax (e.g., --dry-run/--no-dry-run) or rename the flag so its name directly reflects its boolean value.
@option(
    "--not-dry-run",
    "dry_run",
    is_flag=True,
    default=True,
    help="Get merges from all hosts in the cluster.",
)

Suggested change:

@option(
    "--dry-run/--no-dry-run",
    "dry_run",
    default=True,
    help="Run in dry-run mode (no changes will be made). Use --no-dry-run to apply changes.",
)
if tables_stat[index].status == STATUS.INIT:
    execute_query_with_retry(
        ctx,
        query=f"DETACH TABLE `{tables_stat[index].db}`.`{tables_stat[index].table}`",
🚨 issue (security): Escape or validate identifiers when building SQL to avoid injection
Directly interpolating identifiers can cause SQL injection or syntax issues. Use proper validation or quoting methods provided by your database driver.
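
One possible mitigation (a sketch, not a built-in quoting facility of the driver) is to whitelist-validate the database and table names before interpolating them into the query:

import re

# Hypothetical helper: accept only plain identifier characters before interpolation.
IDENTIFIER_RE = re.compile(r"^[A-Za-z0-9_]+$")


def safe_identifier(name):
    if not IDENTIFIER_RE.match(name):
        raise ValueError(f"Refusing to use suspicious identifier: {name!r}")
    return name


meta = tables_stat[index]
query = f"DETACH TABLE `{safe_identifier(meta.db)}`.`{safe_identifier(meta.table)}`"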
for _ in range(RETRY_COUNT):
    try:
        execute_query(
            ctx,
            query,
            timeout=QUERY_TIMEOUT,
            dry_run=dry_run,
        )
        return
    except Exception:
        pass
suggestion (bug_risk): Avoid silent failures in retry loop; log and add backoff
Log the exception and add a short delay before retrying to prevent silent errors and rapid retry loops.
for _ in range(RETRY_COUNT):
    try:
        execute_query(
            ctx,
            query,
            timeout=QUERY_TIMEOUT,
            dry_run=dry_run,
        )
        return
    except Exception:
        pass

Suggested change:

import logging
import time

for _ in range(RETRY_COUNT):
    try:
        execute_query(
            ctx,
            query,
            timeout=QUERY_TIMEOUT,
            dry_run=dry_run,
        )
        return
    except Exception as e:
        logging.exception("Exception occurred during query execution, retrying...")
        time.sleep(1)
def restart_clickhouse_server(ctx, tables_stat, dry_run=True):
    logging.info("Restart clickhouse step")
    for table in tables_stat:
        if table.status == STATUS.ZK_CLEANED:
            restart_cmd = "service clickhouse-server restart"
            logging.info(restart_cmd)
            if dry_run:
                return

            subprocess.run(restart_cmd, shell=True)

            deadline = time.time() + QUERY_TIMEOUT

            ch_is_alive = False
            while time.time() < deadline:
                if is_clickhouse_alive():
                    ch_is_alive = True
                    break
                time.sleep(1)
            if not ch_is_alive:
                logging.error("CH is dead")
                exit(1)
            return
suggestion: Refactor server restart to occur once, not per table
Currently, the server restarts and exits after processing the first ZK_CLEANED table. Move the restart logic outside the loop to ensure it runs only once for all relevant tables.
def restart_clickhouse_server(ctx, tables_stat, dry_run=True):
    logging.info("Restart clickhouse step")
    for table in tables_stat:
        if table.status == STATUS.ZK_CLEANED:
            restart_cmd = "service clickhouse-server restart"
            logging.info(restart_cmd)
            if dry_run:
                return
            subprocess.run(restart_cmd, shell=True)
            deadline = time.time() + QUERY_TIMEOUT
            ch_is_alive = False
            while time.time() < deadline:
                if is_clickhouse_alive():
                    ch_is_alive = True
                    break
                time.sleep(1)
            if not ch_is_alive:
                logging.error("CH is dead")
                exit(1)
            return

Suggested change:

def restart_clickhouse_server(ctx, tables_stat, dry_run=True):
    logging.info("Restart clickhouse step")
    # Check if any table needs restart
    needs_restart = any(table.status == STATUS.ZK_CLEANED for table in tables_stat)
    if not needs_restart:
        return
    restart_cmd = "service clickhouse-server restart"
    logging.info(restart_cmd)
    if dry_run:
        return
    subprocess.run(restart_cmd, shell=True)
    deadline = time.time() + QUERY_TIMEOUT
    ch_is_alive = False
    while time.time() < deadline:
        if is_clickhouse_alive():
            ch_is_alive = True
            break
        time.sleep(1)
    if not ch_is_alive:
        logging.error("CH is dead")
        exit(1)
    return
if is_clickhouse_alive():
    ch_is_alive = True
    break
time.sleep(1)
security (arbitrary-sleep): time.sleep() call; did you mean to leave this in?
Source: opengrep
I'm pretty sure this logic will never be merged, but it looks like a useful script for migrating a cluster from one ZK path to another.
Another great feature is that it is resumable, so if something goes wrong you can resume the execution.
Summary by Sourcery
Add a zero-copy migration script to the chadmin CLI that supports detaching tables, cleaning ZooKeeper nodes, restarting ClickHouse, and restoring replicas in parallel, with progress persisted to a status file for resumability.