Source code for sfepy.postprocess.sources

from __future__ import absolute_import
import os

import numpy as nm
import six

try:
    from enthought.tvtk.api import tvtk
    from enthought.mayavi.sources.vtk_data_source import VTKDataSource
    from enthought.mayavi.core.dataset_manager import DatasetManager
    from enthought.pyface.timer.api import Timer

except:
    from tvtk.api import tvtk
    from mayavi.sources.vtk_data_source import VTKDataSource
    from mayavi.core.dataset_manager import DatasetManager
    from pyface.timer.api import Timer

from sfepy.base.base import Struct, basestr
from sfepy.postprocess.utils import mlab
from sfepy.discrete.fem import Mesh
from sfepy.discrete.fem.meshio import MeshIO, supported_formats

vtk_cell_types = {'1_1' : 1, '1_2' : 3, '2_2' : 3, '3_2' : 3,
                  '2_3' : 5, '2_4' : 9, '3_4' : 10, '3_8' : 12}

[docs]def create_file_source(filename, watch=False, offscreen=True): """Factory function to create a file source corresponding to the given file format.""" if isinstance(filename, FileSource): return filename from distutils.version import LooseVersion try: from enthought.mayavi import version except: from mayavi import version # Work around a Mayavi 4.4.x issue. can_vtk_source = LooseVersion(version.version) < LooseVersion('4.4') kwargs = {'watch' : watch, 'offscreen' : offscreen} if isinstance(filename, basestr): fmt = os.path.splitext(filename)[1] is_sequence = False else: # A sequence. fmt = os.path.splitext(filename[0])[1] is_sequence = True fmt = fmt.lower() if can_vtk_source and (fmt == '.vtk'): # VTK is supported directly by Mayavi, no need to use MeshIO. if is_sequence: return VTKSequenceFileSource(filename, **kwargs) else: return VTKFileSource(filename, **kwargs) else: if is_sequence: if fmt == '.h5': raise ValueError('format .h5 does not support file sequences!') else: return GenericSequenceFileSource(filename, **kwargs) else: return GenericFileSource(filename, **kwargs)
[docs]class FileSource(Struct): """General file source.""" def __init__(self, filename, watch=False, offscreen=True): """Create a file source using the given file name.""" mlab.options.offscreen = offscreen self.watch = watch self.filename = filename self.reset() def __call__(self, step=0): """Get the file source.""" if self.source is None: self.source = self.create_source() if self.watch: self.timer = Timer(1000, self.poll_file) return self.source
[docs] def reset(self): """Reset.""" self.mat_id_name = None self.source = None self.notify_obj = None self.steps = [] self.times = [] self.step = 0 self.time = 0.0 if self.watch: self.last_stat = os.stat(self.filename)
[docs] def setup_mat_id(self, mat_id_name='mat_id', single_color=False): self.mat_id_name = mat_id_name self.single_color = single_color
[docs] def get_step_time(self, step=None, time=None): """ Set current step and time to the values closest greater or equal to either step or time. Return the found values. """ if (step is not None) and len(self.steps): step = step if step >= 0 else self.steps[-1] + step + 1 ii = nm.searchsorted(self.steps, step) ii = nm.clip(ii, 0, len(self.steps) - 1) self.step = self.steps[ii] if len(self.times): self.time = self.times[ii] elif (time is not None) and len(self.times): ii = nm.searchsorted(self.times, time) ii = nm.clip(ii, 0, len(self.steps) - 1) self.step = self.steps[ii] self.time = self.times[ii] return self.step, self.time
[docs] def get_ts_info(self): return self.steps, self.times
[docs] def get_mat_id(self, mat_id_name='mat_id'): """ Get material ID numbers of the underlying mesh elements. """ if self.source is not None: dm = DatasetManager(dataset=self.source.outputs[0]) mat_id = dm.cell_scalars[mat_id_name] return mat_id
[docs] def file_changed(self): pass
[docs] def setup_notification(self, obj, attr): """The attribute 'attr' of the object 'obj' will be set to True when the source file is watched and changes.""" self.notify_obj = obj self.notify_attr = attr
[docs] def poll_file(self): """Check the source file's time stamp and notify the self.notify_obj in case it changed. Subclasses should implement the file_changed() method.""" if not self.notify_obj: return s = os.stat(self.filename) if s[-2] == self.last_stat[-2]: setattr(self.notify_obj, self.notify_attr, False) else: self.file_changed() setattr(self.notify_obj, self.notify_attr, True) self.last_stat = s
[docs]class VTKFileSource(FileSource): """A thin wrapper around mlab.pipeline.open()."""
[docs] def create_source(self): """Create a VTK file source """ return mlab.pipeline.open(self.filename)
[docs] def get_bounding_box(self): bbox = nm.array(self.source.reader.unstructured_grid_output.bounds) return bbox.reshape((3,2)).T
[docs] def set_filename(self, filename, vis_source): self.filename = filename vis_source.base_file_name = filename # Force re-read. vis_source.reader.modified() vis_source.update() # Propagate changes in the pipeline. vis_source.data_changed = True
[docs]class VTKSequenceFileSource(VTKFileSource): """A thin wrapper around mlab.pipeline.open() for VTK file sequences.""" def __init__(self, *args, **kwargs): FileSource.__init__(self, *args, **kwargs) self.steps = nm.arange(len(self.filename), dtype=nm.int32)
[docs] def create_source(self): """Create a VTK file source """ return mlab.pipeline.open(self.filename[0])
[docs] def set_filename(self, filename, vis_source): self.filename = filename vis_source.base_file_name = filename[self.step]
[docs]class GenericFileSource(FileSource): """File source usable with any format supported by MeshIO classes.""" def __init__(self, *args, **kwargs): FileSource.__init__(self, *args, **kwargs) self.io = None self.read_common(self.filename)
[docs] def read_common(self, filename): self.io = MeshIO.any_from_filename(filename) self.steps, self.times, _ = self.io.read_times() self.mesh = Mesh.from_file(filename) self.n_nod, self.dim = self.mesh.coors.shape
[docs] def create_source(self): """ Create a VTK source from data in a SfePy-supported file. Notes ----- All data need to be set here, otherwise time stepping will not work properly - data added by user later will be thrown away on time step change. """ if self.io is None: self.read_common(self.filename) dataset = self.create_dataset() try: out = self.io.read_data(self.step) except ValueError: out = None if out is not None: self.add_data_to_dataset(dataset, out) if self.mat_id_name is not None: mat_id = self.mesh.cmesh.cell_groups if self.single_color: rm = mat_id.min(), mat_id.max() mat_id[mat_id > rm[0]] = rm[1] dm = DatasetManager(dataset=dataset) dm.add_array(mat_id, self.mat_id_name, 'cell') src = VTKDataSource(data=dataset) # src.print_traits() # debug() return src
[docs] def get_bounding_box(self): bbox = self.mesh.get_bounding_box() if self.dim < 3: bbox = nm.c_[bbox, nm.zeros((2, 3 - self.dim))] return bbox
[docs] def set_filename(self, filename, vis_source): self.filename = filename self.source = self.create_source() vis_source.data = self.source.data
[docs] def get_mat_id(self, mat_id_name='mat_id'): """ Get material ID numbers of the underlying mesh elements. """ if self.source is not None: mat_id = self.mesh.cmesh.cell_groups return mat_id
[docs] def file_changed(self): self.steps, self.times, _ = self.io.read_times()
[docs] def create_dataset(self): """Create a tvtk.UnstructuredGrid dataset from the Mesh instance of the file source.""" mesh = self.mesh n_nod, dim = self.n_nod, self.dim if dim < 3: nod_zz = nm.zeros((n_nod, 3 - dim), dtype=mesh.coors.dtype) points = nm.c_[mesh.coors, nod_zz] else: points = mesh.coors dataset = tvtk.UnstructuredGrid(points=points) cell_types = [] cells = [] offset = [0] n_cells = [1] for ig, desc in enumerate(mesh.descs): conn = mesh.get_conn(desc) n_cell, n_cv = conn.shape n_cells.append(n_cell) aux = nm.empty(n_cell, dtype=nm.int32) aux.fill(vtk_cell_types[desc]) cell_types.append(aux) aux = nm.empty((n_cell, n_cv + 1), dtype=nm.int32) aux[:, 0] = n_cv aux[:, 1:] = conn cells.append(aux.ravel()) offset.append(aux.shape[1]) cells = nm.concatenate(cells) cell_types = nm.concatenate(cell_types) offset = nm.repeat(offset, n_cells) offset = nm.cumsum(offset)[:-1] cell_array = tvtk.CellArray() cell_array.set_cells(mesh.n_el, cells) dataset.set_cells(cell_types, offset, cell_array) return dataset
[docs] def add_data_to_dataset(self, dataset, data): """Add point and cell data to the dataset.""" dm = DatasetManager(dataset=dataset) for key, val in six.iteritems(data): if val.mode == 'vertex': aux = self._reshape(val.data, self.dim) dm.add_array(aux, key, 'point') elif val.mode == 'cell': aux = self._reshape(val.data, self.dim) dm.add_array(aux, key, 'cell')
@staticmethod def _reshape(data, dim): sym = (dim + 1) * dim // 2 if len(data.shape) == 1: num, nr, nc = data.shape[0], 1, 1 elif len(data.shape) == 2: num, nr, nc = data.shape[0], data.shape[1], 1 else: num, _, nr, nc = data.shape if (nr == 1) and (nc == 1): aux = data.reshape((num,)) elif (nr == dim) and (nc == 1): if dim == 3: aux = data.reshape((num, dim)) else: zz = nm.zeros((num, 1), dtype=data.dtype); aux = nm.c_[data.squeeze(), zz] elif ((((nr == sym) or (nr == (dim * dim))) and (nc == 1)) or ((nr == dim) and (nc == dim))): data = data.squeeze() if dim == 3: if nr == sym: aux = data[:,[0,3,4,3,1,5,4,5,2]] elif nr == (dim * dim): aux = data[:,[0,3,4,6,1,5,7,8,2]] else: aux = data.reshape((num, dim*dim)) else: zz = nm.zeros((num, 1), dtype=data.dtype); if nr == sym: aux = nm.c_[data[:,[0,2]], zz, data[:,[2,1]], zz, zz, zz, zz] elif nr == (dim * dim): aux = nm.c_[data[:,[0,2]], zz, data[:,[3,1]], zz, zz, zz, zz] else: aux = nm.c_[data[:,0,[0,1]], zz, data[:,1,[0,1]], zz, zz, zz, zz] else: raise ValueError('unknown data shape! (%s)' % list(data.shape)) return aux
[docs]class GenericSequenceFileSource(GenericFileSource): """File source usable with any format supported by MeshIO classes, with exception of HDF5 (.h5), for file sequences."""
[docs] def read_common(self, filename): GenericFileSource.read_common(self, filename[self.step]) self.steps = nm.arange(len(self.filename), dtype=nm.int32) self.times = nm.zeros(len(self.filename), dtype=nm.float64)
[docs] def set_filename(self, filename, vis_source): self.filename = filename self.io = None self.source = self.create_source() vis_source.data = self.source.data
[docs] def file_changed(self): pass