Source code for openmdao.core.parallel_group

""" Defines the base class for a ParallelGroup in OpenMDAO. ParallelGroup is
used for systems of `Components` or `Groups` that can be run in parallel."""

import warnings
from collections import OrderedDict
from six import itervalues

from openmdao.core.component import Component
from openmdao.core.group import Group
from openmdao.core.mpi_wrap import MPI


class ParallelGroup(Group):
    """ParallelGroup is used for systems of `Components` or `Groups` that
    can be run in parallel.

    Options
    -------
    deriv_options['type'] : str('user')
        Derivative calculation type ('user', 'fd', 'cs').
        Default is 'user', where the derivative is calculated from
        user-supplied derivatives. Set to 'fd' to finite difference this
        system. Set to 'cs' to perform the complex step if your components
        support it.
    deriv_options['form'] : str('forward')
        Finite difference mode ('forward', 'backward', 'central').
    deriv_options['step_size'] : float(1e-06)
        Default finite difference stepsize.
    deriv_options['step_calc'] : str('absolute')
        Set to 'absolute' or 'relative'.
    deriv_options['check_type'] : str('fd')
        Type of derivative check for check_partial_derivatives. Set to 'fd'
        to finite difference this system. Set to 'cs' to perform the complex
        step method if your components support it.
    deriv_options['check_form'] : str('forward')
        Finite difference mode ('forward', 'backward', 'central') used for
        the check during check_partial_derivatives.
    deriv_options['check_step_calc'] : str('absolute')
        Set to 'absolute' or 'relative'. Default finite difference step
        calculation for the finite difference check in
        check_partial_derivatives.
    deriv_options['check_step_size'] : float(1e-06)
        Default finite difference stepsize for the finite difference check
        in check_partial_derivatives.
    deriv_options['linearize'] : bool(False)
        Set to True if you want linearize to be called even though you are
        using FD.
    """

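    # -- Illustrative note (not part of the original module) ----------------
    # A minimal sketch of setting these options, assuming the OpenMDAO 1.x
    # Problem/Group API from openmdao.api; the names 'prob' and 'par' are
    # hypothetical:
    #
    #   from openmdao.api import Problem, Group, ParallelGroup
    #
    #   prob = Problem(root=Group())
    #   par = prob.root.add('par', ParallelGroup())
    #   par.deriv_options['type'] = 'fd'       # finite difference this group
    #   par.deriv_options['step_size'] = 1e-7  # override the default stepsize
    # ------------------------------------------------------------------------
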
    def apply_nonlinear(self, params, unknowns, resids, metadata=None):
        """ Evaluates the residuals of our children systems.

        Args
        ----
        params : `VecWrapper`
            `VecWrapper` containing parameters. (p)

        unknowns : `VecWrapper`
            `VecWrapper` containing outputs and states. (u)

        resids : `VecWrapper`
            `VecWrapper` containing residuals. (r)

        metadata : dict, optional
            Dictionary containing execution metadata (e.g. iteration
            coordinate).
        """

        # full scatter
        self._transfer_data()

        for sub in self._local_subsystems:
            if isinstance(sub, Component):
                sub.apply_nonlinear(sub.params, sub.unknowns, sub.resids)
            else:
                sub.apply_nonlinear(sub.params, sub.unknowns, sub.resids,
                                    metadata)

    def children_solve_nonlinear(self, metadata):
        """Loops over our children systems and asks them to solve."""

        # full scatter
        self._transfer_data()

        for sub in self._local_subsystems:
            with sub._dircontext:
                if isinstance(sub, Component):
                    sub.solve_nonlinear(sub.params, sub.unknowns, sub.resids)
                else:
                    sub.solve_nonlinear(sub.params, sub.unknowns, sub.resids,
                                        metadata)

    def get_req_procs(self):
        """
        Returns
        -------
        tuple
            A tuple of the form (min_procs, max_procs), indicating the min
            and max processors usable by this `ParallelGroup`.
        """
        min_procs = 0
        max_procs = 0

        for sub in itervalues(self._subsystems):
            sub_min, sub_max = sub.get_req_procs()
            if sub_min > min_procs:
                min_procs = sub_min
            if max_procs is not None:
                if sub_max is None:
                    max_procs = None
                else:
                    max_procs += sub_max

        if min_procs == 0:
            min_procs = 1

        if max_procs == 0:
            max_procs = 1

        return (min_procs, max_procs)

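    # -- Illustrative note (not part of the original module) ----------------
    # A hypothetical example of how the requirements aggregate: if three
    # subsystems report (min, max) requirements of (1, 1), (2, 4) and
    # (1, None), this method returns (2, None). The minimum is the largest
    # subsystem minimum, since the group needs enough procs for its most
    # demanding child, while a single unbounded child (max of None) makes
    # the group's maximum unbounded as well; otherwise the maxima are summed.
    # ------------------------------------------------------------------------
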
    def _setup_communicators(self, comm, parent_dir):
        """
        Assign communicator to this `ParallelGroup` and all of its
        subsystems.

        Args
        ----
        comm : an MPI communicator (real or fake)
            The communicator being offered by the parent system.

        parent_dir : str
            Absolute dir of parent `System`.
        """
        self.comm = comm
        self._local_subsystems = []

        # If we're not running in MPI, make this just a serial Group
        if not MPI or not self.is_active():
            super(ParallelGroup, self)._setup_communicators(comm, parent_dir)
            return

        self._setup_dir(parent_dir)

        size = comm.size
        rank = comm.rank

        subsystems = []
        requested_procs = []
        max_req_procs = []
        for system in itervalues(self._subsystems):
            subsystems.append(system)
            minproc, maxproc = system.get_req_procs()
            assert(minproc > 0)
            requested_procs.append(minproc)
            max_req_procs.append(maxproc)

        assigned_procs = [0]*len(subsystems)
        assigned = 0
        requested = sum(requested_procs)

        _, mx = self.get_req_procs()
        if mx is None:
            limit = size
            max_requested = size
        else:
            max_requested = sum(max_req_procs)
            limit = min(size, max_requested)

        # first, just use simple round robin assignment of requested procs
        # until everybody has what they asked for or we run out
        if requested:
            if size >= requested:
                # we have enough for all subsystems
                while assigned < limit:
                    for i, system in enumerate(subsystems):
                        if max_req_procs[i] is None or \
                                assigned_procs[i] < max_req_procs[i]:
                            assigned_procs[i] += 1
                            assigned += 1
                            if assigned == limit:
                                break

                # create buckets (one sub per bucket) to be consistent in how
                # we split procs below
                buckets = [(n, [i]) for i, n in enumerate(assigned_procs)]
            else:
                # we don't have enough, so have to group subsystems
                remaining = size

                # sort req procs in descending order
                tups = sorted([(n, i) for i, n in enumerate(requested_procs)],
                              reverse=True)

                buckets = []
                for i, (req, sub_idx) in enumerate(tups):
                    if remaining >= req:
                        buckets.append((req, [sub_idx]))
                        remaining -= req
                    elif i == 0:
                        # since we sorted in descending order by number of
                        # requested procs, only in the first iteration is
                        # there a chance that we've requested more procs than
                        # we have
                        raise RuntimeError("subsystem %s requested %d "
                                           "processes but got %d" %
                                           (subsystems[sub_idx].pathname,
                                            req, remaining))
                    else:
                        # we already have one in the bucket list that's big
                        # enough. go through buckets, find all that are big
                        # enough, and add the current sub to the one with the
                        # fewest number of subs already in it. In the event
                        # of a tie, take the bucket with the lowest number of
                        # requested procs.
                        lenlist = sorted([b for b in buckets if b[0] >= req],
                                         key=lambda t: len(t[1]))
                        shortest = len(lenlist[0][1])
                        final = sorted(b for b in lenlist
                                       if len(b[1]) == shortest)
                        final[0][1].append(sub_idx)

                warnings.warn("Group '%s' requested %d processes to run fully "
                              "in parallel, but it only got %d" %
                              (self.pathname, requested, size))

                # if we have any procs left over, apply them to any sub that
                # can use them
                while remaining > 0:
                    for i, b in enumerate(buckets):
                        procs, subs = b
                        for sub_idx in subs:
                            if (max_req_procs[sub_idx] is None or
                                    max_req_procs[sub_idx] > procs):
                                buckets[i] = (procs+1, subs)
                                remaining -= 1
                                break
                        if remaining == 0:
                            break

                assigned = size - remaining

        # a 'color' is assigned to each subsystem, with
        # an entry for each processor it will be given
        # e.g. [0, 1, 1, 1, 1, 2, 2, 3, 3, 3, UND, UND]
        color = []
        for i, b in enumerate(buckets):
            procs, _ = b
            color.extend([i]*procs)

        if size > assigned:
            color.extend([MPI.UNDEFINED]*(size-assigned))

        # create a sub-communicator for each color and
        # get the one assigned to our color/process
        rank_color = color[rank]
        sub_comm = comm.Split(rank_color)

        if sub_comm == MPI.COMM_NULL:
            return

        for i, b in enumerate(buckets):
            procs, subs = b
            for sub_idx in subs:
                sub = subsystems[sub_idx]
                if i == rank_color:
                    self._local_subsystems.append(sub)
                    sub._setup_communicators(sub_comm, self._sysdata.absdir)
                else:
                    sub._setup_communicators(MPI.COMM_NULL,
                                             self._sysdata.absdir)

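    # -- Illustrative note (not part of the original module) ----------------
    # A minimal sketch of the color/Split mechanism used above, assuming
    # mpi4py and a hypothetical run on 5 processes where two subsystems were
    # each assigned 2 procs, giving color = [0, 0, 1, 1, MPI.UNDEFINED]:
    #
    #   from mpi4py import MPI
    #
    #   comm = MPI.COMM_WORLD
    #   colors = [0, 0, 1, 1, MPI.UNDEFINED]
    #   sub_comm = comm.Split(colors[comm.rank])
    #   # ranks 0-1 share one sub-communicator, ranks 2-3 share another, and
    #   # rank 4 gets MPI.COMM_NULL because its color is MPI.UNDEFINED.
    # ------------------------------------------------------------------------
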
    def list_auto_order(self):
        """
        Returns
        -------
        list of str
            Names of subsystems listed in their current order, since
            order is irrelevant in a ParallelGroup.
        list of str
            This will always be an empty list.
        """
        return [s.name for s in self.subsystems()], []
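
# -- Illustrative usage (not part of the original module) -------------------
# A minimal sketch of building and running a ParallelGroup, assuming the
# OpenMDAO 1.x API from openmdao.api and a launch such as
# `mpirun -np 2 python example.py`; component and variable names are
# hypothetical:
#
#   from openmdao.api import (Problem, Group, ParallelGroup, IndepVarComp,
#                             ExecComp)
#
#   prob = Problem(root=Group())
#   root = prob.root
#   root.add('indep', IndepVarComp('x', 1.0))
#   par = root.add('par', ParallelGroup())
#   par.add('c1', ExecComp('y = 2.0*x'))
#   par.add('c2', ExecComp('y = 5.0*x'))
#   root.connect('indep.x', 'par.c1.x')
#   root.connect('indep.x', 'par.c2.x')
#
#   prob.setup()
#   prob.run()
#   # With 2 MPI processes, c1 and c2 run concurrently (one per process);
#   # run serially, the ParallelGroup simply behaves like an ordinary Group.
# ----------------------------------------------------------------------------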