Source code for orion.algo.dehb.dehb

"""
:mod:`orion.algo.dehb.dehb -- DEHB
==================================

Module for the wrapper around DEHB: https://github.com/automl/DEHB.
"""
from __future__ import annotations

import logging
from collections import defaultdict
from copy import deepcopy
from typing import ClassVar, NamedTuple

import numpy

from orion.algo.base import BaseAlgorithm
from orion.algo.dehb.brackets import SHBracketManager
from orion.algo.space import Fidelity, Space
from orion.core.utils import format_trials
from orion.core.utils.module_import import ImportOptional
from orion.core.worker.trial import Trial

with ImportOptional("DEHB") as import_optional:
    from dehb.optimizers import DEHB as DEHBImpl
    from sspace.convert import convert_space
    from sspace.convert import transform as to_orion

if import_optional.failed:

    class DEHBImpl:  # noqa: F811
        pass


logger = logging.getLogger(__name__)


class UnsupportedConfiguration(Exception):
    """Raised when an unsupported configuration is sent"""


SPACE_ERROR = """
DEHB cannot be used if space does not contain a fidelity dimension.
"""

MUTATION_STRATEGIES = [
    "rand1",
    "rand2dir",
    "randtobest1",
    "currenttobest1",
    "best1",
    "best2",
    "rand2",
]

CROSSOVER_STRATEGY = [
    "bin",
    "exp",
]

FIX_MODES = ["random", "clip"]


class RungResult(NamedTuple):
    cost: int | float
    fitness: float


class _CustomDEHBImpl(DEHBImpl):
    def __init__(self, duplicates, **kwargs):
        self.duplicates = duplicates
        super().__init__(**kwargs)

    def f_objective(self, *args, **kwargs) -> None:
        """Not needed for Orion, the objective is called by the worker"""

    def _start_new_bracket(self) -> SHBracketManager:
        """Starts a new bracket based on Hyperband"""
        # start new bracket
        self.iteration_counter += (
            1  # iteration counter gives the bracket count or bracket ID
        )
        n_configs, budgets = self.get_next_iteration(self.iteration_counter)
        bracket = SHBracketManager(
            n_configs=n_configs,
            budgets=budgets,
            bracket_id=self.iteration_counter,
            duplicates=self.duplicates,
        )
        self.active_brackets.append(bracket)
        return bracket

    def register_job(self, job_info: dict) -> None:
        """Register to DEHB's backend"""

        # pass information of job submission to Bracket Manager
        for bracket in self.active_brackets:
            if bracket.bracket_id == job_info["bracket_id"]:
                # registering is IMPORTANT for Bracket Manager to perform SH
                bracket.register_job(job_info["budget"])
                break

    def init_population(self, pop_size: int) -> list[numpy.ndarray]:
        """Generate our initial population of sample

        Parameters
        ----------
        pop_size: int
            Number of samples to generate

        """
        population = self.cs.sample_configuration(size=pop_size)
        population = [
            self.configspace_to_vector(individual) for individual in population
        ]
        return population

    def observe(self, job_info: dict, cost: int | float, fitness: float) -> None:
        """Observe a completed job"""
        config: numpy.ndarray = job_info["config"]
        budget: int | float = job_info["budget"]
        parent_id: int = job_info["parent_id"]
        bracket_id: int = job_info["bracket_id"]

        for bracket in self.active_brackets:
            if bracket.bracket_id == bracket_id:
                # bracket job complete
                # IMPORTANT to perform synchronous SH
                bracket.complete_job(budget)

        # carry out DE selection
        if fitness <= self.de[budget].fitness[parent_id]:
            self.de[budget].population[parent_id] = config
            self.de[budget].fitness[parent_id] = fitness

        # NOTE: This dictionary seams useless in dehb source code.
        info: dict = dict()
        # updating incumbents
        if self.de[budget].fitness[parent_id] < self.inc_score:
            self._update_incumbents(
                config=self.de[budget].population[parent_id],
                score=self.de[budget].fitness[parent_id],
                info=info,
            )

        # book-keeping
        self._update_trackers(
            traj=self.inc_score,
            runtime=cost,
            history=(config.tolist(), float(fitness), float(cost), float(budget), info),
        )

    def seed_rng(self, seed: int) -> None:
        """Seed the state of rngs used by DEHB"""
        numpy.random.seed(seed)
        self.cs.seed(numpy.random.randint(numpy.iinfo(numpy.int32).max))
        # Reset sub population
        self.reset()

    @property
    def state_dict(self) -> dict:
        """Return a state dict that can be used to reset the state of the algorithm."""
        state = dict(self.__dict__)
        state["client"] = None
        state["logger"] = None
        for key in [
            "active_brackets",
            "iteration_counter",
            "de",
            "_max_pop_size",
            "start",
            "traj",
            "runtime",
            "history",
        ]:
            state[key] = getattr(self, key, None)

        return deepcopy(
            {
                "state": state,
                "numpy_GlobalState": numpy.random.get_state(),
                "numpy_RandomState": self.cs.random.get_state(),
            }
        )

    def set_state(self, state_dict: dict) -> None:
        """Reset the state of DEHB"""
        for k, v in state_dict["state"].items():
            if hasattr(self, k):
                setattr(self, k, v)
            else:
                logger.error("DEHB does not have attribute %s", k)

        numpy.random.set_state(state_dict["numpy_GlobalState"])
        self.cs.random.set_state(state_dict["numpy_RandomState"])


# pylint: disable=too-many-public-methods
[docs]class DEHB(BaseAlgorithm): """Differential Evolution with HyperBand This class is a wrapper around the library DEHB: https://github.com/automl/DEHB. For more information on the algorithm, see original paper at https://arxiv.org/abs/2105.09821. Awad, Noor, Neeratyoy Mallik, and Frank Hutter. "Dehb: Evolutionary hyperband for scalable, robust and efficient hyperparameter optimization." arXiv preprint arXiv:2105.09821 (2021). Parameters ---------- space: `orion.algo.space.Space` Optimisation space with priors for each dimension. seed: None, int or sequence of int Seed for the random number generator used to sample new trials. Default: ``None`` mutation_factor: float Mutation probability Default: ``0.5`` crossover_prob: float Crossover probability Default: ``0.5`` mutation_strategy: str Mutation strategy rand1, rand2dir randtobest1 currenttobest1 best1 best2 rand2 Default: ``'rand1'`` crossover_strategy: str Crossover strategy bin or exp Default: ``'bin'`` boundary_fix_type: str Boundary fix method, clip or random Default: ``'random'`` min_clip: float Min clip when boundary fix method is clip Default: ``None`` max_clip: float Max clip when boundary fix method is clip Default: ``None`` """ requires_type: ClassVar[str | None] = None requires_dist: ClassVar[str | None] = None requires_shape: ClassVar[str | None] = "flattened" # pylint: disable=too-many-arguments def __init__( self, space: Space, seed: int | None = None, mutation_factor: float = 0.5, crossover_prob: float = 0.5, mutation_strategy: str = "rand1", crossover_strategy: str = "bin", boundary_fix_type: str = "random", min_clip: int | None = None, max_clip: int | None = None, ): import_optional.ensure() # Sanity Check if mutation_strategy not in MUTATION_STRATEGIES: raise UnsupportedConfiguration( f"Mutation strategy {mutation_strategy} not supported" ) if crossover_strategy not in CROSSOVER_STRATEGY: raise UnsupportedConfiguration( f"Crossover strategy {crossover_strategy} not supported" ) if boundary_fix_type not in FIX_MODES: raise UnsupportedConfiguration( f"Boundary fix type {boundary_fix_type} not supported" ) super().__init__( space, seed=seed, mutation_factor=mutation_factor, crossover_prob=crossover_prob, mutation_strategy=mutation_strategy, crossover_strategy=crossover_strategy, boundary_fix_type=boundary_fix_type, min_clip=min_clip, max_clip=max_clip, ) # Extract fidelity information if self.fidelity_index is None: raise RuntimeError(SPACE_ERROR) fidelity_dim: Fidelity = space[self.fidelity_index] # NOTE: This isn't a Fidelity, it's a TransformedDimension<Fidelity> from orion.core.worker.transformer import TransformedDimension # NOTE: Currently bypassing (possibly more than one) `TransformedDimension` wrappers to get # the 'low', 'high' and 'base' attributes. while isinstance(fidelity_dim, TransformedDimension): fidelity_dim = fidelity_dim.original_dimension assert isinstance(fidelity_dim, Fidelity) self.rung: int | None = None self.seed = seed self.job_infos: defaultdict[str, list] = defaultdict(list) self.job_results: dict[str, RungResult] = dict() self.duplicates: defaultdict[str, int] = defaultdict(int) configspace = convert_space(self.space) # Initialize self.dehb = _CustomDEHBImpl( duplicates=self.duplicates, cs=configspace, configspace=True, dimensions=len(configspace.get_hyperparameters()), mutation_factor=mutation_factor, crossover_prob=crossover_prob, strategy=f"{mutation_strategy}_{crossover_strategy}", min_clip=min_clip, max_clip=max_clip, boundary_fix_type=boundary_fix_type, max_age=numpy.inf, # Derived min_budget=fidelity_dim.low, max_budget=fidelity_dim.high, eta=fidelity_dim.base, # Disable their Dask Integration n_workers=1, client=None, # No need for the user function f=None, ) self.seed_rng(self.seed) self.rung = len(self.dehb.budgets) @property def state_dict(self) -> dict: """Return a state dict that can be used to reset the state of the algorithm.""" state_dict = super().state_dict state_dict["DEHB_statedict"] = self.dehb.state_dict return deepcopy(state_dict)
[docs] def set_state(self, state_dict: dict) -> None: """Reset the state of the algorithm based on the given state_dict :param state_dict: Dictionary representing state of an algorithm """ super().set_state(state_dict) self.dehb.set_state(state_dict["DEHB_statedict"])
[docs] def seed_rng(self, seed: int | None) -> None: """Seed the state of the random number generator. Parameters ---------- seed: int or None Integer seed for the random number generator. """ if hasattr(self, "dehb"): self.dehb.seed_rng(seed)
@property def is_done(self) -> bool: """Return True, if an algorithm holds that there can be no further improvement.""" return self.dehb._is_run_budget_exhausted(None, self.rung, None)
[docs] def sample_to_trial(self, sample: numpy.ndarray, fidelity: int) -> Trial: """Convert a ConfigSpace sample into a trial""" config = self.dehb.vector_to_configspace(sample) hps = {} for k, v in self.space.items(): if v.type == "fidelity": hps[k] = fidelity else: hps[k] = config[k] return format_trials.dict_to_trial(to_orion(hps), self.space)
[docs] def suggest(self, num: int) -> list[Trial]: """Suggest a number of new sets of parameters. Parameters ---------- num: int, optional Number of trials to suggest. The algorithm may return less than the number of trials requested. Returns ------- list of trials or None A list of trials representing values suggested by the algorithm. The algorithm may opt out if it cannot make a good suggestion at the moment (it may be waiting for other trials to complete), in which case it will return None. Notes ----- New parameters must be compliant with the problem's domain `orion.algo.space.Space`. """ trials: Trial = [] while len(trials) < num: if self.is_done: break job_info: dict = self.dehb._get_next_job() job_info["done"] = 0 # We are generating trials for a bracket that is too high if self.rung is not None and job_info["bracket_id"] >= self.rung: break # Generate Orion trial new_trial = self.sample_to_trial( job_info["config"], fidelity=job_info["budget"] ) # DEHB may sample 2 identical trials if working in a purely discrete spaceI # It this case you will have has_suggested(new_trial) is True, and you will # discard this trial. # It's fine to discard the trial, but we should keep track of the job_info. # DEHB does not know that we discarded the trial and will be waiting for the # result. We need to keep track of both job_info so that when we have # the result of the first trial, we assign it to the second job_info as well. if not self.has_suggested(new_trial): # Store metadata self.job_infos[self.get_id(new_trial)].append(job_info) self.dehb.register_job(job_info) # Standard Orion self.register(new_trial) trials.append(new_trial) logger.debug("Suggest new trials %s", new_trial) else: logger.debug("Already suggested %s", new_trial) # Do we already have a result for this trial ? result = self.job_results.get(self.get_id(new_trial)) # Keep track of duplicated jobs per brackets # Bracket is only done after we reach the budget for unique # jobs self.duplicates[str(job_info["budget"])] += 1 # if so observe it right now and discard if result is not None: self.dehb.observe(job_info, *result) else: # else we need to keep track of it to observe it later self.job_infos[self.get_id(new_trial)].append(job_info) return trials
[docs] def observe(self, trials: list[Trial]) -> None: """Observe the `trials` new state of result. Parameters ---------- trials: list of ``orion.core.worker.trial.Trial`` Trials from a `orion.algo.space.Space`. """ for trial in trials: if not self.has_suggested(trial): logger.debug("Ignore unseen trial %s", trial) continue if self.has_observed(trial): logger.debug("Ignore already observed trial %s", trial) continue self.register(trial) if trial.status == "completed": self.observe_one(trial) logger.debug( "Observe trial %s (Remaining %d)", trial, len(self.job_infos) )
[docs] def observe_one(self, trial: Trial) -> None: """Observe a single trial""" # Get all the job sampled by DEHB, it might be more than one job_infos = self.job_infos.get(self.get_id(trial), []) if not job_infos: # this should be 100% unreachable because we check # if the trial was suggested inside `observe` logger.error("Could not find trial %s", self.get_id(trial)) return # Yes, it is odd; fidelity is cost and fitness is objective cost = trial.params[self.fidelity_index] fitness = trial.objective.value # Store the result for later, if we sample # a trial that is too alike for us to evaluate self.job_results[self.get_id(trial)] = RungResult(cost, fitness) for job_info in job_infos: cost = job_info["budget"] self.dehb.observe(job_info, cost, fitness)