From 66f7b323fa346b91f6b364bb3e774b0e73dddfed Mon Sep 17 00:00:00 2001 From: Vladislav Moroshan Date: Wed, 17 Jan 2024 10:35:39 +0100 Subject: [PATCH 1/7] doc(Trial): Better indication of what to do with the Metric name for report (#216) Co-authored-by: eddiebergman --- src/amltk/optimization/metric.py | 13 +++++++ src/amltk/optimization/optimizer.py | 8 +++++ src/amltk/optimization/trial.py | 53 +++++++++++++++++++++++++---- 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/src/amltk/optimization/metric.py b/src/amltk/optimization/metric.py index b0e1d7dd..9d5a462b 100644 --- a/src/amltk/optimization/metric.py +++ b/src/amltk/optimization/metric.py @@ -52,6 +52,19 @@ def __post_init__(self) -> None: object.__setattr__(self, "bounds", (float(lower), float(upper))) + if self.name[0].isdigit(): + raise ValueError( + f"Metric name {self.name} cannot start with a digit." + " Must be a valid Python identifier.", + ) + + for c in "[](){}<>|&^%$#@!~`": + if c in self.name: + raise ValueError( + f"Metric name {self.name} cannot contain '{c}'." + " Must be a valid Python identifier.", + ) + @override def __str__(self) -> str: parts = [self.name] diff --git a/src/amltk/optimization/optimizer.py b/src/amltk/optimization/optimizer.py index 9071d375..309a182f 100644 --- a/src/amltk/optimization/optimizer.py +++ b/src/amltk/optimization/optimizer.py @@ -20,6 +20,8 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, Concatenate, Generic, ParamSpec, TypeVar +from more_itertools import all_unique + from amltk.store.paths.path_bucket import PathBucket if TYPE_CHECKING: @@ -58,6 +60,12 @@ def __init__( optimizer. """ super().__init__() + if not all_unique(metric.name for metric in metrics): + raise ValueError( + "All metrics must have unique names." 
+ f"Got {metrics} with names {[metric.name for metric in metrics]}", + ) + self.metrics = metrics self.bucket = ( bucket diff --git a/src/amltk/optimization/trial.py b/src/amltk/optimization/trial.py index f3b3e145..806831ac 100644 --- a/src/amltk/optimization/trial.py +++ b/src/amltk/optimization/trial.py @@ -72,7 +72,7 @@ @dataclass(kw_only=True) class Trial(RichRenderable, Generic[I]): """A [`Trial`][amltk.optimization.Trial] encapsulates some configuration - that needs to be evaluated. Typically this is what is generated by an + that needs to be evaluated. Typically, this is what is generated by an [`Optimizer.ask()`][amltk.optimization.Optimizer.ask] call. ??? tip "Usage" @@ -84,7 +84,7 @@ class Trial(RichRenderable, Generic[I]): If all went smooth, your trial was successful and you can use [`trial.success()`][amltk.optimization.Trial.success] to generate a success [`Report`][amltk.optimization.Trial.Report], typically - passing what your chosen optimizer expects, e.g. `"loss"` or `"cost"`. + passing what your chosen optimizer expects, e.g., `"loss"` or `"cost"`. If your trial failed, you can instead use the [`trial.fail()`][amltk.optimization.Trial.fail] to generate a @@ -121,7 +121,7 @@ def target_function(trial: Trial) -> Trial.Report: What you can return with [`trial.success()`][amltk.optimization.Trial.success] or [`trial.fail()`][amltk.optimization.Trial.fail] depends on the - [`metrics`][amltk.optimization.Trial.metrics] of the trial. Typically + [`metrics`][amltk.optimization.Trial.metrics] of the trial. Typically, an optimizer will provide the trial with the list of metrics. ??? 
tip "Metrics" @@ -130,13 +130,48 @@ def target_function(trial: Trial) -> Trial.Report: options: members: False - Some important properties is that they have a unique + Some important properties are that they have a unique [`.name`][amltk.optimization.Trial.name] given the optimization run, - a candidate [`.config`][amltk.optimization.Trial.config]' to evaluate, + a candidate [`.config`][amltk.optimization.Trial.config] to evaluate, a possible [`.seed`][amltk.optimization.Trial.seed] to use, - and an [`.info`][amltk.optimization.Trial.info] object which is the optimizer + and an [`.info`][amltk.optimization.Trial.info] object, which is the optimizer specific information, if required by you. + !!! tip "Reporting success (or failure)" + + When using the [`success()`][amltk.optimization.trial.Trial.success] + or [`fail()`][amltk.optimization.trial.Trial.fail] method, make sure to + provide values for all metrics specified in the + [`.metrics`][amltk.optimization.Trial.metrics] attribute. Usually these are + set by the optimizer generating the `Trial`. + + Each metric has a unique name, and it's crucial to use the correct names when + reporting success, otherwise an error will occur. + + ??? example "Reporting success for metrics" + + For example: + + ```python exec="true" result="python" source="material-block" + from amltk.optimization import Trial, Metric + + # Gotten from some optimizer usually, i.e. via `optimizer.ask()` + trial = Trial( + name="example_trial", + config={"param": 42}, + metrics=[Metric(name="accuracy", minimize=False)] + ) + + # Incorrect usage (will raise an error) + try: + report = trial.success(invalid_metric=0.95) + except ValueError as error: + print(error) + + # Correct usage + report = trial.success(accuracy=0.95) + ``` + If using [`Plugins`][amltk.scheduling.plugins.Plugin], they may insert some extra objects in the [`.extra`][amltk.optimization.Trial.extras] dict. 
@@ -382,7 +417,11 @@ def success(self, **metrics: float | int) -> Trial.Report[I]: else: raise ValueError( f"Cannot report success without {self.metrics=}." - f" Please provide a value for the metric.", + f" Please provide a value for the metric '{_metric.name}'." + f"\nPlease provide '{_metric.name}' as `trial.success(" + f"{_metric.name}=value)` or rename your metric to" + f'`Metric(name="{{provided_key}}", minimize={_metric.minimize}, ' + f"bounds={_metric.bounds})`", ) # Need to check if anything extra was reported! From 0d8a0c204d1b74d3e73bb58c40c4fe3f13cedb42 Mon Sep 17 00:00:00 2001 From: Eddie Bergman Date: Thu, 18 Jan 2024 13:59:00 +0100 Subject: [PATCH 2/7] other: Update PR and issue templates (#219) --- .github/ISSUE_TEMPLATE/doc_request.md | 17 +++++++++++++++++ .github/ISSUE_TEMPLATE/ux_request.md | 15 +++++++++++++++ .github/PULL_REQUEST_TEMPLATE.md | 1 - 3 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 .github/ISSUE_TEMPLATE/doc_request.md create mode 100644 .github/ISSUE_TEMPLATE/ux_request.md diff --git a/.github/ISSUE_TEMPLATE/doc_request.md b/.github/ISSUE_TEMPLATE/doc_request.md new file mode 100644 index 00000000..8d810fbf --- /dev/null +++ b/.github/ISSUE_TEMPLATE/doc_request.md @@ -0,0 +1,17 @@ +--- +name: Doc request +about: Suggest something that should be documented +title: '' +labels: ['doc'] +assignees: '' + +--- + +**Was there something unclear in the documentation?** +If something in the documentation was unclear, please describe what you interpreted +it to mean and what it actually meant. + +If something wasn't there, please let us know what you were looking for. + +**Experience with AMLTK?** +What's your experience with AMLTK, are you a newcomer or been using it for a while? 
diff --git a/.github/ISSUE_TEMPLATE/ux_request.md b/.github/ISSUE_TEMPLATE/ux_request.md new file mode 100644 index 00000000..f7e55433 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/ux_request.md @@ -0,0 +1,15 @@ +--- +name: User Experience +about: What would make using/developing AMLTK easier? +title: '' +labels: ['ux'] +assignees: 'eddiebergman' + +--- + +**What would have made your life easier using AMLTK?** +* Perhaps it was an error message which wasn't clear. If so, maybe +you could suggest what would have been clearer and the actual solution once fixed. + +* If developing, maybe there's some part of the development experience that could +be improved? diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 2d0023b1..6f8d107d 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -4,7 +4,6 @@ the contribution guidelines: https://github.com/automl/AMLTK/blob/main/CONTRIBUT Please make sure that: -* you updated all docs, this includes the changelog (CHANGELOG.md) * for any new functionality, consider adding a relevant example * add unit tests for new functionalities --> From abc1f75320a5b001248ea7ea07ae17201dae79c0 Mon Sep 17 00:00:00 2001 From: Eddie Bergman Date: Mon, 22 Jan 2024 10:06:51 +0100 Subject: [PATCH 3/7] feat(Pipeline): `factorize()` a pipeline into its possibilities (#217) --- src/amltk/pipeline/__init__.py | 2 + src/amltk/pipeline/node.py | 22 +++- src/amltk/pipeline/ops.py | 138 ++++++++++++++++++++++ tests/pipeline/test_node.py | 4 +- tests/pipeline/test_ops.py | 208 +++++++++++++++++++++++++++++++++ 5 files changed, 372 insertions(+), 2 deletions(-) create mode 100644 src/amltk/pipeline/ops.py create mode 100644 tests/pipeline/test_ops.py diff --git a/src/amltk/pipeline/__init__.py b/src/amltk/pipeline/__init__.py index 9f6f25e4..03d093f3 100644 --- a/src/amltk/pipeline/__init__.py +++ b/src/amltk/pipeline/__init__.py @@ -11,6 +11,7 @@ as_node, ) from amltk.pipeline.node import Node, 
request +from amltk.pipeline.ops import factorize __all__ = [ "Node", @@ -23,4 +24,5 @@ "Join", "request", "as_node", + "factorize", ] diff --git a/src/amltk/pipeline/node.py b/src/amltk/pipeline/node.py index 712b53a8..ec1ce55e 100644 --- a/src/amltk/pipeline/node.py +++ b/src/amltk/pipeline/node.py @@ -160,7 +160,6 @@ class Node(RichRenderable, Generic[Item, Space]): fidelities: Mapping[str, Any] | None = field(hash=False) """The fidelities for this node""" - config_transform: Callable[[Config, Any], Config] | None = field(hash=False) """A function that transforms the configuration of this node""" @@ -634,6 +633,27 @@ def build( case _: return builder(self, *builder_args, **builder_kwargs) + def factorize( + self, + *, + min_depth: int = 0, + max_depth: int | None = None, + current_depth: int = 0, + factor_by: Callable[[Node], bool] | None = None, + assign_child: Callable[[Node, Node], Node] | None = None, + ) -> Iterator[Self]: + """Please see [`factorize()`][amltk.pipeline.ops.factorize].""" # noqa: D402 + from amltk.pipeline.ops import factorize + + yield from factorize( + self, + min_depth=min_depth, + max_depth=max_depth, + current_depth=current_depth, + factor_by=factor_by, + assign_child=assign_child, + ) + def _rich_iter(self) -> Iterator[RenderableType]: """Iterate the panels for rich printing.""" yield self.__rich__() diff --git a/src/amltk/pipeline/ops.py b/src/amltk/pipeline/ops.py new file mode 100644 index 00000000..d09daba3 --- /dev/null +++ b/src/amltk/pipeline/ops.py @@ -0,0 +1,138 @@ +"""Operations on pipelines.""" +from __future__ import annotations + +import sys +from collections.abc import Callable, Iterator +from functools import partial +from itertools import product +from typing import TypeVar + +from amltk.pipeline.components import Choice +from amltk.pipeline.node import Node + +MAX_INT = sys.maxsize +NodeT1 = TypeVar("NodeT1", bound=Node) +NodeT2 = TypeVar("NodeT2", bound=Node) + + +def factorize( + node: NodeT1, + *, + min_depth: int = 
0, + max_depth: int | None = None, + current_depth: int = 0, + factor_by: Callable[[Node], bool] | None = None, + assign_child: Callable[[NodeT2, Node], NodeT2] | None = None, +) -> Iterator[NodeT1]: + """Factorize a pipeline into all possibilities of its children. + + When dealing with a large pipeline with many choices at various levels, + it can be useful to factorize the pipeline into all possible pipelines. + This effectively returns a new pipeline for every possible choice in the + pipeline. + + ```python exec="true" source="material-block" html="true" + from amltk.pipeline import Sequential, Choice, Node, factorize + + pipeline = Sequential( + Choice(Node(name="hi"), Node(name="hello"), name="choice"), + Node(name="banana"), + name="pipeline", + ) + + from amltk._doc import doc_print; _print = print; print = lambda thing: doc_print(_print, thing) # markdown-exec: hide + print(pipeline) + for i, possibility in enumerate(factorize(pipeline)): + print(f"Pipeline {i}:") + print(possibility) + ``` + + Args: + node: The node to factorize. + min_depth: The minimum depth at which to factorize. If the node is at a + depth less than this, it will not be factorized. + Depth is calculated as the node distance from node which is passed in + plus the `current_depth`. + max_depth: The maximum depth at which to factorize. If the node is at a + depth greater than this, it will not be factorized. + Depth is calculated as the node distance from node which is passed in + plus the `current_depth`. If `None`, there is no maximum depth. + current_depth: The current depth of the node. This is used internally but + can also be used externally to factorize a sub-pipeline. + factor_by: A function that takes a node and returns True if it + should be factorized into its children, False otherwise. By + default, it will split only Choice nodes. One useful example + is to only factor on a particular name of a node. 
+ + ```python + pipeline_per_estimator = list(factorize( + pipeline, + factor_by=lambda _node: _node.name == "estimator" + )) + ``` + + assign_child: A function that takes a node and a child and + returns a new node with that child assigned to it. By default, + it will mutate the node so that it has that child as its + only child. You may wish to pass in custom functionality if there + is more than one way to assign a child to a node or extra logic must + be done to the nodes properties. + + It should return the same type of node as the one passed in. + + Returns: + An iterator over all possible pipelines. + """ # noqa: E501 + if max_depth is None: + max_depth = MAX_INT + + if current_depth < 0: + raise ValueError("current_depth cannot be less than 0") + + # We can exit early as there's no chance we factorize past this point + if current_depth > max_depth: + yield node.copy() + return + + # NOTE: These two functions below are defined here instead to allow custom + # Node types in the future. The default behaviour is defined to just split + # Choice nodes and assign a child to one is to just mutate the node so + # that it has that child as its only child. 
+ if factor_by is None: + factor_by = lambda _node: isinstance(_node, Choice) + + if assign_child is None: + assign_child = lambda _node, _child: _node.mutate(nodes=(_child,)) + + _factorize = partial( + factorize, + factor_by=factor_by, + assign_child=assign_child, + current_depth=current_depth + 1, + min_depth=min_depth, + max_depth=max_depth, + ) + + match node: + case Node(nodes=()): + # Base case, there's no further possibility to factorize + yield node.copy() + case Node(nodes=children) if factor_by(node) and min_depth <= current_depth: + for child in children: + for possible_child in _factorize(child): + split_node_with_child_assigned = assign_child( + node.copy(), # type: ignore + possible_child, + ) + yield split_node_with_child_assigned # type: ignore + + case Node(nodes=children): + # We need to return N copies of this node, with each + # enumerating over all the possibilities of its children + # e.g. + # | children_sets = ((1, 2), (3, 4), (5,)) + # | for child_set in [(1, 3, 5,), (1, 4, 5,), (2, 3, 5,), (2, 4, 5,)]: + # | yield node.mutate(nodes=child_set) + children_sets = (_factorize(c) for c in children) + for child_set in product(*children_sets): + yield node.mutate(nodes=child_set) diff --git a/tests/pipeline/test_node.py b/tests/pipeline/test_node.py index f90aadf7..0295fa26 100644 --- a/tests/pipeline/test_node.py +++ b/tests/pipeline/test_node.py @@ -146,10 +146,12 @@ def test_find() -> None: def test_walk() -> None: + n1 = Node(name="1") + sub3 = Node(name="sub3") sub2 = Node(sub3, name="sub2") - n1 = Node(name="1") n2 = Node(sub2, name="2") + n3 = Node(name="3") seq = n1 >> n2 >> n3 diff --git a/tests/pipeline/test_ops.py b/tests/pipeline/test_ops.py new file mode 100644 index 00000000..73f125a4 --- /dev/null +++ b/tests/pipeline/test_ops.py @@ -0,0 +1,208 @@ +from __future__ import annotations + +from amltk.pipeline import Choice, Node +from amltk.pipeline.components import Sequential +from amltk.pipeline.ops import factorize + + +def 
test_factorize_base_case(): + node = Node(name="hi") + assert list(factorize(node)) == [node] + + +def test_factorize_with_no_choices_returns_same_pipeline(): + node = Node(Node(name="n1"), Node(name="n2"), name="n0") + assert list(factorize(node)) == [node] + + +def test_factorize_with_single_choice_same_pipeline(): + node = Choice(Node(name="n1"), name="c1") + assert list(factorize(node)) == [node] + + +def test_factorize_with_two_choices_returns_both_pipelines(): + n1 = Node(name="n1") + n2 = Node(name="n2") + choice = Choice(n1, n2, name="c1") + expected = [ + Choice(n1, name="c1"), + Choice(n2, name="c1"), + ] + assert list(factorize(choice)) == expected + + +def test_nested_choice_returns_possible_pipelines(): + n1 = Node(name="n1") + n2 = Node(name="n2") + n3 = Node(name="n3") + choice = Choice(n1, n2, name="c1") + top = Sequential(choice, n3, name="s1") + expected = [ + Sequential(Choice(n1, name="c1"), n3, name="s1"), + Sequential(Choice(n2, name="c1"), n3, name="s1"), + ] + assert list(factorize(top)) == expected + + +def test_choice_followed_by_choice(): + n1 = Node(name="n1") + n2 = Node(name="n2") + n3 = Node(name="n3") + pipeline = Sequential( + Choice(Choice(n1, n2, name="c2"), n3, name="c1"), + name="s1", + ) + expected = [ + Sequential( + Choice( + Choice(n1, name="c2"), + name="c1", + ), + name="s1", + ), + Sequential( + Choice( + Choice(n2, name="c2"), + name="c1", + ), + name="s1", + ), + Sequential( + Choice(n3, name="c1"), + name="s1", + ), + ] + assert list(factorize(pipeline)) == expected + + +def test_double_nested_choice(): + S = Sequential + C = Choice + N = Node + pipeline = S( + S( + C(N(name="n3.1.1"), N(name="n3.1.2"), name="c3.1"), + C(N(name="n3.2.1"), N(name="n3.2.2"), name="c3.2"), + name="s2.1", + ), + N(name="n2.2"), + name="s1", + ) + expected = [ + S( + S( + C(N(name="n3.1.1"), name="c3.1"), + C(N(name="n3.2.1"), name="c3.2"), + name="s2.1", + ), + N(name="n2.2"), + name="s1", + ), + S( + S( + C(N(name="n3.1.1"), name="c3.1"), 
+ C(N(name="n3.2.2"), name="c3.2"), + name="s2.1", + ), + N(name="n2.2"), + name="s1", + ), + S( + S( + C(N(name="n3.1.2"), name="c3.1"), + C(N(name="n3.2.1"), name="c3.2"), + name="s2.1", + ), + N(name="n2.2"), + name="s1", + ), + S( + S( + C(N(name="n3.1.2"), name="c3.1"), + C(N(name="n3.2.2"), name="c3.2"), + name="s2.1", + ), + N(name="n2.2"), + name="s1", + ), + ] + + assert list(factorize(pipeline)) == expected + + +def test_factorize_with_min_depth_triggers_if_at_min_depth(): + n1 = Node(name="n1") + n2 = Node(name="n2") + + # Choice is at depth 1 + seq = Sequential(Choice(n1, n2, name="c1"), name="s1") + + expected = [ + Sequential(Choice(n1, name="c1"), name="s1"), + Sequential(Choice(n2, name="c1"), name="s1"), + ] + assert list(factorize(seq, min_depth=1)) == expected + + +def test_factorize_with_min_depth_greater_does_not_factor(): + n1 = Node(name="n1") + n2 = Node(name="n2") + + # Choice is at depth 0 + choice = Choice(n1, n2, name="c1") + + assert list(factorize(choice, min_depth=1_000)) == [choice] + + +def test_factorize_with_max_depth_does_not_triggers_if_past_max_depth(): + S = Sequential + n1 = Node(name="n1") + n2 = Node(name="n2") + + # Choice is at depth 2 + seq = S(S(Choice(n1, n2, name="c1"), name="s1"), name="s2") + assert list(factorize(seq, max_depth=1)) == [seq] + + +def test_factorize_with_max_depth_triggers_if_not_at_max_depth(): + n1 = Node(name="n1") + n2 = Node(name="n2") + + # Choice is at depth 2 + seq = Sequential(Choice(n1, n2, name="c1"), name="s1") + + expected = [ + Sequential(Choice(n1, name="c1"), name="s1"), + Sequential(Choice(n2, name="c1"), name="s1"), + ] + assert list(factorize(seq, max_depth=1_000)) == expected + + +def test_with_custom_factor_by(): + # We'll factorize by the name of the node + factor_by = lambda _node: _node.name == "estimator" + + e1 = Node(name="e1") + e2 = Node(name="e2") + estimator = Choice(e1, e2, name="estimator") + + p1 = Node(name="p1") + p2 = Node(name="p2") + preprocessor = Choice(p1, 
p2, name="preprocessor") + + pipeline = Sequential(preprocessor, estimator, name="pipeline") + + # Should only split out the estimator + expected = [ + Sequential( + Choice(p1, p2, name="preprocessor"), + Choice(e1, name="estimator"), + name="pipeline", + ), + Sequential( + Choice(p1, p2, name="preprocessor"), + Choice(e2, name="estimator"), + name="pipeline", + ), + ] + assert list(factorize(pipeline, factor_by=factor_by)) == expected From f844a5e7e6d3d62fc09596747d50600551e9f2e7 Mon Sep 17 00:00:00 2001 From: Eddie Bergman Date: Mon, 22 Jan 2024 10:46:58 +0100 Subject: [PATCH 4/7] ci: Update pre-commit (#221) --- .pre-commit-config.yaml | 6 +++--- src/amltk/scheduling/executors/dask_jobqueue.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 24cd4858..bf4bfd31 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,7 +26,7 @@ repos: - id: debug-statements files: '^src/.*\.py$' - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.7.0 + rev: v1.8.0 hooks: - id: mypy exclude: "test_comm_task" # Pre-commit mypy hates this one, crashes on (l106) @@ -38,7 +38,7 @@ repos: - "--ignore-missing-imports" - "--show-traceback" - repo: https://github.com/python-jsonschema/check-jsonschema - rev: 0.27.1 + rev: 0.27.3 hooks: - id: check-github-workflows files: '^github/workflows/.*\.ya?ml$' @@ -50,7 +50,7 @@ repos: hooks: - id: commitizen - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: v0.1.5 + rev: v0.1.14 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix, --no-cache] diff --git a/src/amltk/scheduling/executors/dask_jobqueue.py b/src/amltk/scheduling/executors/dask_jobqueue.py index 2f856bd6..9652c422 100644 --- a/src/amltk/scheduling/executors/dask_jobqueue.py +++ b/src/amltk/scheduling/executors/dask_jobqueue.py @@ -149,7 +149,7 @@ def map( @override def shutdown( self, - wait: bool = True, # noqa: FBT001, FBT002 + wait: bool = True, **kwargs: Any, ) -> 
None: """See [concurrent.futures.Executor.shutdown][].""" From bb93c060c4aa2658d067c1864217b047f7e50fe4 Mon Sep 17 00:00:00 2001 From: Eddie Bergman Date: Mon, 22 Jan 2024 10:50:43 +0100 Subject: [PATCH 5/7] ci: Add concurrency groups to workflows (#222) --- .github/workflows/docs.yml | 3 +++ .github/workflows/pre-commit.yml | 3 +++ .github/workflows/test.yml | 3 +++ 3 files changed, 9 insertions(+) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 97f9d8ba..b895c4ad 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -1,5 +1,8 @@ # This workflow is just to test that the docs build successfully. name: docs +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true on: workflow_dispatch: push: diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index e14adbfc..b3eddd26 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -1,4 +1,7 @@ name: pre-commit +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true on: workflow_dispatch: push: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cf89a9d1..09af8047 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,4 +1,7 @@ name: tests +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true on: workflow_dispatch: From ec6f2353a9e20ed5fa3c77d2e65b2c8ecc0e8011 Mon Sep 17 00:00:00 2001 From: Eddie Bergman Date: Mon, 22 Jan 2024 10:52:20 +0100 Subject: [PATCH 6/7] feat(Pynisher): Detect tasks with `Trial` to report `FAIL` (#220) --- src/amltk/scheduling/plugins/pynisher.py | 131 +++++++++++++++--- .../plugins/test_pynisher_plugin.py | 114 +++++++++++++++ 2 files changed, 228 insertions(+), 17 deletions(-) diff --git a/src/amltk/scheduling/plugins/pynisher.py 
b/src/amltk/scheduling/plugins/pynisher.py index a2a6cd90..65cd43ef 100644 --- a/src/amltk/scheduling/plugins/pynisher.py +++ b/src/amltk/scheduling/plugins/pynisher.py @@ -82,14 +82,17 @@ def callback(exception): """ # noqa: E501 from __future__ import annotations +import traceback from collections.abc import Callable +from dataclasses import dataclass from multiprocessing.context import BaseContext -from typing import TYPE_CHECKING, ClassVar, Literal, TypeAlias, TypeVar +from typing import TYPE_CHECKING, ClassVar, Generic, Literal, TypeAlias, TypeVar from typing_extensions import ParamSpec, Self, override import pynisher import pynisher.exceptions +from amltk.optimization import Trial from amltk.scheduling.events import Event from amltk.scheduling.plugins.plugin import Plugin @@ -104,6 +107,56 @@ def callback(exception): R = TypeVar("R") +@dataclass +class _PynisherWrap(Generic[P, R]): + fn: Callable[P, R] + memory_limit: int | tuple[int, str] | None = None + cputime_limit: int | tuple[float, str] | None = None + walltime_limit: int | tuple[float, str] | None = None + terminate_child_processes: bool = True + context: BaseContext | None = None + disable_trial_handling: bool = False + + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: + if any( + limit is not None + for limit in (self.memory_limit, self.cputime_limit, self.walltime_limit) + ): + fn = pynisher.Pynisher( + self.fn, + memory=self.memory_limit, + cpu_time=self.cputime_limit, + wall_time=self.walltime_limit, + terminate_child_processes=True, + context=self.context, + ) + else: + fn = self.fn + + trial: Trial | None = None + if not self.disable_trial_handling: + if len(args) > 0 and isinstance(args[0], Trial): + trial = args[0] + elif (_trial := kwargs.get("trial")) is not None: + if not isinstance(_trial, Trial): + raise ValueError( + f"Expected 'trial' to be a Trial instance, got {type(trial)}" + f"\n{trial=}", + ) + trial = _trial + + if trial is not None: + try: + return fn(*args, 
**kwargs) + except pynisher.PynisherException as e: + tb = traceback.format_exc() + trial.exception = e + trial.traceback = tb + return trial.fail() # type: ignore + else: + return fn(*args, **kwargs) + + class PynisherPlugin(Plugin): """A plugin that wraps a task in a pynisher to enforce limits on it. @@ -259,6 +312,7 @@ def __init__( cputime_limit: int | tuple[float, str] | None = None, walltime_limit: int | tuple[float, str] | None = None, context: BaseContext | None = None, + disable_trial_handling: bool = False, ): """Initialize a `PynisherPlugin` instance. @@ -274,12 +328,61 @@ def __init__( `("s", "m", "h")`. Defaults to `None`. context: The context to use for multiprocessing. Defaults to `None`. See [`multiprocessing.get_context()`][multiprocessing.get_context] + disable_trial_handling: By default, the `PynisherPlugin` will auto-detect + if the task is one for a `Trial`. If so, it will catch any pynisher + specific exceptions and return a `Trial.Report` with `trial.fail()`, + instead of raising the exception. This has the effect that the report + can be caught with `task.on_result` where it can be handled. This will + also prevent the specific events `@pynisher-timeout`, + `@pynisher-memory-limit`, `@pynisher-cputime-limit` + and `@pynisher-walltime-limit` from being emitted. + + If this + is `True`, then the pynisher exceptions will be raised as normal and + should be handled with `task.on_exception` where there is no direct + access to the `Trial` submitted. + + ??? note "Auto-Detection" + + This will be triggered if the first positional argument is a + `Trial` or if any of the keyword arguments are `"trial"`. + + ```python + from amltk.optimization import Trial + from amltk.scheduling import Scheduler + + def trial_evaluator_one(trial: Trial, ...) -> int: + ... + + def trial_evaluator_two(..., trial: Trial) -> int: + ... 
+ + scheduler = Scheduler.with_processes(1) + + task_one = scheduler.task( + trial_evaluator_one, + plugins=PynisherPlugin(memory_limit=(1, "gb")) + ) + task_two = scheduler.task( + trial_evaluator_two, + plugins=PynisherPlugin(memory_limit=(1, "gb")) + ) + + # Will auto-detect + trial = Trial(...) + task_one.submit(trial, ...) + task_two.submit(..., trial=trial) + + # Will not auto-detect + task_one.submit(42, trial) + ``` """ super().__init__() self.memory_limit = memory_limit self.cputime_limit = cputime_limit self.walltime_limit = walltime_limit self.context = context + self.disable_trial_handling = disable_trial_handling self.task: Task @@ -291,22 +394,16 @@ def pre_submit( **kwargs: P.kwargs, ) -> tuple[Callable[P, R], tuple, dict]: """Wrap a task function in a `Pynisher` instance.""" - # If any of our limits is set, we need to wrap it in Pynisher - # to enfore these limits. - if any( - limit is not None - for limit in (self.memory_limit, self.cputime_limit, self.walltime_limit) - ): - fn = pynisher.Pynisher( - fn, - memory=self.memory_limit, - cpu_time=self.cputime_limit, - wall_time=self.walltime_limit, - terminate_child_processes=True, - context=self.context, - ) - - return fn, args, kwargs + _fn = _PynisherWrap( + fn, + disable_trial_handling=self.disable_trial_handling, + memory_limit=self.memory_limit, + cputime_limit=self.cputime_limit, + walltime_limit=self.walltime_limit, + context=self.context, + terminate_child_processes=True, + ) + return _fn, args, kwargs @override def attach_task(self, task: Task) -> None: diff --git a/tests/scheduling/plugins/test_pynisher_plugin.py b/tests/scheduling/plugins/test_pynisher_plugin.py index c2905dac..91156285 100644 --- a/tests/scheduling/plugins/test_pynisher_plugin.py +++ b/tests/scheduling/plugins/test_pynisher_plugin.py @@ -12,6 +12,7 @@ from distributed.cfexecutor import ClientExecutor from pytest_cases import case, fixture, parametrize_with_cases +from amltk.optimization import Trial from amltk.scheduling 
import ExitState, Scheduler from amltk.scheduling.plugins.pynisher import PynisherPlugin @@ -58,6 +59,25 @@ def big_memory_function(mem_in_bytes: int) -> bytearray: return z # noqa: RET504 +def trial_with_big_memory(trial: Trial, mem_in_bytes: int) -> Trial.Report: + with trial.begin(): + pass + + # We're particularly interested when the memory error happens during the + # task execution, not during the trial begin period + big_memory_function(mem_in_bytes) + + return trial.success() + + +def trial_with_time_wasting(trial: Trial, duration: int) -> Trial.Report: + with trial.begin(): + time_wasting_function(duration) + + time_wasting_function(duration) + return trial.success() + + def time_wasting_function(duration: int) -> int: time.sleep(duration) return duration @@ -253,3 +273,97 @@ def start_task() -> None: ) assert end_status.code == ExitState.Code.EXCEPTION assert isinstance(end_status.exception, PynisherPlugin.WallTimeoutException) + + +def test_trial_gets_autodetect_memory(scheduler: Scheduler) -> None: + if not PynisherPlugin.supports("memory"): + pytest.skip("Pynisher does not support memory limits on this system") + + one_half_gb = int(1e9 * 1.5) + two_gb = int(1e9) * 2 + task = scheduler.task( + trial_with_big_memory, + plugins=PynisherPlugin( + memory_limit=one_half_gb, + disable_trial_handling=False, + ), + ) + trial = Trial(name="test_trial", config={}) + + @scheduler.on_start + def start_task() -> None: + task.submit(trial, mem_in_bytes=two_gb) + + reports: list[Trial.Report] = [] + + @task.on_result + def trial_report(_, report: Trial.Report) -> None: + reports.append(report) + + status = scheduler.run(on_exception="raise") + assert status.code == ExitState.Code.EXHAUSTED + + assert task.event_counts == Counter( + {task.SUBMITTED: 1, task.DONE: 1, task.RESULT: 1}, + ) + + assert scheduler.event_counts == Counter( + { + scheduler.STARTED: 1, + scheduler.FUTURE_RESULT: 1, + scheduler.FINISHING: 1, + scheduler.FINISHED: 1, + scheduler.EMPTY: 1, + 
scheduler.FUTURE_SUBMITTED: 1, + scheduler.FUTURE_DONE: 1, + }, + ) + assert len(reports) == 1 + assert reports[0].status == Trial.Status.FAIL + assert isinstance(reports[0].exception, PynisherPlugin.MemoryLimitException) + + +def test_trial_gets_autodetect_time(scheduler: Scheduler) -> None: + if not PynisherPlugin.supports("wall_time"): + pytest.skip("Pynisher does not support wall_time limits on this system") + + task = scheduler.task( + trial_with_time_wasting, + plugins=PynisherPlugin( + walltime_limit=1, + disable_trial_handling=False, + ), + ) + trial = Trial(name="test_trial", config={}) + + @scheduler.on_start + def start_task() -> None: + task.submit(trial=trial, duration=3) + + reports: list[Trial.Report] = [] + + @task.on_result + def trial_report(_, report: Trial.Report) -> None: + reports.append(report) + + status = scheduler.run(on_exception="raise") + assert status.code == ExitState.Code.EXHAUSTED + + assert task.event_counts == Counter( + {task.SUBMITTED: 1, task.DONE: 1, task.RESULT: 1}, + ) + + assert scheduler.event_counts == Counter( + { + scheduler.STARTED: 1, + scheduler.FUTURE_RESULT: 1, + scheduler.FINISHING: 1, + scheduler.FINISHED: 1, + scheduler.EMPTY: 1, + scheduler.FUTURE_SUBMITTED: 1, + scheduler.FUTURE_DONE: 1, + }, + ) + assert len(reports) == 1 + assert reports[0].status == Trial.Status.FAIL + assert isinstance(reports[0].exception, PynisherPlugin.WallTimeoutException) From 9d65711b4a3daa7ab4db563d6159f80b16f0f9c0 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 22 Jan 2024 10:21:24 +0000 Subject: [PATCH 7/7] =?UTF-8?q?bump:=20version=201.7.0=20=E2=86=92=201.8.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 7 +++++++ pyproject.toml | 4 ++-- src/amltk/__version__.py | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 118d29ef..eb172db2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 
1.8.0 (2024-01-22) + +### Feat + +- **Pynisher**: Detect tasks with `Trial` to report `FAIL` (#220) +- **Pipeline**: `factorize()` a pipeline into its possibilities (#217) + ## 1.7.0 (2024-01-16) ### Feat diff --git a/pyproject.toml b/pyproject.toml index 7c86a28e..ac4b17b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "amltk" -version = "1.7.0" +version = "1.8.0" dependencies = [ "typing_extensions", # Better typing "more_itertools", # Better iteration @@ -97,7 +97,7 @@ exclude_lines = [ [tool.commitizen] name = "cz_conventional_commits" -version = "1.7.0" +version = "1.8.0" update_changelog_on_bump = true version_files = ["pyproject.toml:version", "src/amltk/__version__.py"] changelog_start_rev = "1.0.0" diff --git a/src/amltk/__version__.py b/src/amltk/__version__.py index 869de5e5..fc1822ec 100644 --- a/src/amltk/__version__.py +++ b/src/amltk/__version__.py @@ -1,3 +1,3 @@ from __future__ import annotations -version = "1.7.0" +version = "1.8.0"