diff --git a/changelog.md b/changelog.md index 6edf29bf..197a7404 100644 --- a/changelog.md +++ b/changelog.md @@ -22,6 +22,7 @@ Internal -------- * Improve pull request template lint commands. * Complete typehinting the non-test codebase. +* Modernization: conversion to f-strings. 1.37.1 (2025/07/28) diff --git a/mycli/main.py b/mycli/main.py index 20ce3409..2fc36753 100755 --- a/mycli/main.py +++ b/mycli/main.py @@ -236,21 +236,21 @@ def register_special_commands(self) -> None: def change_table_format(self, arg: str, **_) -> Generator[tuple, None, None]: try: self.main_formatter.format_name = arg - yield (None, None, None, "Changed table format to {}".format(arg)) + yield (None, None, None, f"Changed table format to {arg}") except ValueError: - msg = "Table format {} not recognized. Allowed formats:".format(arg) + msg = f"Table format {arg} not recognized. Allowed formats:" for table_type in self.main_formatter.supported_formats: - msg += "\n\t{}".format(table_type) + msg += f"\n\t{table_type}" yield (None, None, None, msg) def change_redirect_format(self, arg: str, **_) -> Generator[tuple, None, None]: try: self.redirect_formatter.format_name = arg - yield (None, None, None, "Changed redirect format to {}".format(arg)) + yield (None, None, None, f"Changed redirect format to {arg}") except ValueError: - msg = "Redirect format {} not recognized. Allowed formats:".format(arg) + msg = f"Redirect format {arg} not recognized. Allowed formats:" for table_type in self.redirect_formatter.supported_formats: - msg += "\n\t{}".format(table_type) + msg += f"\n\t{table_type}" yield (None, None, None, msg) def change_db(self, arg: str, **_) -> Generator[tuple, None, None]: @@ -265,7 +265,12 @@ def change_db(self, arg: str, **_) -> Generator[tuple, None, None]: assert isinstance(self.sqlexecute, SQLExecute) self.sqlexecute.change_db(arg) - yield (None, None, None, 'You are now connected to database "%s" as user "%s"' % (self.sqlexecute.dbname, self.sqlexecute.user)) + yield ( + None, + None, + None, + f'You are now connected to database "{self.sqlexecute.dbname}" as user "{self.sqlexecute.user}"', + ) def execute_from_file(self, arg: str, **_) -> Iterable[tuple]: if not arg: @@ -293,7 +298,7 @@ def change_prompt_format(self, arg: str, **_) -> list[tuple]: return [(None, None, None, message)] self.prompt_format = self.get_prompt(arg) - return [(None, None, None, "Changed prompt format to %s" % arg)] + return [(None, None, None, f"Changed prompt format to {arg}")] def initialize_logging(self) -> None: log_file = os.path.expanduser(self.config["main"]["log_file"]) @@ -315,7 +320,7 @@ def initialize_logging(self) -> None: elif dir_path_exists(log_file): handler = logging.FileHandler(log_file) else: - self.echo('Error: Unable to open the log file "{}".'.format(log_file), err=True, fg="red") + self.echo(f'Error: Unable to open the log file "{log_file}".', err=True, fg="red") return formatter = logging.Formatter("%(asctime)s (%(process)d/%(threadName)s) %(name)s %(levelname)s - %(message)s") @@ -523,7 +528,7 @@ def _connect() -> None: self.logger.debug("Database connection failed: %r.", e) self.logger.error("traceback: %r", traceback.format_exc()) self.logger.debug("Retrying over TCP/IP") - self.echo("Failed to connect to local MySQL server through socket '{}':".format(socket)) + self.echo(f"Failed to connect to local MySQL server through socket '{socket}':") self.echo(str(e), err=True) self.echo("Retrying over TCP/IP", err=True) @@ -542,7 +547,7 @@ def _connect() -> None: try: port = int(port) except ValueError: - self.echo("Error: Invalid port number: '{0}'.".format(port), err=True, fg="red") + self.echo(f"Error: Invalid port number: '{port}'.", err=True, fg="red") sys.exit(1) _connect() @@ -664,7 +669,7 @@ def run_cli(self) -> None: else: history = None self.echo( - 'Error: Unable to open the history file "{}". Your query history will not be saved.'.format(history_file), + f'Error: Unable to open the history file "{history_file}". Your query history will not be saved.', err=True, fg="red", ) @@ -712,7 +717,7 @@ def output_res(res: Generator[tuple], start: float) -> None: threshold = 1000 if is_select(status) and cur and cur.rowcount > threshold: self.echo( - "The result set has more than {} rows.".format(threshold), + f"The result set has more than {threshold} rows.", fg="red", ) if not confirm("Do you want to continue?"): @@ -747,8 +752,8 @@ def output_res(res: Generator[tuple], start: float) -> None: if self.beep_after_seconds > 0 and t >= self.beep_after_seconds: self.bell() if special.is_timing_enabled(): - self.echo("Time: %0.03fs" % t) - self.echo("Time: %0.03fs" % t) + self.echo(f"Time: {t:0.03f}s") + self.echo(f"Time: {t:0.03f}s") except KeyboardInterrupt: pass @@ -840,7 +845,7 @@ def one_iteration(text: str | None = None) -> None: special.write_tee(self.get_prompt(self.prompt_format) + text) if self.logfile: - self.logfile.write("\n# %s\n" % datetime.now()) + self.logfile.write(f"\n# {datetime.now()}\n") self.logfile.write(text) self.logfile.write("\n") @@ -864,7 +869,7 @@ def one_iteration(text: str | None = None) -> None: # Restart connection to the database sqlexecute.connect() try: - for title, cur, headers, status in sqlexecute.run("kill %s" % connection_id_to_kill): + for title, cur, headers, status in sqlexecute.run(f"kill {connection_id_to_kill}"): status_str = str(status).lower() if status_str.find("ok") > -1: logger.debug("cancelled query, connection id: %r, sql: %r", connection_id_to_kill, text) @@ -877,7 +882,7 @@ def one_iteration(text: str | None = None) -> None: ) self.echo(f"Failed to confirm query cancellation, id: {connection_id_to_kill}", err=True, fg="red") except Exception as e: - self.echo("Encountered error while cancelling query: {}".format(e), err=True, fg="red") + self.echo(f"Encountered error while cancelling query: {e}", err=True, fg="red") else: logger.debug("Did not get a connection id, skip cancelling query") self.echo("Did not get a connection id, skip cancelling query", err=True, fg="red") @@ -1277,7 +1282,7 @@ def get_last_query(self) -> str | None: @click.option("-d", "--dsn", default="", envvar="DSN", help="Use DSN configured into the [alias_dsn] section of myclirc file.") @click.option("--list-dsn", "list_dsn", is_flag=True, help="list of DSN configured into the [alias_dsn] section of myclirc file.") @click.option("--list-ssh-config", "list_ssh_config", is_flag=True, help="list ssh configurations in the ssh config (requires paramiko).") -@click.option("-R", "--prompt", "prompt", help='Prompt format (Default: "{0}").'.format(MyCli.default_prompt)) +@click.option("-R", "--prompt", "prompt", help=f'Prompt format (Default: "{MyCli.default_prompt}").') @click.option("-l", "--logfile", type=click.File(mode="a", encoding="utf-8"), help="Log every query and its results to a file.") @click.option("--defaults-group-suffix", type=str, help="Read MySQL config groups with the specified suffix.") @click.option("--defaults-file", type=click.Path(), help="Only read MySQL options from the given file.") @@ -1372,7 +1377,7 @@ def cli( sys.exit(1) for alias, value in alias_dsn.items(): if verbose: - click.secho("{} : {}".format(alias, value)) + click.secho(f"{alias} : {value}") else: click.secho(alias) sys.exit(0) @@ -1381,7 +1386,7 @@ def cli( for host in ssh_config.get_hostnames(): if verbose: host_config = ssh_config.lookup(host) - click.secho("{} : {}".format(host, host_config.get("hostname"))) + click.secho(f"{host} : {host_config.get('hostname')}") else: click.secho(host) sys.exit(0) @@ -1525,7 +1530,7 @@ def cli( ) if combined_init_cmd: - click.echo("Executing init-command: %s" % combined_init_cmd, err=True) + click.echo(f"Executing init-command: {combined_init_cmd}", err=True) mycli.logger.debug("Launch Params: \n\tdatabase: %r\tuser: %r\thost: %r\tport: %r", database, user, host, port)