"""
OpenMDAO design-of-experiments Driver implementing the Latin Hypercube and Optimized Latin Hypercube methods.
"""
import os
from openmdao.drivers.predeterminedruns_driver import PredeterminedRunsDriver
from six import iteritems, itervalues
from six.moves import range, zip
from random import shuffle, randint, seed
import numpy as np
from openmdao.util.array_util import evenly_distrib_idxs
trace = os.environ.get('OPENMDAO_TRACE')
if trace: # pragma: no cover
from openmdao.core.mpi_wrap import debug
else:
[docs]class LatinHypercubeDriver(PredeterminedRunsDriver):
"""Design-of-experiments Driver implementing the Latin Hypercube method.
Args
----
num_samples : int, optional
The number of samples to run. Defaults to 1.
seed : int or None, optional
Random seed. Defaults to None.
num_par_doe : int, optional
The number of DOE cases to run concurrently. Defaults to 1.
"""
def __init__(self, num_samples=1, seed=None, num_par_doe=1):
super(LatinHypercubeDriver, self).__init__(num_par_doe=num_par_doe)
self.num_samples = num_samples
self.seed = seed
def _build_runlist(self):
"""Build a runlist based on the Latin Hypercube method."""
design_vars = self.get_desvar_metadata()
self.num_design_vars = len(design_vars)
if self.seed is not None:
seed(self.seed)
np.random.seed(self.seed)
# Generate an LHC of the proper size
rand_lhc = self._get_lhc()
# Map LHC to buckets
buckets = {}
for j, (name, bounds) in enumerate(iteritems(design_vars)):
design_var_buckets = self._get_buckets(bounds['lower'], bounds['upper'])
buckets[name] = [design_var_buckets[rand_lhc[i, j]]
for i in range(self.num_samples)]
# Return random values in given buckets
for i in range(self.num_samples):
yield ((key, np.random.uniform(bounds[i][0], bounds[i][1]))
for key, bounds in iteritems(buckets))
def _distrib_build_runlist(self):
"""
Returns an iterator over only those cases meant to execute
in the current rank as part of a parallel DOE. A latin hypercube,
unlike some other DOE generators, is created in one rank and then
the appropriate cases are scattered to the appropriate ranks.
"""
comm = self._full_comm
job_list = None
if comm.rank == 0:
debug('Parallel DOE using %d procs' % self._num_par_doe)
run_list = [list(case) for case in self._build_runlist()] # need to run iterator
run_sizes, run_offsets = evenly_distrib_idxs(self._num_par_doe,
len(run_list))
job_list = [run_list[o:o+s] for o, s in zip(run_offsets,
run_sizes)]
if trace: debug("scattering job_list: %s" % job_list)
run_list = comm.scatter(job_list, root=0)
if trace: debug('Number of DOE jobs: %s (scatter DONE)' % len(run_list))
for case in run_list:
yield case
def _get_lhc(self):
"""Generates a Latin Hypercube based on the number of samples and the
number of design variables.
"""
rand_lhc = _rand_latin_hypercube(self.num_samples, self.num_design_vars)
return rand_lhc.astype(int)
def _get_buckets(self, low, high):
"""Determines the distribution of samples."""
bucket_walls = np.linspace(low, high, self.num_samples + 1)
return list(zip(bucket_walls[0:-1], bucket_walls[1:]))
[docs]class OptimizedLatinHypercubeDriver(LatinHypercubeDriver):
"""Design-of-experiments Driver implementing the Morris-Mitchell method for
an Optimized Latin Hypercube.
"""
def __init__(self, num_samples=1, seed=None, population=20, generations=2,
norm_method=1, num_par_doe=1):
super(OptimizedLatinHypercubeDriver, self).__init__(num_par_doe=num_par_doe)
self.qs = [1, 2, 5, 10, 20, 50, 100] # List of qs to try for Phi_q optimization
self.num_samples = num_samples
self.seed = seed
self.population = population
self.generations = generations
self.norm_method = norm_method
def _get_lhc(self):
"""Generate an Optimized Latin Hypercube
"""
rand_lhc = _rand_latin_hypercube(self.num_samples, self.num_design_vars)
# Optimize our LHC before returning it
best_lhc = _LHC_Individual(rand_lhc, q=1, p=self.norm_method)
for q in self.qs:
lhc_start = _LHC_Individual(rand_lhc, q, self.norm_method)
lhc_opt = _mmlhs(lhc_start, self.population, self.generations)
if lhc_opt.mmphi() < best_lhc.mmphi():
best_lhc = lhc_opt
return best_lhc._get_doe().astype(int)
class _LHC_Individual(object):
def __init__(self, doe, q=2, p=1):
self.q = q
self.p = p
self.doe = doe
self.phi = None # Morris-Mitchell sampling criterion
@property
def shape(self):
"""Size of the LatinHypercube DOE (rows,cols)."""
return self.doe.shape
def mmphi(self):
"""Returns the Morris-Mitchell sampling criterion for this Latin
hypercube.
"""
if self.phi is None:
distdict = {}
# Calculate the norm between each pair of points in the DOE
arr = self.doe
n, m = arr.shape
for i in range(1, n):
nrm = np.linalg.norm(arr[i] - arr[:i], ord=self.p, axis=1)
for j in range(0, i):
nrmj = nrm[j]
if nrmj in distdict:
distdict[nrmj] += 1
else:
distdict[nrmj] = 1
size = len(distdict)
distinct_d = np.fromiter(distdict, dtype=float, count=size)
# Mutltiplicity array with a count of how many pairs of points
# have a given distance
J = np.fromiter(itervalues(distdict), dtype=int, count=size)
self.phi = sum(J * (distinct_d ** (-self.q))) ** (1.0 / self.q)
return self.phi
def perturb(self, mutation_count):
""" Interchanges pairs of randomly chosen elements within randomly chosen
columns of a DOE a number of times. The result of this operation will also
be a Latin hypercube.
"""
new_doe = self.doe.copy()
n, k = self.doe.shape
for count in range(mutation_count):
col = randint(0, k - 1)
# Choosing two distinct random points
el1 = randint(0, n - 1)
el2 = randint(0, n - 1)
while el1 == el2:
el2 = randint(0, n - 1)
new_doe[el1, col] = self.doe[el2, col]
new_doe[el2, col] = self.doe[el1, col]
return _LHC_Individual(new_doe, self.q, self.p)
def __iter__(self):
return self._get_rows()
def _get_rows(self):
for row in self.doe:
yield row
def __repr__(self):
return repr(self.doe)
def __str__(self):
return str(self.doe)
def __getitem__(self, *args):
return self.doe.__getitem__(*args)
def _get_doe(self):
return self.doe
def _rand_latin_hypercube(n, k):
# Calculates a random Latin hypercube set of n points in k dimensions
# within [0,n-1]^k hypercube.
arr = np.zeros((n, k))
row = list(range(0, n))
for i in range(k):
shuffle(row)
arr[:, i] = row
return arr
def _is_latin_hypercube(lh):
"""Returns True if the given array is a Latin hypercube.
The given array is assumed to be a numpy array.
"""
n, k = lh.shape
for j in range(k):
col = lh[:, j]
colset = set(col)
if len(colset) < len(col):
return False # something was duplicated
return True
def _mmlhs(x_start, population, generations):
"""Evolutionary search for most space filling Latin-Hypercube.
Returns a new LatinHypercube instance with an optimized set of points.
"""
x_best = x_start
phi_best = x_start.mmphi()
n = x_start.shape[1]
level_off = np.floor(0.85 * generations)
for it in range(generations):
if it < level_off and level_off > 1.:
mutations = int(round(1 + (0.5 * n - 1) * (level_off - it) / (level_off - 1)))
else:
mutations = 1
x_improved = x_best
phi_improved = phi_best
for offspring in range(population):
x_try = x_best.perturb(mutations)
phi_try = x_try.mmphi()
if phi_try < phi_improved:
x_improved = x_try
phi_improved = phi_try
if phi_improved < phi_best:
phi_best = phi_improved
x_best = x_improved
return x_best