"""
The Evolved Transformer and large-scale evolution of image classifiers
======================================================================
Implement evolution to exploit configurations with fixed resource efficiently
"""
from __future__ import annotations
import copy
import importlib
import logging
from typing import Callable, ClassVar, Sequence, TypeVar
import numpy as np
from orion.algo.hyperband import BudgetTuple, Hyperband, HyperbandBracket
from orion.algo.space import Space
from orion.core.utils import format_trials
from orion.core.utils.flatten import flatten
from orion.core.worker.trial import Trial
logger = logging.getLogger(__name__)
REGISTRATION_ERROR = """
Bad fidelity level {fidelity}. Should be in {budgets}.
Params: {params}
"""
SPACE_ERROR = """
EvolutionES cannot be used if space does not contain a fidelity dimension.
"""
BUDGET_ERROR = """
Cannot build budgets below max_resources;
(max: {}) - (min: {}) > (num_rungs: {})
"""
def compute_budgets(
min_resources: int,
max_resources: int,
reduction_factor: int,
nums_population: int,
pairs: int,
) -> list[list[BudgetTuple]]:
"""Compute the budgets used for each execution of hyperband"""
budgets_eves = []
if reduction_factor == 1:
for i in range(min_resources, max_resources + 1):
if i == min_resources:
budgets_eves.append([BudgetTuple(nums_population, i)])
else:
budgets_eves[0].append(BudgetTuple(pairs * 2, i))
else:
num_brackets = int(np.log(max_resources) / np.log(reduction_factor))
budgets: list[list[BudgetTuple]] = []
budgets_tab: dict[int, list[BudgetTuple]] = {} # just for display consideration
for bracket_id in range(0, num_brackets + 1):
bracket_budgets: list[BudgetTuple] = []
num_trials = int(
np.ceil(
int((num_brackets + 1) / (num_brackets - bracket_id + 1))
* (reduction_factor ** (num_brackets - bracket_id))
)
)
min_resources = max_resources / reduction_factor ** (
num_brackets - bracket_id
)
for i in range(0, num_brackets - bracket_id + 1):
n_i = int(num_trials / reduction_factor**i)
min_i = int(min_resources * reduction_factor**i)
bracket_budgets.append(BudgetTuple(n_i, min_i))
budget = BudgetTuple(n_i, min_i)
if budgets_tab.get(i):
budgets_tab[i].append(budget)
else:
budgets_tab[i] = [budget]
budgets.append(bracket_budgets)
for i in range(len(budgets[0])):
if i == 0:
budgets_eves.append([BudgetTuple(nums_population, budgets[0][i][1])])
else:
budgets_eves[0].append(BudgetTuple(pairs * 2, budgets[0][i][1]))
return budgets_eves
BracketT = TypeVar("BracketT", bound="BracketEVES")
[docs]class EvolutionES(Hyperband[BracketT]):
"""EvolutionES formulates hyperparameter optimization as an evolution.
For more information on the algorithm,
see original paper at
https://arxiv.org/pdf/1703.01041.pdf and
https://arxiv.org/pdf/1901.11117.pdf
Real et al. "Large-Scale Evolution of Image Classifiers"
So et all. "The Evolved Transformer"
Parameters
----------
space: `orion.algo.space.Space`
Optimisation space with priors for each dimension.
seed: None, int or sequence of int
Seed for the random number generator used to sample new trials.
Default: ``None``
repetitions: int
Number of execution of Hyperband. Default is numpy.inf which means to
run Hyperband until no new trials can be suggested.
nums_population: int
Number of population for EvolutionES. Larger number of population often gets better
performance but causes more computation. So there is a trade-off according to the search
space and required budget of your problems.
Default: 20
mutate: str or dict or None, optional
In the mutate part, one can define the customized mutate function with its mutate factors,
such as multiply factor (times/divides by a multiply factor) and add factor
(add/subtract by a multiply factor). The function must be defined by
an importable string. If None, default
mutate function is used: ``orion.algo.evolution_es.mutate_functions.default_mutate``.
"""
requires_type: ClassVar[str | None] = None
requires_dist: ClassVar[str | None] = None
requires_shape: ClassVar[str | None] = "flattened"
def __init__(
self,
space: Space,
seed: int | Sequence[int] | None = None,
repetitions: int | float = np.inf,
nums_population: int = 20,
mutate: str | dict | None = None,
max_retries: int = 1000,
):
self.mutate = mutate
super().__init__(space, seed=seed, repetitions=repetitions)
pair = nums_population // 2
mutate_ratio = 0.3
self.nums_population = nums_population
self.nums_comp_pairs = pair
self.max_retries = max_retries
self.mutate_ratio = mutate_ratio
self.nums_mutate_gene = (
int((len(self.space.values()) - 1) * mutate_ratio)
if int((len(self.space.values()) - 1) * mutate_ratio) > 0
else 1
)
self.hurdles: list[float | np.ndarray] = []
self.population: dict[int, list[int]] = {}
for i, dim in enumerate(self.space.values()):
if dim.type != "fidelity":
self.population[i] = [-1] * nums_population
self.performance: np.ndarray = np.inf * np.ones(nums_population)
self.budgets = compute_budgets(
self.min_resources,
self.max_resources,
self.reduction_factor,
nums_population,
pair,
)
self.brackets: list[BracketT] = self.create_brackets()
self.seed_rng(seed)
def create_bracket(self, budgets: list[BudgetTuple], iteration: int) -> BracketT:
return BracketEVES(self, budgets, iteration)
@property
def state_dict(self) -> dict:
"""Return a state dict that can be used to reset the state of the algorithm."""
state_dict = super().state_dict
state_dict["population"] = copy.deepcopy(self.population)
state_dict["performance"] = copy.deepcopy(self.performance)
state_dict["hurdles"] = copy.deepcopy(self.hurdles)
return state_dict
def set_state(self, state_dict: dict) -> None:
"""Reset the state of the algorithm based on the given state_dict"""
self.population = state_dict["population"]
self.performance = state_dict["performance"]
self.hurdles = state_dict["hurdles"]
super().set_state(state_dict)
def _get_bracket(self, trial: Trial) -> BracketT:
"""Get the bracket of a trial during observe"""
return self.brackets[-1]
class BracketEVES(HyperbandBracket[EvolutionES]):
"""Bracket of rungs for the algorithm Hyperband.
Parameters
----------
evolutiones: `evolutiones` algorithm
The evolutiones algorithm object which this bracket will be part of.
budgets: list of tuple
Each tuple gives the (n_trials, resource_budget) for the respective rung.
repetition_id: int
The id of hyperband execution this bracket belongs to
"""
def __init__(
self, owner: EvolutionES, budgets: list[BudgetTuple], repetition_id: int
):
super().__init__(owner, budgets, repetition_id)
self.search_space_without_fidelity = []
self._candidates: dict[int, list[Trial]] = {}
self.mutate_attr: dict = {}
if owner.mutate:
if isinstance(owner.mutate, str):
self.mutate_attr = {"function": owner.mutate}
elif isinstance(owner.mutate, dict):
self.mutate_attr = copy.deepcopy(owner.mutate)
else:
raise ValueError(f"Unsupported type for mutate: {owner.mutate}")
function_string = self.mutate_attr.pop(
"function", "orion.algo.evolution_es.mutate_functions.default_mutate"
)
mod_name, func_name = function_string.rsplit(".", 1)
mod = importlib.import_module(mod_name)
self.mutate_func: Callable = getattr(mod, func_name)
for i, dim in enumerate(self.space.values()):
if dim.type != "fidelity":
self.search_space_without_fidelity.append(i)
@property
def space(self) -> Space:
"""Return origin space"""
return self.owner.space
@property
def state_dict(self) -> dict:
state_dict = super().state_dict
state_dict["candidates"] = copy.deepcopy(self._candidates)
return state_dict
def set_state(self, state_dict: dict) -> None:
super().set_state(state_dict)
self._candidates = state_dict["candidates"]
def _get_teams(self, rung_id: int) -> list | tuple[dict, int, list[int], list[int]]:
"""Get the red team and blue team"""
if self.has_rung_filled(rung_id + 1):
return []
rung = self.rungs[rung_id]["results"]
population_range = (
self.owner.nums_population
if len(list(rung.values())) > self.owner.nums_population
else len(list(rung.values()))
)
rung_trials = list(rung.values())
for trial_index in range(population_range):
objective, trial = rung_trials[trial_index]
self.owner.performance[trial_index] = objective
trial_params = flatten(trial.params)
for ith_dim in self.search_space_without_fidelity:
self.owner.population[ith_dim][trial_index] = trial_params[
self.space[ith_dim].name
]
population_index = list(range(self.owner.nums_population))
red_team = self.owner.rng.choice(
population_index, self.owner.nums_comp_pairs, replace=False
)
diff_list = list(set(population_index).difference(set(red_team)))
blue_team = self.owner.rng.choice(
diff_list, self.owner.nums_comp_pairs, replace=False
)
return rung, population_range, red_team.tolist(), blue_team.tolist()
# pylint: disable=unused-argument
def _mutate_population(
self,
red_team: Sequence[int],
blue_team: Sequence[int],
rung: dict,
population_range: int,
fidelity: int | float,
) -> tuple[list[Trial], np.ndarray]:
"""Get the mutated population and hurdles"""
winner_list = []
loser_list = []
if set(red_team) != set(blue_team):
hurdles = np.zeros(1)
for i, red_value in enumerate(red_team):
winner, loser = (
(red_team, blue_team)
if self.owner.performance[red_value]
< self.owner.performance[blue_team[i]]
else (blue_team, red_team)
)
winner_list.append(winner[i])
loser_list.append(loser[i])
hurdles += self.owner.performance[winner[i]]
self._mutate(winner[i], loser[i])
hurdles /= len(red_team)
self.owner.hurdles.append(hurdles)
logger.debug("Evolution hurdles are: %s", str(self.owner.hurdles))
trials = []
trial_ids = set()
nums_all_equal = [0] * population_range
for i in range(population_range):
point: Sequence[int | float] = [0] * len(self.space)
while True:
point = list(point)
point[
list(self.space.keys()).index(self.owner.fidelity_index)
] = fidelity
for j in self.search_space_without_fidelity:
point[j] = self.owner.population[j][i]
trial = format_trials.tuple_to_trial(point, self.space)
trial_id = self.owner.get_id(trial)
if trial_id in trial_ids:
nums_all_equal[i] += 1
logger.debug("find equal one, continue to mutate.")
self._mutate(i, i)
elif self.owner.has_suggested(trial):
nums_all_equal[i] += 1
logger.debug("find one already suggested, continue to mutate.")
self._mutate(i, i)
else:
break
if nums_all_equal[i] > self.owner.max_retries:
logger.warning(
"Can not Evolve any more. You can make an early stop."
)
break
if nums_all_equal[i] < self.owner.max_retries:
trials.append(trial)
trial_ids.add(trial_id)
else:
logger.debug("Dropping trial %s", trial)
return trials, np.array(nums_all_equal)
def get_candidates(self, rung_id: int) -> list[Trial]:
"""Get a candidate for promotion"""
if rung_id not in self._candidates:
rung, population_range, red_team, blue_team = self._get_teams(rung_id)
fidelity = self.rungs[rung_id + 1]["resources"]
self._candidates[rung_id] = self._mutate_population(
red_team, blue_team, rung, population_range, fidelity
)[0]
candidates: list[Trial] = []
for candidate in self._candidates[rung_id]:
if not self.owner.has_suggested(candidate):
candidates.append(candidate)
return candidates
def _mutate(self, winner_id: int, loser_id: int) -> None:
select_genes_key_list = self.owner.rng.choice(
self.search_space_without_fidelity,
self.owner.nums_mutate_gene,
replace=False,
)
self.copy_winner(winner_id, loser_id)
kwargs = copy.deepcopy(self.mutate_attr)
for value in select_genes_key_list:
space = self.space.values()[value]
old = self.owner.population[value][loser_id]
new = self.mutate_func(space, self.owner.rng, old, **kwargs)
self.owner.population[value][loser_id] = new
self.owner.performance[loser_id] = -1
def copy_winner(self, winner_id: int, loser_id: int) -> None:
"""Copy winner to loser"""
for key in self.search_space_without_fidelity:
self.owner.population[key][loser_id] = self.owner.population[key][winner_id]