"""
Trial Pacemaker
===============
Monitor trials and update their heartbeat
"""
import threading
# Trial statuses for which the pacemaker stops monitoring: once the stored
# trial reports one of these, no further heartbeat updates are attempted.
STOPPED_STATUS = {
    "completed",
    "interrupted",
    "suspended",
}
class TrialPacemaker(threading.Thread):
    """Monitor a given trial inside a thread, updating its heartbeat
    at a given interval of time.

    Parameters
    ----------
    trial:
        The trial to monitor. It is passed as-is to ``storage.get_trial``
        on every monitoring cycle.
    storage:
        Storage backend. It must provide ``get_trial(trial)``, returning an
        object with a ``status`` attribute, and ``update_heartbeat(trial)``,
        whose falsy return value stops the monitoring.
    wait_time: int or float, optional
        Delay, in seconds, between two monitoring cycles. Defaults to 60.

    """

    def __init__(self, trial, storage, wait_time=60):
        threading.Thread.__init__(self)
        # Event used both as the stop flag and as the interruptible sleep
        # between two monitoring cycles (see ``run``).
        self.stopped = threading.Event()
        self.trial = trial
        self.wait_time = wait_time
        self.storage = storage

    def stop(self):
        """Stop monitoring and wait for the thread to terminate.

        Safe to call when the thread was never started, has already
        finished, or from within the pacemaker thread itself: in those
        cases the stop flag is set but no join is attempted, since
        ``Thread.join`` would raise ``RuntimeError``.
        """
        self.stopped.set()
        if self.is_alive() and threading.current_thread() is not self:
            self.join()

    def run(self):
        """Run the trial monitoring every given interval."""
        # ``Event.wait`` returns False on timeout (keep monitoring) and True
        # as soon as the stop flag is set (leave the loop immediately).
        while not self.stopped.wait(self.wait_time):
            self._monitor_trial()

    def _monitor_trial(self):
        """Refresh the trial from storage and update its heartbeat.

        Monitoring is stopped when the stored trial has reached one of the
        ``STOPPED_STATUS`` statuses, or when the heartbeat update reports
        failure through a falsy return value.
        """
        trial = self.storage.get_trial(self.trial)

        if trial.status in STOPPED_STATUS:
            self.stopped.set()
        elif not self.storage.update_heartbeat(trial):
            self.stopped.set()