Source code for sfepy.homogenization.coefs_base

from __future__ import absolute_import
import os

import numpy as nm
import scipy as sc
from collections.abc import Iterable

from sfepy.base.base import output, assert_, get_default, debug, Struct
from sfepy.base.timing import Timer
from sfepy.discrete.evaluate import eval_equations
from sfepy.solvers.ts import TimeStepper
from sfepy.solvers import Solver, eig
from sfepy.linalg import MatrixAction
from .utils import iter_sym, iter_nonsym, create_pis, create_scalar_pis,\
    rm_multi
import six
from six.moves import range

[docs]class MiniAppBase(Struct):
[docs] def any_from_conf(name, problem, kwargs): try: cls = kwargs['class'] except KeyError: raise KeyError("set 'class' for MiniApp %s!" % name) obj = cls(name, problem, kwargs) return obj
any_from_conf = staticmethod(any_from_conf) def __init__(self, name, problem, kwargs): Struct.__init__(self, name=name, problem=problem, **kwargs) if self.problem is not None: self.problem.clear_equations() self.set_default('requires', []) self.set_default('is_linear', False) self.set_default('dtype', nm.float64) self.set_default('term_mode', None) self.set_default('set_volume', 'total') # Application-specific options. self.app_options = self.process_options()
[docs] def process_options(self): """ Setup application-specific options. Subclasses should implement this method as needed. Returns ------- app_options : Struct instance The application options. """
[docs] def init_solvers(self, problem): """ Setup solvers. Use local options if these are defined, otherwise use the global ones. For linear problems, assemble the matrix and try to presolve the linear system. """ if hasattr(self, 'solvers'): opts = self.solvers else: opts = problem.conf.options problem.set_conf_solvers(problem.conf.solvers, opts) problem.init_solvers() problem.set_linear(self.is_linear) if self.is_linear: set_presolve = True for v in problem.conf.solvers.values(): if v.kind.startswith('ls.') and hasattr(v, 'use_presolve'): set_presolve = False ls_conf = problem.get_solver().nls.lin_solver.conf if set_presolve and hasattr(ls_conf, 'use_presolve'): ls_conf.use_presolve = True
def _get_volume(self, volume): if isinstance(volume, dict): return volume[self.set_volume] else: return volume
[docs]class CorrSolution(Struct): """ Class for holding solutions of corrector problems. """
[docs] def iter_solutions(self): if hasattr(self, 'components'): for indx in self.components: key = ('%d' * len(indx)) % indx yield key, self.states[indx] else: yield '', self.state
[docs] def iter_time_steps(self): if hasattr(self, 'n_step') and self.n_step > 0: for ii in range(self.n_step): yield self.get_ts_val(ii) else: yield self
[docs] def get_ts_val(self, step): if hasattr(self, 'states'): states = nm.zeros(self.states.shape, dtype=object) for idx in self.components: state = {k: v[step] for k, v in\ six.iteritems(self.states[idx])} states[idx] = state out = CorrSolution(name=self.name, states=states, components=self.components) else: state = {k: v[step] for k, v in six.iteritems(self.state)} out = CorrSolution(name=self.name, state=state) return out
[docs] def get_output(self, is_dump=False, var_map=None): out = {} for key, sol in self.iter_solutions(): for var_name in sol.keys(): if var_map is not None and var_name in var_map: vname = var_map[var_name] else: vname = var_name dof_vector = sol[var_name] if len(dof_vector.shape) == 1: dof_vector = dof_vector[:, None] if is_dump: skey = var_name + '_' + key if key else var_name out[skey] = Struct(name='dump', mode='vertex', data=dof_vector, shape=dof_vector.shape, var_name=vname) else: new_key = var_name + '_' + key if key else var_name out[new_key] = dof_vector return out
[docs]class CorrMiniApp(MiniAppBase): def __init__(self, name, problem, kwargs): MiniAppBase.__init__(self, name, problem, kwargs) self.output_dir = self.problem.output_dir self.set_default('save_name', None) if self.save_name is not None: self.save_name = os.path.normpath(os.path.join(self.output_dir, self.save_name))
[docs] def setup_output(self, save_formats=None, post_process_hook=None, split_results_by=None): """Instance attributes have precedence!""" self.set_default('save_formats', save_formats) self.set_default('post_process_hook', post_process_hook) self.set_default('split_results_by', split_results_by)
[docs] def get_save_name_base(self): return self.save_name
[docs] def get_save_name(self, save_format='.h5', stamp=''): save_name_base = self.get_save_name_base() if save_name_base is not None: return '.'.join((save_name_base + stamp, save_format))
[docs] def get_output(self, corr_sol, is_dump=False, extend=True, variables=None, var_map=None): if variables is None: variables = self.problem.get_variables() to_output = variables.create_output if is_dump: extend = False out = {} for key, sol in corr_sol.iter_solutions(): for var_name in six.iterkeys(sol): if var_name not in variables.ordered_state\ and var_map is not None\ and var_name in var_map: vname = var_map[var_name] else: vname = var_name dof_vector = sol[var_name] if is_dump: skey = var_name + '_' + key if key else var_name var = variables[vname] shape = (var.n_dof // var.n_components, var.n_components) out[skey] = Struct(name='dump', mode='vertex', data=dof_vector, shape=shape, var_name=vname, region_name=var.field.region.name) else: aux = to_output(dof_vector, var_info={vname: (True, var_name)}, extend=extend) if self.post_process_hook is not None: aux = self.post_process_hook(aux, self.problem, None, extend=extend) for _key, val in six.iteritems(aux): if key: new_key = _key + '_' + key else: new_key = _key out[new_key] = val return out
[docs] def save(self, state, problem, variables=None, ts=None, var_map=None): if ts is not None: n_digit = int(nm.log10(ts.n_step)) + 1 time_stamp = ('_%s' % ('%%0%dd' % n_digit)) % ts.step else: time_stamp = '' for save_format in self.save_formats: if self.get_save_name_base() is not None: if save_format in ['h5']: save_name = self.get_save_name(save_format) is_dump, split_results_by, extend = True, 'none', False else: save_name = self.get_save_name(save_format, time_stamp) split_results_by, is_dump = self.split_results_by, False extend = split_results_by is None out = self.get_output(state, extend=extend, is_dump=is_dump, variables=variables, var_map=var_map) problem.save_state(save_name, out=out, split_results_by=split_results_by, ts=ts)
[docs]class ShapeDimDim(CorrMiniApp): def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) clist, pis = create_pis(problem, self.variables[0]) corr_sol = CorrSolution(name=self.name, states=pis, components=clist) self.save(corr_sol, problem, variables=problem.create_variables([self.variables[0]])) return corr_sol
[docs]class ShapeDim(CorrMiniApp): def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) clist, pis = create_scalar_pis(problem, self.variables[0]) corr_sol = CorrSolution(name=self.name, states=pis, components=clist) self.save(corr_sol, problem, variables=problem.create_variables([self.variables[0]])) return corr_sol
[docs]class OnesDim(CorrMiniApp): def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) var_name = self.variables[0] var = problem.get_variables(auto_create=True)[var_name] dim = problem.domain.mesh.dim nnod = var.n_nod e00 = nm.zeros((nnod, dim), dtype=var.dtype) e1 = nm.ones((nnod,), dtype=var.dtype) ones = nm.zeros((dim,), dtype=object) clist = [] for ir in range(dim): aux = e00.copy() aux[:,ir] = e1 ones[ir] = {var_name : nm.ascontiguousarray(aux)} clist.append((ir,)) corr_sol = CorrSolution(name=self.name, states=ones, components=clist) self.save(corr_sol, problem, variables=problem.create_variables([self.variables[0]])) return corr_sol
[docs]class CorrEval(CorrMiniApp): def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) expr = self.expression for req in map(rm_multi, self.requires): expr = expr.replace(req, "data['%s']" % req) val = eval(expr) if type(val) is dict: corr_sol = CorrSolution(name=self.name, state=val) elif type(val) is nm.ndarray: if val.dtype == object: corr_sol = CorrSolution(name=self.name, states=val, components=['data']) else: ndof, ndim = val.shape state = {self.variable: val.reshape((ndof * ndim,))} corr_sol = CorrSolution(name=self.name, state=state) else: corr_sol = val cvars = problem.create_variables([self.variable]) self.save(corr_sol, problem, variables=cvars) return corr_sol
[docs]class CorrNN(CorrMiniApp): """ __init__() kwargs: { 'ebcs' : [], 'epbcs' : [], 'equations' : {}, 'set_variables' : None, }, """
[docs] def set_variables_default(variables, ir, ic, set_var, data): for (var, req, comp) in set_var: variables[var].set_data(data[req].states[ir,ic][comp])
set_variables_default = staticmethod(set_variables_default) def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" CorrMiniApp.__init__(self, name, problem, kwargs) self.set_default('dim', problem.get_dim()) def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) problem.set_equations(self.equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials(problem.ts) self.init_solvers(problem) variables = problem.get_variables() states = nm.zeros((self.dim, self.dim), dtype=object) clist = [] for ir in range(self.dim): for ic in range(self.dim): if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, ic, self.set_variables, data) else: self.set_variables(variables, ir, ic, **data) state = problem.solve(update_materials=False, save_results=False) assert_(state.has_ebc()) states[ir,ic] = state.get_state_parts() clist.append((ir, ic)) corr_sol = CorrSolution(name=self.name, states=states, components=clist) self.save(corr_sol, problem) return corr_sol
[docs]class CorrN(CorrMiniApp):
[docs] def set_variables_default(variables, ir, set_var, data): for (var, req, comp) in set_var: variables[var].set_data(data[req].states[ir][comp])
set_variables_default = staticmethod(set_variables_default) def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" CorrMiniApp.__init__(self, name, problem, kwargs) self.set_default('dim', problem.get_dim()) def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) problem.set_equations(self.equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials(problem.ts) self.init_solvers(problem) variables = problem.get_variables() states = nm.zeros((self.dim,), dtype=object) clist = [] for ir in range(self.dim): if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, self.set_variables, data) else: self.set_variables(variables, ir, **data) state = problem.solve(update_materials=False, save_results=False) assert_(state.has_ebc()) states[ir] = state.get_state_parts() clist.append((ir,)) corr_sol = CorrSolution(name=self.name, states=states, components=clist) self.save(corr_sol, problem) return corr_sol
[docs]class CorrDimDim(CorrNN): pass
[docs]class CorrDim(CorrN): pass
[docs]class CorrOne(CorrMiniApp):
[docs] def set_variables_default(variables, set_var, data): for (var, req, comp) in set_var: variables[var].set_data(data[req].state[comp])
set_variables_default = staticmethod(set_variables_default) def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) problem.set_equations(self.equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials(problem.ts) self.init_solvers(problem) variables = problem.get_variables() if hasattr(self, 'set_variables'): if isinstance(self.set_variables, list): self.set_variables_default(variables, self.set_variables, data) else: self.set_variables(variables, **data) state = problem.solve(update_materials=False, save_results=False) assert_(state.has_ebc()) corr_sol = CorrSolution(name=self.name, state=state.get_state_parts()) self.save(corr_sol, problem) return corr_sol
[docs]class CorrSetBCS(CorrMiniApp): def __call__(self, problem=None, data=None): from sfepy.base.base import select_by_names from sfepy.discrete.variables import Variables from sfepy.discrete.conditions import Conditions problem = get_default(problem, self.problem) conf_ebc = select_by_names(problem.conf.ebcs, self.ebcs) conf_epbc = select_by_names(problem.conf.epbcs, self.epbcs) ebcs = Conditions.from_conf(conf_ebc, problem.domain.regions) epbcs = Conditions.from_conf(conf_epbc, problem.domain.regions) conf_variables = select_by_names(problem.conf.variables, self.variable) variables = Variables.from_conf(conf_variables, problem.fields) variables.equation_mapping(ebcs, epbcs, problem.ts, problem.functions) variables.init_state() variables.fill_state(0.0) variables.apply_ebc() corr_sol = CorrSolution(name=self.name, state=variables.get_state_parts()) self.save(corr_sol, problem, variables=variables) return corr_sol
[docs]class CorrEqPar(CorrOne): """ The corrector which equation can be parametrized via 'eq_pars', the dimension is given by the number of parameters. Example: 'equations': 'dw_diffusion.5.Y(mat.k, q, p) = dw_integrate.5.%s(q)', 'eq_pars': ('bYMp', 'bYMm'), 'class': cb.CorrEqPar, """ def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" CorrMiniApp.__init__(self, name, problem, kwargs) self.set_default('dim', len(self.eq_pars)) def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) states = nm.zeros((self.dim,), dtype=object) clist = [] eqns ={} for ir in range(self.dim): for key_eq, val_eq in six.iteritems(self.equations): eqns[key_eq] = val_eq % self.eq_pars[ir] problem.set_equations(eqns) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials(problem.ts) self.init_solvers(problem) variables = problem.get_variables() if hasattr(self, 'set_variables'): if isinstance(self.set_variables, list): self.set_variables_default(variables, self.set_variables, data) else: self.set_variables(variables, **data) state = problem.solve(update_materials=False, save_results=False) assert_(state.has_ebc()) states[ir] = state.get_state_parts() clist.append((ir,)) corr_sol = CorrSolution(name=self.name, states=states, components=clist) self.save(corr_sol, problem) return corr_sol
[docs]class PressureEigenvalueProblem(CorrMiniApp): """Pressure eigenvalue problem solver for time-dependent correctors."""
[docs] def presolve(self, mtx): """Prepare A^{-1} B^T for the Schur complement.""" mtx_a = mtx['A'] mtx_bt = mtx['BT'] output('full A size: %.3f MB' % (8.0 * nm.prod(mtx_a.shape) / 1e6)) output('full B size: %.3f MB' % (8.0 * nm.prod(mtx_bt.shape) / 1e6)) ls = Solver.any_from_conf(self.problem.ls_conf + Struct(use_presolve=True), mtx=mtx_a) if self.mode == 'explicit': timer = Timer(start=True) mtx_aibt = nm.zeros(mtx_bt.shape, dtype=mtx_bt.dtype) for ic in range(mtx_bt.shape[1]): mtx_aibt[:,ic] = ls(mtx_bt[:,ic].toarray().squeeze()) output('mtx_aibt: %.2f s' % timer.stop()) action_aibt = MatrixAction.from_array(mtx_aibt) else: ## # c: 30.08.2007, r: 13.02.2008 def fun_aibt(vec): # Fix me for sparse mtx_bt... rhs = sc.dot(mtx_bt, vec) out = ls(rhs) return out action_aibt = MatrixAction.from_function(fun_aibt, (mtx_a.shape[0], mtx_bt.shape[1]), nm.float64) mtx['action_aibt'] = action_aibt
[docs] def solve_pressure_eigenproblem(self, mtx, eig_problem=None, n_eigs=0, check=False): """G = B*AI*BT or B*AI*BT+D""" def get_slice(n_eigs, nn): if n_eigs > 0: ii = slice(0, n_eigs) elif n_eigs < 0: ii = slice(nn + n_eigs, nn) else: ii = slice(0, 0) return ii eig_problem = get_default(eig_problem, self.eig_problem) n_eigs = get_default(n_eigs, self.n_eigs) check = get_default(check, self.check) mtx_c, mtx_b, action_aibt = mtx['C'], mtx['B'], mtx['action_aibt'] mtx_g = mtx_b * action_aibt.to_array() # mtx_b must be sparse! if eig_problem == 'B*AI*BT+D': mtx_g += mtx['D'].toarray() mtx['G'] = mtx_g output(mtx_c.shape, mtx_g.shape) eigs, mtx_q = eig(mtx_c.toarray(), mtx_g, solver_kind='eig.sgscipy') if check: ee = nm.diag(sc.dot(mtx_q.T * mtx_c, mtx_q)).squeeze() oo = nm.diag(sc.dot(sc.dot(mtx_q.T, mtx_g), mtx_q)).squeeze() try: assert_(nm.allclose(ee, eigs)) assert_(nm.allclose(oo, nm.ones_like(eigs))) except ValueError: debug() nn = mtx_c.shape[0] if isinstance(n_eigs, tuple): output('required number of eigenvalues: (%d, %d)' % n_eigs) if sum(n_eigs) < nn: ii0 = get_slice(n_eigs[0], nn) ii1 = get_slice(-n_eigs[1], nn) eigs = nm.concatenate((eigs[ii0], eigs[ii1])) mtx_q = nm.concatenate((mtx_q[:,ii0], mtx_q[:,ii1]), 1) else: output('required number of eigenvalues: %d' % n_eigs) if (n_eigs != 0) and (abs(n_eigs) < nn): ii = get_slice(n_eigs, nn) eigs = eigs[ii] mtx_q = mtx_q[:,ii] out = Struct(eigs=eigs, mtx_q=mtx_q) return out
def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) problem.set_equations(self.equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials() mtx = problem.equations.eval_tangent_matrices(problem.create_state()(), problem.mtx_a, by_blocks=True) self.presolve(mtx) evp = self.solve_pressure_eigenproblem(mtx) return Struct(name=self.name, ebcs=self.ebcs, epbcs=self.epbcs, mtx=mtx, evp=evp)
[docs]class TCorrectorsViaPressureEVP(CorrMiniApp): """ Time correctors via the pressure eigenvalue problem. """
[docs] def setup_equations(self, equations, problem=None): """ Set equations, update boundary conditions and materials. """ problem = get_default(problem, self.problem) problem.set_variables() problem.set_equations(equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials() # Assume parameters constant in time.
[docs] def compute_correctors(self, evp, sign, state0, ts, problem=None, vec_g=None): problem = get_default(problem, self.problem) eigs = evp.evp.eigs mtx_q = evp.evp.mtx_q mtx = evp.mtx nr, nc = mtx_q.shape if vec_g is not None: output('nonzero pressure EBC: max = %e, min = %e' \ % (vec_g.max(), vec_g.min())) one = nm.ones((nc,), dtype=nm.float64) vu, vp = self.up_variables variables = problem.get_variables() var_u = variables[vu] var_p = variables[vp] ## # follow_epbc = False -> R1 = - R2 as required. ? for other correctors? vec_p0 = sign * var_p.get_reduced(state0[vp], follow_epbc=False) # xi0 = Q^{-1} p(0) = Q^T G p(0) vec_xi0 = sc.dot(mtx_q.T, sc.dot(mtx['G'], vec_p0[:,nm.newaxis])).squeeze() action_aibt = mtx['action_aibt'] e_e_qg = 0.0 iee_e_qg = 0.0 format = '====== time %%e (step %%%dd of %%%dd) ====='\ % ((ts.n_digit,) * 2) vu, vp = self.up_variables state = {k: [] for k in [vu, vp, 'd' + vp]} for step, time in ts: output(format % (time, step + 1, ts.n_step)) e_e = nm.exp(- eigs * time) e_e_qp = e_e * vec_xi0 # exp(-Et) Q^{-1} p(0) if vec_g is not None: Qg = sc.dot(mtx_q.T, vec_g) e_e_qg = e_e * Qg iee_e_qg = ((one - e_e) / eigs) * Qg vec_p = sc.dot(mtx_q, e_e_qp + iee_e_qg) vec_dp = - sc.dot(mtx_q, (eigs * e_e_qp - e_e_qg)) vec_u = action_aibt(vec_dp) vec_u = var_u.get_full(vec_u) vec_p = var_p.get_full(vec_p) # BC nodes - time derivative of constant is zero! vec_dp = var_p.get_full(vec_dp, force_value=0.0) state[vu].append(vec_u) state[vp].append(vec_p) state['d' + vp].append(vec_dp) return {k: nm.asarray(v) for k, v in state.items()}
[docs] def save(self, corrs, problem, ts): ts0 = TimeStepper(0, 1) ts0.set_from_ts(ts, step=0) _, vp = self.up_variables for step, _ in ts0: icorrs = corrs.get_ts_val(step) super(TCorrectorsViaPressureEVP, self).save(icorrs, problem, ts=ts0, var_map={'d' + vp: vp})
[docs]def create_ts_coef(cls): """ Define a new class with modified call method which accepts time dependent data (correctors). """ class TSCoef(cls): def __call__(self, volume=None, problem=None, data=None): problem = get_default(problem, self.problem) ts_keys = [] ts_data = {} n_step = None for key, val in six.iteritems(data): if isinstance(val, CorrSolution) and hasattr(val, 'n_step'): if n_step is None: n_step = val.n_step else: if not(n_step == val.n_step): raise ValueError('incorrect number of time' +\ 'steps in %s!' % self.name) ts_keys.append(key) else: ts_data[key] = val if n_step is None: raise ValueError('no time steps found in %s!' % self.name) n_digit = int(nm.log10(n_step))+1 format = '====== step %%%dd of %%%dd =====' % ((n_digit,) * 2) out = [] for step in range(n_step): output(format % (step + 1, n_step)) for key in ts_keys: ts_data[key] = data[key].get_ts_val(step) out.append(cls.__call__(self, volume, problem, ts_data)) out = nm.asarray(out) sh = out.shape if len(sh) == 2: out = out.reshape((sh[0], 1, sh[1])) elif len(sh) == 1: out = out.reshape((sh[0], 1, 1)) return out return TSCoef
[docs]class CoefDummy(MiniAppBase): """ Dummy class serving for computing and returning its requirements. """ def __call__(self, volume=None, problem=None, data=None): return data
[docs]class TSTimes(MiniAppBase): """Coefficient-like class, returns times of the time stepper.""" def __call__(self, volume=None, problem=None, data=None): problem = get_default(problem, self.problem) problem.init_solvers() return problem.get_timestepper().times
[docs]class VolumeFractions(MiniAppBase): """Coefficient-like class, returns volume fractions of given regions within the whole domain.""" def __call__(self, volume=None, problem=None, data=None): problem = get_default(problem, self.problem) vf = {} for region_name in self.regions: vkey = 'volume_%s' % region_name key = 'fraction_%s' % region_name equations, variables = problem.create_evaluable( self.expression % region_name) val = eval_equations(equations, variables).real vf[vkey] = nm.asarray(val, dtype=nm.float64) vf[key] = vf[vkey] / self._get_volume(volume) return vf
[docs]class CoefMN(MiniAppBase):
[docs] @staticmethod def set_variables_default(variables, ir, ic, mode, set_var, data, dtype): def get_corr_state(corr, ir, ic): if hasattr(corr, 'states'): if ir is None: return corr.states[ic] elif ic is None: return corr.states[ir] else: return corr.states[ir, ic] else: return corr.state if mode == 'row_only': act_set_var = set_var else: mode2var = {'row': 0, 'col': 1} aux = set_var[mode2var[mode]] act_set_var = aux[:] if isinstance(aux, list) else [aux] act_set_var += set_var[2:] for (var, req, comp) in act_set_var: if type(req) is tuple: val = get_corr_state(data[req[0]], ir, ic)[comp].copy() val = nm.asarray(val, dtype=dtype) for ii in req[1:]: val += get_corr_state(data[ii], ir, ic)[comp] else: val = get_corr_state(data[req], ir, ic)[comp] variables[var].set_data(val)
def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" MiniAppBase.__init__(self, name, problem, kwargs) self.set_default('dim', problem.get_dim())
[docs] def get_coef(self, row, col, volume, problem, data): problem = get_default(problem, self.problem) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) coef = nm.zeros((len(row), len(col)), dtype=self.dtype) for ir, (irr, icr) in enumerate(row): if isinstance(self.set_variables, list): self.set_variables_default(variables, irr, icr, 'row', self.set_variables, data, self.dtype) else: self.set_variables(variables, irr, icr, 'row', **data) for ic, (irc, icc) in enumerate(col): if isinstance(self.set_variables, list): self.set_variables_default(variables, irc, icc, 'col', self.set_variables, data, self.dtype) else: self.set_variables(variables, irc, icc, 'col', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ir, ic] = val coef /= self._get_volume(volume) return coef
def __call__(self, volume, problem=None, data=None): if isinstance(self.dim, Iterable) and len(self.dim) >= 2: dim1, dim2 = self.dim[:2] else: dim1 = dim2 = self.dim row = [(ii, None) for ii in range(dim1)] col = [(None, ii) for ii in range(dim2)] return self.get_coef(row, col, volume, problem, data)
[docs]class CoefDimDim(CoefMN): pass
[docs]class CoefSymSym(CoefMN): iter_sym = staticmethod(iter_sym) is_sym = True def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) isym = [ii for ii in self.iter_sym(problem.get_dim())] return self.get_coef(isym, isym, volume, problem, data)
[docs]class CoefNonSymNonSym(CoefSymSym): iter_sym = staticmethod(iter_nonsym) is_sym = False
[docs]class CoefDimSym(CoefMN): def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) dim = problem.get_dim() row = [(ii, None) for ii in range(dim)] col = [ii for ii in iter_sym(dim)] return self.get_coef(row, col, volume, problem, data)
[docs]class CoefN(CoefMN):
[docs] @staticmethod def set_variables_default(variables, ir, ic, mode, set_var, data, dtype): mode = mode + '_only' CoefMN.set_variables_default(variables, ir, ic, mode, set_var, data, dtype)
[docs] def get_coef(self, row, volume, problem, data): problem = get_default(problem, self.problem) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) coef = nm.zeros((len(row),), dtype=self.dtype) for ii, (ir, ic) in enumerate(row): if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, ic, 'row', self.set_variables, data, self.dtype) else: self.set_variables(variables, ir, ic, 'row', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ii] = val coef /= self._get_volume(volume) return coef
def __call__(self, volume, problem=None, data=None): row = [(ii, None) for ii in range(self.dim)] return self.get_coef(row, volume, problem, data)
[docs]class CoefDim(CoefN): pass
[docs]class CoefSym(CoefN): iter_sym = staticmethod(iter_sym) is_sym = True def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) isym = [ii for ii in self.iter_sym(problem.get_dim())] return self.get_coef(isym, volume, problem, data)
[docs]class CoefNonSym(CoefSym): iter_sym = staticmethod(iter_nonsym) is_sym = False
[docs]class CoefOne(MiniAppBase):
[docs] def set_variables_default(variables, set_var, data, dtype): for (var, req, comp) in set_var: if type(req) is tuple: val = data[req[0]].state[comp].copy() val = nm.asarray(val, dtype=dtype) for ii in req[1:]: val += data[ii].state[comp] else: val = data[req].state[comp] variables[var].set_data(val)
set_variables_default = staticmethod(set_variables_default) def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) if hasattr(self, 'set_variables'): if isinstance(self.set_variables, list): self.set_variables_default(variables, self.set_variables, data, self.dtype) else: self.set_variables(variables, **data) val = eval_equations(equations, variables, term_mode=term_mode) coef = val / self._get_volume(volume) return coef
[docs]class CoefSum(MiniAppBase): def __call__(self, volume, problem=None, data=None): coef = nm.zeros_like(data[self.requires[0]]) for req in map(rm_multi, self.requires): coef += data[req] return coef
[docs]class CoefEval(MiniAppBase): """ Evaluate expression. """ def __call__(self, volume, problem=None, data=None): expr = self.expression for req in map(rm_multi, self.requires): expr = expr.replace(req, "data['%s']" % req) coef = eval(expr) return coef
[docs]class CoefNone(MiniAppBase): def __call__(self, volume, problem=None, data=None): coef = 0.0 return coef
[docs]class CoefExprPar(MiniAppBase): """ The coefficient which expression can be parametrized via 'expr_pars', the dimension is given by the number of parameters. Example: 'expression': 'dw_surface_ndot.5.Ys(mat_norm.k%d, corr1)', 'expr_pars': [ii for ii in range(dim)], 'class': cb.CoefExprPar, """
[docs] def set_variables_default(variables, ir, set_var, data): for (var, req, comp) in set_var: if hasattr(data[req], 'states'): variables[var].set_data(data[req].states[ir][comp]) else: variables[var].set_data(data[req].state[comp])
set_variables_default = staticmethod(set_variables_default) def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" MiniAppBase.__init__(self, name, problem, kwargs) dim = len(self.expr_pars) self.set_default('dim', dim) def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) coef = nm.zeros((self.dim,), dtype=self.dtype) term_mode = self.term_mode for ir in range(self.dim): expression = self.expression % self.expr_pars[ir] equations, variables = \ problem.create_evaluable(expression, term_mode=term_mode) if hasattr(self, 'set_variables'): if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, self.set_variables, data) else: self.set_variables(variables, ir, **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ir] = val coef /= self._get_volume(volume) return coef