# Source code for orion.core.worker.trial_pacemaker

# -*- coding: utf-8 -*-
"""
Trial Pacemaker

Monitor trials and update their heartbeat.
"""

import threading

from orion.storage.base import get_storage

# Trial statuses for which ``TrialPacemaker._monitor_trial`` stops the
# monitoring loop instead of refreshing the heartbeat.
STOPPED_STATUS = {"completed", "interrupted", "suspended"}

class TrialPacemaker(threading.Thread):
    """Monitor a given trial inside a thread, updating its heartbeat
    at a given interval of time.

    Parameters
    ----------
    trial:
        The trial to monitor. It is handed as-is to the storage backend
        each time the trial is refreshed.
    wait_time: int or float, optional
        Number of seconds to wait between two monitoring passes (used as
        the timeout of ``threading.Event.wait``). Defaults to 60.

    """

    def __init__(self, trial, wait_time=60):
        threading.Thread.__init__(self)
        # Event used both as the stop flag and as an interruptible sleep.
        self.stopped = threading.Event()
        self.trial = trial
        self.wait_time = wait_time
        self.storage = get_storage()

    def stop(self):
        """Stop monitoring.

        Sets the stop flag and blocks until the monitoring thread exits.
        """
        self.stopped.set()
        self.join()

    def run(self):
        """Run the trial monitoring every given interval."""
        # ``wait`` returns False on timeout (keep monitoring) and True as
        # soon as the stop flag is set, which ends the loop without
        # waiting for the full interval.
        while not self.stopped.wait(self.wait_time):
            self._monitor_trial()

    def _monitor_trial(self):
        """Refresh the trial and update its heartbeat, or stop monitoring.

        Monitoring stops when the refreshed trial has a status listed in
        ``STOPPED_STATUS`` or when the heartbeat update reports failure.
        """
        # NOTE(review): the storage calls below were reconstructed from a
        # garbled source; confirm ``get_trial`` / ``update_heartbeat``
        # against the storage backend API.
        trial = self.storage.get_trial(self.trial)

        if trial.status in STOPPED_STATUS:
            self.stopped.set()
        elif not self.storage.update_heartbeat(trial):
            self.stopped.set()