Source code for sfepy.solvers.nls

"""
Nonlinear solvers.
"""
from __future__ import absolute_import

import numpy as nm
import numpy.linalg as nla

from sfepy.base.base import output, get_default, debug, Struct
from sfepy.base.log import Log, get_logging_conf
from sfepy.base.timing import Timer, Timers
from sfepy.solvers.solvers import NonlinearSolver
import six
from six.moves import range

def standard_nls_call(call):
    """
    Decorator handling argument preparation and timing for nonlinear solvers.
    """
    def _standard_nls_call(self, vec_x0, conf=None, fun=None, fun_grad=None,
                           lin_solver=None, iter_hook=None, status=None,
                           **kwargs):
        timer = Timer(start=True)

        status = get_default(status, self.status)
        fun = get_default(fun, self.fun, 'function has to be specified!')

        result = call(self, vec_x0, conf=conf, fun=fun, fun_grad=fun_grad,
                      lin_solver=lin_solver, iter_hook=iter_hook,
                      status=status, **kwargs)

        elapsed = timer.stop()
        if status is not None:
            status['time'] = elapsed

        return result

    return _standard_nls_call

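The decorator is applied to the ``__call__()`` methods of the solver classes
below. A minimal sketch of a hypothetical custom solver using it (the class
name, the fixed-point update and the `i_max` option are made up for
illustration, not part of this module):

class MyFixedPoint(NonlinearSolver):
    """A hypothetical fixed-point iteration solver relying on the decorator."""
    name = 'nls.my_fixed_point'

    @standard_nls_call
    def __call__(self, vec_x0, conf=None, fun=None, fun_grad=None,
                 lin_solver=None, iter_hook=None, status=None):
        # The decorator has already defaulted `fun` and `status`, and stores
        # the elapsed time in status['time'] (if status is given).
        conf = get_default(conf, self.conf)

        vec_x = vec_x0.copy()
        for it in range(conf.i_max):
            vec_x = vec_x - fun(vec_x) # A made-up fixed-point update.

        return vec_x
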
def check_tangent_matrix(conf, vec_x0, fun, fun_grad):
    """
    Verify the correctness of the tangent matrix as computed by `fun_grad()`
    by comparing it with its finite difference approximation evaluated by
    repeatedly calling `fun()` with `vec_x0` items perturbed by a small delta.
    """
    vec_x = vec_x0.copy()
    delta = conf.delta

    vec_r = fun(vec_x) # Update state.
    mtx_a0 = fun_grad(vec_x)

    mtx_a = mtx_a0.tocsc()
    mtx_d = mtx_a.copy()
    mtx_d.data[:] = 0.0

    vec_dx = nm.zeros_like(vec_r)

    for ic in range(vec_dx.shape[0]):
        vec_dx[ic] = delta
        xx = vec_x.copy() - vec_dx
        vec_r1 = fun(xx)

        vec_dx[ic] = -delta
        xx = vec_x.copy() - vec_dx
        vec_r2 = fun(xx)

        vec_dx[ic] = 0.0

        vec = 0.5 * (vec_r2 - vec_r1) / delta

        ir = mtx_a.indices[mtx_a.indptr[ic]:mtx_a.indptr[ic+1]]
        mtx_d.data[mtx_a.indptr[ic]:mtx_a.indptr[ic+1]] = vec[ir]

    vec_r = fun(vec_x) # Restore.

    timer = Timer(start=True)
    output(mtx_a, '.. analytical')
    output(mtx_d, '.. difference')
    import sfepy.base.plotutils as plu
    plu.plot_matrix_diff(mtx_d, mtx_a, delta, ['difference', 'analytical'],
                         conf.check)

    return timer.stop()

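This check is not usually called directly; it is activated through the
`check` and `delta` options of the `Newton` solver defined below. A
hypothetical problem description snippet (the solver key names and values
are illustrative only) enabling it could look like:

solvers = {
    'ls': ('ls.scipy_direct', {}),
    'newton': ('nls.newton', {
        'i_max': 5,
        'check': 2,    # Compare the tangent matrix with its finite
                       # difference approximation and plot the patterns.
        'delta': 1e-6, # The finite difference perturbation.
    }),
}
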
def conv_test(conf, it, err, err0):
    """
    Nonlinear solver convergence test.

    Parameters
    ----------
    conf : Struct instance
        The nonlinear solver configuration.
    it : int
        The current iteration.
    err : float
        The current iteration error.
    err0 : float
        The initial error.

    Returns
    -------
    status : int
        The convergence status: -1 = no convergence (yet), 0 = solver converged
        - tolerances were met, 1 = max. number of iterations reached.
    """
    status = -1

    if (abs(err0) < conf.macheps):
        err_r = 0.0
    else:
        err_r = err / err0

    output('nls: iter: %d, residual: %e (rel: %e)' % (it, err, err_r))

    conv_a = err <= conf.eps_a
    if it > 0:
        conv_r = err_r <= conf.eps_r

        if conv_a and conv_r:
            status = 0

        elif (conf.get('eps_mode', '') == 'or') and (conv_a or conv_r):
            status = 0

    else:
        if conv_a:
            status = 0

    if (status == -1) and (it >= conf.i_max):
        status = 1

    return status

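For illustration only (not part of the module), the status codes can be
exercised directly with a `Struct` configuration mirroring the `Newton`
options defined below; the tolerances and errors here are made up:

_conf = Struct(i_max=10, eps_a=1e-10, eps_r=1e-8, eps_mode='and',
               macheps=nm.finfo(nm.float64).eps)

conv_test(_conf, it=0, err=1.0, err0=1.0)    # -> -1 (no convergence yet)
conv_test(_conf, it=3, err=1e-12, err0=1.0)  # -> 0 (both tolerances met)
conv_test(_conf, it=10, err=1e-3, err0=1.0)  # -> 1 (i_max reached)
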
class Newton(NonlinearSolver):
    r"""
    Solves a nonlinear system :math:`f(x) = 0` using the Newton method.

    The solver uses a backtracking line-search on divergence.
    """
    name = 'nls.newton'

    _parameters = [
        ('i_max', 'int', 1, False,
         'The maximum number of iterations.'),
        ('eps_a', 'float', 1e-10, False,
         'The absolute tolerance for the residual, i.e. :math:`||f(x^i)||`.'),
        ('eps_r', 'float', 1.0, False,
         """The relative tolerance for the residual, i.e.
            :math:`||f(x^i)|| / ||f(x^0)||`."""),
        ('eps_mode', "'and' or 'or'", 'and', False,
         """The logical operator to use for combining the absolute and
            relative tolerances."""),
        ('macheps', 'float', nm.finfo(nm.float64).eps, False,
         'The float considered to be machine "zero".'),
        ('lin_red', 'float or None', 1.0, False,
         """The linear system solution error should be smaller than
            (`eps_a` * `lin_red`), otherwise a warning is printed. If None,
            the check is skipped."""),
        ('lin_precision', 'float or None', None, False,
         """If not None, the linear system solution tolerances are set in each
            nonlinear iteration relative to the current residual norm by the
            `lin_precision` factor. Ignored for direct linear solvers."""),
        ('step_red', '0.0 < float <= 1.0', 1.0, False,
         """Step reduction factor. Equivalent to the mixing parameter
            :math:`a`: :math:`(1 - a) x + a (x + dx) = x + a dx`."""),
        ('ls_on', 'float', 0.99999, False,
         """Start the backtracking line-search by reducing the step, if
            :math:`||f(x^i)|| / ||f(x^{i-1})||` is larger than `ls_on`."""),
        ('ls_red', '0.0 < float < 1.0', 0.1, False,
         'The step reduction factor in case of correct residual assembling.'),
        ('ls_red_warp', '0.0 < float < 1.0', 0.001, False,
         """The step reduction factor in case of failed residual assembling
            (e.g. the "warp violation" error caused by a negative volume
            element resulting from too large deformations)."""),
        ('ls_min', '0.0 < float < 1.0', 1e-5, False,
         'The minimum step reduction factor.'),
        ('give_up_warp', 'bool', False, False,
         'If True, abort on the "warp violation" error.'),
        ('check', '0, 1 or 2', 0, False,
         """If >= 1, check the tangent matrix using finite differences. If 2,
            plot the resulting sparsity patterns."""),
        ('delta', 'float', 1e-6, False,
         r"""If `check >= 1`, the finite difference matrix is taken as
            :math:`A_{ij} = \frac{f_i(x_j + \delta) - f_i(x_j - \delta)}
            {2 \delta}`."""),
        ('log', 'dict or None', None, False,
         """If not None, log the convergence according to the configuration in
            the following form: ``{'text' : 'log.txt', 'plot' : 'log.pdf'}``.
            Each of the dict items can be None."""),
        ('is_linear', 'bool', False, False,
         'If True, the problem is considered to be linear.'),
    ]

    def __init__(self, conf, **kwargs):
        NonlinearSolver.__init__(self, conf, **kwargs)

        conf = self.conf

        log = get_logging_conf(conf)
        conf.log = log = Struct(name='log_conf', **log)
        conf.is_any_log = (log.text is not None) or (log.plot is not None)

        if conf.is_any_log:
            self.log = Log([[r'$||r||$'], ['iteration']],
                           xlabels=['', 'all iterations'],
                           ylabels=[r'$||r||$', 'iteration'],
                           yscales=['log', 'linear'],
                           is_plot=conf.log.plot is not None,
                           log_filename=conf.log.text,
                           formats=[['%.8e'], ['%d']])

        else:
            self.log = None

    @standard_nls_call
    def __call__(self, vec_x0, conf=None, fun=None, fun_grad=None,
                 lin_solver=None, iter_hook=None, status=None):
        """
        Nonlinear system solver call.

        Solves a nonlinear system :math:`f(x) = 0` using the Newton method
        with backtracking line-search, starting with an initial guess
        :math:`x^0`.

        Parameters
        ----------
        vec_x0 : array
            The initial guess vector :math:`x_0`.
        conf : Struct instance, optional
            The solver configuration parameters.
        fun : function, optional
            The function :math:`f(x)` whose zero is sought - the residual.
        fun_grad : function, optional
            The gradient of :math:`f(x)` - the tangent matrix.
        lin_solver : LinearSolver instance, optional
            The linear solver for each nonlinear iteration.
        iter_hook : function, optional
            User-supplied function to call before each iteration.
        status : dict-like, optional
            The user-supplied object to hold convergence statistics.

        Notes
        -----
        * The optional parameters except `iter_hook` and `status` need to be
          given either here or upon `Newton` construction.
        * Setting `conf.is_linear == True` means a pre-assembled and possibly
          pre-solved matrix. This is mostly useful for linear time-dependent
          problems.
        """
        conf = get_default(conf, self.conf)
        fun = get_default(fun, self.fun)
        fun_grad = get_default(fun_grad, self.fun_grad)
        lin_solver = get_default(lin_solver, self.lin_solver)
        iter_hook = get_default(iter_hook, self.iter_hook)
        status = get_default(status, self.status)

        ls_eps_a, ls_eps_r = lin_solver.get_tolerance()
        eps_a = get_default(ls_eps_a, 1.0)
        eps_r = get_default(ls_eps_r, 1.0)
        if conf.lin_red is not None:
            lin_red = conf.eps_a * conf.lin_red

        else:
            lin_red = None

        timers = Timers(['residual', 'matrix', 'solve'])
        if conf.check:
            timers.create('check')

        vec_x = vec_x0.copy()
        vec_x_last = vec_x0.copy()
        vec_dx = None

        if self.log is not None:
            self.log.plot_vlines(color='r', linewidth=1.0)

        err = err0 = -1.0
        err_last = -1.0
        it = 0
        ls_status = {}
        ls_n_iter = 0
        while 1:
            if iter_hook is not None:
                iter_hook(self.context, self, vec_x, it, err, err0)

            ls = 1.0
            vec_dx0 = vec_dx
            while 1:
                timers.residual.start()

                try:
                    vec_r = fun(vec_x)

                except ValueError:
                    if (it == 0) or (ls < conf.ls_min):
                        output('giving up!')
                        raise

                    else:
                        ok = False

                else:
                    ok = True

                timers.residual.stop()

                if ok:
                    try:
                        err = nla.norm(vec_r)

                    except:
                        output('infs or nans in the residual:', vec_r)
                        output(nm.isfinite(vec_r).all())
                        debug()

                    if self.log is not None:
                        self.log(err, it)

                    if it == 0:
                        err0 = err
                        break

                    if err < (err_last * conf.ls_on):
                        break

                    red = conf.ls_red
                    output('linesearch: iter %d, (%.5e < %.5e) (new ls: %e)'
                           % (it, err, err_last * conf.ls_on, red * ls))

                else: # Failure.
                    if conf.give_up_warp:
                        output('giving up!')
                        break

                    red = conf.ls_red_warp
                    output('residual computation failed for iter %d'
                           ' (new ls: %e)!' % (it, red * ls))

                if ls < conf.ls_min:
                    output('linesearch failed, continuing anyway')
                    break

                ls *= red

                vec_dx = ls * vec_dx0
                vec_x = vec_x_last.copy() - vec_dx

            # End residual loop.

            if self.log is not None:
                self.log.plot_vlines([1], color='g', linewidth=0.5)

            err_last = err
            vec_x_last = vec_x.copy()

            condition = conv_test(conf, it, err, err0)
            if condition >= 0:
                break

            if (not ok) and conf.give_up_warp:
                condition = 2
                break

            timers.matrix.start()
            if not conf.is_linear:
                mtx_a = fun_grad(vec_x)

            else:
                mtx_a = fun_grad('linear')

            timers.matrix.stop()

            if conf.check:
                timers.check.start()
                wt = check_tangent_matrix(conf, vec_x, fun, fun_grad)
                timers.check.stop()
                timers.check.add(-wt)

            if conf.lin_precision is not None:
                if ls_eps_a is not None:
                    eps_a = max(err * conf.lin_precision, ls_eps_a)

                elif ls_eps_r is not None:
                    eps_r = max(conf.lin_precision, ls_eps_r)

                if lin_red is not None:
                    lin_red = max(eps_a, err * eps_r)

            if conf.verbose:
                output('solving linear system...')

            timers.solve.start()
            vec_dx = lin_solver(vec_r, x0=vec_x, eps_a=eps_a, eps_r=eps_r,
                                mtx=mtx_a, status=ls_status)
            ls_n_iter += ls_status['n_iter']
            timers.solve.stop()

            if conf.verbose:
                output('...done')

            for key, val in timers.get_dts().items():
                output('%10s: %7.2f [s]' % (key, val))

            if lin_red is not None:
                vec_e = mtx_a @ vec_dx - vec_r
                lerr = nla.norm(vec_e)
                if lerr > lin_red:
                    output('warning: linear system solution precision is lower'
                           ' than the value set in solver options!'
                           ' (err = %e, tolerance = %e)' % (lerr, lin_red))

            vec_x -= conf.step_red * vec_dx
            it += 1

        time_stats = timers.get_totals()

        ls_n_iter = ls_n_iter if ls_n_iter >= 0 else -1
        if status is not None:
            status['time_stats'] = time_stats
            status['err0'] = err0
            status['err'] = err
            status['n_iter'] = it
            status['ls_n_iter'] = ls_n_iter
            status['condition'] = condition

        if conf.report_status:
            output(f'cond: {condition}, iter: {it}, ls_iter: {ls_n_iter},'
                   f' err0: {err0:.8e}, err: {err:.8e}')
            for key, val in time_stats.items():
                output('%8s: %.8f [s]' % (key, val))
            output('     sum: %.8f [s]' % sum(time_stats.values()))

        if conf.log.plot is not None:
            if self.log is not None:
                self.log(save_figure=conf.log.plot)

        return vec_x

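A minimal standalone sketch of driving `Newton` directly. The scalar residual,
its Jacobian and all values below are made up for illustration; within SfePy,
these callbacks are normally supplied by a problem definition.

import numpy as nm
import scipy.sparse as sps

from sfepy.solvers.ls import ScipyDirect
from sfepy.solvers.nls import Newton

def fun(x):
    # Residual of f(x) = x**2 - 2; the root is sqrt(2).
    return x**2 - 2.0

def fun_grad(x):
    # The tangent matrix as a sparse matrix, as expected by the linear solver.
    return sps.csr_matrix(nm.diag(2.0 * x))

status = {}
nls = Newton({'i_max': 10, 'eps_a': 1e-12}, lin_solver=ScipyDirect({}),
             fun=fun, fun_grad=fun_grad)
x = nls(nm.array([1.0]), status=status)
# x is close to [1.41421356]; status holds 'n_iter', 'err', 'time', ...
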
class ScipyBroyden(NonlinearSolver):
    """
    Interface to Broyden and Anderson solvers from ``scipy.optimize``.
    """
    name = 'nls.scipy_broyden_like'

    _parameters = [
        ('method', 'str', 'anderson', False,
         'The name of the solver in ``scipy.optimize``.'),
        ('i_max', 'int', 10, False,
         'The maximum number of iterations.'),
        ('alpha', 'float', 0.9, False,
         'See ``scipy.optimize``.'),
        ('M', 'float', 5, False,
         'See ``scipy.optimize``.'),
        ('f_tol', 'float', 1e-6, False,
         'See ``scipy.optimize``.'),
        ('w0', 'float', 0.1, False,
         'See ``scipy.optimize``.'),
    ]

    def __init__(self, conf, **kwargs):
        NonlinearSolver.__init__(self, conf, **kwargs)
        self.set_method(self.conf)

    def set_method(self, conf):
        import scipy.optimize as so

        try:
            solver = getattr(so, conf.method)
        except AttributeError:
            output('scipy solver %s does not exist!' % conf.method)
            output('using broyden3 instead')
            solver = so.broyden3

        self.solver = solver

    @standard_nls_call
    def __call__(self, vec_x0, conf=None, fun=None, fun_grad=None,
                 lin_solver=None, iter_hook=None, status=None):
        if conf is not None:
            self.set_method(conf)

        else:
            conf = self.conf

        fun = get_default(fun, self.fun)
        status = get_default(status, self.status)

        timer = Timer(start=True)

        kwargs = {'iter' : conf.i_max,
                  'alpha' : conf.alpha,
                  'verbose' : conf.verbose}

        if conf.method == 'broyden_generalized':
            kwargs.update({'M' : conf.M})

        elif conf.method in ['anderson', 'anderson2']:
            kwargs.update({'M' : conf.M, 'w0' : conf.w0})

        if conf.method in ['anderson', 'anderson2',
                           'broyden', 'broyden2', 'newton_krylov']:
            kwargs.update({'f_tol' : conf.f_tol})

        vec_x = self.solver(fun, vec_x0, **kwargs)
        vec_x = nm.asarray(vec_x)

        if status is not None:
            status['time_stats'] = {'solver' : timer.stop()}
            if conf.report_status:
                output('solver: %.8f [s]' % status['time_stats']['solver'])

        return vec_x

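In a problem description file, this solver is selected by its name; a
hypothetical configuration (the solver key and the option values are
illustrative only, the option names follow `_parameters` above) might read:

solvers = {
    'nls': ('nls.scipy_broyden_like', {
        'method': 'anderson',
        'i_max': 15,
        'alpha': 0.9,
        'M': 5,
        'f_tol': 1e-8,
        'w0': 0.1,
    }),
}
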
class PETScNonlinearSolver(NonlinearSolver):
    """
    Interface to PETSc SNES (Scalable Nonlinear Equations Solvers).

    The solver supports parallel use with a given MPI communicator (see `comm`
    argument of :func:`PETScNonlinearSolver.__init__()`). Returns a (global)
    PETSc solution vector instead of a (local) numpy array, when given a PETSc
    initial guess vector.

    For parallel use, the `fun` and `fun_grad` callbacks should be provided by
    :class:`PETScParallelEvaluator
    <sfepy.parallel.evaluate.PETScParallelEvaluator>`.
    """
    name = 'nls.petsc'

    _parameters = [
        ('method', 'str', 'newtonls', False,
         'The SNES type.'),
        ('i_max', 'int', 10, False,
         'The maximum number of iterations.'),
        ('if_max', 'int', 100, False,
         'The maximum number of function evaluations.'),
        ('eps_a', 'float', 1e-10, False,
         'The absolute tolerance for the residual, i.e. :math:`||f(x^i)||`.'),
        ('eps_r', 'float', 1.0, False,
         """The relative tolerance for the residual, i.e.
            :math:`||f(x^i)|| / ||f(x^0)||`."""),
        ('eps_s', 'float', 0.0, False,
         r"""The convergence tolerance in terms of the norm of the change in
            the solution between steps, i.e.
            :math:`||\delta x|| < \epsilon_s ||x||`."""),
    ]

    def __init__(self, conf, pmtx=None, prhs=None, comm=None, **kwargs):
        if comm is None:
            try:
                import petsc4py
                petsc4py.init([])
            except ImportError:
                msg = 'cannot import petsc4py!'
                raise ImportError(msg)

        from petsc4py import PETSc as petsc

        converged_reasons = {}
        for key, val in six.iteritems(petsc.SNES.ConvergedReason.__dict__):
            if isinstance(val, int):
                converged_reasons[val] = key

        ksp_converged_reasons = {}
        for key, val in six.iteritems(petsc.KSP.ConvergedReason.__dict__):
            if isinstance(val, int):
                ksp_converged_reasons[val] = key

        NonlinearSolver.__init__(self, conf, petsc=petsc, pmtx=pmtx,
                                 prhs=prhs, comm=comm,
                                 converged_reasons=converged_reasons,
                                 ksp_converged_reasons=ksp_converged_reasons,
                                 **kwargs)

    @standard_nls_call
    def __call__(self, vec_x0, conf=None, fun=None, fun_grad=None,
                 lin_solver=None, iter_hook=None, status=None,
                 pmtx=None, prhs=None, comm=None):
        conf = self.conf
        fun = get_default(fun, self.fun)
        fun_grad = get_default(fun_grad, self.fun_grad)
        lin_solver = get_default(lin_solver, self.lin_solver)
        iter_hook = get_default(iter_hook, self.iter_hook)
        status = get_default(status, self.status)
        pmtx = get_default(pmtx, self.pmtx)
        prhs = get_default(prhs, self.prhs)
        comm = get_default(comm, self.comm)

        timer = Timer(start=True)

        if isinstance(vec_x0, self.petsc.Vec):
            psol = vec_x0

        else:
            psol = pmtx.getVecLeft()
            psol[...] = vec_x0

        snes = self.petsc.SNES()
        snes.create(comm)
        snes.setType(conf.method)

        ksp = lin_solver.create_ksp()
        snes.setKSP(ksp)
        ls_conf = lin_solver.conf
        ksp.setTolerances(atol=ls_conf.eps_a, rtol=ls_conf.eps_r,
                          divtol=ls_conf.eps_d, max_it=ls_conf.i_max)

        snes.setFunction(fun, prhs)
        snes.setJacobian(fun_grad, pmtx)

        snes.setTolerances(atol=conf.eps_a, rtol=conf.eps_r,
                           stol=conf.eps_s, max_it=conf.i_max)
        snes.setMaxFunctionEvaluations(conf.if_max)
        snes.setFromOptions()

        fun(snes, psol, prhs)
        err0 = prhs.norm()

        snes.solve(prhs.duplicate(), psol)

        if status is not None:
            status['time_stats'] = {'solver' : timer.stop()}

        if snes.reason in self.converged_reasons:
            reason = 'snes: %s' % self.converged_reasons[snes.reason]

        else:
            reason = 'ksp: %s' % self.ksp_converged_reasons[snes.reason]

        output('%s(%s): %d iterations in the last step'
               % (ksp.getType(), ksp.getPC().getType(),
                  ksp.getIterationNumber()),
               verbose=conf.verbose)
        output('%s convergence: %s (%s, %d iterations, %d function evaluations)'
               % (snes.getType(), snes.reason, reason,
                  snes.getIterationNumber(), snes.getFunctionEvaluations()),
               verbose=conf.verbose)

        converged = snes.reason >= 0
        condition = 0 if converged else -1
        n_iter = snes.getIterationNumber()
        ls_n_iter = snes.getLinearSolveIterations()

        if not converged:
            # PETSc does not update the solution if the KSP has not converged.
            dpsol = snes.getSolutionUpdate()
            psol -= dpsol

            fun(snes, psol, prhs)
            err = prhs.norm()

        else:
            try:
                err = snes.getFunctionNorm()

            except AttributeError:
                fun(snes, psol, prhs)
                err = prhs.norm()

        if status is not None:
            status['err0'] = err0
            status['err'] = err
            status['n_iter'] = n_iter
            status['ls_n_iter'] = ls_n_iter
            status['condition'] = condition

        if conf.report_status:
            output(f'cond: {condition}, iter: {n_iter}, ls_iter: {ls_n_iter},'
                   f' err0: {err0:.8e}, err: {err:.8e}')
            output('solver: %.8f [s]' % status['time_stats']['solver'])

        if isinstance(vec_x0, self.petsc.Vec):
            sol = psol

        else:
            sol = psol[...].copy()

        return sol
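
As with the other solvers, a hypothetical problem description entry pairing
`nls.petsc` with a PETSc Krylov linear solver (all key names and option
values below are illustrative only) could be:

solvers = {
    'ls': ('ls.petsc', {
        'method': 'gmres',
        'precond': 'ilu',
        'i_max': 1000,
        'eps_a': 1e-12,
        'eps_r': 1e-8,
    }),
    'nls': ('nls.petsc', {
        'method': 'newtonls',
        'i_max': 10,
        'eps_a': 1e-10,
        'eps_r': 1e-8,
    }),
}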