Source code for sfepy.homogenization.homogen_app

from __future__ import print_function
from __future__ import absolute_import
import os.path as op
import shutil

import numpy as nm

from sfepy.base.base import get_default, Struct
from sfepy.homogenization.coefficients import Coefficients
from sfepy.homogenization.engine import HomogenizationEngine
from sfepy.applications import PDESolverApp
import sfepy.discrete.fem.periodic as per
import sfepy.linalg as la
import sfepy.base.multiproc as multi
import six
from six.moves import range


[docs]class HomogenizationApp(HomogenizationEngine):
[docs] @staticmethod def process_options(options): """ Application options setup. Sets default values for missing non-compulsory options. """ get = options.get volume = get('volume', None) volumes = get('volumes', None) if volume is None and volumes is None: raise ValueError('missing "volume" in options!') return Struct(print_digits=get('print_digits', 3), float_format=get('float_format', '%8.3e'), coefs_filename=get('coefs_filename', 'coefs'), tex_names=get('tex_names', None), coefs=get('coefs', None, 'missing "coefs" in options!'), requirements=get('requirements', None, 'missing "requirements" in options!'), return_all=get('return_all', False), mesh_update_variable=get('mesh_update_variable', None), macro_data=get('macro_data', None), micro_update=get('micro_update', {}), n_micro=get('n_micro', None), multiprocessing=get('multiprocessing', True), use_mpi=get('use_mpi', False), store_micro_idxs=get('store_micro_idxs', []), volume=volume, volumes=volumes)
def __init__(self, conf, options, output_prefix, **kwargs): PDESolverApp.__init__(self, conf, options, output_prefix, init_equations=False) self.setup_options() self.n_micro = kwargs.get('n_micro', self.app_options.get('n_micro', None)) self.updating_corrs = None self.micro_state_cache = {} self.multiproc_mode = None self.micro_states = None if self.n_micro is None else {} # macroscopic data given in problem options dict. macro_data = self.app_options.macro_data if macro_data is not None: self.n_micro = macro_data[list(macro_data.keys())[0]].shape[0] self.setup_macro_data(macro_data) if self.n_micro is not None: for k in self.app_options.micro_update: if not k == 'coors': self.micro_states[k] = None coors = self.problem.domain.get_mesh_coors() c_sh = (self.n_micro,) + coors.shape self.micro_states['coors'] = nm.empty(c_sh, dtype=nm.float64) mac_ids = kwargs.get('mac_ids', self.app_options.get('mac_ids', None)) self.micro_states['id'] = [] for im in range(self.n_micro): self.micro_states['coors'][im] = coors self.micro_states['id'].append( mac_ids[im] if mac_ids is not None else im) output_dir = self.problem.output_dir if conf._filename is not None: shutil.copyfile(conf._filename, op.join(output_dir, op.basename(conf._filename)))
[docs] def setup_options(self): PDESolverApp.setup_options(self) po = HomogenizationApp.process_options self.app_options += po(self.conf.options) if hasattr(self, 'he'): self.he.setup_options()
[docs] def setup_macro_data(self, data): """ Setup macroscopic deformation gradient. """ self.macro_data = data self.problem.homogenization_macro_data = self.macro_data
[docs] def get_micro_cache_key(self, key, icoor, itime): tt = '' if itime is None else '_t%03d' % itime return '%s_%d%s' % (key, icoor, tt)
[docs] def update_micro_states(self): """ Update microstructures state according to the macroscopic data and corrector functions. """ def calculate_local_update(state, corrs, var, macro_vals, mul=1): for ic, corr in enumerate(corrs): if state is None: sh = corr.states[corr.components[0]][var].shape \ if hasattr(corr, 'states') else corr.state[var].shape state = nm.zeros((len(corrs),) + sh, dtype=nm.float64) else: sh = state[ic].shape if hasattr(corr, 'states'): corr_arr = nm.array( [corr.states[jj][var] for jj in corr.components]).T mval = macro_vals[ic].reshape((corr_arr.shape[1], 1)) state[ic] += mul * nm.dot(corr_arr, mval).reshape(sh) else: if macro_vals is None: state[ic] += mul * corr.state[var].reshape(sh) else: state[ic] += mul\ * (corr.state[var] * macro_vals[ic]).reshape(sh) return state micro_update = self.app_options.micro_update for key, upd_obj in six.iteritems(micro_update): if '_prev' in key: continue state = self.micro_states[key] if key + '_prev' in micro_update and state is not None: self.micro_states[key + '_prev'] = state.copy() if key == 'coors': if hasattr(upd_obj, '__call__'): upd_obj(state, self.macro_data, self.problem) else: # macro strain - in the first sequence of the list mtx_e = self.macro_data[upd_obj[0][2]] state += la.dot_sequences(state, mtx_e, 'ABT') if hasattr(upd_obj, '__call__'): upd_obj(state, self.macro_data, self.problem) else: if self.updating_corrs is not None: for v in upd_obj: if len(v) == 4: cname, vname, mname, mul = v else: cname, vname, mname = v mul = 1 macro_data = None if mname is None \ else self.macro_data[mname] if cname is not None: state0 = calculate_local_update( state, self.updating_corrs[cname], vname, macro_data, mul) else: state += macro_data[..., 0] if state0 is not state: self.micro_states[key] = state0 state = state0
[docs] def call(self, verbose=False, ret_all=None, itime=None, iiter=None): """ Call the homogenization engine and compute the homogenized coefficients. Parameters ---------- verbose : bool If True, print the computed coefficients. ret_all : bool or None If not None, it can be used to override the 'return_all' option. If True, also the dependencies are returned. time_tag: str The time tag used in file names. Returns ------- coefs : Coefficients instance The homogenized coefficients. dependencies : dict The dependencies, if `ret_all` is True. """ opts = self.app_options ret_all = get_default(ret_all, opts.return_all) if not hasattr(self, 'he'): volumes = {} if hasattr(opts, 'volumes') and (opts.volumes is not None): volumes.update(opts.volumes) elif hasattr(opts, 'volume') and (opts.volume is not None): volumes['total'] = opts.volume else: volumes['total'] = 1.0 self.he = HomogenizationEngine(self.problem, self.options, volumes=volumes) if self.micro_states is not None: self.update_micro_states() self.he.set_micro_states(self.micro_states) multiproc_mode = None if opts.multiprocessing and multi.use_multiprocessing: multiproc, multiproc_mode = multi.get_multiproc(mpi=opts.use_mpi) if multiproc_mode is not None: upd_var = self.app_options.mesh_update_variable if upd_var is not None: uvar = self.problem.create_variables([upd_var])[upd_var] uvar.field.mappings0 = multiproc.get_dict('mappings0', soft_set=True) per.periodic_cache = multiproc.get_dict('periodic_cache', soft_set=True) time_tag = ('' if itime is None else '_t%03d' % itime)\ + ('' if iiter is None else '_i%03d' % iiter) aux = self.he(ret_all=ret_all, time_tag=time_tag) if ret_all: coefs, dependencies = aux # store correctors for coors update self.updating_corrs = {} for upd_obj in six.itervalues(opts.micro_update): if upd_obj is not None and not hasattr(upd_obj, '__call__'): for v in upd_obj: cr = v[0] if cr is not None: self.updating_corrs[cr] = dependencies[cr] else: coefs = aux if coefs is not None: coefs = Coefficients(**coefs.to_dict()) if verbose: prec = nm.get_printoptions()['precision'] if hasattr(opts, 'print_digits'): nm.set_printoptions(precision=opts.print_digits) print(coefs) nm.set_printoptions(precision=prec) ms_cache = self.micro_state_cache for ii in self.app_options.store_micro_idxs: for k in self.micro_states.keys(): key = self.get_micro_cache_key(k, ii, itime) ms_cache[key] = self.micro_states[k][ii] coef_save_name = op.join(opts.output_dir, opts.coefs_filename) coefs.to_file_hdf5(coef_save_name + '%s.h5' % time_tag) coefs.to_file_txt(coef_save_name + '%s.txt' % time_tag, opts.tex_names, opts.float_format) if ret_all: return coefs, dependencies else: return coefs