# Source code for orion.core.worker.transformer
# -*- coding: utf-8 -*-
# pylint: disable=too-many-lines
"""
Perform transformations on Dimensions
=====================================
Provide functions and classes to build a Space which an algorithm can operate on.
"""
import functools
import itertools
from abc import ABCMeta, abstractmethod
import numpy
from orion.algo.space import Categorical, Dimension, Fidelity, Integer, Real, Space
# Prior names treated as non-linear: the `build_transform` implementations in
# this module add a `Linearize` step for dimensions whose prior is listed here
# when the distribution requirement is 'linear'.
NON_LINEAR = ["loguniform", "reciprocal"]
# pylint: disable=unused-argument
@functools.singledispatch
def build_transform(dim, type_requirement, dist_requirement):
    """Build the list of transformers a dimension needs (generic fallback).

    This is the ``singledispatch`` entry point: the implementation is selected
    from the type of ``dim``. The fallback defined here is used for any type
    without a registered implementation and requests no transformation.

    Parameters
    ----------
    dim: `orion.algo.space.Dimension`
        A dimension object which may need transformations to match the
        provided requirements.
    type_requirement: str, None
        Type required by the algorithm. One of

        - 'real', the dim should be transformed so its type is
          `orion.algo.space.Real`
        - 'integer', the dim should be transformed so its type is
          `orion.algo.space.Integer`
        - 'numerical', the dim should be transformed so its type is either
          `orion.algo.space.Integer` or `orion.algo.space.Real`
        - None, no requirement
    dist_requirement: str, None
        Distribution required by the algorithm. One of

        - 'linear', any dimension with a logarithmic prior will be linearized
        - None, no requirement

    Returns
    -------
    list
        A new, empty list: no transformer is needed.
    """
    return list()
@build_transform.register(Categorical)
def _(dim, type_requirement, dist_requirement):
    """Build the transformers for a `Categorical` dimension.

    - 'real': categories are enumerated, then one-hot encoded.
    - 'integer' or 'numerical': categories are only enumerated.
    - anything else: no transformation.

    ``dist_requirement`` is not used for categorical dimensions.
    """
    if type_requirement == "real":
        return [Enumerate(dim.categories), OneHotEncode(len(dim.categories))]
    if type_requirement in ("integer", "numerical"):
        return [Enumerate(dim.categories)]
    return []
@build_transform.register(Fidelity)
def _(dim, type_requirement, dist_requirement):
    """Build the transformers for a `Fidelity` dimension: always none.

    Both requirements are ignored and a new empty list is returned.
    """
    return list()
@build_transform.register(Integer)
def _(dim, type_requirement, dist_requirement):
    """Build the transformers for an `Integer` dimension.

    When a linear distribution is required and the prior is one of
    `NON_LINEAR`, the dimension is turned into reals and linearized.
    Otherwise, when reals are required, it is only turned into reals.
    """
    # The first four characters of the prior name are dropped before the
    # lookup -- presumably an "int_" prefix; confirm against `Integer.prior_name`.
    must_linearize = (
        dist_requirement == "linear" and dim.prior_name[4:] in NON_LINEAR
    )
    if must_linearize:
        # NOTE: we do not turn back to integer even though linearize outputs real
        # otherwise the mapping from exp(int) to int squashes out lots of possible values.
        return [Reverse(Quantize()), Linearize()]
    if type_requirement == "real":
        return [Reverse(Quantize())]
    return []
@build_transform.register(Real)
def _(dim, type_requirement, dist_requirement):
    """Build the transformers for a `Real` dimension.

    A `Precision` step comes first whenever the dimension declares a
    precision. It is followed by `Linearize` when a linear distribution is
    required and the prior is one of `NON_LINEAR`, or else by `Quantize` when
    integers are required.
    """
    steps = [Precision(dim.precision)] if dim.precision is not None else []

    if dist_requirement == "linear" and dim.prior_name in NON_LINEAR:
        steps.append(Linearize())
    elif type_requirement == "integer":
        # NOTE: This may cause out-of-bound errors for rounded reals. Not fixed for now
        # because there are no foreseeable algorithms that may require integer type.
        steps.append(Quantize())

    return steps
def transform(original_space, type_requirement, dist_requirement):
    """Build a transformed space.

    Every dimension of ``original_space`` is wrapped in a
    `TransformedDimension` whose transformer is the `Compose` of whatever
    `build_transform` returns for that dimension and the given requirements.
    """
    transformed_space = TransformedSpace(original_space)
    for original_dim in original_space.values():
        chain = build_transform(original_dim, type_requirement, dist_requirement)
        composed = Compose(chain, original_dim.type)
        transformed_space.register(
            TransformedDimension(transformer=composed, original_dimension=original_dim)
        )
    return transformed_space
def reshape(space, shape_requirement):
    """Build a reshaped space.

    With no shape requirement, ``space`` is returned untouched. Otherwise the
    requirement is assumed to be 'flattened': dimensions without a shape, or
    holding a single element, are registered behind an `Identity`; every other
    dimension is split into one `View` per element, named
    ``<dim name>[<comma separated index>]``.
    """
    if shape_requirement is None:
        return space

    # We assume shape_requirement == 'flattened'
    flattened = ReshapedSpace(space)

    for position, dim in enumerate(space.values()):
        if not dim.shape or numpy.prod(dim.shape) == 1:
            flattened.register(
                ReshapedDimension(
                    transformer=Identity(dim.type),
                    original_dimension=dim,
                    index=position,
                )
            )
            continue

        # One scalar dimension per element of the multi-element dimension.
        for coords in itertools.product(*map(range, dim.shape)):
            view_name = f'{dim.name}[{",".join(map(str, coords))}]'
            flattened.register(
                ReshapedDimension(
                    transformer=View(dim.shape, coords, dim.type),
                    original_dimension=dim,
                    name=view_name,
                    index=position,
                )
            )

    return flattened
def build_required_space(
    original_space, type_requirement=None, shape_requirement=None, dist_requirement=None
):
    """Build a :class:`orion.algo.space.Space` object which agrees with the
    requirements imposed by the desired optimization algorithm.

    The space is first transformed dimension by dimension (see `transform`)
    and the result is then reshaped (see `reshape`).

    Parameters
    ----------
    original_space : `orion.algo.space.Space`
        Original problem's definition of parameter space given by the user to Oríon.
    type_requirement: str, None
        Type required by the algorithm. One of

        - 'real', the dim should be transformed so its type is
          `orion.algo.space.Real`
        - 'integer', the dim should be transformed so its type is
          `orion.algo.space.Integer`
        - 'numerical', the dim should be transformed so its type is either
          `orion.algo.space.Integer` or `orion.algo.space.Real`
        - None, no requirement
    shape_requirement: str, None
        Shape required by the algorithm. One of

        - 'flattened', any dimension with shape > 1 will be flattened
        - None, no requirement
    dist_requirement: str, None
        Distribution required by the algorithm. One of

        - 'linear', any dimension with a logarithmic prior will be linearized
        - None, no requirement

    """
    transformed = transform(original_space, type_requirement, dist_requirement)
    return reshape(transformed, shape_requirement)
class Transformer(object, metaclass=ABCMeta):
    """Define an (injective) function and its inverse. Base transformation class.

    Attributes
    ----------
    target_type: str
        Defines the type of the target space of the forward function.
        It can provide one of the values: ``['real', 'integer', 'categorical']``.
    domain_type: str
        Is similar to ``target_type`` but it refers to the domain.
        If it is ``None``, then it can receive inputs of any type.

    """

    domain_type = None
    target_type = None

    @abstractmethod
    def transform(self, point):
        """Transform a point from domain dimension to the target dimension."""

    @abstractmethod
    def reverse(self, transformed_point, index=None):
        """Reverse transform a point from target dimension to the domain dimension."""

    # pylint:disable=no-self-use
    def infer_target_shape(self, shape):
        """Return the shape of the dimension after transformation.

        The base implementation returns ``shape`` unchanged.
        """
        return shape

    def repr_format(self, what):
        """Format a string for calling ``__repr__`` in `TransformedDimension`."""
        return "{}({})".format(self.__class__.__name__, what)

    def _get_hashable_members(self):
        """Return the tuple that identifies this transformer for `==` and `hash`."""
        return (self.__class__.__name__, self.domain_type, self.target_type)

    # pylint:disable=protected-access
    def __eq__(self, other):
        """Return True if other is a transformer with the same hashable members."""
        if not isinstance(other, Transformer):
            return False

        return self._get_hashable_members() == other._get_hashable_members()

    def __hash__(self):
        """Hash the same members `__eq__` compares.

        Defining `__eq__` alone sets ``__hash__`` to None, which would make
        every transformer unhashable; this keeps equal transformers hashing
        equal.
        """
        return hash(self._get_hashable_members())
class Identity(Transformer):
    """Implement an identity transformation. Everything as it is."""

    def __init__(self, domain_type=None):
        """Remember the type of the domain, which is also the target type."""
        self._domain_type = domain_type

    @property
    def first(self):
        """Signals to ReshapedSpace whether this dimension should be used for `reverse`"""
        return True

    def transform(self, point):
        """Return `point` as it is.

        `Transformer.transform` is abstract, so without this method the class
        could not be instantiated.
        """
        return point

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Return `transformed_point` as it is, or its element at `index` if given."""
        if index is not None:
            return transformed_point[index]
        return transformed_point

    def repr_format(self, what):
        """Format a string for calling ``__repr__`` in `TransformedDimension`."""
        return what

    @property
    def domain_type(self):
        """Return declared domain type on initialization."""
        return self._domain_type

    @property
    def target_type(self):
        """Return domain type as this will be the target in a identity transformation."""
        return self.domain_type
class Compose(Transformer):
    """Chain a list of `Transformer` objects into a single transformer.

    The last transformer of the list is kept as ``apply``; everything before
    it is wrapped recursively in another `Compose` stored as ``composition``.
    An `Identity` stands in wherever there is nothing to wrap.
    """

    def __init__(self, transformers, base_domain_type=None):
        """Split `transformers` into the last step and the composition before it."""
        count = len(transformers)
        self.apply = transformers[-1] if count else Identity()

        if count > 1:
            self.composition = Compose(transformers[:-1], base_domain_type)
        else:
            self.composition = Identity(base_domain_type)

        # The last step must accept what the steps before it produce.
        expected_domain = self.apply.domain_type
        assert (
            expected_domain is None
            or self.composition.target_type == expected_domain
        )

    def transform(self, point):
        """Apply transformers in the increasing order of the `transformers` list."""
        return self.apply.transform(self.composition.transform(point))

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Reverse transformation by reversing in the opposite order of the `transformers` list."""
        return self.composition.reverse(self.apply.reverse(transformed_point))

    def interval(self, alpha=1.0):
        """Return the interval of the last transformer, or None if it defines none."""
        if not hasattr(self.apply, "interval"):
            return None
        return self.apply.interval(alpha)

    def infer_target_shape(self, shape):
        """Return the shape of the dimension after transformation."""
        return self.apply.infer_target_shape(
            self.composition.infer_target_shape(shape)
        )

    def repr_format(self, what):
        """Format a string for calling ``__repr__`` in `TransformedDimension`."""
        inner = self.composition.repr_format(what)
        return self.apply.repr_format(inner)

    @property
    def domain_type(self):
        """Return base domain type."""
        return self.composition.domain_type

    @property
    def target_type(self):
        """Infer type of the transformation target.

        The target type of the last step wins when it is set; otherwise the
        target type of the preceding composition is used.
        """
        type_before = self.composition.target_type
        type_after = self.apply.target_type
        return type_after or type_before

    # pylint:disable=protected-access
    def _get_hashable_members(self):
        """Combine the class name with the members of both halves of the chain."""
        own = (self.__class__.__name__,)
        return (
            own
            + self.apply._get_hashable_members()
            + self.composition._get_hashable_members()
        )
class Reverse(Transformer):
    """Swap the two directions of another transformer."""

    def __init__(self, transformer: Transformer):
        """Wrap `transformer`; wrapping a `OneHotEncode` is refused."""
        assert not isinstance(
            transformer, OneHotEncode
        ), "real to categorical is pointless"
        self.transformer = transformer

    def transform(self, point):
        """Use `reverse` of the wrapped `transformer`."""
        wrapped = self.transformer
        return wrapped.reverse(point)

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Use `transform` of the wrapped `transformer`."""
        wrapped = self.transformer
        return wrapped.transform(transformed_point)

    def repr_format(self, what):
        """Format a string for calling ``__repr__`` in `TransformedDimension`."""
        inner = self.transformer.repr_format(what)
        return f"{self.__class__.__name__}{inner}"

    @property
    def target_type(self):
        """Return `domain_type` of the wrapped `transformer`."""
        return self.transformer.domain_type

    @property
    def domain_type(self):
        """Return `target_type` of the wrapped `transformer`."""
        return self.transformer.target_type
class Precision(Transformer):
    """Round real numbers to requested precision."""

    domain_type = "real"
    target_type = "real"

    def __init__(self, precision=4):
        """Store the requested precision."""
        self.precision = precision

    def transform(self, point):
        """Round `point` to the requested precision, as numpy arrays.

        Each value is rendered in scientific notation with ``precision - 1``
        digits after the decimal point and parsed back into a float.
        """

        def _round(value):
            # numpy.format_float_scientific precision starts at 0
            return float(
                numpy.format_float_scientific(value, precision=self.precision - 1)
            )

        is_sequence = isinstance(point, (list, tuple)) or (
            isinstance(point, numpy.ndarray) and point.shape
        )
        if is_sequence:
            rounded = numpy.vectorize(_round)(point)
        else:
            rounded = _round(point)

        return numpy.asarray(rounded)

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Apply the same rounding as `transform` to `transformed_point`."""
        return self.transform(transformed_point)

    def repr_format(self, what):
        """Format a string for calling ``__repr__`` in `TransformedDimension`."""
        return f"{self.__class__.__name__}({self.precision}, {what})"
class Quantize(Transformer):
    """Transform real numbers to integers, violating injection."""

    domain_type = "real"
    target_type = "integer"

    def transform(self, point):
        """Round `point` and then cast to integers, as numpy arrays.

        Infinite values cannot be cast to integers portably, so they are
        mapped explicitly: ``+inf`` becomes the largest value of the integer
        dtype and ``-inf`` its negation. This works element-wise, so arrays
        that contain infinities are supported as well as scalars.
        """
        values = numpy.asarray(point)
        infinite = numpy.isinf(values)
        if not infinite.any():
            return numpy.round(values).astype(int)

        # Round only the finite entries; infinities are patched in afterwards
        # so that no inf is ever cast to an integer.
        quantized = numpy.round(numpy.where(infinite, 0, values)).astype(int)
        bound = numpy.iinfo(quantized.dtype).max
        saturated = numpy.where(values > 0, bound, -bound)
        return numpy.where(infinite, saturated, quantized)

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Cast `transformed_point` to floats, as numpy arrays."""
        return numpy.asarray(transformed_point).astype(float)
class Enumerate(Transformer):
    """Enumerate categories.

    Effectively transform from a list of objects to a range of integers.
    """

    domain_type = "categorical"
    target_type = "integer"

    def __init__(self, categories):
        """Build the category -> position and position -> category lookups."""
        self.categories = categories
        map_dict = {cat: i for i, cat in enumerate(categories)}
        self._map = numpy.vectorize(lambda x: map_dict[x], otypes="i")
        # The builtin `object` is used as output type: the `numpy.object`
        # alias was removed from recent NumPy releases.
        self._imap = numpy.vectorize(lambda x: categories[x], otypes=[object])

    def __deepcopy__(self, memo):
        """Make a deepcopy by building a new instance from the same categories."""
        return type(self)(self.categories)

    def transform(self, point):
        """Return integers corresponding uniquely to the categories in `point`."""
        return self._map(point)

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Return categories corresponding to their positions inside `transformed_point`."""
        return self._imap(transformed_point)

    # pylint:disable=unused-argument
    def interval(self, alpha=1.0):
        """Return the interval for the enumerated choices."""
        return (0, len(self.categories) - 1)
class OneHotEncode(Transformer):
    """Encode categories to a 1-hot integer space representation."""

    domain_type = "integer"
    target_type = "real"

    def __init__(self, bound: int):
        """Store the number of categories, i.e. the exclusive upper bound of inputs."""
        self.num_cats = bound

    def transform(self, point):
        """Match a `point` containing integers to real vector representations of them.

        If the upper bound of integers supported by an instance of `OneHotEncode`
        is less or equal to 2, then cast them to floats.

        .. note:: This transformation possibly appends one more tensor dimension to `point`.

        Raises
        ------
        AssertionError
            If a value is negative, not below the number of categories, or not
            integer-valued.
        """
        point_ = numpy.asarray(point)
        assert (
            numpy.all(point_ < self.num_cats)
            and numpy.all(point_ >= 0)
            and numpy.all(point_ % 1 == 0)
        )

        if self.num_cats <= 2:
            return numpy.asarray(point_, dtype=float)

        # Integer-valued floats pass the check above, but an integer dtype is
        # required to use the values as indices.
        indices = point_.astype(int)
        # Row k of the identity matrix is the one-hot vector of category k, so
        # the lookup yields an array of shape `point_.shape + (num_cats,)`.
        return numpy.eye(self.num_cats)[indices]

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Match real vector representations to integers using an argmax function.

        If the number of dimensions is exactly 2, then use 0.5 as a decision boundary,
        and convert representation to integers 0 or 1.

        If the number of dimensions is exactly 1, then return zeros.

        .. note:: This reverse transformation possibly removes the last tensor dimension
           from `transformed_point`.

        Raises
        ------
        AssertionError
            If there are more than 2 categories and the last axis of
            `transformed_point` does not have one entry per category.
        """
        point_ = numpy.asarray(transformed_point)
        if self.num_cats == 2:
            return (point_ > 0.5).astype(int)
        if self.num_cats == 1:
            return numpy.zeros_like(point_, dtype=int)

        assert point_.shape[-1] == self.num_cats
        return point_.argmax(axis=-1)

    # pylint:disable=unused-argument
    def interval(self, alpha=1.0):
        """Return the interval for the one-hot encoding in proper shape."""
        if self.num_cats == 2:
            return 0, 1

        low = numpy.zeros(self.num_cats)
        high = numpy.ones(self.num_cats)
        return low, high

    def infer_target_shape(self, shape):
        """Infer that transformed points will have one more tensor dimension,
        if the number of supported integers to transform is larger than 2.
        """
        return tuple(list(shape) + [self.num_cats]) if self.num_cats > 2 else shape

    def _get_hashable_members(self):
        """Extend the base members with the number of categories."""
        return super()._get_hashable_members() + (self.num_cats,)
class Linearize(Transformer):
    """Transform real numbers from loguniform to linear."""

    domain_type = "real"
    target_type = "real"

    def transform(self, point):
        """Linearize logarithmic distribution by taking the natural logarithm."""
        values = numpy.asarray(point)
        return numpy.log(values)

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Turn linear distribution back to logarithmic by taking the exponential."""
        values = numpy.asarray(transformed_point)
        return numpy.exp(values)
class View(Transformer):
    """Look-up single index in a dimensions with shape > 1"""

    def __init__(self, shape, index, domain_type=None):
        """Store the shape of the viewed dimension and the index this view selects."""
        self._domain_type = domain_type
        self.index = index
        self.shape = shape

    @property
    def first(self):
        """Signals to ReshapedSpace whether this dimension should be used for `reverse`"""
        total = sum(self.index)
        return total == 0

    def transform(self, point):
        """Only return one element of the group"""
        return point[self.index]

    def reverse(self, transformed_point, index=None):
        """Rebuild the full dimension from a flat point.

        Takes the ``prod(shape)`` consecutive entries of `transformed_point`
        starting at `index` and reshapes them to ``shape``.
        """
        stop = index + numpy.prod(self.shape)
        flat_values = transformed_point[index:stop]
        return numpy.array(flat_values).reshape(self.shape)

    def interval(self, interval):
        """Return corresponding view from interval"""
        low, high = interval[0], interval[1]
        return (low[self.index], high[self.index])

    @property
    def domain_type(self):
        """Return declared domain type on initialization."""
        return self._domain_type

    @property
    def target_type(self):
        """Return domain type as this will be the target in flatten transformation."""
        return self.domain_type

    def repr_format(self, what):
        """Format a string for calling ``__repr__`` in `TransformedDimension`."""
        return f"{self.__class__.__name__}(shape={self.shape}, index={self.index}, {what})"
class TransformedDimension(object):
    """Duck-type :class:`orion.algo.space.Dimension` to mimic its functionality,
    while transform automatically and appropriately an underlying
    :class:`orion.algo.space.Dimension` object according to a `Transformer` object.
    """

    NO_DEFAULT_VALUE = Dimension.NO_DEFAULT_VALUE

    def __init__(self, transformer, original_dimension):
        """Keep the original dimension and the transformer applied on top of it."""
        self.original_dimension = original_dimension
        self.transformer = transformer

    def transform(self, point):
        """Expose `Transformer.transform` interface from underlying instance."""
        return self.transformer.transform(point)

    # pylint:disable=unused-argument
    def reverse(self, transformed_point, index=None):
        """Expose `Transformer.reverse` interface from underlying instance.

        `index` is accepted for interface compatibility but not forwarded.
        """
        return self.transformer.reverse(transformed_point)

    def interval(self, alpha=1.0):
        """Map the interval bounds to the transformed ones.

        The transformer's own interval is used when it provides a non-empty
        one. Otherwise categorical dimensions return their categories, and any
        other dimension returns the transformed bounds of the original
        interval.
        """
        if hasattr(self.transformer, "interval"):
            # Forward `alpha` so the transformer sees the same request as the
            # original dimension does below.
            interval = self.transformer.interval(alpha)
            if interval:
                return interval
        if self.original_dimension.type == "categorical":
            return self.original_dimension.categories

        low, high = self.original_dimension.interval(alpha)

        return self.transform(low), self.transform(high)

    def __contains__(self, point):
        """Reverse transform and ask the original dimension if it is a possible
        sample.

        A point whose reverse transformation fails an assertion is reported as
        not contained.
        """
        try:
            orig_point = self.reverse(point)
        except AssertionError:
            return False
        return orig_point in self.original_dimension

    def __repr__(self):
        """Represent the object as a string."""
        return self.transformer.repr_format(repr(self.original_dimension))

    # pylint:disable=protected-access
    def __eq__(self, other):
        """Return True if other is the same transformed dimension as self"""
        if not (hasattr(other, "transformer") and hasattr(other, "original_dimension")):
            return False

        return (
            self.transformer == other.transformer
            and self.original_dimension == other.original_dimension
        )

    def __hash__(self):
        """Hash of the transformed dimension"""
        return hash(self._get_hashable_members())

    # pylint:disable=protected-access
    def _get_hashable_members(self):
        """Hashable members of transformation and original dimension"""
        return (
            self.transformer._get_hashable_members()
            + self.original_dimension._get_hashable_members()
        )

    @property
    def name(self):
        """Do not change the name of the original dimension."""
        return self.original_dimension.name

    @property
    def type(self):
        """Ask transformer which is its target class.

        Falls back to the type of the original dimension when the transformer
        reports "invariant".
        """
        type_ = self.transformer.target_type
        return type_ if type_ != "invariant" else self.original_dimension.type

    @property
    def prior_name(self):
        """Do not change the prior name of the original dimension."""
        return self.original_dimension.prior_name

    @property
    def shape(self):
        """Wrap original shape with transformer, because it may have changed."""
        return self.transformer.infer_target_shape(self.original_dimension.shape)

    @property
    def cardinality(self):
        """Wrap original :class:`orion.algo.space.Dimension` capacity"""
        # May be a discretized real, must reduce cardinality
        if self.type == "integer":
            return Integer.get_cardinality(self.shape, self.interval())

        # Else we don't care what transformation is.
        return self.original_dimension.cardinality
class ReshapedDimension(TransformedDimension):
    """Duck-type :class:`orion.algo.space.Dimension` to mimic its functionality."""

    def __init__(self, transformer, original_dimension, index, name=None):
        """Wrap `original_dimension`; `name` defaults to the original dimension's name."""
        super().__init__(transformer, original_dimension)
        self._name = original_dimension.name if name is None else name
        self.index = index

    @property
    def first(self):
        """Signals to ReshapedSpace whether this dimension should be used for `reverse`"""
        return self.transformer.first

    def transform(self, point):
        """Transform the entry of `point` found at this dimension's ``index``."""
        own_value = point[self.index]
        return self.transformer.transform(own_value)

    def reverse(self, transformed_point, index=None):
        """Expose `Transformer.reverse` interface from underlying instance."""
        return self.transformer.reverse(transformed_point, index)

    def interval(self, alpha=1.0):
        """Map the interval bounds to the transformed ones.

        When the lower bound of the original interval holds more than one
        element, the transformer picks its own part of the interval; otherwise
        the original interval is returned as is.
        """
        bounds = self.original_dimension.interval(alpha)
        low = bounds[0]
        if hasattr(low, "shape") and numpy.prod(low.shape) > 1:
            return self.transformer.interval(bounds)
        return bounds

    @property
    def cardinality(self):
        """Compute cardinality.

        For a `View`, the cardinality of the wrapped dimension is divided by
        the number of elements of the viewed shape.
        """
        total = super().cardinality
        if not isinstance(self.transformer, View):
            return total
        return total / numpy.prod(self.transformer.shape)

    def cast(self, point):
        """Cast a point by delegating to ``original_dimension.cast``."""
        return self.original_dimension.cast(point)

    @property
    def shape(self):
        """Shape is fixed to ()."""
        return ()

    @property
    def name(self):
        """Name of the view"""
        return self._name
class TransformedSpace(Space):
    """Wrap the :class:`orion.algo.space.Space` to support transformation methods.

    Parameter
    ---------
    space: `orion.algo.space.Space`
       Original problem's definition of parameter space.

    """

    contains = TransformedDimension

    def __init__(self, space, *args, **kwargs):
        """Initialize the underlying `Space` and remember the original one."""
        super().__init__(*args, **kwargs)
        self._original_space = space

    def transform(self, point):
        """Transform a point that was in the original space to be in this one."""
        return tuple(
            dim.transform(point[position])
            for position, dim in enumerate(self.values())
        )

    def reverse(self, transformed_point):
        """Reverse the transformation so that a point from this `TransformedSpace`
        is expressed in the original one.
        """
        return tuple(
            dim.reverse(transformed_point[position])
            for position, dim in enumerate(self.values())
        )

    def sample(self, n_samples=1, seed=None):
        """Sample from the original dimension and forward transform them."""
        originals = self._original_space.sample(n_samples=n_samples, seed=seed)
        return list(map(self.transform, originals))
class ReshapedSpace(Space):
    """Wrap the `TransformedSpace` to support reshape methods.

    Parameter
    ---------
    space: `orion.core.worker.TransformedSpace`
       Transformed version of the original problem's definition of parameter space.

    """

    contains = ReshapedDimension

    def __init__(self, original_space, *args, **kwargs):
        """Initialize the underlying `Space` and remember the wrapped space."""
        super().__init__(*args, **kwargs)
        self._original_space = original_space

    @property
    def original(self):
        """Space wrapped by this one, as given at construction."""
        return self._original_space

    def transform(self, point):
        """Transform a point that was in the original space to be in this one."""
        transformed = self.original.transform(point)
        return self.reshape(transformed)

    def reverse(self, transformed_point):
        """Reverse the transformation so that a point from this `ReshapedSpace`
        is expressed in the original one.
        """
        restored = self.restore_shape(transformed_point)
        return self.original.reverse(restored)

    def reshape(self, point):
        """Reshape the point by letting every dimension pick its own value from it."""
        return tuple(dim.transform(point) for dim in self.values())

    def restore_shape(self, transformed_point):
        """Restore shape.

        Only dimensions flagged as ``first`` contribute: each one rebuilds its
        original value from `transformed_point`, given its own position in
        this space. The result is a list.
        """
        return [
            dim.reverse(transformed_point, position)
            for position, dim in enumerate(self.values())
            if dim.first
        ]

    def sample(self, n_samples=1, seed=None):
        """Sample from the original dimension and forward transform them."""
        samples = self.original.sample(n_samples=n_samples, seed=seed)
        return list(map(self.reshape, samples))

    def __contains__(self, value):
        """Check whether `value` is within the bounds of the space.
        Or check if a name for a dimension is registered in this space.

        Parameters
        ----------
        value: list
            List of values associated with the dimensions contained or a string indicating a
            dimension's name.

        Raises
        ------
        TypeError
            If `value` is neither a string nor something with a length.

        """
        if isinstance(value, str):
            return super().__contains__(value)

        try:
            len(value)
        except TypeError as exc:
            raise TypeError(
                "Can check only for dimension names or "
                "for tuples with parameter values."
            ) from exc

        # An empty space contains no point.
        if not self:
            return False

        restored = self.restore_shape(value)
        return restored in self.original

    @property
    def cardinality(self):
        """Reshape does not affect cardinality"""
        return self.original.cardinality