"""
Generic Storage Protocol
========================
Implement a generic protocol to allow Orion to communicate using different storage backends.
The storage protocol is a generic way of allowing Orion to interface with different storage
backends: MongoDB, track, cometML, MLflow, etc.
Examples
--------
>>> storage_factory.create('track', uri='file://orion_test.json')
>>> storage_factory.create('legacy', experiment=...)
Notes
-----
When retrieving an already initialized Storage object you should use `get_storage`.
`storage_factory.create()` should only be used for initialization purposes as `get_storage`
raises more granular error messages.
"""
import contextlib
import copy
import logging
import orion.core
from orion.core.io import resolve_config
from orion.core.utils import GenericFactory
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def get_uid(item=None, uid=None, force_uid=True):
    """Return uid either from `item` or directly uid.

    Parameters
    ----------
    item: Experiment or Trial, optional
        Object with .id attribute
    uid: str, optional
        str id representation
    force_uid: bool, optional
        If True, at least one of item or uid must be passed.

    Returns
    -------
    ``uid`` when it is given, otherwise ``item.id``. ``None`` is returned only
    when neither is given and ``force_uid`` is False.

    Raises
    ------
    MissingArguments
        if both item and uid are not set and force_uid is True
    AssertionError
        if both item and uid are provided and they do not match

    """
    if item is not None and uid is not None and item.id != uid:
        # Raised explicitly instead of using ``assert`` so that the documented
        # consistency check still runs when Python is started with ``-O``.
        raise AssertionError(
            "`item.id` (%r) does not match `uid` (%r)" % (item.id, uid)
        )

    if uid is not None:
        return uid

    if item is not None:
        return item.id

    if force_uid:
        raise MissingArguments("Either `item` or `uid` should be set")

    return None
def get_trial_uid_and_exp(trial=None, uid=None, experiment_uid=None):
    """Return trial and experiment uid either from `trial` or directly uids.

    Parameters
    ----------
    trial: Trial, optional
        Object with .id and .experiment attributes
    uid: str, optional
        str id representation of the trial
    experiment_uid: str, optional
        str id representation of the experiment

    Raises
    ------
    MissingArguments
        if both trial and experiment_uid are not set, or if both trial and uid
        are not set (the latter is raised by ``get_uid``)
    AssertionError
        if both trial and (uid or experiment_uid) are provided and they do not match

    Returns
    -------
    (trial uid, experiment uid)

    """
    if trial is None and experiment_uid is None:
        raise MissingArguments(
            "Either `trial` or (`uid` and `experiment_uid`) should be set"
        )

    if trial is not None:
        if experiment_uid is None:
            experiment_uid = trial.experiment
        elif trial.experiment != experiment_uid:
            # ``experiment_uid`` is compared against None (not truthiness) so
            # that falsy ids such as 0 are still checked, and the error is
            # raised explicitly so the check survives ``python -O``.
            raise AssertionError(
                "`trial.experiment` (%r) does not match `experiment_uid` (%r)"
                % (trial.experiment, experiment_uid)
            )

    # Also validates that `trial.id` and `uid` agree when both are given.
    trial_uid = get_uid(trial, uid)

    return trial_uid, experiment_uid
class FailedUpdate(Exception):
    """Exception raised when we are unable to update a trial's status"""
class MissingArguments(Exception):
    """Raised when a function is called without its minimal set of parameters"""
class LockAcquisitionTimeout(Exception):
    """Raised when the lock acquisition times out (no lock is granted)."""
class LockedAlgorithmState:
    """Locked state of the algorithm from the storage.

    This class helps handle setting the state of the algorithm or resetting it in case
    the execution crashes during the lock.

    Parameters
    ----------
    state: dict
        Dictionary representing the state of the algorithm.
    configuration: dict
        Configuration of the locked algorithm.
    locked: bool
        Whether the algorithm is locked or not. Default: True

    """

    def __init__(self, state, configuration, locked=True):
        # The original state is kept by reference (no copy is made), so
        # ``reset`` restores the very same object that was passed in.
        self._original_state = state
        self.configuration = configuration
        self._state = state
        self.locked = locked

    @property
    def state(self):
        """State of the algorithm"""
        return self._state

    def set_state(self, state):
        """Update the state of the algorithm that should be saved back in storage."""
        self._state = state

    def reset(self):
        """Set back algorithm state to original state found in storage."""
        self._state = self._original_state
class BaseStorageProtocol:
    """Implement a generic protocol to allow Orion to communicate using
    different storage backends.

    Every method of this base class raises ``NotImplementedError``; concrete
    storage backends are expected to override them.
    """

    def create_benchmark(self, config):
        """Insert a new benchmark inside the database"""
        raise NotImplementedError()

    def fetch_benchmark(self, query, selection=None):
        """Fetch all benchmarks that match the query"""
        raise NotImplementedError()

    def create_experiment(self, config):
        """Insert a new experiment inside the database"""
        raise NotImplementedError()

    def delete_experiment(self, experiment=None, uid=None):
        """Delete matching experiments from the database

        Parameters
        ----------
        experiment: Experiment, optional
            experiment object to delete from the database
        uid: str, optional
            id of the experiment to delete

        Returns
        -------
        Number of experiments deleted.

        Raises
        ------
        MissingArguments
            if both experiment and uid are not set
        AssertionError
            if both experiment and uid are provided and they do not match

        """
        raise NotImplementedError()

    def update_experiment(self, experiment=None, uid=None, where=None, **kwargs):
        """Update the fields of a given experiment

        Parameters
        ----------
        experiment: Experiment, optional
            experiment object to update in the database
        uid: str, optional
            id of the experiment to update
        where: Optional[dict]
            constraint experiment must respect
        **kwargs: dict
            a dictionary of fields to update

        Returns
        -------
        returns true if the underlying storage was updated

        Raises
        ------
        MissingArguments
            if both experiment and uid are not set
        AssertionError
            if both experiment and uid are provided and they do not match

        """
        raise NotImplementedError()

    def fetch_experiments(self, query, selection=None):
        """Fetch all experiments that match the query"""
        raise NotImplementedError()

    def register_trial(self, trial):
        """Create a new trial to be executed"""
        raise NotImplementedError()

    def delete_trials(self, experiment=None, uid=None, where=None):
        """Delete matching trials from the database

        Parameters
        ----------
        experiment: Experiment, optional
            experiment object whose trials should be deleted
        uid: str, optional
            id of the experiment whose trials should be deleted
        where: Optional[dict]
            constraint trials must respect

        Returns
        -------
        Number of trials deleted.

        Raises
        ------
        MissingArguments
            if both experiment and uid are not set
        AssertionError
            if both experiment and uid are provided and they do not match

        """
        raise NotImplementedError()

    def reserve_trial(self, experiment):
        """Select a pending trial and reserve it for the worker

        Returns
        -------
        Returns the reserved trial or None if no trials were found

        """
        raise NotImplementedError()

    def fetch_trials(self, experiment=None, uid=None, where=None):
        """Fetch all the trials of an experiment in the database

        Parameters
        ----------
        experiment: Experiment, optional
            experiment object whose trials should be fetched
        uid: str, optional
            id of the experiment whose trials should be fetched
        where: Optional[dict]
            constraint trials must respect

        Returns
        -------
        Returns None if the experiment is not found.

        Raises
        ------
        MissingArguments
            if both experiment and uid are not set
        AssertionError
            if both experiment and uid are provided and they do not match

        """
        raise NotImplementedError()

    def update_trials(self, experiment=None, uid=None, where=None, **kwargs):
        """Update trials of a given experiment matching a query

        Parameters
        ----------
        experiment: Experiment, optional
            experiment object whose trials should be updated
        uid: str, optional
            id of the experiment whose trials should be updated
        where: Optional[dict]
            constraint trials must respect
        **kwargs: dict
            a dictionary of fields to update

        Raises
        ------
        MissingArguments
            if both experiment and uid are not set
        AssertionError
            if both experiment and uid are provided and they do not match

        """
        raise NotImplementedError()

    def update_trial(
        self, trial=None, uid=None, experiment_uid=None, where=None, **kwargs
    ):
        """Update fields of a given trial

        Parameters
        ----------
        trial: Trial, optional
            trial object to update in the database
        uid: str, optional
            id of the trial to update in the database
        experiment_uid: str, optional
            experiment id of the trial to update in the database
        where: Optional[dict]
            constraint trials must respect. Note: useful to handle race conditions.
        **kwargs: dict
            a dictionary of fields to update

        Raises
        ------
        MissingArguments
            if both trial and uid are not set
        AssertionError
            if both trial and uid are provided and they do not match

        """
        raise NotImplementedError()

    def get_trial(self, trial=None, uid=None, experiment_uid=None):
        """Fetch a single trial

        Parameters
        ----------
        trial: Trial, optional
            trial object to retrieve from the database
        uid: str, optional
            trial id used to retrieve the trial object
        experiment_uid: str, optional
            experiment id used to retrieve the trial object

        Returns
        -------
        Returns None if the trial is not found.

        Raises
        ------
        MissingArguments
            if both trial and uid are not set
        AssertionError
            if both trial and uid are provided and they do not match

        """
        raise NotImplementedError()

    def fetch_lost_trials(self, experiment):
        """Fetch all trials that have a heartbeat older than
        some given time delta (2 minutes by default)
        """
        raise NotImplementedError()

    def retrieve_result(self, trial, *args, **kwargs):
        """Fetch the result from a given medium (file, db, socket, etc..) for a given trial and
        insert it into the trial object
        """
        raise NotImplementedError()

    def push_trial_results(self, trial):
        """Push the trial's results to the database"""
        raise NotImplementedError()

    def set_trial_status(self, trial, status, heartbeat=None, was=None):
        """Update the trial status and the heartbeat

        Parameters
        ----------
        trial: `Trial` object
            Trial object to update in the database.
        status: str
            Status to be set to the trial
        heartbeat: datetime, optional
            New heartbeat to update simultaneously with status
        was: str, optional
            The status the trial is expected to currently have in the database.
            If None, current ``trial.status`` will be used.
            This is used to ensure coherence in the database, protecting
            against race conditions for instance.

        Raises
        ------
        FailedUpdate
            The exception is raised if the status of the trial object
            does not match the status in the database

        """
        raise NotImplementedError()

    def fetch_pending_trials(self, experiment):
        """Fetch all trials that are available to be executed by a worker,
        this includes new, suspended and interrupted trials
        """
        raise NotImplementedError()

    def fetch_noncompleted_trials(self, experiment):
        """Fetch all non completed trials"""
        raise NotImplementedError()

    def fetch_trials_by_status(self, experiment, status):
        """Fetch all trials with the given status"""
        raise NotImplementedError()

    def count_completed_trials(self, experiment):
        """Count the number of completed trials"""
        raise NotImplementedError()

    def count_broken_trials(self, experiment):
        """Count the number of broken trials"""
        raise NotImplementedError()

    def update_heartbeat(self, trial):
        """Update trial's heartbeat"""
        raise NotImplementedError()

    def initialize_algorithm_lock(self, experiment_id, algorithm_config):
        """Initialize algorithm lock for given experiment

        Parameters
        ----------
        experiment_id: int or str
            ID of the experiment in storage.
        algorithm_config: dict
            Configuration of the algorithm.

        """
        raise NotImplementedError()

    def release_algorithm_lock(self, experiment=None, uid=None, new_state=None):
        """Release the algorithm lock

        Parameters
        ----------
        experiment: Experiment, optional
            experiment object whose algorithm lock should be released
        uid: str, optional
            id of the experiment whose algorithm lock should be released
        new_state: dict, optional
            The new state of the algorithm that should be saved in the lock object.
            If None, the previous state is preserved in the lock object in storage.

        """
        raise NotImplementedError()

    def get_algorithm_lock_info(self, experiment=None, uid=None):
        """Load algorithm lock info

        Parameters
        ----------
        experiment: Experiment, optional
            experiment object whose algorithm lock info should be loaded
        uid: str, optional
            id of the experiment whose algorithm lock info should be loaded

        Returns
        -------
        ``orion.storage.base.LockedAlgorithmState``
            The locked state of the algorithm. Note that the lock is not acquired by the process
            calling ``get_algorithm_lock_info`` and the value of LockedAlgorithmState.locked
            may not be valid if another process is running and could acquire the lock concurrently.

        """
        raise NotImplementedError()

    def delete_algorithm_lock(self, experiment=None, uid=None):
        """Delete experiment algorithm lock from the storage

        Parameters
        ----------
        experiment: Experiment, optional
            experiment object whose algorithm lock should be deleted
        uid: str, optional
            id of the experiment whose algorithm lock should be deleted

        Returns
        -------
        Number of algorithm locks deleted. Should be 1 if successful, 0 if failed.

        Raises
        ------
        MissingArguments
            if both experiment and uid are not set
        AssertionError
            if both experiment and uid are provided and they do not match

        """
        raise NotImplementedError()

    @contextlib.contextmanager
    def acquire_algorithm_lock(self, experiment, timeout=600, retry_interval=1):
        """Acquire lock on algorithm in storage

        This method is a contextmanager and should be called using the ``with``-clause.

        Parameters
        ----------
        experiment: Experiment
            experiment object to retrieve from the storage
        timeout: int, optional
            Timeout for the acquisition of the lock. If the lock is not
            obtained before ``timeout``, then ``LockAcquisitionTimeout`` is raised.
            The timeout is only for the acquisition of the lock.
            Once the lock is obtained, it is valid until the context manager is closed.
            Default: 600.
        retry_interval: int, optional
            Sleep time between each attempts at acquiring the lock. Default: 1

        Raises
        ------
        ``orion.storage.base.LockAcquisitionTimeout``
            The lock could not be obtained in less than ``timeout`` seconds.

        """
        raise NotImplementedError()
# Shared factory for storage backends, built around ``BaseStorageProtocol``.
# ``setup_storage`` calls it as ``storage_factory.create(of_type=..., **config)``.
storage_factory = GenericFactory(BaseStorageProtocol)
def setup_storage(storage=None, debug=False):
    """Create the storage instance from a configuration.

    Parameters
    ----------
    storage: dict, optional
        Configuration for the storage backend. If not defined, global configuration
        is used. The given dictionary is deep-copied and never modified.
    debug: bool, optional
        If using in debug mode, the storage config is overridden with legacy:EphemeralDB.
        Defaults to False.

    Returns
    -------
    The object returned by ``storage_factory.create`` for the resolved
    configuration.

    """
    if storage is None:
        storage = orion.core.config.storage.to_dict()

    # Work on a copy so the caller's configuration is left untouched.
    storage = copy.deepcopy(storage)

    if storage.get("type") == "legacy" and "database" not in storage:
        # Legacy storage without a database: use the globally configured one.
        storage["database"] = orion.core.config.storage.database.to_dict()
    elif storage.get("type") is None and "database" in storage:
        # A bare database configuration implies the legacy storage type.
        storage["type"] = "legacy"

    # If using same storage type
    if storage["type"] == orion.core.config.storage.type:
        storage = resolve_config.merge_configs(
            orion.core.config.storage.to_dict(), storage
        )

    if debug:
        # Debug mode discards everything resolved above.
        storage = {"type": "legacy", "database": {"type": "EphemeralDB"}}

    storage_type = storage.pop("type")

    log.debug("Creating %s storage client with args: %s", storage_type, storage)
    try:
        return storage_factory.create(of_type=storage_type, **storage)
    except ValueError:
        if storage_factory.create().__class__.__name__.lower() != storage_type.lower():
            raise

    # NOTE(review): reached only when the ValueError above is swallowed; the
    # function then yields None instead of a storage object. This looks like
    # leftover singleton handling -- confirm it is still intended.
    return None
# pylint: disable=too-few-public-methods
class ReadOnlyStorageProtocol:
    """Read-only interface from a storage protocol.

    Only the attributes listed in ``valid_attributes`` are forwarded to the
    wrapped storage; any other attribute access raises ``AttributeError``.

    .. seealso::

        :py:class:`BaseStorageProtocol`
    """

    __slots__ = ("_storage",)

    # Names of the wrapped storage's attributes that may be accessed.
    valid_attributes = {
        "get_trial",
        "fetch_trials",
        "fetch_experiments",
        "count_broken_trials",
        "count_completed_trials",
        "fetch_noncompleted_trials",
        "fetch_pending_trials",
        "fetch_lost_trials",
        "fetch_trials_by_status",
    }

    def __init__(self, protocol):
        """Init method, see attributes of :class:`BaseStorageProtocol`."""
        self._storage = protocol

    def __getattr__(self, attr):
        """Get attribute only if valid"""
        # ``__getattr__`` is only invoked when normal lookup fails, so
        # ``_storage`` itself (a slot) never goes through this check.
        if attr not in self.valid_attributes:
            raise AttributeError(
                "Cannot access attribute %s on ReadOnlyStorageProtocol." % attr
            )

        return getattr(self._storage, attr)