From ae0d4dd0675697d60f64db9f428faa9e6fb0d4ab Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Tue, 25 Mar 2025 11:29:13 +0000 Subject: [PATCH 01/31] Moved all singularity-related download functions into a new file --- .gitignore | 1 - nf_core/pipelines/download.py | 632 +-------------------- nf_core/pipelines/downloads/__init__.py | 0 nf_core/pipelines/downloads/singularity.py | 584 +++++++++++++++++++ nf_core/pipelines/downloads/utils.py | 41 ++ tests/pipelines/test_download.py | 3 +- 6 files changed, 654 insertions(+), 607 deletions(-) create mode 100644 nf_core/pipelines/downloads/__init__.py create mode 100644 nf_core/pipelines/downloads/singularity.py create mode 100644 nf_core/pipelines/downloads/utils.py diff --git a/.gitignore b/.gitignore index 7fe467abc9..cdbbc0ac9e 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,6 @@ env/ build/ develop-eggs/ dist/ -downloads/ eggs/ .eggs/ lib64/ diff --git a/nf_core/pipelines/download.py b/nf_core/pipelines/download.py index 3d371ca681..2b03f65f8c 100644 --- a/nf_core/pipelines/download.py +++ b/nf_core/pipelines/download.py @@ -1,23 +1,20 @@ """Downloads a nf-core pipeline to the local file system.""" -import concurrent.futures import io import logging import os import re import shutil -import subprocess import tarfile import textwrap from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional from zipfile import ZipFile import git import questionary import requests -import requests_cache import rich import rich.progress from git.exc import GitCommandError, InvalidGitRepositoryError @@ -27,6 +24,8 @@ import nf_core.modules.modules_utils import nf_core.pipelines.list import nf_core.utils +from nf_core.pipelines.downloads.singularity import SingularityFetcher +from nf_core.pipelines.downloads.utils import DownloadError, DownloadProgress from nf_core.synced_repo import RemoteProgressbar, SyncedRepo from nf_core.utils 
import ( NFCORE_CACHE_DIR, @@ -43,46 +42,6 @@ ) -class DownloadError(RuntimeError): - """A custom exception that is raised when nf-core pipelines download encounters a problem that we already took into consideration. - In this case, we do not want to print the traceback, but give the user some concise, helpful feedback instead. - """ - - -class DownloadProgress(rich.progress.Progress): - """Custom Progress bar class, allowing us to have two progress - bars with different columns / layouts. - """ - - def get_renderables(self): - for task in self.tasks: - if task.fields.get("progress_type") == "summary": - self.columns = ( - "[magenta]{task.description}", - rich.progress.BarColumn(bar_width=None), - "[progress.percentage]{task.percentage:>3.0f}%", - "•", - "[green]{task.completed}/{task.total} completed", - ) - if task.fields.get("progress_type") == "download": - self.columns = ( - "[blue]{task.description}", - rich.progress.BarColumn(bar_width=None), - "[progress.percentage]{task.percentage:>3.1f}%", - "•", - rich.progress.DownloadColumn(), - "•", - rich.progress.TransferSpeedColumn(), - ) - if task.fields.get("progress_type") == "singularity_pull": - self.columns = ( - "[magenta]{task.description}", - "[blue]{task.fields[current_log]}", - rich.progress.BarColumn(bar_width=None), - ) - yield self.make_tasks_table([task]) - - class DownloadWorkflow: """Downloads a nf-core workflow from GitHub to the local file system. @@ -1100,49 +1059,6 @@ def gather_registries(self, workflow_directory: str) -> None: # add chttps://community-cr-prod.seqera.io/docker/registry/v2/ to the set to support the new Seqera Singularity container registry self.registry_set.add("community-cr-prod.seqera.io/docker/registry/v2") - def symlink_singularity_images(self, image_out_path: str) -> None: - """Create a symlink for each registry in the registry set that points to the image. - We have dropped the explicit registries from the modules in favor of the configurable registries. 
- Unfortunately, Nextflow still expects the registry to be part of the file name, so a symlink is needed. - - The base image, e.g. ./nf-core-gatk-4.4.0.0.img will thus be symlinked as for example ./quay.io-nf-core-gatk-4.4.0.0.img - by prepending all registries in self.registry_set to the image name. - - Unfortunately, out output image name may contain a registry definition (Singularity image pulled from depot.galaxyproject.org - or older pipeline version, where the docker registry was part of the image name in the modules). Hence, it must be stripped - before to ensure that it is really the base name. - """ - - if self.registry_set: - # Create a regex pattern from the set, in case trimming is needed. - trim_pattern = "|".join(f"^{re.escape(registry)}-?".replace("/", "[/-]") for registry in self.registry_set) - - for registry in self.registry_set: - # Nextflow will convert it like this as well, so we need it mimic its behavior - registry = registry.replace("/", "-") - - if not bool(re.search(trim_pattern, os.path.basename(image_out_path))): - symlink_name = os.path.join("./", f"{registry}-{os.path.basename(image_out_path)}") - else: - trimmed_name = re.sub(f"{trim_pattern}", "", os.path.basename(image_out_path)) - symlink_name = os.path.join("./", f"{registry}-{trimmed_name}") - - symlink_full = os.path.join(os.path.dirname(image_out_path), symlink_name) - target_name = os.path.join("./", os.path.basename(image_out_path)) - - if not os.path.exists(symlink_full) and target_name != symlink_name: - os.makedirs(os.path.dirname(symlink_full), exist_ok=True) - image_dir = os.open(os.path.dirname(image_out_path), os.O_RDONLY) - try: - os.symlink( - target_name, - symlink_name, - dir_fd=image_dir, - ) - log.debug(f"Symlinked {target_name} as {symlink_name}.") - finally: - os.close(image_dir) - def get_singularity_images(self, current_revision: str = "") -> None: """Loop through container names and download Singularity images""" @@ -1153,6 +1069,23 @@ def 
get_singularity_images(self, current_revision: str = "") -> None: f"Processing workflow revision {current_revision}, found {len(self.containers)} container image{'s' if len(self.containers) > 1 else ''} in total." ) + if self.container_cache_utilisation in ["amend", "copy"]: + if os.environ.get("NXF_SINGULARITY_CACHEDIR"): + cache_path_dir = os.environ["NXF_SINGULARITY_CACHEDIR"] + if not os.path.isdir(cache_path_dir): + log.debug(f"Cache directory not found, creating: {cache_path_dir}") + os.makedirs(cache_path_dir) + else: + raise FileNotFoundError("Singularity cache is required but no '$NXF_SINGULARITY_CACHEDIR' set!") + + assert self.outdir + out_path_dir = os.path.abspath(os.path.join(self.outdir, "singularity-images")) + + # Check that the directories exist + if not os.path.isdir(out_path_dir): + log.debug(f"Output directory not found, creating: {out_path_dir}") + os.makedirs(out_path_dir) + with DownloadProgress() as progress: task = progress.add_task( "Collecting container images", @@ -1160,389 +1093,15 @@ def get_singularity_images(self, current_revision: str = "") -> None: progress_type="summary", ) - # Organise containers based on what we need to do with them - containers_exist: List[str] = [] - containers_cache: List[Tuple[str, str, Optional[str]]] = [] - containers_download: List[Tuple[str, str, Optional[str]]] = [] - containers_pull: List[Tuple[str, str, Optional[str]]] = [] - for container in self.containers: - # Fetch the output and cached filenames for this container - out_path, cache_path = self.singularity_image_filenames(container) - - # Check that the directories exist - out_path_dir = os.path.dirname(out_path) - if not os.path.isdir(out_path_dir): - log.debug(f"Output directory not found, creating: {out_path_dir}") - os.makedirs(out_path_dir) - if cache_path: - cache_path_dir = os.path.dirname(cache_path) - if not os.path.isdir(cache_path_dir): - log.debug(f"Cache directory not found, creating: {cache_path_dir}") - os.makedirs(cache_path_dir) 
- - # We already have the target file in place or in remote cache, return - if os.path.exists(out_path) or os.path.basename(out_path) in self.containers_remote: - containers_exist.append(container) - continue - - # We have a copy of this in the NXF_SINGULARITY_CACHE dir - if cache_path and os.path.exists(cache_path): - containers_cache.append((container, out_path, cache_path)) - continue - - # Direct download within Python - if container.startswith("http"): - containers_download.append((container, out_path, cache_path)) - continue - - # Pull using singularity - containers_pull.append((container, out_path, cache_path)) - - # Exit if we need to pull images and Singularity is not installed - if len(containers_pull) > 0: - if not (shutil.which("singularity") or shutil.which("apptainer")): - raise OSError( - "Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH" - ) - - if containers_exist: - if self.container_cache_index is not None: - log.info( - f"{len(containers_exist)} containers are already cached remotely and won't be retrieved." - ) - # Go through each method of fetching containers in order - for container in containers_exist: - progress.update(task, description="Image file exists at destination") - progress.update(task, advance=1) - - if containers_cache: - for container in containers_cache: - progress.update(task, description="Copying singularity images from cache") - self.singularity_copy_cache_image(*container) - progress.update(task, advance=1) - - if containers_download or containers_pull: - # if clause gives slightly better UX, because Download is no longer displayed if nothing is left to be downloaded. 
- with concurrent.futures.ThreadPoolExecutor(max_workers=self.parallel_downloads) as pool: - progress.update(task, description="Downloading singularity images") - - # Kick off concurrent downloads - future_downloads = [ - pool.submit(self.singularity_download_image, *containers, progress) - for containers in containers_download - ] - - # Make ctrl-c work with multi-threading - self.kill_with_fire = False - - try: - # Iterate over each threaded download, waiting for them to finish - for future in concurrent.futures.as_completed(future_downloads): - future.result() - try: - progress.update(task, advance=1) - except Exception as e: - log.error(f"Error updating progress bar: {e}") - - except KeyboardInterrupt: - # Cancel the future threads that haven't started yet - for future in future_downloads: - future.cancel() - # Set the variable that the threaded function looks for - # Will trigger an exception from each thread - self.kill_with_fire = True - # Re-raise exception on the main thread - raise - - for containers in containers_pull: - progress.update(task, description="Pulling singularity images") - # it is possible to try multiple registries / mirrors if multiple were specified. - # Iteration happens over a copy of self.container_library[:], as I want to be able to remove failing registries for subsequent images. 
- for library in self.container_library[:]: - try: - self.singularity_pull_image(*containers, library, progress) - # Pulling the image was successful, no ContainerError was raised, break the library loop - break - except ContainerError.ImageExistsError: - # Pulling not required - break - except ContainerError.RegistryNotFoundError as e: - self.container_library.remove(library) - # The only library was removed - if not self.container_library: - log.error(e.message) - log.error(e.helpmessage) - raise OSError from e - else: - # Other libraries can be used - continue - except ContainerError.ImageNotFoundError as e: - # Try other registries - if e.error_log.absolute_URI: - break # there no point in trying other registries if absolute URI was specified. - else: - continue - except ContainerError.InvalidTagError: - # Try other registries - continue - except ContainerError.OtherError as e: - # Try other registries - log.error(e.message) - log.error(e.helpmessage) - if e.error_log.absolute_URI: - break # there no point in trying other registries if absolute URI was specified. - else: - continue - else: - # The else clause executes after the loop completes normally. - # This means the library loop completed without breaking, indicating failure for all libraries (registries) - log.error( - f"Not able to pull image of {containers}. Service might be down or internet connection is dead." - ) - # Task should advance in any case. Failure to pull will not kill the download process. - progress.update(task, advance=1) - - def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str]]: - """Check Singularity cache for image, copy to destination folder if found. - - Args: - container (str): A pipeline's container name. Can be direct download URL - or a Docker Hub repository ID. - - Returns: - tuple (str, str): Returns a tuple of (out_path, cache_path). - out_path is the final target output path. 
it may point to the NXF_SINGULARITY_CACHEDIR, if cache utilisation was set to 'amend'. - If cache utilisation was set to 'copy', it will point to the target folder, a subdirectory of the output directory. In the latter case, - cache_path may either be None (image is not yet cached locally) or point to the image in the NXF_SINGULARITY_CACHEDIR, so it will not be - downloaded from the web again, but directly copied from there. See get_singularity_images() for implementation. - """ - - # Generate file paths - # Based on simpleName() function in Nextflow code: - # https://github.com/nextflow-io/nextflow/blob/671ae6d85df44f906747c16f6d73208dbc402d49/modules/nextflow/src/main/groovy/nextflow/container/SingularityCache.groovy#L69-L94 - out_name = container - # Strip URI prefix - out_name = re.sub(r"^.*:\/\/", "", out_name) - # Detect file extension - extension = ".img" - if ".sif:" in out_name: - extension = ".sif" - out_name = out_name.replace(".sif:", "-") - elif out_name.endswith(".sif"): - extension = ".sif" - out_name = out_name[:-4] - # Strip : and / characters - out_name = out_name.replace("/", "-").replace(":", "-") - # Add file extension - out_name = out_name + extension - - # Trim potential registries from the name for consistency. - # This will allow pipelines to work offline without symlinked images, - # if docker.registry / singularity.registry are set to empty strings at runtime, which can be included in the HPC config profiles easily. 
- if self.registry_set: - # Create a regex pattern from the set of registries - trim_pattern = "|".join(f"^{re.escape(registry)}-?".replace("/", "[/-]") for registry in self.registry_set) - # Use the pattern to trim the string - out_name = re.sub(f"{trim_pattern}", "", out_name) - - # Full destination and cache paths - out_path = os.path.abspath(os.path.join(self.outdir, "singularity-images", out_name)) - cache_path = None - if os.environ.get("NXF_SINGULARITY_CACHEDIR"): - cache_path = os.path.join(os.environ["NXF_SINGULARITY_CACHEDIR"], out_name) - # Use only the cache - set this as the main output path - if self.container_cache_utilisation == "amend": - out_path = cache_path - cache_path = None - elif self.container_cache_utilisation in ["amend", "copy"]: - raise FileNotFoundError("Singularity cache is required but no '$NXF_SINGULARITY_CACHEDIR' set!") - - return (out_path, cache_path) - - def singularity_copy_cache_image(self, container: str, out_path: str, cache_path: Optional[str]) -> None: - """Copy Singularity image from NXF_SINGULARITY_CACHEDIR to target folder.""" - # Copy to destination folder if we have a cached version - if cache_path and os.path.exists(cache_path): - log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'") - shutil.copyfile(cache_path, out_path) - # Create symlinks to ensure that the images are found even with different registries being used. - self.symlink_singularity_images(out_path) - - def singularity_download_image( - self, container: str, out_path: str, cache_path: Optional[str], progress: DownloadProgress - ) -> None: - """Download a singularity image from the web. - - Use native Python to download the file. - - Args: - container (str): A pipeline's container name. 
Usually it is of similar format - to ``https://depot.galaxyproject.org/singularity/name:version`` - out_path (str): The final target output path - cache_path (str, None): The NXF_SINGULARITY_CACHEDIR path if set, None if not - progress (Progress): Rich progress bar instance to add tasks to. - """ - log.debug(f"Downloading Singularity image: '{container}'") - - # Set output path to save file to - output_path = cache_path or out_path - output_path_tmp = f"{output_path}.partial" - log.debug(f"Downloading to: '{output_path_tmp}'") - - # Set up progress bar - nice_name = container.split("/")[-1][:50] - task = progress.add_task(nice_name, start=False, total=False, progress_type="download") - try: - # Delete temporary file if it already exists - if os.path.exists(output_path_tmp): - os.remove(output_path_tmp) - - # Open file handle and download - with open(output_path_tmp, "wb") as fh: - # Disable caching as this breaks streamed downloads - with requests_cache.disabled(): - r = requests.get(container, allow_redirects=True, stream=True, timeout=60 * 5) - filesize = r.headers.get("Content-length") - if filesize: - progress.update(task, total=int(filesize)) - progress.start_task(task) - - # Stream download - for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): - # Check that the user didn't hit ctrl-c - if self.kill_with_fire: - raise KeyboardInterrupt - progress.update(task, advance=len(data)) - fh.write(data) - - # Rename partial filename to final filename - os.rename(output_path_tmp, output_path) - - # Copy cached download if we are using the cache - if cache_path: - log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'") - progress.update(task, description="Copying from cache to target directory") - shutil.copyfile(cache_path, out_path) - self.symlink_singularity_images(cache_path) # symlinks inside the cache directory - - # Create symlinks to ensure that the images are found even with different registries being used. 
- self.symlink_singularity_images(out_path) - - progress.remove_task(task) - - except: - # Kill the progress bars - for t in progress.task_ids: - progress.remove_task(t) - # Try to delete the incomplete download - log.debug(f"Deleting incompleted singularity image download:\n'{output_path_tmp}'") - if output_path_tmp and os.path.exists(output_path_tmp): - os.remove(output_path_tmp) - if output_path and os.path.exists(output_path): - os.remove(output_path) - # Re-raise the caught exception - raise - finally: - del output_path_tmp - - def singularity_pull_image( - self, container: str, out_path: str, cache_path: Optional[str], library: List[str], progress: DownloadProgress - ) -> None: - """Pull a singularity image using ``singularity pull`` - - Attempt to use a local installation of singularity to pull the image. - - Args: - container (str): A pipeline's container name. Usually it is of similar format - to ``nfcore/name:version``. - library (list of str): A list of libraries to try for pulling the image. - - Raises: - Various exceptions possible from `subprocess` execution of Singularity. - """ - output_path = cache_path or out_path - - # where the output of 'singularity pull' is first generated before being copied to the NXF_SINGULARITY_CACHDIR. - # if not defined by the Singularity administrators, then use the temporary directory to avoid storing the images in the work directory. - if os.environ.get("SINGULARITY_CACHEDIR") is None: - os.environ["SINGULARITY_CACHEDIR"] = str(NFCORE_CACHE_DIR) - - # Sometimes, container still contain an explicit library specification, which - # resulted in attempted pulls e.g. from docker://quay.io/quay.io/qiime2/core:2022.11 - # Thus, if an explicit registry is specified, the provided -l value is ignored. - # Additionally, check if the container to be pulled is native Singularity: oras:// protocol. 
- container_parts = container.split("/") - if len(container_parts) > 2: - address = container if container.startswith("oras://") else f"docker://{container}" - absolute_URI = True - else: - address = f"docker://{library}/{container.replace('docker://', '')}" - absolute_URI = False - - if shutil.which("singularity"): - singularity_command = [ - "singularity", - "pull", - "--name", - output_path, - address, - ] - elif shutil.which("apptainer"): - singularity_command = ["apptainer", "pull", "--name", output_path, address] - else: - raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH") - log.debug(f"Building singularity image: {address}") - log.debug(f"Singularity command: {' '.join(singularity_command)}") - - # Progress bar to show that something is happening - task = progress.add_task( - container, - start=False, - total=False, - progress_type="singularity_pull", - current_log="", - ) - - # Run the singularity pull command - with subprocess.Popen( - singularity_command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - bufsize=1, - ) as proc: - lines = [] - if proc.stdout is not None: - for line in proc.stdout: - lines.append(line) - progress.update(task, current_log=line.strip()) - - if lines: - # something went wrong with the container retrieval - if any("FATAL: " in line for line in lines): - progress.remove_task(task) - raise ContainerError( - container=container, - registry=library, - address=address, - absolute_URI=absolute_URI, - out_path=out_path if out_path else cache_path or "", - singularity_command=singularity_command, - error_msg=lines, + singularity_fetcher = SingularityFetcher(self.container_library, self.registry_set, progress) + singularity_fetcher.fetch_containers( + self.containers, + out_path_dir, + self.containers_remote, + self.container_cache_utilisation == "amend", + task, ) - # Copy cached download if we are using the cache - if cache_path: - 
log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'") - progress.update(task, current_log="Copying from cache to target directory") - shutil.copyfile(cache_path, out_path) - self.symlink_singularity_images(cache_path) # symlinks inside the cache directory - - # Create symlinks to ensure that the images are found even with different registries being used. - self.symlink_singularity_images(out_path) - - progress.remove_task(task) - def compress_download(self): """Take the downloaded files and make a compressed .tar.gz archive.""" log.debug(f"Creating archive: {self.output_filename}") @@ -1849,140 +1408,3 @@ def bare_clone(self, destination): self.repo.clone(os.path.abspath(destination), bare=True) except (OSError, GitCommandError, InvalidGitRepositoryError) as e: log.error(f"[red]Failure to create the pipeline download[/]\n{e}\n") - - -# Distinct errors for the container download, required for acting on the exceptions - - -class ContainerError(Exception): - """A class of errors related to pulling containers with Singularity/Apptainer""" - - def __init__( - self, - container, - registry, - address, - absolute_URI, - out_path, - singularity_command, - error_msg, - ): - self.container = container - self.registry = registry - self.address = address - self.absolute_URI = absolute_URI - self.out_path = out_path - self.singularity_command = singularity_command - self.error_msg = error_msg - - for line in error_msg: - if re.search(r"dial\stcp.*no\ssuch\shost", line): - self.error_type = self.RegistryNotFoundError(self) - break - elif ( - re.search(r"requested\saccess\sto\sthe\sresource\sis\sdenied", line) - or re.search(r"StatusCode:\s404", line) - or re.search(r"400|Bad\s?Request", line) - or re.search(r"invalid\sstatus\scode\sfrom\sregistry\s400", line) - ): - # Unfortunately, every registry seems to return an individual error here: - # Docker.io: denied: requested access to the resource is denied - # unauthorized: authentication required - # 
Quay.io: StatusCode: 404, \n'] - # ghcr.io: Requesting bearer token: invalid status code from registry 400 (Bad Request) - self.error_type = self.ImageNotFoundError(self) - break - elif re.search(r"manifest\sunknown", line): - self.error_type = self.InvalidTagError(self) - break - elif re.search(r"ORAS\sSIF\simage\sshould\shave\sa\ssingle\slayer", line): - self.error_type = self.NoSingularityContainerError(self) - break - elif re.search(r"Image\sfile\salready\sexists", line): - self.error_type = self.ImageExistsError(self) - break - else: - continue - else: - self.error_type = self.OtherError(self) - - log.error(self.error_type.message) - log.info(self.error_type.helpmessage) - log.debug(f"Failed command:\n{' '.join(singularity_command)}") - log.debug(f"Singularity error messages:\n{''.join(error_msg)}") - - raise self.error_type - - class RegistryNotFoundError(ConnectionRefusedError): - """The specified registry does not resolve to a valid IP address""" - - def __init__(self, error_log): - self.error_log = error_log - self.message = ( - f'[bold red]The specified container library "{self.error_log.registry}" is invalid or unreachable.[/]\n' - ) - self.helpmessage = ( - f'Please check, if you made a typo when providing "-l / --library {self.error_log.registry}"\n' - ) - super().__init__(self.message, self.helpmessage, self.error_log) - - class ImageNotFoundError(FileNotFoundError): - """The image can not be found in the registry""" - - def __init__(self, error_log): - self.error_log = error_log - if not self.error_log.absolute_URI: - self.message = ( - f'[bold red]"Pulling "{self.error_log.container}" from "{self.error_log.address}" failed.[/]\n' - ) - self.helpmessage = f'Saving image of "{self.error_log.container}" failed.\nPlease troubleshoot the command \n"{" ".join(self.error_log.singularity_command)}" manually.f\n' - else: - self.message = f'[bold red]"The pipeline requested the download of non-existing container image "{self.error_log.address}"[/]\n' - 
self.helpmessage = f'Please try to rerun \n"{" ".join(self.error_log.singularity_command)}" manually with a different registry.f\n' - - super().__init__(self.message) - - class InvalidTagError(AttributeError): - """Image and registry are valid, but the (version) tag is not""" - - def __init__(self, error_log): - self.error_log = error_log - self.message = f'[bold red]"{self.error_log.address.split(":")[-1]}" is not a valid tag of "{self.error_log.container}"[/]\n' - self.helpmessage = f'Please chose a different library than {self.error_log.registry}\nor try to locate the "{self.error_log.address.split(":")[-1]}" version of "{self.error_log.container}" manually.\nPlease troubleshoot the command \n"{" ".join(self.error_log.singularity_command)}" manually.\n' - super().__init__(self.message) - - class ImageExistsError(FileExistsError): - """Image already exists in cache/output directory.""" - - def __init__(self, error_log): - self.error_log = error_log - self.message = ( - f'[bold red]"{self.error_log.container}" already exists at destination and cannot be pulled[/]\n' - ) - self.helpmessage = f'Saving image of "{self.error_log.container}" failed, because "{self.error_log.out_path}" exists.\nPlease troubleshoot the command \n"{" ".join(self.error_log.singularity_command)}" manually.\n' - super().__init__(self.message) - - class NoSingularityContainerError(RuntimeError): - """The container image is no native Singularity Image Format.""" - - def __init__(self, error_log): - self.error_log = error_log - self.message = ( - f'[bold red]"{self.error_log.container}" is no valid Singularity Image Format container.[/]\n' - ) - self.helpmessage = f"Pulling \"{self.error_log.container}\" failed, because it appears invalid. 
To convert from Docker's OCI format, prefix the URI with 'docker://' instead of 'oras://'.\n" - super().__init__(self.message) - - class OtherError(RuntimeError): - """Undefined error with the container""" - - def __init__(self, error_log): - self.error_log = error_log - if not self.error_log.absolute_URI: - self.message = f'[bold red]"{self.error_log.container}" failed for unclear reasons.[/]\n' - self.helpmessage = f'Pulling of "{self.error_log.container}" failed.\nPlease troubleshoot the command \n"{" ".join(self.error_log.singularity_command)}" manually.\n' - else: - self.message = f'[bold red]"The pipeline requested the download of non-existing container image "{self.error_log.address}"[/]\n' - self.helpmessage = f'Please try to rerun \n"{" ".join(self.error_log.singularity_command)}" manually with a different registry.f\n' - - super().__init__(self.message, self.helpmessage, self.error_log) diff --git a/nf_core/pipelines/downloads/__init__.py b/nf_core/pipelines/downloads/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py new file mode 100644 index 0000000000..9c862cd952 --- /dev/null +++ b/nf_core/pipelines/downloads/singularity.py @@ -0,0 +1,584 @@ +import concurrent.futures +import io +import logging +import os +import re +import shutil +import subprocess +from typing import Collection, Container, Iterable, List, Tuple + +import requests +import requests_cache +import rich +import rich.progress + +log = logging.getLogger(__name__) + + +class SingularityFetcher: + """Class to manage all Singularity operations for fetching containers. + + The guiding principles are that: + - Container download/pull/copy methods are unaware of the concepts of + "library" and "cache". They are just told to fetch a container and + put it in a certain location. + - Only the `fetch_containers` method is aware of the concepts of "library" + and "cache". 
It is a sort of orchestrator that decides where to fetch + each container and calls the appropriate methods. + - All methods are integrated with a progress bar + """ + + def __init__( + self, container_library: Iterable[str], registry_set: Iterable[str], progress: rich.progress.Progress + ) -> None: + self.container_library = list(container_library) + self.registry_set = registry_set + self.progress = progress + self.kill_with_fire = False + + def get_container_filename(self, container: str) -> str: + """Check Singularity cache for image, copy to destination folder if found. + + Args: + container (str): A pipeline's container name. Can be direct download URL + or a Docker Hub repository ID. + + Returns: + # TODO + tuple (str, str): Returns a tuple of (out_path, cache_path). + """ + + # Generate file paths + # Based on simpleName() function in Nextflow code: + # https://github.com/nextflow-io/nextflow/blob/671ae6d85df44f906747c16f6d73208dbc402d49/modules/nextflow/src/main/groovy/nextflow/container/SingularityCache.groovy#L69-L94 + out_name = container + # Strip URI prefix + out_name = re.sub(r"^.*:\/\/", "", out_name) + # Detect file extension + extension = ".img" + if ".sif:" in out_name: + extension = ".sif" + out_name = out_name.replace(".sif:", "-") + elif out_name.endswith(".sif"): + extension = ".sif" + out_name = out_name[:-4] + # Strip : and / characters + out_name = out_name.replace("/", "-").replace(":", "-") + # Add file extension + out_name = out_name + extension + + # Trim potential registries from the name for consistency. + # This will allow pipelines to work offline without symlinked images, + # if docker.registry / singularity.registry are set to empty strings at runtime, which can be included in the HPC config profiles easily. 
+ if self.registry_set: + # Create a regex pattern from the set of registries + trim_pattern = "|".join(f"^{re.escape(registry)}-?".replace("/", "[/-]") for registry in self.registry_set) + # Use the pattern to trim the string + out_name = re.sub(f"{trim_pattern}", "", out_name) + + return out_name + + def symlink_registries(self, image_path: str) -> None: + """Create a symlink for each registry in the registry set that points to the image. + We have dropped the explicit registries from the modules in favor of the configurable registries. + Unfortunately, Nextflow still expects the registry to be part of the file name, so a symlink is needed. + + The base image, e.g. ./nf-core-gatk-4.4.0.0.img will thus be symlinked as for example ./quay.io-nf-core-gatk-4.4.0.0.img + by prepending all registries in registry_set to the image name. + + Unfortunately, out output image name may contain a registry definition (Singularity image pulled from depot.galaxyproject.org + or older pipeline version, where the docker registry was part of the image name in the modules). Hence, it must be stripped + before to ensure that it is really the base name. + """ + + # Create a regex pattern from the set, in case trimming is needed. 
+ trim_pattern = "|".join(f"^{re.escape(registry)}-?".replace("/", "[/-]") for registry in self.registry_set) + + for registry in self.registry_set: + # Nextflow will convert it like this as well, so we need it to mimic its behavior + registry = registry.replace("/", "-") + + if not bool(re.search(trim_pattern, os.path.basename(image_path))): + symlink_name = os.path.join("./", f"{registry}-{os.path.basename(image_path)}") + else: + trimmed_name = re.sub(f"{trim_pattern}", "", os.path.basename(image_path)) + symlink_name = os.path.join("./", f"{registry}-{trimmed_name}") + + symlink_full = os.path.join(os.path.dirname(image_path), symlink_name) + target_name = os.path.join("./", os.path.basename(image_path)) + + if not os.path.exists(symlink_full) and target_name != symlink_name: + os.makedirs(os.path.dirname(symlink_full), exist_ok=True) + image_dir = os.open(os.path.dirname(image_path), os.O_RDONLY) + try: + os.symlink( + target_name, + symlink_name, + dir_fd=image_dir, + ) + log.debug(f"Symlinked {target_name} as {symlink_name}.") + finally: + os.close(image_dir) + + def download_images( + self, + containers_download: Iterable[Tuple[str, str]], + task: rich.progress.TaskID, + parallel_downloads: int, + ) -> None: + # if clause gives slightly better UX, because Download is no longer displayed if nothing is left to be downloaded. 
+ with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_downloads) as pool: + # Kick off concurrent downloads + future_downloads = [ + pool.submit(self.download_image, container, output_path) + for (container, output_path) in containers_download + ] + + # Make ctrl-c work with multi-threading + self.kill_with_fire = False + + try: + # Iterate over each threaded download, waiting for them to finish + for future in concurrent.futures.as_completed(future_downloads): + future.result() + try: + self.progress.update(task, advance=1) + except Exception as e: + log.error(f"Error updating progress bar: {e}") + + except KeyboardInterrupt: + # Cancel the future threads that haven't started yet + for future in future_downloads: + future.cancel() + # Set the variable that the threaded function looks for + # Will trigger an exception from each thread + self.kill_with_fire = True + # Re-raise exception on the main thread + raise + + def download_image(self, container: str, output_path: str) -> None: + """Download a singularity image from the web. + + Use native Python to download the file. + + Args: + container (str): A pipeline's container name. Usually it is of similar format + to ``https://depot.galaxyproject.org/singularity/name:version`` + out_path (str): The final target output path + cache_path (str, None): The NXF_SINGULARITY_CACHEDIR path if set, None if not + progress (Progress): Rich progress bar instance to add tasks to. 
+ """ + log.debug(f"Downloading Singularity image: '{container}'") + + # Set output path to save file to + output_path_tmp = f"{output_path}.partial" + log.debug(f"Downloading to: '{output_path_tmp}'") + + # Set up progress bar + nice_name = container.split("/")[-1][:50] + task = self.progress.add_task(nice_name, start=False, total=False, progress_type="download") + try: + # Delete temporary file if it already exists + if os.path.exists(output_path_tmp): + os.remove(output_path_tmp) + + # Open file handle and download + with open(output_path_tmp, "wb") as fh: + # Disable caching as this breaks streamed downloads + with requests_cache.disabled(): + r = requests.get(container, allow_redirects=True, stream=True, timeout=60 * 5) + filesize = r.headers.get("Content-length") + if filesize: + self.progress.update(task, total=int(filesize)) + self.progress.start_task(task) + + # Stream download + for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): + # Check that the user didn't hit ctrl-c + if self.kill_with_fire: + raise KeyboardInterrupt + self.progress.update(task, advance=len(data)) + fh.write(data) + + # Rename partial filename to final filename + os.rename(output_path_tmp, output_path) + self.symlink_registries(output_path) + + except: + # Try to delete the incomplete download + log.debug(f"Deleting incompleted singularity image download:\n'{output_path_tmp}'") + if output_path_tmp and os.path.exists(output_path_tmp): + os.remove(output_path_tmp) + if output_path and os.path.exists(output_path): + os.remove(output_path) + # Re-raise the caught exception + raise + finally: + self.progress.remove_task(task) + del output_path_tmp + + def pull_images(self, containers_pull: Iterable[Tuple[str, str]], task: rich.progress.TaskID) -> None: + for container, output_path in containers_pull: + # it is possible to try multiple registries / mirrors if multiple were specified. 
+ # Iteration happens over a copy of self.container_library[:], as I want to be able to remove failing registries for subsequent images. + for library in self.container_library[:]: + try: + self.pull_image(container, output_path, library) + # Pulling the image was successful, no ContainerError was raised, break the library loop + break + except ContainerError.ImageExistsError: + # Pulling not required + break + except ContainerError.RegistryNotFoundError as e: + self.container_library.remove(library) + # The only library was removed + if not self.container_library: + log.error(e.message) + log.error(e.helpmessage) + raise OSError from e + else: + # Other libraries can be used + continue + except ContainerError.ImageNotFoundError as e: + # Try other registries + if e.error_log.absolute_URI: + break # there no point in trying other registries if absolute URI was specified. + else: + continue + except ContainerError.InvalidTagError: + # Try other registries + continue + except ContainerError.OtherError as e: + # Try other registries + log.error(e.message) + log.error(e.helpmessage) + if e.error_log.absolute_URI: + break # there no point in trying other registries if absolute URI was specified. + else: + continue + else: + # The else clause executes after the loop completes normally. + # This means the library loop completed without breaking, indicating failure for all libraries (registries) + log.error( + f"Not able to pull image of {container}. Service might be down or internet connection is dead." + ) + # Task should advance in any case. Failure to pull will not kill the download process. + self.progress.update(task, advance=1) + + def pull_image(self, container: str, output_path: str, library: str) -> None: + """Pull a singularity image using ``singularity pull`` + + Attempt to use a local installation of singularity to pull the image. + + Args: + container (str): A pipeline's container name. Usually it is of similar format + to ``nfcore/name:version``. 
+ library (list of str): A list of libraries to try for pulling the image. + + Raises: + Various exceptions possible from `subprocess` execution of Singularity. + """ + # Sometimes, container still contain an explicit library specification, which + # resulted in attempted pulls e.g. from docker://quay.io/quay.io/qiime2/core:2022.11 + # Thus, if an explicit registry is specified, the provided -l value is ignored. + # Additionally, check if the container to be pulled is native Singularity: oras:// protocol. + container_parts = container.split("/") + if len(container_parts) > 2: + address = container if container.startswith("oras://") else f"docker://{container}" + absolute_URI = True + else: + address = f"docker://{library}/{container.replace('docker://', '')}" + absolute_URI = False + + if shutil.which("singularity"): + singularity_command = [ + "singularity", + "pull", + "--name", + output_path, + address, + ] + elif shutil.which("apptainer"): + singularity_command = ["apptainer", "pull", "--name", output_path, address] + else: + raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH") + log.debug(f"Building singularity image: {address}") + log.debug(f"Singularity command: {' '.join(singularity_command)}") + + # Progress bar to show that something is happening + task = self.progress.add_task( + container, + start=False, + total=False, + progress_type="singularity_pull", + current_log="", + ) + + # Run the singularity pull command + with subprocess.Popen( + singularity_command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1, + ) as proc: + lines = [] + if proc.stdout is not None: + for line in proc.stdout: + lines.append(line) + self.progress.update(task, current_log=line.strip()) + + if lines: + # something went wrong with the container retrieval + if any("FATAL: " in line for line in lines): + self.progress.remove_task(task) + raise ContainerError( + container=container, 
+ registry=library, + address=address, + absolute_URI=absolute_URI, + out_path=output_path, + singularity_command=singularity_command, + error_msg=lines, + ) + self.symlink_registries(output_path) + + self.progress.remove_task(task) + + def copy_image(self, container: str, src_path: str, dest_path: str) -> None: + """Copy Singularity image from one directory to another.""" + log.debug(f"Copying {container} from '{os.path.basename(src_path)}' to '{os.path.basename(dest_path)}'") + shutil.copyfile(src_path, dest_path) + # Create symlinks to ensure that the images are found even with different registries being used. + self.symlink_registries(dest_path) + + def fetch_containers( + self, + containers: Collection[str], + output_dir: str, + exclude_list: Container[str], + amend_cachedir: bool, + task: rich.progress.TaskID, + ): + def check_env_var(name): + dir = os.environ.get(name) + if dir and os.path.exists(dir): + return dir + + library_dir = check_env_var("NXF_SINGULARITY_LIBRARYDIR") + cache_dir = check_env_var("NXF_SINGULARITY_CACHEDIR") + + # Check each container in the list and defer actions + containers_download: List[Tuple[str, str]] = [] + containers_pull: List[Tuple[str, str]] = [] + containers_copy: List[Tuple[str, str, str]] = [] + + # We may add more tasks as containers need to be copied between the various caches + total_tasks = len(containers) + + for container in containers: + container_filename = self.get_container_filename(container) + + # Files in the remote cache are already downloaded and can be ignored + if container_filename in exclude_list: + log.debug(f"Skipping download of container '{container_filename}' as it is cached remotely.") + self.progress.update(task, advance=1, description=f"Skipping {container_filename}") + continue + + # Generate file paths for all three locations + output_path = os.path.join(output_dir, container_filename) + + if os.path.exists(output_path): + log.debug(f"Skipping download of container '{container_filename}' as 
it is in already present.") + self.progress.update(task, advance=1, description=f"{container_filename} exists at destination") + continue + + library_path = os.path.join(library_dir, container_filename) if library_dir else None + cache_path = os.path.join(cache_dir, container_filename) if cache_dir else None + + # get the container from the library + if library_path and os.path.exists(library_path): + containers_copy.append((container, library_path, output_path)) + # update the cache if needed + if cache_path and amend_cachedir and not os.path.exists(cache_path): + containers_copy.append((container, library_path, cache_path)) + total_tasks += 1 + self.progress.update(task, total=total_tasks) + + # get the container from the cache + elif cache_path and os.path.exists(cache_path): + containers_copy.append((container, cache_path, output_path)) + + # no library or cache + else: + # fetch method (download or pull) + if container.startswith("http"): + fetch_list = containers_download + else: + fetch_list = containers_pull + + if cache_path and amend_cachedir: + # download into the cache + fetch_list.append((container, cache_path)) + # and copy from the cache to the output + containers_copy.append((container, cache_path, output_path)) + total_tasks += 1 + self.progress.update(task, total=total_tasks) + + else: + # download or pull directly to the output + fetch_list.append((container, output_path)) + + # Download all containers + if containers_download: + self.progress.update(task, description="Downloading singularity images") + self.download_images(containers_download, task, parallel_downloads=4) + + # Pull all containers + if containers_pull: + if not (shutil.which("singularity") or shutil.which("apptainer")): + raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH") + self.progress.update(task, description="Pulling singularity images") + self.pull_images(containers_pull, task) + + # Copy all containers + 
self.progress.update(task, description="Copying singularity images from/to cache") + for container, src_path, dest_path in containers_copy: + self.progress.update(task, description="Copying singularity images from cache") + self.copy_image(container, src_path, dest_path) + self.progress.update(task, advance=1) + + +# Distinct errors for the container download, required for acting on the exceptions +class ContainerError(Exception): + """A class of errors related to pulling containers with Singularity/Apptainer""" + + def __init__( + self, + container, + registry, + address, + absolute_URI, + out_path, + singularity_command, + error_msg, + ): + self.container = container + self.registry = registry + self.address = address + self.absolute_URI = absolute_URI + self.out_path = out_path + self.singularity_command = singularity_command + self.error_msg = error_msg + + for line in error_msg: + if re.search(r"dial\stcp.*no\ssuch\shost", line): + self.error_type = self.RegistryNotFoundError(self) + break + elif ( + re.search(r"requested\saccess\sto\sthe\sresource\sis\sdenied", line) + or re.search(r"StatusCode:\s404", line) + or re.search(r"400|Bad\s?Request", line) + or re.search(r"invalid\sstatus\scode\sfrom\sregistry\s400", line) + ): + # Unfortunately, every registry seems to return an individual error here: + # Docker.io: denied: requested access to the resource is denied + # unauthorized: authentication required + # Quay.io: StatusCode: 404, \n'] + # ghcr.io: Requesting bearer token: invalid status code from registry 400 (Bad Request) + self.error_type = self.ImageNotFoundError(self) + break + elif re.search(r"manifest\sunknown", line): + self.error_type = self.InvalidTagError(self) + break + elif re.search(r"ORAS\sSIF\simage\sshould\shave\sa\ssingle\slayer", line): + self.error_type = self.NoSingularityContainerError(self) + break + elif re.search(r"Image\sfile\salready\sexists", line): + self.error_type = self.ImageExistsError(self) + break + else: + continue + else: 
+ self.error_type = self.OtherError(self) + + log.error(self.error_type.message) + log.info(self.error_type.helpmessage) + log.debug(f"Failed command:\n{' '.join(singularity_command)}") + log.debug(f"Singularity error messages:\n{''.join(error_msg)}") + + raise self.error_type + + class RegistryNotFoundError(ConnectionRefusedError): + """The specified registry does not resolve to a valid IP address""" + + def __init__(self, error_log): + self.error_log = error_log + self.message = ( + f'[bold red]The specified container library "{self.error_log.registry}" is invalid or unreachable.[/]\n' + ) + self.helpmessage = ( + f'Please check, if you made a typo when providing "-l / --library {self.error_log.registry}"\n' + ) + super().__init__(self.message, self.helpmessage, self.error_log) + + class ImageNotFoundError(FileNotFoundError): + """The image can not be found in the registry""" + + def __init__(self, error_log): + self.error_log = error_log + if not self.error_log.absolute_URI: + self.message = ( + f'[bold red]"Pulling "{self.error_log.container}" from "{self.error_log.address}" failed.[/]\n' + ) + self.helpmessage = f'Saving image of "{self.error_log.container}" failed.\nPlease troubleshoot the command \n"{" ".join(self.error_log.singularity_command)}" manually.f\n' + else: + self.message = f'[bold red]"The pipeline requested the download of non-existing container image "{self.error_log.address}"[/]\n' + self.helpmessage = f'Please try to rerun \n"{" ".join(self.error_log.singularity_command)}" manually with a different registry.f\n' + + super().__init__(self.message) + + class InvalidTagError(AttributeError): + """Image and registry are valid, but the (version) tag is not""" + + def __init__(self, error_log): + self.error_log = error_log + self.message = f'[bold red]"{self.error_log.address.split(":")[-1]}" is not a valid tag of "{self.error_log.container}"[/]\n' + self.helpmessage = f'Please chose a different library than {self.error_log.registry}\nor try to 
locate the "{self.error_log.address.split(":")[-1]}" version of "{self.error_log.container}" manually.\nPlease troubleshoot the command \n"{" ".join(self.error_log.singularity_command)}" manually.\n' + super().__init__(self.message) + + class ImageExistsError(FileExistsError): + """Image already exists in cache/output directory.""" + + def __init__(self, error_log): + self.error_log = error_log + self.message = ( + f'[bold red]"{self.error_log.container}" already exists at destination and cannot be pulled[/]\n' + ) + self.helpmessage = f'Saving image of "{self.error_log.container}" failed, because "{self.error_log.out_path}" exists.\nPlease troubleshoot the command \n"{" ".join(self.error_log.singularity_command)}" manually.\n' + super().__init__(self.message) + + class NoSingularityContainerError(RuntimeError): + """The container image is no native Singularity Image Format.""" + + def __init__(self, error_log): + self.error_log = error_log + self.message = ( + f'[bold red]"{self.error_log.container}" is no valid Singularity Image Format container.[/]\n' + ) + self.helpmessage = f"Pulling \"{self.error_log.container}\" failed, because it appears invalid. 
To convert from Docker's OCI format, prefix the URI with 'docker://' instead of 'oras://'.\n" + super().__init__(self.message) + + class OtherError(RuntimeError): + """Undefined error with the container""" + + def __init__(self, error_log): + self.error_log = error_log + if not self.error_log.absolute_URI: + self.message = f'[bold red]"{self.error_log.container}" failed for unclear reasons.[/]\n' + self.helpmessage = f'Pulling of "{self.error_log.container}" failed.\nPlease troubleshoot the command \n"{" ".join(self.error_log.singularity_command)}" manually.\n' + else: + self.message = f'[bold red]"The pipeline requested the download of non-existing container image "{self.error_log.address}"[/]\n' + self.helpmessage = f'Please try to rerun \n"{" ".join(self.error_log.singularity_command)}" manually with a different registry.f\n' + + super().__init__(self.message, self.helpmessage, self.error_log) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py new file mode 100644 index 0000000000..c2929c28ce --- /dev/null +++ b/nf_core/pipelines/downloads/utils.py @@ -0,0 +1,41 @@ +import rich.progress + + +class DownloadError(RuntimeError): + """A custom exception that is raised when nf-core pipelines download encounters a problem that we already took into consideration. + In this case, we do not want to print the traceback, but give the user some concise, helpful feedback instead. + """ + + +class DownloadProgress(rich.progress.Progress): + """Custom Progress bar class, allowing us to have two progress + bars with different columns / layouts. 
+ """ + + def get_renderables(self): + for task in self.tasks: + if task.fields.get("progress_type") == "summary": + self.columns = ( + "[magenta]{task.description}", + rich.progress.BarColumn(bar_width=None), + "[progress.percentage]{task.percentage:>3.0f}%", + "•", + "[green]{task.completed}/{task.total} completed", + ) + if task.fields.get("progress_type") == "download": + self.columns = ( + "[blue]{task.description}", + rich.progress.BarColumn(bar_width=None), + "[progress.percentage]{task.percentage:>3.1f}%", + "•", + rich.progress.DownloadColumn(), + "•", + rich.progress.TransferSpeedColumn(), + ) + if task.fields.get("progress_type") == "singularity_pull": + self.columns = ( + "[magenta]{task.description}", + "[blue]{task.fields[current_log]}", + rich.progress.BarColumn(bar_width=None), + ) + yield self.make_tasks_table([task]) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index 6db7392107..bdcdf5c66b 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -15,7 +15,8 @@ import nf_core.pipelines.create.create import nf_core.pipelines.list import nf_core.utils -from nf_core.pipelines.download import ContainerError, DownloadWorkflow, WorkflowRepo +from nf_core.pipelines.download import DownloadWorkflow, WorkflowRepo +from nf_core.pipelines.downloads.singularity import ContainerError from nf_core.synced_repo import SyncedRepo from nf_core.utils import run_cmd From bbd297fca788639f3b964577b85e52a9fd11cc7c Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Tue, 25 Mar 2025 14:18:59 +0000 Subject: [PATCH 02/31] Added a helper method to use a temporary file as intermediate output --- nf_core/pipelines/downloads/singularity.py | 173 ++++++++++----------- nf_core/pipelines/downloads/utils.py | 21 +++ 2 files changed, 102 insertions(+), 92 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 9c862cd952..220c1c24df 100644 --- 
a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -12,6 +12,8 @@ import rich import rich.progress +from nf_core.pipelines.downloads.utils import intermediate_file + log = logging.getLogger(__name__) @@ -177,45 +179,28 @@ def download_image(self, container: str, output_path: str) -> None: # Set up progress bar nice_name = container.split("/")[-1][:50] task = self.progress.add_task(nice_name, start=False, total=False, progress_type="download") - try: - # Delete temporary file if it already exists - if os.path.exists(output_path_tmp): - os.remove(output_path_tmp) - - # Open file handle and download - with open(output_path_tmp, "wb") as fh: - # Disable caching as this breaks streamed downloads - with requests_cache.disabled(): - r = requests.get(container, allow_redirects=True, stream=True, timeout=60 * 5) - filesize = r.headers.get("Content-length") - if filesize: - self.progress.update(task, total=int(filesize)) - self.progress.start_task(task) - - # Stream download - for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): - # Check that the user didn't hit ctrl-c - if self.kill_with_fire: - raise KeyboardInterrupt - self.progress.update(task, advance=len(data)) - fh.write(data) - - # Rename partial filename to final filename - os.rename(output_path_tmp, output_path) - self.symlink_registries(output_path) - - except: - # Try to delete the incomplete download - log.debug(f"Deleting incompleted singularity image download:\n'{output_path_tmp}'") - if output_path_tmp and os.path.exists(output_path_tmp): - os.remove(output_path_tmp) - if output_path and os.path.exists(output_path): - os.remove(output_path) - # Re-raise the caught exception - raise - finally: - self.progress.remove_task(task) - del output_path_tmp + + # Open file handle and download + # This temporary will be automatically renamed to the target if there are no errors + with intermediate_file(output_path) as fh: + # Disable caching as this breaks 
streamed downloads + with requests_cache.disabled(): + r = requests.get(container, allow_redirects=True, stream=True, timeout=60 * 5) + filesize = r.headers.get("Content-length") + if filesize: + self.progress.update(task, total=int(filesize)) + self.progress.start_task(task) + + # Stream download + for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): + # Check that the user didn't hit ctrl-c + if self.kill_with_fire: + raise KeyboardInterrupt + self.progress.update(task, advance=len(data)) + fh.write(data) + + self.symlink_registries(output_path) + self.progress.remove_task(task) def pull_images(self, containers_pull: Iterable[Tuple[str, str]], task: rich.progress.TaskID) -> None: for container, output_path in containers_pull: @@ -290,65 +275,69 @@ def pull_image(self, container: str, output_path: str, library: str) -> None: address = f"docker://{library}/{container.replace('docker://', '')}" absolute_URI = False - if shutil.which("singularity"): - singularity_command = [ - "singularity", - "pull", - "--name", - output_path, - address, - ] - elif shutil.which("apptainer"): - singularity_command = ["apptainer", "pull", "--name", output_path, address] - else: - raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH") - log.debug(f"Building singularity image: {address}") - log.debug(f"Singularity command: {' '.join(singularity_command)}") - - # Progress bar to show that something is happening - task = self.progress.add_task( - container, - start=False, - total=False, - progress_type="singularity_pull", - current_log="", - ) - - # Run the singularity pull command - with subprocess.Popen( - singularity_command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - bufsize=1, - ) as proc: - lines = [] - if proc.stdout is not None: - for line in proc.stdout: - lines.append(line) - self.progress.update(task, current_log=line.strip()) - - if lines: - # something went wrong with the 
container retrieval - if any("FATAL: " in line for line in lines): - self.progress.remove_task(task) - raise ContainerError( - container=container, - registry=library, - address=address, - absolute_URI=absolute_URI, - out_path=output_path, - singularity_command=singularity_command, - error_msg=lines, - ) - self.symlink_registries(output_path) + with intermediate_file(output_path) as output_path_tmp: + if shutil.which("singularity"): + singularity_command = [ + "singularity", + "pull", + "--name", + output_path_tmp.name, + address, + ] + elif shutil.which("apptainer"): + singularity_command = ["apptainer", "pull", "--name", output_path_tmp.name, address] + else: + raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH") + log.debug(f"Building singularity image: {address}") + log.debug(f"Singularity command: {' '.join(singularity_command)}") + + # Progress bar to show that something is happening + task = self.progress.add_task( + container, + start=False, + total=False, + progress_type="singularity_pull", + current_log="", + ) + + # Run the singularity pull command + with subprocess.Popen( + singularity_command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1, + ) as proc: + lines = [] + if proc.stdout is not None: + for line in proc.stdout: + lines.append(line) + self.progress.update(task, current_log=line.strip()) + + if lines: + # something went wrong with the container retrieval + if any("FATAL: " in line for line in lines): + self.progress.remove_task(task) + raise ContainerError( + container=container, + registry=library, + address=address, + absolute_URI=absolute_URI, + out_path=output_path, + singularity_command=singularity_command, + error_msg=lines, + ) + self.symlink_registries(output_path) self.progress.remove_task(task) def copy_image(self, container: str, src_path: str, dest_path: str) -> None: """Copy Singularity image from one directory to another.""" 
log.debug(f"Copying {container} from '{os.path.basename(src_path)}' to '{os.path.basename(dest_path)}'") - shutil.copyfile(src_path, dest_path) + + with intermediate_file(dest_path) as dest_path_tmp: + shutil.copyfile(src_path, dest_path_tmp.name) + # Create symlinks to ensure that the images are found even with different registries being used. self.symlink_registries(dest_path) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index c2929c28ce..48c3c1d0bf 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -1,5 +1,12 @@ +import contextlib +import logging +import os +import tempfile + import rich.progress +log = logging.getLogger(__name__) + class DownloadError(RuntimeError): """A custom exception that is raised when nf-core pipelines download encounters a problem that we already took into consideration. @@ -7,6 +14,20 @@ class DownloadError(RuntimeError): """ +@contextlib.contextmanager +def intermediate_file(output_path): + tmp = tempfile.NamedTemporaryFile(dir=os.path.dirname(output_path)) + try: + yield tmp + + finally: + if os.path.exists(tmp.name): + log.debug(f"Deleting incomplete singularity image:\n'{tmp.name}'") + os.remove(tmp.name) + # Re-raise exception on the main thread + raise + + class DownloadProgress(rich.progress.Progress): """Custom Progress bar class, allowing us to have two progress bars with different columns / layouts. 
From a9b2b0043f1227b829cf880cffe028b70d778f50 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Tue, 25 Mar 2025 17:06:10 +0000 Subject: [PATCH 03/31] First round of fixing the tests --- tests/pipelines/test_download.py | 148 +++++++++++++------------------ 1 file changed, 62 insertions(+), 86 deletions(-) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index bdcdf5c66b..72648e5dc8 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -16,7 +16,7 @@ import nf_core.pipelines.list import nf_core.utils from nf_core.pipelines.download import DownloadWorkflow, WorkflowRepo -from nf_core.pipelines.downloads.singularity import ContainerError +from nf_core.pipelines.downloads.singularity import ContainerError, SingularityFetcher from nf_core.synced_repo import SyncedRepo from nf_core.utils import run_cmd @@ -449,87 +449,65 @@ def test_reconcile_seqera_container_uris(self, tmp_path): @with_temporary_folder @mock.patch("rich.progress.Progress.add_task") def test_singularity_pull_image_singularity_installed(self, tmp_dir, mock_rich_progress): - download_obj = DownloadWorkflow(pipeline="dummy", outdir=tmp_dir) + singularity_fetcher = SingularityFetcher([], [], mock_rich_progress) # Test successful pull - download_obj.singularity_pull_image( - "hello-world", f"{tmp_dir}/hello-world.sif", None, "docker.io", mock_rich_progress - ) + singularity_fetcher.pull_image("hello-world", f"{tmp_dir}/hello-world.sif", "docker.io") # Pull again, but now the image already exists with pytest.raises(ContainerError.ImageExistsError): - download_obj.singularity_pull_image( - "hello-world", f"{tmp_dir}/hello-world.sif", None, "docker.io", mock_rich_progress - ) + singularity_fetcher.pull_image("hello-world", f"{tmp_dir}/hello-world.sif", "docker.io") # Test successful pull with absolute URI (use tiny 3.5MB test container from the "Kogia" project: https://github.com/bschiffthaler/kogia) - download_obj.singularity_pull_image( - 
"docker.io/bschiffthaler/sed", f"{tmp_dir}/sed.sif", None, "docker.io", mock_rich_progress - ) + singularity_fetcher.pull_image("docker.io/bschiffthaler/sed", f"{tmp_dir}/sed.sif", "docker.io") # Test successful pull with absolute oras:// URI - download_obj.singularity_pull_image( + singularity_fetcher.pull_image( "oras://community.wave.seqera.io/library/umi-transfer:1.0.0--e5b0c1a65b8173b6", f"{tmp_dir}/umi-transfer-oras.sif", - None, "docker.io", - mock_rich_progress, ) # try pulling Docker container image with oras:// with pytest.raises(ContainerError.NoSingularityContainerError): - download_obj.singularity_pull_image( + singularity_fetcher.pull_image( "oras://ghcr.io/matthiaszepper/umi-transfer:dev", f"{tmp_dir}/umi-transfer-oras_impostor.sif", - None, "docker.io", - mock_rich_progress, ) # try to pull from non-existing registry (Name change hello-world_new.sif is needed, otherwise ImageExistsError is raised before attempting to pull.) with pytest.raises(ContainerError.RegistryNotFoundError): - download_obj.singularity_pull_image( + singularity_fetcher.pull_image( "hello-world", f"{tmp_dir}/break_the_registry_test.sif", - None, "register-this-domain-to-break-the-test.io", - mock_rich_progress, ) # test Image not found for several registries with pytest.raises(ContainerError.ImageNotFoundError): - download_obj.singularity_pull_image( - "a-container", f"{tmp_dir}/acontainer.sif", None, "quay.io", mock_rich_progress - ) + singularity_fetcher.pull_image("a-container", f"{tmp_dir}/acontainer.sif", "quay.io") with pytest.raises(ContainerError.ImageNotFoundError): - download_obj.singularity_pull_image( - "a-container", f"{tmp_dir}/acontainer.sif", None, "docker.io", mock_rich_progress - ) + singularity_fetcher.pull_image("a-container", f"{tmp_dir}/acontainer.sif", "docker.io") with pytest.raises(ContainerError.ImageNotFoundError): - download_obj.singularity_pull_image( - "a-container", f"{tmp_dir}/acontainer.sif", None, "ghcr.io", mock_rich_progress - ) + 
singularity_fetcher.pull_image("a-container", f"{tmp_dir}/acontainer.sif", "ghcr.io") # test Image not found for absolute URI. with pytest.raises(ContainerError.ImageNotFoundError): - download_obj.singularity_pull_image( + singularity_fetcher.pull_image( "docker.io/bschiffthaler/nothingtopullhere", f"{tmp_dir}/nothingtopullhere.sif", - None, "docker.io", - mock_rich_progress, ) # Traffic from Github Actions to GitHub's Container Registry is unlimited, so no harm should be done here. with pytest.raises(ContainerError.InvalidTagError): - download_obj.singularity_pull_image( + singularity_fetcher.pull_image( "ewels/multiqc:go-rewrite", f"{tmp_dir}/multiqc-go.sif", - None, "ghcr.io", - mock_rich_progress, ) @pytest.mark.skipif( @@ -539,10 +517,8 @@ def test_singularity_pull_image_singularity_installed(self, tmp_dir, mock_rich_p @with_temporary_folder @mock.patch("rich.progress.Progress.add_task") def test_singularity_pull_image_successfully(self, tmp_dir, mock_rich_progress): - download_obj = DownloadWorkflow(pipeline="dummy", outdir=tmp_dir) - download_obj.singularity_pull_image( - "hello-world", f"{tmp_dir}/yet-another-hello-world.sif", None, "docker.io", mock_rich_progress - ) + singularity_fetcher = SingularityFetcher([], [], mock_rich_progress) + singularity_fetcher.pull_image("hello-world", f"{tmp_dir}/yet-another-hello-world.sif", "docker.io") # # Tests for 'get_singularity_images' @@ -571,6 +547,7 @@ def test_get_singularity_images(self, tmp_path, mock_fetch_wf_config): download_obj.get_singularity_images() @with_temporary_folder + @mock.patch("rich.progress.Progress.add_task") @mock.patch("os.makedirs") @mock.patch("os.symlink") @mock.patch("os.open") @@ -586,6 +563,7 @@ def test_symlink_singularity_images( mock_open, mock_symlink, mock_makedirs, + mock_rich_progress, ): # Setup mock_dirname.return_value = f"{tmp_path}/path/to" @@ -593,18 +571,17 @@ def test_symlink_singularity_images( mock_open.return_value = 12 # file descriptor mock_close.return_value = 12 
# file descriptor - download_obj = DownloadWorkflow( - pipeline="dummy", - outdir=tmp_path, - container_library=( + # Call the method + singularity_fetcher = SingularityFetcher( + ( "quay.io", "community-cr-prod.seqera.io/docker/registry/v2", "depot.galaxyproject.org/singularity", ), + [], + mock_rich_progress, ) - - # Call the method - download_obj.symlink_singularity_images(f"{tmp_path}/path/to/singularity-image.img") + singularity_fetcher.symlink_registries(f"{tmp_path}/path/to/singularity-image.img") # Check that os.makedirs was called with the correct arguments mock_makedirs.assert_any_call(f"{tmp_path}/path/to", exist_ok=True) @@ -633,6 +610,7 @@ def test_symlink_singularity_images( mock_symlink.assert_has_calls(expected_calls, any_order=True) @with_temporary_folder + @mock.patch("rich.progress.Progress.add_task") @mock.patch("os.makedirs") @mock.patch("os.symlink") @mock.patch("os.open") @@ -650,6 +628,7 @@ def test_symlink_singularity_images_registry( mock_open, mock_symlink, mock_makedirs, + mock_rich_progress, ): # Setup mock_resub.return_value = "singularity-image.img" @@ -667,11 +646,20 @@ def test_symlink_singularity_images_registry( download_obj.registry_set = {"quay.io", "community-cr-prod.seqera.io/docker/registry/v2"} # Call the method with registry - should not happen, but preserve it then. 
- download_obj.symlink_singularity_images(f"{tmp_path}/path/to/quay.io-singularity-image.img") + singularity_fetcher = SingularityFetcher( + ( + "quay.io", + "community-cr-prod.seqera.io/docker/registry/v2", + "depot.galaxyproject.org/singularity", + ), + [], + mock_rich_progress, + ) + singularity_fetcher.symlink_registries(f"{tmp_path}/path/to/quay.io-singularity-image.img") print(mock_resub.call_args) # Check that os.makedirs was called with the correct arguments - mock_makedirs.assert_any_call(f"{tmp_path}/path/to", exist_ok=True) + # mock_makedirs.assert_any_call(f"{tmp_path}/path/to", exist_ok=True) # Check that os.symlink was called with the correct arguments mock_symlink.assert_called_with( @@ -737,17 +725,16 @@ def test_gather_registries(self, tmp_path, mock_fetch_wf_config): @with_temporary_folder @mock.patch("rich.progress.Progress.add_task") def test_singularity_pull_image_singularity_not_installed(self, tmp_dir, mock_rich_progress): - download_obj = DownloadWorkflow(pipeline="dummy", outdir=tmp_dir) + singularity_fetcher = SingularityFetcher([], [], mock_rich_progress) with pytest.raises(OSError): - download_obj.singularity_pull_image( - "a-container", f"{tmp_dir}/anothercontainer.sif", None, "quay.io", mock_rich_progress - ) + singularity_fetcher.pull_image("a-container", f"{tmp_dir}/anothercontainer.sif", "quay.io") # # Test for 'singularity_image_filenames' function # @with_temporary_folder - def test_singularity_image_filenames(self, tmp_path): + @mock.patch("rich.progress.Progress.add_task") + def test_singularity_image_filenames(self, tmp_path, mock_rich_progress): os.environ["NXF_SINGULARITY_CACHEDIR"] = f"{tmp_path}/cachedir" download_obj = DownloadWorkflow(pipeline="dummy", outdir=tmp_path) @@ -761,81 +748,70 @@ def test_singularity_image_filenames(self, tmp_path): "community.wave.seqera.io/library", "community-cr-prod.seqera.io/docker/registry/v2", } + singularity_fetcher = SingularityFetcher([], download_obj.registry_set, mock_rich_progress) 
## Test phase I: Container not yet cached, should be amended to cache # out_path: str, Path to cache # cache_path: None - result = download_obj.singularity_image_filenames( + result = singularity_fetcher.get_container_filename( "https://depot.galaxyproject.org/singularity/bbmap:38.93--he522d1c_0" ) - # Assert that the result is a tuple of length 2 - self.assertIsInstance(result, tuple) - self.assertEqual(len(result), 2) - - # Assert that the types of the elements are (str, None) - self.assertTrue(all((isinstance(element, str), element is None) for element in result)) + # Assert that the result is a string + self.assertIsInstance(result, str) # assert that the correct out_path is returned that points to the cache - assert result[0].endswith("/cachedir/bbmap-38.93--he522d1c_0.img") + assert result.endswith("bbmap-38.93--he522d1c_0.img") ## Test phase II: Test various container names # out_path: str, Path to cache # cache_path: None # Test --- mulled containers # - result = download_obj.singularity_image_filenames( + result = singularity_fetcher.get_container_filename( "quay.io/biocontainers/mulled-v2-1fa26d1ce03c295fe2fdcf85831a92fbcbd7e8c2:59cdd445419f14abac76b31dd0d71217994cbcc9-0" ) - assert result[0].endswith( - "/cachedir/biocontainers-mulled-v2-1fa26d1ce03c295fe2fdcf85831a92fbcbd7e8c2-59cdd445419f14abac76b31dd0d71217994cbcc9-0.img" + assert result.endswith( + "biocontainers-mulled-v2-1fa26d1ce03c295fe2fdcf85831a92fbcbd7e8c2-59cdd445419f14abac76b31dd0d71217994cbcc9-0.img" ) # Test --- Docker containers without registry # - result = download_obj.singularity_image_filenames("nf-core/ubuntu:20.04") - assert result[0].endswith("/cachedir/nf-core-ubuntu-20.04.img") + result = singularity_fetcher.get_container_filename("nf-core/ubuntu:20.04") + assert result.endswith("nf-core-ubuntu-20.04.img") # Test --- Docker container with explicit registry -> should be trimmed # - result = download_obj.singularity_image_filenames("docker.io/nf-core/ubuntu:20.04") - assert 
result[0].endswith("/cachedir/nf-core-ubuntu-20.04.img") + result = singularity_fetcher.get_container_filename("docker.io/nf-core/ubuntu:20.04") + assert result.endswith("nf-core-ubuntu-20.04.img") # Test --- Docker container with explicit registry not in registry set -> can't be trimmed - result = download_obj.singularity_image_filenames("mirage-the-imaginative-registry.io/nf-core/ubuntu:20.04") - assert result[0].endswith("/cachedir/mirage-the-imaginative-registry.io-nf-core-ubuntu-20.04.img") + result = singularity_fetcher.get_container_filename("mirage-the-imaginative-registry.io/nf-core/ubuntu:20.04") + assert result.endswith("mirage-the-imaginative-registry.io-nf-core-ubuntu-20.04.img") # Test --- Seqera Docker containers: Trimmed, because it is hard-coded in the registry set. - result = download_obj.singularity_image_filenames( + result = singularity_fetcher.get_container_filename( "community.wave.seqera.io/library/coreutils:9.5--ae99c88a9b28c264" ) - assert result[0].endswith("/cachedir/coreutils-9.5--ae99c88a9b28c264.img") + assert result.endswith("coreutils-9.5--ae99c88a9b28c264.img") # Test --- Seqera Singularity containers: Trimmed, because it is hard-coded in the registry set. 
- result = download_obj.singularity_image_filenames( + result = singularity_fetcher.get_container_filename( "https://community-cr-prod.seqera.io/docker/registry/v2/blobs/sha256/c2/c262fc09eca59edb5a724080eeceb00fb06396f510aefb229c2d2c6897e63975/data" ) - assert result[0].endswith( - "cachedir/blobs-sha256-c2-c262fc09eca59edb5a724080eeceb00fb06396f510aefb229c2d2c6897e63975-data.img" + assert result.endswith( + "blobs-sha256-c2-c262fc09eca59edb5a724080eeceb00fb06396f510aefb229c2d2c6897e63975-data.img" ) ## Test phase III: Container will be cached but also copied to out_path # out_path: str, Path to cache # cache_path: str, Path to cache download_obj.container_cache_utilisation = "copy" - result = download_obj.singularity_image_filenames( + result = singularity_fetcher.get_container_filename( "https://depot.galaxyproject.org/singularity/bbmap:38.93--he522d1c_0" ) self.assertTrue(all(isinstance(element, str) for element in result)) - assert result[0].endswith("/singularity-images/bbmap-38.93--he522d1c_0.img") - assert result[1].endswith("/cachedir/bbmap-38.93--he522d1c_0.img") - - ## Test phase IV: Expect an error if no NXF_SINGULARITY_CACHEDIR is defined - os.environ["NXF_SINGULARITY_CACHEDIR"] = "" - with self.assertRaises(FileNotFoundError): - download_obj.singularity_image_filenames( - "https://depot.galaxyproject.org/singularity/bbmap:38.93--he522d1c_0" - ) + assert result.endswith("bbmap-38.93--he522d1c_0.img") # # Test for '--singularity-cache remote --singularity-cache-index'. Provide a list of containers already available in a remote location. 
@@ -867,7 +843,7 @@ def test_remote_container_functionality(self, tmp_dir): # Tests for the main entry method 'download_workflow' # @with_temporary_folder - @mock.patch("nf_core.pipelines.download.DownloadWorkflow.singularity_pull_image") + @mock.patch("nf_core.pipelines.downloads.singularity.SingularityFetcher.pull_image") @mock.patch("shutil.which") def test_download_workflow_with_success(self, tmp_dir, mock_download_image, mock_singularity_installed): os.environ["NXF_SINGULARITY_CACHEDIR"] = "foo" From dc333b39f34ed990c8156281df06391fce731f15 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Tue, 25 Mar 2025 22:42:11 +0000 Subject: [PATCH 04/31] downloads.singularity functions should rely on proper parameters, no environment variables --- nf_core/pipelines/download.py | 19 ++++++++++++++----- nf_core/pipelines/downloads/singularity.py | 12 +++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/nf_core/pipelines/download.py b/nf_core/pipelines/download.py index 2b03f65f8c..d93c4a5fad 100644 --- a/nf_core/pipelines/download.py +++ b/nf_core/pipelines/download.py @@ -1069,12 +1069,19 @@ def get_singularity_images(self, current_revision: str = "") -> None: f"Processing workflow revision {current_revision}, found {len(self.containers)} container image{'s' if len(self.containers) > 1 else ''} in total." 
) + # Find out what the library directory is + library_dir = os.environ.get("NXF_SINGULARITY_LIBRARYDIR") + if library_dir and not os.path.isdir(library_dir): + # Since the library is read-only, if the directory isn't there, we can forget about it + library_dir = None + + # Find out what the cache directory is + cache_dir = os.environ.get("NXF_SINGULARITY_CACHEDIR") if self.container_cache_utilisation in ["amend", "copy"]: - if os.environ.get("NXF_SINGULARITY_CACHEDIR"): - cache_path_dir = os.environ["NXF_SINGULARITY_CACHEDIR"] - if not os.path.isdir(cache_path_dir): - log.debug(f"Cache directory not found, creating: {cache_path_dir}") - os.makedirs(cache_path_dir) + if cache_dir: + if not os.path.isdir(cache_dir): + log.debug(f"Cache directory not found, creating: {cache_dir}") + os.makedirs(cache_dir) else: raise FileNotFoundError("Singularity cache is required but no '$NXF_SINGULARITY_CACHEDIR' set!") @@ -1098,6 +1105,8 @@ def get_singularity_images(self, current_revision: str = "") -> None: self.containers, out_path_dir, self.containers_remote, + library_dir, + cache_dir, self.container_cache_utilisation == "amend", task, ) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 220c1c24df..9bf58c8e5e 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -5,7 +5,7 @@ import re import shutil import subprocess -from typing import Collection, Container, Iterable, List, Tuple +from typing import Collection, Container, Iterable, List, Optional, Tuple import requests import requests_cache @@ -346,17 +346,11 @@ def fetch_containers( containers: Collection[str], output_dir: str, exclude_list: Container[str], + library_dir: Optional[str], + cache_dir: Optional[str], amend_cachedir: bool, task: rich.progress.TaskID, ): - def check_env_var(name): - dir = os.environ.get(name) - if dir and os.path.exists(dir): - return dir - - library_dir = 
check_env_var("NXF_SINGULARITY_LIBRARYDIR") - cache_dir = check_env_var("NXF_SINGULARITY_CACHEDIR") - # Check each container in the list and defer actions containers_download: List[Tuple[str, str]] = [] containers_pull: List[Tuple[str, str]] = [] From fbc09ac8def46514635ce7fc2bf5e981c341b892 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Tue, 25 Mar 2025 22:42:58 +0000 Subject: [PATCH 05/31] Duplicated with the line above --- nf_core/pipelines/downloads/singularity.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 9bf58c8e5e..921eccf3cb 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -427,7 +427,6 @@ def fetch_containers( # Copy all containers self.progress.update(task, description="Copying singularity images from/to cache") for container, src_path, dest_path in containers_copy: - self.progress.update(task, description="Copying singularity images from cache") self.copy_image(container, src_path, dest_path) self.progress.update(task, advance=1) From fee7182683286b91fcf309104b1a9df512872383 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Tue, 25 Mar 2025 23:11:53 +0000 Subject: [PATCH 06/31] Added tests for utils.intermediate_file and fixed it ! --- nf_core/pipelines/downloads/utils.py | 17 ++++++++---- tests/pipelines/test_download.py | 41 ++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index 48c3c1d0bf..bc126f76b3 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -16,16 +16,23 @@ class DownloadError(RuntimeError): @contextlib.contextmanager def intermediate_file(output_path): - tmp = tempfile.NamedTemporaryFile(dir=os.path.dirname(output_path)) + """Context manager to help ensure the output file is either complete or non-existent. 
+ It does that by creating a temporary file in the same directory as the output file, + letting the caller write to it, and then moving it to the final location. + If an exception is raised, the temporary file is deleted and the output file is not touched. + """ + tmp = tempfile.NamedTemporaryFile(dir=os.path.dirname(output_path), delete_on_close=False) try: yield tmp - + tmp.close() + os.rename(tmp.name, output_path) + except Exception: + # Re-raise exception on the main thread + raise finally: if os.path.exists(tmp.name): - log.debug(f"Deleting incomplete singularity image:\n'{tmp.name}'") + log.debug(f"Deleting incomplete singularity image:\n'{output_path}'") os.remove(tmp.name) - # Re-raise exception on the main thread - raise class DownloadProgress(rich.progress.Progress): diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index 72648e5dc8..d6c00ecb3b 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -4,6 +4,7 @@ import os import re import shutil +import subprocess import tempfile import unittest from pathlib import Path @@ -17,12 +18,52 @@ import nf_core.utils from nf_core.pipelines.download import DownloadWorkflow, WorkflowRepo from nf_core.pipelines.downloads.singularity import ContainerError, SingularityFetcher +from nf_core.pipelines.downloads.utils import intermediate_file from nf_core.synced_repo import SyncedRepo from nf_core.utils import run_cmd from ..utils import TEST_DATA_DIR, with_temporary_folder +class DownloadUtilsTest(unittest.TestCase): + # + # Tests for 'utils.intermediate_file' + # + @with_temporary_folder + def test_intermediate_file(self, outdir): + # Code that doesn't fail. 
The file shall exist + output_path = os.path.join(outdir, "testfile1") + with intermediate_file(output_path) as tmp: + tmp_path = tmp.name + tmp.write(b"Hello, World!") + + assert os.path.exists(output_path) + assert os.path.getsize(output_path) == 13 + assert not os.path.exists(tmp_path) + + # Same as above, but with an external command. The file shall exist + output_path = os.path.join(outdir, "testfile2") + with intermediate_file(output_path) as tmp: + tmp_path = tmp.name + subprocess.check_call([f"echo 'Hello, World!' > {tmp_path}"], shell=True) + + assert os.path.exists(output_path) + assert os.path.getsize(output_path) == 14 # Extra \n ! + assert not os.path.exists(tmp_path) + + # Code that fails. The file shall not exist + output_path = os.path.join(outdir, "testfile3") + try: + with intermediate_file(output_path) as tmp: + tmp_path = tmp.name + raise ValueError("This is a test error") + except Exception as e: + assert isinstance(e, ValueError) + + assert not os.path.exists(output_path) + assert not os.path.exists(tmp_path) + + class DownloadTest(unittest.TestCase): @pytest.fixture(autouse=True) def use_caplog(self, caplog): From c507d9e3248a52b2469d40eb9d58acfbb7337804 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Tue, 25 Mar 2025 23:13:27 +0000 Subject: [PATCH 07/31] Simpler implementation --- nf_core/pipelines/downloads/utils.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index bc126f76b3..41fe5458a1 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -21,18 +21,10 @@ def intermediate_file(output_path): letting the caller write to it, and then moving it to the final location. If an exception is raised, the temporary file is deleted and the output file is not touched. 
""" - tmp = tempfile.NamedTemporaryFile(dir=os.path.dirname(output_path), delete_on_close=False) - try: + with tempfile.NamedTemporaryFile(dir=os.path.dirname(output_path), delete_on_close=False) as tmp: yield tmp tmp.close() os.rename(tmp.name, output_path) - except Exception: - # Re-raise exception on the main thread - raise - finally: - if os.path.exists(tmp.name): - log.debug(f"Deleting incomplete singularity image:\n'{output_path}'") - os.remove(tmp.name) class DownloadProgress(rich.progress.Progress): From aba52331c4c096b5b0a94ff63718a58f21338b57 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 00:05:38 +0000 Subject: [PATCH 08/31] Added helper functions to manage the main task so that callers don't have to be aware of the TaskID --- nf_core/pipelines/download.py | 6 ++-- nf_core/pipelines/downloads/singularity.py | 34 ++++++++++------------ nf_core/pipelines/downloads/utils.py | 18 ++++++++++++ 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/nf_core/pipelines/download.py b/nf_core/pipelines/download.py index d93c4a5fad..14780da8cd 100644 --- a/nf_core/pipelines/download.py +++ b/nf_core/pipelines/download.py @@ -1094,11 +1094,10 @@ def get_singularity_images(self, current_revision: str = "") -> None: os.makedirs(out_path_dir) with DownloadProgress() as progress: - task = progress.add_task( - "Collecting container images", + progress.add_main_task( total=len(self.containers), - progress_type="summary", ) + # "Collecting container images", singularity_fetcher = SingularityFetcher(self.container_library, self.registry_set, progress) singularity_fetcher.fetch_containers( @@ -1108,7 +1107,6 @@ def get_singularity_images(self, current_revision: str = "") -> None: library_dir, cache_dir, self.container_cache_utilisation == "amend", - task, ) def compress_download(self): diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 921eccf3cb..b5192dd23f 100644 --- 
a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -9,10 +9,8 @@ import requests import requests_cache -import rich -import rich.progress -from nf_core.pipelines.downloads.utils import intermediate_file +from nf_core.pipelines.downloads.utils import DownloadProgress, intermediate_file log = logging.getLogger(__name__) @@ -31,7 +29,7 @@ class SingularityFetcher: """ def __init__( - self, container_library: Iterable[str], registry_set: Iterable[str], progress: rich.progress.Progress + self, container_library: Iterable[str], registry_set: Iterable[str], progress: DownloadProgress ) -> None: self.container_library = list(container_library) self.registry_set = registry_set @@ -125,7 +123,6 @@ def symlink_registries(self, image_path: str) -> None: def download_images( self, containers_download: Iterable[Tuple[str, str]], - task: rich.progress.TaskID, parallel_downloads: int, ) -> None: # if clause gives slightly better UX, because Download is no longer displayed if nothing is left to be downloaded. @@ -144,7 +141,7 @@ def download_images( for future in concurrent.futures.as_completed(future_downloads): future.result() try: - self.progress.update(task, advance=1) + self.progress.update_main_task(advance=1) except Exception as e: log.error(f"Error updating progress bar: {e}") @@ -202,7 +199,7 @@ def download_image(self, container: str, output_path: str) -> None: self.symlink_registries(output_path) self.progress.remove_task(task) - def pull_images(self, containers_pull: Iterable[Tuple[str, str]], task: rich.progress.TaskID) -> None: + def pull_images(self, containers_pull: Iterable[Tuple[str, str]]) -> None: for container, output_path in containers_pull: # it is possible to try multiple registries / mirrors if multiple were specified. # Iteration happens over a copy of self.container_library[:], as I want to be able to remove failing registries for subsequent images. 
@@ -248,7 +245,7 @@ def pull_images(self, containers_pull: Iterable[Tuple[str, str]], task: rich.pro f"Not able to pull image of {container}. Service might be down or internet connection is dead." ) # Task should advance in any case. Failure to pull will not kill the download process. - self.progress.update(task, advance=1) + self.progress.update_main_task(advance=1) def pull_image(self, container: str, output_path: str, library: str) -> None: """Pull a singularity image using ``singularity pull`` @@ -349,7 +346,6 @@ def fetch_containers( library_dir: Optional[str], cache_dir: Optional[str], amend_cachedir: bool, - task: rich.progress.TaskID, ): # Check each container in the list and defer actions containers_download: List[Tuple[str, str]] = [] @@ -365,7 +361,7 @@ def fetch_containers( # Files in the remote cache are already downloaded and can be ignored if container_filename in exclude_list: log.debug(f"Skipping download of container '{container_filename}' as it is cached remotely.") - self.progress.update(task, advance=1, description=f"Skipping {container_filename}") + self.progress.update_main_task(advance=1, description=f"Skipping {container_filename}") continue # Generate file paths for all three locations @@ -373,7 +369,7 @@ def fetch_containers( if os.path.exists(output_path): log.debug(f"Skipping download of container '{container_filename}' as it is in already present.") - self.progress.update(task, advance=1, description=f"{container_filename} exists at destination") + self.progress.update_main_task(advance=1, description=f"{container_filename} exists at destination") continue library_path = os.path.join(library_dir, container_filename) if library_dir else None @@ -386,7 +382,7 @@ def fetch_containers( if cache_path and amend_cachedir and not os.path.exists(cache_path): containers_copy.append((container, library_path, cache_path)) total_tasks += 1 - self.progress.update(task, total=total_tasks) + self.progress.update_main_task(total=total_tasks) # get the 
container from the cache elif cache_path and os.path.exists(cache_path): @@ -406,7 +402,7 @@ def fetch_containers( # and copy from the cache to the output containers_copy.append((container, cache_path, output_path)) total_tasks += 1 - self.progress.update(task, total=total_tasks) + self.progress.update_main_task(total=total_tasks) else: # download or pull directly to the output @@ -414,21 +410,21 @@ def fetch_containers( # Download all containers if containers_download: - self.progress.update(task, description="Downloading singularity images") - self.download_images(containers_download, task, parallel_downloads=4) + self.progress.update_main_task(description="Downloading singularity images") + self.download_images(containers_download, parallel_downloads=4) # Pull all containers if containers_pull: if not (shutil.which("singularity") or shutil.which("apptainer")): raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH") - self.progress.update(task, description="Pulling singularity images") - self.pull_images(containers_pull, task) + self.progress.update_main_task(description="Pulling singularity images") + self.pull_images(containers_pull) # Copy all containers - self.progress.update(task, description="Copying singularity images from/to cache") + self.progress.update_main_task(description="Copying singularity images from/to cache") for container, src_path, dest_path in containers_copy: self.copy_image(container, src_path, dest_path) - self.progress.update(task, advance=1) + self.progress.update_main_task(advance=1) # Distinct errors for the container download, required for acting on the exceptions diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index 41fe5458a1..2b524efd79 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -30,6 +30,7 @@ def intermediate_file(output_path): class DownloadProgress(rich.progress.Progress): """Custom 
Progress bar class, allowing us to have two progress bars with different columns / layouts. + Also provide helper functions to control the top-level task. """ def get_renderables(self): @@ -59,3 +60,20 @@ def get_renderables(self): rich.progress.BarColumn(bar_width=None), ) yield self.make_tasks_table([task]) + + # These functions allow callers not having to track the main TaskID + # They are pass-through functions to the rich.progress methods + def add_main_task(self, **kwargs) -> rich.progress.TaskID: + """Add a top-level task to the progress bar. + This task will be used to track the overall progress of the container downloads. + """ + self.main_task = self.add_task( + "Container download", + progress_type="summary", + **kwargs, + ) + return self.main_task + + def update_main_task(self, **kwargs): + """Update the top-level task with new information.""" + self.update(self.main_task, **kwargs) From 0403a370365fa1a625e277d114a6280605716861 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 00:07:22 +0000 Subject: [PATCH 09/31] No need to build a temp path here --- nf_core/pipelines/downloads/singularity.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index b5192dd23f..541367440d 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -167,11 +167,7 @@ def download_image(self, container: str, output_path: str) -> None: cache_path (str, None): The NXF_SINGULARITY_CACHEDIR path if set, None if not progress (Progress): Rich progress bar instance to add tasks to. 
""" - log.debug(f"Downloading Singularity image: '{container}'") - - # Set output path to save file to - output_path_tmp = f"{output_path}.partial" - log.debug(f"Downloading to: '{output_path_tmp}'") + log.debug(f"Downloading Singularity image '{container}' to {output_path}") # Set up progress bar nice_name = container.split("/")[-1][:50] From 159c34e31111174d9acad1140f4a96c395674a6c Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 09:56:07 +0000 Subject: [PATCH 10/31] More tests --- tests/pipelines/test_download.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index d6c00ecb3b..6c76c16334 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -32,6 +32,8 @@ class DownloadUtilsTest(unittest.TestCase): @with_temporary_folder def test_intermediate_file(self, outdir): # Code that doesn't fail. The file shall exist + + # Directly write to the file, as in download_image output_path = os.path.join(outdir, "testfile1") with intermediate_file(output_path) as tmp: tmp_path = tmp.name @@ -41,7 +43,7 @@ def test_intermediate_file(self, outdir): assert os.path.getsize(output_path) == 13 assert not os.path.exists(tmp_path) - # Same as above, but with an external command. The file shall exist + # Run an external command as in pull_image output_path = os.path.join(outdir, "testfile2") with intermediate_file(output_path) as tmp: tmp_path = tmp.name @@ -52,10 +54,13 @@ def test_intermediate_file(self, outdir): assert not os.path.exists(tmp_path) # Code that fails. 
The file shall not exist + + # Directly write to the file and raise an exception output_path = os.path.join(outdir, "testfile3") try: with intermediate_file(output_path) as tmp: tmp_path = tmp.name + tmp.write(b"Hello, World!") raise ValueError("This is a test error") except Exception as e: assert isinstance(e, ValueError) @@ -63,6 +68,19 @@ def test_intermediate_file(self, outdir): assert not os.path.exists(output_path) assert not os.path.exists(tmp_path) + # Run an external command and raise an exception + output_path = os.path.join(outdir, "testfile4") + try: + with intermediate_file(output_path) as tmp: + tmp_path = tmp.name + subprocess.check_call([f"echo 'Hello, World!' > {tmp_path}"], shell=True) + subprocess.check_call(["ls", "/dummy"]) + except Exception as e: + assert isinstance(e, subprocess.CalledProcessError) + + assert not os.path.exists(output_path) + assert not os.path.exists(tmp_path) + class DownloadTest(unittest.TestCase): @pytest.fixture(autouse=True) From 693da6414bfb6659bc090973e99e5ebfe9e41bb4 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 10:47:34 +0000 Subject: [PATCH 11/31] Fixed the symlink_registries tests --- nf_core/pipelines/downloads/singularity.py | 2 +- tests/pipelines/test_download.py | 32 +++++++--------------- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 541367440d..6ed61c991b 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -86,7 +86,7 @@ def symlink_registries(self, image_path: str) -> None: The base image, e.g. ./nf-core-gatk-4.4.0.0.img will thus be symlinked as for example ./quay.io-nf-core-gatk-4.4.0.0.img by prepending all registries in registry_set to the image name. 
- Unfortunately, out output image name may contain a registry definition (Singularity image pulled from depot.galaxyproject.org + Unfortunately, the output image name may contain a registry definition (Singularity image pulled from depot.galaxyproject.org or older pipeline version, where the docker registry was part of the image name in the modules). Hence, it must be stripped before to ensure that it is really the base name. """ diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index 6c76c16334..ca9a020188 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -632,12 +632,12 @@ def test_symlink_singularity_images( # Call the method singularity_fetcher = SingularityFetcher( + [], ( "quay.io", "community-cr-prod.seqera.io/docker/registry/v2", "depot.galaxyproject.org/singularity", ), - [], mock_rich_progress, ) singularity_fetcher.symlink_registries(f"{tmp_path}/path/to/singularity-image.img") @@ -696,41 +696,29 @@ def test_symlink_singularity_images_registry( mock_open.return_value = 12 # file descriptor mock_close.return_value = 12 # file descriptor - download_obj = DownloadWorkflow( - pipeline="dummy", - outdir=tmp_path, - container_library=("quay.io", "community-cr-prod.seqera.io/docker/registry/v2"), - ) - - download_obj.registry_set = {"quay.io", "community-cr-prod.seqera.io/docker/registry/v2"} - - # Call the method with registry - should not happen, but preserve it then. + # Call the method with registry name included - should not happen, but preserve it then. 
singularity_fetcher = SingularityFetcher( + [], ( - "quay.io", + "quay.io", # Same as in the filename "community-cr-prod.seqera.io/docker/registry/v2", - "depot.galaxyproject.org/singularity", ), - [], mock_rich_progress, ) singularity_fetcher.symlink_registries(f"{tmp_path}/path/to/quay.io-singularity-image.img") - print(mock_resub.call_args) # Check that os.makedirs was called with the correct arguments - # mock_makedirs.assert_any_call(f"{tmp_path}/path/to", exist_ok=True) + mock_makedirs.assert_called_once_with(f"{tmp_path}/path/to", exist_ok=True) # Check that os.symlink was called with the correct arguments - mock_symlink.assert_called_with( + # assert_called_once_with also tells us that there was no attempt to + # - symlink to itself + # - symlink to the same registry + mock_symlink.assert_called_once_with( "./quay.io-singularity-image.img", - "./community-cr-prod.seqera.io-docker-registry-v2-singularity-image.img", + "./community-cr-prod.seqera.io-docker-registry-v2-singularity-image.img", # "quay.io-" has been trimmed dir_fd=12, ) - # Check that there is no attempt to symlink to itself (test parameters would result in that behavior if not checked in the function) - assert ( - unittest.mock.call("./quay.io-singularity-image.img", "./quay.io-singularity-image.img", dir_fd=12) - not in mock_symlink.call_args_list - ) # Normally it would be called for each registry, but since quay.io is part of the name, it # will only be called once, as no symlink to itself must be created. 
From c5379fb24a9426964a9dac01fcf0c0b2c01e86ae Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 11:40:18 +0000 Subject: [PATCH 12/31] Rewrote to support older versions of Python --- nf_core/pipelines/downloads/utils.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index 2b524efd79..d8fc87ca0c 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -21,10 +21,15 @@ def intermediate_file(output_path): letting the caller write to it, and then moving it to the final location. If an exception is raised, the temporary file is deleted and the output file is not touched. """ - with tempfile.NamedTemporaryFile(dir=os.path.dirname(output_path), delete_on_close=False) as tmp: + tmp = tempfile.NamedTemporaryFile(dir=os.path.dirname(output_path), delete=False) + try: yield tmp tmp.close() os.rename(tmp.name, output_path) + except: + if os.path.exists(tmp.name): + os.unlink(tmp.name) + raise class DownloadProgress(rich.progress.Progress): From 8ee8eed8b845566ab8bbcb9b406450702c8c1605 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 14:12:02 +0000 Subject: [PATCH 13/31] Added a dest for the main_task-management methods added to DownloadProgress --- tests/pipelines/test_download.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index ca9a020188..5e5531a88b 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -18,7 +18,7 @@ import nf_core.utils from nf_core.pipelines.download import DownloadWorkflow, WorkflowRepo from nf_core.pipelines.downloads.singularity import ContainerError, SingularityFetcher -from nf_core.pipelines.downloads.utils import intermediate_file +from nf_core.pipelines.downloads.utils import DownloadProgress, intermediate_file from 
nf_core.synced_repo import SyncedRepo from nf_core.utils import run_cmd @@ -81,6 +81,29 @@ def test_intermediate_file(self, outdir): assert not os.path.exists(output_path) assert not os.path.exists(tmp_path) + def test_download_progress_main_task(self): + with DownloadProgress() as progress: + # No task initially + assert progress.tasks == [] + + # Add a task, it should be there + task_id = progress.add_main_task(total=42) + assert task_id == 0 + assert len(progress.tasks) == 1 + assert progress.task_ids[0] == task_id + assert progress.tasks[0].total == 42 + + # Add another task, there should now be two + other_task_id = progress.add_task("Another task", total=28) + assert other_task_id == 1 + assert len(progress.tasks) == 2 + assert progress.task_ids[1] == other_task_id + assert progress.tasks[1].total == 28 + + progress.update_main_task(total=35) + assert progress.tasks[0].total == 35 + assert progress.tasks[1].total == 28 + class DownloadTest(unittest.TestCase): @pytest.fixture(autouse=True) From 71ab538bc43b88c53853bda77f7c2d1f81894feb Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 15:54:29 +0000 Subject: [PATCH 14/31] Added some tests for the DownloadProgress object --- tests/pipelines/test_download.py | 93 ++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index 5e5531a88b..fce279cbb0 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -12,6 +12,9 @@ from unittest import mock import pytest +import rich.progress_bar +import rich.table +import rich.text import nf_core.pipelines.create.create import nf_core.pipelines.list @@ -104,6 +107,96 @@ def test_download_progress_main_task(self): assert progress.tasks[0].total == 35 assert progress.tasks[1].total == 28 + def test_download_progress_renderables(self): + # Test the "singularity_pull" progress type + with DownloadProgress() as progress: + assert 
progress.tasks == [] + progress.add_task( + "Task 1", progress_type="singularity_pull", total=42, completed=11, current_log="example log" + ) + assert len(progress.tasks) == 1 + + renderable = progress.get_renderable() + assert isinstance(renderable, rich.console.Group), type(renderable) + + assert len(renderable.renderables) == 1 + table = renderable.renderables[0] + assert isinstance(table, rich.table.Table) + + assert isinstance(table.columns[0]._cells[0], str) + assert table.columns[0]._cells[0] == "[magenta]Task 1" + + assert isinstance(table.columns[1]._cells[0], str) + assert table.columns[1]._cells[0] == "[blue]example log" + + assert isinstance(table.columns[2]._cells[0], rich.progress_bar.ProgressBar) + assert table.columns[2]._cells[0].completed == 11 + assert table.columns[2]._cells[0].total == 42 + + # Test the "summary" progress type + with DownloadProgress() as progress: + assert progress.tasks == [] + progress.add_task("Task 1", progress_type="summary", total=42, completed=11) + assert len(progress.tasks) == 1 + + renderable = progress.get_renderable() + assert isinstance(renderable, rich.console.Group), type(renderable) + + assert len(renderable.renderables) == 1 + table = renderable.renderables[0] + assert isinstance(table, rich.table.Table) + + assert isinstance(table.columns[0]._cells[0], str) + assert table.columns[0]._cells[0] == "[magenta]Task 1" + + assert isinstance(table.columns[1]._cells[0], rich.progress_bar.ProgressBar) + assert table.columns[1]._cells[0].completed == 11 + assert table.columns[1]._cells[0].total == 42 + + assert isinstance(table.columns[2]._cells[0], str) + assert table.columns[2]._cells[0] == "[progress.percentage] 26%" + + assert isinstance(table.columns[3]._cells[0], str) + assert table.columns[3]._cells[0] == "•" + + assert isinstance(table.columns[4]._cells[0], str) + assert table.columns[4]._cells[0] == "[green]11/42 completed" + + # Test the "download" progress type + with DownloadProgress() as progress: + assert 
progress.tasks == [] + progress.add_task("Task 1", progress_type="download", total=42, completed=11) + assert len(progress.tasks) == 1 + + renderable = progress.get_renderable() + assert isinstance(renderable, rich.console.Group), type(renderable) + + assert len(renderable.renderables) == 1 + table = renderable.renderables[0] + assert isinstance(table, rich.table.Table) + + assert isinstance(table.columns[0]._cells[0], str) + assert table.columns[0]._cells[0] == "[blue]Task 1" + + assert isinstance(table.columns[1]._cells[0], rich.progress_bar.ProgressBar) + assert table.columns[1]._cells[0].completed == 11 + assert table.columns[1]._cells[0].total == 42 + + assert isinstance(table.columns[2]._cells[0], str) + assert table.columns[2]._cells[0] == "[progress.percentage]26.2%" + + assert isinstance(table.columns[3]._cells[0], str) + assert table.columns[3]._cells[0] == "•" + + assert isinstance(table.columns[4]._cells[0], rich.text.Text) + assert table.columns[4]._cells[0]._text == ["11/42 bytes"] + + assert isinstance(table.columns[5]._cells[0], str) + assert table.columns[5]._cells[0] == "•" + + assert isinstance(table.columns[6]._cells[0], rich.text.Text) + assert table.columns[6]._cells[0]._text == ["?"] + class DownloadTest(unittest.TestCase): @pytest.fixture(autouse=True) From a141820b6354da99a51930bb1a9cb628d73dca61 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 17:52:08 +0000 Subject: [PATCH 15/31] Turned two methods into functions, simplified testing --- nf_core/pipelines/downloads/singularity.py | 149 +++++++++++---------- tests/pipelines/test_download.py | 138 ++++++++----------- 2 files changed, 133 insertions(+), 154 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 6ed61c991b..bcadc54d0b 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -15,6 +15,83 @@ log = logging.getLogger(__name__) +def 
get_container_filename(container: str, registries: Iterable[str]) -> str: + """Return the expected filename for a container.""" + + # Generate file paths + # Based on simpleName() function in Nextflow code: + # https://github.com/nextflow-io/nextflow/blob/671ae6d85df44f906747c16f6d73208dbc402d49/modules/nextflow/src/main/groovy/nextflow/container/SingularityCache.groovy#L69-L94 + out_name = container + # Strip URI prefix + out_name = re.sub(r"^.*:\/\/", "", out_name) + # Detect file extension + extension = ".img" + if ".sif:" in out_name: + extension = ".sif" + out_name = out_name.replace(".sif:", "-") + elif out_name.endswith(".sif"): + extension = ".sif" + out_name = out_name[:-4] + # Strip : and / characters + out_name = out_name.replace("/", "-").replace(":", "-") + # Add file extension + out_name = out_name + extension + + # Trim potential registries from the name for consistency. + # This will allow pipelines to work offline without symlinked images, + # if docker.registry / singularity.registry are set to empty strings at runtime, which can be included in the HPC config profiles easily. + if registries: + # Create a regex pattern from the set of registries + trim_pattern = "|".join(f"^{re.escape(registry)}-?".replace("/", "[/-]") for registry in registries) + # Use the pattern to trim the string + out_name = re.sub(f"{trim_pattern}", "", out_name) + + return out_name + + +def symlink_registries(image_path: str, registries: Iterable[str]) -> None: + """Create a symlink for each registry in the registry set that points to the image. + We have dropped the explicit registries from the modules in favor of the configurable registries. + Unfortunately, Nextflow still expects the registry to be part of the file name, so a symlink is needed. + + The base image, e.g. ./nf-core-gatk-4.4.0.0.img will thus be symlinked as for example ./quay.io-nf-core-gatk-4.4.0.0.img + by prepending all registries in registry_set to the image name. 
+ + Unfortunately, the output image name may contain a registry definition (Singularity image pulled from depot.galaxyproject.org + or older pipeline version, where the docker registry was part of the image name in the modules). Hence, it must be stripped + before to ensure that it is really the base name. + """ + + # Create a regex pattern from the set, in case trimming is needed. + trim_pattern = "|".join(f"^{re.escape(registry)}-?".replace("/", "[/-]") for registry in registries) + + for registry in registries: + # Nextflow will convert it like this as well, so we need it mimic its behavior + registry = registry.replace("/", "-") + + if not bool(re.search(trim_pattern, os.path.basename(image_path))): + symlink_name = os.path.join("./", f"{registry}-{os.path.basename(image_path)}") + else: + trimmed_name = re.sub(f"{trim_pattern}", "", os.path.basename(image_path)) + symlink_name = os.path.join("./", f"{registry}-{trimmed_name}") + + symlink_full = os.path.join(os.path.dirname(image_path), symlink_name) + target_name = os.path.join("./", os.path.basename(image_path)) + + if not os.path.exists(symlink_full) and target_name != symlink_name: + os.makedirs(os.path.dirname(symlink_full), exist_ok=True) + image_dir = os.open(os.path.dirname(image_path), os.O_RDONLY) + try: + os.symlink( + target_name, + symlink_name, + dir_fd=image_dir, + ) + log.debug(f"Symlinked {target_name} as {symlink_name}.") + finally: + os.close(image_dir) + + class SingularityFetcher: """Class to manage all Singularity operations for fetching containers. @@ -37,46 +114,8 @@ def __init__( self.kill_with_fire = False def get_container_filename(self, container: str) -> str: - """Check Singularity cache for image, copy to destination folder if found. - - Args: - container (str): A pipeline's container name. Can be direct download URL - or a Docker Hub repository ID. - - Returns: - # TODO - tuple (str, str): Returns a tuple of (out_path, cache_path). 
- """ - - # Generate file paths - # Based on simpleName() function in Nextflow code: - # https://github.com/nextflow-io/nextflow/blob/671ae6d85df44f906747c16f6d73208dbc402d49/modules/nextflow/src/main/groovy/nextflow/container/SingularityCache.groovy#L69-L94 - out_name = container - # Strip URI prefix - out_name = re.sub(r"^.*:\/\/", "", out_name) - # Detect file extension - extension = ".img" - if ".sif:" in out_name: - extension = ".sif" - out_name = out_name.replace(".sif:", "-") - elif out_name.endswith(".sif"): - extension = ".sif" - out_name = out_name[:-4] - # Strip : and / characters - out_name = out_name.replace("/", "-").replace(":", "-") - # Add file extension - out_name = out_name + extension - - # Trim potential registries from the name for consistency. - # This will allow pipelines to work offline without symlinked images, - # if docker.registry / singularity.registry are set to empty strings at runtime, which can be included in the HPC config profiles easily. - if self.registry_set: - # Create a regex pattern from the set of registries - trim_pattern = "|".join(f"^{re.escape(registry)}-?".replace("/", "[/-]") for registry in self.registry_set) - # Use the pattern to trim the string - out_name = re.sub(f"{trim_pattern}", "", out_name) - - return out_name + """Return the expected filename for a container.""" + return get_container_filename(container, self.registry_set) def symlink_registries(self, image_path: str) -> None: """Create a symlink for each registry in the registry set that points to the image. @@ -90,35 +129,7 @@ def symlink_registries(self, image_path: str) -> None: or older pipeline version, where the docker registry was part of the image name in the modules). Hence, it must be stripped before to ensure that it is really the base name. """ - - # Create a regex pattern from the set, in case trimming is needed. 
- trim_pattern = "|".join(f"^{re.escape(registry)}-?".replace("/", "[/-]") for registry in self.registry_set) - - for registry in self.registry_set: - # Nextflow will convert it like this as well, so we need it mimic its behavior - registry = registry.replace("/", "-") - - if not bool(re.search(trim_pattern, os.path.basename(image_path))): - symlink_name = os.path.join("./", f"{registry}-{os.path.basename(image_path)}") - else: - trimmed_name = re.sub(f"{trim_pattern}", "", os.path.basename(image_path)) - symlink_name = os.path.join("./", f"{registry}-{trimmed_name}") - - symlink_full = os.path.join(os.path.dirname(image_path), symlink_name) - target_name = os.path.join("./", os.path.basename(image_path)) - - if not os.path.exists(symlink_full) and target_name != symlink_name: - os.makedirs(os.path.dirname(symlink_full), exist_ok=True) - image_dir = os.open(os.path.dirname(image_path), os.O_RDONLY) - try: - os.symlink( - target_name, - symlink_name, - dir_fd=image_dir, - ) - log.debug(f"Symlinked {target_name} as {symlink_name}.") - finally: - os.close(image_dir) + return symlink_registries(image_path, self.registry_set) def download_images( self, diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index fce279cbb0..e5eb8beba7 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -20,7 +20,12 @@ import nf_core.pipelines.list import nf_core.utils from nf_core.pipelines.download import DownloadWorkflow, WorkflowRepo -from nf_core.pipelines.downloads.singularity import ContainerError, SingularityFetcher +from nf_core.pipelines.downloads.singularity import ( + ContainerError, + SingularityFetcher, + get_container_filename, + symlink_registries, +) from nf_core.pipelines.downloads.utils import DownloadProgress, intermediate_file from nf_core.synced_repo import SyncedRepo from nf_core.utils import run_cmd @@ -721,8 +726,12 @@ def test_get_singularity_images(self, tmp_path, mock_fetch_wf_config): # Test that 
they are all caught inside get_singularity_images(). download_obj.get_singularity_images() + # + # Tests for 'singularity.symlink_registries' function + # + + # Simple file name with no registry in it @with_temporary_folder - @mock.patch("rich.progress.Progress.add_task") @mock.patch("os.makedirs") @mock.patch("os.symlink") @mock.patch("os.open") @@ -738,7 +747,6 @@ def test_symlink_singularity_images( mock_open, mock_symlink, mock_makedirs, - mock_rich_progress, ): # Setup mock_dirname.return_value = f"{tmp_path}/path/to" @@ -746,17 +754,13 @@ def test_symlink_singularity_images( mock_open.return_value = 12 # file descriptor mock_close.return_value = 12 # file descriptor - # Call the method - singularity_fetcher = SingularityFetcher( - [], - ( - "quay.io", - "community-cr-prod.seqera.io/docker/registry/v2", - "depot.galaxyproject.org/singularity", - ), - mock_rich_progress, - ) - singularity_fetcher.symlink_registries(f"{tmp_path}/path/to/singularity-image.img") + registries = [ + "quay.io", + "community-cr-prod.seqera.io/docker/registry/v2", + "depot.galaxyproject.org/singularity", + ] + + symlink_registries(f"{tmp_path}/path/to/singularity-image.img", registries) # Check that os.makedirs was called with the correct arguments mock_makedirs.assert_any_call(f"{tmp_path}/path/to", exist_ok=True) @@ -784,8 +788,8 @@ def test_symlink_singularity_images( ] mock_symlink.assert_has_calls(expected_calls, any_order=True) + # File name with registry in it @with_temporary_folder - @mock.patch("rich.progress.Progress.add_task") @mock.patch("os.makedirs") @mock.patch("os.symlink") @mock.patch("os.open") @@ -793,7 +797,7 @@ def test_symlink_singularity_images( @mock.patch("re.sub") @mock.patch("os.path.basename") @mock.patch("os.path.dirname") - def test_symlink_singularity_images_registry( + def test_symlink_singularity_symlink_registries( self, tmp_path, mock_dirname, @@ -803,7 +807,6 @@ def test_symlink_singularity_images_registry( mock_open, mock_symlink, mock_makedirs, - 
mock_rich_progress, ): # Setup mock_resub.return_value = "singularity-image.img" @@ -813,15 +816,11 @@ def test_symlink_singularity_images_registry( mock_close.return_value = 12 # file descriptor # Call the method with registry name included - should not happen, but preserve it then. - singularity_fetcher = SingularityFetcher( - [], - ( - "quay.io", # Same as in the filename - "community-cr-prod.seqera.io/docker/registry/v2", - ), - mock_rich_progress, - ) - singularity_fetcher.symlink_registries(f"{tmp_path}/path/to/quay.io-singularity-image.img") + registries = [ + "quay.io", # Same as in the filename + "community-cr-prod.seqera.io/docker/registry/v2", + ] + symlink_registries(f"{tmp_path}/path/to/quay.io-singularity-image.img", registries) # Check that os.makedirs was called with the correct arguments mock_makedirs.assert_called_once_with(f"{tmp_path}/path/to", exist_ok=True) @@ -893,88 +892,57 @@ def test_singularity_pull_image_singularity_not_installed(self, tmp_dir, mock_ri singularity_fetcher.pull_image("a-container", f"{tmp_dir}/anothercontainer.sif", "quay.io") # - # Test for 'singularity_image_filenames' function + # Test for 'singularity.get_container_filename' function # - @with_temporary_folder - @mock.patch("rich.progress.Progress.add_task") - def test_singularity_image_filenames(self, tmp_path, mock_rich_progress): - os.environ["NXF_SINGULARITY_CACHEDIR"] = f"{tmp_path}/cachedir" - - download_obj = DownloadWorkflow(pipeline="dummy", outdir=tmp_path) - download_obj.outdir = tmp_path - download_obj.container_cache_utilisation = "amend" - - download_obj.registry_set = { + def test_singularity_get_container_filename(self): + registries = [ "docker.io", "quay.io", "depot.galaxyproject.org/singularity", "community.wave.seqera.io/library", "community-cr-prod.seqera.io/docker/registry/v2", - } - singularity_fetcher = SingularityFetcher([], download_obj.registry_set, mock_rich_progress) - - ## Test phase I: Container not yet cached, should be amended to cache 
- # out_path: str, Path to cache - # cache_path: None + ] - result = singularity_fetcher.get_container_filename( - "https://depot.galaxyproject.org/singularity/bbmap:38.93--he522d1c_0" + # Test --- galaxy URL # + result = get_container_filename( + "https://depot.galaxyproject.org/singularity/bbmap:38.93--he522d1c_0", + registries, ) - # Assert that the result is a string - self.assertIsInstance(result, str) - - # assert that the correct out_path is returned that points to the cache - assert result.endswith("bbmap-38.93--he522d1c_0.img") - - ## Test phase II: Test various container names - # out_path: str, Path to cache - # cache_path: None + assert result == "bbmap-38.93--he522d1c_0.img" # Test --- mulled containers # - result = singularity_fetcher.get_container_filename( - "quay.io/biocontainers/mulled-v2-1fa26d1ce03c295fe2fdcf85831a92fbcbd7e8c2:59cdd445419f14abac76b31dd0d71217994cbcc9-0" + result = get_container_filename( + "quay.io/biocontainers/mulled-v2-1fa26d1ce03c295fe2fdcf85831a92fbcbd7e8c2:59cdd445419f14abac76b31dd0d71217994cbcc9-0", + registries, ) - assert result.endswith( - "biocontainers-mulled-v2-1fa26d1ce03c295fe2fdcf85831a92fbcbd7e8c2-59cdd445419f14abac76b31dd0d71217994cbcc9-0.img" + assert ( + result + == "biocontainers-mulled-v2-1fa26d1ce03c295fe2fdcf85831a92fbcbd7e8c2-59cdd445419f14abac76b31dd0d71217994cbcc9-0.img" ) # Test --- Docker containers without registry # - result = singularity_fetcher.get_container_filename("nf-core/ubuntu:20.04") - assert result.endswith("nf-core-ubuntu-20.04.img") + result = get_container_filename("nf-core/ubuntu:20.04", registries) + assert result == "nf-core-ubuntu-20.04.img" # Test --- Docker container with explicit registry -> should be trimmed # - result = singularity_fetcher.get_container_filename("docker.io/nf-core/ubuntu:20.04") - assert result.endswith("nf-core-ubuntu-20.04.img") + result = get_container_filename("docker.io/nf-core/ubuntu:20.04", registries) + assert result == "nf-core-ubuntu-20.04.img" - # 
Test --- Docker container with explicit registry not in registry set -> can't be trimmed - result = singularity_fetcher.get_container_filename("mirage-the-imaginative-registry.io/nf-core/ubuntu:20.04") - assert result.endswith("mirage-the-imaginative-registry.io-nf-core-ubuntu-20.04.img") + # Test --- Docker container with explicit registry not in registry list -> can't be trimmed + result = get_container_filename("mirage-the-imaginative-registry.io/nf-core/ubuntu:20.04", registries) + assert result == "mirage-the-imaginative-registry.io-nf-core-ubuntu-20.04.img" # Test --- Seqera Docker containers: Trimmed, because it is hard-coded in the registry set. - result = singularity_fetcher.get_container_filename( - "community.wave.seqera.io/library/coreutils:9.5--ae99c88a9b28c264" - ) - assert result.endswith("coreutils-9.5--ae99c88a9b28c264.img") + result = get_container_filename("community.wave.seqera.io/library/coreutils:9.5--ae99c88a9b28c264", registries) + assert result == "coreutils-9.5--ae99c88a9b28c264.img" # Test --- Seqera Singularity containers: Trimmed, because it is hard-coded in the registry set. 
- result = singularity_fetcher.get_container_filename( - "https://community-cr-prod.seqera.io/docker/registry/v2/blobs/sha256/c2/c262fc09eca59edb5a724080eeceb00fb06396f510aefb229c2d2c6897e63975/data" - ) - assert result.endswith( - "blobs-sha256-c2-c262fc09eca59edb5a724080eeceb00fb06396f510aefb229c2d2c6897e63975-data.img" - ) - - ## Test phase III: Container will be cached but also copied to out_path - # out_path: str, Path to cache - # cache_path: str, Path to cache - download_obj.container_cache_utilisation = "copy" - result = singularity_fetcher.get_container_filename( - "https://depot.galaxyproject.org/singularity/bbmap:38.93--he522d1c_0" + result = get_container_filename( + "https://community-cr-prod.seqera.io/docker/registry/v2/blobs/sha256/c2/c262fc09eca59edb5a724080eeceb00fb06396f510aefb229c2d2c6897e63975/data", + registries, ) - - self.assertTrue(all(isinstance(element, str) for element in result)) - assert result.endswith("bbmap-38.93--he522d1c_0.img") + assert result == "blobs-sha256-c2-c262fc09eca59edb5a724080eeceb00fb06396f510aefb229c2d2c6897e63975-data.img" # # Test for '--singularity-cache remote --singularity-cache-index'. Provide a list of containers already available in a remote location. 
From 4133c905279279acaf2b3fd688445994460ddf7e Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 17:54:30 +0000 Subject: [PATCH 16/31] Test comments --- tests/pipelines/test_download.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index e5eb8beba7..bed24bac45 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -35,7 +35,7 @@ class DownloadUtilsTest(unittest.TestCase): # - # Tests for 'utils.intermediate_file' + # Test for 'utils.intermediate_file' # @with_temporary_folder def test_intermediate_file(self, outdir): @@ -89,6 +89,9 @@ def test_intermediate_file(self, outdir): assert not os.path.exists(output_path) assert not os.path.exists(tmp_path) + # + # Test for 'utils.DownloadProgress.add/update_main_task' + # def test_download_progress_main_task(self): with DownloadProgress() as progress: # No task initially @@ -112,6 +115,9 @@ def test_download_progress_main_task(self): assert progress.tasks[0].total == 35 assert progress.tasks[1].total == 28 + # + # Test for 'utils.DownloadProgress.get_renderables' + # def test_download_progress_renderables(self): # Test the "singularity_pull" progress type with DownloadProgress() as progress: From 2b8fbd6fa68d7527dcbefb41426b8b7c88978355 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Wed, 26 Mar 2025 17:59:58 +0000 Subject: [PATCH 17/31] Theoretically it could be None, so check it really is a string --- tests/pipelines/test_download.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index bed24bac45..25fd408c28 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -1032,6 +1032,7 @@ def test_download_workflow_for_platform(self, tmp_dir, _): assert isinstance(download_obj.wf_download_url, dict) and len(download_obj.wf_download_url) == 0 # The outdir for multiple 
revisions is the pipeline name and date: e.g. nf-core-rnaseq_2023-04-27_18-54 + assert isinstance(download_obj.outdir, str) assert bool(re.search(r"nf-core-rnaseq_\d{4}-\d{2}-\d{1,2}_\d{1,2}-\d{1,2}", download_obj.outdir, re.S)) download_obj.output_filename = f"{download_obj.outdir}.git" From 7813f3032872a7ca27159d73f4add373b029490e Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 09:28:34 +0000 Subject: [PATCH 18/31] More type hints --- nf_core/pipelines/downloads/utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index d8fc87ca0c..e52a305dc2 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -2,8 +2,10 @@ import logging import os import tempfile +from typing import Generator, Iterable import rich.progress +import rich.table log = logging.getLogger(__name__) @@ -15,7 +17,7 @@ class DownloadError(RuntimeError): @contextlib.contextmanager -def intermediate_file(output_path): +def intermediate_file(output_path: str) -> Generator[tempfile._TemporaryFileWrapper, None, None]: """Context manager to help ensure the output file is either complete or non-existent. It does that by creating a temporary file in the same directory as the output file, letting the caller write to it, and then moving it to the final location. @@ -38,7 +40,8 @@ class DownloadProgress(rich.progress.Progress): Also provide helper functions to control the top-level task. 
""" - def get_renderables(self): + def get_renderables(self) -> Generator[rich.table.Table, None, None]: + self.columns: Iterable[str | rich.progress.ProgressColumn] for task in self.tasks: if task.fields.get("progress_type") == "summary": self.columns = ( @@ -79,6 +82,6 @@ def add_main_task(self, **kwargs) -> rich.progress.TaskID: ) return self.main_task - def update_main_task(self, **kwargs): + def update_main_task(self, **kwargs) -> None: """Update the top-level task with new information.""" self.update(self.main_task, **kwargs) From cb71d42fe2446cfc1b1d10dfaf9b92f721f175d8 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 10:46:29 +0000 Subject: [PATCH 19/31] More tests for get_container_filename --- tests/pipelines/test_download.py | 38 +++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index 25fd408c28..e866d2aa7c 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -909,12 +909,18 @@ def test_singularity_get_container_filename(self): "community-cr-prod.seqera.io/docker/registry/v2", ] + # Test --- galaxy URL but no registry given # + result = get_container_filename( + "https://depot.galaxyproject.org/singularity/bbmap:38.93--he522d1c_0", + [], + ) + assert result == "depot.galaxyproject.org-singularity-bbmap-38.93--he522d1c_0.img" + # Test --- galaxy URL # result = get_container_filename( "https://depot.galaxyproject.org/singularity/bbmap:38.93--he522d1c_0", registries, ) - assert result == "bbmap-38.93--he522d1c_0.img" # Test --- mulled containers # @@ -950,6 +956,36 @@ def test_singularity_get_container_filename(self): ) assert result == "blobs-sha256-c2-c262fc09eca59edb5a724080eeceb00fb06396f510aefb229c2d2c6897e63975-data.img" + # Test --- Seqera Oras containers: Trimmed, because it is hard-coded in the registry set. 
+ result = get_container_filename( + "oras://community.wave.seqera.io/library/umi-transfer:1.0.0--e5b0c1a65b8173b6", + registries, + ) + assert result == "umi-transfer-1.0.0--e5b0c1a65b8173b6.img" + + # Test --- SIF Singularity container with explicit registry -> should be trimmed # + result = get_container_filename( + "docker.io-hashicorp-vault-1.16-sha256:e139ff28c23e1f22a6e325696318141259b177097d8e238a3a4c5b84862fadd8.sif", + registries, + ) + assert ( + result == "hashicorp-vault-1.16-sha256-e139ff28c23e1f22a6e325696318141259b177097d8e238a3a4c5b84862fadd8.sif" + ) + + # Test --- SIF Singularity container without registry # + result = get_container_filename( + "singularity-hpc/shpc/tests/testdata/salad_latest.sif", + registries, + ) + assert result == "singularity-hpc-shpc-tests-testdata-salad_latest.sif" + + # Test --- Singularity container from a Singularity registry (and version tag) # + result = get_container_filename( + "library://pditommaso/foo/bar.sif:latest", + registries, + ) + assert result == "pditommaso-foo-bar-latest.sif" + # # Test for '--singularity-cache remote --singularity-cache-index'. Provide a list of containers already available in a remote location. 
# From 2d8cb4def30951c60f96abaf1b08b92c5174e592 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 12:02:57 +0000 Subject: [PATCH 20/31] No need to keep a wrapper as a method --- nf_core/pipelines/downloads/singularity.py | 26 ++++------------------ 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index bcadc54d0b..4d934fd6e1 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -113,24 +113,6 @@ def __init__( self.progress = progress self.kill_with_fire = False - def get_container_filename(self, container: str) -> str: - """Return the expected filename for a container.""" - return get_container_filename(container, self.registry_set) - - def symlink_registries(self, image_path: str) -> None: - """Create a symlink for each registry in the registry set that points to the image. - We have dropped the explicit registries from the modules in favor of the configurable registries. - Unfortunately, Nextflow still expects the registry to be part of the file name, so a symlink is needed. - - The base image, e.g. ./nf-core-gatk-4.4.0.0.img will thus be symlinked as for example ./quay.io-nf-core-gatk-4.4.0.0.img - by prepending all registries in registry_set to the image name. - - Unfortunately, the output image name may contain a registry definition (Singularity image pulled from depot.galaxyproject.org - or older pipeline version, where the docker registry was part of the image name in the modules). Hence, it must be stripped - before to ensure that it is really the base name.
- """ - return symlink_registries(image_path, self.registry_set) - def download_images( self, containers_download: Iterable[Tuple[str, str]], @@ -203,7 +185,7 @@ def download_image(self, container: str, output_path: str) -> None: self.progress.update(task, advance=len(data)) fh.write(data) - self.symlink_registries(output_path) + symlink_registries(output_path, self.registry_set) self.progress.remove_task(task) def pull_images(self, containers_pull: Iterable[Tuple[str, str]]) -> None: @@ -332,7 +314,7 @@ def pull_image(self, container: str, output_path: str, library: str) -> None: error_msg=lines, ) - self.symlink_registries(output_path) + symlink_registries(output_path, self.registry_set) self.progress.remove_task(task) def copy_image(self, container: str, src_path: str, dest_path: str) -> None: @@ -343,7 +325,7 @@ def copy_image(self, container: str, src_path: str, dest_path: str) -> None: shutil.copyfile(src_path, dest_path_tmp.name) # Create symlinks to ensure that the images are found even with different registries being used. 
- self.symlink_registries(dest_path) + symlink_registries(dest_path, self.registry_set) def fetch_containers( self, @@ -363,7 +345,7 @@ def fetch_containers( total_tasks = len(containers) for container in containers: - container_filename = self.get_container_filename(container) + container_filename = get_container_filename(container, self.registry_set) # Files in the remote cache are already downloaded and can be ignored if container_filename in exclude_list: From 4bfb45e44d7d00288004e518f3673c7810ed2e6c Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 12:15:11 +0000 Subject: [PATCH 21/31] Better comments --- nf_core/pipelines/downloads/singularity.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 4d934fd6e1..9195edb21d 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -15,8 +15,23 @@ log = logging.getLogger(__name__) +# We have dropped the explicit registries from the modules in favor of the configurable registries. +# Unfortunately, Nextflow still expects the registry to be part of the file name, so we need functions +# to support accessing container images with different registries (or no registry). + + def get_container_filename(container: str, registries: Iterable[str]) -> str: - """Return the expected filename for a container.""" + """Return the expected filename for a container. + + Supports docker, http, oras, and singularity URIs in `container`. + + Registry names provided in `registries` are removed from the filename to ensure that the same image + is used regardless of the registry. Only registry names that are part of `registries` are considered. + If the image name contains another registry, it will be kept in the filename. 
+ + For instance, docker.io/nf-core/ubuntu:20.04 will be nf-core-ubuntu-20.04.img *only* if the registry + contains "docker.io". + """ # Generate file paths # Based on simpleName() function in Nextflow code: @@ -51,11 +66,9 @@ def get_container_filename(container: str, registries: Iterable[str]) -> str: def symlink_registries(image_path: str, registries: Iterable[str]) -> None: """Create a symlink for each registry in the registry set that points to the image. - We have dropped the explicit registries from the modules in favor of the configurable registries. - Unfortunately, Nextflow still expects the registry to be part of the file name, so a symlink is needed. The base image, e.g. ./nf-core-gatk-4.4.0.0.img will thus be symlinked as for example ./quay.io-nf-core-gatk-4.4.0.0.img - by prepending all registries in registry_set to the image name. + by prepending each registry in `registries` to the image name. Unfortunately, the output image name may contain a registry definition (Singularity image pulled from depot.galaxyproject.org or older pipeline version, where the docker registry was part of the image name in the modules). 
Hence, it must be stripped From d6b8f6c1d211a6ee19227b2f02e36dc06364294e Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 12:54:31 +0000 Subject: [PATCH 22/31] New class to support the downloads --- nf_core/pipelines/downloads/singularity.py | 87 ++++++++++++++-------- 1 file changed, 56 insertions(+), 31 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 9195edb21d..bd87a429d2 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -5,7 +5,7 @@ import re import shutil import subprocess -from typing import Collection, Container, Iterable, List, Optional, Tuple +from typing import Collection, Container, Generator, Iterable, List, Optional, Tuple import requests import requests_cache @@ -105,33 +105,21 @@ def symlink_registries(image_path: str, registries: Iterable[str]) -> None: os.close(image_dir) -class SingularityFetcher: - """Class to manage all Singularity operations for fetching containers. +class SingularityImageDownloader: + """Class to manage http downloads of Singularity images. - The guiding principles are that: - - Container download/pull/copy methods are unaware of the concepts of - "library" and "cache". They are just told to fetch a container and - put it in a certain location. - - Only the `fetch_containers` method is aware of the concepts of "library" - and "cache". It is a sort of orchestrator that decides where to fetch - each container and calls the appropriate methods. - - All methods are integrated with a progress bar + Downloads are done in parallel using threads and progress is shown in the progress bar. 
""" - def __init__( - self, container_library: Iterable[str], registry_set: Iterable[str], progress: DownloadProgress - ) -> None: - self.container_library = list(container_library) - self.registry_set = registry_set + def __init__(self, progress: DownloadProgress) -> None: self.progress = progress self.kill_with_fire = False - def download_images( + def download_images_in_parallel( self, containers_download: Iterable[Tuple[str, str]], parallel_downloads: int, - ) -> None: - # if clause gives slightly better UX, because Download is no longer displayed if nothing is left to be downloaded. + ) -> Generator[str, None, None]: with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_downloads) as pool: # Kick off concurrent downloads future_downloads = [ @@ -145,33 +133,32 @@ def download_images( try: # Iterate over each threaded download, waiting for them to finish for future in concurrent.futures.as_completed(future_downloads): - future.result() - try: - self.progress.update_main_task(advance=1) - except Exception as e: - log.error(f"Error updating progress bar: {e}") + output_path = future.result() + yield output_path except KeyboardInterrupt: # Cancel the future threads that haven't started yet for future in future_downloads: future.cancel() # Set the variable that the threaded function looks for - # Will trigger an exception from each thread + # Will trigger an exception from each active thread self.kill_with_fire = True # Re-raise exception on the main thread raise - def download_image(self, container: str, output_path: str) -> None: + def download_image(self, container: str, output_path: str) -> str: """Download a singularity image from the web. - Use native Python to download the file. + Use native Python to download the file. Progress is shown in the progress bar + as a new task (of type "download"). + + This method is integrated with the above `download_images_in_parallel` method. 
The + `self.kill_with_fire` variable is a sentinel used to check if the user has hit ctrl-c. Args: container (str): A pipeline's container name. Usually it is of similar format to ``https://depot.galaxyproject.org/singularity/name:version`` - out_path (str): The final target output path - cache_path (str, None): The NXF_SINGULARITY_CACHEDIR path if set, None if not - progress (Progress): Rich progress bar instance to add tasks to. + output_path (str): The target output path """ log.debug(f"Downloading Singularity image '{container}' to {output_path}") @@ -198,8 +185,46 @@ def download_image(self, container: str, output_path: str) -> None: self.progress.update(task, advance=len(data)) fh.write(data) - symlink_registries(output_path, self.registry_set) self.progress.remove_task(task) + return output_path + + +class SingularityFetcher: + """Class to manage all Singularity operations for fetching containers. + + The guiding principles are that: + - Container download/pull/copy methods are unaware of the concepts of + "library" and "cache". They are just told to fetch a container and + put it in a certain location. + - Only the `fetch_containers` method is aware of the concepts of "library" + and "cache". It is a sort of orchestrator that decides where to fetch + each container and calls the appropriate methods. 
+ - All methods are integrated with a progress bar + """ + + def __init__( + self, container_library: Iterable[str], registry_set: Iterable[str], progress: DownloadProgress + ) -> None: + self.container_library = list(container_library) + self.registry_set = registry_set + self.progress = progress + self.kill_with_fire = False + + def download_images( + self, + containers_download: Iterable[Tuple[str, str]], + parallel_downloads: int, + ) -> None: + downloader = SingularityImageDownloader(self.progress) + for output_path in downloader.download_images_in_parallel(containers_download, parallel_downloads): + # try-except introduced in 4a95a5b84e2becbb757ce91eee529aa5f8181ec7 + # unclear why rich.progress may raise an exception here as it's supposed to be thread-safe + try: + self.progress.update_main_task(advance=1) + except Exception as e: + log.error(f"Error updating progress bar: {e}") + + symlink_registries(output_path, self.registry_set) def pull_images(self, containers_pull: Iterable[Tuple[str, str]]) -> None: for container, output_path in containers_pull: From 4abd7369113cd5838419908b5e39185f1fa0a8b1 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 13:15:02 +0000 Subject: [PATCH 23/31] Introduced a context manager that creates a new sub task. 
Useful to guarantee the task gets removed --- nf_core/pipelines/downloads/singularity.py | 148 ++++++++++----------- nf_core/pipelines/downloads/utils.py | 11 +- tests/pipelines/test_download.py | 30 +++++ 3 files changed, 112 insertions(+), 77 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index bd87a429d2..5d167f3b87 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -162,30 +162,28 @@ def download_image(self, container: str, output_path: str) -> str: """ log.debug(f"Downloading Singularity image '{container}' to {output_path}") - # Set up progress bar + # Set up download progress bar as a new task nice_name = container.split("/")[-1][:50] - task = self.progress.add_task(nice_name, start=False, total=False, progress_type="download") - - # Open file handle and download - # This temporary will be automatically renamed to the target if there are no errors - with intermediate_file(output_path) as fh: - # Disable caching as this breaks streamed downloads - with requests_cache.disabled(): - r = requests.get(container, allow_redirects=True, stream=True, timeout=60 * 5) - filesize = r.headers.get("Content-length") - if filesize: - self.progress.update(task, total=int(filesize)) - self.progress.start_task(task) - - # Stream download - for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): - # Check that the user didn't hit ctrl-c - if self.kill_with_fire: - raise KeyboardInterrupt - self.progress.update(task, advance=len(data)) - fh.write(data) - - self.progress.remove_task(task) + with self.progress.sub_task(nice_name, start=False, total=False, progress_type="download") as task: + # Open file handle and download + # This temporary will be automatically renamed to the target if there are no errors + with intermediate_file(output_path) as fh: + # Disable caching as this breaks streamed downloads + with requests_cache.disabled(): + r = 
requests.get(container, allow_redirects=True, stream=True, timeout=60 * 5) + filesize = r.headers.get("Content-length") + if filesize: + self.progress.update(task, total=int(filesize)) + self.progress.start_task(task) + + # Stream download + for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): + # Check that the user didn't hit ctrl-c + if self.kill_with_fire: + raise KeyboardInterrupt + self.progress.update(task, advance=len(data)) + fh.write(data) + return output_path @@ -299,61 +297,59 @@ def pull_image(self, container: str, output_path: str, library: str) -> None: address = f"docker://{library}/{container.replace('docker://', '')}" absolute_URI = False - with intermediate_file(output_path) as output_path_tmp: - if shutil.which("singularity"): - singularity_command = [ - "singularity", - "pull", - "--name", - output_path_tmp.name, - address, - ] - elif shutil.which("apptainer"): - singularity_command = ["apptainer", "pull", "--name", output_path_tmp.name, address] - else: - raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH") - log.debug(f"Building singularity image: {address}") - log.debug(f"Singularity command: {' '.join(singularity_command)}") - - # Progress bar to show that something is happening - task = self.progress.add_task( - container, - start=False, - total=False, - progress_type="singularity_pull", - current_log="", - ) - - # Run the singularity pull command - with subprocess.Popen( - singularity_command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - bufsize=1, - ) as proc: - lines = [] - if proc.stdout is not None: - for line in proc.stdout: - lines.append(line) - self.progress.update(task, current_log=line.strip()) - - if lines: - # something went wrong with the container retrieval - if any("FATAL: " in line for line in lines): - self.progress.remove_task(task) - raise ContainerError( - container=container, - registry=library, - address=address, - 
absolute_URI=absolute_URI, - out_path=output_path, - singularity_command=singularity_command, - error_msg=lines, + with self.progress.sub_task( + container, + start=False, + total=False, + progress_type="singularity_pull", + current_log="", + ) as task: + with intermediate_file(output_path) as output_path_tmp: + if shutil.which("singularity"): + singularity_command = [ + "singularity", + "pull", + "--name", + output_path_tmp.name, + address, + ] + elif shutil.which("apptainer"): + singularity_command = ["apptainer", "pull", "--name", output_path_tmp.name, address] + else: + raise OSError( + "Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH" ) + log.debug(f"Building singularity image: {address}") + log.debug(f"Singularity command: {' '.join(singularity_command)}") + + # Run the singularity pull command + with subprocess.Popen( + singularity_command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1, + ) as proc: + lines = [] + if proc.stdout is not None: + for line in proc.stdout: + lines.append(line) + self.progress.update(task, current_log=line.strip()) + + if lines: + # something went wrong with the container retrieval + if any("FATAL: " in line for line in lines): + raise ContainerError( + container=container, + registry=library, + address=address, + absolute_URI=absolute_URI, + out_path=output_path, + singularity_command=singularity_command, + error_msg=lines, + ) - symlink_registries(output_path, self.registry_set) - self.progress.remove_task(task) + symlink_registries(output_path, self.registry_set) def copy_image(self, container: str, src_path: str, dest_path: str) -> None: """Copy Singularity image from one directory to another.""" diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index e52a305dc2..f341287a10 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -69,7 +69,7 @@ def 
get_renderables(self) -> Generator[rich.table.Table, None, None]: ) yield self.make_tasks_table([task]) - # These functions allow callers not having to track the main TaskID + # These two functions allow callers not having to track the main TaskID # They are pass-through functions to the rich.progress methods def add_main_task(self, **kwargs) -> rich.progress.TaskID: """Add a top-level task to the progress bar. @@ -85,3 +85,12 @@ def add_main_task(self, **kwargs) -> rich.progress.TaskID: def update_main_task(self, **kwargs) -> None: """Update the top-level task with new information.""" self.update(self.main_task, **kwargs) + + @contextlib.contextmanager + def sub_task(self, *args, **kwargs) -> Generator[rich.progress.TaskID, None, None]: + """Context manager to create a sub-task under the main task.""" + task = self.add_task(*args, **kwargs) + try: + yield task + finally: + self.remove_task(task) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index e866d2aa7c..6eb81b17b9 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -115,6 +115,36 @@ def test_download_progress_main_task(self): assert progress.tasks[0].total == 35 assert progress.tasks[1].total == 28 + # + # Test for 'utils.DownloadProgress.sub_task' + # + def test_download_progress_sub_task(self): + with DownloadProgress() as progress: + # No task initially + assert progress.tasks == [] + + # Add a sub-task, it should be there + with progress.sub_task("Sub-task", total=42) as sub_task_id: + assert sub_task_id == 0 + assert len(progress.tasks) == 1 + assert progress.task_ids[0] == sub_task_id + assert progress.tasks[0].total == 42 + + # The sub-task should be gone now + assert progress.tasks == [] + + # Add another sub-task, this time that raises an exception + with pytest.raises(ValueError): + with progress.sub_task("Sub-task", total=28) as sub_task_id: + assert sub_task_id == 1 + assert len(progress.tasks) == 1 + assert 
progress.task_ids[0] == sub_task_id + assert progress.tasks[0].total == 28 + raise ValueError("This is a test error") + + # The sub-task should also be gone now + assert progress.tasks == [] + # # Test for 'utils.DownloadProgress.get_renderables' # From 31f562750a8e91ed582add73c3de0e4516b0fa24 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 13:30:24 +0000 Subject: [PATCH 24/31] Forgot that pytest.raises exists ! --- tests/pipelines/test_download.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index 6eb81b17b9..fc88150eb2 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -65,26 +65,22 @@ def test_intermediate_file(self, outdir): # Directly write to the file and raise an exception output_path = os.path.join(outdir, "testfile3") - try: + with pytest.raises(ValueError): with intermediate_file(output_path) as tmp: tmp_path = tmp.name tmp.write(b"Hello, World!") raise ValueError("This is a test error") - except Exception as e: - assert isinstance(e, ValueError) assert not os.path.exists(output_path) assert not os.path.exists(tmp_path) # Run an external command and raise an exception output_path = os.path.join(outdir, "testfile4") - try: + with pytest.raises(subprocess.CalledProcessError): with intermediate_file(output_path) as tmp: tmp_path = tmp.name subprocess.check_call([f"echo 'Hello, World!' 
> {tmp_path}"], shell=True) subprocess.check_call(["ls", "/dummy"]) - except Exception as e: - assert isinstance(e, subprocess.CalledProcessError) assert not os.path.exists(output_path) assert not os.path.exists(tmp_path) From 06d3760db028484b13668238fcad6dd65429e365 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 14:01:13 +0000 Subject: [PATCH 25/31] Moved the download class to utils --- nf_core/pipelines/downloads/singularity.py | 95 +--------------------- nf_core/pipelines/downloads/utils.py | 87 +++++++++++++++++++- 2 files changed, 90 insertions(+), 92 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 5d167f3b87..978a83915a 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -1,16 +1,11 @@ -import concurrent.futures -import io import logging import os import re import shutil import subprocess -from typing import Collection, Container, Generator, Iterable, List, Optional, Tuple +from typing import Collection, Container, Iterable, List, Optional, Tuple -import requests -import requests_cache - -from nf_core.pipelines.downloads.utils import DownloadProgress, intermediate_file +from nf_core.pipelines.downloads.utils import DownloadProgress, FileDownloader, intermediate_file log = logging.getLogger(__name__) @@ -105,88 +100,6 @@ def symlink_registries(image_path: str, registries: Iterable[str]) -> None: os.close(image_dir) -class SingularityImageDownloader: - """Class to manage http downloads of Singularity images. - - Downloads are done in parallel using threads and progress is shown in the progress bar. 
- """ - - def __init__(self, progress: DownloadProgress) -> None: - self.progress = progress - self.kill_with_fire = False - - def download_images_in_parallel( - self, - containers_download: Iterable[Tuple[str, str]], - parallel_downloads: int, - ) -> Generator[str, None, None]: - with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_downloads) as pool: - # Kick off concurrent downloads - future_downloads = [ - pool.submit(self.download_image, container, output_path) - for (container, output_path) in containers_download - ] - - # Make ctrl-c work with multi-threading - self.kill_with_fire = False - - try: - # Iterate over each threaded download, waiting for them to finish - for future in concurrent.futures.as_completed(future_downloads): - output_path = future.result() - yield output_path - - except KeyboardInterrupt: - # Cancel the future threads that haven't started yet - for future in future_downloads: - future.cancel() - # Set the variable that the threaded function looks for - # Will trigger an exception from each active thread - self.kill_with_fire = True - # Re-raise exception on the main thread - raise - - def download_image(self, container: str, output_path: str) -> str: - """Download a singularity image from the web. - - Use native Python to download the file. Progress is shown in the progress bar - as a new task (of type "download"). - - This method is integrated with the above `download_images_in_parallel` method. The - `self.kill_with_fire` variable is a sentinel used to check if the user has hit ctrl-c. - - Args: - container (str): A pipeline's container name. 
Usually it is of similar format - to ``https://depot.galaxyproject.org/singularity/name:version`` - output_path (str): The target output path - """ - log.debug(f"Downloading Singularity image '{container}' to {output_path}") - - # Set up download progress bar as a new task - nice_name = container.split("/")[-1][:50] - with self.progress.sub_task(nice_name, start=False, total=False, progress_type="download") as task: - # Open file handle and download - # This temporary will be automatically renamed to the target if there are no errors - with intermediate_file(output_path) as fh: - # Disable caching as this breaks streamed downloads - with requests_cache.disabled(): - r = requests.get(container, allow_redirects=True, stream=True, timeout=60 * 5) - filesize = r.headers.get("Content-length") - if filesize: - self.progress.update(task, total=int(filesize)) - self.progress.start_task(task) - - # Stream download - for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): - # Check that the user didn't hit ctrl-c - if self.kill_with_fire: - raise KeyboardInterrupt - self.progress.update(task, advance=len(data)) - fh.write(data) - - return output_path - - class SingularityFetcher: """Class to manage all Singularity operations for fetching containers. 
@@ -213,8 +126,8 @@ def download_images( containers_download: Iterable[Tuple[str, str]], parallel_downloads: int, ) -> None: - downloader = SingularityImageDownloader(self.progress) - for output_path in downloader.download_images_in_parallel(containers_download, parallel_downloads): + downloader = FileDownloader(self.progress) + for output_path in downloader.download_files_in_parallel(containers_download, parallel_downloads): # try-except introduced in 4a95a5b84e2becbb757ce91eee529aa5f8181ec7 # unclear why rich.progress may raise an exception here as it's supposed to be thread-safe try: diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index f341287a10..1817468234 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -1,9 +1,13 @@ +import concurrent.futures import contextlib +import io import logging import os import tempfile -from typing import Generator, Iterable +from typing import Generator, Iterable, Tuple +import requests +import requests_cache import rich.progress import rich.table @@ -94,3 +98,84 @@ def sub_task(self, *args, **kwargs) -> Generator[rich.progress.TaskID, None, Non yield task finally: self.remove_task(task) + + +class FileDownloader: + """Class to download files. + + Downloads are done in parallel using threads and progress is shown in the progress bar. 
+ """ + + def __init__(self, progress: DownloadProgress) -> None: + self.progress = progress + self.kill_with_fire = False + + def download_files_in_parallel( + self, + download_files: Iterable[Tuple[str, str]], + parallel_downloads: int, + ) -> Generator[str, None, None]: + with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_downloads) as pool: + # Kick off concurrent downloads + future_downloads = [ + pool.submit(self.download_file, remote_path, output_path) + for (remote_path, output_path) in download_files + ] + + # Make ctrl-c work with multi-threading + self.kill_with_fire = False + + try: + # Iterate over each threaded download, waiting for them to finish + for future in concurrent.futures.as_completed(future_downloads): + output_path = future.result() + yield output_path + + except KeyboardInterrupt: + # Cancel the future threads that haven't started yet + for future in future_downloads: + future.cancel() + # Set the variable that the threaded function looks for + # Will trigger an exception from each active thread + self.kill_with_fire = True + # Re-raise exception on the main thread + raise + + def download_file(self, remote_path: str, output_path: str) -> str: + """Download a file from the web. + + Use native Python to download the file. Progress is shown in the progress bar + as a new task (of type "download"). + + This method is integrated with the above `download_files_in_parallel` method. The + `self.kill_with_fire` variable is a sentinel used to check if the user has hit ctrl-c. 
+ + Args: + remote_path (str): Source URL of the file to download + output_path (str): The target output path + """ + log.debug(f"Downloading '{remote_path}' to {output_path}") + + # Set up download progress bar as a new task + nice_name = remote_path.split("/")[-1][:50] + with self.progress.sub_task(nice_name, start=False, total=False, progress_type="download") as task: + # Open file handle and download + # This temporary will be automatically renamed to the target if there are no errors + with intermediate_file(output_path) as fh: + # Disable caching as this breaks streamed downloads + with requests_cache.disabled(): + r = requests.get(remote_path, allow_redirects=True, stream=True, timeout=60 * 5) + filesize = r.headers.get("Content-length") + if filesize: + self.progress.update(task, total=int(filesize)) + self.progress.start_task(task) + + # Stream download + for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): + # Check that the user didn't hit ctrl-c + if self.kill_with_fire: + raise KeyboardInterrupt + self.progress.update(task, advance=len(data)) + fh.write(data) + + return output_path From 3857fae829d596d53318586c340c1df053625538 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 15:29:30 +0000 Subject: [PATCH 26/31] The output path cannot be a directory or a symbolic link --- nf_core/pipelines/downloads/utils.py | 5 +++++ tests/pipelines/test_download.py | 13 ++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index 1817468234..c183d6f90b 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -27,6 +27,11 @@ def intermediate_file(output_path: str) -> Generator[tempfile._TemporaryFileWrap letting the caller write to it, and then moving it to the final location. If an exception is raised, the temporary file is deleted and the output file is not touched. 
""" + if os.path.isdir(output_path): + raise DownloadError(f"Output path '{output_path}' is a directory") + if os.path.islink(output_path): + raise DownloadError(f"Output path '{output_path}' is a symbolic link") + tmp = tempfile.NamedTemporaryFile(dir=os.path.dirname(output_path), delete=False) try: yield tmp diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index fc88150eb2..d593b716eb 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -26,7 +26,7 @@ get_container_filename, symlink_registries, ) -from nf_core.pipelines.downloads.utils import DownloadProgress, intermediate_file +from nf_core.pipelines.downloads.utils import DownloadError, DownloadProgress, intermediate_file from nf_core.synced_repo import SyncedRepo from nf_core.utils import run_cmd @@ -85,6 +85,17 @@ def test_intermediate_file(self, outdir): assert not os.path.exists(output_path) assert not os.path.exists(tmp_path) + # Test for invalid output paths + with pytest.raises(DownloadError): + with intermediate_file(outdir) as tmp: + pass + + output_path = os.path.join(outdir, "testfile5") + os.symlink("/dummy", output_path) + with pytest.raises(DownloadError): + with intermediate_file(output_path) as tmp: + pass + # # Test for 'utils.DownloadProgress.add/update_main_task' # From 31fe9144f203ac8c7e97e18fc538702780a60ede Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 15:52:43 +0000 Subject: [PATCH 27/31] Basic test for FileDownloader.download_file --- tests/pipelines/test_download.py | 36 +++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index d593b716eb..72b4a7b954 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -12,6 +12,7 @@ from unittest import mock import pytest +import requests import rich.progress_bar import rich.table import rich.text @@ -26,7 +27,7 @@ 
get_container_filename, symlink_registries, ) -from nf_core.pipelines.downloads.utils import DownloadError, DownloadProgress, intermediate_file +from nf_core.pipelines.downloads.utils import DownloadError, DownloadProgress, FileDownloader, intermediate_file from nf_core.synced_repo import SyncedRepo from nf_core.utils import run_cmd @@ -245,6 +246,39 @@ def test_download_progress_renderables(self): assert isinstance(table.columns[6]._cells[0], rich.text.Text) assert table.columns[6]._cells[0]._text == ["?"] + # + # Test for 'utils.FileDownloader' + # + @with_temporary_folder + def test_file_download(self, outdir): + with DownloadProgress() as progress: + # No task initially + assert progress.tasks == [] + assert progress._task_index == 0 + + # Download a file + downloader = FileDownloader(progress) + src_url = "https://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fai" + output_path = os.path.join(outdir, os.path.basename(src_url)) + downloader.download_file(src_url, output_path) + assert os.path.exists(output_path) + assert os.path.getsize(output_path) == 27 + + # A task was added but is now gone + assert progress._task_index == 1 + assert progress.tasks == [] + + # Invalid URL (schema) + src_url = "dummy://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fax" + output_path = os.path.join(outdir, os.path.basename(src_url)) + with pytest.raises(requests.exceptions.InvalidSchema): + downloader.download_file(src_url, output_path) + assert not os.path.exists(output_path) + + # A task was added but is now gone + assert progress._task_index == 2 + assert progress.tasks == [] + class DownloadTest(unittest.TestCase): @pytest.fixture(autouse=True) From 02cc2caaf5ae83845cd238a26916508970ecf692 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 15:53:57 +0000 Subject: [PATCH 28/31] Empty files are not allowed --- nf_core/pipelines/downloads/utils.py | 6 
++++++ tests/pipelines/test_download.py | 13 ++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index c183d6f90b..5f12a7997c 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -176,11 +176,17 @@ def download_file(self, remote_path: str, output_path: str) -> str: self.progress.start_task(task) # Stream download + has_content = False for data in r.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE): # Check that the user didn't hit ctrl-c if self.kill_with_fire: raise KeyboardInterrupt self.progress.update(task, advance=len(data)) fh.write(data) + has_content = True + + # Check that we actually downloaded something + if not has_content: + raise DownloadError(f"Downloaded file '{remote_path}' is empty") return output_path diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index 72b4a7b954..397468dfeb 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -268,6 +268,17 @@ def test_file_download(self, outdir): assert progress._task_index == 1 assert progress.tasks == [] + # No content at the URL + src_url = "http://www.google.com/generate_204" + output_path = os.path.join(outdir, os.path.basename(src_url)) + with pytest.raises(DownloadError): + downloader.download_file(src_url, output_path) + assert not os.path.exists(output_path) + + # A task was added but is now gone + assert progress._task_index == 2 + assert progress.tasks == [] + # Invalid URL (schema) src_url = "dummy://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fax" output_path = os.path.join(outdir, os.path.basename(src_url)) @@ -276,7 +287,7 @@ def test_file_download(self, outdir): assert not os.path.exists(output_path) # A task was added but is now gone - assert progress._task_index == 2 + assert progress._task_index == 3 assert progress.tasks == [] 
From b648bc1f8dfdee0f72711cb1f617a71689dd109d Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 17:26:54 +0000 Subject: [PATCH 29/31] Test log.debug too --- nf_core/pipelines/downloads/utils.py | 2 +- tests/pipelines/test_download.py | 89 +++++++++++++++++----------- 2 files changed, 57 insertions(+), 34 deletions(-) diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index 5f12a7997c..301784790e 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -159,7 +159,7 @@ def download_file(self, remote_path: str, output_path: str) -> str: remote_path (str): Source URL of the file to download output_path (str): The target output path """ - log.debug(f"Downloading '{remote_path}' to {output_path}") + log.debug(f"Downloading '{remote_path}' to '{output_path}'") # Set up download progress bar as a new task nice_name = remote_path.split("/")[-1][:50] diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index 397468dfeb..e485010b9f 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -35,6 +35,10 @@ class DownloadUtilsTest(unittest.TestCase): + @pytest.fixture(autouse=True) + def use_caplog(self, caplog): + self._caplog = caplog + # # Test for 'utils.intermediate_file' # @@ -252,43 +256,62 @@ def test_download_progress_renderables(self): @with_temporary_folder def test_file_download(self, outdir): with DownloadProgress() as progress: - # No task initially - assert progress.tasks == [] - assert progress._task_index == 0 - - # Download a file downloader = FileDownloader(progress) - src_url = "https://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fai" - output_path = os.path.join(outdir, os.path.basename(src_url)) - downloader.download_file(src_url, output_path) - assert os.path.exists(output_path) - assert os.path.getsize(output_path) == 27 - - # A task was added but 
is now gone - assert progress._task_index == 1 - assert progress.tasks == [] - - # No content at the URL - src_url = "http://www.google.com/generate_204" - output_path = os.path.join(outdir, os.path.basename(src_url)) - with pytest.raises(DownloadError): - downloader.download_file(src_url, output_path) - assert not os.path.exists(output_path) - # A task was added but is now gone - assert progress._task_index == 2 - assert progress.tasks == [] + # Activate the caplog: all download attempts must be logged (even failed ones) + self._caplog.clear() + with self._caplog.at_level(logging.DEBUG): + # No task initially + assert progress.tasks == [] + assert progress._task_index == 0 - # Invalid URL (schema) - src_url = "dummy://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fax" - output_path = os.path.join(outdir, os.path.basename(src_url)) - with pytest.raises(requests.exceptions.InvalidSchema): + # Download a file + src_url = "https://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fai" + output_path = os.path.join(outdir, os.path.basename(src_url)) downloader.download_file(src_url, output_path) - assert not os.path.exists(output_path) - - # A task was added but is now gone - assert progress._task_index == 3 - assert progress.tasks == [] + assert os.path.exists(output_path) + assert os.path.getsize(output_path) == 27 + assert ( + "nf_core.pipelines.downloads.utils", + logging.DEBUG, + f"Downloading '{src_url}' to '{output_path}'", + ) in self._caplog.record_tuples + + # A task was added but is now gone + assert progress._task_index == 1 + assert progress.tasks == [] + + # No content at the URL + src_url = "http://www.google.com/generate_204" + output_path = os.path.join(outdir, os.path.basename(src_url)) + with pytest.raises(DownloadError): + downloader.download_file(src_url, output_path) + assert not os.path.exists(output_path) + assert ( + 
"nf_core.pipelines.downloads.utils", + logging.DEBUG, + f"Downloading '{src_url}' to '{output_path}'", + ) in self._caplog.record_tuples + + # A task was added but is now gone + assert progress._task_index == 2 + assert progress.tasks == [] + + # Invalid URL (schema) + src_url = "dummy://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fax" + output_path = os.path.join(outdir, os.path.basename(src_url)) + with pytest.raises(requests.exceptions.InvalidSchema): + downloader.download_file(src_url, output_path) + assert not os.path.exists(output_path) + assert ( + "nf_core.pipelines.downloads.utils", + logging.DEBUG, + f"Downloading '{src_url}' to '{output_path}'", + ) in self._caplog.record_tuples + + # A task was added but is now gone + assert progress._task_index == 3 + assert progress.tasks == [] class DownloadTest(unittest.TestCase): From 467ba3fd803e71c725502c059a2bc10ad1bfa5d3 Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 17:33:28 +0000 Subject: [PATCH 30/31] Test the kill_with_fire flag --- tests/pipelines/test_download.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index e485010b9f..a2d6246b6e 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -313,6 +313,15 @@ def test_file_download(self, outdir): assert progress._task_index == 3 assert progress.tasks == [] + # Fire in the hole ! 
The download will be aborted and no output file will be created + src_url = "https://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fai" + output_path = os.path.join(outdir, os.path.basename(src_url)) + os.unlink(output_path) + downloader.kill_with_fire = True + with pytest.raises(KeyboardInterrupt): + downloader.download_file(src_url, output_path) + assert not os.path.exists(output_path) + class DownloadTest(unittest.TestCase): @pytest.fixture(autouse=True) From ea4bc89703dd268a9d26622c4e23ae57cc4177ec Mon Sep 17 00:00:00 2001 From: Matthieu Muffato Date: Thu, 27 Mar 2025 17:34:35 +0000 Subject: [PATCH 31/31] Refactored the FileDownloader - Replaced the generator with a callback + a list return - Fixed the handling of exceptions so that KeyboardInterrupt is always captured - More tests --- nf_core/pipelines/downloads/singularity.py | 8 +- nf_core/pipelines/downloads/utils.py | 92 ++++++++++++--- tests/pipelines/test_download.py | 131 ++++++++++++++++++++- 3 files changed, 210 insertions(+), 21 deletions(-) diff --git a/nf_core/pipelines/downloads/singularity.py b/nf_core/pipelines/downloads/singularity.py index 978a83915a..5d43859de1 100644 --- a/nf_core/pipelines/downloads/singularity.py +++ b/nf_core/pipelines/downloads/singularity.py @@ -127,7 +127,8 @@ def download_images( parallel_downloads: int, ) -> None: downloader = FileDownloader(self.progress) - for output_path in downloader.download_files_in_parallel(containers_download, parallel_downloads): + + def update_file_progress(input_params: Tuple[str, str], status: FileDownloader.Status) -> None: # try-except introduced in 4a95a5b84e2becbb757ce91eee529aa5f8181ec7 # unclear why rich.progress may raise an exception here as it's supposed to be thread-safe try: @@ -135,7 +136,10 @@ def download_images( except Exception as e: log.error(f"Error updating progress bar: {e}") - symlink_registries(output_path, self.registry_set) + if status ==
FileDownloader.Status.DONE: + symlink_registries(input_params[1], self.registry_set) + + downloader.download_files_in_parallel(containers_download, parallel_downloads, callback=update_file_progress) def pull_images(self, containers_pull: Iterable[Tuple[str, str]]) -> None: for container, output_path in containers_pull: diff --git a/nf_core/pipelines/downloads/utils.py b/nf_core/pipelines/downloads/utils.py index 301784790e..f24003d827 100644 --- a/nf_core/pipelines/downloads/utils.py +++ b/nf_core/pipelines/downloads/utils.py @@ -1,10 +1,11 @@ import concurrent.futures import contextlib +import enum import io import logging import os import tempfile -from typing import Generator, Iterable, Tuple +from typing import Callable, Dict, Generator, Iterable, List, Optional, Tuple import requests import requests_cache @@ -108,33 +109,88 @@ def sub_task(self, *args, **kwargs) -> Generator[rich.progress.TaskID, None, Non class FileDownloader: """Class to download files. - Downloads are done in parallel using threads and progress is shown in the progress bar. + Downloads are done in parallel using threads. Progress of each download + is shown in a progress bar. + + Users can hook a callback method to be notified after each download. """ + # Enum to report the status of a download thread + Status = enum.Enum("Status", "CANCELLED PENDING RUNNING DONE ERROR") + def __init__(self, progress: DownloadProgress) -> None: + """Initialise the FileDownloader object. + + Args: + progress (DownloadProgress): The progress bar object to use for tracking downloads. 
+ """ self.progress = progress self.kill_with_fire = False + def parse_future_status(self, future: concurrent.futures.Future) -> Status: + """Parse the status of a future object.""" + if future.running(): + return self.Status.RUNNING + if future.cancelled(): + return self.Status.CANCELLED + if future.done(): + if future.exception(): + return self.Status.ERROR + return self.Status.DONE + return self.Status.PENDING + def download_files_in_parallel( self, download_files: Iterable[Tuple[str, str]], parallel_downloads: int, - ) -> Generator[str, None, None]: - with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_downloads) as pool: - # Kick off concurrent downloads - future_downloads = [ - pool.submit(self.download_file, remote_path, output_path) - for (remote_path, output_path) in download_files - ] + callback: Optional[Callable[[Tuple[str, str], Status], None]] = None, + ) -> List[Tuple[str, str]]: + """Download multiple files in parallel. + + Args: + download_files (Iterable[Tuple[str, str]]): List of tuples with the remote URL and the local output path. + parallel_downloads (int): Number of parallel downloads to run. + callback (Callable[[Tuple[str, str], Status], None]): Optional allback function to call after each download. + The function must take two arguments: the download tuple and the status of the download thread. 
+ """ + + # Make ctrl-c work with multi-threading + self.kill_with_fire = False + + # Track the download threads + future_downloads: Dict[concurrent.futures.Future, Tuple[str, str]] = {} + + # List to store *successful* downloads + successful_downloads = [] - # Make ctrl-c work with multi-threading - self.kill_with_fire = False + def successful_download_callback(future: concurrent.futures.Future) -> None: + if future.done() and not future.cancelled() and future.exception() is None: + successful_downloads.append(future_downloads[future]) + with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_downloads) as pool: + # The entire block needs to be monitored for KeyboardInterrupt so that ntermediate files + # can be cleaned up properly. try: - # Iterate over each threaded download, waiting for them to finish - for future in concurrent.futures.as_completed(future_downloads): - output_path = future.result() - yield output_path + for input_params in download_files: + (remote_path, output_path) = input_params + # Create the download thread as a Future object + future = pool.submit(self.download_file, remote_path, output_path) + future_downloads[future] = input_params + # Callback to record successful downloads + future.add_done_callback(successful_download_callback) + # User callback function (if provided) + if callback: + future.add_done_callback(lambda f: callback(future_downloads[f], self.parse_future_status(f))) + + completed_futures = concurrent.futures.wait( + future_downloads, return_when=concurrent.futures.ALL_COMPLETED + ) + # Get all the exceptions and exclude BaseException-based ones (e.g. 
KeyboardInterrupt) + exceptions = [ + exc for exc in (f.exception() for f in completed_futures.done) if isinstance(exc, Exception) + ] + if exceptions: + raise DownloadError("Download errors", exceptions) except KeyboardInterrupt: # Cancel the future threads that haven't started yet @@ -146,7 +202,9 @@ def download_files_in_parallel( # Re-raise exception on the main thread raise - def download_file(self, remote_path: str, output_path: str) -> str: + return successful_downloads + + def download_file(self, remote_path: str, output_path: str) -> None: """Download a file from the web. Use native Python to download the file. Progress is shown in the progress bar @@ -188,5 +246,3 @@ def download_file(self, remote_path: str, output_path: str) -> str: # Check that we actually downloaded something if not has_content: raise DownloadError(f"Downloaded file '{remote_path}' is empty") - - return output_path diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index a2d6246b6e..564d516e28 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -251,7 +251,7 @@ def test_download_progress_renderables(self): assert table.columns[6]._cells[0]._text == ["?"] # - # Test for 'utils.FileDownloader' + # Test for 'utils.FileDownloader.download_file' # @with_temporary_folder def test_file_download(self, outdir): @@ -322,6 +322,135 @@ def test_file_download(self, outdir): downloader.download_file(src_url, output_path) assert not os.path.exists(output_path) + # + # Test for 'utils.FileDownloader.download_files_in_parallel' + # + @with_temporary_folder + def test_parallel_downloads(self, outdir): + # Prepare the download paths + def make_tuple(url): + return (url, os.path.join(outdir, os.path.basename(url))) + + download_fai = make_tuple( + "https://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fai" + ) + download_dict = make_tuple( + 
"https://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.dict" + ) + download_204 = make_tuple("http://www.google.com/generate_204") + download_schema = make_tuple( + "dummy://github.com/nf-core/test-datasets/raw/refs/heads/modules/data/genomics/sarscov2/genome/genome.fasta.fax" + ) + + with DownloadProgress() as progress: + downloader = FileDownloader(progress) + + # Download two files + assert downloader.kill_with_fire is False + downloads = [download_fai, download_dict] + downloaded_files = downloader.download_files_in_parallel(downloads, parallel_downloads=1) + assert len(downloaded_files) == 2 + assert downloaded_files == downloads + assert os.path.exists(download_fai[1]) + assert os.path.exists(download_dict[1]) + assert downloader.kill_with_fire is False + os.unlink(download_fai[1]) + os.unlink(download_dict[1]) + + # This time, the second file will raise an exception + assert downloader.kill_with_fire is False + downloads = [download_fai, download_204] + with pytest.raises(DownloadError): + downloader.download_files_in_parallel(downloads, parallel_downloads=1) + assert downloader.kill_with_fire is False + assert os.path.exists(download_fai[1]) + assert not os.path.exists(download_204[1]) + os.unlink(download_fai[1]) + + # Now we swap the two files. The first one will raise an exception but the + # second one will still be downloaded because only KeyboardInterrupt can + # stop everything altogether. + assert downloader.kill_with_fire is False + downloads = [download_204, download_fai] + with pytest.raises(DownloadError): + downloader.download_files_in_parallel(downloads, parallel_downloads=1) + assert downloader.kill_with_fire is False + assert os.path.exists(download_fai[1]) + assert not os.path.exists(download_204[1]) + os.unlink(download_fai[1]) + + # We check that there's the same behaviour with `requests` errors. 
+ assert downloader.kill_with_fire is False + downloads = [download_schema, download_fai] + with pytest.raises(DownloadError): + downloader.download_files_in_parallel(downloads, parallel_downloads=1) + assert downloader.kill_with_fire is False + assert os.path.exists(download_fai[1]) + assert not os.path.exists(download_schema[1]) + os.unlink(download_fai[1]) + + # Now we check the callback method + callbacks = [] + + def callback(*args): + callbacks.append(args) + + # We check the same scenarios as above + callbacks = [] + downloads = [download_fai, download_dict] + downloader.download_files_in_parallel(downloads, parallel_downloads=1, callback=callback) + assert len(callbacks) == 2 + assert callbacks == [ + (download_fai, FileDownloader.Status.DONE), + (download_dict, FileDownloader.Status.DONE), + ] + + callbacks = [] + downloads = [download_fai, download_204] + with pytest.raises(DownloadError): + downloader.download_files_in_parallel(downloads, parallel_downloads=1, callback=callback) + assert len(callbacks) == 2 + assert callbacks == [ + (download_fai, FileDownloader.Status.DONE), + (download_204, FileDownloader.Status.ERROR), + ] + + callbacks = [] + downloads = [download_204, download_fai] + with pytest.raises(DownloadError): + downloader.download_files_in_parallel(downloads, parallel_downloads=1, callback=callback) + assert len(callbacks) == 2 + assert callbacks == [ + (download_204, FileDownloader.Status.ERROR), + (download_fai, FileDownloader.Status.DONE), + ] + + callbacks = [] + downloads = [download_schema, download_fai] + with pytest.raises(DownloadError): + downloader.download_files_in_parallel(downloads, parallel_downloads=1, callback=callback) + assert len(callbacks) == 2 + assert callbacks == [ + (download_schema, FileDownloader.Status.ERROR), + (download_fai, FileDownloader.Status.DONE), + ] + + # Finally, we check how the function behaves when a KeyboardInterrupt is raised + with mock.patch("concurrent.futures.wait", 
side_effect=KeyboardInterrupt): + callbacks = [] + downloads = [download_fai, download_204, download_dict] + with pytest.raises(KeyboardInterrupt): + downloader.download_files_in_parallel(downloads, parallel_downloads=1, callback=callback) + assert len(callbacks) == 3 + # Note: when the KeyboardInterrupt is raised, download_204 and download_dict are not yet started. + # They are therefore cancelled and pushed to the callback list immediately. download_fai is last + # because it is running and can't be cancelled. + assert callbacks == [ + (download_204, FileDownloader.Status.CANCELLED), + (download_dict, FileDownloader.Status.CANCELLED), + (download_fai, FileDownloader.Status.ERROR), + ] + class DownloadTest(unittest.TestCase): @pytest.fixture(autouse=True)