Source code for orion.testing.algo

"""Generic tests for Algorithms"""
from __future__ import annotations

import copy
import inspect
import itertools
import logging
from dataclasses import dataclass, field
from typing import ClassVar, Sequence, TypeVar

import numpy
import pytest

import orion.algo.base
from orion.algo.base import BaseAlgorithm
from orion.algo.base.parallel_strategy import strategy_factory
from orion.algo.space import Space
from orion.benchmark.task.branin import Branin
from orion.core.io.space_builder import SpaceBuilder
from orion.core.utils import backward, format_trials
from orion.core.worker.primary_algo import create_algo
from orion.core.worker.trial import Trial

AlgoType = TypeVar("AlgoType", bound=BaseAlgorithm)

def customized_mutate_example(search_space, rng, old_value, **kwargs):
    """Example of a user-supplied mutation function.

    Real dimensions are divided by ``multiply_factor`` (default ``3.0``),
    integer dimensions are shifted by ``add_factor`` (default ``1``) and cast
    to ``int``; any other dimension type is returned unchanged.

    Parameters
    ----------
    search_space:
        Object exposing a ``type`` attribute (``"real"``, ``"integer"``, ...).
    rng:
        Unused; kept so the signature matches what callers pass.
    old_value:
        The value to mutate.
    kwargs:
        Optional ``multiply_factor`` and ``add_factor`` overrides.
    """
    divisor = kwargs.pop("multiply_factor", 3.0)
    offset = kwargs.pop("add_factor", 1)

    dim_type = search_space.type
    if dim_type == "real":
        return old_value / divisor
    if dim_type == "integer":
        return int(old_value + offset)
    return old_value
@dataclass
class TestPhase:
    """Description of one phase of an algorithm under test.

    Phases are linked to their neighbours through ``prev`` and ``next`` so that
    the duration and the end of a phase can be derived from where the following
    phase starts (or from ``max_trials`` for the last phase).
    """

    name: str
    """ Name of the test phase."""

    n_trials: int
    """ Number of trials after which the phase should begin."""

    method_to_spy: str | None = None
    """ Name of the method or function that is supposed to create the trials during that test phase.

    This is currently unused. Tests could potentially pass this as an argument to mocker.spy to
    check that the method is called the right number of times during each phase.
    """

    # The previous phase or None if this is the first one.
    prev: TestPhase | None = field(default=None, repr=False)

    # The next phase, or an int with max_trials.
    next: TestPhase | int | None = field(default=None, repr=False)

    @property
    def length(self) -> int:
        """Returns the duration of this test phase, in number of trials."""
        # ``next`` must have been set (to the following phase or to max_trials)
        # before the length can be computed.
        assert self.next
        next_start = (
            self.next.n_trials if isinstance(self.next, TestPhase) else self.next
        )
        return next_start - self.n_trials

    @property
    def end_n_trials(self) -> int:
        """Returns the end of this test phase (either start of next phase or max_trials)."""
        return self.n_trials + self.length
# Mark ``TestPhase`` as a non-test so pytest doesn't complain about this class.
TestPhase.__test__ = False  # type: ignore


def _are_equal(a, b) -> bool:
    """Return whether two state dicts are equal.

    Relies on ``numpy.testing.assert_equal`` rather than ``==`` because the
    values may contain numpy arrays, which do not compare to a single bool.
    """
    try:
        numpy.testing.assert_equal(a, b)
    except AssertionError:
        return False
    return True
def first_phase_only(test):
    """Decorator restricting a test to the first phase of the algorithm.

    Attaches the ``first_phase`` fixture to ``test`` through
    ``pytest.mark.usefixtures``.
    """
    use_first_phase = pytest.mark.usefixtures("first_phase")
    return use_first_phase(test)
def last_phase_only(test):
    """Decorator restricting a test to the last test phase of the algorithm.

    Attaches the ``last_phase`` fixture to ``test`` through
    ``pytest.mark.usefixtures``.
    """
    use_last_phase = pytest.mark.usefixtures("last_phase")
    return use_last_phase(test)
# NOTE: Can't make the test class generic in python 3.7, because it adds a __new__ constructor to # the type, which prevents it being collected.
class BaseAlgoTests:
    """Generic Test-suite for HPO algorithms.

    This test-suite covers all typical cases for HPO algorithms. To use it for a new algorithm,
    the class inheriting from this one must redefine the attributes ``algo_name`` with
    the name of the algorithm used to create it with the algorithm factory
    ``orion.core.worker.primary_algo.SpaceTransform`` and ``config`` with a base configuration
    for the algorithm that contains all its arguments. The base space can be redefined if needed
    with the attribute ``space``.

    Most algorithms have different phases that should be tested. For instance
    TPE has a first phase of random search and a second of Bayesian Optimization.
    The random search and Bayesian optimization phases use different logic and should both be
    tested.

    The `phases` class attribute can be set to parametrize all tests with each phase.
    See the test modules under ``tests/unittests/algo/`` for examples.
    """

    algo_type: type[AlgoType]
    """ The type of algorithm under test."""

    algo_name: ClassVar[str | None] = None
    config: ClassVar[dict] = {}
    space: ClassVar[dict] = {"x": "uniform(0, 1)", "y": "uniform(0, 1)"}

    phases: ClassVar[list[TestPhase]] = [TestPhase("default", 0, "sample")]
    """ Test phases for the algorithms. Overwrite this if the algorithm has more than one phase."""

    _current_phase: ClassVar[TestPhase]

    # Reasonable budget in number of trials where we expect the algo to match random search on the
    # Branin task.
    branin_task_max_trials: ClassVar[int] = 20

    max_trials: ClassVar[int]

    # The max number of trials required to sufficiently test out the last phase of the algorithm.
    # Used as a 'delta', so that max_trials is limited to the last phase n_trials + delta.
    _max_last_phase_trials: ClassVar[int] = 10

    # Fixtures available as class attributes:
    phase: ClassVar[pytest.fixture]  # type: ignore

    objective: float = 10
    """Target objective"""

    def __init_subclass__(cls) -> None:
        """Validate the subclass configuration and install the ``phase`` fixture.

        Resolves ``algo_type``/``algo_name`` from one another, checks that the phases are
        sorted and start at 0, derives or validates ``max_trials``, links the phases to their
        neighbours and creates the parametrized, autouse ``phase`` fixture.

        Raises
        ------
        RuntimeError
            If neither ``algo_type`` nor ``algo_name`` is set.
        ValueError
            If ``max_trials`` is set on the subclass but leaves less than
            ``_max_last_phase_trials`` trials for the last phase.
        """
        # Set the `algo_type` attribute, if necessary.
        if not hasattr(cls, "algo_type") or not cls.algo_type:
            if not cls.algo_name:
                raise RuntimeError(
                    f"Subclasses of BaseAlgoTests must set the algo_type or algo_name attributes, "
                    f"but class {cls.__qualname__} does not have either."
                )
            cls.algo_type = orion.algo.base.algo_factory.get_class(cls.algo_name)
        if not cls.algo_name:
            cls.algo_name = cls.algo_type.__name__.lower()

        # The first test phase should always have 0 as its n_trials, since algorithms are
        # supposed to work starting from 0 trials.
        assert cls.phases[0].n_trials == 0
        cls._current_phase = cls.phases[0]
        assert cls.phases == sorted(cls.phases, key=lambda v: v.n_trials)

        # Set a default value for the maximum number of trials programmatically.
        last_phase_start = cls.phases[-1].n_trials

        # NOTE: Because we auto-generate a max_trials for each class based on its phases, and we
        # have a default phase above, all subclasses of BaseAlgoTests will have an auto-generated
        # value for max_trials (even abstract ones for e.g. plugins).
        # For concrete test classes who use different phases than their parent, but don't define a
        # max_trials property, we want to auto-generate its value, and not use the max_trials of
        # their base class.
        # This is why we use `not in cls.__dict__` instead of `not hasattr(cls, "max_trials")`:
        if "max_trials" not in cls.__dict__:
            cls.max_trials = last_phase_start + cls._max_last_phase_trials
        elif last_phase_start > cls.max_trials - cls._max_last_phase_trials:
            raise ValueError(
                f"Test class {cls.__qualname__} isn't configured properly:\n"
                f"max_trials ({cls.max_trials}) should be larger than the start of the last phase "
                f"({cls.phases[-1].n_trials}) + delta ({cls._max_last_phase_trials}), for the last "
                f"phase to be properly tested. "
            )
        # NOTE(review): this branch cannot be reached with a non-negative delta, because any
        # value satisfying this condition also satisfies the previous `elif`. The condition
        # also doesn't match the message ("larger than necessary"). Left unchanged because
        # enforcing an upper bound could break existing subclasses -- confirm the intent.
        elif last_phase_start > cls.max_trials + cls._max_last_phase_trials:
            raise ValueError(
                f"Test class {cls.__qualname__} isn't configured properly:\n"
                f"max_trials ({cls.max_trials}) is larger than necessary, making tests longer to "
                f"run. Set max_trials to a value that is smaller than the start of the last phase "
                f"({last_phase_start}) + some delta (for example, {cls._max_last_phase_trials}), "
                f"so tests run efficiently."
            )

        # Inform the TestPhase object of their neighbours.
        # This can be used by tests to get the duration, start, end, etc of the test phases.
        previous: TestPhase | None = None
        for test_phase in cls.phases:
            if previous is not None:
                previous.next = test_phase
                test_phase.prev = previous
            previous = test_phase
        cls.phases[-1].next = cls.max_trials

        @pytest.fixture(
            name="phase",
            autouse=True,
            params=cls.phases,
            ids=[phase.name for phase in cls.phases],
        )
        @classmethod
        def phase(cls, request: pytest.FixtureRequest):
            """Fixture to parametrize tests with different phases."""
            test_phase: TestPhase = request.param  # type: ignore

            # Temporarily change the class attribute holding the current phase.
            original_phase = cls._current_phase
            cls._current_phase = test_phase
            # NOTE: If we want to actually use this spy stuff, We could create a spy for each
            # phase, and then in create_algo, after the force_observe, for each (phase, spy)
            # pair, check that the call_count is equal to phase.n_trials - prev_phase.n_trials
            # or something similar.
            yield test_phase
            cls._current_phase = original_phase

        # Store it somewhere on the class so it gets included in the test scope.
        cls.phase = phase  # type: ignore

    @pytest.fixture()
    def first_phase(self, phase: TestPhase):
        """Skip the test unless ``phase`` is the first phase of this test class."""
        if phase != type(self).phases[0]:
            pytest.skip("Test runs only on first phase.")
        return phase

    @pytest.fixture()
    def last_phase(self, phase: TestPhase):
        """Skip the test unless ``phase`` is the last phase of this test class."""
        if phase != type(self).phases[-1]:
            pytest.skip("Test runs only on last phase.")
        return phase

    @classmethod
    def set_phases(cls, phases: Sequence[TestPhase]):
        """Parametrize the tests with different phases.

        Some algorithms have different phases that should be tested. For instance
        TPE have a first phase of random search and a second of Bayesian Optimization.
        The random search and Bayesian optimization are different implementations and both should
        be tested.

        Parameters
        ----------
        phases: list of tuples
            The different phases to test. The format of the tuples should be
            (str(id of the test),
            int(number of trials before the phase begins),
            str(name of the algorithm's attribute to spy (ex: "space.sample"))
            )
        """
        cls.phases = [TestPhase(*phase) for phase in phases]

    @classmethod
    def create_algo(
        cls,
        config: dict | None = None,
        space: Space | None = None,
        seed: int | Sequence[int] | None = None,
        n_observed_trials: int | None = None,
        **kwargs,
    ):
        """Create the algorithm based on config.

        Also initializes the algorithm with the required number of random trials from the
        previous test phases before returning it.

        Parameters
        ----------
        config: dict, optional
            The configuration for the algorithm. ``cls.config`` will be used
            if ``config`` is ``None``.
        space: ``orion.algo.space.Space``, optional
            Space object to pass to algo. The output of ``cls.create_space()``
            will be used if ``space`` is ``None``.
        seed: int | Sequence[int], optional
            When passed, `seed_rng` is called before observing anything.
        n_observed_trials: int | None, optional
            Number of trials that the algorithm should have already observed when returned.
            When ``None`` (default), observes the number of trials at which the current phase
            begins. When set to 0, the algorithm will be freshly initialized.
        kwargs: dict
            Values to override algorithm configuration.
        """
        algo_kwargs = copy.deepcopy(config or cls.config)
        algo_kwargs.update(kwargs)

        original_space = space or cls.create_space()
        algo = create_algo(space=original_space, algo_type=cls.algo_type, **algo_kwargs)
        algo.max_trials = cls.max_trials

        # Seed the randomness before we observe anything.
        if seed is not None:
            algo.seed_rng(seed)

        if n_observed_trials is None:
            n_observed_trials = cls._current_phase.n_trials

        if n_observed_trials:
            assert n_observed_trials > 0
            # Force the algo to observe the given number of trials.
            cls.force_observe(num=n_observed_trials, algo=algo)
            assert algo.n_observed == n_observed_trials
        return algo

    def update_space(self, test_space: dict) -> dict:
        """Get complete space configuration with partial overwrite

        The values passed in ``test_space`` will override the default values
        in ``self.space``. ``self.space`` itself is not modified (a deep copy is updated).

        Parameters
        ----------
        test_space: dict
            The configuration for the space.
        """
        space = copy.deepcopy(self.space)
        space.update(test_space)
        return space

    @classmethod
    def create_space(cls, space: dict | None = None):
        """Create the space object

        Parameters
        ----------
        space: dict, optional
            Configuration of the search space. The default ``cls.space`` will be used
            if ``space`` is ``None``.
        """
        return SpaceBuilder().build(space if space is not None else cls.space)

    @classmethod
    def observe_trials(
        cls, trials: list[Trial], algo: BaseAlgorithm, rng: numpy.random.RandomState
    ):
        """Make the algorithm observe trials

        Parameters
        ----------
        trials: list of ``orion.core.worker.trial.Trial``
            Trials formatted as tuples of values
        algo: ``orion.algo.base.BaseAlgorithm``
            The algorithm used to observe trials.
        rng: ``numpy.random.RandomState``
            Random number generator to generate random objectives.
        """
        backward.algo_observe(
            algo,
            trials,
            [dict(objective=rng.normal()) for _ in trials],
        )

    @classmethod
    def get_num(cls, num: int):
        """Force number of trials to suggest

        Some algorithms must be tested with specific number of suggests at a time (ex: ASHA).
        This method can be overridden to change ``num`` based on the special needs.

        TODO: Remove this or give it a better name.
        """
        return num

    @classmethod
    def force_observe(cls, num: int, algo: BaseAlgorithm, seed: int = 1):
        """Force observe ``num`` trials.

        Parameters
        ----------
        num: int
            Number of trials to suggest and observe.
        algo: ``orion.algo.base.BaseAlgorithm``
            The algorithm that must suggest and observe.
        seed: int, optional
            The seed used to generate random objectives

        Raises
        ------
        RuntimeError
            - If the algorithm returns duplicates. Algorithms may return duplicates across workers,
              but in sequential scenarios as here, it should not happen.
            - If the algorithm fails to sample any trial at least 5 times.
        """
        rng = numpy.random.RandomState(seed)

        failed = 0
        MAX_FAILED = 5
        ids = set()

        while not algo.is_done and algo.n_observed < num and failed < MAX_FAILED:
            trials = algo.suggest(cls.get_num(num - algo.n_observed))
            if len(trials) == 0:
                failed += 1
                continue
            for trial in trials:
                if trial.hash_name in ids:
                    raise RuntimeError(f"algo suggested a duplicate: {trial}")
                ids.add(trial.hash_name)
            cls.observe_trials(trials, algo, rng)

        if failed >= MAX_FAILED:
            raise RuntimeError(
                f"Algorithm cannot sample more than {algo.n_observed} trials. Is it normal?"
            )

    def assert_dim_type_supported(self, test_space: dict):
        """Test that a given dimension type is properly supported by the algorithm

        This will test that the algorithm sample trials valid for the given type
        and that the algorithm can observe these trials.

        Parameters
        ----------
        test_space: the search space of the test.
        """
        space = self.create_space(self.update_space(test_space))
        algo = self.create_algo(space=space)

        trials = algo.suggest(1)
        assert len(trials) > 0
        assert trials[0] in space
        self.observe_trials(trials, algo, numpy.random.RandomState(1))

    @first_phase_only
    def test_configuration(self):
        """Test that configuration property attribute contains all class arguments."""
        algo = self.create_algo()
        # NOTE(review): the right-hand side is the object returned by create_algo, not its
        # ``configuration``, so this compares a configuration with an algorithm object.
        assert algo.configuration != self.create_algo(config={})
        assert algo.configuration == {self.algo_name: self.config}

    def test_get_id(self):
        """Test that the id hashing is valid"""
        space = self.create_space(
            space=self.update_space({"f": "fidelity(1, 10, base=2)"})
        )

        algo = self.create_algo(space=space)

        def get_id(point, ignore_fidelity=False, exp_id=None):
            trial = format_trials.tuple_to_trial(point, space)
            trial.experiment = exp_id
            return algo.get_id(
                trial,
                ignore_fidelity=ignore_fidelity,
            )

        assert get_id([1, 1, 1]) == get_id([1, 1, 1])
        assert get_id([1, 1, 1]) != get_id([1, 2, 2])
        assert get_id([1, 1, 1]) != get_id([2, 1, 1])

        assert get_id([1, 1, 1], ignore_fidelity=False) == get_id(
            [1, 1, 1], ignore_fidelity=False
        )
        # Fidelity changes id
        assert get_id([1, 1, 1], ignore_fidelity=False) != get_id(
            [2, 1, 1], ignore_fidelity=False
        )
        # Non-fidelity changes id
        assert get_id([1, 1, 1], ignore_fidelity=False) != get_id(
            [1, 1, 2], ignore_fidelity=False
        )

        assert get_id([1, 1, 1], ignore_fidelity=True) == get_id(
            [1, 1, 1], ignore_fidelity=True
        )
        # Fidelity does not change id
        assert get_id([1, 1, 1], ignore_fidelity=True) == get_id(
            [2, 1, 1], ignore_fidelity=True
        )
        # Non-fidelity still changes id
        assert get_id([1, 1, 1], ignore_fidelity=True) != get_id(
            [1, 1, 2], ignore_fidelity=True
        )

        # Experiment id is ignored
        assert get_id([1, 1, 1], exp_id=1) == get_id([1, 1, 1], exp_id=2)

    @pytest.mark.parametrize("seed", [123, 456])
    def test_seed_rng(self, seed: int):
        """Test that the seeding gives reproducible results."""
        numpy.random.seed(seed)
        algo = self.create_algo(seed=seed)

        trial_a = algo.suggest(1)[0]

        numpy.random.seed(seed)
        new_algo = self.create_algo(seed=seed)
        assert new_algo.n_observed == algo.n_observed
        trial_b = new_algo.suggest(1)[0]
        assert trial_b == trial_a

    @first_phase_only
    def test_seed_rng_init(self):
        """Test that if the algo has a `seed` constructor argument and a value is passed, the
        suggested trials are reproducible.
        """
        if "seed" not in inspect.signature(self.algo_type).parameters:
            pytest.skip("algo does not have a seed as a constructor argument.")

        config = self.config.copy()
        config["seed"] = 1
        algo = self.create_algo(config=config)
        state = algo.state_dict

        first_trial = algo.suggest(1)[0]
        second_trial = algo.suggest(1)[0]
        assert first_trial != second_trial

        config = self.config.copy()
        config["seed"] = 2
        new_algo = self.create_algo(config=config)
        new_algo_state = new_algo.state_dict

        different_seed_trial = new_algo.suggest(1)[0]
        # An algo without randomness has the same state whatever the seed, in which case the
        # same trial is expected.
        if _are_equal(new_algo_state, state):
            assert different_seed_trial == first_trial
        else:
            assert different_seed_trial != first_trial

        config = self.config.copy()
        config["seed"] = 1
        new_algo = self.create_algo(config=config)
        same_seed_trial = new_algo.suggest(1)[0]
        assert same_seed_trial == first_trial

    @pytest.mark.parametrize("seed", [123, 456])
    def test_state_dict(self, seed: int, phase: TestPhase):
        """Verify that resetting state makes sampling deterministic.

        The "source" algo is initialized at the start of each phase.
        The "target" algo instance is set to different initial conditions.
        This checks that it always gives the same suggestion as the original algo after set_state
        is used.
        """
        algo = self.create_algo(seed=seed)
        state = algo.state_dict
        a = algo.suggest(1)[0]

        # Create a new algo, without setting a seed.

        # The other algorithm is initialized at the start of the next phase.
        n_initial_trials = phase.end_n_trials
        # Use max_trials-1 so the algo can always sample at least one trial.
        if n_initial_trials == self.max_trials:
            n_initial_trials -= 1

        # NOTE: Seed is part of configuration, not state. Configuration is assumed to be the same
        # for both algorithm instances.
        new_algo = self.create_algo(n_observed_trials=n_initial_trials, seed=seed)
        new_state = new_algo.state_dict
        b = new_algo.suggest(1)[0]
        # NOTE: For instance, if the algo doesn't have any RNG (e.g. GridSearch), this could be
        # True:
        if _are_equal(new_state, state):
            # If the state is the same, the trials should be the same.
            assert a == b
        else:
            # If the state is different, the trials should be different.
            assert a != b

        new_algo.set_state(state)
        c = new_algo.suggest(1)[0]
        assert a == c

    def test_suggest_n(self):
        """Verify that suggest returns correct number of trials if ``num`` is specified in
        ``suggest``.
        """
        algo = self.create_algo()
        trials = algo.suggest(5)
        assert trials is not None
        assert len(trials) == 5

    def test_has_suggested(self):
        """Verify that algorithm detects correctly if a trial was suggested"""
        algo = self.create_algo()
        a = algo.suggest(1)[0]
        assert algo.has_suggested(a)

    # NOTE: not algo.has_suggested(some random trial) is tested in test_has_suggested_statedict

    def test_has_suggested_statedict(self):
        """Verify that algorithm detects correctly if a trial was suggested even when state was
        restored.
        """
        algo = self.create_algo()

        a = algo.suggest(1)[0]
        state = algo.state_dict
        assert algo.has_suggested(a)

        algo = self.create_algo()
        assert not algo.has_suggested(a)

        algo.set_state(state)
        assert algo.has_suggested(a)

    def test_observe(self):
        """Verify that algorithm observes trial without any issues"""
        algo = self.create_algo()

        # First observe a trial sampled directly from the space, i.e. one that the algorithm
        # did not suggest itself.
        a = algo.space.sample()[0]
        backward.algo_observe(algo, [a], [dict(objective=1)])

        b = algo.suggest(1)[0]
        backward.algo_observe(algo, [b], [dict(objective=2)])

    def test_has_observed(self):
        """Verify that algorithm detects correctly if a trial was observed"""
        algo = self.create_algo()

        a = algo.suggest(1)[0]
        assert not algo.has_observed(a)
        backward.algo_observe(algo, [a], [dict(objective=1)])
        assert algo.has_observed(a)

        b = algo.suggest(1)[0]
        assert not algo.has_observed(b)
        backward.algo_observe(algo, [b], [dict(objective=2)])
        assert algo.has_observed(b)

    def test_has_observed_statedict(self):
        """Verify that algorithm detects correctly if a trial was observed even when state was
        restored.
        """
        algo = self.create_algo()

        a = algo.suggest(1)[0]
        backward.algo_observe(algo, [a], [dict(objective=1)])
        state = algo.state_dict

        algo = self.create_algo()
        assert not algo.has_observed(a)
        algo.set_state(state)
        assert algo.has_observed(a)

        b = algo.suggest(1)[0]
        backward.algo_observe(algo, [b], [dict(objective=2)])
        state = algo.state_dict

        algo = self.create_algo()
        assert not algo.has_observed(b)
        algo.set_state(state)
        assert algo.has_observed(b)

    def test_n_suggested(self):
        """Verify that algorithm returns correct number of suggested trials"""
        algo = self.create_algo()
        initial = algo.n_suggested
        algo.suggest(1)
        assert algo.n_suggested == initial + 1

    def test_n_observed(self):
        """Verify that algorithm returns correct number of observed trials"""
        algo = self.create_algo()
        initial = algo.n_observed
        trials = algo.suggest(1)
        # Suggesting alone must not count as observing.
        assert algo.n_observed == initial
        assert len(trials) == 1
        self.observe_trials(trials, algo, numpy.random.RandomState(1))
        assert algo.n_observed == initial + 1

    def test_real_data(self):
        """Test that algorithm supports real dimensions"""
        self.assert_dim_type_supported({"x": "uniform(0, 5)"})

    def test_int_data(self):
        """Test that algorithm supports integer dimensions"""
        self.assert_dim_type_supported({"x": "uniform(0, 5000, discrete=True)"})

    def test_cat_data(self):
        """Test that algorithm supports categorical dimensions"""
        self.assert_dim_type_supported(
            {  # Add 3 dims so that there exists many possible trials for the test
                "x": "choices(['a', 0.2, 1, None])",
                "y": "choices(['a', 0.2, 1, None])",
                "z": "choices(['a', 0.2, 1, None])",
            },
        )

    def test_logreal_data(self):
        """Test that algorithm supports logreal dimensions"""
        self.assert_dim_type_supported({"x": "loguniform(1, 5)"})

    def test_logint_data(self):
        """Test that algorithm supports loginteger dimensions"""
        self.assert_dim_type_supported({"x": "loguniform(1, 100, discrete=True)"})

    def test_shape_data(self):
        """Test that algorithm supports dimensions with shape"""
        self.assert_dim_type_supported({"x": "uniform(0, 5, shape=(3, 2))"})

    def test_broken_trials(self):
        """Test that algorithm can handle broken trials"""
        algo = self.create_algo()
        trial = algo.suggest(1)[0]
        trial.status = "broken"
        assert not algo.has_observed(trial)
        algo.observe([trial])
        assert algo.has_observed(trial)

    @first_phase_only
    def test_is_done_cardinality(self):
        """Test that algorithm will stop when cardinality is reached"""
        space = SpaceBuilder().build(
            {
                "x": "uniform(0, 4, discrete=True)",
                "y": "choices(['a', 'b', 'c'])",
                "z": "loguniform(1, 6, discrete=True)",
            }
        )
        assert space.cardinality == 5 * 3 * 6

        algo = self.create_algo(space=space)
        # Prevent the algo from exiting early because of a max_trials limit.
        algo.algorithm.max_trials = None

        i = 0
        for i, (x, y, z) in enumerate(itertools.product(range(5), "abc", range(1, 7))):
            assert not algo.is_done
            n = algo.n_suggested
            backward.algo_observe(
                algo,
                [format_trials.tuple_to_trial([x, y, z], space)],
                [dict(objective=i)],
            )
            assert algo.n_suggested == n + 1

        assert i + 1 == space.cardinality

        assert algo.is_done

    @last_phase_only
    def test_is_done_max_trials(self):
        """Test that algorithm will stop when max trials is reached"""
        algo = self.create_algo()
        # NOTE: This could be changed to force-observe only
        # ``self.max_trials - phase.n_trials`` trials instead. The original note referred to a
        # pending change whose reference is missing here.
        self.force_observe(self.max_trials, algo)

        assert algo.is_done

    @first_phase_only
    def test_optimize_branin(self):
        """Test that algorithm optimizes a simple task comparably to random search."""
        max_trials = type(self).branin_task_max_trials
        task = Branin()
        space = self.create_space(task.get_search_space())
        algo = self.create_algo(space=space)
        algo.algorithm.max_trials = max_trials

        all_suggested_trials: list[Trial] = []
        all_objectives: list[float] = []

        # NOTE: Some algos work more effectively if they are asked to produce a batch of trials,
        # rather than a single trial at a time.
        max_batch_size = 5

        while len(all_suggested_trials) < max_trials and not algo.is_done:
            trials = algo.suggest(max_batch_size)
            all_suggested_trials.extend(trials)

            results = [task(**trial.params) for trial in trials]
            # NOTE: This is true for the branin task. If we ever test other tasks, this could vary.
            assert all(len(result) == 1 for result in results)
            new_objectives = [result[0]["value"] for result in results]
            all_objectives.extend(new_objectives)

            # NOTE: Not ideal that we have to unpack and repack the results of the task.
            results_for_backward_observe = [
                {"objective": objective} for objective in new_objectives
            ]
            backward.algo_observe(
                algo=algo, trials=trials, results=results_for_backward_observe
            )

        assert algo.is_done
        assert min(all_objectives) <= self.objective
class BaseParallelStrategyTests:
    """Generic Test-suite for parallel strategies.

    This test-suite follow the same logic than BaseAlgoTests, but applied for ParallelStrategy
    classes.

    Subclasses are expected to override ``config`` (passed to ``strategy_factory.create``) and,
    when relevant, ``expected_value`` and ``default_value``, which the tests compare with the
    objective value of inferred trials.
    """

    parallel_strategy_name = None
    config = {}
    expected_value = None
    default_value = None

    def create_strategy(self, config=None, **kwargs):
        """Create the parallel strategy based on config.

        Parameters
        ----------
        config: dict, optional
            The configuration for the parallel strategy. ``self.config`` will be used
            if ``config`` is ``None`` (or empty).
        kwargs: dict
            Values to override strategy configuration.
        """
        config = copy.deepcopy(config or self.config)
        config.update(kwargs)
        # Build from the merged configuration so that ``config`` and ``kwargs`` are honoured
        # (``self.config`` alone would silently ignore both).
        return strategy_factory.create(**config)

    def get_trials(self):
        """10 objective observations"""
        return [
            Trial(
                params=[{"name": "x", "type": "real", "value": i}],
                results=[{"name": "objective", "type": "objective", "value": i}],
                status="completed",
            )
            for i in range(10)
        ]

    def get_noncompleted_trial(self, status="reserved"):
        """Return a single trial without results"""
        return Trial(
            params=[{"name": "a", "type": "integer", "value": 6}], status=status
        )

    def get_corrupted_trial(self):
        """Return a corrupted trial with results but status reserved"""
        return Trial(
            params=[{"name": "a", "type": "integer", "value": 6}],
            results=[{"name": "objective", "type": "objective", "value": 1}],
            status="reserved",
        )

    def test_configuration(self):
        """Test that configuration property attribute contains all class arguments."""
        strategy = self.create_strategy()
        # NOTE(review): the right-hand side is the object returned by create_strategy, not its
        # ``configuration``.
        assert strategy.configuration != self.create_strategy(config={})
        assert strategy.configuration == self.config

    def test_state_dict(self):
        """Verify state is restored properly"""
        strategy = self.create_strategy()

        strategy.observe(self.get_trials())

        new_strategy = self.create_strategy()
        assert strategy.state_dict != new_strategy.state_dict

        new_strategy.set_state(strategy.state_dict)
        assert strategy.state_dict == new_strategy.state_dict

        noncompleted_trial = self.get_noncompleted_trial()

        if strategy.infer(noncompleted_trial) is None:
            assert strategy.infer(noncompleted_trial) == new_strategy.infer(
                noncompleted_trial
            )
        else:
            assert (
                strategy.infer(noncompleted_trial).objective.value
                == new_strategy.infer(noncompleted_trial).objective.value
            )

    def test_infer_no_history(self):
        """Test that strategy can infer even without having seen trials"""
        noncompleted_trial = self.get_noncompleted_trial()
        trial = self.create_strategy().infer(noncompleted_trial)
        if self.expected_value is None:
            assert trial is None
        elif self.default_value is None:
            assert trial.objective.value == self.expected_value
        else:
            assert trial.objective.value == self.default_value

    def test_handle_corrupted_trials(self, caplog):
        """Test that strategy can handle trials that has objective but status is not
        properly set to completed.
        """
        corrupted_trial = self.get_corrupted_trial()
        with caplog.at_level(
            logging.WARNING, logger="orion.algo.base.parallel_strategy"
        ):
            trial = self.create_strategy().infer(corrupted_trial)

        match = "Trial `{}` has an objective but status is not completed".format(
            corrupted_trial.id
        )
        assert match in caplog.text

        assert trial is not None
        assert trial.objective.value == corrupted_trial.objective.value

    def test_handle_noncompleted_trials(self, caplog):
        """Test that no corrupted-trial warning is logged for a trial without results."""
        with caplog.at_level(
            logging.WARNING, logger="orion.algo.base.parallel_strategy"
        ):
            self.create_strategy().infer(self.get_noncompleted_trial())

        # Check the part of the warning that does not depend on the trial id: the unformatted
        # template (with a literal ``{}``) can never appear in the log, which would make the
        # assertion vacuous.
        assert "has an objective but status is not completed" not in caplog.text

    def test_strategy_value(self):
        """Test that ParallelStrategy returns the expected value"""
        strategy = self.create_strategy()
        strategy.observe(self.get_trials())
        trial = strategy.infer(self.get_noncompleted_trial())

        if self.expected_value is None:
            assert trial is None
        else:
            assert trial.objective.value == self.expected_value