Source code for orion.algo.parallel_strategy

# -*- coding: utf-8 -*-
"""
Parallel Strategies
===================

Register objectives for incomplete trials.

Parallel strategy objects can be created using `strategy_factory.create('strategy_name')`.

"""
import copy
import logging

from orion.core.utils import GenericFactory
from orion.core.worker.trial import Trial

log = logging.getLogger(__name__)


CORRUPTED_DB_WARNING = """\
Trial `%s` has an objective but status is not completed.
This is likely due to a corrupted database, possibly because of
database timeouts. Try setting manually status to `completed`.
You can find documention to do this at
https://orion.readthedocs.io/en/stable/user/storage.html#storage-backend.

If you encounter this issue often, please consider reporting it to
https://github.com/Epistimio/orion/issues."""


[docs]def get_objective(trial): """Get the value for the objective, if it exists, for this trial :return: Float or None The value of the objective, or None if it doesn't exist """ objectives = [ result.value for result in trial.results if result.type == "objective" ] if not objectives: objective = None elif len(objectives) == 1: objective = objectives[0] elif len(objectives) > 1: raise RuntimeError( "Trial {} has {} objectives".format(trial.id, len(objectives)) ) return objective
# TODO: Should add a strategy for broken trials. # TODO: has_observed from algorithms should return True for broken trials. # TODO: Default # We want stub parallel strategy for Hyperband/ASHA/TPE for broken # We want MaxParallelStrategy for TPE. # It is so algorithm dependant, it should be within the algorithms. # strategy: # broken: # StubParallelStrategy: # stub_value: 10000 # else: # MeanParallelStrategy: # default_result: 0.5
[docs]class ParallelStrategy(object): """Strategy to give intermediate results for incomplete trials""" def __init__(self, *args, **kwargs): self._trials_info = {} @property def state_dict(self): """Return a state dict that can be used to reset the state of the strategy.""" return {"_trials_info": self._trials_info} def set_state(self, state_dict): self._trials_info = state_dict["_trials_info"]
[docs] def observe(self, trials): """Observe completed trials .. seealso:: `orion.algo.base.BaseAlgorithm.observe` method Parameters ---------- trials: list of ``orion.core.worker.trial.Trial`` Trials from a `orion.algo.space.Space`. """ for trial in trials: self._trials_info[trial.id] = trial
def infer(self, trial): fake_result = self.lie(trial) if fake_result is None: return None fake_trial = copy.deepcopy(trial) fake_trial._results.append(fake_result) return fake_trial # pylint: disable=no-self-use
[docs] def lie(self, trial): """Construct a fake result for an incomplete trial Parameters ---------- trial: `orion.core.worker.trial.Trial` A trial object which is not supposed to be completed. Returns ------- ``orion.core.worker.trial.Trial.Result`` The fake objective result corresponding to the trial given. Notes ----- If the trial has an objective even if not completed, a warning is printed to user with a pointer to documentation to resolve the database corruption. The result returned is the corresponding objective instead of the lie. """ objective = get_objective(trial) if objective: log.warning(CORRUPTED_DB_WARNING, trial.id) return Trial.Result(name="lie", type="objective", value=objective) return None
@property def configuration(self): """Provide the configuration of the strategy as a dictionary.""" return {"of_type": self.__class__.__name__.lower()}
[docs]class NoParallelStrategy(ParallelStrategy): """No parallel strategy"""
[docs] def lie(self, trial): """See ParallelStrategy.lie""" result = super(NoParallelStrategy, self).lie(trial) if result: return result return None
[docs]class StatusBasedParallelStrategy(ParallelStrategy): """Different parallel strategies for different trial status Parameters ---------- strategy_configs: dict Dictionary of strategy configurations. Each key should be a valid trial status. default_strategy: dict or None, optional Default strategy for trial status that are not defined by ``strategy_configs``. Default is NoParallelStrategy(), which always returns None. """ def __init__(self, strategy_configs=None, default_strategy=None): super(StatusBasedParallelStrategy, self).__init__() if strategy_configs is None: strategy_configs = {"broken": {"of_type": "maxparallelstrategy"}} self.strategies = dict() for status, strategy_config in strategy_configs.items(): self.strategies[status] = strategy_factory.create(**strategy_config) if default_strategy is None: default_strategy = {"of_type": "noparallelstrategy"} self.default_strategy = strategy_factory.create(**default_strategy) @property def configuration(self): configuration = super(StatusBasedParallelStrategy, self).configuration configuration["strategy_configs"] = { status: strategy.configuration for status, strategy in self.strategies.items() } configuration["default_strategy"] = self.default_strategy.configuration return configuration @property def state_dict(self): state_dict = super(StatusBasedParallelStrategy, self).state_dict state_dict["strategies"] = { status: strategy.state_dict for status, strategy in self.strategies.items() } state_dict["default_strategy"] = self.default_strategy.state_dict return state_dict def set_state(self, state_dict): super(StatusBasedParallelStrategy, self).set_state(state_dict) for status in self.strategies.keys(): self.strategies[status].set_state(state_dict["strategies"][status]) self.default_strategy.set_state(state_dict["default_strategy"]) def get_strategy(self, trial): strategy = self.strategies.get(trial.status) if strategy is None: return self.default_strategy return strategy
[docs] def observe(self, trials): for trial in trials: for strategy in self.strategies.values(): strategy.observe([trial]) self.default_strategy.observe([trial])
[docs] def lie(self, trial): # print( # trial.status, self.get_strategy(trial), self.get_strategy(trial).max_result # ) return self.get_strategy(trial).lie(trial)
[docs]class MaxParallelStrategy(ParallelStrategy): """Parallel strategy that uses the max of completed objectives""" def __init__(self, default_result=float("inf")): """Initialize the maximum result used to lie""" super(MaxParallelStrategy, self).__init__() self.default_result = default_result @property def configuration(self): """Provide the configuration of the strategy as a dictionary.""" configuration = super(MaxParallelStrategy, self).configuration configuration["default_result"] = self.default_result return configuration @property def max_result(self): objectives = [ trial.objective.value for trial in self._trials_info.values() if trial.status == "completed" ] if not objectives: return self.default_result return max(objectives)
[docs] def lie(self, trial): """See ParallelStrategy.lie""" result = super(MaxParallelStrategy, self).lie(trial) if result: return result return Trial.Result(name="lie", type="objective", value=self.max_result)
[docs]class MeanParallelStrategy(ParallelStrategy): """Parallel strategy that uses the mean of completed objectives""" def __init__(self, default_result=float("inf")): """Initialize the mean result used to lie""" super(MeanParallelStrategy, self).__init__() self.default_result = default_result @property def configuration(self): """Provide the configuration of the strategy as a dictionary.""" configuration = super(MeanParallelStrategy, self).configuration configuration["default_result"] = self.default_result return configuration @property def mean_result(self): objectives = [ trial.objective.value for trial in self._trials_info.values() if trial.status == "completed" ] if not objectives: return self.default_result return sum(objectives) / len(objectives)
[docs] def lie(self, trial): """See ParallelStrategy.lie""" result = super(MeanParallelStrategy, self).lie(trial) if result: return result return Trial.Result(name="lie", type="objective", value=self.mean_result)
[docs]class StubParallelStrategy(ParallelStrategy): """Parallel strategy that returns static objective value for incompleted trials.""" def __init__(self, stub_value=None): """Initialize the stub value""" super(StubParallelStrategy, self).__init__() self.stub_value = stub_value @property def configuration(self): """Provide the configuration of the strategy as a dictionary.""" configuration = super(StubParallelStrategy, self).configuration configuration["stub_value"] = self.stub_value return configuration
[docs] def lie(self, trial): """See ParallelStrategy.lie""" result = super(StubParallelStrategy, self).lie(trial) if result: return result return Trial.Result(name="lie", type="objective", value=self.stub_value)
strategy_factory = GenericFactory(ParallelStrategy)