Conversation

@MikhailBurdukov (Contributor) commented Mar 28, 2025

I'm pretty sure this logic will never be merged, but it looks like a useful script for migrating a cluster from one ZooKeeper path to another.

Another great feature is that it is resumable, so if something goes wrong you can resume the execution.

Summary by Sourcery

Add a zero-copy migration script to the chadmin CLI that supports detaching tables, cleaning ZooKeeper nodes, restarting ClickHouse, and restoring replicas in parallel, with progress persisted to a status file for resumability.

New Features:

  • Add a zero-copy migration CLI group with a resumable “migrate” command for migrating replicated tables between ZooKeeper paths
  • Persist migration progress in a YAML status file to allow interruption and resumption of the migration workflow

Enhancements:

  • Extend execute_tasks_in_parallel to accept an optional callback for tracking task completion and updating migration status

@Alex-Burmak (Member) commented:

@sourcery-ai review

sourcery-ai bot (Contributor) commented May 23, 2025

Reviewer's Guide

This PR integrates a resumable zero-copy migration workflow into the chadmin CLI by adding a new command group that orchestrates table detachment, ZK cleanup, server restart, and parallel replica restoration using a status file and enhanced process pool callbacks.

Sequence Diagram for Parallel Replica Restoration and Status Update

sequenceDiagram
    participant CLI as "migrate command\n(via restore_replica_step)"
    participant ETP as "execute_tasks_in_parallel()"
    participant WT1 as "WorkerTask 1\n(restore_replica for Table A)"
    participant WT2 as "WorkerTask 2\n(restore_replica for Table B)"
    participant CB as "callback_update_status_file()"
    participant SF as "StatusFile.yaml"

    CLI ->> ETP: Submit tasks [TableA_task, TableB_task, ...], callback_update_status_file
    activate ETP
    ETP ->> WT1: Run target_func(restore_replica)
    activate WT1
    ETP ->> WT2: Run target_func(restore_replica)
    activate WT2

    WT1 -->> ETP: Result for Table A (success/failure)
    deactivate WT1
    ETP ->> CB: callback(TableA_ID)
    activate CB
    CB ->> SF: Update status of Table A to RESTORED
    deactivate CB

    WT2 -->> ETP: Result for Table B (success/failure)
    deactivate WT2
    ETP ->> CB: callback(TableB_ID)
    activate CB
    CB ->> SF: Update status of Table B to RESTORED
    deactivate CB

    ETP -->> CLI: All tasks completed, results returned
    deactivate ETP
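
In code form, the interaction above might look roughly like the following sketch, imagined inside the restore step where tables_stat and status_file_path are in scope. The WorkerTask construction, restore_replica, and the status-update helper are inferred from the diagrams, not copied from the PR:

# Sketch only: submit one restore task per ZK_CLEANED table and let the pool
# report each finished identifier back through the callback.
def callback_update_status_file(identifier: str) -> None:
    # identifier is "db.table"; flip that table's status to RESTORED and rewrite
    # the YAML status file (update_status_file is listed in the class diagram below).
    db, table = identifier.split(".", 1)
    for meta in tables_stat:
        if meta.db == db and meta.table == table:
            meta.status = STATUS.RESTORED
    update_status_file(status_file_path, tables_stat)

tasks = [
    WorkerTask(f"{meta.db}.{meta.table}", restore_replica, {"ctx": ctx, "table": meta})
    for meta in tables_stat
    if meta.status == STATUS.ZK_CLEANED
]
execute_tasks_in_parallel(
    tasks, max_workers=4, keep_going=True, callback=callback_update_status_file
)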

Class Diagram for Zero-Copy Migration Components

classDiagram
    class STATUS {
        <<enumeration>>
        INIT
        DETACHED
        ZK_CLEANED
        RESTORED
    }
    class TableMeta {
        +db: str
        +table: str
        +status: STATUS
        +zk_path: str
    }
    TableMeta *-- STATUS : uses

    class ZeroCopyMigrationCLI {
        <<CLI Group>>
        +migrate(ctx, status_file_path, dry_run, do_restore)
        .. internal functions ..
        #generate_status_file(ctx, status_file_path)
        #load_statuses(ctx, status_file_path)
        #update_status_file(status_file_path, tables_stats)
        #detach_tables(ctx, tables_stat, status_file_path, dry_run)
        #remove_zk_nodes(ctx, tables_stat, status_file_path, dry_run)
        #restart_clickhouse_server(ctx, tables_stat, dry_run)
        #restore_replica_step(ctx, tables_stat, status_file_path, dry_run)
    }
    ZeroCopyMigrationCLI ..> TableMeta : manages
    ZeroCopyMigrationCLI ..> STATUS : manages state via TableMeta

    class ProcessPool {
        <<module>>
        +execute_tasks_in_parallel(tasks: List~WorkerTask~, max_workers: int, keep_going: bool, callback: function) : Dict~str, Any~
    }
    class WorkerTask {
        +identifier: str
        +target_func: function
        +args: dict
    }
    ProcessPool o-- WorkerTask : executes
    ZeroCopyMigrationCLI ..> ProcessPool : uses for restore_replica_step
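
To make the persistence format concrete, here is a hypothetical example of the YAML status file plus its load/update round-trip; field names mirror TableMeta, but the PR's actual serialization may differ:

import yaml

# Hypothetical file content (one entry per replicated table):
#   - db: shop
#     table: orders
#     status: DETACHED
#     zk_path: /clickhouse/tables/shard1/shop.orders
#   - db: shop
#     table: visits
#     status: INIT
#     zk_path: /clickhouse/tables/shard1/shop.visits

def load_statuses_sketch(path: str) -> list:
    # Read the file back so an interrupted migration can resume from the last completed step.
    with open(path) as f:
        return yaml.safe_load(f) or []

def update_status_file_sketch(path: str, tables_stat: list) -> None:
    # Rewrite the whole file after every status transition.
    with open(path, "w") as f:
        yaml.safe_dump(tables_stat, f)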

Flow Diagram for the Zero-Copy Migration Process

graph TD
    Start["User executes 'migrate' command"] --> CheckStatusFile{"Status file exists?"};
    CheckStatusFile -- No --> GenerateStatusFile["generate_status_file()\n(All tables: INIT)"];
    GenerateStatusFile --> LoadStatusFile;
    CheckStatusFile -- Yes --> LoadStatusFile["load_statuses()"];

    LoadStatusFile --> CheckRestoreFlag{"--do-restore specified?"};

    CheckRestoreFlag -- No --> DetachPhase["1. Detach Tables (INIT -> DETACHED)"];
    DetachPhase --> UpdateStatusFile1["Update Status File"];
    UpdateStatusFile1 --> ZKCleanPhase["2. Clean ZooKeeper Nodes (DETACHED -> ZK_CLEANED)"];
    ZKCleanPhase --> UpdateStatusFile2["Update Status File"];
    UpdateStatusFile2 --> RestartCH["3. Restart ClickHouse Server (if needed)"];
    RestartCH --> EndPreRestore["Pre-Restore Steps Complete"];

    CheckRestoreFlag -- Yes --> RestorePhase["4. Restore Replicas (ZK_CLEANED -> RESTORED)\n(Uses execute_tasks_in_parallel with callback)"];
    RestorePhase --> UpdateStatusFile3["Update Status File (via callback)"];
    UpdateStatusFile3 --> EndRestore["Restore Steps Complete"];
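
Condensed into code, the flow above corresponds roughly to the following sketch; the helper names and parameters come from the class diagram, while the body is an approximation rather than the PR's actual implementation:

from pathlib import Path

def migrate_sketch(ctx, status_file_path, dry_run, do_restore):
    # Generate the status file on first run (all tables start as INIT), then load it.
    if not Path(status_file_path).exists():
        generate_status_file(ctx, status_file_path)
    tables_stat = load_statuses(ctx, status_file_path)

    if not do_restore:
        # Pre-restore phase: INIT -> DETACHED -> ZK_CLEANED, then restart if needed.
        detach_tables(ctx, tables_stat, status_file_path, dry_run)
        remove_zk_nodes(ctx, tables_stat, status_file_path, dry_run)
        restart_clickhouse_server(ctx, tables_stat, dry_run)
    else:
        # Restore phase: ZK_CLEANED -> RESTORED, in parallel with progress callbacks.
        restore_replica_step(ctx, tables_stat, status_file_path, dry_run)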

File-Level Changes

Enhanced parallel task execution with callback support
  • Added optional callback parameter to execute_tasks_in_parallel signature
  • Invoke callback(idf) after each task completes successfully
  Files: ch_tools/common/process_pool.py

Registered zero-copy migration in the main CLI
  • Imported zero_copy_migration_group into the CLI entrypoint
  • Appended zero_copy_migration_group to the commands list
  Files: ch_tools/chadmin/chadmin_cli.py

Added a new zero-copy migration CLI group with resumable workflow
  • Defined zero_copy_migration_group and migrate subcommand using click
  • Implemented status file generation, loading, and updates via YAML
  • Built multi-step migration: detach tables, delete ZK nodes, restart server, and restore replicas
  • Leveraged execute_tasks_in_parallel for parallel replica restoration with progress callbacks
  Files: ch_tools/chadmin/cli/zero_copy_migration_group.py

@sourcery-ai bot (Contributor) left a comment

Hey @MikhailBurdukov - I've reviewed your changes and found some issues that need to be addressed.

Blocking issues:

  • time.sleep() call; did you mean to leave this in? (link)
  • Escape or validate identifiers when building SQL to avoid injection (link)
Here's what I looked at during the review
  • 🟡 General issues: 7 issues found
  • 🔴 Security: 1 blocking issue
  • 🟢 Review instructions: all looks good
  • 🟢 Testing: all looks good
  • 🟢 Documentation: all looks good


Comment on lines 15 to 17
 def execute_tasks_in_parallel(
-    tasks: List[WorkerTask], max_workers: int = 4, keep_going: bool = False
+    tasks: List[WorkerTask], max_workers: int = 4, keep_going: bool = False, callback = None,
 ) -> Dict[str, Any]:

suggestion: Add explicit type annotation for the callback parameter

Specify the callback parameter type as Optional[Callable[[str], None]] = None and adjust trailing commas and spacing for consistency.

Suggested change
-def execute_tasks_in_parallel(
-    tasks: List[WorkerTask], max_workers: int = 4, keep_going: bool = False
-    tasks: List[WorkerTask], max_workers: int = 4, keep_going: bool = False, callback = None,
-) -> Dict[str, Any]:
+from typing import Callable, Optional
+
+def execute_tasks_in_parallel(
+    tasks: List[WorkerTask],
+    max_workers: int = 4,
+    keep_going: bool = False,
+    callback: Optional[Callable[[str], None]] = None,
+) -> Dict[str, Any]:

Comment on lines +34 to +35
if callback:
    callback(idf)

suggestion (bug_risk): Wrap callback invocation in try/except to guard against callback errors

Catching exceptions from the callback will prevent a single callback failure from interrupting the processing of other futures.

Suggested change
-if callback:
-    callback(idf)
+if callback:
+    try:
+        callback(idf)
+    except Exception as cb_exc:
+        logging.warning("Exception in callback for idf %r: %r", idf, cb_exc)

Comment on lines +102 to +105
def migrate(ctx, status_file_path, dry_run, do_restore):
    print(Path(status_file_path).exists())
    if not Path(status_file_path).exists():
        generate_status_file(ctx, status_file_path)

suggestion: Remove debug print and use structured logging

Replace the print statement with logging.debug, or remove it after validation to avoid cluttering stdout.

Suggested change
-def migrate(ctx, status_file_path, dry_run, do_restore):
-    print(Path(status_file_path).exists())
-    if not Path(status_file_path).exists():
-        generate_status_file(ctx, status_file_path)
+import logging
+
+def migrate(ctx, status_file_path, dry_run, do_restore):
+    logging.debug("Status file exists: %s", Path(status_file_path).exists())
+    if not Path(status_file_path).exists():
+        generate_status_file(ctx, status_file_path)

"--status-file-path",
"status_file_path",
default=DEFAULT_STATUS_PATH,
help="Additional check: specified COLUMN name should exists in data to be removed. Example: `initial_query_start_time_microseconds.bin` for `query_log`-table.",

issue: Clarify help text for --status-file-path; current message is unrelated

Update the help text to accurately describe what --status-file-path does and how it should be used.
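
For instance, the declaration could read something like the sketch below; it keeps the existing option name and default constant, and only the help text is new wording:

@option(
    "--status-file-path",
    "status_file_path",
    default=DEFAULT_STATUS_PATH,
    help="Path to the YAML file where migration progress is persisted; pass the same path again to resume an interrupted migration.",
)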

Comment on lines +88 to +94
@option(
    "--not-dry-run",
    "dry_run",
    is_flag=True,
    default=True,
    help="Get merges from all hosts in the cluster.",
)

suggestion: Rename or simplify the dry-run flag to avoid double negation

Use Click's boolean flag syntax (e.g., --dry-run/--no-dry-run) or rename the flag so its name directly reflects its boolean value.

Suggested change
-@option(
-    "--not-dry-run",
-    "dry_run",
-    is_flag=True,
-    default=True,
-    help="Get merges from all hosts in the cluster.",
-)
+@option(
+    "--dry-run/--no-dry-run",
+    "dry_run",
+    default=True,
+    help="Run in dry-run mode (no changes will be made). Use --no-dry-run to apply changes.",
+)

if tables_stat[index].status == STATUS.INIT:
    execute_query_with_retry(
        ctx,
        query=f"DETACH TABLE `{tables_stat[index].db}`.`{tables_stat[index].table}`",

🚨 issue (security): Escape or validate identifiers when building SQL to avoid injection

Directly interpolating identifiers can cause SQL injection or syntax issues. Use proper validation or quoting methods provided by your database driver.
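
One possible mitigation, sketched here rather than taken from the PR, is to validate each identifier against a conservative pattern before interpolating it into the query:

import re

_SAFE_IDENTIFIER = re.compile(r"^[A-Za-z0-9_]+$")

def quoted(name: str) -> str:
    # Refuse anything outside a conservative character set instead of trying to escape it.
    if not _SAFE_IDENTIFIER.match(name):
        raise ValueError(f"Refusing to interpolate suspicious identifier: {name!r}")
    return f"`{name}`"

# Usage (meta stands for one TableMeta entry):
# query = f"DETACH TABLE {quoted(meta.db)}.{quoted(meta.table)}"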

Comment on lines +182 to +192
for _ in range(RETRY_COUNT):
    try:
        execute_query(
            ctx,
            query,
            timeout=QUERY_TIMEOUT,
            dry_run=dry_run,
        )
        return
    except Exception:
        pass

suggestion (bug_risk): Avoid silent failures in retry loop; log and add backoff

Log the exception and add a short delay before retrying to prevent silent errors and rapid retry loops.

Suggested change
-for _ in range(RETRY_COUNT):
-    try:
-        execute_query(
-            ctx,
-            query,
-            timeout=QUERY_TIMEOUT,
-            dry_run=dry_run,
-        )
-        return
-    except Exception:
-        pass
+import logging
+import time
+
+for _ in range(RETRY_COUNT):
+    try:
+        execute_query(
+            ctx,
+            query,
+            timeout=QUERY_TIMEOUT,
+            dry_run=dry_run,
+        )
+        return
+    except Exception as e:
+        logging.exception("Exception occurred during query execution, retrying...")
+        time.sleep(1)

Comment on lines +217 to +239
def restart_clickhouse_server(ctx, tables_stat, dry_run=True):
    logging.info("Restart clickhouse step")
    for table in tables_stat:
        if table.status == STATUS.ZK_CLEANED:
            restart_cmd = "service clickhouse-server restart"
            logging.info(restart_cmd)
            if dry_run:
                return

            subprocess.run(restart_cmd, shell=True)

            deadline = time.time() + QUERY_TIMEOUT

            ch_is_alive = False
            while time.time() < deadline:
                if is_clickhouse_alive():
                    ch_is_alive = True
                    break
                time.sleep(1)
            if not ch_is_alive:
                logging.error("CH is dead")
                exit(1)
            return

suggestion: Refactor server restart to occur once, not per table

Currently, the server restarts and exits after processing the first ZK_CLEANED table. Move the restart logic outside the loop to ensure it runs only once for all relevant tables.

Suggested change
-def restart_clickhouse_server(ctx, tables_stat, dry_run=True):
-    logging.info("Restart clickhouse step")
-    for table in tables_stat:
-        if table.status == STATUS.ZK_CLEANED:
-            restart_cmd = "service clickhouse-server restart"
-            logging.info(restart_cmd)
-            if dry_run:
-                return
-            subprocess.run(restart_cmd, shell=True)
-            deadline = time.time() + QUERY_TIMEOUT
-            ch_is_alive = False
-            while time.time() < deadline:
-                if is_clickhouse_alive():
-                    ch_is_alive = True
-                    break
-                time.sleep(1)
-            if not ch_is_alive:
-                logging.error("CH is dead")
-                exit(1)
-            return
+def restart_clickhouse_server(ctx, tables_stat, dry_run=True):
+    logging.info("Restart clickhouse step")
+    # Check if any table needs restart
+    needs_restart = any(table.status == STATUS.ZK_CLEANED for table in tables_stat)
+    if not needs_restart:
+        return
+    restart_cmd = "service clickhouse-server restart"
+    logging.info(restart_cmd)
+    if dry_run:
+        return
+    subprocess.run(restart_cmd, shell=True)
+    deadline = time.time() + QUERY_TIMEOUT
+    ch_is_alive = False
+    while time.time() < deadline:
+        if is_clickhouse_alive():
+            ch_is_alive = True
+            break
+        time.sleep(1)
+    if not ch_is_alive:
+        logging.error("CH is dead")
+        exit(1)
+    return

if is_clickhouse_alive():
    ch_is_alive = True
    break
time.sleep(1)

security (arbitrary-sleep): time.sleep() call; did you mean to leave this in?

Source: opengrep
