Source code for sfepy.discrete.common.dof_info

"""
Classes holding information on global DOFs and mapping of all DOFs -
equations (active DOFs).

Helper functions for the equation mapping.
"""
import numpy as nm
import scipy.sparse as sp

from sfepy.base.base import assert_, Struct, basestr
from sfepy.discrete.functions import Function
from sfepy.discrete.conditions import get_condition_value, EssentialBC, \
    PeriodicBC, DGPeriodicBC, DGEssentialBC


[docs]def expand_nodes_to_dofs(nods, n_dof_per_node): """ Expand DOF node indices into DOFs given a constant number of DOFs per node. """ dofs = nm.repeat(nods, n_dof_per_node) dofs.shape = (nods.shape[0], n_dof_per_node) idof = nm.arange(n_dof_per_node, dtype=nm.int32) dofs = n_dof_per_node * dofs + idof return dofs
[docs]def expand_nodes_to_equations(nods, dof_names, all_dof_names): """ Expand vector of node indices to equations (DOF indices) based on the DOF-per-node count. DOF names must be already canonized. Returns ------- eq : array The equations/DOF indices in the node-by-node order. """ dpn = len(all_dof_names) nc = len(dof_names) eq = nm.empty(len(nods) * nc, dtype=nm.int32) for ii, dof in enumerate(dof_names): idof = all_dof_names.index(dof) eq[ii::nc] = dpn * nods + idof return eq
[docs]def resolve_chains(master_slave, chains): """ Resolve EPBC chains - e.g. in corner nodes. """ for chain in chains: slave = chain[-1] master_slave[chain[:-1]] = slave + 1 master_slave[slave] = - chain[0] - 1 # Any of masters...
[docs]def group_chains(chain_list): """ Group EPBC chains. """ chains = [] while len(chain_list): chain = set(chain_list.pop(0)) ## print ':', chain ii = 0 while ii < len(chain_list): c1 = sorted(chain_list[ii]) ## print '--', ii, c1, chain is0 = c1[0] in chain is1 = c1[1] in chain if is0 and is1: chain_list.pop(ii) elif is0 or is1: chain.update(c1) chain_list.pop(ii) ii = 0 else: ii += 1 ## print ii, chain, chain_list ## print '->', chain ## print chain_list chains.append(list(chain)) ## print 'EPBC chain groups:', chains aux = {} for chain in chains: aux.setdefault(len(chain), [0])[0] += 1 ## print 'EPBC chain counts:', aux return chains
[docs]class DofInfo(Struct): """ Global DOF information, i.e. ordering of DOFs of the state (unknown) variables in the global state vector. """ def __init__(self, name): Struct.__init__(self, name=name) self.n_var = 0 self.var_names = [] self.n_dof = {} self.n_dof_total = 0 self.indx = {} self.details = {} self.shared_dofs = {} def _update_after_append(self, name, shared=None): if shared is None: indx0 = self.n_dof_total n_dof = self.n_dof[name] self.n_dof_total += n_dof self.indx[name] = slice(indx0, indx0 + n_dof) else: self.indx[name] = self.indx[shared] self.n_dof[name] = self.n_dof[shared] self.shared_dofs[name] = shared self.n_var += 1
[docs] def append_variable(self, var, active=False, shared=None): """ Append DOFs of the given variable. Parameters ---------- var : Variable instance The variable to append. active : bool, optional When True, only active (non-constrained) DOFs are considered. """ name = var.name if name in self.var_names: raise ValueError('variable %s already present!' % name) self.var_names.append(name) self.n_dof[name], self.details[name] = var.get_dof_info(active=active) self._update_after_append(name, shared=shared)
[docs] def append_raw(self, name, n_dof): """ Append raw DOFs. Parameters ---------- name : str The name of variable the DOFs correspond to. n_dof : int The number of DOFs. """ if name in self.var_names: raise ValueError('variable %s already present!' % name) self.var_names.append(name) self.n_dof[name], self.details[name] = n_dof, None self._update_after_append(name)
# # probably unused - not tested! # def update(self, name, n_dof): # """ # Set the number of DOFs of the given variable. # Parameters # ---------- # name : str # The name of variable the DOFs correspond to. # n_dof : int # The number of DOFs. # """ # if not name in self.var_names: # raise ValueError('variable %s is not present!' % name) # ii = self.var_names.index(name) # delta = n_dof - self.n_dof[name] # self.n_dof[name] = n_dof # for iv, nn in enumerate(self.var_names[ii:]): # self.ptr[ii+iv+1] += delta # self.indx[nn] = slice(self.ptr[ii+iv], self.ptr[ii+iv+1])
[docs] def get_info(self, var_name): """ Return information on DOFs of the given variable. Parameters ---------- var_name : str The name of the variable. """ return Struct(name='%s_dof_info' % var_name, var_name=var_name, n_dof=self.n_dof[var_name], indx=self.indx[var_name], details=self.details[var_name], shared_dofs_with=self.shared_dofs.get(var_name, None))
[docs] def get_subset_info(self, var_names): """ Return global DOF information for selected variables only. Silently ignores non-existing variable names. Parameters ---------- var_names : list The names of the selected variables. """ di = DofInfo(self.name + ':subset') for var_name in var_names: if var_name not in self.var_names: continue di.append_raw(var_name, self.n_dof[var_name]) return di
[docs] def get_n_dof_total(self): """ Return the total number of DOFs of all state variables. """ return self.n_dof_total
[docs]def is_active_bc(bc, ts=None, functions=None): """ Check whether the given boundary condition is active in the current time. Returns ------- active : bool True if the condition `bc` is active. """ if (bc.times is None) or (ts is None): active = True elif isinstance(bc.times, list): for tt in bc.times: if tt[0] <= ts.time < tt[1]: active = True break else: active = False else: if isinstance(bc.times, basestr): if functions is not None: fun = functions[bc.times] else: raise ValueError('no functions given for bc %s!' % bc.name) elif isinstance(bc.times, Function): fun = bc.times else: raise ValueError('unknown times type! (%s)' % type(bc.times)) active = fun(ts) return active
[docs]class EquationMap(Struct): """ Map all DOFs to equations for active DOFs. """ def __init__(self, name, dof_names, var_di): Struct.__init__(self, name=name, dof_names=dof_names, var_di=var_di) self.dpn = len(self.dof_names) self.eq = nm.arange(var_di.n_dof, dtype=nm.int32) self.n_dg_ebc = 0 self.dg_ebc_names = {} self.dg_ebc = {} self.dg_ebc_val = {} self.n_dg_epbc = 0 self.dg_epbc_names = [] self.dg_epbc = [] def _init_empty(self, field): self.val_ebc = nm.empty((0,), dtype=field.dtype) if field.get('unused_dofs') is None: self.eqi = nm.arange(self.var_di.n_dof, dtype=nm.int32) else: self._mark_unused(field) self.eqi = nm.compress(self.eq >= 0, self.eq) self.eq[self.eqi] = nm.arange(self.eqi.shape[0], dtype=nm.int32) self.eq_ebc = nm.empty((0,), dtype=nm.int32) self.master = nm.empty((0,), dtype=nm.int32) self.slave = nm.empty((0,), dtype=nm.int32) self.n_eq = self.eqi.shape[0] self.n_ebc = self.eq_ebc.shape[0] self.n_epbc = self.master.shape[0] def _mark_unused(self, field): unused_dofs = field.get('unused_dofs') if unused_dofs is not None: unused = expand_nodes_to_equations(field.unused_dofs, self.dof_names, self.dof_names) self.eq[unused] = -3
[docs] def map_equations(self, bcs, field, ts, functions, problem=None, warn=False): """ Create the mapping of active DOFs from/to all DOFs. Parameters ---------- bcs : Conditions instance The Dirichlet or periodic boundary conditions (single condition instances). The dof names in the conditions must already be canonized. field : Field instance The field of the variable holding the DOFs. ts : TimeStepper instance The time stepper. functions : Functions instance The registered functions. problem : Problem instance, optional The problem that can be passed to user functions as a context. warn : bool, optional If True, warn about BC on non-existent nodes. Returns ------- active_bcs : set The set of boundary conditions active in the current time. Notes ----- - Periodic bc: master and slave DOFs must belong to the same field (variables can differ, though). """ if bcs is None: self._init_empty(field) return set() eq_ebc = nm.zeros((self.var_di.n_dof,), dtype=nm.int32) val_ebc = nm.zeros((self.var_di.n_dof,), dtype=field.dtype) master_slave = nm.zeros((self.var_di.n_dof,), dtype=nm.int32) chains = [] active_bcs = set() for bc in bcs: # Skip conditions that are not active in the current time. if not is_active_bc(bc, ts=ts, functions=functions): continue if isinstance(bc, DGEssentialBC): ntype = "DGEBC" region = bc.region sig = (bc.key, bc.name, tuple(bc.dofs[0]), bc.region.name) elif isinstance(bc, DGPeriodicBC): ntype = "DGEPBC" region = bc.regions[0] sig = (bc.key, bc.name, tuple(bc.dofs[0]), tuple(bc.dofs[1]), bc.regions[0].name, bc.regions[1].name) elif isinstance(bc, EssentialBC): ntype = 'EBC' region = bc.region sig = (bc.key, bc.name, tuple(bc.dofs[0]), bc.region.name) elif isinstance(bc, PeriodicBC): ntype = 'EPBC' region = bc.regions[0] sig = (bc.key, bc.name, tuple(bc.dofs[0]), tuple(bc.dofs[1]), bc.regions[0].name, bc.regions[1].name) active_bcs.add(sig) if warn: clean_msg = ('warning: ignoring nonexistent %s node (%s) in ' % (ntype, self.var_di.var_name)) else: clean_msg = None # Get master region nodes. master_nod_list = field.get_dofs_in_region(region) if len(master_nod_list) == 0: continue if ntype == 'EBC': # EBC. dofs, val = bc.dofs ## # Evaluate EBC values. fun = get_condition_value(val, functions, 'EBC', bc.name) if isinstance(fun, Function): aux = fun fun = lambda coors: aux(ts, coors, bc=bc, problem=problem) nods, vv = field.set_dofs(fun, region, len(dofs), clean_msg) eq = expand_nodes_to_equations(nods, dofs, self.dof_names) # Duplicates removed here... eq_ebc[eq] = 1 if vv is not None: val_ebc[eq] = nm.ravel(vv) elif ntype == "DGEBC": dofs, val = bc.dofs ## # Evaluate EBC values. fun = get_condition_value(val, functions, 'EBC', bc.name) if isinstance(fun, Function): aux = fun fun = lambda coors: aux(ts, coors, bc=bc, problem=problem) values = field.get_bc_facet_values(fun, region, diff=bc.diff) bc2bfi = field.get_bc_facet_idx(region) self.dg_ebc_val.setdefault(bc.diff, []).append(values) self.dg_ebc.setdefault(bc.diff, []).append(bc2bfi) self.n_dg_ebc += 1 elif ntype == "DGEPBC": # ensure matching boundaries? master_bc2bfi = field.get_bc_facet_idx(region) slave_bc2bfi = field.get_bc_facet_idx(bc.regions[1]) self.dg_epbc.append((master_bc2bfi, slave_bc2bfi)) self.n_dg_epbc += 1 else: # EPBC. region = bc.regions[1] slave_nod_list = field.get_dofs_in_region(region) nmaster = nm.unique(master_nod_list) # Treat fields not covering the whole domain. if nmaster[0] == -1: nmaster = nmaster[1:] nslave = nm.unique(slave_nod_list) # Treat fields not covering the whole domain. if nslave[0] == -1: nslave = nslave[1:] ## print nmaster + 1 ## print nslave + 1 if nmaster.shape != nslave.shape: msg = 'EPBC list lengths do not match!\n(%s,\n %s)' %\ (nmaster, nslave) raise ValueError(msg) if (nmaster.shape[0] == 0) and (nslave.shape[0] == 0): continue mcoor = field.get_coor(nmaster) scoor = field.get_coor(nslave) fun = get_condition_value(bc.match, functions, 'EPBC', bc.name) if isinstance(fun, Function): i1, i2 = fun(mcoor, scoor) else: i1, i2 = fun ## print nm.c_[mcoor[i1], scoor[i2]] ## print nm.c_[nmaster[i1], nslave[i2]] + 1 meq = expand_nodes_to_equations(nmaster[i1], bc.dofs[0], self.dof_names) seq = expand_nodes_to_equations(nslave[i2], bc.dofs[1], self.dof_names) m_assigned = nm.where(master_slave[meq] != 0)[0] s_assigned = nm.where(master_slave[seq] != 0)[0] if m_assigned.size or s_assigned.size: # Chain EPBC. aux = master_slave[meq[m_assigned]] sgn = nm.sign(aux) om_chain = zip(meq[m_assigned], (aux - sgn) * sgn) chains.extend(om_chain) aux = master_slave[seq[s_assigned]] sgn = nm.sign(aux) os_chain = zip(seq[s_assigned], (aux - sgn) * sgn) chains.extend(os_chain) m_chain = zip(meq[m_assigned], seq[m_assigned]) chains.extend(m_chain) msd = nm.setdiff1d(s_assigned, m_assigned) s_chain = zip(meq[msd], seq[msd]) chains.extend(s_chain) msa = nm.union1d(m_assigned, s_assigned) ii = nm.setdiff1d(nm.arange(meq.size), msa) master_slave[meq[ii]] = seq[ii] + 1 master_slave[seq[ii]] = - meq[ii] - 1 else: master_slave[meq] = seq + 1 master_slave[seq] = - meq - 1 chains = group_chains(chains) resolve_chains(master_slave, chains) self.master = nm.nonzero(master_slave > 0)[0] self.slave = master_slave[self.master] - 1 # Propagate EBCs via PBCs. mask = eq_ebc[self.master] > 0 im0 = self.master[mask] im1 = self.slave[mask] mask = eq_ebc[self.slave] > 0 is0 = self.slave[mask] is1 = self.master[mask] val_ebc[im1] = val_ebc[im0] eq_ebc[im1] = eq_ebc[im0] val_ebc[is1] = val_ebc[is0] eq_ebc[is1] = eq_ebc[is0] self.eq_ebc = nm.nonzero(eq_ebc > 0)[0] self.val_ebc = val_ebc[self.eq_ebc] assert_((self.eq_ebc.shape == self.val_ebc.shape)) self.eq[self.eq_ebc] = -2 self.eq[self.master] = -1 self._mark_unused(field) self.eqi = self.eq[self.eq >= 0] self.eq[self.eqi] = nm.arange(self.eqi.shape[0], dtype=nm.int32) self.eq[self.master] = self.eq[self.slave] self.n_eq = self.eqi.shape[0] self.n_ebc = self.eq_ebc.shape[0] self.n_epbc = self.master.shape[0] return active_bcs
[docs] def get_operator(self): """ Get the matrix operator :math:`R` corresponding to the equation mapping, such that the restricted matrix :math:`A_r` can be obtained from the full matrix :math:`A` by :math:`A_r = R^T A R`. All the matrices are w.r.t. a single variables that uses this mapping. Returns ------- mtx : coo_matrix The matrix :math:`R`. """ # EBC. rows = self.eqi cols = nm.arange(self.n_eq, dtype=nm.int32) # EPBC. ic = self.eq[self.slave] ii = ic >= 0 rows = nm.r_[rows, self.master[ii]] cols = nm.r_[cols, ic[ii]] ones = nm.ones(rows.shape[0], dtype=nm.float64) mtx = sp.coo_matrix((ones, (rows, cols)), shape=(self.eq.shape[0], self.n_eq)) return mtx