""" Surrogate model based on Kriging. """
import numpy as np
import scipy.linalg as linalg
from scipy.optimize import minimize
from six.moves import zip, range
from openmdao.surrogate_models.surrogate_model import SurrogateModel
MACHINE_EPSILON = np.finfo(np.double).eps
class KrigingSurrogate(SurrogateModel):
    """Surrogate modeling method based on the simple Kriging interpolation.

    Predictions are returned as a tuple of mean and RMSE when ``eval_rmse``
    is True, otherwise just the mean. Based on Gaussian Processes for Machine
    Learning (GPML) by Rasmussen and Williams (see also: scikit-learn).

    Args
    ----
    nugget : double or ndarray, optional
        Nugget smoothing parameter for smoothing noisy data. Represents the
        variance of the input values. If nugget is an ndarray, it must be of
        the same length as the number of training points.
        Default: 10. * Machine Epsilon
    eval_rmse : bool
        Flag indicating whether the Root Mean Squared Error (RMSE) should be
        computed. Set to False by default.
    """

    def __init__(self, nugget=10. * MACHINE_EPSILON, eval_rmse=False):
        super(KrigingSurrogate, self).__init__()

        self.n_dims = 0        # number of independent dimensions
        self.n_samples = 0     # number of training points
        self.thetas = np.zeros(0)  # correlation hyper-parameters (one per dim)
        self.nugget = nugget   # nugget smoothing parameter from [Sasena, 2002]

        self.alpha = np.zeros(0)   # weights of the trained predictor
        self.L = np.zeros(0)
        self.sigma2 = np.zeros(0)  # process variance estimate (de-normalized)

        # Normalized training values and the statistics used to normalize.
        self.X = np.zeros(0)
        self.Y = np.zeros(0)
        self.X_mean = np.zeros(0)
        self.X_std = np.zeros(0)
        self.Y_mean = np.zeros(0)
        self.Y_std = np.zeros(0)

        self.eval_rmse = eval_rmse

    def train(self, x, y):
        """
        Train the surrogate model with the given set of inputs and outputs.

        Args
        ----
        x : array-like
            Training input locations.
        y : array-like
            Model responses at given inputs.

        Raises
        ------
        ValueError
            If fewer than 2 training points are supplied, or the
            hyper-parameter optimization fails to converge.
        """
        super(KrigingSurrogate, self).train(x, y)
        x, y = np.atleast_2d(x, y)

        self.n_samples, self.n_dims = x.shape

        if self.n_samples <= 1:
            raise ValueError(
                'KrigingSurrogate requires at least 2 training points.'
            )

        # Normalize the data
        X_mean = np.mean(x, axis=0)
        X_std = np.std(x, axis=0)
        Y_mean = np.mean(y, axis=0)
        Y_std = np.std(y, axis=0)

        # Constant columns would give std == 0; avoid division by zero.
        X_std[X_std == 0.] = 1.
        Y_std[Y_std == 0.] = 1.

        X = (x - X_mean) / X_std
        Y = (y - Y_mean) / Y_std

        self.X = X
        self.Y = Y
        self.X_mean, self.X_std = X_mean, X_std
        self.Y_mean, self.Y_std = Y_mean, Y_std

        def _calcll(thetas):
            """Callback: negative reduced likelihood for the optimizer."""
            loglike = self._calculate_reduced_likelihood_params(
                np.exp(thetas))[0]
            return -loglike

        # Optimize in log(theta) space so thetas stay positive; the bounds
        # restrict theta to [1e-5, 1e5].
        bounds = [(np.log(1e-5), np.log(1e5)) for _ in range(self.n_dims)]

        optResult = minimize(_calcll, 1e-1 * np.ones(self.n_dims),
                             method='slsqp',
                             options={'eps': 1e-3},
                             bounds=bounds)

        if not optResult.success:
            raise ValueError(
                'Kriging Hyper-parameter optimization failed: '
                '{0}'.format(optResult.message)
            )

        self.thetas = np.exp(optResult.x)

        # Re-run the likelihood computation once at the optimum and cache the
        # factorization pieces needed by predict()/linearize().
        _, params = self._calculate_reduced_likelihood_params()
        self.alpha = params['alpha']
        self.U = params['U']
        self.S_inv = params['S_inv']
        self.Vh = params['Vh']
        self.sigma2 = params['sigma2']

    def _calculate_reduced_likelihood_params(self, thetas=None):
        """
        Calculate a quantity with the same maximum location as the
        log-likelihood for a given theta, plus the factorization terms
        needed for prediction.

        Args
        ----
        thetas : ndarray, optional
            Given input correlation coefficients. If none given, uses
            self.thetas from training.

        Returns
        -------
        tuple(float, dict)
            The reduced likelihood value and a dict with keys
            'alpha', 'sigma2', 'S_inv', 'U', 'Vh'.
        """
        if thetas is None:
            thetas = self.thetas

        X, Y = self.X, self.Y
        params = {}

        # Correlation Matrix: distances[i, :, j] = |X_i - X_j| per dimension;
        # only the upper triangle is computed, then mirrored (symmetric).
        distances = np.zeros((self.n_samples, self.n_dims, self.n_samples))
        for i in range(self.n_samples):
            distances[i, :, i + 1:] = np.abs(X[i, ...] - X[i + 1:, ...]).T
            distances[i + 1:, :, i] = distances[i, :, i + 1:].T

        R = np.exp(-thetas.dot(np.square(distances)))
        # Unit diagonal plus the nugget for smoothing/conditioning.
        R[np.diag_indices_from(R)] = 1. + self.nugget

        [U, S, Vh] = linalg.svd(R)

        # Penrose-Moore Pseudo-Inverse:
        # Given A = USV^* and Ax=b, the least-squares solution is
        # x = V S^-1 U^* b.
        # Tikhonov regularization is used to make the solution significantly
        # more robust.
        h = 1e-8 * S[0]
        inv_factors = S / (S ** 2. + h ** 2.)

        alpha = Vh.T.dot(np.einsum('j,kj,kl->jl', inv_factors, U, Y))
        logdet = -np.sum(np.log(inv_factors))
        sigma2 = np.dot(Y.T, alpha).sum(axis=0) / self.n_samples
        reduced_likelihood = -(np.log(np.sum(sigma2)) +
                               logdet / self.n_samples)

        params['alpha'] = alpha
        # De-normalize the variance estimate back to output units.
        params['sigma2'] = sigma2 * np.square(self.Y_std)
        params['S_inv'] = inv_factors
        params['U'] = U
        params['Vh'] = Vh

        return reduced_likelihood, params

    def predict(self, x):
        """
        Calculate a predicted value of the response based on the current
        trained model for the supplied list of inputs.

        Args
        ----
        x : array-like
            Point(s) at which the surrogate is evaluated.

        Returns
        -------
        ndarray or tuple(ndarray, ndarray)
            Predicted mean; if ``self.eval_rmse`` is True, a tuple of
            (mean, RMSE) is returned instead.
        """
        super(KrigingSurrogate, self).predict(x)

        X = self.X
        thetas = self.thetas
        if isinstance(x, list):
            x = np.array(x)
        x = np.atleast_2d(x)
        n_eval = x.shape[0]

        # Normalize input using the training statistics.
        x_n = (x - self.X_mean) / self.X_std

        # Correlation between each evaluation point and every training point.
        r = np.zeros((n_eval, self.n_samples), dtype=x.dtype)
        for r_i, x_i in zip(r, x_n):
            r_i[:] = np.exp(-thetas.dot(np.square((x_i - X).T)))

        # Scaled Predictor
        y_t = np.dot(r, self.alpha)

        # Predictor (de-normalized)
        y = self.Y_mean + self.Y_std * y_t

        if self.eval_rmse:
            mse = (1. - np.dot(np.dot(r, self.Vh.T),
                               np.einsum('j,kj,lk->jl',
                                         self.S_inv, self.U, r))) * self.sigma2

            # Forcing negative RMSE to zero if negative due to machine
            # precision.
            mse[mse < 0.] = 0.

            return y, np.sqrt(mse)

        return y

    def linearize(self, x):
        """
        Calculate the jacobian of the Kriging surface at the requested point.

        Args
        ----
        x : array-like
            Point at which the surrogate Jacobian is evaluated.

        Returns
        -------
        ndarray
            Jacobian of the predicted outputs with respect to the inputs.
        """
        thetas = self.thetas

        # Normalize Input
        x_n = (x - self.X_mean) / self.X_std

        r = np.exp(-thetas.dot(np.square((x_n - self.X).T)))

        # Z = einsum('i,ij->ij', X, Y) is equivalent to, but much faster and
        # memory efficient than, diag(X).dot(Y) for vector X and 2D array Y.
        # I.e. Z[i,j] = X[i]*Y[i,j]
        gradr = r * -2 * np.einsum('i,ij->ij', thetas, (x_n - self.X).T)

        # Chain rule through both normalizations (Y_std outside, X_std inside).
        jac = np.einsum('i,j,ij->ij', self.Y_std, 1. / self.X_std,
                        gradr.dot(self.alpha).T)
        return jac
class FloatKrigingSurrogate(KrigingSurrogate):
    """Surrogate model based on the simple Kriging interpolation.

    Predictions are returned as floats, which are the mean of the model's
    prediction.
    """

    def predict(self, x):
        """Return only the mean component of the parent class prediction."""
        prediction = super(FloatKrigingSurrogate, self).predict(x)
        return prediction[0]  # mean value