Developer Guide

This section purports to document the fish_heart internals.

Module Index

Complete Problem Definition File

# -*- coding: utf-8 -*-
import os
import numpy as np
from sfepy.solvers.ts import TimeStepper
from sfepy.base.log import Log

#######################################################################
# Options.
#######################################################################

filename_mesh = 'meshes/rs2.mesh'
output_dir = 'output/aux3'

basename = os.path.splitext(os.path.basename('meshes/rs2.mesh'))[0]
convergence_log = basename + '_log.txt'
convergence_fig = basename + '_log.png'

loads_log = basename + '_loads_log.txt'
loads_fig = basename + '_loads_log.png'

globals_log = basename + '_globals_log.txt'
globals_fig = basename + '_globals_log.png'

# Time-stepping parameters.
t0 = 0.0
t1 = 2.0
n_step = 81

ts = TimeStepper(t0, t1, None, n_step)

# Sphere radii.
r0 = 1.0
r1 = 2.0
r2 = 2.5

eps = 1e-8

# Number of periods.
n_period = 2

#######################################################################

cwd = os.path.split(os.path.join(os.getcwd(), __file__))[0]
def incwd(filename):
    """Prepend the directory of this file to filename."""
    return os.path.join(cwd, filename)

def inodir(filename):
    """Prepend the output directory to filename."""
    odir = incwd(output_dir)
    if not os.path.exists(odir):
            os.makedirs(odir)

    return os.path.join(odir, filename)

options = {
    'nls' : 'newton',
    'ls' : 'ls_d',
    'ts' : 'ts',
    'save_steps' : -1,
    'output_format' : 'h5',
    'output_dir' : incwd(output_dir),
    'post_process_hook' : 'post_process',
}

# Log simulation parameters: activation of fibres and loads.
plog = Log([[r'$A_1$', r'$A_2$'], [r'$\pi$', r'$p$']],
           xlabels=['time [s]', 'time [s]'],
           ylabels=['activation', 'pressure'],
           yscales=['linear', 'linear'],
           is_plot=True,
           log_filename=inodir(loads_log),
           formats=[['%.8f', '%.8f'], ['%.8g', '%.8g']])

# Log global indicators: fluid volume.
glog = Log([[r'$V_F$']],
           xlabels=['time [s]'],
           ylabels=['fluid volume'],
           yscales=['linear'],
           is_plot=True,
           log_filename=inodir(globals_log),
           formats=[['%.8f', '%.8f']])


if filename_mesh.find('rs1') >= 0:
    regions = {
        'Omega' : ('all', {}),
        'Omega1' : ('elements of group 1', {'forbid' : 'group 2 3 4'}),
        'Omega2' : ('elements of group 2', {'forbid' : 'group 1 3 4'}),
        'Omega12' : ('r.Omega1 +e r.Omega2', {}),
        'Gamma' : ('nodes of surface', {}),
        'Gamma0' : ('r.Gamma *n r.Omega1', {'can_cells' : True,
                                            'forbid' : 'group 3 4'}),
        'Gamma2' : ('r.Gamma *n r.Omega2', {'forbid' : 'group 3 4'}),
        'Fix' : ('r.Gamma *n elements of group 3', {}),
    }
elif filename_mesh.find('rs2') >= 0:
    regions = {
        'Omega' : ('all', {}),
        'Omega1' : ('elements of group 1', {'forbid' : 'group 2 3 4'}),
        'Omega2' : ('elements of group 2', {'forbid' : 'group 1 3 4'}),
        'Omega12' : ('r.Omega1 +e r.Omega2', {}),
        'Gamma0' : ('nodes by select_r0', {'can_cells' : True,
                                           'forbid' : 'group 3 4'}),
        'Gamma2' : ('nodes by select_r2', {'forbid' : 'group 3 4'}),
        'Fix012' : ('node 120', {}),
        'Fix02' : ('node 166', {}),
        'Fix2' : ('nodes in (z < %.12e)' % eps, {}),
    }
else:
    raise ValueError('unknown mesh! (%s)' % filename_mesh)

# Volume fractions in compact myocardium
vf_matrix = 0.5
vf_fibres1 = 0.2
vf_fibres2 = 0.3

# Volume fractions in spongious myocardium
vf_solid = 0.5
vf_fluid = 0.5

materials = {
    # Perfused solid.
    'ps' : ('Omega1', {
        'mu' : 20e0, # shear modulus of neoHookean term
        'k'  : ts.dt * np.eye(3, dtype=np.float64), # reference permeability
        'N_f' : 1.0, # reference porosity
    }),
    # Solid.
    's' : ('Omega2', {
        'K'  : vf_matrix * 1e3, # bulk modulus
        'mu' : vf_matrix * 20e0, # shear modulus of neoHookean term
    }),
    # Fibres.
    'f1' : ('Omega2', None, 'get_pars_fibres1'),
    'f2' : ('Omega2', None, 'get_pars_fibres2'),
    # Surface pressure traction.
    'traction' : ('Gamma0', None, 'get_traction'),
}

fields = {
    'displacement': ((3,1), 'real', 'Omega12', {'Omega12' : '3_4_P1'}),
    'pressure'    : ((1,1), 'real', 'Omega1', {'Omega1' : '3_4_P1'}),
}

variables = {
    'u' : ('unknown field', 'displacement', 0, 'previous'),
    'v' : ('test field', 'displacement', 'u'),
    'p' : ('unknown field', 'pressure', 1),
    'q' : ('test field', 'pressure', 'p'),
}

##
# Dirichlet BC.
ebcs = {
    'pressure' : ('Gamma0', {'p.0' : 'get_pressure'}),
}

if filename_mesh.find('rs1') >= 0:
    ebcs['fix'] = ('Fix', {'u.all' : 0.0})

elif filename_mesh.find('rs2') >= 0:
    ebcs['fix012'] = ('Fix012', {'u.all' : 0.0})
    ebcs['fix02'] = ('Fix02', {'u.[0,2]' : 0.0})
    ebcs['fix2'] = ('Fix2', {'u.2' : 0.0})

##
# Balance of forces.
integrals = {
    'i1' : ('v', 'gauss_o1_d3'),
    'i2' : ('s3', 'gauss_o2_d2'),
}

equations = {
    'force_balance'
        : """dw_tl_he_neohook.i1.Omega1( ps.mu, v, u )
           + dw_tl_bulk_pressure.i1.Omega1( v, u, p )
           + dw_tl_he_neohook.i1.Omega2( s.mu, v, u )
           + dw_tl_bulk_penalty.i1.Omega2( s.K, v, u )
           + dw_tl_fib_a.i1.Omega2( f1.fmax, f1.eps_opt, f1.s, f1.fdir, f1.act,
                                   v, u )
           + dw_tl_fib_a.i1.Omega2( f2.fmax, f2.eps_opt, f2.s, f2.fdir, f2.act,
                                   v, u )
           + dw_tl_surface_traction.i2.Gamma0( traction.pressure, v, u )
           = 0""",
    'mass_balance'
        : """dw_tl_volume.i1.Omega1( q, u )
           + dw_tl_diffusion.i1.Omega1( ps.k, ps.N_f, q, p, u[-1])
           = dw_tl_volume.i1.Omega1( q, u[-1] )"""
}

def post_process(out, problem, state, extend=False):
    from sfepy.base.base import Struct, debug
    from sfepy.fem import extend_cell_data

    val = problem.evaluate("""dw_tl_he_neohook.i1.Omega1( ps.mu, v, u )
                            + dw_tl_he_neohook.i1.Omega2( s.mu, v, u )""",
                           state, call_mode='de_strain')
    if extend: val = extend_cell_data(val, problem.domain, 'Omega12')
    out['green_strain'] = Struct(name = 'output_data',
                                 mode = 'cell', data = val,
                                 dofs = None)

    val = problem.evaluate("""dw_tl_he_neohook.i1.Omega1( ps.mu, v, u )
                            + dw_tl_he_neohook.i1.Omega2( s.mu, v, u )""",
                           state, call_mode='de_stress')
    if extend: val = extend_cell_data(val, problem.domain, 'Omega12')
    out['neohook_stress'] = Struct(name = 'output_data',
                                   mode = 'cell', data = val,
                                   dofs = None)

    val = problem.evaluate("""dw_tl_bulk_pressure.i1.Omega1( v, u, p )
                            + dw_tl_bulk_penalty.i1.Omega2( s.K, v, u )""",
                           state, call_mode='de_stress')
    if extend: val = extend_cell_data(val, problem.domain, 'Omega12')
    out['bulk_pressure'] = Struct(name = 'output_data',
                                  mode = 'cell', data = val,
                                  dofs = None)

    val = problem.evaluate("""dw_tl_fib_a.i1.Omega2( f1.fmax, f1.eps_opt,
                                                     f1.s, f1.fdir, f1.act,
                                                     v, u )""",
                           state, call_mode='de_stress')
    if extend: val = extend_cell_data(val, problem.domain, 'Omega2')
    out['fibre_stress_1'] = Struct(name = 'output_data',
                                   mode = 'cell', data = val,
                                   dofs = None)

    val = problem.evaluate("""dw_tl_fib_a.i1.Omega2( f2.fmax, f2.eps_opt,
                                                     f2.s, f2.fdir, f2.act,
                                                     v, u )""",
                           state, call_mode='de_stress')
    if extend: val = extend_cell_data(val, problem.domain, 'Omega2')
    out['fibre_stress_2'] = Struct(name = 'output_data',
                                   mode = 'cell', data = val,
                                   dofs = None)

    val = problem.evaluate("""dw_tl_diffusion.i1.Omega1( ps.k, ps.N_f,
                                                         q, p, u[-1] )""",
                           state, call_mode='de_diffusion_velocity')
    if extend: val = extend_cell_data(val, problem.domain, 'Omega1')
    out['diffusion_velocity'] = Struct(name = 'output_data',
                                       mode = 'cell', data = val,
                                       dofs = None)

    val = problem.evaluate("""dw_tl_volume.i1.Omega1( q, u )""",
                           state, call_mode='de_rel_volume')
    if extend: val = extend_cell_data(val, problem.domain, 'Omega1')
    out['J'] = Struct(name = 'output_data',
                      mode = 'cell', data = val,
                      dofs = None)

    if problem.ts.step == 0:
        dirs = {}

        val = problem.evaluate("""de_volume_average_mat.i1.Omega2(f1.fdir, u,
                                                                  shape)""",
                               state, shape=(3, 1))
        if extend: val = extend_cell_data(val, problem.domain, 'Omega2', val=0.0)
        dirs['dir1'] = Struct(name = 'output_data',
                              mode = 'cell', data = val,
                              dofs = None)

        val = problem.evaluate("""de_volume_average_mat.i1.Omega2(f2.fdir, u,
                                                                  shape)""",
                               state, shape=(3, 1))
        if extend: val = extend_cell_data(val, problem.domain, 'Omega2', val=0.0)
        dirs['dir2'] = Struct(name = 'output_data',
                              mode = 'cell', data = val,
                              dofs = None)

        problem.save_state(problem.ofn_trunk + '_dirs.vtk', out=dirs)

    # Log the simulation parameters.
    args = (problem.ts, np.array([[0,0,0]]), 'qp')

    act1 = problem.materials['f1'].function(*args)['act'].squeeze()
    act2 = problem.materials['f2'].function(*args)['act'].squeeze()
    pi = problem.materials['traction'].function(*args)['pressure'].squeeze()[0,0]

    p = problem.variables['p'].eq_map.val_ebc[0]

    ts = problem.ts
    plog(act1, act2, pi, p, x=[ts.time, ts.time])

    aux = ts.times[np.linspace(0, ts.n_step-1, 2*n_period+1).astype(np.int32)]
    aux = (aux / ts.dt).astype(np.int32)

    if hasattr(np, 'in1d'):
        in1d = np.in1d
    else:
        in1d = np.setmember1d

    if in1d([int(ts.time / ts.dt)], aux):
        plog.plot_vlines(color='r', linewidth=1)

    plog(save_figure=inodir(loads_fig))

    # Log the global indicators.
    val = problem.evaluate("""dw_tl_volume.i1.Omega1( q, u )""",
                           state, call_mode='de_volume')
    total_fluid_volume = vf_fluid * val.sum()

    glog(total_fluid_volume, x=[ts.time])
    glog(save_figure=inodir(globals_fig))

    return out

##
# Solvers etc.
solvers = {
    'ls_d' : ('ls.scipy_direct', {}),
    'ls_i' : ('ls.petsc', {
        'method' : 'bcgsl', # ksp_type
        'precond' : 'ilu', # pc_type
        'eps_a' : 1e-12, # abstol
        'eps_r' : 1e-12, # rtol
        'i_max' : 400, # maxits
    }),
    'newton' : ('nls.newton', {
        'i_max'      : 7,
        'eps_a'      : 1e-10,
        'eps_r'      : 1.0,
        'macheps'    : 1e-16,
        'lin_red'    : 1e-2, # Linear system error < (eps_a * lin_red).
        'ls_red'     : 0.1,
        'ls_red_warp': 0.001,
        'ls_on'      : 1.1,
        'ls_min'     : 1e-5,
        'check'      : 0,
        'delta'      : 1e-6,
        'is_plot'    : False,
        'log'        : {'text' : inodir(convergence_log),
                        'plot' : inodir(convergence_fig)},
        'problem'    : 'nonlinear', # 'nonlinear' or 'linear' (ignore i_max)
    }),
    'ts' : ('ts.simple', {
        't0'    : t0,
        't1'    : t1,
        'dt'    : None,
        'n_step' : n_step, # has precedence over dt!
        'quasistatic' : False,
    }),
}

##
# FE assembling parameters.
fe = {
    'chunk_size' : 100000,
    'cache_override' : False,
}

##
# Functions.
def select_radius(coors, domain=None, radius=0.0, sense='in'):
    """Select nodes within (sense == 'in') or outside (sense == 'out') a
    specified radius."""
    x, y, z = coors[:,0], coors[:,1], coors[:,2]
    
    r = np.sqrt(x**2 + y**2 + z**2)

    if sense == 'in':
        out = np.where((r <= radius))[0]

    elif sense == 'out':
        out = np.where((r >= radius))[0]

    else:
        raise ValueError('unknown sense! (%s)' % sense)
    
    return out

def get_scalar_pressure(tt):
    """Get scalar pressure given the normalized time tt."""

    tt = max(tt - 0.0, 0.0)

    val = 1e-0 * np.sin(tt)

    return val

def get_traction(ts, coors, mode=None):
    """
    Pressure traction.
    
    Parameters
    ----------
    ts : TimeStepper
        Time stepping info.
    coors : array_like
        The physical domain coordinates where the parameters shound be defined.
    mode : 'qp' or 'special'
        Call mode.
    """
    if mode != 'qp': return

    tt = ts.nt * 2.0 * n_period * np.pi

    dim = coors.shape[1]
    val = get_scalar_pressure(tt) * np.eye(dim, dtype=np.float64)

    shape = (coors.shape[0], 1, 1)
    out = {
        'pressure' : np.tile(val, shape),
    }

    return out

def get_pressure(ts, coor, bc):
    """Internal pressure Dirichlet boundary condition."""
    tt = ts.nt * 2.0 * n_period * np.pi

    val = np.zeros((coor.shape[0],), dtype=np.float64)

    val[:] = 0.9 * get_scalar_pressure(tt)

    return val

def get_pars_fibres(ts, coors, mode=None, which=0, vf=1.0):
    """
    Parameters
    ----------
    ts : TimeStepper
        Time stepping info.
    coors : array_like
        The physical domain coordinates where the parameters shound be defined.
    mode : 'qp' or 'special'
        Call mode.
    which : int
        Fibre system id.
    vf : float
        Fibre system volume fraction.
    """
    if mode != 'qp': return

    fmax = 50.0
    eps_opt = 0.01
    s = 1.0

    tt = ts.nt * 2.0 * n_period * np.pi

    x, y, z = coors[:,0], coors[:,1], coors[:,2]
    # Spherical coordinates.
    r = np.sqrt(x**2 + y**2 + z**2)
    theta = np.arccos(z / r)[:,None]
    phi = np.arctan2(y, x)[:,None]

    at = max(tt - np.pi / 2.0, 0.0)

    if which == 0: # system 1 - meridians
        fdir = np.c_[np.cos(theta) * np.cos(phi),
                     np.cos(theta) * np.sin(phi),
                     - np.sin(theta)]
        act = 0.5 * (1.0 + np.sin(at - (0.5 * np.pi)))

    elif which == 1: # system 2 - parallels
        fdir = np.c_[-np.sin(phi), np.cos(phi), np.zeros_like(phi)]
        act = 0.5 * (1.0 + np.sin(at - (0.5 * np.pi)))

    else:
        raise ValueError('unknown fibre system! (%d)' % which)

    # Normalize fibre directions.
    # The pole fibres are set to [0, 0, 1].
    x, y, z = fdir[:,0], fdir[:,1], fdir[:,2]
    nf = np.sqrt(x**2 + y**2 + z**2)[:,None]
    iz = np.where(nf < 0.1)[0]
    io = np.setdiff1d(np.arange(nf.shape[0], dtype=np.int32), iz)

    fdir[io] = fdir[io] / nf[io]
    fdir[iz] = [0.0, 0.0, 1.0]
    fdir.shape = fdir.shape + (1,)

    print 'activation:', act

    shape = (coors.shape[0], 1, 1)
    out = {
        'fmax' : vf * np.tile(fmax, shape),
        'eps_opt' : np.tile(eps_opt, shape),
        's' : np.tile(s, shape),
        'fdir' : fdir,
        'act' : np.tile(act, shape),
    }

    return out

functions = {
    'select_r0' : (lambda coors, domain=None:
                   select_radius(coors, domain=domain, radius=r0 + eps,
                                 sense='in'),),
    'select_r2' : (lambda coors, domain=None:
                   select_radius(coors, domain=domain, radius=r2 - eps,
                                 sense='out'),),
    'get_traction' : (lambda ts, coors, mode=None, region=None, ig=None:
                      get_traction(ts, coors, mode=mode),),
    'get_pressure' : (get_pressure,),
    'get_pars_fibres1' : (lambda ts, coors, mode=None, region=None, ig=None:
                          get_pars_fibres(ts, coors, mode=mode, which=0,
                                          vf=vf_fibres1),),
    'get_pars_fibres2' : (lambda ts, coors, mode=None, region=None, ig=None:
                          get_pars_fibres(ts, coors, mode=mode, which=1,
                                          vf=vf_fibres2),),
}

Table Of Contents

Previous topic

User’s Guide

Next topic

fish_heart module

This Page