Source code for openmdao.drivers.latinhypercube_driver

"""
OpenMDAO design-of-experiments Driver implementing the Latin Hypercube and Optimized Latin Hypercube methods.
"""

from collections import OrderedDict
import os
from random import shuffle, randint, seed

from six import iteritems, itervalues
from six.moves import range, zip

import numpy as np

from openmdao.drivers.predeterminedruns_driver import PredeterminedRunsDriver
from openmdao.util.array_util import evenly_distrib_idxs

trace = os.environ.get('OPENMDAO_TRACE')
from openmdao.core.mpi_wrap import debug


class LatinHypercubeDriver(PredeterminedRunsDriver):
    """Design-of-experiments Driver implementing the Latin Hypercube method.

    Args
    ----
    num_samples : int, optional
        The number of samples to run. Defaults to 1.

    seed : int or None, optional
        Random seed. Defaults to None.

    num_par_doe : int, optional
        The number of DOE cases to run concurrently. Defaults to 1.

    load_balance : bool, optional
        If True, use rank 0 as master and load balance cases among all of the
        other ranks. Defaults to False.
    """

    def __init__(self, num_samples=1, seed=None, num_par_doe=1,
                 load_balance=False):
        super(LatinHypercubeDriver, self).__init__(num_par_doe=num_par_doe,
                                                   load_balance=load_balance)
        self.num_samples = num_samples
        self.seed = seed

    def _build_runlist(self):
        """Build a runlist based on the Latin Hypercube method."""
        design_vars = self.get_desvar_metadata()

        # Add up sizes
        self.num_design_vars = sum(meta['size']
                                   for meta in itervalues(design_vars))

        if self.seed is not None:
            seed(self.seed)
            np.random.seed(self.seed)

        # Generate an LHC of the proper size
        rand_lhc = self._get_lhc()

        # Map LHC to buckets
        buckets = OrderedDict()
        j = 0

        for (name, bounds) in iteritems(design_vars):
            buckets[name] = []

            # Support for array desvars
            val = self.root.unknowns._dat[name].val
            nval = bounds['size']

            for k in range(nval):

                lowb = bounds['lower']
                upb = bounds['upper']
                if isinstance(lowb, np.ndarray):
                    lowb = lowb[k]
                if isinstance(upb, np.ndarray):
                    upb = upb[k]

                design_var_buckets = self._get_buckets(lowb, upb)
                buckets[name].append([design_var_buckets[rand_lhc[i, j]]
                                      for i in range(self.num_samples)])
                j += 1

        # Return random values in given buckets
        for i in range(self.num_samples):
            sample = []
            for key, bounds in iteritems(buckets):
                sample.append([key,
                               np.array([np.random.uniform(bounds[k][i][0],
                                                           bounds[k][i][1])
                                         for k in range(design_vars[key]['size'])])
                               ])
            yield sample

    def _distrib_build_runlist(self):
        """
        Returns an iterator over only those cases meant to execute
        in the current rank as part of a parallel DOE. A Latin hypercube,
        unlike some other DOE generators, is created in one rank and then
        the appropriate cases are scattered to the appropriate ranks.
        """
        comm = self._full_comm

        # get the par_doe_id from every rank in the full comm so we know which
        # cases to scatter where
        doe_ids = comm.allgather(self._par_doe_id)

        job_list = None
        if comm.rank == 0:
            if trace:
                debug('Parallel DOE using %d procs' % self._num_par_doe)
            run_list = [list(case) for case in self._build_runlist()]  # need to run iterator

            run_sizes, run_offsets = evenly_distrib_idxs(self._num_par_doe,
                                                         len(run_list))
            jobs = [run_list[o:o + s] for o, s in zip(run_offsets, run_sizes)]
            job_list = [jobs[i] for i in doe_ids]

        if trace:
            debug("scattering job_list: %s" % job_list)
        run_list = comm.scatter(job_list, root=0)
        if trace:
            debug('Number of DOE jobs: %s (scatter DONE)' % len(run_list))

        for case in run_list:
            yield case

    def _get_lhc(self):
        """Generates a Latin Hypercube based on the number of samples and the
        number of design variables.
        """
        rand_lhc = _rand_latin_hypercube(self.num_samples, self.num_design_vars)
        return rand_lhc.astype(int)

    def _get_buckets(self, low, high):
        """Determines the distribution of samples."""
        bucket_walls = np.linspace(low, high, self.num_samples + 1)
        return list(zip(bucket_walls[0:-1], bucket_walls[1:]))
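
# ---------------------------------------------------------------------------
# Example usage (a sketch, not part of the original module): one way this
# driver might be attached to an OpenMDAO 1.x Problem.  The paraboloid model,
# the variable names, and the sample count are illustrative assumptions.
# ---------------------------------------------------------------------------
def _example_lhc_usage():
    from openmdao.api import Problem, Group, IndepVarComp, ExecComp

    prob = Problem(root=Group())
    prob.root.add('p1', IndepVarComp('x', 0.0))
    prob.root.add('p2', IndepVarComp('y', 0.0))
    prob.root.add('comp', ExecComp('f_xy = (x - 3.0)**2 + x*y + (y + 4.0)**2 - 3.0'))
    prob.root.connect('p1.x', 'comp.x')
    prob.root.connect('p2.y', 'comp.y')

    # Each scalar desvar element becomes one column of the hypercube;
    # num_samples=4 yields four DOE cases.
    prob.driver = LatinHypercubeDriver(num_samples=4, seed=0)
    prob.driver.add_desvar('p1.x', lower=-50.0, upper=50.0)
    prob.driver.add_desvar('p2.y', lower=-50.0, upper=50.0)
    prob.driver.add_objective('comp.f_xy')

    prob.setup(check=False)
    prob.run()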
class OptimizedLatinHypercubeDriver(LatinHypercubeDriver):
    """Design-of-experiments Driver implementing the Morris-Mitchell method
    for an Optimized Latin Hypercube.
    """

    def __init__(self, num_samples=1, seed=None, population=20, generations=2,
                 norm_method=1, num_par_doe=1, load_balance=False):
        super(OptimizedLatinHypercubeDriver, self).__init__(num_par_doe=num_par_doe,
                                                            load_balance=load_balance)
        self.qs = [1, 2, 5, 10, 20, 50, 100]  # List of qs to try for Phi_q optimization
        self.num_samples = num_samples
        self.seed = seed
        self.population = population
        self.generations = generations
        self.norm_method = norm_method

    def _get_lhc(self):
        """Generate an Optimized Latin Hypercube."""
        rand_lhc = _rand_latin_hypercube(self.num_samples, self.num_design_vars)

        # Optimize our LHC before returning it
        best_lhc = _LHC_Individual(rand_lhc, q=1, p=self.norm_method)
        for q in self.qs:
            lhc_start = _LHC_Individual(rand_lhc, q, self.norm_method)
            lhc_opt = _mmlhs(lhc_start, self.population, self.generations)
            if lhc_opt.mmphi() < best_lhc.mmphi():
                best_lhc = lhc_opt

        return best_lhc._get_doe().astype(int)
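
# ---------------------------------------------------------------------------
# Sketch of constructing the optimized driver (assumed parameter values, not
# from the original source).  It is a drop-in replacement for
# LatinHypercubeDriver: `population` and `generations` control the
# evolutionary search performed by _mmlhs below, and `norm_method` is the
# p-norm used when measuring pairwise point distances.
# ---------------------------------------------------------------------------
def _example_olhc_driver():
    return OptimizedLatinHypercubeDriver(num_samples=4, seed=0,
                                         population=20, generations=4,
                                         norm_method=2)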
class _LHC_Individual(object):

    def __init__(self, doe, q=2, p=1):
        self.q = q
        self.p = p
        self.doe = doe
        self.phi = None  # Morris-Mitchell sampling criterion

    @property
    def shape(self):
        """Size of the LatinHypercube DOE (rows, cols)."""
        return self.doe.shape

    def mmphi(self):
        """Returns the Morris-Mitchell sampling criterion for this Latin
        hypercube.
        """
        if self.phi is None:
            distdict = {}

            # Calculate the norm between each pair of points in the DOE
            arr = self.doe
            n, m = arr.shape
            for i in range(1, n):
                nrm = np.linalg.norm(arr[i] - arr[:i], ord=self.p, axis=1)
                for j in range(0, i):
                    nrmj = nrm[j]
                    if nrmj in distdict:
                        distdict[nrmj] += 1
                    else:
                        distdict[nrmj] = 1

            size = len(distdict)

            distinct_d = np.fromiter(distdict, dtype=float, count=size)

            # Multiplicity array with a count of how many pairs of points
            # have a given distance
            J = np.fromiter(itervalues(distdict), dtype=int, count=size)

            self.phi = sum(J * (distinct_d ** (-self.q))) ** (1.0 / self.q)

        return self.phi

    def perturb(self, mutation_count):
        """Interchanges pairs of randomly chosen elements within randomly
        chosen columns of a DOE a number of times. The result of this
        operation will also be a Latin hypercube.
        """
        new_doe = self.doe.copy()
        n, k = self.doe.shape
        for count in range(mutation_count):
            col = randint(0, k - 1)

            # Choosing two distinct random points
            el1 = randint(0, n - 1)
            el2 = randint(0, n - 1)
            while el1 == el2:
                el2 = randint(0, n - 1)

            new_doe[el1, col] = self.doe[el2, col]
            new_doe[el2, col] = self.doe[el1, col]

        return _LHC_Individual(new_doe, self.q, self.p)

    def __iter__(self):
        return self._get_rows()

    def _get_rows(self):
        for row in self.doe:
            yield row

    def __repr__(self):
        return repr(self.doe)

    def __str__(self):
        return str(self.doe)

    def __getitem__(self, *args):
        return self.doe.__getitem__(*args)

    def _get_doe(self):
        return self.doe


def _rand_latin_hypercube(n, k):
    # Calculates a random Latin hypercube set of n points in k dimensions
    # within the [0, n-1]^k hypercube.
    arr = np.zeros((n, k))
    row = list(range(0, n))

    for i in range(k):
        shuffle(row)
        arr[:, i] = row
    return arr


def _is_latin_hypercube(lh):
    """Returns True if the given array is a Latin hypercube.
    The given array is assumed to be a numpy array.
    """
    n, k = lh.shape
    for j in range(k):
        col = lh[:, j]
        colset = set(col)
        if len(colset) < len(col):
            return False  # something was duplicated
    return True


def _mmlhs(x_start, population, generations):
    """Evolutionary search for the most space-filling Latin Hypercube.
    Returns a new LatinHypercube instance with an optimized set of points.
    """
    x_best = x_start
    phi_best = x_start.mmphi()
    n = x_start.shape[1]

    level_off = np.floor(0.85 * generations)
    for it in range(generations):
        if it < level_off and level_off > 1.:
            mutations = int(round(1 + (0.5 * n - 1) * (level_off - it) / (level_off - 1)))
        else:
            mutations = 1

        x_improved = x_best
        phi_improved = phi_best

        for offspring in range(population):
            x_try = x_best.perturb(mutations)
            phi_try = x_try.mmphi()

            if phi_try < phi_improved:
                x_improved = x_try
                phi_improved = phi_try

        if phi_improved < phi_best:
            phi_best = phi_improved
            x_best = x_improved

    return x_best
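
# ---------------------------------------------------------------------------
# Illustrative check of the helpers above (a sketch, not part of the original
# module): build a random 8-point, 3-variable Latin hypercube, run the
# Morris-Mitchell evolutionary search, and compare the space-filling
# criterion Phi_q = (sum_j J_j * d_j**(-q)) ** (1/q), where the d_j are the
# distinct pairwise distances and the J_j their multiplicities; a lower Phi_q
# means a more evenly spread design.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    raw = _rand_latin_hypercube(8, 3)
    assert _is_latin_hypercube(raw)

    start = _LHC_Individual(raw, q=2, p=1)
    optimized = _mmlhs(start, population=20, generations=4)

    print('Phi_q before: %f  after: %f' % (start.mmphi(), optimized.mmphi()))

    # Column-wise swaps in perturb() preserve the Latin hypercube property.
    assert _is_latin_hypercube(optimized._get_doe())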