import os.path as op
import shutil
import numpy as nm
from sfepy.base.base import get_default, Struct
from sfepy.homogenization.coefficients import Coefficients
from sfepy.homogenization.engine import HomogenizationEngine
from sfepy.applications import PDESolverApp
import sfepy.discrete.fem.periodic as per
import sfepy.linalg as la
import sfepy.base.multiproc as multi
[docs]
class HomogenizationApp(HomogenizationEngine):
[docs]
@staticmethod
def process_options(options):
"""
Application options setup. Sets default values for missing
non-compulsory options.
"""
get = options.get
volume = get('volume', None)
volumes = get('volumes', None)
if volume is None and volumes is None:
raise ValueError('missing "volume" in options!')
return Struct(print_digits=get('print_digits', 3),
float_format=get('float_format', '%8.3e'),
coefs_filename=get('coefs_filename', 'coefs'),
tex_names=get('tex_names', None),
coefs=get('coefs', None, 'missing "coefs" in options!'),
requirements=get('requirements', None,
'missing "requirements" in options!'),
return_all=get('return_all', False),
mesh_update_variable=get('mesh_update_variable', None),
macro_data=get('macro_data', None),
micro_update=get('micro_update', {}),
n_micro=get('n_micro', None),
multiprocessing=get('multiprocessing', True),
use_mpi=get('use_mpi', False),
store_micro_idxs=get('store_micro_idxs', []),
volume=volume,
volumes=volumes)
def __init__(self, conf, options, output_prefix, **kwargs):
PDESolverApp.__init__(self, conf, options, output_prefix,
init_equations=False)
self.setup_options()
self.n_micro = kwargs.get('n_micro',
self.app_options.get('n_micro', None))
self.updating_corrs = None
self.micro_state_cache = {}
self.multiproc_mode = None
self.micro_states = None if self.n_micro is None else {}
# macroscopic data given in problem options dict.
macro_data = self.app_options.macro_data
if macro_data is not None:
self.n_micro = macro_data[list(macro_data.keys())[0]].shape[0]
self.setup_macro_data(macro_data)
if self.n_micro is not None:
for k in self.app_options.micro_update:
if not k == 'coors':
self.micro_states[k] = None
coors = self.problem.domain.get_mesh_coors()
c_sh = (self.n_micro,) + coors.shape
self.micro_states['coors'] = nm.empty(c_sh, dtype=nm.float64)
mac_ids = kwargs.get('mac_ids',
self.app_options.get('mac_ids', None))
self.micro_states['id'] = []
for im in range(self.n_micro):
self.micro_states['coors'][im] = coors
self.micro_states['id'].append(
mac_ids[im] if mac_ids is not None else im)
output_dir = self.problem.output_dir
if conf._filename is not None:
shutil.copyfile(conf._filename,
op.join(output_dir, op.basename(conf._filename)))
[docs]
def setup_options(self):
PDESolverApp.setup_options(self)
po = HomogenizationApp.process_options
self.app_options += po(self.conf.options)
if hasattr(self, 'he'):
self.he.setup_options()
[docs]
def setup_macro_data(self, data):
"""
Setup macroscopic deformation gradient.
"""
self.macro_data = data
self.problem.homogenization_macro_data = self.macro_data
[docs]
def get_micro_cache_key(self, key, icoor, itime):
tt = '' if itime is None else '_t%03d' % itime
return '%s_%d%s' % (key, icoor, tt)
[docs]
def update_micro_states(self):
"""
Update microstructures state according to the macroscopic data
and corrector functions.
"""
def calculate_local_update(state, corrs, var, macro_vals, mul=1):
for ic, corr in enumerate(corrs):
if state is None:
sh = corr.states[corr.components[0]][var].shape \
if hasattr(corr, 'states') else corr.state[var].shape
state = nm.zeros((len(corrs),) + sh, dtype=nm.float64)
else:
sh = state[ic].shape
if hasattr(corr, 'states'):
corr_arr = nm.array(
[corr.states[jj][var] for jj in corr.components]).T
mval = macro_vals[ic].reshape((corr_arr.shape[1], 1))
state[ic] += mul * nm.dot(corr_arr, mval).reshape(sh)
else:
if macro_vals is None:
state[ic] += mul * corr.state[var].reshape(sh)
else:
state[ic] += mul\
* (corr.state[var] * macro_vals[ic]).reshape(sh)
return state
micro_update = self.app_options.micro_update
for key, upd_obj in micro_update.items():
if '_prev' in key:
continue
state = self.micro_states[key]
if key + '_prev' in micro_update and state is not None:
self.micro_states[key + '_prev'] = state.copy()
if key == 'coors':
if hasattr(upd_obj, '__call__'):
upd_obj(state, self.macro_data, self.problem)
else:
# macro strain - in the first sequence of the list
mtx_e = self.macro_data[upd_obj[0][2]]
state += la.dot_sequences(state, mtx_e, 'ABT')
if hasattr(upd_obj, '__call__'):
upd_obj(state, self.macro_data, self.problem)
else:
if self.updating_corrs is not None:
for v in upd_obj:
if len(v) == 4:
cname, vname, mname, mul = v
else:
cname, vname, mname = v
mul = 1
macro_data = None if mname is None \
else self.macro_data[mname]
if cname is not None:
state0 = calculate_local_update(
state, self.updating_corrs[cname],
vname, macro_data, mul)
else:
state += macro_data[..., 0]
if state0 is not state:
self.micro_states[key] = state0
state = state0
[docs]
def call(self, verbose=False, ret_all=None, itime=None, iiter=None):
"""
Call the homogenization engine and compute the homogenized
coefficients.
Parameters
----------
verbose : bool
If True, print the computed coefficients.
ret_all : bool or None
If not None, it can be used to override the 'return_all' option.
If True, also the dependencies are returned.
time_tag: str
The time tag used in file names.
Returns
-------
coefs : Coefficients instance
The homogenized coefficients.
dependencies : dict
The dependencies, if `ret_all` is True.
"""
opts = self.app_options
ret_all = get_default(ret_all, opts.return_all)
force_init_he = hasattr(self.problem, 'force_init_he')\
and self.problem.force_init_he
if not hasattr(self, 'he') or force_init_he:
volumes = {}
if hasattr(opts, 'volumes') and (opts.volumes is not None):
volumes.update(opts.volumes)
elif hasattr(opts, 'volume') and (opts.volume is not None):
volumes['total'] = opts.volume
else:
volumes['total'] = 1.0
self.he = HomogenizationEngine(self.problem, self.options,
volumes=volumes)
if self.micro_states is not None:
self.update_micro_states()
self.he.set_micro_states(self.micro_states)
multiproc_mode = None
if opts.multiprocessing and multi.use_multiprocessing:
multiproc, multiproc_mode = multi.get_multiproc(mpi=opts.use_mpi)
if multiproc_mode is not None:
upd_var = self.app_options.mesh_update_variable
if upd_var is not None:
uvar = self.problem.create_variables([upd_var])[upd_var]
uvar.field.mappings0 = multiproc.get_dict('mappings0',
soft_set=True)
per.periodic_cache = multiproc.get_dict('periodic_cache',
soft_set=True)
time_tag = ('' if itime is None else '_t%03d' % itime)\
+ ('' if iiter is None else '_i%03d' % iiter)
aux = self.he(ret_all=ret_all, time_tag=time_tag)
if ret_all:
coefs, dependencies = aux
# store correctors for coors update
self.updating_corrs = {}
for upd_obj in opts.micro_update.values():
if upd_obj is not None and not hasattr(upd_obj, '__call__'):
for v in upd_obj:
cr = v[0]
if cr is not None:
self.updating_corrs[cr] = dependencies[cr]
else:
coefs = aux
if coefs is not None:
coefs = Coefficients(**coefs.to_dict())
if verbose:
prec = nm.get_printoptions()['precision']
if hasattr(opts, 'print_digits'):
nm.set_printoptions(precision=opts.print_digits)
print(coefs)
nm.set_printoptions(precision=prec)
ms_cache = self.micro_state_cache
for ii in self.app_options.store_micro_idxs:
for k in self.micro_states.keys():
key = self.get_micro_cache_key(k, ii, itime)
ms_cache[key] = self.micro_states[k][ii]
coef_save_name = op.join(opts.output_dir, opts.coefs_filename)
coefs.to_file_hdf5(coef_save_name + '%s.h5' % time_tag)
coefs.to_file_txt(coef_save_name + '%s.txt' % time_tag,
opts.tex_names,
opts.float_format)
if ret_all:
return coefs, dependencies
else:
return coefs