Source code for orion.executor.single_backend

"""
Executor without parallelism for debugging
==========================================

"""
import functools
import time
import traceback

from orion.executor.base import (
    AsyncException,
    AsyncResult,
    BaseExecutor,
    ExecutorClosed,
    Future,
)

# A function can return None so we have to create a difference between
# the None result and the absence of result
NOT_SET = object()


class _Future(Future):
    """Wraps a partial function to act as a Future"""

    def __init__(self, future):
        self.future = future
        self.result = NOT_SET
        self.exception = NOT_SET

    def get(self, timeout=None):
        start = time.time()
        self.wait(timeout)

        if timeout and time.time() - start > timeout:
            raise TimeoutError()

        if self.result is not NOT_SET:
            return self.result

        else:
            raise self.exception

    def wait(self, timeout=None):
        if self.ready():
            return

        try:
            self.result = self.future()
        except Exception as e:
            self.exception = e

    def ready(self):
        return (self.result is not NOT_SET) or (self.exception is not NOT_SET)

    def successful(self):
        if not self.ready():
            raise ValueError()

        return self.exception is NOT_SET


[docs]class SingleExecutor(BaseExecutor): """Single thread executor Simple executor for debugging. No parameters. The submitted functions are wrapped with ``functools.partial`` which are then executed in ``wait()``. Notes ----- The tasks are started when wait is called """ def __init__(self, n_workers=1, **config): super().__init__(n_workers=1) self.closed = False self.nested = 0 def __del__(self): if hasattr(self, "closed"): self.close() def __enter__(self): self.nested += 1 return self def __exit__(self, *args, **kwargs): self.close()
[docs] def close(self): """Prevent user from submitting work after closing.""" if self.nested <= 1: self.closed = True
[docs] def wait(self, futures): return [future.get() for future in futures]
[docs] def async_get(self, futures, timeout=0.01): if len(futures) == 0: return [] results = [] try: fut = futures.pop() results.append(AsyncResult(fut, fut.get())) except Exception as err: results.append(AsyncException(fut, err, traceback.format_exc())) return results
[docs] def submit(self, function, *args, **kwargs): if self.closed: raise ExecutorClosed() return _Future(functools.partial(function, *args, **kwargs))