# -*- coding: utf-8 -*-
:mod:`orion.core.worker.consumer` -- Evaluate objective on a set of parameters

.. module:: consumer
   :platform: Unix
   :synopsis: Call user's script as a black box process to evaluate a trial.

import logging
import os
import signal
import subprocess
import tempfile

import orion.core
from import OrionCmdlineParser
from orion.core.utils.working_dir import WorkingDir
from orion.core.worker.trial_pacemaker import TrialPacemaker

log = logging.getLogger(__name__)

# pylint: disable = unused-argument
def _handler(signum, frame):
    log.error('Oríon has been interrupted.')
    raise KeyboardInterrupt

[docs]class ExecutionError(Exception): """Error raised when Orion is unable to execute the user's script without errors.""" pass
[docs]class Consumer(object): """Consume a trial by using it to initialize a black-box box to evaluate it. It uses an `Experiment` object to push an evaluated trial, if results are delivered to the worker process successfully. It forks another process which executes user's script with the suggested options. It expects results to be written in a **JSON** file, whose path has been defined in a special orion environmental variable which is set into the child process' environment. """ def __init__(self, experiment): """Initialize a consumer. :param experiment: Manager of this experiment, provides convenient interface for interacting with the database. """ log.debug("Creating Consumer object.") self.experiment = experiment = if is None: raise RuntimeError("Experiment object provided to Consumer has not yet completed" " initialization.") # Fetch space builder self.template_builder = OrionCmdlineParser(orion.core.config.user_script_config) self.template_builder.set_state_dict(experiment.metadata['parser']) # Get path to user's script and infer trial configuration directory if experiment.working_dir: self.working_dir = os.path.abspath(experiment.working_dir) else: self.working_dir = os.path.join(tempfile.gettempdir(), 'orion') self.script_path = experiment.metadata['user_script'] self.pacemaker = None
[docs] def consume(self, trial): """Execute user's script as a block box using the options contained within `trial`. :type trial: `orion.core.worker.trial.Trial` """ log.debug("### Create new directory at '%s':", self.working_dir) temp_dir = self.experiment.working_dir is None prefix = + "_" suffix = try: with WorkingDir(self.working_dir, temp_dir, prefix=prefix, suffix=suffix) as workdirname: log.debug("## New consumer context: %s", workdirname) trial.working_dir = workdirname results_file = self._consume(trial, workdirname) log.debug("## Parse results from file and fill corresponding Trial object.") self.experiment.update_completed_trial(trial, results_file) except KeyboardInterrupt: log.debug("### Save %s as interrupted.", trial) self.experiment.set_trial_status(trial, status='interrupted') raise except ExecutionError: log.debug("### Save %s as broken.", trial) self.experiment.set_trial_status(trial, status='broken')
[docs] def get_execution_environment(self, trial, results_file='results.log'): """Set a few environment variables to allow users and underlying processes to know if they are running under orion. Parameters ---------- results_file: str file used to store results, this is only used by the legacy protocol trial: Trial reference to the trial object that is going to be run Notes ----- This function defines the environment variables described below .. envvar:: ORION_EXPERIMENT_ID Current experiment that is being ran. .. envvar:: ORION_EXPERIMENT_NAME Name of the experiment the worker is currently working on. .. envvar:: ORION_EXPERIMENT_VERSION Version of the experiment the worker is currently working on. .. envvar:: ORION_TRIAL_ID Current trial id that is currently being executed in this process. .. envvar:: ORION_WORKING_DIRECTORY Trial's current working directory. .. envvar:: ORION_RESULTS_PATH Trial's results file that is read by the legacy protocol to get the results of the trial after a successful run. """ env = dict(os.environ) env['ORION_EXPERIMENT_ID'] = str( env['ORION_EXPERIMENT_NAME'] = str( env['ORION_EXPERIMENT_VERSION'] = str(self.experiment.version) env['ORION_TRIAL_ID'] = str( env['ORION_WORKING_DIR'] = str(trial.working_dir) env['ORION_RESULTS_PATH'] = str(results_file) return env
def _consume(self, trial, workdirname): config_file = tempfile.NamedTemporaryFile(mode='w', prefix='trial_', suffix='.conf', dir=workdirname, delete=False) config_file.close() log.debug("## New temp config file: %s", results_file = tempfile.NamedTemporaryFile(mode='w', prefix='results_', suffix='.log', dir=workdirname, delete=False) results_file.close() log.debug("## New temp results file: %s", log.debug("## Building command line argument and configuration for trial.") env = self.get_execution_environment(trial, cmd_args = self.template_builder.format(, trial, self.experiment) log.debug("## Launch user's script as a subprocess and wait for finish.") self.pacemaker = TrialPacemaker(trial) self.pacemaker.start() try: self.execute_process(cmd_args, env) finally: # merciless self.pacemaker.stop() return results_file
[docs] def execute_process(self, cmd_args, environ): """Facilitate launching a black-box trial.""" command = [self.script_path] + cmd_args signal.signal(signal.SIGTERM, _handler) process = subprocess.Popen(command, env=environ) return_code = process.wait() if return_code != 0: raise ExecutionError("Something went wrong. Check logs. Process " "returned with code {} !".format(return_code))