"""
Classes representing the time evolution of droplets
.. autosummary::
:nosignatures:
DropletTrack
DropletTrackList
.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""
from __future__ import annotations
import functools
import json
import logging
from typing import Callable, Literal
import numpy as np
from numpy.lib import recfunctions as rfn
from scipy import ndimage
from scipy.spatial import distance
from pde.grids.base import GridBase
from pde.storage.base import StorageBase
from pde.tools.output import display_progress
from pde.tools.plotting import PlotReference, plot_on_axes
from pde.trackers.base import InfoDict
from .droplets import SphericalDroplet, droplet_from_data
from .emulsions import Emulsion, EmulsionTimeCourse
def contiguous_true_regions(condition: np.ndarray) -> np.ndarray:
"""Finds contiguous True regions in the boolean array "condition"
Inspired by http://stackoverflow.com/a/4495197/932593
Args:
condition (:class:`~numpy.ndarray`):
A one-dimensional boolean array
Returns:
:class:`~numpy.ndarray`: A two-dimensional array where the first column
is the start index of the region and the second column is the end index
"""
if len(condition) == 0:
return np.empty((0, 2), dtype=np.intc)
# convert condition array to integer
condition = np.asarray(condition, np.intc)
# Find the indices of changes in "condition"
d = np.diff(condition)
idx = np.flatnonzero(d)
# We need to start things after the change in "condition". Therefore,
# we'll shift the index by 1 to the right.
idx += 1
if condition[0]:
# If the start of condition is True prepend a 0
idx = np.r_[0, idx]
if condition[-1]:
# If the end of condition is True, append the length of the array
idx = np.r_[idx, condition.size]
# Reshape the result into two columns
return idx.reshape(-1, 2).astype(np.intc)
[docs]
class DropletTrack:
"""information about a single droplet over multiple time steps"""
def __init__(self, droplets=None, times=None):
"""
Args:
emulsions (list):
List of emulsions that describe this time course
times (list):
Times associated with the emulsions
"""
if isinstance(droplets, DropletTrack):
# get data from given object
times = droplets.times
droplets = droplets.droplets
self.droplets = []
self.times = []
# add all emulsions
if droplets:
for d in droplets:
self.append(d)
# add all times
if times is not None:
self.times = list(times)
if len(self.times) != len(self.droplets):
raise ValueError(
"The lists of droplets and times need to have the same length"
)
def __repr__(self):
"""human-readable representation of a droplet track"""
class_name = self.__class__.__name__
if len(self.times) == 0:
return f"{class_name}([])"
elif len(self.times) == 1:
return f"{class_name}(time={self.start})"
else:
return f"{class_name}(timespan={self.start}..{self.end})"
def __len__(self):
"""number of time points"""
return len(self.times)
def __getitem__(self, key: int | slice):
"""return the droplets identified by the given index/slice"""
result = self.droplets.__getitem__(key)
if isinstance(key, slice):
return self.__class__(droplets=result, times=self.times[key])
else:
return result
def __eq__(self, other):
"""determine whether two DropletTracks instance are equal"""
return self.times == other.times and self.droplets == other.droplets
@property
def start(self) -> float:
"""float: first time point"""
return self.times[0] # type: ignore
@property
def end(self) -> float:
"""float: last time point"""
return self.times[-1] # type: ignore
@property
def duration(self) -> float:
"""float: total duration of the track"""
if len(self.times) > 0:
return self.end - self.start
else:
return 0
@property
def first(self) -> SphericalDroplet:
"""SphericalDroplet: first droplet instance"""
return self.droplets[0] # type: ignore
@property
def last(self) -> SphericalDroplet:
"""SphericalDroplet: last droplet instance"""
return self.droplets[-1] # type: ignore
@property
def dim(self) -> int | None:
"""return the space dimension of the droplets"""
try:
return self.last.dim
except IndexError:
return None
@property
def data(self) -> np.ndarray | None:
""":class:`~numpy.ndarray`: an array containing the data of the full track"""
if len(self) == 0:
return None
else:
d0 = self.first
dtype = [("time", "f8")] + d0.data.dtype.descr
result = np.empty(len(self), dtype=dtype)
for i in range(len(self)):
result[i] = (self.times[i],) + self.droplets[i].data.tolist()
return result
def __iter__(self):
"""iterate over all droplets"""
return iter(self.droplets)
[docs]
def items(self):
"""iterate over all times and droplets, returning them in pairs"""
return zip(self.times, self.droplets)
[docs]
def append(self, droplet: SphericalDroplet, time: float | None = None) -> None:
"""append a new droplet with a time code
Args:
droplet (:class:`droplets.droplets.SphericalDroplet`):
The droplet to append
time (float, optional):
The associated time point
"""
if self.dim is not None and droplet.dim != self.dim:
raise ValueError(
"Space dimension of added droplet must match the DropletTrack "
f" ({droplet.dim} != {self.dim})"
)
# add the emulsion
self.droplets.append(droplet.copy())
if time is None:
time = 0 if len(self.times) == 0 else self.times[-1] + 1
self.times.append(time)
[docs]
def get_position(self, time: float) -> np.ndarray:
""":class:`~numpy.ndarray`: returns the droplet position at a specific time"""
try:
idx = self.times.index(time)
except AttributeError:
# assume that self.times is a numpy array
idx = np.nonzero(self.times == time)[0][0]
return self.droplets[idx].position # type: ignore
[docs]
def get_trajectory(
self, smoothing: float = 0, *, attribute: str = "position"
) -> np.ndarray:
"""return a the time-evolution of a droplet attribute (e.g., the position)
Args:
smoothing (float):
Determines the scale for some gaussian smoothing of the trajectory.
The default value of zero disables smoothing.
attribute (str):
The attribute to consider (default: "position").
Returns:
:class:`~numpy.ndarray`: An array giving the position of the droplet at each
time instance
"""
trajectory = np.array([getattr(d, attribute) for d in self.droplets])
if smoothing:
ndimage.gaussian_filter1d(
trajectory, output=trajectory, sigma=smoothing, axis=0, mode="nearest"
)
return trajectory
[docs]
def get_radii(self, smoothing: float = 0) -> np.ndarray:
""":class:`~numpy.ndarray`: returns the droplet radius for each time point
Args:
smoothing (float):
Determines the length scale for some gaussian smoothing of the
trajectory. The default value of zero disables smoothing.
"""
return self.get_trajectory(smoothing, attribute="radius")
[docs]
def get_volumes(self, smoothing: float = 0) -> np.ndarray:
""":class:`~numpy.ndarray`: returns the droplet volume for each time point
Args:
smoothing (float):
Determines the volume scale for some gaussian smoothing of the
trajectory. The default value of zero disables smoothing.
"""
return self.get_trajectory(smoothing, attribute="volume")
[docs]
def time_overlaps(self, other: DropletTrack) -> bool:
"""determine whether two DropletTrack instances overlaps in time
Args:
other (:class:`DropletTrack`):
The other droplet track
Returns:
bool: True when both tracks contain droplets at the same time step
"""
s0, s1 = self.start, self.end
o0, o1 = other.start, other.end
return s0 <= o1 and o0 <= s1
@classmethod
def _from_hdf_dataset(cls, dataset) -> DropletTrack:
"""construct a droplet track by reading data from an hdf5 dataset
Args:
dataset:
A HDF5 dataset from which the data of the droplet track is read
"""
# there are values, so the emulsion is not empty
droplet_class = dataset.attrs["droplet_class"]
obj = cls()
if droplet_class == "None":
return obj
else:
# separate time from the data set
times = dataset["time"]
droplet_data = rfn.rec_drop_fields(dataset, "time")
for time, data in zip(times, droplet_data):
droplet = droplet_from_data(droplet_class, data)
obj.append(droplet, time=time) # type: ignore
return obj
[docs]
@classmethod
def from_file(cls, path: str) -> DropletTrack:
"""create droplet track by reading from file
Args:
path (str):
The path from which the data is read. This function assumes that the
data was written as an HDF5 file using :meth:`to_file`.
"""
import h5py
with h5py.File(path, "r") as fp:
if len(fp) != 1:
raise RuntimeError(
f"Multiple droplet tracks found in file {path}. Did you mean to "
"load a DropletTrackList instead?"
)
dataset = fp[list(fp.keys())[0]] # retrieve the only dataset
obj = cls._from_hdf_dataset(dataset)
return obj
def _write_hdf_dataset(self, hdf_path, key: str = "droplet_track"):
"""write data to a given hdf5 path `hdf_path`"""
if self:
# emulsion contains at least one droplet
dataset = hdf_path.create_dataset(key, data=self.data)
# self.data ensures that there is only one droplet class
dataset.attrs["droplet_class"] = self[0].__class__.__name__
else:
# create empty dataset to indicate empty emulsion
dataset = hdf_path.create_dataset(key, shape=tuple())
dataset.attrs["droplet_class"] = "None"
return dataset
[docs]
def to_file(self, path: str, info: InfoDict | None = None) -> None:
"""store data in hdf5 file
The data can be read using the classmethod :meth:`DropletTrack.from_file`.
Args:
path (str):
The path to which the data is written as an HDF5 file.
info (dict):
Additional data stored alongside the droplet track list
"""
import h5py
with h5py.File(path, "w") as fp:
self._write_hdf_dataset(fp)
# write additional information
if info:
for k, v in info.items():
fp.attrs[k] = json.dumps(v)
[docs]
@plot_on_axes()
def plot(
self,
attribute: str = "radius",
smoothing: float = 0,
t_max: float | None = None,
ax=None,
**kwargs,
) -> PlotReference:
"""plot the time evolution of the droplet
Args:
attribute (str):
The attribute to plot. Typical values include `radius` and `volume`, but
others might be defined on the droplet class.
smoothing (float):
Determines the scale for some gaussian smoothing of the trajectory.
The default value of zero disables smoothing.
{PLOT_ARGS}
**kwargs:
All remaining parameters are forwarded to the `ax.plot` method. For
example, passing `color=None`, will use different colors for different
droplets.
Returns:
:class:`~pde.tools.plotting.PlotReference`: Information about the plot
"""
if len(self.times) == 0:
return PlotReference(ax, None)
if attribute in {"radius", "radii"}:
data = self.get_radii()
ylabel = "Radius"
elif attribute in {"volume", "volumes"}:
data = self.get_volumes()
ylabel = "Volume"
else:
data = self.get_trajectory(smoothing=smoothing, attribute=attribute)
ylabel = attribute.capitalize()
if t_max is not None and len(self.times) >= 2 and self.times[-1] < t_max:
dt = self.times[-1] - self.times[-2]
times = np.r_[self.times, self.times[-1] + dt]
data = np.r_[data, 0]
else:
times = self.times
(line,) = ax.plot(times, data, **kwargs)
ax.set_xlabel("Time")
ax.set_ylabel(ylabel)
return PlotReference(ax, line, {"attribute": attribute})
[docs]
@plot_on_axes()
def plot_positions(
self, grid: GridBase | None = None, arrow: bool = True, ax=None, **kwargs
) -> PlotReference:
"""plot the droplet track
Args:
grid (GridBase, optional):
The grid on which the droplets are defined. If given, periodic boundary
conditions can be respected in the plotting.
arrow (bool, optional):
Flag determining whether an arrow head is shown to indicate the
direction of the droplet drift.
{PLOT_ARGS}
**kwargs:
Additional keyword arguments are passed to the matplotlib plot
function to affect the appearance. For example, passing `color=None`,
will use different colors for different droplets.
Returns:
:class:`~pde.tools.plotting.PlotReference`: Information about the plot
"""
if len(self.times) == 0:
return PlotReference(ax, None, {"arrow": arrow})
if self.dim != 2:
raise NotImplementedError("Plotting is only implemented for 2d grids")
# obtain droplet positions as a function of time
xy = self.get_trajectory()
if grid is None:
# simply plot the trajectory
cx, cy = xy[:, 0], xy[:, 1]
(line,) = ax.plot(cx, cy, **kwargs)
else:
# use the grid to detect wrapping around
segments = []
for p1, p2 in zip(xy[:-1], xy[1:]):
dist_direct = np.hypot(p1[0] - p2[0], p1[1] - p2[1])
dist_real = grid.distance(p1, p2, coords="cartesian")
close = np.isclose(dist_direct, dist_real)
segments.append(close)
# plot the individual segments
line, cx = None, [] # type: ignore
for s, e in contiguous_true_regions(np.array(segments)):
if line is None:
color = kwargs.get("color", "k")
else:
color = line.get_color() # ensure colors stays the same
cx, cy = xy[s : e + 1, 0], xy[s : e + 1, 1]
(line,) = ax.plot(cx, cy, color=color)
if arrow and len(cx) >= 2:
# add arrow head to last segment
size = min(sum(ax.get_xlim()), sum(ax.get_ylim()))
ax.arrow(
cx[-2],
cy[-2],
cx[-1] - cx[-2],
cy[-1] - cy[-2],
head_width=0.02 * size,
color=line.get_color(),
)
return PlotReference(ax, None, {"arrow": arrow})
[docs]
class DropletTrackList(list):
"""a list of instances of :class:`DropletTrack`"""
def __getitem__(self, key: int | slice): # type: ignore
"""return the droplets identified by the given index/slice"""
result = super().__getitem__(key)
if isinstance(key, slice):
return self.__class__(result)
else:
return result
[docs]
@classmethod
def from_emulsion_time_course(
cls,
time_course: EmulsionTimeCourse,
*,
method: Literal["distance", "overlap"] = "overlap",
grid: GridBase | None = None,
progress: bool = False,
**kwargs,
) -> DropletTrackList:
r"""obtain droplet tracks from an emulsion time course
Args:
time_course (:class:`droplets.emulsions.EmulsionTimeCourse`):
A collection of temporally arranged emulsions
method (str):
The method used for tracking droplet identities. Possible methods are
"overlap" (adding droplets that overlap with those in previous frames)
and "distance" (matching droplets to minimize center-to-center
distances).
grid (:class:`~pde.grids.base.GridBase`):
The grid on which the droplets are defined, which is necessary if
periodic boundary conditions should be respected for measuring distances
progress (bool):
Whether to show the progress of the process.
**kwargs:
Additional parameters for the tracking algorithm. Currently, one can
only specify a maximal distance (using `max_dist`) for the "distance"
method.
Returns:
:class:`DropletTrackList`: the resulting droplet tracks
"""
# get tracks, i.e. clearly overlapping droplets
tracks = cls()
logger = logging.getLogger(cls.__name__)
# determine the tracking method
if method == "overlap":
# track droplets by their physical overlap
def match_tracks(
emulsion: Emulsion, tracks_alive: list[DropletTrack], time: float
) -> None:
"""helper function adding emulsions to the tracks"""
found_multiple_overlap = False
for droplet in emulsion:
# determine which old tracks could be extended
overlaps: list[DropletTrack] = []
for track in tracks_alive:
if track.last.overlaps(droplet, grid=grid):
overlaps.append(track)
if len(overlaps) == 1:
overlaps[0].append(droplet, time=time)
else:
if len(overlaps) > 1:
found_multiple_overlap = True
tracks.append(DropletTrack(droplets=[droplet], times=[time]))
if found_multiple_overlap:
logger.debug(f"Found multiple overlapping droplet(s) at t={time}")
elif method == "distance":
# track droplets by their physical distance
max_dist = kwargs.pop("max_dist", np.inf)
def match_tracks(
emulsion: Emulsion, tracks_alive: list[DropletTrack], time: float
) -> None:
"""helper function adding emulsions to the tracks"""
added = set()
# calculate the distance between droplets
if tracks_alive:
if grid is None:
metric: str | Callable = "euclidean"
else:
metric = functools.partial(grid.distance, coords="cartesian")
points_prev = [track.last.position for track in tracks_alive]
points_now = [droplet.position for droplet in emulsion]
dists = distance.cdist(points_prev, points_now, metric=metric)
# impose a cutoff distance
dists[dists > max_dist] = np.inf
# add all matching droplets
while True:
i, j = np.unravel_index(np.argmin(dists), dists.shape)
if np.isinf(dists[i, j]):
break # no more matches
added.add(j)
tracks_alive[i].append(emulsion[j], time=time) # type: ignore
dists[i, :] = np.inf
dists[:, j] = np.inf
# add droplets that have not been matched
for i, droplet in enumerate(emulsion): # type: ignore
if i not in added:
tracks.append(DropletTrack(droplets=[droplet], times=[time]))
else:
raise ValueError(f"Unknown tracking method {method}")
# check kwargs
if kwargs:
logger.warning(f"Unused keyword arguments: {kwargs}")
# add all emulsions successively using the given algorithm
t_last = None
for t, emulsion in display_progress(
time_course.items(), total=len(time_course), enabled=progress
):
# determine tracks from the last frame that have not yet been extended
tracks_alive = [track for track in tracks if track.end == t_last]
# match all tracks with the current emulsion
match_tracks(emulsion, tracks_alive, time=t)
t_last = t
return tracks
[docs]
@classmethod
def from_storage(
cls,
storage: StorageBase,
*,
method: Literal["distance", "overlap"] = "overlap",
refine: bool = False,
num_processes: int | Literal["auto"] = 1,
progress: bool | None = None,
) -> DropletTrackList:
r"""obtain droplet tracks from stored scalar field data
This method first determines an emulsion time course and than collects tracks by
tracking droplets.
Args:
storage (:class:`~pde.storage.base.StorageBase`):
The phase fields for many time instances
method (str):
The method used for tracking droplet identities. Possible methods are
"overlap" (adding droplets that overlap with those in previous frames)
and "distance" (matching droplets to minimize center-to-center
distances).
refine (bool):
Flag determining whether the droplet properties should be refined
using fitting. This is a potentially slow procedure.
num_processes (int or "auto"):
Number of processes used for the refinement. If set to "auto", the
number of processes is choosen automatically.
progress (bool):
Whether to show the progress of the process. If `None`, the progress is
not shown, except for the first step if `refine` is `True`.
Returns:
:class:`DropletTrackList`: the resulting droplet tracks
"""
etc = EmulsionTimeCourse.from_storage(
storage, refine=refine, num_processes=num_processes, progress=progress
)
if progress is None:
progress = False
return cls.from_emulsion_time_course(etc, method=method, progress=progress)
[docs]
@classmethod
def from_file(cls, path: str) -> DropletTrackList:
"""create droplet track list by reading file
Args:
path (str):
The path from which the data is read. This function assumes that the
data was written as an HDF5 file using :meth:`to_file`.
Returns:
:class:`DropletTrackList`: an instance describing the droplet track list
"""
import h5py
obj = cls()
with h5py.File(path, "r") as fp:
for key in sorted(fp.keys()): # iterate in the stored order
dataset = fp[key]
obj.append(DropletTrack._from_hdf_dataset(dataset))
return obj
[docs]
def to_file(self, path: str, info: InfoDict | None = None) -> None:
"""store data in hdf5 file
The data can be read using the classmethod :meth:`DropletTrackList.from_file`.
Args:
path (str):
The path to which the data is written as an HDF5 file.
info (dict):
Additional data stored alongside the droplet track list
"""
import h5py
with h5py.File(path, "w") as fp:
# write the actual emulsion data
for i, droplet_track in enumerate(self):
droplet_track._write_hdf_dataset(fp, f"track_{i:06d}")
# write additional information
if info:
for k, v in info.items():
fp.attrs[k] = json.dumps(v)
[docs]
def remove_short_tracks(self, min_duration: float = 0) -> None:
"""remove tracks that a shorter than a minimal duration
Args:
min_duration (float):
The minimal duration a droplet track must have in order to be retained.
This is measured in actual time and not in the number of time steps
stored in the track.
"""
for i in reversed(range(len(self))):
if self[i].duration <= min_duration:
self.pop(i)
[docs]
@plot_on_axes()
def plot(self, attribute: str = "radius", ax=None, **kwargs) -> PlotReference:
"""plot the time evolution of all droplets
Args:
attribute (str):
The attribute to plot. Typical values include `radius` and
`volume`, but others might be defined on the droplet class.
{PLOT_ARGS}
**kwargs:
Additional keyword arguments are passed to the matplotlib plot
function to affect the appearance. The special value `color="cycle"`
implies that the default color cycle is used for the tracks, using
different colors for different tracks.
Returns:
:class:`~pde.tools.plotting.PlotReference`: Information about the plot
"""
# choose a suitable color for the tracks
if "color" in kwargs:
if kwargs["color"] == "cycle":
kwargs.pop("color") # if color is None, use the default color cycle
else:
kwargs["color"] = "k" # use black by default
# get maximal time
if self:
t_max = max(track.times[-1] for track in self if len(track.times) > 0)
else:
t_max = None
# adjust alpha such that multiple tracks are visible well
kwargs.setdefault("alpha", min(0.8, 20 / len(self)))
elements = []
for track in self:
elements.append(
track.plot(attribute=attribute, t_max=t_max, ax=ax, **kwargs).element
)
kwargs["label"] = "" # set potential plot label only for first track
return PlotReference(ax, elements, {"attribute": attribute})
[docs]
@plot_on_axes()
def plot_positions(self, ax=None, **kwargs) -> PlotReference:
"""plot all droplet tracks
Args:
{PLOT_ARGS}
**kwargs:
Additional keyword arguments are passed to the matplotlib plot
function to affect the appearance.
Returns:
:class:`~pde.tools.plotting.PlotReference`: Information about the plot
"""
elements = [track.plot_positions(ax=ax, **kwargs).element for track in self]
return PlotReference(ax, elements)
__all__ = ["DropletTrack", "DropletTrackList"]