Source code for orion.client.experiment

# -*- coding: utf-8 -*-
"""
Experiment wrapper client
=========================

Wraps the core Experiment object to provide further functionalities for the user
"""
import atexit
import functools
import inspect
import logging
import sys

from numpy import inf as infinity

import orion.core.utils.format_trials as format_trials
import orion.core.worker
from orion.core.io.database import DuplicateKeyError
from orion.core.utils.exceptions import (
    BrokenExperiment,
    SampleTimeout,
    UnsupportedOperation,
    WaitingForTrials,
)
from orion.core.utils.flatten import flatten, unflatten
from orion.core.worker.trial import Trial, TrialCM
from orion.core.worker.trial_pacemaker import TrialPacemaker
from orion.plotting.base import PlotAccessor
from orion.storage.base import FailedUpdate

log = logging.getLogger(__name__)


def set_broken_trials(client):
    """Release all trials still reserved by ``client`` when the process exits.

    Meant to be registered with ``atexit``. Every trial that still has a
    pacemaker in ``client._pacemakers`` is released with status ``interrupted``
    if the exception currently being handled is a ``KeyboardInterrupt``, and
    with status ``broken`` otherwise.

    Parameters
    ----------
    client: ExperimentClient
        Client whose remaining reservations must be released. Only
        ``_pacemakers``, ``get_trial`` and ``release`` are used.

    """
    if sys.exc_info()[0] is KeyboardInterrupt:
        status = "interrupted"
    else:
        status = "broken"

    # Iterate over a snapshot of the ids: releasing a trial removes its entry
    # from `_pacemakers`, which would otherwise invalidate the iteration.
    for trial_id in list(client._pacemakers):  # pylint: disable=protected-access
        trial = client.get_trial(uid=trial_id)
        if trial is None:
            # Lazy %-style arguments so the id actually ends up in the message
            # (the previous `{}` placeholder was never filled in).
            log.warning(
                "Trial %s was not found in storage, could not set status to `%s`.",
                trial_id,
                status,
            )
            continue
        client.release(trial, status=status)
# pylint: disable=too-many-public-methods
class ExperimentClient:
    """ExperimentClient providing all functionalities for the python API

    Note that the ExperimentClient is not meant to be instantiated by the user.
    Look at `orion.client.create_experiment` to build an ExperimentClient.

    Parameters
    ----------
    experiment: `orion.core.worker.experiment.Experiment`
        Experiment object serving for interaction with storage
    producer: `orion.core.worker.producer.Producer`
        Producer object used to produce new trials.
    heartbeat: optional
        Heartbeat value passed to storage when a trial is reserved.
        Defaults to ``orion.core.config.worker.heartbeat`` when None.

    """

    def __init__(self, experiment, producer, heartbeat=None):
        self._experiment = experiment
        self._producer = producer
        # Maps trial id -> running TrialPacemaker for every trial reserved by
        # this client.
        self._pacemakers = {}
        # Keep the partial on the instance so that `close()` can unregister
        # the exact same callable from atexit.
        self.set_broken_trials = functools.partial(set_broken_trials, client=self)
        if heartbeat is None:
            heartbeat = orion.core.config.worker.heartbeat
        self.heartbeat = heartbeat
        self.plot = PlotAccessor(self)
        atexit.register(self.set_broken_trials)

    ###
    # Attributes
    ###

    @property
    def name(self):
        """Return the name of the experiment in the database."""
        return self._experiment.name

    # pylint: disable=invalid-name
    @property
    def id(self):
        """Return the id of the experiment in the database."""
        return self._experiment.id

    @property
    def version(self):
        """Version of the experiment."""
        return self._experiment.version

    @property
    def max_trials(self):
        """Max-trials to execute before stopping the experiment."""
        return self._experiment.max_trials

    @property
    def max_broken(self):
        """Minimum number of broken trials before the experiment is considered broken."""
        return self._experiment.max_broken

    @property
    def metadata(self):
        """Metadata of the experiment."""
        return self._experiment.metadata

    @property
    def space(self):
        """Return problem's parameter `orion.algo.space.Space`."""
        return self._experiment.space

    @property
    def algorithms(self):
        """Algorithms of the experiment."""
        return self._experiment.algorithms

    @property
    def refers(self):
        """References to the experiment version control"""
        return self._experiment.refers

    @property
    def is_done(self):
        """Return True, if this experiment is considered to be finished.

        1. Count how many trials have been completed and compare with `max_trials`.
        2. Ask `algorithms` if they consider there is a chance for further improvement.

        """
        return self._experiment.is_done

    @property
    def is_broken(self):
        """Return True, if this experiment is considered to be broken.

        Count how many trials are broken and return True if that number has reached
        as given threshold.

        """
        return self._experiment.is_broken

    @property
    def configuration(self):
        """Return a copy of an `Experiment` configuration as a dictionary."""
        return self._experiment.configuration

    @property
    def stats(self):
        """Calculate a stats dictionary for this particular experiment.

        Returns
        -------
        stats : dict

        Stats
        -----
        trials_completed : int
            Number of completed trials
        best_trials_id : int
            Unique identifier of the :class:`orion.core.worker.trial.Trial` object in the database
            which achieved the best known objective result.
        best_evaluation : float
            Evaluation score of the best trial
        start_time : `datetime.datetime`
            When Experiment was first dispatched and started running.
        finish_time : `datetime.datetime`
            When Experiment reached terminating condition and stopped running.
        duration : `datetime.timedelta`
            Elapsed time.

        """
        return self._experiment.stats

    @property
    def node(self):
        """Node of the experiment in the version control tree."""
        return self._experiment.node

    @property
    def working_dir(self):
        """Working directory of the experiment."""
        return self._experiment.working_dir

    @property
    def producer(self):
        """Return the producer configuration of the experiment."""
        return self._experiment.producer

    @property
    def mode(self):
        """Return the access right of the experiment

        {'r': read, 'w': read/write, 'x': read/write/execute}
        """
        return self._experiment.mode

    ###
    # Rights
    ###

    def _check_if_writable(self):
        """Raise UnsupportedOperation if the experiment is in read-only ('r') mode."""
        if self.mode == "r":
            # Name of the public method that called this check, for the message.
            calling_function = inspect.stack()[1].function
            raise UnsupportedOperation(
                f"ExperimentClient must have write rights to execute `{calling_function}()`"
            )

    def _check_if_executable(self):
        """Raise UnsupportedOperation unless the experiment is in executable ('x') mode."""
        if self.mode != "x":
            # Name of the public method that called this check, for the message.
            calling_function = inspect.stack()[1].function
            raise UnsupportedOperation(
                f"ExperimentClient must have execution rights to execute `{calling_function}()`"
            )

    ###
    # Queries
    ###

    def to_pandas(self, with_evc_tree=False):
        """Builds a dataframe with the trials of the experiment

        Parameters
        ----------
        with_evc_tree: bool, optional
            Fetch all trials from the EVC tree.
            Default: False

        """
        return self._experiment.to_pandas(with_evc_tree=with_evc_tree)

    def fetch_trials(self, with_evc_tree=False):
        """Fetch all trials of the experiment

        Parameters
        ----------
        with_evc_tree: bool, optional
            Fetch all trials from the EVC tree.
            Default: False

        """
        return self._experiment.fetch_trials(with_evc_tree=with_evc_tree)

    def get_trial(self, trial=None, uid=None):
        """Fetch a single trial

        Parameters
        ----------
        trial: Trial, optional
            trial object to retrieve from the database

        uid: str, optional
            trial id used to retrieve the trial object

        Returns
        -------
        return none if the trial is not found,

        Raises
        ------
        UndefinedCall
            if both trial and uid are not set

        AssertionError
            if both trial and uid are provided and they do not match

        """
        return self._experiment.get_trial(trial, uid)

    def fetch_trials_by_status(self, status, with_evc_tree=False):
        """Fetch all trials with the given status

        Trials are sorted based on ``Trial.submit_time``

        :return: list of :class:`orion.core.worker.trial.Trial` objects
        """
        return self._experiment.fetch_trials_by_status(
            status, with_evc_tree=with_evc_tree
        )

    def fetch_noncompleted_trials(self, with_evc_tree=False):
        """Fetch non-completed trials of this `Experiment` instance.

        Trials are sorted based on ``Trial.submit_time``

        .. note::

            It will return all non-completed trials, including new, reserved, suspended,
            interrupted and broken ones.

        :return: list of non-completed :class:`orion.core.worker.trial.Trial` objects
        """
        return self._experiment.fetch_noncompleted_trials(with_evc_tree=with_evc_tree)

    ###
    # Actions
    ###

    # pylint: disable=unused-argument
    def insert(self, params, results=None, reserve=False):
        """Insert a new trial.

        Experiment must be in writable ('w') or executable ('x') mode.

        Parameters
        ----------
        params: dict
            Parameters of the new trial to add to the database. These parameters
            must comply with the space definition otherwise a ValueError will be raised.
        results: list, optional
            Results to be set for the new trial. Results must have the format
            {name: <str>: type: <'objective', 'constraint' or 'gradient'>, value=<float>} otherwise
            a ValueError will be raised.
            Note that passing results will mark the trial as completed and therefore cannot be
            reserved. The returned trial will have status 'completed'.
            If the results are invalid, the trial will still be inserted but reservation will be
            released.
        reserve: bool, optional
            If reserve=True, the inserted trial will be reserved. `reserve` cannot be True if
            `results` are given.
            Defaults to False.

        Returns
        -------
        `orion.core.worker.trial.Trial`
            The trial inserted in storage. If `reserve=True` and no results are given, the returned
            trial will be in a `reserved` status.

        Raises
        ------
        `ValueError`
            - If results are given and reserve=True
            - If params have invalid format
            - If results have invalid format
        `orion.core.io.database.DuplicateKeyError`
            - If a trial with identical params already exist for the current experiment.
        `orion.core.utils.exceptions.UnsupportedOperation`
            If the experiment was not loaded in writable mode.

        """
        self._check_if_writable()

        if results and reserve:
            raise ValueError(
                "Cannot observe a trial and reserve it. A trial with results has status "
                "`completed` and cannot be reserved."
            )

        trial = format_trials.dict_to_trial(params, self.space)
        try:
            # The trial is always registered as reserved first; the reservation
            # is dropped further down when it is not wanted.
            self._experiment.register_trial(trial, status="reserved")
            self._maintain_reservation(trial)
        except DuplicateKeyError as e:
            message = (
                f"A trial with params {params} already exist for experiment "
                f"{self.name}-v{self.version}"
            )
            raise DuplicateKeyError(message) from e

        if results:
            try:
                self.observe(trial, results)
            except ValueError:
                # `observe` may already have dropped the pacemaker in its own
                # cleanup; do not let a "no pacemakers" RuntimeError replace
                # the ValueError being propagated.
                self._release_reservation(trial, raise_if_unreserved=False)
                raise

            return trial

        if not reserve:
            self.release(trial)

        return trial

    def reserve(self, trial):
        """Reserve a trial.

        Experiment must be in executable ('x') mode.

        Set a trial status to reserve to ensure that concurrent process cannot work on it.
        Trials can only be reserved with status 'new', 'interrupted' or 'suspended'.

        Parameters
        ----------
        trial: `orion.core.worker.trial.Trial`
            Trial to reserve.

        Raises
        ------
        `RuntimeError`
            If trial is reserved by another process
        `ValueError`
            If the trial does not exist in storage.
        `orion.core.utils.exceptions.UnsupportedOperation`
            If the experiment was not loaded in executable mode.

        Notes
        -----
        When reserved, a
        :class:`TrialPacemaker <orion.core.worker.trial_pacemaker.TrialPacemaker>`
        is started to update an heartbeat in storage. The frequency of the heartbeat is configurable
        at creation of experiment or with ``orion.core.config.worker.heartbeat``.
        If the process terminates unexpectedly, the heartbeat will cease and remote processes
        may reset the status of the trial to 'interrupted' when the heartbeat has not been updated
        since twice the value of ``heartbeat``.

        """
        self._check_if_executable()

        if trial.status == "reserved":
            if trial.id in self._pacemakers:
                # Reserved by this very client: nothing to do.
                log.warning("Trial %s is already reserved.", trial.id)
                return
            raise RuntimeError(
                f"Trial {trial.id} is already reserved by another process."
            )

        try:
            self._experiment.set_trial_status(
                trial, "reserved", heartbeat=self.heartbeat
            )
        except FailedUpdate as e:
            if self.get_trial(trial) is None:
                raise ValueError(
                    f"Trial {trial.id} does not exist in database."
                ) from e

            raise RuntimeError(f"Could not reserve trial {trial.id}.") from e

        self._maintain_reservation(trial)

    def release(self, trial, status="interrupted"):
        """Release a trial.

        Release the reservation and stop the heartbeat.

        Experiment must be in writable ('w') or executable ('x') mode.

        Parameters
        ----------
        trial: `orion.core.worker.trial.Trial`
            Trial to reserve.
        status: str, optional
            Set the trial to given status while releasing the reservation.
            Defaults to 'interrupted'.

        Raises
        ------
        `RuntimeError`
            If reservation of the trial has been lost prior to releasing it.
        `ValueError`
            If the trial does not exist in storage.
        `orion.core.utils.exceptions.UnsupportedOperation`
            If the experiment was not loaded in writable mode.

        """
        self._check_if_writable()

        # Read the local status before the update attempt; it is used below to
        # tell "already released locally" apart from "reservation lost".
        current_status = trial.status
        raise_if_unreserved = True
        try:
            self._experiment.set_trial_status(trial, status, was="reserved")
        except FailedUpdate as e:
            if self.get_trial(trial) is None:
                # Same guard as in `observe`: a missing pacemaker must not
                # replace the ValueError raised here.
                raise_if_unreserved = False
                raise ValueError(
                    f"Trial {trial.id} does not exist in database."
                ) from e
            if current_status != "reserved":
                raise_if_unreserved = False
                raise RuntimeError(
                    f"Trial {trial.id} was already released locally."
                ) from e

            raise RuntimeError(
                f"Reservation for trial {trial.id} has been lost before release."
            ) from e
        finally:
            # Always stop the local heartbeat, whatever happened in storage.
            self._release_reservation(trial, raise_if_unreserved=raise_if_unreserved)

    def suggest(self):
        """Suggest a trial to execute.

        Experiment must be in executable ('x') mode.

        If any trial is available (new or interrupted), it selects one and reserves it.
        Otherwise, the algorithm is used to generate a new trial that is registered in storage and
        reserved.

        Returns
        -------
        `orion.core.worker.trial.Trial` or None
            Reserved trial for execution. Will return None if experiment is done
            or if no trial could be reserved.

        Raises
        ------
        :class:`orion.core.utils.exceptions.WaitingForTrials`
            if the experiment is not completed and algorithm needs to wait for some
            trials to complete before it can suggest new trials.

        :class:`orion.core.utils.exceptions.BrokenExperiment`
            if too many trials failed to run and the experiment cannot continue.
            This is determined by ``max_broken`` in the configuration of the experiment.

        :class:`orion.core.utils.exceptions.SampleTimeout`
            if the algorithm of the experiment could not sample new unique points.

        :class:`orion.core.utils.exceptions.UnsupportedOperation`
            If the experiment was not loaded in executable mode.

        """
        self._check_if_executable()

        if self.is_broken:
            raise BrokenExperiment("Trials failed too many times")

        if self.is_done:
            return None

        try:
            trial = orion.core.worker.reserve_trial(self._experiment, self._producer)
        except (WaitingForTrials, SampleTimeout) as e:
            # The experiment may have become broken while waiting; report that
            # instead, since it is the more useful explanation.
            if self.is_broken:
                raise BrokenExperiment("Trials failed too many times") from e
            raise

        if trial is None:
            return None

        self._maintain_reservation(trial)
        return TrialCM(self, trial)

    def observe(self, trial, results):
        """Observe trial results

        Experiment must be in executable ('x') mode.

        Parameters
        ----------
        trial: `orion.core.worker.trial.Trial`
            Reserved trial to observe.
        results: list
            Results to be set for the new trial. Results must have the format
            {name: <str>: type: <'objective', 'constraint' or 'gradient'>, value=<float>} otherwise
            a ValueError will be raised. If the results are invalid, the trial will not be released.

        Raises
        ------
        `ValueError`
            - If results have invalid format
            - If the trial does not exist in storage.
        `RuntimeError`
            If reservation of the trial has been lost prior to releasing it.
        `orion.core.utils.exceptions.UnsupportedOperation`
            If the experiment was not loaded in executable mode.

        """
        self._check_if_executable()

        # Built outside of the try block on purpose: invalid results raise
        # before any cleanup runs, so the reservation is kept.
        trial.results += [Trial.Result(**result) for result in results]
        raise_if_unreserved = True
        try:
            self._experiment.update_completed_trial(trial)
        except FailedUpdate as e:
            if self.get_trial(trial) is None:
                # A missing pacemaker must not replace the ValueError below.
                raise_if_unreserved = False
                raise ValueError(
                    f"Trial {trial.id} does not exist in database."
                ) from e

            raise RuntimeError(
                f"Reservation for trial {trial.id} has been lost."
            ) from e
        finally:
            self._release_reservation(trial, raise_if_unreserved=raise_if_unreserved)

    def workon(self, fct, max_trials=infinity, **kwargs):
        """Optimize a given function

        Experiment must be in executable ('x') mode.

        Parameters
        ----------
        fct: callable
            Function to optimize. Must take arguments provided by trial.params. Additional constant
            parameter can be passed as ``**kwargs`` to `workon`. Its return value is passed as
            ``results`` to `observe` and must follow the format documented there.
        max_trials: int, optional
            Maximum number of trials to execute within `workon`. If the experiment or algorithm
            reach status is_done before, the execution of `workon` terminates.
        **kwargs
            Constant argument to pass to `fct` in addition to trial.params. If values in kwargs are
            present in trial.params, the latter takes precedence.

        Returns
        -------
        int
            Number of trials executed and observed during this call.

        Raises
        ------
        `ValueError`
             If results returned by `fct` have invalid format
        `orion.core.utils.exceptions.UnsupportedOperation`
            If the experiment was not loaded in executable mode.

        """
        self._check_if_executable()

        trials = 0
        constants = flatten(kwargs)
        while not self.is_done and trials < max_trials:
            trial = self.suggest()
            if trial is None:
                log.warning("Algorithm could not sample new points")
                return trials
            # Start from the constants on every iteration so that parameters
            # of a previous trial can never leak into the current call.
            call_kwargs = dict(constants)
            call_kwargs.update(flatten(trial.params))
            results = fct(**unflatten(call_kwargs))
            self.observe(trial, results=results)
            trials += 1

        return trials

    def close(self):
        """Verify that no reserved trials are remaining and unregister atexit().

        Experiment must be in executable ('x') mode.

        Raises
        ------
        `RuntimeError`
            If some trials are still reserved by this client.
        `orion.core.utils.exceptions.UnsupportedOperation`
            If the experiment was not loaded in executable mode.

        """
        self._check_if_executable()

        if self._pacemakers:
            raise RuntimeError(
                "There is still reserved trials: {}\nRelease all trials before "
                "closing the client, using "
                "client.release(trial).".format(self._pacemakers.keys())
            )

        atexit.unregister(self.set_broken_trials)

    ###
    # Private
    ###

    def __repr__(self):
        """Represent the object as a string."""
        return "Experiment(name=%s, version=%s)" % (self.name, self.version)

    def _verify_reservation(self, trial):
        """Check that ``trial`` is still reserved by this client.

        Raises
        ------
        `RuntimeError`
            If the trial has no pacemaker in this client, or if its status in
            storage is no longer 'reserved'.
        `ValueError`
            If the trial does not exist in storage.

        In the last two cases the local pacemaker is stopped before raising.
        """
        if trial.id not in self._pacemakers:
            raise RuntimeError(
                f"Trial {trial.id} had no pacemakers. Was it reserved properly?"
            )

        stored_trial = self.get_trial(trial)
        if stored_trial is None:
            # `get_trial` returns None for unknown trials; report it the same
            # way the public methods do rather than failing on `.status`.
            self._release_reservation(trial)
            raise ValueError(f"Trial {trial.id} does not exist in database.")

        if stored_trial.status != "reserved":
            self._release_reservation(trial)
            raise RuntimeError(f"Reservation for trial {trial.id} has been lost.")

    def _maintain_reservation(self, trial):
        """Start a pacemaker for ``trial`` and remember it under the trial id."""
        self._pacemakers[trial.id] = TrialPacemaker(trial)
        self._pacemakers[trial.id].start()

    def _release_reservation(self, trial, raise_if_unreserved=True):
        """Stop and forget the pacemaker of ``trial``.

        When the trial has no pacemaker, raise RuntimeError if
        ``raise_if_unreserved`` is True, otherwise do nothing.
        """
        if trial.id not in self._pacemakers:
            if raise_if_unreserved:
                raise RuntimeError(
                    f"Trial {trial.id} had no pacemakers. Was it reserved properly?"
                )
            return

        self._pacemakers.pop(trial.id).stop()