Source code for orion.testing.algo

# -*- coding: utf-8 -*-
"""Generic tests for Algorithms"""
import copy
import functools
import inspect
import itertools
from collections import defaultdict

import numpy
import pytest

import orion.algo.base
from orion.algo.asha import ASHA
from orion.algo.gridsearch import GridSearch
from orion.algo.hyperband import Hyperband
from orion.algo.random import Random
from orion.algo.tpe import TPE
from orion.benchmark.task.branin import Branin
from import SpaceBuilder
from orion.core.utils import backward, format_trials
from orion.core.worker.primary_algo import SpaceTransformAlgoWrapper
from import build_space

algorithms = {
    "evolutionaryes": "what is the config?",

[docs]def recursive_getattr(obj, attribute): """Get attribute recursively based on str with pattern sub.(sub.)*name""" attributes = attribute.split(".") attribute = getattr(obj, attributes[0]) if len(attributes) == 1: return attribute return recursive_getattr(attribute, ".".join(attributes[1:]))
[docs]def spy_attr(mocker, algo, attribute): """Return a mocker.spy object on the algorithms's given `attribute`""" attributes = attribute.split(".") attr_to_mock = attributes[-1] if len(attributes) > 1: obj = recursive_getattr(algo, ".".join(attributes[:-1])) else: obj = algo return mocker.spy(obj, attr_to_mock)
methods_with_phase = defaultdict(set) phase_docstring = """\ This test is parametrizable with phases. See ``orion.testing.algo.BaseAlgoTests.set_phases``.\ """
[docs]def phase(method): """Decorator to mark methods that must be parametrized with phases.""" class_name = ".".join(method.__qualname__.split(".")[:-1]) methods_with_phase[class_name].add(method.__name__) if method.__doc__ is None: method.__doc__ = phase_docstring else: method.__doc__ += "\n\n" + phase_docstring return method
[docs]def parametrize_this(cls, method_name, attrs, ids): """Parametrize a method with phases. Notes ----- We need to replace the method to avoid parametrizing the base one. This is a fugly hack because of pytest limitations with inheritance for test classes. """ base_method = getattr(cls, method_name) def method(self, mocker, num, attr): print(method_name, base_method) return base_method(self, mocker, num, attr) setattr(cls, method_name, method) pytest.mark.parametrize("num,attr", attrs, ids=ids)(method)
[docs]def customized_mutate_example(search_space, rng, old_value, **kwargs): """Define a customized mutate function example""" multiply_factor = kwargs.pop("multiply_factor", 3.0) add_factor = kwargs.pop("add_factor", 1) if search_space.type == "real": new_value = old_value / multiply_factor elif search_space.type == "integer": new_value = int(old_value + add_factor) else: new_value = old_value return new_value
[docs]class BaseAlgoTests: """Generic Test-suite for HPO algorithms. This test-suite covers all typical cases for HPO algorithms. To use it for a new algorithm, the class inheriting from this one must redefine the attributes ``algo_name`` with the name of the algorithm used to create it with the algorithm factory ``orion.core.worker.primary_algo.SpaceTransformAlgoWrapper`` and ``config`` with a base configuration for the algorithm that contains all its arguments. The base space can be redefine if needed with the attribute ``space``. Many algorithms have different phases that should be tested. For instance TPE have a first phase of random search and a second of Bayesian Optimization. The random search and Bayesian optimization are different implementations and both should be tested. For this reason, the class method ``orion.testing.algo.BaseAlgoTests.set_phases`` must be called to parametrize the tests with phases. Failure to doing so will causes the tests to crash. See ``tests/unittests/algo/`` for an example. """ algo_name = None config = {} max_trials = 200 space = {"x": "uniform(0, 1)", "y": "uniform(0, 1)"}
[docs] @classmethod def set_phases(cls, phases): """Parametrize the tests with different phases. Some algorithms have different phases that should be tested. For instance TPE have a first phase of random search and a second of Bayesian Optimization. The random search and Bayesian optimization are different implementations and both should be tested. Parameters ---------- phases: list of tuples The different phases to test. The format of the tuples should be (str(id of the test), int(number of trials before the phase begins), str(name of the algorithm's attribute to spy (ex: "space.sample")) ) """ ids = [phase[0] for phase in phases] attrs = [phase[1:] for phase in phases] cls_methods_with_phase = ( methods_with_phase["BaseAlgoTests"] | methods_with_phase[cls.__name__] ) for method_name in sorted(cls_methods_with_phase): parametrize_this(cls, method_name, attrs, ids)
[docs] def create_algo(self, config=None, space=None, **kwargs): """Create the algorithm based on config. Parameters ---------- config: dict, optional The configuration for the algorithm. ``self.config`` will be used if ``config`` is ``None``. space: ````, optional Space object to pass to algo. The output of ``self.create_space()`` will be used if ``space`` is ``None``. kwargs: dict Values to override algorithm configuration. """ config = copy.deepcopy(config or self.config) config.update(kwargs) algo = SpaceTransformAlgoWrapper( orion.algo.base.algo_factory.get_class(self.algo_name), space or self.create_space(), **config, ) algo.algorithm.max_trials = self.max_trials return algo
[docs] def update_space(self, test_space): """Get complete space configuration with partial overwrite The values passed in ``test_space`` will override the default values in ``self.config``. Parameters ---------- test_space: dic The configuration for the space. """ space = copy.deepcopy( space.update(test_space) return space
[docs] def create_space(self, space=None): """Create the space object Parameters ---------- space: dict, optional Configuration of the search space. The default ```` will be used if ``space`` is ``None``. """ return SpaceBuilder().build(space if space is not None else
[docs] def observe_trials(self, trials, algo, objective=0): """Make the algorithm observe trials Parameters ---------- trials: list of ``orion.core.worker.trial.Trial`` Trials formatted as tuples of values algo: ``orion.algo.base.BaseAlgorithm`` The algorithm used to observe trials. objective: int, optional The base objective for the trials. All objectives will have value ``objective + i``. Defaults to 0. """ backward.algo_observe( algo, trials, [dict(objective=objective + i) for i in range(len(trials))] )
[docs] def get_num(self, num): """Force number of trials to suggest Some algorithms must be tested with specific number of suggests at a time (ex: ASHA). This method can be overriden to change ``num`` based on the special needs. """ return num
[docs] def force_observe(self, num, algo): """Force observe ``num`` trials. Parameters ---------- num: int Number of trials to suggest and observe. algo: ``orion.algo.base.BaseAlgorithm`` The algorithm that must suggest and observe. Raises ------ RuntimeError - If the algorithm returns duplicates. Algorithms may return duplicates across workers, but in sequential scenarios as here, it should not happen. - If the algorithm fails to sample any trial at least 5 times. """ objective = 0 failed = 0 MAX_FAILED = 5 ids = set() while not algo.is_done and algo.n_observed < num and failed < MAX_FAILED: trials = algo.suggest(self.get_num(num - algo.n_observed)) if len(trials) == 0: failed += 1 continue for trial in trials: if trial.hash_name in ids: raise RuntimeError(f"algo suggested a duplicate: {trial}") ids.add(trial.hash_name) self.observe_trials(trials, algo, objective) objective += len(trials) if failed >= MAX_FAILED: raise RuntimeError( f"Algorithm cannot sample more than {algo.n_observed} trials. Is it normal?" )
[docs] def spy_phase(self, mocker, num, algo, attribute): """Force observe ``num`` trials and then mock a given method to count calls. Parameters ---------- mocker: ``pytest_mock.mocker`` Mocker from ``pytest_mock``. Should be given by fixtures of the tests. num: int Number of trials to suggest and observe algo: ``orion.algo.base.BaseAlgorithm`` The algorithm to test attribute: str The algorithm attribute or method to mock. The path is respective to the algorithm object. For example, a valid value would be `'space.sample'` which will mock ````. """ self.force_observe(num, algo) spy = spy_attr(mocker, algo.algorithm, attribute) return spy
[docs] def assert_callbacks(self, spy, num, algo): """Callback to make special asserts at end of tests Override this method in algorithm test-suite to customize verifications done at end of tests. Parameters ---------- spy: Mocked object Object mocked by ``BaseAlgoTests.spy_phase``. num: int number of trials of the phase. algo: ``orion.algo.base.BaseAlgorithm`` The algorithm being tested. """ pass
[docs] def assert_dim_type_supported(self, mocker, num, attr, test_space): """Test that a given dimension type is properly supported by the algorithm This will test that the algorithm sample trials valid for the given type and that the algorithm can observe these trials. Parameters ---------- mocker: ``pytest_mock.mocker`` Mocker from ``pytest_mock``. Should be given by fixtures of the tests. num: int Number of trials to suggest and observe algo: ``orion.algo.base.BaseAlgorithm`` The algorithm to test attribute: str The algorithm attribute or method to mock. The path is respective to the algorithm object. For example, a valid value would be `'space.sample'` which will mock ````. """ space = self.create_space(self.update_space(test_space)) algo = self.create_algo(space=space) spy = self.spy_phase(mocker, num, algo, attr) trials = algo.suggest(1) assert trials[0] in space spy.call_count == 1 self.observe_trials(trials, algo, 1) self.assert_callbacks(spy, num, algo)
[docs] def test_configuration(self): """Test that configuration property attribute contains all class arguments.""" algo = self.create_algo() assert algo.configuration != self.create_algo(config={}) assert algo.configuration == {self.algo_name: self.config}
[docs] def test_get_id(self): """Test that the id hashing is valid""" space = self.create_space( space=self.update_space({"f": "fidelity(1, 10, base=2)"}) ) algo = self.create_algo(space=space) def get_id(point, ignore_fidelity=False, exp_id=None): trial = format_trials.tuple_to_trial(point, space) trial.experiment = exp_id return algo.get_id( trial, ignore_fidelity=ignore_fidelity, ) assert get_id([1, 1, 1]) == get_id([1, 1, 1]) assert get_id([1, 1, 1]) != get_id([1, 2, 2]) assert get_id([1, 1, 1]) != get_id([2, 1, 1]) assert get_id([1, 1, 1], ignore_fidelity=False) == get_id( [1, 1, 1], ignore_fidelity=False ) # Fidelity changes id assert get_id([1, 1, 1], ignore_fidelity=False) != get_id( [2, 1, 1], ignore_fidelity=False ) # Non-fidelity changes id assert get_id([1, 1, 1], ignore_fidelity=False) != get_id( [1, 1, 2], ignore_fidelity=False ) assert get_id([1, 1, 1], ignore_fidelity=True) == get_id( [1, 1, 1], ignore_fidelity=True ) # Fidelity does not change id assert get_id([1, 1, 1], ignore_fidelity=True) == get_id( [2, 1, 1], ignore_fidelity=True ) # Non-fidelity still changes id assert get_id([1, 1, 1], ignore_fidelity=True) != get_id( [1, 1, 2], ignore_fidelity=True ) # Experiment id is ignored assert get_id([1, 1, 1], exp_id=1) == get_id([1, 1, 1], exp_id=2)
[docs] @phase def test_seed_rng(self, mocker, num, attr): """Test that the seeding gives reproducibile results.""" algo = self.create_algo() seed = numpy.random.randint(10000) algo.seed_rng(seed) spy = self.spy_phase(mocker, num, algo, attr) trials = algo.suggest(1) trials[0].id != algo.suggest(1)[0].id new_algo = self.create_algo() new_algo.seed_rng(seed) self.force_observe(algo.n_observed, new_algo) assert trials[0].id == new_algo.suggest(1)[0].id self.assert_callbacks(spy, num, new_algo)
[docs] @phase def test_seed_rng_init(self, mocker, num, attr): """Test that the seeding gives reproducibile results.""" algo = self.create_algo(seed=1) spy = self.spy_phase(mocker, num, algo, attr) trials = algo.suggest(1) algo.suggest(1)[0].id != trials[0].id new_algo = self.create_algo(seed=2) self.force_observe(algo.n_observed, new_algo) assert new_algo.suggest(1)[0].id != trials[0].id new_algo = self.create_algo(seed=1) self.force_observe(algo.n_observed, new_algo) assert new_algo.suggest(1)[0].id == trials[0].id self.assert_callbacks(spy, num, new_algo)
[docs] @phase def test_state_dict(self, mocker, num, attr): """Verify that resetting state makes sampling deterministic""" algo = self.create_algo() seed = numpy.random.randint(10000) algo.seed_rng(seed) spy = self.spy_phase(mocker, max(num, 1), algo, attr) state = algo.state_dict a = algo.suggest(1)[0] new_algo = self.create_algo() assert != new_algo.suggest(1)[0].id new_algo.set_state(state) assert == new_algo.suggest(1)[0].id self.assert_callbacks(spy, num, algo)
[docs] @phase def test_suggest_n(self, mocker, num, attr): """Verify that suggest returns correct number of trials if ``num`` is specified in ``suggest``.""" algo = self.create_algo() spy = self.spy_phase(mocker, num, algo, attr) trials = algo.suggest(5) assert len(trials) == 5
[docs] @phase def test_has_suggested(self, mocker, num, attr): """Verify that algorithm detects correctly if a trial was suggested""" algo = self.create_algo() spy = self.spy_phase(mocker, num, algo, attr) a = algo.suggest(1)[0] assert algo.has_suggested(a)
# NOTE: not algo.has_suggested(some random trial) is tested in test_has_suggested_statedict
[docs] @phase def test_has_suggested_statedict(self, mocker, num, attr): """Verify that algorithm detects correctly if a trial was suggested even when state was restored.""" algo = self.create_algo() spy = self.spy_phase(mocker, num, algo, attr) a = algo.suggest(1)[0] state = algo.state_dict assert algo.has_suggested(a) algo = self.create_algo() assert not algo.has_suggested(a) algo.set_state(state) assert algo.has_suggested(a)
[docs] @phase def test_observe(self, mocker, num, attr): """Verify that algorithm observes trial without any issues""" algo = self.create_algo() spy = self.spy_phase(mocker, num, algo, attr) a =[0] backward.algo_observe(algo, [a], [dict(objective=1)]) b = algo.suggest(1)[0] backward.algo_observe(algo, [b], [dict(objective=2)])
[docs] @phase def test_has_observed(self, mocker, num, attr): """Verify that algorithm detects correctly if a trial was observed""" algo = self.create_algo() spy = self.spy_phase(mocker, num, algo, attr) a = algo.suggest(1)[0] assert not algo.has_observed(a) backward.algo_observe(algo, [a], [dict(objective=1)]) assert algo.has_observed(a) b = algo.suggest(1)[0] assert not algo.has_observed(b) backward.algo_observe(algo, [b], [dict(objective=2)]) assert algo.has_observed(b)
[docs] @phase def test_has_observed_statedict(self, mocker, num, attr): """Verify that algorithm detects correctly if a trial was observed even when state was restored.""" algo = self.create_algo() spy = self.spy_phase(mocker, num, algo, attr) a = algo.suggest(1)[0] backward.algo_observe(algo, [a], [dict(objective=1)]) state = algo.state_dict algo = self.create_algo() assert not algo.has_observed(a) algo.set_state(state) assert algo.has_observed(a) b = algo.suggest(1)[0] backward.algo_observe(algo, [b], [dict(objective=2)]) state = algo.state_dict algo = self.create_algo() assert not algo.has_observed(b) algo.set_state(state) assert algo.has_observed(b)
[docs] @phase def test_n_suggested(self, mocker, num, attr): """Verify that algorithm returns correct number of suggested trials""" algo = self.create_algo() spy = self.spy_phase(mocker, num, algo, attr) assert algo.n_suggested == num algo.suggest(1) assert algo.n_suggested == num + 1
[docs] @phase def test_n_observed(self, mocker, num, attr): """Verify that algorithm returns correct number of observed trials""" algo = self.create_algo() spy = self.spy_phase(mocker, num, algo, attr) assert algo.n_observed == num trials = algo.suggest(1) assert algo.n_observed == num self.observe_trials(trials, algo) assert algo.n_observed == num + 1
[docs] @phase def test_real_data(self, mocker, num, attr): """Test that algorithm supports real dimesions""" self.assert_dim_type_supported(mocker, num, attr, {"x": "uniform(0, 5)"})
[docs] @phase def test_int_data(self, mocker, num, attr): """Test that algorithm supports integer dimesions""" self.assert_dim_type_supported( mocker, num, attr, {"x": "uniform(0, 5000, discrete=True)"} )
[docs] @phase def test_cat_data(self, mocker, num, attr): """Test that algorithm supports categorical dimesions""" self.assert_dim_type_supported( mocker, num, attr, { # Add 3 dims so that there exists many possible trials for the test "x": "choices(['a', 0.2, 1, None])", "y": "choices(['a', 0.2, 1, None])", "z": "choices(['a', 0.2, 1, None])", }, )
[docs] @phase def test_logreal_data(self, mocker, num, attr): """Test that algorithm supports logreal dimesions""" self.assert_dim_type_supported(mocker, num, attr, {"x": "loguniform(1, 5)"})
[docs] @phase def test_logint_data(self, mocker, num, attr): """Test that algorithm supports loginteger dimesions""" self.assert_dim_type_supported( mocker, num, attr, {"x": "loguniform(1, 100, discrete=True)"} )
[docs] @phase def test_shape_data(self, mocker, num, attr): """Test that algorithm supports dimesions with shape""" self.assert_dim_type_supported( mocker, num, attr, {"x": "uniform(0, 5, shape=(3, 2))"} )
[docs] def test_is_done_cardinality(self): """Test that algorithm will stop when cardinality is reached""" space = self.update_space( { "x": "uniform(0, 4, discrete=True)", "y": "choices(['a', 'b', 'c'])", "z": "loguniform(1, 6, discrete=True)", } ) space = self.create_space(space) assert space.cardinality == 5 * 3 * 6 algo = self.create_algo(space=space) for i, (x, y, z) in enumerate(itertools.product(range(5), "abc", range(1, 7))): assert not algo.is_done n = algo.n_suggested backward.algo_observe( algo, [format_trials.tuple_to_trial([x, y, z], space)], [dict(objective=i)], ) assert algo.n_suggested == n + 1 assert i + 1 == space.cardinality assert algo.is_done
[docs] def test_is_done_max_trials(self): """Test that algorithm will stop when max trials is reached""" algo = self.create_algo() self.force_observe(self.max_trials, algo) assert algo.is_done
[docs] def test_optimize_branin(self): """Test that algorithm optimizes somehow (this is on-par with random search)""" MAX_TRIALS = 20 task = Branin() space = self.create_space(task.get_search_space()) algo = self.create_algo(config={}, space=space) algo.algorithm.max_trials = MAX_TRIALS safe_guard = 0 trials = [] objectives = [] while trials or not algo.is_done: if safe_guard >= MAX_TRIALS: break if not trials: trials = algo.suggest(MAX_TRIALS - len(objectives)) trial = trials.pop(0) results = task(trial.params["x"]) objectives.append(results[0]["value"]) backward.algo_observe(algo, [trial], [dict(objective=objectives[-1])]) safe_guard += 1 assert algo.is_done assert min(objectives) <= 10