Source code for sfepy.discrete.variables

"""
Classes of variables for equations/terms.
"""
from __future__ import print_function
from __future__ import absolute_import
from collections import deque

import numpy as nm

from sfepy.base.base import (real_types, complex_types, assert_, get_default,
                             output, OneTypeList, Container, Struct, basestr,
                             iter_dict_of_lists)
from sfepy.base.timing import Timer
import sfepy.linalg as la
from sfepy.discrete.functions import Function
from sfepy.discrete.conditions import get_condition_value
from sfepy.discrete.integrals import Integral
from sfepy.discrete.common.dof_info import (DofInfo, EquationMap,
                                            expand_nodes_to_equations,
                                            is_active_bc)
from sfepy.discrete.fem.lcbc_operators import LCBCOperators
from sfepy.discrete.common.mappings import get_physical_qps
from sfepy.discrete.evaluate_variable import eval_real, eval_complex
import six
from six.moves import range

is_state = 0
is_virtual = 1
is_parameter = 2
is_field = 10

[docs]def create_adof_conns(conn_info, var_indx=None, active_only=True, verbose=True): """ Create active DOF connectivities for all variables referenced in `conn_info`. If a variable has not the equation mapping, a trivial mapping is assumed and connectivity with all DOFs active is created. DOF connectivity key is a tuple ``(primary variable name, region name, type, is_trace flag)``. Notes ----- If `active_only` is False, the DOF connectivities contain all DOFs, with the E(P)BC-constrained ones stored as `-1 - <DOF number>`, so that the full connectivities can be reconstructed for the matrix graph creation. """ var_indx = get_default(var_indx, {}) def _create(var, econn): offset = var_indx.get(var.name, slice(0, 0)).start if var.eq_map is None: eq = nm.arange(var.n_dof, dtype=nm.int32) else: if isinstance(var, DGFieldVariable): eq = nm.arange(var.n_dof, dtype=nm.int32) else: if active_only: eq = var.eq_map.eq else: eq = nm.arange(var.n_dof, dtype=nm.int32) eq[var.eq_map.eq_ebc] = -1 - (var.eq_map.eq_ebc + offset) eq[var.eq_map.master] = eq[var.eq_map.slave] adc = create_adof_conn(eq, econn, var.n_components, offset) return adc def _assign(adof_conns, info, region, var, field, is_trace): key = (var.name, region.name, info.dc_type.type, is_trace) if not key in adof_conns: econn = field.get_econn(info.dc_type, region, is_trace=is_trace) if econn is None: return adof_conns[key] = _create(var, econn) if info.is_trace: key = (var.name, region.name, info.dc_type.type, False) if not key in adof_conns: econn = field.get_econn(info.dc_type, region, is_trace=False) adof_conns[key] = _create(var, econn) if verbose: output('setting up dof connectivities...') timer = Timer(start=True) adof_conns = {} for key, ii, info in iter_dict_of_lists(conn_info, return_keys=True): if info.primary is not None: var = info.primary field = var.get_field() field.setup_extra_data(info.ps_tg, info, info.is_trace) region = info.get_region() _assign(adof_conns, info, region, var, field, info.is_trace) if info.has_virtual and not info.is_trace: var = info.virtual field = var.get_field() field.setup_extra_data(info.v_tg, info, False) aux = var.get_primary() var = aux if aux is not None else var region = info.get_region(can_trace=False) _assign(adof_conns, info, region, var, field, False) if verbose: output('...done in %.2f s' % timer.stop()) return adof_conns
[docs]def create_adof_conn(eq, conn, dpn, offset): """ Given a node connectivity, number of DOFs per node and equation mapping, create the active dof connectivity. Locally (in a connectivity row), the DOFs are stored DOF-by-DOF (u_0 in all local nodes, u_1 in all local nodes, ...). Globally (in a state vector), the DOFs are stored node-by-node (u_0, u_1, ..., u_X in node 0, u_0, u_1, ..., u_X in node 1, ...). """ if dpn == 1: aux = nm.take(eq, conn) adc = aux + nm.asarray(offset * (aux >= 0), dtype=nm.int32) else: n_el, n_ep = conn.shape adc = nm.empty((n_el, n_ep * dpn), dtype=conn.dtype) ii = 0 for idof in range(dpn): aux = nm.take(eq, dpn * conn + idof) adc[:, ii : ii + n_ep] = aux + nm.asarray(offset * (aux >= 0), dtype=nm.int32) ii += n_ep return adc
[docs]def expand_basis(basis, dpn): """ Expand basis for variables with several components (DOFs per node), in a way compatible with :func:`create_adof_conn()`, according to `dpn` (DOF-per-node count). """ n_c, n_bf = basis.shape[-2:] ebasis = nm.zeros(basis.shape[:2] + (dpn, n_bf * dpn), dtype=nm.float64) for ic in range(n_c): for ir in range(dpn): ebasis[..., n_c*ir+ic, ir*n_bf:(ir+1)*n_bf] = basis[..., ic, :] return ebasis
[docs]class Variables(Container): """ Container holding instances of Variable. """
[docs] @staticmethod def from_conf(conf, fields): """ This method resets the variable counters for automatic order! """ Variable.reset() obj = Variables() for key, val in six.iteritems(conf): var = Variable.from_conf(key, val, fields) obj[var.name] = var obj.setup_dtype() obj.setup_ordering() return obj
def __init__(self, variables=None): Container.__init__(self, OneTypeList(Variable), state=set(), virtual=set(), parameter=set(), has_virtual_dcs=False, has_lcbc=False, has_lcbc_rhs=False, has_eq_map=False, ordered_state=[], ordered_virtual=[]) if variables is not None: for var in variables: self[var.name] = var self.setup_ordering() self.setup_dtype() self.adof_conns = {} def __setitem__(self, ii, var): Container.__setitem__(self, ii, var) if var.is_state(): self.state.add(var.name) elif var.is_virtual(): self.virtual.add(var.name) elif var.is_parameter(): self.parameter.add(var.name) var._variables = self self.setup_ordering() self.setup_dof_info()
[docs] def setup_dtype(self): """ Setup data types of state variables - all have to be of the same data type, one of nm.float64 or nm.complex128. """ dtypes = {nm.complex128 : 0, nm.float64 : 0} for var in self.iter_state(ordered=False): dtypes[var.dtype] += 1 if dtypes[nm.float64] and dtypes[nm.complex128]: raise ValueError("All variables must have the same dtype!") elif dtypes[nm.float64]: self.dtype = nm.float64 elif dtypes[nm.complex128]: self.dtype = nm.complex128 else: self.dtype = None
[docs] def get_dual_names(self): """ Get names of pairs of dual variables. Returns ------- duals : dict The dual names as virtual name : state name pairs. """ duals = {} for name in self.virtual: duals[name] = self[name].primary_var_name return duals
[docs] def setup_ordering(self): """ Setup ordering of variables. """ self.link_duals() orders = [] for var in self: try: orders.append(var._order) except: pass orders.sort() self.ordered_state = [None] * len(self.state) for var in self.iter_state(ordered=False): ii = orders.index(var._order) self.ordered_state[ii] = var.name self.ordered_virtual = [None] * len(self.virtual) ii = 0 for var in self.iter_state(ordered=False): if var.dual_var_name is not None: self.ordered_virtual[ii] = var.dual_var_name ii += 1
[docs] def has_virtuals(self): return len(self.virtual) > 0
[docs] def setup_dof_info(self, make_virtual=False): """ Setup global DOF information. """ self.di = DofInfo('state_dof_info') for var_name in self.ordered_state: self.di.append_variable(self[var_name]) if make_virtual: self.vdi = DofInfo('virtual_dof_info') for var_name in self.ordered_virtual: self.vdi.append_variable(self[var_name]) else: self.vdi = self.di
[docs] def setup_lcbc_operators(self, lcbcs, ts=None, functions=None): """ Prepare linear combination BC operator matrix and right-hand side vector. """ from sfepy.discrete.common.region import are_disjoint if lcbcs is None: self.lcdi = self.adi return self.lcbcs = lcbcs if (ts is None) or ((ts is not None) and (ts.step == 0)): regs = [] var_names = [] for bcs in self.lcbcs: for bc in bcs.iter_single(): vns = bc.get_var_names() regs.append(bc.regions[0]) var_names.append(vns[0]) if bc.regions[1] is not None: regs.append(bc.regions[1]) var_names.append(vns[1]) for i0 in range(len(regs) - 1): for i1 in range(i0 + 1, len(regs)): if ((var_names[i0] == var_names[i1]) and not are_disjoint(regs[i0], regs[i1])): raise ValueError('regions %s and %s are not disjoint!' % (regs[i0].name, regs[i1].name)) ops = LCBCOperators('lcbcs', self, functions=functions) for bcs in self.lcbcs: for bc in bcs.iter_single(): vns = bc.get_var_names() dofs = [self[vn].dofs for vn in vns if vn is not None] bc.canonize_dof_names(*dofs) if not is_active_bc(bc, ts=ts, functions=functions): continue output('lcbc:', bc.name) ops.add_from_bc(bc, ts) aux = ops.make_global_operator(self.adi) self.mtx_lcbc, self.vec_lcbc, self.lcdi = aux self.has_lcbc = self.mtx_lcbc is not None self.has_lcbc_rhs = self.vec_lcbc is not None
[docs] def get_lcbc_operator(self): if self.has_lcbc: return self.mtx_lcbc else: raise ValueError('no LCBC defined!')
[docs] def equation_mapping(self, ebcs, epbcs, ts, functions, problem=None, active_only=True): """ Create the mapping of active DOFs from/to all DOFs for all state variables. Parameters ---------- ebcs : Conditions instance The essential (Dirichlet) boundary conditions. epbcs : Conditions instance The periodic boundary conditions. ts : TimeStepper instance The time stepper. functions : Functions instance The user functions for boundary conditions. problem : Problem instance, optional The problem that can be passed to user functions as a context. active_only : bool If True, the active DOF info ``self.adi`` uses the reduced (active DOFs only) numbering. Otherwise it is the same as ``self.di``. Returns ------- active_bcs : set The set of boundary conditions active in the current time. """ self.ebcs = ebcs self.epbcs = epbcs ## # Assing EBC, PBC to variables and regions. if ebcs is not None: self.bc_of_vars = self.ebcs.group_by_variables() else: self.bc_of_vars = {} if epbcs is not None: self.bc_of_vars = self.epbcs.group_by_variables(self.bc_of_vars) ## # List EBC nodes/dofs for each variable. active_bcs = set() for var_name in self.di.var_names: var = self[var_name] bcs = self.bc_of_vars.get(var.name, None) var_di = self.di.get_info(var_name) active = var.equation_mapping(bcs, var_di, ts, functions, problem=problem) active_bcs.update(active) if self.has_virtual_dcs: vvar = self[var.dual_var_name] vvar_di = self.vdi.get_info(var_name) active = vvar.equation_mapping(bcs, vvar_di, ts, functions, problem=problem) active_bcs.update(active) self.adi = DofInfo('active_state_dof_info') for var_name in self.ordered_state: self.adi.append_variable(self[var_name], active=active_only) if self.has_virtual_dcs: self.avdi = DofInfo('active_virtual_dof_info') for var_name in self.ordered_virtual: self.avdi.append_variable(self[var_name], active=active_only) else: self.avdi = self.adi self.has_eq_map = True return active_bcs
[docs] def get_matrix_shape(self): if not self.has_eq_map: raise ValueError('call equation_mapping() first!') return (self.avdi.ptr[-1], self.adi.ptr[-1])
[docs] def setup_initial_conditions(self, ics, functions): self.ics = ics self.ic_of_vars = self.ics.group_by_variables() for var_name in self.di.var_names: var = self[var_name] ics = self.ic_of_vars.get(var.name, None) if ics is None: continue var.setup_initial_conditions(ics, self.di, functions) for var_name in self.parameter: var = self[var_name] if hasattr(var, 'special') and ('ic' in var.special): setter, sargs, skwargs = var._get_setter('ic', functions) var.set_data(setter(*sargs, **skwargs)) output('IC data of %s set by %s()' % (var.name, setter.name))
[docs] def set_adof_conns(self, adof_conns): """ Set all active DOF connectivities to `self` as well as relevant sub-dicts to the individual variables. """ self.adof_conns = adof_conns for var in self: var.adof_conns = {} for key, val in six.iteritems(adof_conns): if key[0] in self.names: var = self[key[0]] var.adof_conns[key] = val var = var.get_dual() if var is not None: var.adof_conns[key] = val
[docs] def create_state_vector(self): vec = nm.zeros((self.di.ptr[-1],), dtype=self.dtype) return vec
[docs] def create_stripped_state_vector(self): vec = nm.zeros((self.adi.ptr[-1],), dtype=self.dtype) return vec
[docs] def apply_ebc(self, vec, force_values=None): """ Apply essential (Dirichlet) and periodic boundary conditions defined for the state variables to vector `vec`. """ for var in self.iter_state(): var.apply_ebc(vec, self.di.indx[var.name].start, force_values)
[docs] def apply_ic(self, vec, force_values=None): """ Apply initial conditions defined for the state variables to vector `vec`. """ for var in self.iter_state(): var.apply_ic(vec, self.di.indx[var.name].start, force_values)
[docs] def strip_state_vector(self, vec, follow_epbc=False, svec=None): """ Get the reduced DOF vector, with EBC and PBC DOFs removed. Notes ----- If 'follow_epbc' is True, values of EPBC master dofs are not simply thrown away, but added to the corresponding slave dofs, just like when assembling. For vectors with state (unknown) variables it should be set to False, for assembled vectors it should be set to True. """ if svec is None: svec = nm.empty((self.adi.ptr[-1],), dtype=self.dtype) for var in self.iter_state(): aindx = self.adi.indx[var.name] svec[aindx] = var.get_reduced(vec, self.di.indx[var.name].start, follow_epbc) return svec
[docs] def make_full_vec(self, svec, force_value=None, vec=None): """ Make a full DOF vector satisfying E(P)BCs from a reduced DOF vector. Parameters ---------- svec : array The reduced DOF vector. force_value : float, optional Passing a `force_value` overrides the EBC values. vec : array, optional If given, the buffer for storing the result (zeroed). Returns ------- vec : array The full DOF vector. """ self.check_vector_size(svec, stripped=True) if self.has_lcbc: if self.has_lcbc_rhs: svec = self.mtx_lcbc * svec + self.vec_lcbc else: svec = self.mtx_lcbc * svec if vec is None: vec = self.create_state_vector() for var in self.iter_state(): indx = self.di.indx[var.name] aindx = self.adi.indx[var.name] var.get_full(svec, aindx.start, force_value, vec, indx.start) return vec
[docs] def has_ebc(self, vec, force_values=None): for var_name in self.di.var_names: eq_map = self[var_name].eq_map i0 = self.di.indx[var_name].start ii = i0 + eq_map.eq_ebc if force_values is None: if not nm.allclose(vec[ii], eq_map.val_ebc): return False else: if isinstance(force_values, dict): if not nm.allclose(vec[ii], force_values[var_name]): return False else: if not nm.allclose(vec[ii], force_values): return False # EPBC. if not nm.allclose(vec[i0+eq_map.master], vec[i0+eq_map.slave]): return False return True
[docs] def get_indx(self, var_name, stripped=False, allow_dual=False): var = self[var_name] if not var.is_state(): if allow_dual and var.is_virtual(): var_name = var.primary_var_name else: msg = '%s is not a state part' % var_name raise IndexError(msg) if stripped: return self.adi.indx[var_name] else: return self.di.indx[var_name]
[docs] def check_vector_size(self, vec, stripped=False): """ Check whether the shape of the DOF vector corresponds to the total number of DOFs of the state variables. Parameters ---------- vec : array The vector of DOF values. stripped : bool If True, the size of the DOF vector should be reduced, i.e. without DOFs fixed by boundary conditions. """ if not stripped: n_dof = self.di.get_n_dof_total() if vec.size != n_dof: msg = 'incompatible data size!' \ ' (%d (variables) == %d (DOF vector))' \ % (n_dof, vec.size) raise ValueError(msg) else: if self.has_lcbc: n_dof = self.lcdi.get_n_dof_total() else: n_dof = self.adi.get_n_dof_total() if vec.size != n_dof: msg = 'incompatible data size!' \ ' (%d (active variables) == %d (reduced DOF vector))' \ % (n_dof, vec.size) raise ValueError(msg)
[docs] def get_state_part_view(self, state, var_name, stripped=False): self.check_vector_size(state, stripped=stripped) return state[self.get_indx(var_name, stripped)]
[docs] def set_state_part(self, state, part, var_name, stripped=False): self.check_vector_size(state, stripped=stripped) state[self.get_indx(var_name, stripped)] = part
[docs] def get_state_parts(self, vec=None): """ Return parts of a state vector corresponding to individual state variables. Parameters ---------- vec : array, optional The state vector. If not given, then the data stored in the variables are returned instead. Returns ------- out : dict The dictionary of the state parts. """ if vec is not None: self.check_vector_size(vec) out = {} for var in self.iter_state(): if vec is None: out[var.name] = var() else: out[var.name] = vec[self.di.indx[var.name]] return out
[docs] def set_data(self, data, step=0, ignore_unknown=False, preserve_caches=False): """ Set data (vectors of DOF values) of variables. Parameters ---------- data : array The state vector or dictionary of {variable_name : data vector}. step : int, optional The time history step, 0 (default) = current. ignore_unknown : bool, optional Ignore unknown variable names if `data` is a dict. preserve_caches : bool If True, do not invalidate evaluate caches of variables. """ if data is None: return if isinstance(data, dict): for key, val in six.iteritems(data): try: var = self[key] except (ValueError, IndexError): if ignore_unknown: pass else: raise KeyError('unknown variable! (%s)' % key) else: var.set_data(val, step=step, preserve_caches=preserve_caches) elif isinstance(data, nm.ndarray): self.check_vector_size(data) for ii in self.state: var = self[ii] var.set_data(data, self.di.indx[var.name], step=step, preserve_caches=preserve_caches) else: raise ValueError('unknown data class! (%s)' % data.__class__)
[docs] def set_from_state(self, var_names, state, var_names_state): """ Set variables with names in `var_names` from state variables with names in `var_names_state` using DOF values in the state vector `state`. """ self.check_vector_size(state) if isinstance(var_names, basestr): var_names = [var_names] var_names_state = [var_names_state] for ii, var_name in enumerate(var_names): var_name_state = var_names_state[ii] if self[var_name_state].is_state(): self[var_name].set_data(state, self.di.indx[var_name_state]) else: msg = '%s is not a state part' % var_name_state raise IndexError(msg)
[docs] def state_to_output(self, vec, fill_value=None, var_info=None, extend=True, linearization=None): """ Convert a state vector to a dictionary of output data usable by Mesh.write(). """ di = self.di if var_info is None: self.check_vector_size(vec) var_info = {} for name in di.var_names: var_info[name] = (False, name) out = {} for key, indx in six.iteritems(di.indx): var = self[key] if key not in list(var_info.keys()): continue is_part, name = var_info[key] if is_part: aux = vec else: aux = vec[indx] out.update(var.create_output(aux, key=name, extend=extend, fill_value=fill_value, linearization=linearization)) return out
[docs] def iter_state(self, ordered=True): if ordered: for ii in self.ordered_state: yield self[ii] else: for ii in self.state: yield self[ii]
[docs] def init_history(self): for var in self.iter_state(): var.init_history()
[docs] def time_update(self, ts, functions, verbose=True): if verbose: output('updating variables...') for var in self: var.time_update(ts, functions) if verbose: output('...done')
[docs] def advance(self, ts): for var in self.iter_state(): var.advance(ts)
[docs]class Variable(Struct): _count = 0 _orders = [] _all_var_names = set()
[docs] @staticmethod def reset(): Variable._count = 0 Variable._orders = [] Variable._all_var_names = set()
[docs] @staticmethod def from_conf(key, conf, fields): aux = conf.kind.split() if len(aux) == 2: kind, family = aux elif len(aux) == 3: kind, family = aux[0], '_'.join(aux[1:]) else: raise ValueError('variable kind is 2 or 3 words! (%s)' % conf.kind) history = conf.get('history', None) if history is not None: try: history = int(history) assert_(history >= 0) except (ValueError, TypeError): raise ValueError('history must be integer >= 0! (got "%s")' % history) order = conf.get('order', None) if order is not None: order = int(order) primary_var_name = conf.get('dual', None) if primary_var_name is None: if hasattr(conf, 'like'): primary_var_name = get_default(conf.like, '(set-to-None)') else: primary_var_name = None special = conf.get('special', None) if family == 'field': try: fld = fields[conf.field] except IndexError: msg = 'field "%s" does not exist!' % conf.field raise KeyError(msg) if "DG" in fld.family_name: obj = DGFieldVariable(conf.name, kind, fld, order, primary_var_name, special=special, key=key, history=history) else: obj = FieldVariable(conf.name, kind, fld, order, primary_var_name, special=special, key=key, history=history) else: raise ValueError('unknown variable family! (%s)' % family) return obj
def __init__(self, name, kind, order=None, primary_var_name=None, special=None, flags=None, **kwargs): Struct.__init__(self, name=name, **kwargs) self.flags = set() if flags is not None: for flag in flags: self.flags.add(flag) self.indx = slice(None) self.n_dof = None self.step = 0 self.dt = 1.0 self.initial_condition = None self.dual_var_name = None self.eq_map = None if self.is_virtual(): self.data = None else: self.data = deque() self.data.append(None) self._set_kind(kind, order, primary_var_name, special=special) Variable._all_var_names.add(name) def _set_kind(self, kind, order, primary_var_name, special=None): if kind == 'unknown': self.flags.add(is_state) if order is not None: if order in Variable._orders: raise ValueError('order %d already used!' % order) else: self._order = order Variable._orders.append(order) else: self._order = Variable._count Variable._orders.append(self._order) Variable._count += 1 self.dof_name = self.name elif kind == 'test': if primary_var_name == self.name: raise ValueError('primary variable for %s cannot be %s!' % (self.name, primary_var_name)) self.flags.add(is_virtual) msg = 'test variable %s: related unknown missing' % self.name self.primary_var_name = get_default(primary_var_name, None, msg) self.dof_name = self.primary_var_name elif kind == 'parameter': self.flags.add(is_parameter) msg = 'parameter variable %s: related unknown missing' % self.name self.primary_var_name = get_default(primary_var_name, None, msg) if self.primary_var_name == '(set-to-None)': self.primary_var_name = None self.dof_name = self.name else: self.dof_name = self.primary_var_name if special is not None: self.special = special else: raise NotImplementedError('unknown variable kind: %s' % kind) self.kind = kind def _setup_dofs(self, n_nod, n_components, val_shape): """ Setup number of DOFs and DOF names. """ self.n_nod = n_nod self.n_components = n_components self.val_shape = val_shape self.n_dof = self.n_nod * self.n_components self.dofs = [self.dof_name + ('.%d' % ii) for ii in range(self.n_components)]
[docs] def get_primary(self): """ Get the corresponding primary variable. Returns ------- var : Variable instance The primary variable, or `self` for state variables or if `primary_var_name` is None, or None if no other variables are defined. """ if self.is_state(): var = self elif self.primary_var_name is not None: if ((self._variables is not None) and (self.primary_var_name in self._variables.names)): var = self._variables[self.primary_var_name] else: var = None else: var = self return var
[docs] def get_dual(self): """ Get the dual variable. Returns ------- var : Variable instance The primary variable for non-state variables, or the dual variable for state variables. """ if self.is_state(): if ((self._variables is not None) and (self.dual_var_name in self._variables.names)): var = self._variables[self.dual_var_name] else: var = None else: if ((self._variables is not None) and (self.primary_var_name in self._variables.names)): var = self._variables[self.primary_var_name] else: var = None return var
[docs] def is_state(self): return is_state in self.flags
[docs] def is_virtual(self): return is_virtual in self.flags
[docs] def is_parameter(self): return is_parameter in self.flags
[docs] def is_state_or_parameter(self): return (is_state in self.flags) or (is_parameter in self.flags)
[docs] def is_kind(self, kind): return eval('self.is_%s()' % kind)
[docs] def is_real(self): return self.dtype in real_types
[docs] def is_complex(self): return self.dtype in complex_types
[docs] def is_finite(self, step=0, derivative=None, dt=None): return nm.isfinite(self(step=step, derivative=derivative, dt=dt)).all()
[docs] def get_primary_name(self): if self.is_state(): name = self.name else: name = self.primary_var_name return name
[docs] def init_history(self): """Initialize data of variables with history.""" if self.history is None: return self.data = deque((self.history + 1) * [None]) self.step = 0
[docs] def time_update(self, ts, functions): """Implemented in subclasses.""" pass
[docs] def advance(self, ts): """ Advance in time the DOF state history. A copy of the DOF vector is made to prevent history modification. """ if self.history is None: return self.step = ts.step + 1 if self.history > 0: # Copy the current step data to the history data, shift history, # initialize if needed. The current step data are left intact. # Note: cannot use self.data.rotate() due to data sharing with # State. for ii in range(self.history, 0, -1): if self.data[ii] is None: self.data[ii] = nm.empty_like(self.data[0]) self.data[ii][:] = self.data[ii - 1] # Advance evaluate cache. for step_cache in six.itervalues(self.evaluate_cache): steps = sorted(step_cache.keys()) for step in steps: if step is None: # Special caches with possible custom advance() # function. for key, val in six.iteritems(step_cache[step]): if hasattr(val, '__advance__'): val.__advance__(ts, val) elif -step < self.history: step_cache[step-1] = step_cache[step] if len(steps) and (steps[0] is not None): step_cache.pop(steps[-1])
[docs] def init_data(self, step=0): """ Initialize the dof vector data of time step `step` to zeros. """ if self.is_state_or_parameter(): data = nm.zeros((self.n_dof,), dtype=self.dtype) self.set_data(data, step=step)
[docs] def set_constant(self, val): """ Set the variable to a constant value. """ data = nm.empty((self.n_dof,), dtype=self.dtype) data.fill(val) self.set_data(data)
[docs] def set_data(self, data=None, indx=None, step=0, preserve_caches=False): """ Set data (vector of DOF values) of the variable. Parameters ---------- data : array The vector of DOF values. indx : int, optional If given, `data[indx]` is used. step : int, optional The time history step, 0 (default) = current. preserve_caches : bool If True, do not invalidate evaluate caches of the variable. """ data = data.ravel() if indx is None: indx = slice(0, len(data)) else: indx = slice(int(indx.start), int(indx.stop)) n_data_dof = indx.stop - indx.start if self.n_dof != n_data_dof: msg = 'incompatible data shape! (%d (variable) == %d (data))' \ % (self.n_dof, n_data_dof) raise ValueError(msg) elif (step > 0) or (-step >= len(self.data)): raise ValueError('step %d out of range! ([%d, 0])' % (step, -(len(self.data) - 1))) else: self.data[step] = data self.indx = indx if not preserve_caches: self.invalidate_evaluate_cache(step=step)
def __call__(self, step=0, derivative=None, dt=None): """ Return vector of degrees of freedom of the variable. Parameters ---------- step : int, default 0 The time step (0 means current, -1 previous, ...). derivative : None or 'dt' If not None, return time derivative of the DOF vector, approximated by the backward finite difference. Returns ------- vec : array The DOF vector. If `derivative` is None: a view of the data vector, otherwise: required derivative of the DOF vector at time step given by `step`. Notes ----- If the previous time step is requested in step 0, the step 0 DOF vector is returned instead. """ if derivative is None: if (self.step == 0) and (step == -1): data = self.data[0] else: data = self.data[-step] if data is None: raise ValueError('data of variable are not set! (%s, step %d)' \ % (self.name, step)) return data[self.indx] else: if self.history is None: msg = 'set history type of variable %s to use derivatives!'\ % self.name raise ValueError(msg) dt = get_default(dt, self.dt) return (self(step=step) - self(step=step-1)) / dt
[docs] def get_initial_condition(self): if self.initial_condition is None: return 0.0 else: return self.initial_condition
[docs]class FieldVariable(Variable): """ A finite element field variable. field .. field description of variable (borrowed) """ def __init__(self, name, kind, field, order=None, primary_var_name=None, special=None, flags=None, history=None, **kwargs): Variable.__init__(self, name, kind, order, primary_var_name, special, flags, history=history, **kwargs) self._set_field(field) self.has_field = True self.has_bc = True self._variables = None self.clear_evaluate_cache() def _set_field(self, field): """ Set field of the variable. Takes reference to a Field instance. Sets dtype according to field.dtype. Sets `dim` attribute to spatial dimension. """ self.is_surface = field.is_surface self.field = field self._setup_dofs(field.n_nod, field.n_components, field.val_shape) self.flags.add(is_field) self.dtype = field.dtype self.dim = field.domain.shape.dim def _get_setter(self, kind, functions, **kwargs): """ Get the setter function of the variable and its arguments depending in the setter kind. """ if not (hasattr(self, 'special') and (kind in self.special)): return setter_name = self.special[kind] setter = functions[setter_name] region = self.field.region nod_list = self.field.get_dofs_in_region(region) nods = nm.unique(nod_list) coors = self.field.get_coor(nods) if kind == 'setter': sargs = (kwargs.get('ts'), coors) elif kind == 'ic': sargs = (coors, ) skwargs = {'region' : region} return setter, sargs, skwargs
[docs] def get_field(self): return self.field
[docs] def get_mapping(self, region, integral, integration, get_saved=False, return_key=False): """ Get the reference element mapping of the underlying field. See Also -------- sfepy.discrete.common.fields.Field.get_mapping """ if region is None: region = self.field.region out = self.field.get_mapping(region, integral, integration, get_saved=get_saved, return_key=return_key) return out
[docs] def get_dof_conn(self, dc_type, is_trace=False, trace_region=None): """ Get active dof connectivity of a variable. Notes ----- The primary and dual variables must have the same Region. """ if self.is_virtual(): var = self.get_primary() # No primary variable can occur in single term evaluations. var_name = var.name if var is not None else self.name else: var_name = self.name if not is_trace: region_name = dc_type.region_name else: aux = self.field.domain.regions[dc_type.region_name] region = aux.get_mirror_region(trace_region) region_name = region.name key = (var_name, region_name, dc_type.type, is_trace) dc = self.adof_conns[key] return dc
[docs] def get_dof_info(self, active=False): details = Struct(name='field_var_dof_details', n_nod=self.n_nod, dpn=self.n_components) if active: n_dof = self.n_adof else: n_dof = self.n_dof return n_dof, details
[docs] def time_update(self, ts, functions): """ Store time step, set variable data for variables with the setter function. """ if ts is not None: self.dt = ts.dt if hasattr(self, 'special') and ('setter' in self.special): setter, sargs, skwargs = self._get_setter('setter', functions, ts=ts) self.set_data(setter(*sargs, **skwargs)) output('data of %s set by %s()' % (self.name, setter.name))
[docs] def set_from_qp(self, data_qp, integral, step=0): """ Set DOFs of variable using values in quadrature points corresponding to the given integral. """ data_vertex = self.field.average_qp_to_vertices(data_qp, integral) # Field nodes values. data = self.field.interp_v_vals_to_n_vals(data_vertex) data = data.ravel() self.indx = slice(0, len(data)) self.data[step] = data
[docs] def set_from_mesh_vertices(self, data): """ Set the variable using values at the mesh vertices. """ ndata = self.field.interp_v_vals_to_n_vals(data) self.set_data(ndata)
[docs] def set_from_function(self, fun, step=0): """ Set the variable data (the vector of DOF values) using a function of space coordinates. Parameters ---------- fun : callable The function of coordinates returning DOF values of shape `(n_coor, n_components)`. step : int, optional The time history step, 0 (default) = current. """ _, vv = self.field.set_dofs(fun, self.field.region, self.n_components) self.set_data(vv.ravel(), step=step)
[docs] def equation_mapping(self, bcs, var_di, ts, functions, problem=None, warn=False): """ Create the mapping of active DOFs from/to all DOFs. Sets n_adof. Returns ------- active_bcs : set The set of boundary conditions active in the current time. """ self.eq_map = EquationMap('eq_map', self.dofs, var_di) if bcs is not None: bcs.canonize_dof_names(self.dofs) bcs.sort() active_bcs = self.eq_map.map_equations(bcs, self.field, ts, functions, problem=problem, warn=warn) self.n_adof = self.eq_map.n_eq return active_bcs
[docs] def setup_initial_conditions(self, ics, di, functions, warn=False): """ Setup of initial conditions. """ ics.canonize_dof_names(self.dofs) ics.sort() self.initial_condition = nm.zeros((di.n_dof[self.name],), dtype=self.dtype) for ic in ics: region = ic.region dofs, val = ic.dofs if warn: clean_msg = ('warning: ignoring nonexistent' \ ' IC node (%s) in ' % self.name) else: clean_msg = None nod_list = self.field.get_dofs_in_region(region) if len(nod_list) == 0: continue fun = get_condition_value(val, functions, 'IC', ic.name) if isinstance(fun, Function): aux = fun fun = lambda coors: aux(coors, ic=ic) nods, vv = self.field.set_dofs(fun, region, len(dofs), clean_msg) eq = expand_nodes_to_equations(nods, dofs, self.dofs) self.initial_condition[eq] = nm.ravel(vv)
[docs] def get_data_shape(self, integral, integration='volume', region_name=None): """ Get element data dimensions for given approximation. Parameters ---------- integral : Integral instance The integral describing used numerical quadrature. integration : 'volume', 'surface', 'surface_extra', 'point' or 'custom' The term integration type. region_name : str The name of the region of the integral. Returns ------- data_shape : 5 ints The `(n_el, n_qp, dim, n_en, n_comp)` for volume shape kind, `(n_fa, n_qp, dim, n_fn, n_comp)` for surface shape kind and `(n_nod, 0, 0, 1, n_comp)` for point shape kind. Notes ----- - `n_el`, `n_fa` = number of elements/facets - `n_qp` = number of quadrature points per element/facet - `dim` = spatial dimension - `n_en`, `n_fn` = number of element/facet nodes - `n_comp` = number of variable components in a point/node - `n_nod` = number of element nodes """ aux = self.field.get_data_shape(integral, integration=integration, region_name=region_name) data_shape = aux + (self.n_components,) return data_shape
[docs] def clear_evaluate_cache(self): """ Clear current evaluate cache. """ self.evaluate_cache = {}
[docs] def invalidate_evaluate_cache(self, step=0): """ Invalidate variable data in evaluate cache for time step given by `step` (0 is current, -1 previous, ...). This should be done, for example, prior to every nonlinear solver iteration. """ for step_cache in six.itervalues(self.evaluate_cache): for key in list(step_cache.keys()): if key == step: # Given time step to clear. step_cache.pop(key)
[docs] def evaluate(self, mode='val', region=None, integral=None, integration=None, step=0, time_derivative=None, is_trace=False, trace_region=None, dt=None, bf=None): """ Evaluate various quantities related to the variable according to `mode` in quadrature points defined by `integral`. The evaluated data are cached in the variable instance in `evaluate_cache` attribute. Parameters ---------- mode : one of 'val', 'grad', 'div', 'cauchy_strain' The evaluation mode. region : Region instance, optional The region where the evaluation occurs. If None, the underlying field region is used. integral : Integral instance, optional The integral defining quadrature points in which the evaluation occurs. If None, the first order volume integral is created. Must not be None for surface integrations. integration : 'volume', 'surface', 'surface_extra', or 'point' The term integration type. If None, it is derived from `integral`. step : int, default 0 The time step (0 means current, -1 previous, ...). time_derivative : None or 'dt' If not None, return time derivative of the data, approximated by the backward finite difference. is_trace : bool, default False Indicate evaluation of trace of the variable on a boundary region. dt : float, optional The time step to be used if `derivative` is `'dt'`. If None, the `dt` attribute of the variable is used. bf : Base function, optional The base function to be used in 'val' mode. Returns ------- out : array The 4-dimensional array of shape `(n_el, n_qp, n_row, n_col)` with the requested data, where `n_row`, `n_col` depend on `mode`. """ if integration == 'custom': msg = 'cannot use FieldVariable.evaluate() with custom integration!' raise ValueError(msg) step_cache = self.evaluate_cache.setdefault(mode, {}) cache = step_cache.setdefault(step, {}) field = self.field if region is None: region = field.region if is_trace: region = region.get_mirror_region(trace_region) if (region is not field.region) and not region.is_empty: assert_(field.region.contains(region)) if integral is None: integral = Integral('aux_1', 1) if integration is None: integration = 'volume' if region.can_cells else 'surface' geo, _, key = field.get_mapping(region, integral, integration, return_key=True) key += (time_derivative, is_trace) if key in cache: out = cache[key] else: vec = self(step=step, derivative=time_derivative, dt=dt) ct = integration if integration == 'surface_extra': ct = 'volume' conn = field.get_econn(ct, region, is_trace, integration) shape = self.get_data_shape(integral, integration, region.name) if self.dtype == nm.float64: out = eval_real(vec, conn, geo, mode, shape, bf) else: out = eval_complex(vec, conn, geo, mode, shape, bf) cache[key] = out return out
[docs] def get_state_in_region(self, region, reshape=True, step=0): """ Get DOFs of the variable in the given region. Parameters ---------- region : Region The selected region. reshape : bool If True, reshape the DOF vector to a 2D array with the individual components as columns. Otherwise a 1D DOF array of the form [all DOFs in region node 0, all DOFs in region node 1, ...] is returned. step : int, default 0 The time step (0 means current, -1 previous, ...). Returns ------- out : array The selected DOFs. """ nods = self.field.get_dofs_in_region(region, merge=True) eq = nm.empty((len(nods) * self.n_components,), dtype=nm.int32) for idof in range(self.n_components): eq[idof::self.n_components] = self.n_components * nods \ + idof + self.indx.start out = self.data[step][eq] if reshape: out.shape = (len(nods), self.n_components) return out
[docs] def apply_ebc(self, vec, offset=0, force_values=None): """ Apply essential (Dirichlet) and periodic boundary conditions to vector `vec`, starting at `offset`. """ eq_map = self.eq_map ii = offset + eq_map.eq_ebc # EBC, if force_values is None: vec[ii] = eq_map.val_ebc else: if isinstance(force_values, dict): vec[ii] = force_values[self.name] else: vec[ii] = force_values # EPBC. vec[offset+eq_map.master] = vec[offset+eq_map.slave]
[docs] def apply_ic(self, vec, offset=0, force_values=None): """ Apply initial conditions conditions to vector `vec`, starting at `offset`. """ ii = slice(offset, offset + self.n_dof) if force_values is None: vec[ii] = self.get_initial_condition() else: if isinstance(force_values, dict): vec[ii] = force_values[self.name] else: vec[ii] = force_values
[docs] def get_reduced(self, vec, offset=0, follow_epbc=False): """ Get the reduced DOF vector, with EBC and PBC DOFs removed. Notes ----- The full vector starts in `vec` at `offset`. If 'follow_epbc' is True, values of EPBC master DOFs are not simply thrown away, but added to the corresponding slave DOFs, just like when assembling. For vectors with state (unknown) variables it should be set to False, for assembled vectors it should be set to True. """ eq_map = self.eq_map ii = offset + eq_map.eqi r_vec = vec[ii] if follow_epbc: master = offset + eq_map.master slave = eq_map.eq[eq_map.slave] ii = slave >= 0 la.assemble1d(r_vec, slave[ii], vec[master[ii]]) return r_vec
[docs] def get_full(self, r_vec, r_offset=0, force_value=None, vec=None, offset=0): """ Get the full DOF vector satisfying E(P)BCs from a reduced DOF vector. Notes ----- The reduced vector starts in `r_vec` at `r_offset`. Passing a `force_value` overrides the EBC values. Optionally, `vec` argument can be provided to store the full vector (in place) starting at `offset`. """ if vec is None: vec = nm.empty(self.n_dof, dtype=r_vec.dtype) else: vec = vec[offset:offset+self.n_dof] eq_map = self.eq_map r_vec = r_vec[r_offset:r_offset+eq_map.n_eq] # EBC. vec[eq_map.eq_ebc] = get_default(force_value, eq_map.val_ebc) # Reduced vector values. vec[eq_map.eqi] = r_vec # EPBC. vec[eq_map.master] = vec[eq_map.slave] unused_dofs = self.field.get('unused_dofs') if unused_dofs is not None: vec[:] = self.field.restore_substituted(vec) return vec
[docs] def create_output(self, vec=None, key=None, extend=True, fill_value=None, linearization=None): """ Convert the DOF vector to a dictionary of output data usable by Mesh.write(). Parameters ---------- vec : array, optional An alternative DOF vector to be used instead of the variable DOF vector. key : str, optional The key to be used in the output dictionary instead of the variable name. extend : bool Extend the DOF values to cover the whole domain. fill_value : float or complex The value used to fill the missing DOF values if `extend` is True. linearization : Struct or None The linearization configuration for higher order approximations. """ linearization = get_default(linearization, Struct(kind='strip')) if vec is None: vec = self() key = get_default(key, self.name) aux = nm.reshape(vec, (self.n_dof // self.n_components, self.n_components)) out = self.field.create_output(aux, self.name, dof_names=self.dofs, key=key, extend=extend, fill_value=fill_value, linearization=linearization) return out
[docs] def get_element_diameters(self, cells, mode, square=False): """Get diameters of selected elements.""" field = self.field domain = field.domain cells = nm.array(cells) diameters = nm.empty((cells.shape[0],), dtype=nm.float64) integral = Integral('i_tmp', 1) vg, _ = field.get_mapping(field.region, integral, 'volume') diameters = domain.get_element_diameters(cells, vg, mode, square=square) return diameters
[docs] def save_as_mesh(self, filename): """ Save the field mesh and the variable values into a file for visualization. Only the vertex values are stored. """ mesh = self.field.create_mesh(extra_nodes=False) vec = self() n_nod, n_dof, dpn = mesh.n_nod, self.n_dof, self.n_components aux = nm.reshape(vec, (n_dof // dpn, dpn)) ext = self.field.extend_dofs(aux, 0.0) out = {} if self.field.approx_order != 0: out[self.name] = Struct(name='output_data', mode='vertex', data=ext, var_name=self.name, dofs=self.dofs) else: ext.shape = (ext.shape[0], 1, ext.shape[1], 1) out[self.name] = Struct(name='output_data', mode='cell', data=ext, var_name=self.name, dofs=self.dofs) mesh.write(filename, io='auto', out=out)
[docs] def has_same_mesh(self, other): """ Returns ------- flag : int The flag can be either 'different' (different meshes), 'deformed' (slightly deformed same mesh), or 'same' (same). """ f1 = self.field f2 = other.field c1 = f1.get_coor() c2 = f2.get_coor() if c1.shape != c2.shape: flag = 'different' else: eps = 10.0 * nm.finfo(nm.float64).eps if nm.allclose(c1, c2, rtol=eps, atol=0.0): flag = 'same' elif nm.allclose(c1, c2, rtol=0.1, atol=0.0): flag = 'deformed' else: flag = 'different' return flag
[docs] def get_interp_coors(self, strategy='interpolation', interp_term=None): """ Get the physical coordinates to interpolate into, based on the strategy used. """ if strategy == 'interpolation': coors = self.field.get_coor() elif strategy == 'projection': region = self.field.region integral = Integral(term=interp_term) coors = get_physical_qps(region, integral) else: raise ValueError('unknown interpolation strategy! (%s)' % strategy) return coors
[docs] def evaluate_at(self, coors, mode='val', strategy='general', close_limit=0.1, get_cells_fun=None, cache=None, ret_cells=False, ret_status=False, ret_ref_coors=False, verbose=False): """ Evaluate the variable in the given physical coordinates. Convenience wrapper around :func:`Field.evaluate_at() <sfepy.discrete.common.fields.Field.evaluate_at()>`, see its docstring for more details. """ source_vals = self().reshape((self.n_nod, self.n_components)) out = self.field.evaluate_at(coors, source_vals, mode=mode, strategy=strategy, close_limit=close_limit, get_cells_fun=get_cells_fun, cache=cache, ret_cells=ret_cells, ret_status=ret_status, ret_ref_coors=ret_ref_coors, verbose=verbose) return out
[docs] def set_from_other(self, other, strategy='projection', close_limit=0.1): """ Set the variable using another variable. Undefined values (e.g. outside the other mesh) are set to numpy.nan, or extrapolated. Parameters ---------- strategy : 'projection' or 'interpolation' The strategy to set the values: the L^2 orthogonal projection (not implemented!), or a direct interpolation to the nodes (nodal elements only!) Notes ----- If the other variable uses the same field mesh, the coefficients are set directly. """ flag_same_mesh = self.has_same_mesh(other) if flag_same_mesh == 'same': self.set_data(other()) return if strategy == 'interpolation': coors = self.get_interp_coors(strategy) elif strategy == 'projection': ## interp_term = Term() # TODO ## coors = self.get_interp_coors(strategy, interp_term) pass else: raise ValueError('unknown interpolation strategy! (%s)' % strategy) vals = other.evaluate_at(coors, strategy='general', close_limit=close_limit) if strategy == 'interpolation': self.set_data(vals) elif strategy == 'projection': raise NotImplementedError('unsupported strategy! (%s)' % strategy) else: raise ValueError('unknown interpolation strategy! (%s)' % strategy)
[docs]class DGFieldVariable(FieldVariable): """ Fieald variable specificaly intended for use with DGFields, bypasses application of EBC and EPBC as this is done in DGField. Is instance checked in create_adof_conns. """ def __init__(self, name, kind, field, order=None, primary_var_name=None, special=None, flags=None, history=None, **kwargs): FieldVariable.__init__(self, name, kind, field, order=order, primary_var_name=primary_var_name, special=special, flags=flags, history=history, **kwargs) from sfepy.discrete.dg.fields import DGField if isinstance(field, DGField): pass else: raise ValueError("Attempted to use DGFieldVariable with non DGField!")
[docs] def apply_ebc(self, vec, offset=0, force_values=None): pass
[docs] def get_full(self, r_vec, r_offset=0, force_value=None, vec=None, offset=0): """ Get the full DOF vector satisfying E(P)BCs from a reduced DOF vector. Notes ----- The reduced vector starts in `r_vec` at `r_offset`. Passing a `force_value` overrides the EBC values. Optionally, `vec` argument can be provided to store the full vector (in place) starting at `offset`. """ if vec is None: vec = nm.empty(self.n_dof, dtype=r_vec.dtype) else: vec = vec[offset:offset+self.n_dof] eq_map = self.eq_map r_vec = r_vec[r_offset:r_offset+eq_map.n_eq] # overide to hotfix second application of EBCs # # EBC. # vec[eq_map.eq_ebc] = get_default(force_value, eq_map.val_ebc) # Reduced vector values, for DG this is full vector as eq_map.eq # contains all dofs, cf. create_adof_conns vec[eq_map.eqi] = r_vec # EPBC. # vec[eq_map.master] = vec[eq_map.slave] unused_dofs = self.field.get('unused_dofs') if unused_dofs is not None: vec[:] = self.field.restore_substituted(vec) return vec