Source code for orion.core.worker.producer

"""
Produce and register samples to try
===================================

Suggest new parameter sets which optimize the objective.

"""
from __future__ import annotations

import logging
import typing

from orion.core.io.database import DuplicateKeyError
from orion.core.worker.experiment import AlgoT
from orion.core.worker.warm_start.warm_starteable import is_warmstarteable

if typing.TYPE_CHECKING:
    from orion.core.worker.experiment import Experiment


log = logging.getLogger(__name__)


[docs]class Producer: """Produce suggested sets of problem's parameter space to try out. It uses an `Experiment`s `BaseAlgorithm` object to observe trial results and suggest new trials of the parameter `Space` to be evaluated. The producer is the bridge between the storage and the algorithm. """ def __init__(self, experiment: Experiment[AlgoT]): """Initialize a producer. :param experiment: Manager of this experiment, provides convenient interface for interacting with the database. """ log.debug("Creating Producer object.") self.experiment = experiment # Indicates whether the algo has been warm-started with the knowledge base. self.warm_started = False
[docs] def observe(self, trial): """Observe a trial to update algorithm's status""" # algorithm = self.experiment.algorithms # if True: with self.experiment.acquire_algorithm_lock() as algorithm: algorithm.observe([trial])
[docs] def produce(self, pool_size, timeout=60, retry_interval=1): """Create and register new trials.""" log.debug("### Algorithm attempts suggesting %s new trials.", pool_size) n_suggested_trials = 0 with self.experiment.acquire_algorithm_lock( timeout=timeout, retry_interval=retry_interval ) as algorithm: if ( self.experiment.knowledge_base and not self.warm_started and is_warmstarteable(algorithm) ): # todo: Not currently passing a limit on the max_trials to fetch from KB. similar_trials = self.experiment.knowledge_base.get_related_trials( self.experiment.configuration ) log.debug( "### Warm Starting with up to %s experiments and a total of %s trials.", len(similar_trials), sum(len(trials) for _, trials in similar_trials), ) algorithm.warm_start(similar_trials) self.warm_started = True new_trials = algorithm.suggest(pool_size) if not new_trials and not algorithm.is_done: log.info( "Algo does not have more trials to sample." "Waiting for current trials to finish" ) for new_trial in new_trials: try: self.experiment.register_trial(new_trial) n_suggested_trials += 1 except DuplicateKeyError: log.debug("Algo suggested duplicate trial %s", new_trial) return n_suggested_trials