from copy import copy
from functools import partial
import os.path as osp
from sfepy.base.base import output, get_default, Struct
from sfepy.applications import PDESolverApp, Application
from .coefs_base import MiniAppBase, CoefEval
from sfepy.discrete.evaluate import eval_equations
import sfepy.homogenization.multiproc as multiproc
[docs]
def insert_sub_reqs(reqs, levels, req_info):
"""Recursively build all requirements in correct order."""
all_reqs = []
for _, req in enumerate(reqs):
# Coefficients are referenced as 'c.<name>'...
areq = req[2:] if req.startswith('c.') else req
try:
rargs = req_info[areq]
except KeyError:
raise ValueError('requirement "%s" is not defined!' % req)
sub_reqs = rargs.get('requires', [])
if req in levels:
raise ValueError('circular requirement "%s"!' % (req))
if sub_reqs:
levels.append(req)
sreqs = insert_sub_reqs(sub_reqs, levels, req_info)
all_reqs += [ii for ii in sreqs if ii not in all_reqs]
levels.pop()
if req not in all_reqs:
all_reqs.append(req)
return all_reqs
[docs]
def get_dict_idxval(dict_array, idx):
return {k: v[idx] for k, v in dict_array.items()}
[docs]
def deps_to_corrs(problem, dependencies, keys=None):
if keys is None:
keys = [k for k in dependencies.keys() if not isinstance(k, tuple)]
vs = problem.get_variables(auto_create=True)
out = {}
for k in keys:
if not k.startswith('c.'):
v = dependencies[k]
if hasattr(v, 'get_output'):
out[k] = v.get_output(variables=vs)
else:
out[k] = v
return out
[docs]
def rm_chtag(key):
idx = key.find('|ch:')
return key[:idx] if idx > 0 else key
[docs]
def chunk_req_info(name, req_info):
idx = name.find('|ch:')
if idx > 0:
name0, ctag = name[:idx], name[idx:]
out = req_info[name0].copy()
reqs = out.get('requires', [])
if len(reqs) > 0:
out['requires'] = [k + ctag for k in reqs]
return out
else:
return req_info[name]
[docs]
class CoefVolume(MiniAppBase):
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
return eval_equations(equations, variables, term_mode=term_mode)
[docs]
class HomogenizationWorker:
def __call__(self, problem, options, post_process_hook,
req_info, coef_info, micro_states, store_micro_idxs,
chunk_size=None, time_tag=''):
"""Calculate homogenized correctors and coefficients.
Parameters
----------
problem : problem
The problem definition - microscopic problem.
opts : struct
The options of the homogenization application.
post_process_hook : function
The postprocessing hook.
req_info : dict
The definition of correctors.
coef_info : dict
The definition of homogenized coefficients.
micro_states : array
The configurations of multiple microstructures.
store_micro_idxs : list of int
The indices of microstructures whose results are to be stored.
chunk_size: int or None
The number of microstructures in one chunks
time_tag : str
The label corresponding to the actual time step and iteration,
used in the corrector file names.
Returns
-------
dependencies : dict
The computed correctors and coefficients.
save_names : list
The names of computed dependencies.
"""
dependencies = {}
save_names = {}
sorted_names = self.get_sorted_dependencies(req_info, coef_info,
options.compute_only)
for name in sorted_names:
if not name.startswith('c.'):
if micro_states is not None:
req_info[name]['store_idxs'] = (store_micro_idxs, 0)
val, sn = self.calculate_req(problem, options, post_process_hook,
req_info, coef_info, dependencies,
micro_states, time_tag,
None, name, '0')
dependencies[name] = val
save_names.update(sn)
return dependencies, save_names
[docs]
@staticmethod
def get_sorted_dependencies(req_info, coef_info, compute_only):
"Make corrs and coefs list sorted according to the dependencies."
reqcoef_info = copy(coef_info)
reqcoef_info.update(req_info)
compute_names = set(get_default(compute_only, list(coef_info.keys())))
compute_names = ['c.' + key for key in compute_names]
dep_names = []
for coef_name in compute_names:
requires = coef_info[coef_name[2:]].get('requires', [])
deps = insert_sub_reqs(copy(requires), [], reqcoef_info)\
+ [coef_name]
for dep in deps:
if dep not in dep_names:
dep_names.append(dep)
return dep_names
[docs]
@staticmethod
def calculate(mini_app, problem, dependencies, dep_requires,
save_names, micro_states, chunk_tab, mode, proc_id):
data = {rm_chtag(key): dependencies[key] for key in dep_requires
if 'Volume_' not in key}
volume = {rm_chtag(key[9:]): dependencies[key] for key in dep_requires
if 'Volume_' in key}
mini_app.requires = [rm_chtag(ii) for ii in mini_app.requires
if 'c.Volume_' not in ii]
if micro_states is None:
if mode == 'coefs':
val = mini_app(volume, data=data)
else:
if mini_app.save_name is not None:
save_names[mini_app.name] = mini_app.get_save_name_base()
val = mini_app(data=data)
else:
if '|ch:' in mini_app.name and chunk_tab is not None:
chunk_id = int(mini_app.name.split('|ch:')[1])
chunk_tag = f'-{chunk_id}'
local_state = \
{k: v[chunk_tab[chunk_id]] if v is not None else None
for k, v in micro_states.items()}
else:
chunk_tag = ''
local_state = micro_states
val = []
if hasattr(mini_app, 'store_idxs') and mode == 'reqs':
save_name = mini_app.save_name
local_coors = local_state['coors']
for im in range(len(local_coors)):
output(f'== micro {proc_id}{chunk_tag}-{im} ==')
problem.micro_state = (local_state, im)
problem.set_mesh_coors(local_coors[im], update_fields=True,
clear_all=False, actual=True)
if mode == 'coefs':
val.append(mini_app(get_dict_idxval(volume, im),
data=get_dict_idxval(data, im)))
else:
if hasattr(mini_app, 'store_idxs')\
and im in mini_app.store_idxs[0]:
store_id = '_%04d' % (mini_app.store_idxs[1] + im)
if save_name is not None:
mini_app.save_name = save_name + store_id
key = mini_app.name
if key in save_names:
save_names[key].append(
mini_app.get_save_name_base())
else:
save_names[key] =\
[mini_app.get_save_name_base()]
else:
mini_app.save_name = None
val.append(mini_app(data=get_dict_idxval(data, im)))
if len(val) == 1 and val[0].name == 'update_coors':
local_coors[im] += val[0].state
return val
[docs]
@staticmethod
def calculate_req(problem, opts, post_process_hook,
req_info, coef_info, dependencies,
micro_states, time_tag, chunk_tab, name, proc_id):
"""Calculate a requirement, i.e. correctors or coefficients.
Parameters
----------
problem : problem
The problem definition related to the microstructure.
opts : struct
The options of the homogenization application.
post_process_hook : function
The postprocessing hook.
name : str
The name of the requirement.
req_info : dict
The definition of correctors.
coef_info : dict
The definition of homogenized coefficients.
dependencies : dict
The dependencies required by the correctors/coefficients.
micro_states : array
The configurations of multiple microstructures.
time_tag : str
The label corresponding to the actual time step and iteration,
used in the corrector file names.
chunk_tab : list
In the case of multiprocessing the requirements are divided into
several chunks that are solved in parallel.
proc_id : int
The id number of the processor (core) which is solving the actual
chunk.
Returns
-------
val : coefficient/corrector or list of coefficients/correctors
The resulting homogenized coefficients or correctors.
save_names : dict
The dictionary containing names of saved correctors.
"""
save_names = {}
# compute coefficient
if name.startswith('c.'):
output('computing %s...' % name[2:])
cargs = chunk_req_info(name[2:], coef_info)
mini_app = MiniAppBase.any_from_conf(name[2:], problem, cargs)
problem.clear_equations()
# Pass only the direct dependencies, not the indirect ones.
dep_requires = cargs.get('requires', [])
val = HomogenizationWorker.calculate(mini_app, problem,
dependencies, dep_requires,
save_names, micro_states,
chunk_tab, 'coefs', proc_id)
output('...done')
# compute corrector(s)
else:
output('computing dependency %s...' % name)
rargs = chunk_req_info(name, req_info)
mini_app = MiniAppBase.any_from_conf(name, problem, rargs)
mini_app.setup_output(save_formats=opts.save_formats,
post_process_hook=post_process_hook,
split_results_by=opts.split_results_by)
if mini_app.save_name is not None:
mini_app.save_name += time_tag
problem.clear_equations()
# Pass only the direct dependencies, not the indirect ones.
dep_requires = rargs.get('requires', [])
val = HomogenizationWorker.calculate(mini_app, problem,
dependencies, dep_requires,
save_names, micro_states,
chunk_tab, 'reqs', proc_id)
output('...done')
return val, save_names
[docs]
@staticmethod
def recover_micro(problem, corrs, rhook, macro_data):
out = []
corrs_dict = deps_to_corrs(problem, corrs)
for local_macro, label in macro_data:
output(label, verbose=True)
out.append(rhook(problem, corrs_dict, local_macro))
return out
[docs]
@staticmethod
def process_reqs_coefs(orig, nchunk, store_idxs=[]):
new = {}
for k, v in orig.items():
if k == 'filenames':
continue
for ii in range(nchunk):
lab = f'|ch:{ii}'
key = k + lab
val = new[key] = {}
if 'requires' in v:
val['requires'] = [jj + lab for jj in v['requires']]
if len(store_idxs) > 0:
if len(store_idxs[ii][0]) > 0:
val['store_idxs'] = store_idxs[ii]
else:
val['save_name'] = None
return new
[docs]
class HomogenizationWorkerMulti(HomogenizationWorker):
[docs]
@staticmethod
def calculate_req(*args):
import sfepy.discrete.fem.periodic as per
proc_id = ''.join(k for k in multiproc.get_proc_id() if k.isdigit())
output.set_output_prefix(f'he-w{proc_id}:')
new_args = args + (proc_id, )
problem, dependencies = args[0], args[5]
for dk in dependencies.keys():
if isinstance(dk, tuple):
if dk[0] == 'periodic_cache':
if dk[1] not in per.periodic_cache:
per.periodic_cache[dk[1]] = dependencies[dk]
elif dk[0] == 'mappings0':
if dk[2] not in problem.fields[dk[1]].mappings0:
problem.fields[dk[1]].mappings0[dk[2]] = dk[3]
out = HomogenizationWorker.calculate_req(*new_args)
for k, v in per.periodic_cache.items():
dkey = ('periodic_cache', k)
if dkey not in dependencies:
dependencies[dkey] = v
for fk, fv in problem.fields.items():
for mk, mv in fv.mappings0.items():
dkey = ('mappings0', fk, mk, mv)
if dkey not in dependencies:
dependencies[dkey] = mv
return out
[docs]
@staticmethod
def recover_req(*args):
proc_id = ''.join(k for k in multiproc.get_proc_id() if k.isdigit())
output.set_output_prefix(f'he-w{proc_id}:')
new_args = args + (proc_id, )
return HomogenizationWorker.calculate_req(*new_args)
[docs]
@staticmethod
def rhook_call(problem, dependencies, rhook, macro_data):
proc_id = ''.join(k for k in multiproc.get_proc_id() if k.isdigit())
output.set_output_prefix(f'he-w{proc_id}:')
corrs = deps_to_corrs(problem, dependencies)
local_macro, label = macro_data
output(label, verbose=True)
return rhook(problem, corrs, local_macro)
[docs]
@staticmethod
def recover_micro(problem, corrs, rhook, macro_data):
dependencies = multiproc.get_dict('dependencies')
dependencies.update({k: v for k, v in corrs.items()
if k not in dependencies})
workers = multiproc.get_workers()
if workers is None:
conf_file_dir = osp.split(problem.conf.__file__)[0]
max_workers = problem.conf.options.get('max_workers', None)
workers = multiproc.init_workers(conf_file_dir, max_workers)
rhook = problem.conf.options.get('recovery_hook', None)
rhook = problem.conf.get_function(rhook)
wfun = partial(HomogenizationWorkerMulti.rhook_call,
problem, dependencies, rhook)
out = [k for k in workers.map(wfun, macro_data)]
return out
def __call__(self, problem, options, post_process_hook,
req_info, coef_info, micro_states, store_micro_idxs,
chunk_size, time_tag=''):
"""Calculate homogenized correctors and coefficients in separated
processes.
Parameters
----------
The same parameters as :class:`HomogenizationWorker`, extended by:
chunk_size : int
The number of chunks per one worker.
Returns
-------
The same returns as :class:`HomogenizationWorker`.
"""
dependencies = multiproc.get_dict('dependencies', clear=True)
if micro_states is not None and chunk_size is not None:
micro_chunk_tab, req_info_ch, coef_info_ch = \
self.chunk_micro_tasks(len(micro_states['coors']),
req_info, coef_info,
chunk_size, store_micro_idxs)
else:
micro_chunk_tab = None
req_info_ch, coef_info_ch = req_info, coef_info
workers = multiproc.get_workers()
if workers is None:
conf_file_dir = osp.split(problem.conf._filename)[0]
max_workers = problem.conf.options.get('max_workers', None)
workers = multiproc.init_workers(conf_file_dir, max_workers)
wfun = partial(HomogenizationWorkerMulti.calculate_req, problem,
options, post_process_hook, req_info, coef_info,
dependencies, micro_states, time_tag, micro_chunk_tab)
sorted_names = self.get_sorted_dependencies(req_info_ch, coef_info_ch,
options.compute_only)
# calculate number of dependencies and inverse map
numdeps = {}
inverse_deps = {}
for name in sorted_names:
if name.startswith('c.'):
reqs = coef_info_ch[name[2:]].get('requires', [])
else:
reqs = req_info_ch[name].get('requires', [])
numdeps[name] = len(reqs)
if len(reqs) > 0:
for req in reqs:
if req in inverse_deps:
inverse_deps[req].append(name)
else:
inverse_deps[req] = [name]
save_names = {}
remaining = len(sorted_names)
while remaining > 0:
if len(numdeps) > 0:
tasks = [k for k, v in numdeps.items() if v == 0]
numdeps = {k: v for k, v in numdeps.items() if v > 0}
for task, (dep, snames) in zip(tasks, workers.map(wfun, tasks)):
dependencies[task] = dep
save_names.update(snames)
if task in inverse_deps:
for itask in inverse_deps[task]:
numdeps[itask] -= 1 # itask depends on task
remaining -= 1
if micro_chunk_tab is not None:
dependencies = self.dechunk_reqs_coefs(dependencies,
len(micro_chunk_tab))
else:
dependencies = dependencies.copy()
return dependencies, save_names
[docs]
@staticmethod
def chunk_micro_tasks(num_micro, reqs, coefs,
chunk_size, store_micro_idxs=[]):
"""
Split multiple microproblems into several chunks
that can be processed in parallel.
Parameters
----------
num_micro : int
The number of microstructures.
reqs : dict
The requirement definitions.
coefs : dict
The coefficient definitions.
chunk_size : int
The number of microproblems per chunk.
store_micro_idxs : list of int
The indices of microstructures whose results are to be stored.
Returns
-------
micro_tab : list of slices
The indices of microproblems contained in each chunk.
new_reqs : dict
The new requirement definitions - .
new_coefs : dict
The new coefficient definitions.
"""
micro_tab = []
store_idxs = []
for ii in range(0, num_micro, chunk_size):
jj = chunk_size + ii
micro_tab.append(slice(ii, min([num_micro, jj])))
if len(store_micro_idxs) > 0:
store_idxs.append(([k - ii for k in store_micro_idxs
if k >= ii and k < jj], ii))
nchunk = len(micro_tab)
self = HomogenizationWorkerMulti
new_reqs = self.process_reqs_coefs(reqs, nchunk, store_idxs)
new_coefs = self.process_reqs_coefs(coefs, nchunk)
return micro_tab, new_reqs, new_coefs
[docs]
@staticmethod
def dechunk_reqs_coefs(deps, num_chunks):
"""
Merge the results related to the multiple microproblems.
Parameters
----------
deps : dict
The calculated dependencies.
num_chunks : int
The number of chunks.
Returns
-------
new_deps : dict
The merged dependencies.
"""
new_deps = {}
for dep in set([rm_chtag(k) for k in deps.keys()
if not isinstance(k, tuple)]):
new_deps[dep] = sum([deps[f'{dep}|ch:{ii}']
for ii in range(num_chunks)], [])
return new_deps
[docs]
class HomogenizationEngine(PDESolverApp):
[docs]
@staticmethod
def process_options(options):
get = options.get
return Struct(coefs=get('coefs', None,
'missing "coefs" in options!'),
requirements=get('requirements', None,
'missing "requirements" in options!'),
compute_only=get('compute_only', None),
multiprocessing=get('multiprocessing', True),
store_micro_idxs=get('store_micro_idxs', []),
chunk_size=get('chunk_size', 1),
chunks_per_worker=get('chunks_per_worker', None),
save_formats=get('save_formats', ['vtk', 'h5']),
coefs_info=get('coefs_info', None))
def __init__(self, problem, options, app_options=None,
volumes=None, output_prefix='he:', **kwargs):
"""Bypasses PDESolverApp.__init__()!"""
Application.__init__(self, problem.conf, options, output_prefix,
**kwargs)
self.problem = problem
self.setup_options(app_options=app_options)
self.setup_output_info(self.problem, self.options)
self.volumes = volumes
self.micro_states = None
[docs]
def setup_options(self, app_options=None):
PDESolverApp.setup_options(self)
app_options = get_default(app_options, self.conf.options)
po = HomogenizationEngine.process_options
self.app_options += po(app_options)
[docs]
def set_micro_states(self, states):
self.micro_states = states
[docs]
@staticmethod
def define_volume_coef(coef_info, volumes):
"""
Define volume coefficients and make all other dependent on them.
Parameters
----------
coef_info : dict
The coefficient definitions.
volumes : dict
The definitions of volumes.
Returns
-------
coef_info : dict
The coefficient definitions extended by the volume coefficients.
"""
vcfkeys = []
cf_vols = {}
for vk, vv in volumes.items():
cfkey = 'Volume_%s' % vk
vcfkeys.append('c.' + cfkey)
if 'value' in vv:
cf_vols[cfkey] = {'expression': '%e' % float(vv['value']),
'class': CoefEval}
else:
cf_vols[cfkey] = {'expression': vv['expression'],
'class': CoefVolume}
for cf in coef_info.values():
if 'requires' in cf:
cf['requires'] += vcfkeys
else:
cf['requires'] = vcfkeys
coef_info.update(cf_vols)
return coef_info
[docs]
def recover(self, corrs, rhook, macro_data):
if self.app_options.multiprocessing and multiproc.use_multiprocessing:
rcall = HomogenizationWorkerMulti.recover_micro
else:
rcall = HomogenizationWorker.recover_micro
return rcall(self.problem, corrs, rhook, macro_data)
[docs]
def call(self, ret_all=False, time_tag=''):
problem = self.problem
opts = self.app_options
# Some coefficients can require other coefficients - resolve their
# order here.
req_info = getattr(self.conf, opts.requirements, {})
coef_info = getattr(self.conf, opts.coefs, {})
coef_info = self.define_volume_coef(coef_info, self.volumes)
is_store_filenames = coef_info.pop('filenames', None) is not None
if opts.multiprocessing and multiproc.use_multiprocessing:
worker = HomogenizationWorkerMulti()
if (self.micro_states is not None
and opts.chunks_per_worker is not None):
nch = (multiproc.cpu_count() - 1) * opts.chunks_per_worker
# ceil division
chunk_size = -(len(self.micro_states['coors']) // -nch)
else:
chunk_size = opts.chunk_size
else: # no multiprocessing
worker = HomogenizationWorker()
chunk_size = None
dependencies, save_names = worker(problem, opts,
self.post_process_hook,
req_info, coef_info,
self.micro_states,
opts.store_micro_idxs,
chunk_size, time_tag)
deps = {}
if dependencies is not None:
coefs = Struct()
for name in dependencies.keys():
if isinstance(name, tuple):
continue
data = dependencies[name]
if name.startswith('c.'):
coef_name = name[2:]
cstat = coef_info[coef_name].get('status', 'main')
# remove "auxiliary" coefs
if not cstat == 'auxiliary':
setattr(coefs, coef_name, data)
else:
deps[name] = data
# Store filenames of all requirements as a "coefficient".
if is_store_filenames and save_names is not None:
coefs.save_names = save_names
if opts.coefs_info is not None:
coefs.info = opts.coefs_info
if ret_all:
return coefs, deps
else:
return coefs