Source code for orion.algo.pbt.pb2

"""
:mod:`orion.algo.pbt.pb2`
=========================

"""
from __future__ import annotations

import copy
import logging
import time
from typing import Any, ClassVar, Sequence

import numpy as np
import pandas

from orion.algo.pbt.pb2_utils import import_optional, select_config
from orion.algo.pbt.pbt import PBT
from orion.core.utils.flatten import flatten
from orion.core.utils.random_state import RandomState, control_randomness
from orion.core.worker.transformer import ReshapedSpace, TransformedSpace
from orion.core.worker.trial import Trial

logger = logging.getLogger(__name__)


class PB2(PBT):
    """Population Based Bandits

    Warning: PB2 is broken in current version v0.2.4. We are working on a fix to be
    released in v0.2.5, ETA July 2022.

    Population Based Bandits is a variant of Population Based Training that uses a
    probabilistic model to guide the search instead of relying on purely random
    perturbations. The PB2 implementation uses a time-varying Gaussian process to model
    the optimization curves during training. This implementation is based on the
    ray-tune implementation. Oríon's version supports discrete and categorical
    dimensions, and offers better resiliency to broken trials by using back-tracking.

    See PBT documentation for more information on how to use PBT algorithms.

    For more information on the algorithm,
    see original paper at https://arxiv.org/abs/2002.02518.

    Parker-Holder, Jack, Vu Nguyen, and Stephen J. Roberts.
    "Provably efficient online hyperparameter optimization with population-based bandits."
    Advances in Neural Information Processing Systems 33 (2020): 17200-17211.

    Parameters
    ----------
    space: `orion.algo.space.Space`
        Optimisation space with priors for each dimension.
    seed: None, int or sequence of int
        Seed for the random number generator used to sample new trials.
        Default: ``None``
    population_size: int, optional
        Size of the population. No trial will be continued until there are
        `population_size` trials executed at the lowest fidelity. If a trial is broken
        during execution at the lowest fidelity, the algorithm will sample a new trial,
        keeping the population of *non-broken* trials at `population_size`. For
        efficiency it is better to have fewer workers running than `population_size`.
        Default: 50.
    generations: int, optional
        Number of generations, from the lowest fidelity to the highest one. This will
        determine how many branchings occur during the execution of PBT.
        Default: 10
    exploit: dict or None, optional
        Configuration for a ``pbt.exploit.BaseExploit`` object that determines
        whether a trial should be exploited or not. If None, default configuration
        is a ``PipelineExploit`` with ``BacktrackExploit`` and ``TruncateExploit``.
    fork_timeout: int, optional
        Maximum amount of time in seconds that an attempt to mutate a trial should take,
        otherwise ``algorithm.suggest()`` will raise ``SuggestionTimeout``.
        Default: 60

    """

    requires_type: ClassVar[str | None] = "real"
    requires_dist: ClassVar[str | None] = "linear"
    requires_shape: ClassVar[str | None] = "flattened"

    def __init__(
        self,
        space,
        seed=None,
        population_size=50,
        generations=10,
        exploit=None,
        fork_timeout=60,
    ):
        import_optional.ensure()

        self.random_state: RandomState | None = None
        super().__init__(
            space,
            seed=seed,
            population_size=population_size,
            generations=generations,
            exploit=exploit,
            fork_timeout=fork_timeout,
        )

    @property
    def configuration(self):
        """Return tunable elements of this algorithm in a dictionary form
        appropriate for saving.
        """
        config = copy.deepcopy(super().configuration)
        config["pb2"].pop("explore", None)
        return config
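
    # Usage sketch (illustrative only, not part of this module): PB2 is normally
    # selected through the experiment configuration rather than instantiated
    # directly. A minimal example assuming Orion's Python client; the experiment
    # name, search-space priors, and keyword spelling below are assumptions:
    #
    #     from orion.client import build_experiment
    #
    #     experiment = build_experiment(
    #         name="pb2_demo",  # hypothetical experiment name
    #         space={
    #             "lr": "loguniform(1e-5, 1e-1)",
    #             "epochs": "fidelity(1, 16, base=2)",
    #         },
    #         algorithms={
    #             "pb2": {
    #                 "population_size": 50,
    #                 "generations": 10,
    #                 "fork_timeout": 60,
    #             }
    #         },
    #     )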

    def seed_rng(self, seed: int | Sequence[int] | None) -> None:
        """Seed the state of the random number generator.

        Parameters
        ----------
        seed: int
            Integer seed for the random number generator.

        """
        super().seed_rng(seed)
        self.random_state = RandomState.seed(self.rng.randint(0, 2**32 - 1))

    @property
    def state_dict(self) -> dict:
        """Return a state dict that can be used to reset the state of the algorithm."""
        state_dict: dict[str, Any] = super().state_dict
        state_dict["random_state"] = self.random_state or RandomState.current()
        return state_dict

    def set_state(self, state_dict: dict) -> None:
        """Reset the state of the algorithm based on the given state_dict"""
        super().set_state(state_dict)
        self.random_state = state_dict["random_state"]
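
    # Illustrative sketch: ``state_dict`` and ``set_state`` allow checkpointing
    # the algorithm between executions, including its ``random_state``. The
    # variable names below are hypothetical; the pattern follows the base
    # algorithm API:
    #
    #     snapshot = algo.state_dict   # capture RNG and PBT state
    #     ...                          # later, possibly in another process
    #     algo.set_state(snapshot)     # restore the captured state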

    def _generate_offspring(self, trial):
        """Try to promote or fork a given trial."""
        new_trial = trial

        if not self.has_suggested(new_trial):
            raise RuntimeError(
                "Trying to fork a trial that was not registered yet. This should never happen"
            )

        attempts = 0
        start = time.perf_counter()
        while (
            self.has_suggested(new_trial)
            and time.perf_counter() - start <= self.fork_timeout
        ):
            trial_to_explore = self.exploit_func(
                self.rng,
                trial,
                self.lineages,
            )

            if trial_to_explore is None:
                return None, None
            elif trial_to_explore is trial:
                new_params = {}
                trial_to_branch = trial
                logger.debug("Promoting trial %s, parameters stay the same.", trial)
            else:
                new_params = flatten(self._explore(self.space, trial_to_explore))
                trial_to_branch = trial_to_explore
                logger.debug(
                    "Forking trial %s with new parameters %s",
                    trial_to_branch,
                    new_params,
                )

            # Set next level of fidelity
            new_params[self.fidelity_index] = self.fidelity_upgrades[
                flatten(trial_to_branch.params)[self.fidelity_index]
            ]

            new_trial = trial_to_branch.branch(params=new_params)
            assert isinstance(self.space, (TransformedSpace, ReshapedSpace))
            new_trial = self.space.transform(self.space.reverse(new_trial))

            logger.debug("Attempt %s - Creating new trial %s", attempts, new_trial)

            attempts += 1

        if (
            self.has_suggested(new_trial)
            and time.perf_counter() - start > self.fork_timeout
        ):
            trial_to_branch = None
            new_trial = None
            logger.info(
                f"Could not generate unique new parameters for trial {trial.id} in "
                f"less than {self.fork_timeout} seconds. Attempted {attempts} times."
            )

        return trial_to_branch, new_trial

    def _explore(self, space, base: Trial):
        """Generate new hyperparameters for given trial.

        Derived from PB2 explore implementation in Ray (2022/02/18):
        https://github.com/ray-project/ray/blob/master/python/ray/tune/schedulers/pb2.py#L131
        """
        base_params = flatten(base.params)

        data, current = self._get_data_and_current()
        bounds = {dim.name: dim.interval() for dim in space.values()}

        df = data.copy()

        # Group by trial ID and hyperparams.
        # Compute change in timesteps and reward.
        diff_reward = (
            df.groupby(["Trial"] + list(bounds.keys()))["Reward"]
            .mean()
            .diff()
            .reset_index(drop=True)
        )
        df["y"] = diff_reward

        df["R_before"] = df.Reward - df.y

        df = df[~df.y.isna()].reset_index(drop=True)

        # Only use the last 1k datapoints, so the GP is not too slow.
        df = df.iloc[-1000:, :].reset_index(drop=True)

        # We need this to know the T and Reward for the weights.
        if not df[df["Trial"] == self.get_id(base)].empty:
            # Now specify the dataset for the GP.
            y_raw = np.array(df.y.values)
            # Meta data we keep -> episodes and reward.
            t_r = df[["Budget", "R_before"]]
            hparams = df[bounds.keys()]
            x_raw = pandas.concat([t_r, hparams], axis=1).values
            newpoint = (
                df[df["Trial"] == self.get_id(base)]
                .iloc[-1, :][["Budget", "R_before"]]
                .values
            )
            with control_randomness(self):
                new = select_config(
                    x_raw,
                    y_raw,
                    current,
                    newpoint,
                    bounds,
                    num_f=len(t_r.columns),
                )

            new_config = base_params.copy()
            for i, col in enumerate(hparams.columns):
                if isinstance(base_params[col], int):
                    new_config[col] = int(new[i])
                else:
                    new_config[col] = new[i]

        else:
            new_config = base_params

        return new_config

    def _get_data_and_current(self):
        """Generate data and current objects used in _explore function.

        data is a pandas DataFrame combining data from all completed trials.

        current is a numpy array with hyperparameters from uncompleted trials.
""" data_trials = [] current_trials = [] for trial in self.registry: if trial.status == "completed": data_trials.append(trial) else: current_trials.append(trial) data = self._trials_to_data(data_trials) if current_trials: current_array = [] for trial in current_trials: trial_params = flatten(trial.params) current_array.append([trial_params[key] for key in self.space.keys()]) current = np.asarray(current_array) else: current = None return data, current def _trials_to_data(self, trials): """Generate data frame to use in _explore method.""" rows = [] cols = ["Trial", "Budget"] + list(self.space.keys()) + ["Reward"] for trial in trials: trial_params = flatten(trial.params) values = [trial_params[key] for key in self.space.keys()] lst = ( [self.get_id(trial), trial_params[self.fidelity_index]] + values + [trial.objective.value] ) rows.append(lst) data = pandas.DataFrame(rows, columns=cols) data.Trial = data.Trial.astype("str") return data