from __future__ import print_function
import sys
import os
from collections import OrderedDict
from itertools import chain
import numpy
from six import iteritems, itervalues, reraise
from openmdao.core.component import Component
from openmdao.util.dict_util import _jac_to_flat_dict
from openmdao.core.mpi_wrap import MPI
def _reraise(pathname, exc):
"""
Rather than adding the sub-Problem pathname to every system and variable
in the sub-Problem (and causing more complication w.r.t. promoted names),
just put a try block around all of the calls to the sub-Problem and
preface any exception messages with "In subproblem 'x' ..."
"""
new_err = exc[0]("In subproblem '%s': %s" % (pathname, str(exc[1])))
reraise(exc[0], new_err, exc[2])
class SubProblem(Component):
[docs] """A Component that wraps a sub-Problem.
Args
----
problem : Problem
The Problem to be wrapped by this component.
params : iter of str
Names of variables that are to be visible as parameters to
this component. Note that these are allowed to be unknowns in
the sub-problem.
unknowns : iter of str
Names of variables that are to be visible as unknowns in this
component.
"""
def __init__(self, problem, params=(), unknowns=()):
super(SubProblem, self).__init__()
self._problem = problem
self._prob_params = list(params)
self._prob_unknowns = list(unknowns)
def check_setup(self, out_stream=sys.stdout):
[docs] """Write a report to the given stream indicating any potential problems
found with the current configuration of this ``System``.
Args
----
out_stream : a file-like object, optional
Stream where report will be written.
"""
try:
self._problem.check_setup(out_stream)
except:
_reraise(self.pathname, sys.exc_info())
def cleanup(self):
[docs] """ Clean up resources prior to exit. """
try:
self._problem.cleanup()
except:
_reraise(self.pathname, sys.exc_info())
def get_req_procs(self):
[docs] """
Returns
-------
tuple
A tuple of the form (min_procs, max_procs), indicating the min and max
processors usable by this `System`.
"""
# because this is called before self._problem.setup, we need to go
# ahead and set the problem's driver's root explicitly here.
self._problem.driver.root = self._problem.root
try:
return self._problem.get_req_procs()
except:
_reraise(self.pathname, sys.exc_info())
def _get_relname_map(self, parent_proms):
"""
Args
----
parent_proms : `dict`
A dict mapping absolute names to promoted names in the parent
system.
Returns
-------
dict
Maps promoted name in parent (owner of unknowns) to
the corresponding promoted name in the child.
"""
# use an ordered dict here so we can use this smaller dict when looping
# during get_view.
# (the order of this one matches the order in the parent)
umap = OrderedDict()
for key in self._prob_unknowns:
pkey = '.'.join((self.name, key))
if pkey in parent_proms:
umap[parent_proms[pkey]] = key
return umap
def _rec_get_param(self, name):
return self.params[name]
def _rec_get_param_meta(self, name):
try:
return self._problem.root._rec_get_param_meta(name)
except:
_reraise(self.pathname, sys.exc_info())
def _rec_set_param(self, name, value):
self.params[name] = value
def _setup_communicators(self, comm, parent_dir):
"""
Assign communicator to this `System` and run full setup on its
subproblem.
Args
----
comm : an MPI communicator (real or fake)
The communicator being offered by the parent system.
parent_dir : str
The absolute directory of the parent, or '' if unspecified. Used to
determine the absolute directory of all subsystems.
"""
self._problem.comm = comm
# do full setup on our subproblem now that we have what we need
# check_setup will be called later if specified from the top level
# Problem so always set check=False here.
try:
self._problem.setup(check=False)
except:
_reraise(self.pathname, sys.exc_info())
super(SubProblem, self)._setup_communicators(comm, parent_dir)
self._problem.pathname = self.pathname
self._problem._parent_dir = self._sysdata.absdir
for p in self._prob_params:
if not (p in self._problem._dangling or p in self._problem.root.unknowns):
raise RuntimeError("Param '%s' cannot be set. Either it will "
"be overwritten by a connected output or it "
"doesn't exist." % p)
def _setup_variables(self):
"""
Returns copies of our params and unknowns dictionaries,
re-keyed to use absolute variable names.
"""
to_prom_name = self._sysdata.to_prom_name = {}
to_abs_uname = self._sysdata.to_abs_uname = {}
to_abs_pnames = self._sysdata.to_abs_pnames = OrderedDict()
to_prom_uname = self._sysdata.to_prom_uname = OrderedDict()
to_prom_pname = self._sysdata.to_prom_pname = OrderedDict()
# Our subproblem has been completely set up. We now just pull
# variable metadata from our subproblem
subparams = self._problem.root.params
subunknowns = self._problem.root.unknowns
# keep track of params that are actually unknowns in the subproblem
self._unknowns_as_params = []
self._params_dict = self._init_params_dict = OrderedDict()
for name in self._prob_params:
pathname = self._get_var_pathname(name)
if name in subparams:
meta = subparams._dat[name].meta
elif name in self._problem._dangling:
meta = self._rec_get_param_meta(name)
else:
meta = subunknowns._dat[name].meta
if not meta.get('_canset_'):
raise TypeError("SubProblem param '%s' is mapped to the output of an internal component."
" This is illegal because a value set into the param will be overwritten"
" by the internal component." % name)
self._unknowns_as_params.append(name)
meta = meta.copy() # don't mess with subproblem's metadata!
self._params_dict[pathname] = meta
meta['pathname'] = pathname
del meta['top_promoted_name']
to_prom_pname[pathname] = name
to_abs_pnames[name] = (pathname,)
self._unknowns_dict = self._init_unknowns_dict = OrderedDict()
# if we have params that are really unknowns in the subproblem, we
# also add them as unknowns so we can take derivatives
for name in self._prob_unknowns:
pathname = self._get_var_pathname(name)
meta = subunknowns._dat[name].meta.copy()
self._unknowns_dict[pathname] = meta
meta['pathname'] = pathname
del meta['top_promoted_name']
to_prom_uname[pathname] = name
to_abs_uname[name] = pathname
to_prom_name.update(to_prom_uname)
to_prom_name.update(to_prom_pname)
self._post_setup_vars = True
self._sysdata._params_dict = self._params_dict
self._sysdata._unknowns_dict = self._unknowns_dict
return self._params_dict, self._unknowns_dict
def solve_nonlinear(self, params, unknowns, resids):
[docs] """Sets params into the sub-problem, runs the
sub-problem, and updates our unknowns with values
from the sub-problem.
Args
----
params : `VecWrapper`
`VecWrapper` containing parameters. (p)
unknowns : `VecWrapper`
`VecWrapper` containing outputs and states. (u)
resids : `VecWrapper`
`VecWrapper` containing residuals. (r)
"""
if not self.is_active():
return
try:
# set params into the subproblem
prob = self._problem
for name in self._prob_params:
prob[name] = params[name]
self._problem.run()
# update our unknowns from subproblem
for name in self._sysdata.to_abs_uname:
unknowns[name] = prob.root.unknowns[name]
resids[name] = prob.root.resids[name]
# if params are really unknowns, they may have changed, so update
for name in self._unknowns_as_params:
params[name] = prob.root.unknowns[name]
except:
_reraise(self.pathname, sys.exc_info())
def linearize(self, params, unknowns, resids):
[docs] """
Returns Jacobian. J is a dictionary whose keys are tuples
of the form ('unknown', 'param') and whose values are ndarrays.
Args
----
params : `VecWrapper`
`VecWrapper` containing parameters. (p)
unknowns : `VecWrapper`
`VecWrapper` containing outputs and states. (u)
resids : `VecWrapper`
`VecWrapper` containing residuals. (r)
Returns
-------
dict
Dictionary whose keys are tuples of the form ('unknown', 'param')
and whose values are ndarrays.
"""
try:
prob = self._problem
# set params into the subproblem
for name in self._prob_params:
prob[name] = params[name]
# have to convert jacobian returned from calc_gradient from a
# nested dict to a flat dict with tuple keys.
return _jac_to_flat_dict(prob.calc_gradient(self.params.keys(),
self.unknowns.keys(),
return_format='dict'))
except:
_reraise(self.pathname, sys.exc_info())
def add_param(self, name, **kwargs):
[docs] raise NotImplementedError("Can't add '%s' to SubProblem. "
"add_param is not supported." % name)
def add_output(self, name, **kwargs):
[docs] raise NotImplementedError("Can't add '%s' to SubProblem. "
"add_output is not supported." % name)
def add_state(self, name, **kwargs):
[docs] raise NotImplementedError("Can't add '%s' to SubProblem. "
"add_state is not supported." % name)