""" Options and utilities for training the profet meta-model from Emukit. """
import json
import pickle
import warnings
from abc import ABC
from copy import deepcopy
from dataclasses import dataclass
from logging import getLogger as get_logger
from pathlib import Path
from typing import Any, Callable, ClassVar, Optional, Tuple, Union
import numpy as np
_ERROR_MSG = (
"The `profet` extras needs to be installed in order to use the Profet tasks.\n"
"Error: {0}\n"
"Use `pip install orion[profet]` to install the profet extras."
)
try:
import GPy
import torch
from emukit.examples.profet.meta_benchmarks.architecture import (
get_default_architecture,
)
from emukit.examples.profet.meta_benchmarks.meta_forrester import (
get_architecture_forrester, # type: ignore
)
from emukit.examples.profet.train_meta_model import download_data
from GPy.models import BayesianGPLVM
from pybnn.bohamiann import Bohamiann
except ImportError as err:
warnings.warn(RuntimeWarning(_ERROR_MSG.format(err)))
    # NOTE: Define dummy placeholders here so that the documentation can be generated
    # without the profet dependencies actually being installed.
def get_default_architecture(
input_dimensionality: int, classification: bool = False, n_hidden: int = 500
) -> Any:
        raise RuntimeError(_ERROR_MSG.format("the `profet` extras are not installed"))
def get_architecture_forrester(input_dimensionality: int) -> Any:
        raise RuntimeError(_ERROR_MSG.format("the `profet` extras are not installed"))
logger = get_logger(__name__)
@dataclass
class MetaModelConfig(ABC):
"""Configuration options for the training of the Profet meta-model."""
benchmark: str
""" Name of the benchmark. """
# ---------- "Abstract"/required class attributes:
json_file_name: ClassVar[str]
""" Name of the json file that contains the data of this benchmark. """
get_architecture: ClassVar[
Callable[[int], "torch.nn.Module"]
] = get_default_architecture
""" Callable that takes the input dimensionality and returns the network to be trained. """
hidden_space: ClassVar[int]
""" Size of the hidden space for this benchmark. """
log_cost: ClassVar[bool]
""" Whether to apply `numpy.log` onto the raw data for the cost of each point. """
log_target: ClassVar[bool]
""" Whether to apply `numpy.log` onto the raw data for the `y` of each point. """
normalize_targets: ClassVar[bool]
""" Whether to normalize the targets (y), by default False. """
shapes: ClassVar[Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]]
""" The shapes of the X, Y and C arrays of the dataset. """
y_min: ClassVar[float]
""" The minimum of the Y array. """
y_max: ClassVar[float]
""" The maximum of the Y array. """
c_min: ClassVar[float]
""" The minimum of the C array. """
c_max: ClassVar[float]
""" The maximum of the C array. """
# -----------
task_id: int = 0
""" Task index. """
seed: int = 123
""" Random seed. """
num_burnin_steps: int = 50000
""" (copied from `Bohamiann.train`): Number of burn-in steps to perform. This value is passed
to the given `optimizer` if it supports special burn-in specific behavior. Networks sampled
during burn-in are discarded.
"""
num_steps: int = 13_000
"""Value passed to the argument of the same name in `Bohamiann.train`.
(copied from `Bohamiann.train`):
Number of sampling steps to perform after burn-in is finished. In total,
`num_steps // keep_every` network weights will be sampled.
"""
    mcmc_thinning: int = 100
""" `keep_every` argument of `Bohamiann.train`.
(copied from `Bohamiann.train`):
Number of sampling steps (after burn-in) to perform before keeping a sample. In total,
`num_steps // keep_every` network weights will be sampled.
"""
lr: float = 1e-2
""" `lr` argument of `Bohamiann.train`. """
batch_size: int = 5
""" `batch_size` argument of `Bohamiann.train`. """
max_samples: Optional[int] = None
""" Maximum number of data samples to use when training the meta-model. This can be useful
if the dataset is large (e.g. FCNet task) and you don't have crazy amounts of memory.
"""
n_inducing_lvm: int = 50
""" Passed as the value for the "num_inducing" argument of `BayesianGPLVM` constructor.
(copied form ``GPy.core.sparse_gp_mpi.SparseGP_MPI``):
Number of inducing points (optional, default 10. Ignored if Z is not None)
"""
max_iters: int = 10_000
"""Argument passed to the `optimize` method of the `BayesianGPLVM` instance that is used in the
call to `get_features`. Appears to be the number of training iterations to perform.
"""
n_samples_task: int = 500
""" Number of tasks to create in `get_training_data`."""
    def get_task_network(self, input_path: Union[Path, str]) -> Tuple[Any, np.ndarray]:
"""Create, train and return a surrogate model for the given `benchmark`, `seed` and `task_id`.
Parameters
----------
input_path : Union[Path, str]
Data directory containing the json files.
Returns
-------
Tuple[Any, np.ndarray]
The surrogate model for the objective, as well as an array of sampled task features.
"""
rng = np.random.RandomState(seed=self.seed)
X, Y, C = self.load_data(input_path)
task_features_mean, task_features_std = self._get_features(
X=X,
Y=Y,
C=C,
display_messages=False,
)
X_train, Y_train, C_train = self._get_training_data(
X,
Y,
C,
task_features_mean=task_features_mean,
task_features_std=task_features_std,
)
objective_model, cost_model = self._get_meta_model(
X_train,
Y_train,
C_train,
with_cost=False,
)
net = self._create_task_network(objective_model, X_train.shape[1])
multiplier = rng.randn(self.hidden_space)
h = (
task_features_mean[self.task_id]
+ task_features_std[self.task_id] * multiplier
)
return net, h
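    # NOTE (illustrative): a usage sketch, assuming the hypothetical `ForresterConfig`
    # subclass above. The returned `net` expects its input to be the concatenation of a
    # configuration `x` and the task feature vector `h` (the same layout as `X_train`):
    #
    #     config = ForresterConfig(task_id=0, seed=123)
    #     net, h = config.get_task_network("profet_data/")
    #     x_h = np.concatenate([x, h])  # `x` is one configuration from the input space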
    def load_data(
self, input_path: Union[str, Path]
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Load the profet data for the given benchmark from the input directory.
        If the json file isn't found, attempts to download the data into the input directory.
Parameters
----------
input_path : Union[str, Path]
Input directory. Expects to find a json file for the given benchmark inside that directory.
Returns
-------
Tuple[np.ndarray, np.ndarray, np.ndarray]
X, Y, and C arrays.
"""
file = Path(input_path) / self.json_file_name
if not file.exists():
logger.info(f"File {file} doesn't exist, attempting to download data.")
download_data(input_path)
logger.info("Download finished.")
if not file.exists():
raise RuntimeError(
f"Download finished, but file {file} still doesn't exist!"
)
with open(file) as f:
res = json.load(f)
X, Y, C = np.array(res["X"]), np.array(res["Y"]), np.array(res["C"])
        if X.ndim == 1:
X = X[:, None]
return X, Y, C
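    # NOTE (illustrative): the json file read by `load_data` is expected to hold three
    # arrays under the keys "X", "Y" and "C", where X has shape (n_configs, dim) and
    # Y / C have shape (n_tasks, n_configs), matching the `shapes` class attribute.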
    def normalize_Y(
self, Y: np.ndarray, indexD: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Normalize the Y array and return its mean and standard deviations.
Parameters
----------
Y : np.ndarray
Labels from the datasets.
indexD : np.ndarray
Task indices of corresponding labels Y.
Returns
-------
Tuple[np.ndarray, np.ndarray, np.ndarray]
Tuple containing the Y array, the mean array, and the std array.
"""
max_idx = np.max(indexD)
Y_mean = np.zeros(max_idx + 1)
Y_std = np.zeros(max_idx + 1)
for i in range(max_idx + 1):
Y_mean[i] = Y[indexD == i].mean()
Y_std[i] = Y[indexD == i].std() + 1e-8
Y[indexD == i] = (Y[indexD == i] - Y_mean[i]) / Y_std[i]
return Y, Y_mean[:, None], Y_std[:, None]
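    # NOTE (illustrative): a small worked example of `normalize_Y`, which z-scores the
    # labels of each task independently:
    #
    #     Y = np.array([1., 3., 2., 6.])
    #     indexD = np.array([0, 0, 1, 1])
    #     Y_norm, Y_mean, Y_std = config.normalize_Y(Y, indexD)
    #     # Y_norm -> [-1., 1., -1., 1.]
    #     # Y_mean -> [[2.], [4.]], Y_std -> [[1.], [2.]] (up to the 1e-8 epsilon)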
def _get_features(
self,
X: np.ndarray,
Y: np.ndarray,
C: np.ndarray,
display_messages: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
"""Generate features for the given task.
Parameters
----------
X : np.ndarray
Training examples
Y : np.ndarray
Training labels
        C : np.ndarray
            Training costs (not used by this method).
display_messages : bool, optional
Whether to log messages to the console or not, by default True.
Returns
-------
Tuple[np.ndarray, np.ndarray]
The features mean and std arrays.
"""
n_tasks = Y.shape[0]
n_configs = X.shape[0]
index_task = np.repeat(np.arange(n_tasks), n_configs)
Y_norm, _, _ = self.normalize_Y(deepcopy(Y.flatten()), index_task)
# train the probabilistic encoder
kern = GPy.kern.Matern52(input_dim=self.hidden_space, ARD=True)
m_lvm = BayesianGPLVM(
Y_norm.reshape(n_tasks, n_configs),
input_dim=self.hidden_space,
kernel=kern,
num_inducing=self.n_inducing_lvm,
)
m_lvm.optimize(max_iters=self.max_iters, messages=display_messages)
ls = np.array(
[m_lvm.kern.lengthscale[i] for i in range(m_lvm.kern.lengthscale.shape[0])]
)
# generate data to train the multi-task model
task_features_mean = np.array(m_lvm.X.mean / ls)
task_features_std = np.array(np.sqrt(m_lvm.X.variance) / ls)
return task_features_mean, task_features_std
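    # NOTE (illustrative): each row of `task_features_mean` / `task_features_std` is the
    # mean / std of one task's latent embedding under the GP-LVM posterior, divided by the
    # ARD lengthscales so that the latent coordinates are expressed on the kernel's scale.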
def _get_training_data(
self,
X: np.ndarray,
Y: np.ndarray,
C: np.ndarray,
task_features_mean: np.ndarray,
task_features_std: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Create training data by sampling a given number of tasks.
Parameters
----------
X : np.ndarray
Training examples
Y : np.ndarray
Training labels
C : np.ndarray
            Training costs (NOTE: gathered into `C_train`, but only consumed downstream
            when a cost model is trained).
        task_features_mean : np.ndarray
            Mean of the latent task features.
        task_features_std : np.ndarray
            Std of the latent task features.
Returns
-------
Tuple[np.ndarray, np.ndarray, np.ndarray]
            Numpy arrays containing the training inputs X, targets Y, and costs C.
"""
n_tasks = Y.shape[0]
hidden_space = task_features_std.shape[1]
n_configs = X.shape[0]
X_train_list = []
Y_train_list = []
C_train_list = []
for i, xi in enumerate(X):
for idx in range(n_tasks):
for _ in range(self.n_samples_task):
multiplier = np.random.randn(hidden_space)
ht = task_features_mean[idx] + task_features_std[idx] * multiplier
x = np.concatenate((xi, ht), axis=0)
X_train_list.append(x)
Y_train_list.append(Y[idx, i])
C_train_list.append(C[idx, i])
X_train = np.array(X_train_list)
Y_train = np.array(Y_train_list)
C_train = np.array(C_train_list)
if self.log_cost:
C_train = np.log(C_train)
if self.log_target:
Y_train = np.log(Y_train)
return X_train, Y_train, C_train
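    # NOTE (illustrative): with `n_configs` configurations, `n_tasks` tasks and
    # `n_samples_task` latent samples per (configuration, task) pair, the arrays returned
    # by `_get_training_data` have shapes:
    #
    #     X_train: (n_configs * n_tasks * n_samples_task, X.shape[1] + hidden_space)
    #     Y_train: (n_configs * n_tasks * n_samples_task,)
    #     C_train: (n_configs * n_tasks * n_samples_task,)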
def _get_meta_model(
self,
X_train: np.ndarray,
Y_train: np.ndarray,
C_train: np.ndarray,
with_cost: bool = False,
):
"""Create, train and return the objective model, and (optionally) a cost model for the data.
Parameters
----------
X_train : np.ndarray
Training samples.
Y_train : np.ndarray
Training objectives.
C_train : np.ndarray
Training costs.
with_cost : bool, optional
Whether to also create a surrogate model for the cost. Defaults to `False`.
Returns
-------
Tuple[Bohamiann, Optional[Bohamiann]]
Surrogate model for the objective, as well as another for the cost, if `with_cost` is
True, otherwise `None`.
"""
objective_model = Bohamiann(
get_network=type(self).get_architecture,
print_every_n_steps=1000,
normalize_output=self.normalize_targets,
)
logger.info("Training Bohamiann objective model.")
if self.max_samples is not None:
logger.info(
f"Limiting the dataset to a maximum of {self.max_samples} samples."
)
X_train = X_train[: self.max_samples, ...]
Y_train = Y_train[: self.max_samples, ...]
C_train = C_train[: self.max_samples, ...]
logger.debug(f"Shapes: {X_train.shape}, {Y_train.shape}")
logger.debug(f"config: {self}")
objective_model.train(
X_train,
Y_train,
num_steps=self.num_steps + self.num_burnin_steps,
num_burn_in_steps=self.num_burnin_steps,
            keep_every=self.mcmc_thinning,
lr=self.lr,
verbose=True,
batch_size=self.batch_size,
)
if with_cost:
cost_model = Bohamiann(
get_network=type(self).get_architecture, print_every_n_steps=1000
)
logger.info("Training Bohamiann cost model.")
cost_model.train(
X_train,
C_train,
num_steps=self.num_steps + self.num_burnin_steps,
num_burn_in_steps=self.num_burnin_steps,
                keep_every=self.mcmc_thinning,
lr=self.lr,
verbose=True,
batch_size=self.batch_size,
)
else:
cost_model = None
return objective_model, cost_model
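    # NOTE (illustrative): with the default settings, `Bohamiann.train` is asked to run
    # num_steps + num_burnin_steps = 13_000 + 50_000 = 63_000 total steps, of which the
    # first 50_000 are burn-in, keeping roughly 13_000 // 100 = 130 weight samples.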
def _create_task_network(self, model, size: int, idx: int = 0) -> "torch.nn.Module":
"""Retrieve a network with sampled weights for the given task id.
Parameters
----------
model : Bohamiann
"Base" Bohamiann model used to get a network and its weights.
size : int
Input dimensions for the generated network.
idx : int, optional
Task idx, by default 0
Returns
-------
nn.Module
A module with sampled weights.
"""
net = model.get_network(size)
with torch.no_grad():
sampled_weights = model.sampled_weights[idx]
for parameter, sample in zip(net.parameters(), sampled_weights):
parameter.copy_(torch.from_numpy(sample))
return net
    def load_task_network(
self,
checkpoint_file: Union[str, Path],
) -> Tuple[Any, np.ndarray]:
"""Load the result of the `get_task_network` function stored in the pickle file.
Parameters
----------
checkpoint_file : Union[str, Path]
Path to a pickle file. The file is expected to contain a serialized dictionary, with keys
"benchmark", "size", "network", and "h".
Returns
-------
Tuple[Any, np.ndarray]
The surrogate model for the objective, as well as an array of sampled task features.
"""
with open(checkpoint_file, "rb") as f:
state = pickle.load(f)
if state["benchmark"] != self.benchmark:
raise RuntimeError(
f"Trying to load model for benchmark {self.benchmark} from checkpoint that "
f"contains data from benchmark {state['benchmark']}."
)
network = type(self).get_architecture(input_dimensionality=state["size"])
network.load_state_dict(state["network"])
h = state["h"]
return network, h
    def save_task_network(
self, checkpoint_file: Union[str, Path], network: Any, h: np.ndarray
) -> None:
"""Save the meta-model for the task at the given path.
Parameters
----------
checkpoint_file : Union[str, Path]
Path where the model should be saved
network : Any
The network
h : np.ndarray
The embedding vector
"""
checkpoint_file = Path(checkpoint_file)
state = dict(
benchmark=self.benchmark,
network=network.state_dict(),
size=list(network.parameters())[0].size()[1],
h=h.tolist(),
)
tmp_file = checkpoint_file.with_suffix(".tmp")
with open(tmp_file, "wb") as file:
pickle.dump(state, file, protocol=pickle.DEFAULT_PROTOCOL)
tmp_file.rename(checkpoint_file)
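# NOTE (illustrative): an end-to-end sketch of how the pieces above fit together, assuming
# the hypothetical `ForresterConfig` subclass from the note in the class body:
#
#     config = ForresterConfig(task_id=0, seed=123)
#     net, h = config.get_task_network("profet_data/")    # train the meta-model
#     config.save_task_network("task_0.pkl", net, h)      # checkpoint it
#     net2, h2 = config.load_task_network("task_0.pkl")   # restore it later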