Source code for droplets.droplet_tracks

"""Classes representing the time evolution of droplets.

.. autosummary::
   :nosignatures:

   DropletTrack
   DropletTrackList

.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""

from __future__ import annotations

import functools
import itertools
import json
import logging
from typing import TYPE_CHECKING, Literal

import numpy as np
from numpy.lib import recfunctions as rfn
from scipy import ndimage
from scipy.spatial import distance

from pde.tools.output import display_progress
from pde.tools.plotting import PlotReference, plot_on_axes

from .droplets import SphericalDroplet, droplet_from_data
from .emulsions import Emulsion, EmulsionTimeCourse

if TYPE_CHECKING:
    from collections.abc import Callable

    from numpy.typing import NDArray

    from pde.grids.base import GridBase
    from pde.storage.base import StorageBase
    from pde.trackers.base import InfoDict

    from .tools.typing import RealArray

_logger = logging.getLogger(__name__)
""":class:`logging.Logger`: Logger instance."""


def contiguous_true_regions(condition: NDArray[np.bool]) -> NDArray[np.intc]:
    """Finds contiguous True regions in the boolean array "condition".

    Inspired by http://stackoverflow.com/a/4495197/932593

    Args:
        condition (:class:`~numpy.ndarray`):
            A one-dimensional boolean array

    Returns:
        :class:`~numpy.ndarray`: A two-dimensional array where the first column
        is the start index of the region and the second column is the end index
    """
    if len(condition) == 0:
        return np.empty((0, 2), dtype=np.intc)
    # convert condition array to integer
    condition = np.asarray(condition, np.intc)

    # Find the indices of changes in "condition"
    d = np.diff(condition)
    idx = np.flatnonzero(d)

    # We need to start things after the change in "condition". Therefore,
    # we'll shift the index by 1 to the right.
    idx += 1

    if condition[0]:
        # If the start of condition is True prepend a 0
        idx = np.r_[0, idx]

    if condition[-1]:
        # If the end of condition is True, append the length of the array
        idx = np.r_[idx, condition.size]

    # Reshape the result into two columns
    return idx.reshape(-1, 2).astype(np.intc)


[docs] class DropletTrack: """Information about a single droplet over multiple time steps.""" def __init__(self, droplets=None, times=None): """ Args: emulsions (list): List of emulsions that describe this time course times (list): Times associated with the emulsions """ if isinstance(droplets, DropletTrack): # get data from given object times = droplets.times droplets = droplets.droplets self.droplets = [] self.times = [] # add all emulsions if droplets: for d in droplets: self.append(d) # add all times if times is not None: self.times = list(times) if len(self.times) != len(self.droplets): msg = "The lists of droplets and times need to have the same length" raise ValueError(msg) def __repr__(self): """Human-readable representation of a droplet track.""" class_name = self.__class__.__name__ if len(self.times) == 0: return f"{class_name}([])" if len(self.times) == 1: return f"{class_name}(time={self.start:g})" return f"{class_name}(timespan={self.start:g}..{self.end:g})" def __len__(self): """Number of time points.""" return len(self.times) def __getitem__(self, key: int | slice): """Return the droplets identified by the given index/slice.""" result = self.droplets.__getitem__(key) if isinstance(key, slice): return self.__class__(droplets=result, times=self.times[key]) return result def __eq__(self, other): """Determine whether two DropletTracks instance are equal.""" return self.times == other.times and self.droplets == other.droplets @property def start(self) -> float: """float: first time point""" return self.times[0] # type: ignore @property def end(self) -> float: """float: last time point""" return self.times[-1] # type: ignore @property def duration(self) -> float: """float: total duration of the track""" if len(self.times) > 0: return self.end - self.start return 0 @property def first(self) -> SphericalDroplet: """SphericalDroplet: first droplet instance""" return self.droplets[0] # type: ignore @property def last(self) -> SphericalDroplet: """SphericalDroplet: last droplet instance""" return self.droplets[-1] # type: ignore @property def dim(self) -> int | None: """Return the space dimension of the droplets.""" try: return self.last.dim except IndexError: return None @property def data(self) -> RealArray | None: """:class:`~numpy.ndarray`: an array containing the data of the full track.""" if len(self) == 0: return None d0 = self.first dtype = [("time", "f8"), *d0.data.dtype.descr] result = np.empty(len(self), dtype=dtype) for i in range(len(self)): result[i] = (self.times[i], *self.droplets[i].data.tolist()) return result def __iter__(self): """Iterate over all droplets.""" return iter(self.droplets)
[docs] def items(self): """Iterate over all times and droplets, returning them in pairs.""" return zip(self.times, self.droplets, strict=False)
[docs] def append(self, droplet: SphericalDroplet, time: float | None = None) -> None: """Append a new droplet with a time code. Args: droplet (:class:`droplets.droplets.SphericalDroplet`): The droplet to append time (float, optional): The associated time point """ if self.dim is not None and droplet.dim != self.dim: msg = ( "Space dimension of added droplet must match the DropletTrack " f" ({droplet.dim} != {self.dim})" ) raise ValueError(msg) # add the emulsion self.droplets.append(droplet.copy()) if time is None: time = 0 if len(self.times) == 0 else self.times[-1] + 1 self.times.append(time)
[docs] def get_position(self, time: float) -> RealArray: """:class:`~numpy.ndarray`: returns the droplet position at a specific time.""" try: idx = self.times.index(time) except AttributeError: # assume that self.times is a numpy array idx = np.nonzero(self.times == time)[0][0] return self.droplets[idx].position # type: ignore
[docs] def get_trajectory( self, smoothing: float = 0, *, attribute: str = "position" ) -> RealArray: """Return a the time-evolution of a droplet attribute (e.g., the position) Args: smoothing (float): Determines the scale for some gaussian smoothing of the trajectory. The default value of zero disables smoothing. attribute (str): The attribute to consider (default: "position"). Returns: :class:`~numpy.ndarray`: An array giving the position of the droplet at each time instance """ trajectory = np.array([getattr(d, attribute) for d in self.droplets]) if smoothing: ndimage.gaussian_filter1d( trajectory, output=trajectory, sigma=smoothing, axis=0, mode="nearest" ) return trajectory
[docs] def get_radii(self, smoothing: float = 0) -> RealArray: """:class:`~numpy.ndarray`: returns the droplet radius for each time point. Args: smoothing (float): Determines the length scale for some gaussian smoothing of the trajectory. The default value of zero disables smoothing. """ return self.get_trajectory(smoothing, attribute="radius")
[docs] def get_volumes(self, smoothing: float = 0) -> RealArray: """:class:`~numpy.ndarray`: returns the droplet volume for each time point. Args: smoothing (float): Determines the volume scale for some gaussian smoothing of the trajectory. The default value of zero disables smoothing. """ return self.get_trajectory(smoothing, attribute="volume")
[docs] def time_overlaps(self, other: DropletTrack) -> bool: """Determine whether two DropletTrack instances overlaps in time. Args: other (:class:`DropletTrack`): The other droplet track Returns: bool: True when both tracks contain droplets at the same time step """ s0, s1 = self.start, self.end o0, o1 = other.start, other.end return s0 <= o1 and o0 <= s1
@classmethod def _from_hdf_dataset(cls, dataset) -> DropletTrack: """Construct a droplet track by reading data from an hdf5 dataset. Args: dataset: A HDF5 dataset from which the data of the droplet track is read """ # there are values, so the emulsion is not empty droplet_class = dataset.attrs["droplet_class"] obj = cls() if droplet_class == "None": return obj # separate time from the data set times = dataset["time"] droplet_data = rfn.rec_drop_fields(dataset, "time") for time, data in zip(times, droplet_data, strict=False): droplet = droplet_from_data(droplet_class, data) obj.append(droplet, time=time) # type: ignore return obj
[docs] @classmethod def from_file(cls, path: str) -> DropletTrack: """Create droplet track by reading from file. Args: path (str): The path from which the data is read. This function assumes that the data was written as an HDF5 file using :meth:`to_file`. """ import h5py with h5py.File(path, "r") as fp: if len(fp) != 1: msg = ( f"Multiple droplet tracks found in file {path}. Did you mean to " "load a DropletTrackList instead?" ) raise RuntimeError(msg) dataset = fp[next(iter(fp.keys()))] # retrieve the only dataset obj = cls._from_hdf_dataset(dataset) return obj
def _write_hdf_dataset(self, hdf_path, key: str = "droplet_track"): """Write data to a given hdf5 path `hdf_path`""" if self: # emulsion contains at least one droplet dataset = hdf_path.create_dataset(key, data=self.data) # self.data ensures that there is only one droplet class dataset.attrs["droplet_class"] = self[0].__class__.__name__ else: # create empty dataset to indicate empty emulsion dataset = hdf_path.create_dataset(key, shape=()) dataset.attrs["droplet_class"] = "None" return dataset
[docs] def to_file(self, path: str, info: InfoDict | None = None) -> None: """Store data in hdf5 file. The data can be read using the classmethod :meth:`DropletTrack.from_file`. Args: path (str): The path to which the data is written as an HDF5 file. info (dict): Additional data stored alongside the droplet track list """ import h5py with h5py.File(path, "w") as fp: self._write_hdf_dataset(fp) # write additional information if info: for k, v in info.items(): fp.attrs[k] = json.dumps(v)
[docs] @plot_on_axes() def plot( self, attribute: str = "radius", smoothing: float = 0, t_max: float | None = None, ax=None, **kwargs, ) -> PlotReference: """Plot the time evolution of the droplet. Args: attribute (str): The attribute to plot. Typical values include `radius` and `volume`, but others might be defined on the droplet class. smoothing (float): Determines the scale for some gaussian smoothing of the trajectory. The default value of zero disables smoothing. {PLOT_ARGS} **kwargs: All remaining parameters are forwarded to the `ax.plot` method. For example, passing `color=None`, will use different colors for different droplets. Returns: :class:`~pde.tools.plotting.PlotReference`: Information about the plot """ if len(self.times) == 0: return PlotReference(ax, None) if attribute in {"radius", "radii"}: data = self.get_radii() ylabel = "Radius" elif attribute in {"volume", "volumes"}: data = self.get_volumes() ylabel = "Volume" else: data = self.get_trajectory(smoothing=smoothing, attribute=attribute) ylabel = attribute.capitalize() if t_max is not None and len(self.times) >= 2 and self.times[-1] < t_max: dt = self.times[-1] - self.times[-2] times = np.r_[self.times, self.times[-1] + dt] data = np.r_[data, 0] else: times = self.times (line,) = ax.plot(times, data, **kwargs) ax.set_xlabel("Time") ax.set_ylabel(ylabel) return PlotReference(ax, line, {"attribute": attribute})
[docs] @plot_on_axes() def plot_positions( self, grid: GridBase | None = None, arrow: bool = True, ax=None, **kwargs ) -> PlotReference: """Plot the droplet track. Args: grid (GridBase, optional): The grid on which the droplets are defined. If given, periodic boundary conditions can be respected in the plotting. arrow (bool, optional): Flag determining whether an arrow head is shown to indicate the direction of the droplet drift. {PLOT_ARGS} **kwargs: Additional keyword arguments are passed to the matplotlib plot function to affect the appearance. For example, passing `color=None`, will use different colors for different droplets. Returns: :class:`~pde.tools.plotting.PlotReference`: Information about the plot """ if len(self.times) == 0: return PlotReference(ax, None, {"arrow": arrow}) if self.dim != 2: msg = "Plotting is only implemented for 2d grids" raise NotImplementedError(msg) # obtain droplet positions as a function of time xy = self.get_trajectory() if grid is None: # simply plot the trajectory cx, cy = xy[:, 0], xy[:, 1] (line,) = ax.plot(cx, cy, **kwargs) else: # use the grid to detect wrapping around segments = [] for p1, p2 in itertools.pairwise(xy): dist_direct = np.hypot(p1[0] - p2[0], p1[1] - p2[1]) dist_real = grid.distance(p1, p2, coords="cartesian") close = np.isclose(dist_direct, dist_real) segments.append(close) # plot the individual segments line, cx = None, [] # type: ignore for s, e in contiguous_true_regions(np.array(segments)): if line is None: color = kwargs.get("color", "k") else: color = line.get_color() # ensure colors stays the same cx, cy = xy[s : e + 1, 0], xy[s : e + 1, 1] (line,) = ax.plot(cx, cy, color=color) if arrow and len(cx) >= 2: # add arrow head to last segment size = min(sum(ax.get_xlim()), sum(ax.get_ylim())) ax.arrow( cx[-2], cy[-2], cx[-1] - cx[-2], cy[-1] - cy[-2], head_width=0.02 * size, color=line.get_color(), ) return PlotReference(ax, None, {"arrow": arrow})
[docs] class DropletTrackList(list): """A list of instances of :class:`DropletTrack`""" def __getitem__(self, key: int | slice): # type: ignore """Return the droplets identified by the given index/slice.""" result = super().__getitem__(key) if isinstance(key, slice): return self.__class__(result) return result
[docs] @classmethod def from_emulsion_time_course( cls, time_course: EmulsionTimeCourse, *, method: Literal["distance", "overlap"] = "overlap", grid: GridBase | None = None, progress: bool = False, **kwargs, ) -> DropletTrackList: r"""Obtain droplet tracks from an emulsion time course. Args: time_course (:class:`droplets.emulsions.EmulsionTimeCourse`): A collection of temporally arranged emulsions method (str): The method used for tracking droplet identities. Possible methods are "overlap" (adding droplets that overlap with those in previous frames) and "distance" (matching droplets to minimize center-to-center distances). grid (:class:`~pde.grids.base.GridBase`): The grid on which the droplets are defined, which is necessary if periodic boundary conditions should be respected for measuring distances progress (bool): Whether to show the progress of the process. **kwargs: Additional parameters for the tracking algorithm. Currently, one can only specify a maximal distance (using `max_dist`) for the "distance" method. Returns: :class:`DropletTrackList`: the resulting droplet tracks """ # get tracks, i.e. clearly overlapping droplets tracks = cls() # determine the tracking method if method == "overlap": # track droplets by their physical overlap def match_tracks( emulsion: Emulsion, tracks_alive: list[DropletTrack], time: float ) -> None: """Helper function adding emulsions to the tracks.""" found_multiple_overlap = False for droplet in emulsion: # determine which old tracks could be extended overlaps: list[DropletTrack] = [ track for track in tracks_alive if track.last.overlaps(droplet, grid=grid) ] if len(overlaps) == 1: overlaps[0].append(droplet, time=time) else: if len(overlaps) > 1: found_multiple_overlap = True tracks.append(DropletTrack(droplets=[droplet], times=[time])) if found_multiple_overlap: _logger.debug("Found multiple overlapping droplet(s) at t=%g", time) elif method == "distance": # track droplets by their physical distance max_dist = kwargs.pop("max_dist", np.inf) def match_tracks( emulsion: Emulsion, tracks_alive: list[DropletTrack], time: float ) -> None: """Helper function adding emulsions to the tracks.""" added = set() # calculate the distance between droplets if tracks_alive: if grid is None: metric: str | Callable = "euclidean" else: metric = functools.partial(grid.distance, coords="cartesian") points_prev = [track.last.position for track in tracks_alive] points_now = [droplet.position for droplet in emulsion] dists = distance.cdist(points_prev, points_now, metric=metric) # impose a cutoff distance dists[dists > max_dist] = np.inf # add all matching droplets while True: i, j = np.unravel_index(np.argmin(dists), dists.shape) if np.isinf(dists[i, j]): break # no more matches added.add(j) tracks_alive[i].append(emulsion[j], time=time) # type: ignore dists[i, :] = np.inf dists[:, j] = np.inf # add droplets that have not been matched for i, droplet in enumerate(emulsion): # type: ignore if i not in added: tracks.append(DropletTrack(droplets=[droplet], times=[time])) else: msg = "Unknown tracking method `%s`" raise ValueError(msg, method) # check kwargs if kwargs: _logger.warning("Unused keyword arguments: %s", kwargs) # add all emulsions successively using the given algorithm t_last = None for t, emulsion in display_progress( time_course.items(), total=len(time_course), enabled=progress ): # determine tracks from the last frame that have not yet been extended tracks_alive = [track for track in tracks if track.end == t_last] # match all tracks with the current emulsion match_tracks(emulsion, tracks_alive, time=t) t_last = t return tracks
[docs] @classmethod def from_storage( cls, storage: StorageBase, *, method: Literal["distance", "overlap"] = "overlap", refine: bool = False, num_processes: int | Literal["auto"] = 1, progress: bool | None = None, ) -> DropletTrackList: r"""Obtain droplet tracks from stored scalar field data. This method first determines an emulsion time course and than collects tracks by tracking droplets. Args: storage (:class:`~pde.storage.base.StorageBase`): The phase fields for many time instances method (str): The method used for tracking droplet identities. Possible methods are "overlap" (adding droplets that overlap with those in previous frames) and "distance" (matching droplets to minimize center-to-center distances). refine (bool): Flag determining whether the droplet properties should be refined using fitting. This is a potentially slow procedure. num_processes (int or "auto"): Number of processes used for the refinement. If set to "auto", the number of processes is chosen automatically. progress (bool): Whether to show the progress of the process. If `None`, the progress is not shown, except for the first step if `refine` is `True`. Returns: :class:`DropletTrackList`: the resulting droplet tracks """ etc = EmulsionTimeCourse.from_storage( storage, refine=refine, num_processes=num_processes, progress=progress ) if progress is None: progress = False return cls.from_emulsion_time_course(etc, method=method, progress=progress)
[docs] @classmethod def from_file(cls, path: str, *, progress: bool = True) -> DropletTrackList: """Create droplet track list by reading file. Args: path (str): The path from which the data is read. This function assumes that the data was written as an HDF5 file using :meth:`to_file`. progress (bool): Whether to show the progress of the process in a progress bar Returns: :class:`DropletTrackList`: an instance describing the droplet track list """ import h5py obj = cls() with h5py.File(path, "r") as fp: # load the actual droplet track data and iterate in the right order for key in display_progress( sorted(fp.keys()), total=len(fp), enabled=progress ): dataset = fp[key] obj.append(DropletTrack._from_hdf_dataset(dataset)) return obj
[docs] def to_file(self, path: str, info: InfoDict | None = None) -> None: """Store data in hdf5 file. The data can be read using the classmethod :meth:`DropletTrackList.from_file`. Args: path (str): The path to which the data is written as an HDF5 file. info (dict): Additional data stored alongside the droplet track list """ import h5py with h5py.File(path, "w") as fp: # write the actual emulsion data for i, droplet_track in enumerate(self): droplet_track._write_hdf_dataset(fp, f"track_{i:06d}") # write additional information if info: for k, v in info.items(): fp.attrs[k] = json.dumps(v)
[docs] def remove_short_tracks(self, min_duration: float = 0) -> None: """Remove tracks that a shorter than a minimal duration. Args: min_duration (float): The minimal duration a droplet track must have in order to be retained. This is measured in actual time and not in the number of time steps stored in the track. """ for i in reversed(range(len(self))): if self[i].duration <= min_duration: self.pop(i)
[docs] @plot_on_axes() def plot(self, attribute: str = "radius", ax=None, **kwargs) -> PlotReference: """Plot the time evolution of all droplets. Args: attribute (str): The attribute to plot. Typical values include `radius` and `volume`, but others might be defined on the droplet class. {PLOT_ARGS} **kwargs: Additional keyword arguments are passed to the matplotlib plot function to affect the appearance. The special value `color="cycle"` implies that the default color cycle is used for the tracks, using different colors for different tracks. Returns: :class:`~pde.tools.plotting.PlotReference`: Information about the plot """ # choose a suitable color for the tracks if "color" in kwargs: if kwargs["color"] == "cycle": kwargs.pop("color") # if color is None, use the default color cycle else: kwargs["color"] = "k" # use black by default # get maximal time if self: t_max = max(track.times[-1] for track in self if len(track.times) > 0) else: t_max = None # adjust alpha such that multiple tracks are visible well kwargs.setdefault("alpha", min(0.8, 20 / len(self))) elements = [] for track in self: elements.append( track.plot(attribute=attribute, t_max=t_max, ax=ax, **kwargs).element ) kwargs["label"] = "" # set potential plot label only for first track return PlotReference(ax, elements, {"attribute": attribute})
[docs] @plot_on_axes() def plot_positions(self, ax=None, **kwargs) -> PlotReference: """Plot all droplet tracks. Args: {PLOT_ARGS} **kwargs: Additional keyword arguments are passed to the matplotlib plot function to affect the appearance. Returns: :class:`~pde.tools.plotting.PlotReference`: Information about the plot """ elements = [track.plot_positions(ax=ax, **kwargs).element for track in self] return PlotReference(ax, elements)
__all__ = ["DropletTrack", "DropletTrackList"]