"""
:mod:`orion.algo.HEBO.hebo` -- Orion adapter for the HEBO algorithm.
=====================================================================
The HEBO algorithm implementation can be found at https://github.com/huawei-noah/HEBO
"""
from __future__ import annotations
import contextlib
import copy
import typing
import warnings
from dataclasses import dataclass
from logging import getLogger as get_logger
from typing import Any, ClassVar
import numpy as np
import pandas as pd
from typing_extensions import Literal, TypedDict # type: ignore
from orion.algo.base import BaseAlgorithm
from orion.algo.hebo.random_state import RandomState
from orion.algo.space import Dimension, Fidelity, Space
from orion.core.utils.format_trials import dict_to_trial
from orion.core.worker.trial import Trial
# Error to raise when the HEBO algorithm is requested but its optional dependencies could
# not be imported. Stays `None` when every import below succeeds.
_HEBO_REQUIRED_ERROR = None
try:
    import hebo
    from hebo.acquisitions.acq import MACE, Acquisition
    from hebo.design_space import DesignSpace
    from hebo.design_space.param import Parameter
    from torch.quasirandom import SobolEngine
except ImportError as err:
    # `MACE` is used as a default value in a class body, so it needs a placeholder value even
    # when the import fails.
    MACE = object
    _HEBO_REQUIRED_ERROR = ImportError(
        "The HEBO package is not installed. Install it with `pip install orion[hebo]`"
    )
    # Keep the original failure attached: the import that failed is not necessarily `hebo`
    # itself (e.g. `torch` could be the missing piece), and the chained traceback shows which.
    _HEBO_REQUIRED_ERROR.__cause__ = err

if typing.TYPE_CHECKING and _HEBO_REQUIRED_ERROR:
    # Placeholders for the names used in annotations when the imports above failed.
    Acquisition = object  # noqa
    DesignSpace = object  # noqa
    Parameter = object  # noqa
    SobolEngine = object  # noqa
# Module-level logger.
logger = get_logger(__name__)

# Accepted values for `HEBO.Parameters.model_name`.
ModelName = Literal[
    "gp",
    "gpy",
    "gpy_mlp",
    "rf",
    "deep_ensemble",
    "masked_deep_ensemble",
    "fe_deep_ensemble",
    "gumbel",
    "catboost",
]

# Models whose randomness can be properly seeded. `HEBO.__init__` emits a `UserWarning` when a
# seed is given together with a model that is not in this set.
properly_seeded_models: set[ModelName] = {"gp", "gpy", "gpy_mlp", "rf", "catboost"}

# Accepted values for `HEBO.Parameters.evolutionary_strategy`.
EvolutionStrategyName = Literal[
    "ga",
    "brkga",
    "de",
    "nelder-mead",
    "pattern-search",
    "cmaes",
    "pso",
    "nsga2",
    "rnsga2",
    "nsga3",
    "unsga3",
    "rnsga3",
    "moead",
    "ctaea",
]
class HeboModelState(TypedDict):
    """Typed dict for the state of the HEBO class from `hebo.optimizers.hebo`.

    Every key is the name of an attribute of the wrapped optimizer: `HEBO.state_dict` reads
    these attributes from `self.model`, and `HEBO.set_state` writes them back with `setattr`.
    """

    # Design space of the wrapped optimizer (`model.space`).
    space: DesignSpace
    # Evolutionary strategy name (`model.es`); passed as `es=` when the model is created.
    es: str
    # `model.X`; its length is asserted to equal the number of observed trials in `observe`.
    X: pd.DataFrame
    # `model.y`; presumably the objective values matching the rows of `X` -- confirm in HEBO.
    y: np.ndarray
    # Name of the surrogate model (`model.model_name`).
    model_name: str
    # `model.rand_sample`; passed as `rand_sample=` when the model is created.
    rand_sample: int
    # `model.sobol`; presumably the quasi-random sampler used by HEBO -- confirm in HEBO.
    sobol: SobolEngine
    # Acquisition class (`model.acq_cls`); passed as `acq_cls=` when the model is created.
    acq_cls: type[Acquisition]
    # `model._model_config`; passed as `model_config=` when the model is created.
    _model_config: dict | None
class HEBO(BaseAlgorithm):
    """Adapter for the HEBO algorithm from https://github.com/huawei-noah/HEBO

    Parameters
    ----------
    :param space: Optimisation space with priors for each dimension.
    :param seed: Base seed for the random number generators. Defaults to `None`, in which case the
        randomness is not seeded.
    :param parameters: Parameters for the HEBO algorithm. Either a `HEBO.Parameters` instance, or
        a dict of keyword arguments used to create one. Defaults to `HEBO.Parameters()`.
    """

    requires_type: ClassVar[str | None] = None
    requires_shape: ClassVar[str | None] = "flattened"
    requires_dist: ClassVar[str | None] = None

    @dataclass(frozen=True)
    class Parameters:
        """Parameters of the HEBO algorithm."""

        model_name: ModelName = "gpy"
        """ Name of the model to use. See `ModelName` for the available values. """

        random_samples: int | None = None
        """ Number of random samples to suggest before optimization begins.
        If `None`, the number of dimensions in the space is used as the default. Otherwise, the max
        of the value and `2` is used.
        """

        acquisition_class: type[Acquisition] = MACE
        """ Acquisition class to use. """

        evolutionary_strategy: EvolutionStrategyName = "nsga2"
        """ Name of the evolutionary strategy to use. See `EvolutionStrategyName` for the list of
        possible values.
        """

        model_config: dict | None = None
        """ Keyword argument to be passed to the constructor of the model class that is selected
        with `model_name`.
        """

    def __init__(
        self,
        space: Space,
        seed: int | None = None,
        parameters: Parameters | dict | None = None,
    ):
        if _HEBO_REQUIRED_ERROR:
            raise _HEBO_REQUIRED_ERROR
        super().__init__(space)
        if isinstance(parameters, dict):
            parameters = self.Parameters(**parameters)
        self.parameters: HEBO.Parameters = parameters or self.Parameters()
        self.seed = seed
        self.random_state: RandomState | None = None

        if (
            self.parameters.model_name not in properly_seeded_models
            and seed is not None
        ):
            warnings.warn(
                UserWarning(
                    f"The randomness used by the chosen model '{self.parameters.model_name}' "
                    f"cannot be properly seeded. The model will still work, but the results may "
                    f"not be reproducible, and the random state will not be properly "
                    f"saved/restored during checkpointing."
                )
            )

        # NOTE: Need to seed the randomness here, since creating the model affects the global torch
        # RNG state. This way, we always get the same model for the same seed.
        if self.seed is not None:
            self.seed_rng(self.seed)

        self.hebo_space: DesignSpace = orion_space_to_hebo_space(self.space)

        # NOTE: Importing the `hebo` package does not by itself load the `hebo.optimizers.hebo`
        # submodule, so the optimizer class is imported explicitly instead of being reached
        # through attribute access on the package.
        from hebo.optimizers.hebo import HEBO as HeboOptimizer

        with self._control_randomness():
            self.model = HeboOptimizer(
                space=self.hebo_space,
                model_name=self.parameters.model_name,
                rand_sample=self.parameters.random_samples,
                acq_cls=self.parameters.acquisition_class,
                es=self.parameters.evolutionary_strategy,
                model_config=self.parameters.model_config,
            )

    def seed_rng(self, seed: int | None) -> None:
        """Seed the random number generators.

        Stores the resulting `RandomState` on `self.random_state`, which is what
        `_control_randomness` uses.
        """
        logger.debug("Using a base seed of %s.", seed)
        self.random_state = RandomState.seed(seed)

    @property
    def state_dict(self) -> dict:
        """Return a state dict that can be used to reset the state of the algorithm.

        The result is a deep copy containing the base class state, the attributes of the wrapped
        model (see `HeboModelState`), the random state and the parameters.
        """
        base_state_dict = super().state_dict
        model_state = HeboModelState(
            space=self.model.space,
            X=self.model.X,
            y=self.model.y,
            es=self.model.es,
            model_name=self.model.model_name,
            acq_cls=self.model.acq_cls,
            rand_sample=self.model.rand_sample,
            _model_config=self.model._model_config,  # pylint:disable=protected-access
            sobol=self.model.sobol,
        )
        return copy.deepcopy(
            dict(
                **base_state_dict,
                model=model_state,
                # Fall back on the current global state when the algorithm was never seeded.
                random_state=self.random_state or RandomState.current(),
                parameters=self.parameters,
            )
        )

    def set_state(self, state_dict: dict) -> None:
        """Reset the state of the algorithm based on the given state_dict

        :param state_dict: Dictionary representing state of an algorithm
        :raises RuntimeError: If the model state contains a key that is not an attribute of the
            wrapped model.
        """
        super().set_state(state_dict)
        self.random_state = state_dict["random_state"]
        self.parameters = state_dict["parameters"]
        model_state = state_dict["model"]
        # NOTE: For now assuming that we can just store anything into the state dict
        for key, value in model_state.items():
            if not hasattr(self.model, key):
                raise RuntimeError(
                    f"The state dict has attribute {key} that is not in the model!"
                )
            setattr(self.model, key, value)

    def suggest(self, num: int) -> list[Trial]:
        """Suggest `num` new sets of hyper-parameters to try.

        Parameters
        ----------
        num: int
            Number of trials to suggest. The algorithm may return less than the number of
            trials requested.

        Returns
        -------
        A list of trials representing values suggested by the algorithm.
        """
        trials: list[Trial] = []
        with self._control_randomness():
            v: pd.DataFrame = self.model.suggest(n_suggestions=num)
        point_dicts: dict[int, dict] = v.to_dict(orient="index")  # type: ignore

        for point_index, params_dict in point_dicts.items():
            if self.is_done:
                break
            params_dict = self._hebo_params_to_orion_params(params_dict)
            new_trial = self._params_to_trial(params_dict)
            # Trials that were already suggested are dropped, which is why fewer than `num`
            # trials can be returned.
            if not self.has_suggested(new_trial):
                self.register(new_trial)
                trials.append(new_trial)
                logger.debug("Suggestion %s: %s", point_index, new_trial)
        return trials

    def observe(self, trials: list[Trial]) -> None:
        """Observe the `trials` new state of result.

        Trials that were already observed are skipped. Trials without an objective are
        registered, but are not passed to the model.

        Parameters
        ----------
        :param trials: New trials with their objectives.
        """
        new_xs: list[dict] = []
        new_ys: list[float] = []
        assert len(self.model.X) == self.n_observed

        for trial in trials:
            if self.has_observed(trial):
                continue
            self.register(trial)
            if trial.objective is None:
                # Trial is broken: ignore it.
                continue
            new_xs.append(self._orion_params_to_hebo_params(trial.params))
            new_ys.append(trial.objective.value)

        x_df = pd.DataFrame(new_xs)
        # The model is given the objectives as a column vector.
        y_array = np.array(new_ys).reshape([-1, 1])
        with self._control_randomness():
            self.model.observe(X=x_df, y=y_array)

    def _hebo_params_to_orion_params(self, hebo_params: dict) -> dict:
        """Fix any issues with the outputs of the HEBO algo so they fit `self.space`.

        :param hebo_params: `{name: value}` dict for one point suggested by the model.
        :returns: A new `{name: value}` dict with the fixed values.
        :raises RuntimeError: If a categorical value can't be matched with exactly one choice of
            its dimension, or if the fixed params still aren't in `self.space`.
        """
        # Imported locally, and only once (rather than on every loop iteration).
        from orion.core.worker.transformer import ReshapedDimension

        orion_params = {}
        for name, value in hebo_params.items():
            dim: Dimension = self.space[name]
            if (
                dim.type == "categorical"
                and dim.prior_name == "choices"
                and value not in dim
            ):
                # The categories are given to HEBO as strings (see `orion_space_to_hebo_space`),
                # so recover the original choice from its string representation.
                potential_vals = [v for v in dim.interval() if str(v) == value]
                if len(potential_vals) == 1:
                    value = potential_vals[0]
                else:
                    raise RuntimeError(
                        f"Value {value} is not contained in the dimension {dim}, and "
                        f"{len(potential_vals)} could match it."
                    )
            elif isinstance(dim, ReshapedDimension):
                # BUG: https://github.com/Epistimio/orion/issues/800
                if isinstance(value, (int, float)) and not isinstance(value, bool):
                    # note: need to make sure `value` isn't a bool, since issubclass(bool, int).
                    value = np.array(value)
                # NOTE: `assert value in dim` doesn't work here! Raises an issue
                # (index is None + int).
            orion_params[name] = value

        if self._params_to_trial(orion_params) not in self.space:
            raise RuntimeError(
                f"Unable to fix all the issues: params {orion_params} still isn't in space "
                f"{self.space}!"
            )
        return orion_params

    def _orion_params_to_hebo_params(self, orion_params: dict) -> dict:
        """Fix any issues with the trials from Orion so they fit the `self.hebo_space`.

        :param orion_params: `{name: value}` dict of the params of a trial.
        :returns: A new `{name: value}` dict, without the fidelity dimension (if any).
        """
        assert self.hebo_space is not None
        # Imported locally, and only once (rather than on every loop iteration).
        from hebo.design_space.categorical_param import CategoricalPara

        # NOTE: Remove the extra stuff (e.g. Fidelity dimension), before passing it to the
        # model. It's just a precaution, since the Hebo model probably fetches data using
        # the keys of its space, which doesn't have the Fidelity dimension anyway.
        params = {}
        for name, value in orion_params.items():
            orion_dim: Dimension = self.space[name]
            # The fidelity check has to come before the lookup in `self.hebo_space`, because
            # `orion_space_to_hebo_space` doesn't add fidelity dimensions to the HEBO space.
            if orion_dim.type == "fidelity":
                continue
            hebo_dim: Parameter = self.hebo_space.paras[name]

            if isinstance(hebo_dim, CategoricalPara):
                # The categories are strings on the HEBO side.
                if (
                    value not in hebo_dim.categories
                    and str(value) in hebo_dim.categories
                ):
                    value = str(value)
                assert value in hebo_dim.categories, (value, hebo_dim.categories)
            params[name] = value
        return params

    def _params_to_trial(self, orion_params: dict) -> Trial:
        """Create a Trial from a dict of hyper-parameters.

        :param orion_params: `{name: value}` dict of hyper-parameters. It is not modified.
        :returns: The trial created with `dict_to_trial`, using `self.space`.
        """
        # Need to convert the {name: value} of point_dict into this format for Orion's Trial.
        # Work on a copy, so that the fidelity value isn't added to the caller's dict.
        trial_params = dict(orion_params)
        # Add the max value for the Fidelity dimensions, if any.
        if self.fidelity_index is not None:
            fidelity_dim: Fidelity = self.space[self.fidelity_index]
            trial_params[self.fidelity_index] = fidelity_dim.high
        trial: Trial = dict_to_trial(trial_params, space=self.space)
        return trial

    @contextlib.contextmanager
    def _control_randomness(self):
        """Seeds the randomness inside the indented block of code using `self.random_state`.

        The random state that was current before entering the block is restored when the block
        exits, even if the block raises an exception.

        NOTE: This only has an effect if `seed_rng` was called previously, i.e. if
        `self.random_state` is not None.
        """
        if self.random_state is None:
            yield
            return

        # Save the initial random state.
        initial_rng_state = RandomState.current()
        # Set the random state.
        self.random_state.set()
        try:
            yield
        finally:
            # Update the random state stored on `self`, so that the changes inside the block are
            # reflected in the RandomState object.
            self.random_state = RandomState.current()
            # Reset the initial state.
            initial_rng_state.set()
def orion_space_to_hebo_space(space: Space) -> DesignSpace:
    """Build the HEBO `DesignSpace` that corresponds to an Orion `Space`.

    Fidelity dimensions are left out of the result. The choices of categorical dimensions are
    converted to strings.

    Parameters
    ----------
    :param space: `Space` instance.

    Returns
    -------
    a `DesignSpace` from the `hebo` package.

    Raises
    ------
    NotImplementedError
        If there is an unsupported dimension or prior type in `space`.
    """
    # HEBO parameter type for each Orion prior that is described by a (lower, upper) interval.
    interval_types = {
        "uniform": "num",
        "reciprocal": "pow",
        "int_uniform": "int",
        "int_reciprocal": "pow_int",
    }
    hebo_specs = []
    design_space = DesignSpace()
    dim_name: str
    dimension: Dimension
    for dim_name, dimension in space.items():
        entry: dict[str, Any] = {"name": dim_name}
        prior = dimension.prior_name
        raw_bounds = dimension.interval()

        if dimension.shape:
            raise NotImplementedError(
                f"HEBO algorithm doesn't support dimension {dimension} since it has a shape."
            )
        if dimension.type == "fidelity":
            # Not something for Hebo to optimize: leave it out of the space.
            continue

        # BUG: https://github.com/Epistimio/orion/issues/800
        limits = tuple(
            bound.item() if isinstance(bound, np.ndarray) else bound
            for bound in raw_bounds
        )

        if prior == "choices":
            entry.update(type="cat", categories=[str(choice) for choice in limits])
        elif prior in interval_types:
            entry.update(type=interval_types[prior], lb=limits[0], ub=limits[1])
        else:
            raise NotImplementedError(prior, dimension)
        hebo_specs.append(entry)

    design_space.parse(hebo_specs)
    return design_space