# Source code for openmdao.surrogate_models.kriging

""" Surrogate model based on Kriging. """
from math import log

from openmdao.surrogate_models.surrogate_model import SurrogateModel

# pylint: disable-msg=E0611,F0401
from numpy import zeros, dot, ones, eye, abs, exp, log10, diagonal,\
    prod, square, column_stack, ndarray, sqrt, inf, einsum, sum, power
from numpy.linalg import slogdet, linalg
from numpy.dual import lstsq
from scipy.linalg import cho_factor, cho_solve
from scipy.optimize import minimize
from six.moves import range


class KrigingSurrogate(SurrogateModel):
    """Surrogate Modeling method based on the simple Kriging interpolation.
    Predictions are returned as a tuple of mean and RMSE."""

    def __init__(self):
        super(KrigingSurrogate, self).__init__()

        self.m = 0  # number of independent (input dimensions)
        self.n = 0  # number of training points
        self.thetas = zeros(0)  # correlation widths, optimized in log10 space
        self.nugget = 0  # nugget smoothing parameter from [Sasena, 2002]

        self.R = zeros(0)  # correlation matrix of the training inputs
        self.R_fact = None  # Cholesky factorization of R; None if it failed
        self.R_solve_ymu = zeros(0)  # R^-1 (Y - mu)
        self.R_solve_one = zeros(0)  # R^-1 1
        self.mu = zeros(0)  # estimated process mean
        # Process variance estimate; recomputed by _calculate_log_likelihood().
        # Initialized here so the attribute always exists on a fresh instance
        # (originally it was first created outside __init__).
        self.sig2 = zeros(0)
        self.log_likelihood = inf

        # Training Values
        self.X = zeros(0)
        self.Y = zeros(0)
[docs] def train(self, x, y): """ Train the surrogate model with the given set of inputs and outputs. Args ---- x : array-like Training input locations y : array-like Model responses at given inputs. """ super(KrigingSurrogate, self).train(x, y) self.m = len(x[0]) self.n = len(x) if self.n <= 1: raise ValueError( 'KrigingSurrogate require at least 2 training points.' ) self.X = x self.Y = y def _calcll(thetas): """ Callback function""" self.thetas = thetas self._calculate_log_likelihood() return -self.log_likelihood cons = [] for i in range(self.m): cons.append({'type': 'ineq', 'fun': lambda logt: logt[i] - log10(1e-2)}) # min cons.append({'type': 'ineq', 'fun': lambda logt: log10(3) - logt[i]}) # max self.thetas = minimize(_calcll, zeros(self.m), method='COBYLA', constraints=cons, tol=1e-8).x self._calculate_log_likelihood()
def _calculate_log_likelihood(self): """ Calculates the log-likelihood (up to a constant) for a given self.theta. """ R = zeros((self.n, self.n)) X, Y = self.X, self.Y thetas = power(10., self.thetas) # exponentially weighted distance formula for i in range(self.n): R[i, i+1:self.n] = exp(-thetas.dot(square(X[i, ...] - X[i+1:self.n, ...]).T)) R *= (1.0 - self.nugget) R += R.T + eye(self.n) self.R = R one = ones(self.n) rhs = column_stack([Y, one]) try: # Cholesky Decomposition self.R_fact = cho_factor(R) sol = cho_solve(self.R_fact, rhs) solve = lambda x: cho_solve(self.R_fact, x) det_factor = log(abs(prod(diagonal(self.R_fact[0])) ** 2) + 1.e-16) except (linalg.LinAlgError, ValueError): # Since Cholesky failed, try linear least squares self.R_fact = None # reset this to none, so we know not to use Cholesky sol = lstsq(self.R, rhs)[0] solve = lambda x: lstsq(self.R, x)[0] det_factor = slogdet(self.R)[1] self.mu = dot(one, sol[:, :-1]) / dot(one, sol[:, -1]) y_minus_mu = Y - self.mu self.R_solve_ymu = solve(y_minus_mu) self.R_solve_one = sol[:, -1] self.sig2 = dot(y_minus_mu.T, self.R_solve_ymu) / self.n if isinstance(self.sig2, ndarray): self.log_likelihood = -self.n/2. * slogdet(self.sig2)[1] \ - 1./2.*det_factor else: self.log_likelihood = -self.n/2. * log(self.sig2) \ - 1./2.*det_factor
[docs] def predict(self, x): """ Calculates a predicted value of the response based on the current trained model for the supplied list of inputs. Args ---- x : array-like Point at which the surrogate is evaluated. """ super(KrigingSurrogate, self).predict(x) X, Y = self.X, self.Y thetas = power(10., self.thetas) r = exp(-thetas.dot(square((x - X).T))) if self.R_fact is not None: # Cholesky Decomposition sol = cho_solve(self.R_fact, r).T else: # Linear Least Squares sol = lstsq(self.R, r)[0].T f = self.mu + dot(r, self.R_solve_ymu) term1 = dot(r, sol) # Note: sum(sol) should be 1, since Kriging is an unbiased # estimator. This measures the effect of numerical instabilities. bias = (1.0 - sum(sol)) ** 2. / sum(self.R_solve_one) mse = self.sig2 * (1.0 - term1 + bias) rmse = sqrt(abs(mse)) return f, rmse
[docs] def linearize(self, x): """ Calculates the jacobian of the Kriging surface at the requested point. Args ---- x : array-like Point at which the surrogate Jacobian is evaluated. """ thetas = power(10., self.thetas) r = exp(-thetas.dot(square((x - self.X).T))) # Z = einsum('i,ij->ij', X, Y) is equivalent to, but much faster and # memory efficient than, diag(X).dot(Y) for vector X and 2D array Y. # I.e. Z[i,j] = X[i]*Y[i,j] gradr = r * -2 * einsum('i,ij->ij', thetas, (x - self.X).T) jac = gradr.dot(self.R_solve_ymu).T return jac
class FloatKrigingSurrogate(KrigingSurrogate):
    """Surrogate model based on the simple Kriging interpolation.
    Predictions are returned as floats, which are the mean of the model's
    prediction."""

    def predict(self, x):
        """Return only the mean of the Kriging prediction at ``x``,
        discarding the RMSE component of the parent's (mean, rmse) tuple."""
        mean, _ = super(FloatKrigingSurrogate, self).predict(x)
        return mean