# Source code for droplets.image_analysis

"""
Functions for analyzing phase field images of emulsions.

.. autosummary::
   :nosignatures:

   locate_droplets
   refine_droplets
   refine_droplet
   get_structure_factor
   get_length_scale
   threshold_otsu

.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""

from __future__ import annotations

import functools
import logging
import math
import warnings
from collections.abc import Iterable, Sequence
from functools import reduce
from itertools import product
from typing import Any, Callable, Literal

import numpy as np
from numpy.lib.recfunctions import (
    structured_to_unstructured,
    unstructured_to_structured,
)
from scipy import ndimage, optimize

try:
    from pyfftw.interfaces.numpy_fft import fftn as np_fftn
except ImportError:
    from numpy.fft import fftn as np_fftn

from pde.fields import ScalarField
from pde.grids import CartesianGrid, CylindricalSymGrid
from pde.grids.base import GridBase
from pde.grids.spherical import SphericalSymGridBase
from pde.tools.math import SmoothData1D
from pde.tools.typing import NumberOrArray

from .droplets import (
    DiffuseDroplet,
    PerturbedDroplet2D,
    PerturbedDroplet3D,
    PerturbedDroplet3DAxisSym,
    SphericalDroplet,
)
from .emulsions import Emulsion


[docs] def threshold_otsu(data: np.ndarray, nbins: int = 256) -> float: """Find the threshold value for a bimodal histogram using the Otsu method. If you have a distribution that is bimodal, i.e., with two peaks and a valley between them, then you can use this to find the location of that valley, which splits the distribution into two. Args: data (:class:`~numpy.ndarray`): The data to be analyzed nbins (int): The number of bins in the histogram, which defines the accuracy of the determined threshold. Modified from https://stackoverflow.com/a/71345917/932593, which is based on the the SciKit Image threshold_otsu implementation: https://github.com/scikit-image/scikit-image/blob/70fa904eee9ef370c824427798302551df57afa1/skimage/filters/thresholding.py#L312 """ counts, bin_edges = np.histogram(data.flat, bins=nbins) bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2 # class probabilities for all possible thresholds weight1 = np.cumsum(counts) weight2 = np.cumsum(counts[::-1])[::-1] # class means for all possible thresholds mean1 = np.cumsum(counts * bin_centers) / weight1 mean2 = (np.cumsum((counts * bin_centers)[::-1]) / weight2[::-1])[::-1] # Clip ends to align class 1 and class 2 variables: # The last value of ``weight1``/``mean1`` should pair with zero values in # ``weight2``/``mean2``, which do not exist. variance12 = weight1[:-1] * weight2[1:] * (mean1[:-1] - mean2[1:]) ** 2 idx = np.argmax(variance12) return float(bin_centers[idx])
def _locate_droplets_in_mask_cartesian(mask: ScalarField) -> Emulsion:
    """locate droplets in a (potentially periodic) data set on a Cartesian grid

    This function locates droplets respecting periodic boundary conditions.

    Args:
        mask (:class:`~pde.fields.scalar.ScalarField`):
            The binary image (or mask) in which the droplets are searched

    Returns:
        :class:`~droplets.emulsions.Emulsion`: The discovered spherical droplets
    """
    grid = mask.grid
    cell_volume = np.prod(grid.discretization)

    # locate individual clusters in the padded image
    labels, num_labels = ndimage.label(mask.data)
    grid._logger.info(f"Found {num_labels} cluster(s) in image")
    if num_labels == 0:
        # no clusters found at all -> return an empty emulsion
        example_drop = SphericalDroplet(np.zeros(grid.dim), radius=0)
        return Emulsion.empty(example_drop)
    indices: Iterable = range(1, num_labels + 1)

    # determine position from binary image and scale it to real space
    positions = ndimage.center_of_mass(mask.data, labels, index=indices)
    # correct for the additional padding of the array
    positions = np.asarray(positions) + 0.5

    # determine volume from binary image and scale it to real space
    volumes = ndimage.sum(mask.data, labels, index=indices)
    volumes = np.asanyarray(volumes) * cell_volume

    # connect clusters linked via periodic boundary conditions
    for ax in np.flatnonzero(grid.periodic):  # look at all periodic axes
        # compile list of all boundary points connected along the current axis
        low: list[list[int] | np.ndarray] = []
        high: list[list[int] | np.ndarray] = []
        for a in range(grid.num_axes):
            if a == ax:
                # fix the periodic axis at the two opposing boundary cells
                low.append([0])
                high.append([-1])
            else:
                # sweep all cells along the remaining axes
                low.append(np.arange(grid.shape[a]))
                high.append(np.arange(grid.shape[a]))

        # iterate over all boundary points
        for l, h in zip(product(*low), product(*high)):
            i_l, i_h = labels[l], labels[h]
            if i_l > 0 and i_h > 0 and i_l != i_h:
                # boundary condition on the low side connects to that of the high side
                # -> we combine the cluster into one, setting its new position as the
                # weighted averages of the center of mass
                v_l, v_h = volumes[i_l - 1], volumes[i_h - 1]
                pos_l, pos_h = positions[i_l - 1], positions[i_h - 1]
                pos_h[ax] -= grid.shape[ax]  # wrap around the upper point
                pos = (pos_l * v_l + pos_h * v_h) / (v_l + v_h)
                # update both clusters with the new data
                positions[i_h - 1] = positions[i_l - 1] = pos
                volumes[i_h - 1] = volumes[i_l - 1] = v_l + v_h
                labels[labels == i_h] = i_l

    # determine which clusters are actually present (merging may have removed some)
    indices = np.array(sorted(set(np.unique(labels)) - {0}))

    # create the list of droplets
    positions = grid.normalize_point(grid.transform(positions, "cell", "grid"))
    droplets = (
        SphericalDroplet.from_volume(position, volume)
        for position, volume in zip(positions[indices - 1], volumes[indices - 1])
    )

    # filter overlapping droplets (e.g. due to duplicates)
    emulsion = Emulsion(droplets)
    num_candidates = len(emulsion)
    if num_candidates < num_labels:
        grid._logger.info(f"Only {num_candidates} candidate(s) inside bounds")
    emulsion.remove_overlapping(grid=grid)
    if len(emulsion) < num_candidates:
        # NOTE(review): this message reports the pre-filter count `num_candidates`,
        # not the remaining `len(emulsion)` — confirm which count is intended
        grid._logger.info(f"Only {num_candidates} candidate(s) not overlapping")

    return emulsion


def _locate_droplets_in_mask_spherical(mask: ScalarField) -> Emulsion:
    """locates droplets in a binary data set on a spherical grid

    Args:
        mask (:class:`~pde.fields.scalar.ScalarField`):
            The binary image (or mask) in which the droplets are searched

    Returns:
        :class:`~droplets.emulsions.Emulsion`: The discovered spherical droplets
    """
    grid = mask.grid

    # locate clusters in the binary image
    labels, num_labels = ndimage.label(mask.data)
    if num_labels == 0:
        example_drop = SphericalDroplet(np.zeros(grid.dim), radius=0)
        return Emulsion.empty(example_drop)

    # locate clusters around origin; only a droplet centered at the origin is
    # compatible with the spherical symmetry of the grid
    object_slices = ndimage.find_objects(labels)
    droplet = None
    for slices in object_slices:
        if slices[0].start == 0:  # contains point around origin
            radius = float(grid.transform(slices[0].stop, "cell", "grid").flat[-1])
            droplet = SphericalDroplet(np.zeros(grid.dim), radius=radius)
        else:
            logger = logging.getLogger(grid.__class__.__module__)
            logger.warning("Found object not located at origin")

    # return an emulsion of droplets
    if droplet:
        return Emulsion([droplet])
    else:
        example_drop = SphericalDroplet(np.zeros(grid.dim), radius=0)
        return Emulsion.empty(example_drop)


class _SpanningDropletSignal(RuntimeError):
    """exception signaling that an untypical droplet spanning the system was found"""

    ...


def _locate_droplets_in_mask_cylindrical_single(
    grid: CylindricalSymGrid, mask: np.ndarray
) -> Emulsion:
    """locate droplets in a data set on a single cylindrical grid

    Args:
        grid:
            The cylindrical grid
        mask (:class:`~numpy.ndarray`):
            The binary image (or mask) in which the droplets are searched

    Returns:
        :class:`~droplets.emulsions.Emulsion`: The discovered spherical droplets

    Raises:
        _SpanningDropletSignal: if a cluster extends along the entire z-axis
    """
    # locate the individual clusters
    labels, num_features = ndimage.label(mask)
    if num_features == 0:
        example_drop = SphericalDroplet(np.zeros(grid.dim), radius=0)
        return Emulsion.empty(example_drop)

    # locate clusters on the symmetry axis
    object_slices = ndimage.find_objects(labels)
    indices = []
    for index, slices in enumerate(object_slices, 1):
        if slices[0].start == 0:  # contains point on symmetry axis
            indices.append(index)
            if slices[1].start == 0 and slices[1].stop > grid.shape[1]:
                # the "droplet" extends the entire z-axis
                raise _SpanningDropletSignal
        else:
            logger = logging.getLogger(grid.__class__.__module__)
            logger.warning("Found object not located on symmetry axis")

    # determine position from binary image and scale it to real space
    pos = ndimage.center_of_mass(mask, labels, index=indices)
    pos = grid.transform(pos, "cell", "cartesian")

    # determine volume from binary image and scale it to real space
    vol_r, dz = grid.cell_volume_data
    cell_volumes = np.outer(vol_r, dz)
    try:
        vol = ndimage.sum_labels(cell_volumes, labels, index=indices)
    except AttributeError:
        # fall-back for older versions of scipy
        vol = ndimage.sum(cell_volumes, labels, index=indices)

    # return an emulsion of droplets; positions are constrained to the z-axis
    droplets = (
        SphericalDroplet.from_volume(np.array([0, 0, p[2]]), v)
        for p, v in zip(pos, vol)
    )
    return Emulsion(droplets)


def _locate_droplets_in_mask_cylindrical(mask: ScalarField) -> Emulsion:
    """locate droplets in a data set on a (periodic) cylindrical grid

    This function locates droplets respecting periodic boundary conditions.

    Args:
        mask (:class:`~pde.fields.scalar.ScalarField`):
            The binary image (or mask) in which the droplets are searched

    Returns:
        :class:`~droplets.emulsions.Emulsion`: The discovered spherical droplets
    """
    assert isinstance(mask.grid, CylindricalSymGrid)
    grid = mask.grid

    if grid.periodic[1]:
        # locate droplets respecting periodic boundary conditions in z-direction

        # pad the array to simulate periodic boundary conditions
        dim_r, dim_z = grid.shape
        z_min, z_max = grid.axes_bounds[1]
        mask_padded = np.pad(mask.data, [[0, 0], [dim_z, dim_z]], mode="wrap")
        assert mask_padded.shape == (dim_r, 3 * dim_z)

        # locate droplets in the extended image
        try:
            candidates = _locate_droplets_in_mask_cylindrical_single(grid, mask_padded)
        except _SpanningDropletSignal:
            # a system-spanning cluster was found; fall through to the plain
            # (non-padded) detection below
            pass
        else:
            grid._logger.info(f"Found {len(candidates)} droplet candidates.")

            # keep droplets that are inside the central area
            droplets = Emulsion()
            for droplet in candidates:
                # correct for the additional padding of the array
                droplet.position[2] -= grid.length
                # check whether the droplet lies in the original box
                if z_min <= droplet.position[2] <= z_max:
                    droplets.append(droplet)

            grid._logger.info(f"Kept {len(droplets)} central droplets.")

            # filter overlapping droplets (e.g. due to duplicates)
            droplets.remove_overlapping()

            return droplets

    # simply locate droplets in the mask
    droplets = _locate_droplets_in_mask_cylindrical_single(mask.grid, mask.data)

    return droplets


def locate_droplets_in_mask(mask: ScalarField) -> Emulsion:
    """locates droplets in a binary image

    This function locates droplets respecting periodic boundary conditions.

    Args:
        mask (:class:`~pde.fields.scalar.ScalarField`):
            The binary image (or mask) in which the droplets are searched

    Returns:
        :class:`~droplets.emulsions.Emulsion`: The discovered spherical droplets

    Raises:
        NotImplementedError: for grid types without a specialized implementation
        ValueError: if `mask` does not carry a valid grid
    """
    # dispatch to the specialized implementation for the grid type
    if isinstance(mask.grid, CartesianGrid):
        return _locate_droplets_in_mask_cartesian(mask)
    elif isinstance(mask.grid, SphericalSymGridBase):
        return _locate_droplets_in_mask_spherical(mask)
    elif isinstance(mask.grid, CylindricalSymGrid):
        return _locate_droplets_in_mask_cylindrical(mask)
    elif isinstance(mask.grid, GridBase):
        raise NotImplementedError(f"Locating droplets is not possible for {mask.grid}")
    else:
        raise ValueError(f"Invalid grid {mask.grid}")
def locate_droplets(
    phase_field: ScalarField,
    threshold: float | Literal["auto", "extrema", "mean", "otsu"] = 0.5,
    *,
    minimal_radius: float = 0,
    modes: int = 0,
    interface_width: float | None = None,
    refine: bool = False,
    refine_args: dict[str, Any] | None = None,
    num_processes: int | Literal["auto"] = 1,
) -> Emulsion:
    """Locates droplets in the phase field

    This uses a binarized image to locate clusters of large concentration in the
    phase field, which are interpreted as droplets. Basic quantities, like position
    and size, are determined for these clusters.

    Example:
        To determine the position, radius, and interfacial width of an arbitrary
        droplet, the following call can be used

        .. code-block:: python

            emulsion = droplets.locate_droplets(
                field,
                threshold="auto",
                refine=True,
                refine_args={'vmin': None, 'vmax': None},
            )

        :code:`field` is the scalar field, in which the droplets are located. The
        `refine_args` set flexible boundaries for the intensities inside and
        outside the droplet.

    Args:
        phase_field (:class:`~pde.fields.ScalarField`):
            Scalar field that describes the concentration field of droplets
        threshold (float or str):
            The threshold for binarizing the image. If a value is given it is used
            directly. Otherwise, the following algorithms are supported:

            * `extrema`: take mean between the minimum and the maximum of the data
            * `mean`: take the mean over the entire data
            * `otsu`: use Otsu's method implemented in :func:`threshold_otsu`

            The special value `auto` currently defaults to the `extrema` method.
        minimal_radius (float):
            The smallest radius of droplets to include in the list. This can be
            used to filter out fluctuations in noisy simulations.
        modes (int):
            The number of perturbation modes that should be included. If `modes=0`,
            droplets are assumed to be spherical. Note that the mode amplitudes are
            only determined when `refine=True`.
        interface_width (float, optional):
            Interface width of the located droplets, which is also used as a
            starting value for the fitting if droplets are refined.
        refine (bool):
            Flag determining whether the droplet properties should be refined
            using fitting. This is a potentially slow procedure.
        refine_args (dict):
            Additional keyword arguments passed on to :func:`refine_droplet`. Only
            has an effect if `refine=True`.
        num_processes (int):
            Number of processes used for the refinement. If set to "auto", the
            number of processes is chosen automatically.

    Returns:
        :class:`~droplets.emulsions.Emulsion`: All detected droplets

    Raises:
        TypeError: if `phase_field` is not a :class:`~pde.fields.ScalarField`
        ValueError: if perturbation modes are requested in unsupported dimensions
    """
    if not isinstance(phase_field, ScalarField):
        raise TypeError("`phase_field` must be ScalarField")
    dim = phase_field.grid.dim  # dimensionality of the space

    if modes > 0 and dim not in [2, 3]:
        raise ValueError("Perturbed droplets only supported for 2d and 3d")

    if refine_args is None:
        refine_args = {}

    # determine actual threshold
    if threshold == "extrema" or threshold == "auto":
        threshold = float(phase_field.data.min() + phase_field.data.max()) / 2
    elif threshold == "mean":
        threshold = float(phase_field.data.mean())
    elif threshold == "otsu":
        threshold = threshold_otsu(phase_field.data)
    else:
        # numeric threshold; float() raises for unsupported strings
        threshold = float(threshold)

    # locate droplets in thresholded image
    img_binary = ScalarField(phase_field.grid, phase_field.data > threshold, dtype=bool)
    candidates = locate_droplets_in_mask(img_binary)

    if minimal_radius > -np.inf:
        candidates.remove_small(minimal_radius)

    droplets = []
    for droplet in candidates:
        # check whether we need to add the interface width
        droplet_class = droplet.__class__
        args: dict[str, NumberOrArray] = {}

        # change droplet class when interface width is given
        if interface_width is not None:
            droplet_class = DiffuseDroplet
            args["interface_width"] = interface_width

        # change droplet class when perturbed droplets are requested
        if modes > 0:
            if dim == 2:
                droplet_class = PerturbedDroplet2D
            elif dim == 3:
                if isinstance(phase_field.grid, CylindricalSymGrid):
                    droplet_class = PerturbedDroplet3DAxisSym
                else:
                    droplet_class = PerturbedDroplet3D
            else:
                raise NotImplementedError(f"Dimension {dim} is not supported")
            args["amplitudes"] = np.zeros(modes)

        # recreate a droplet of the correct class
        if droplet_class != droplet.__class__:
            droplet = droplet_class.from_droplet(droplet, **args)
        droplets.append(droplet)

    # refine droplets if necessary
    if refine:
        droplets = refine_droplets(
            phase_field, droplets, num_processes=num_processes, **refine_args
        )

    # return droplets as an emulsion; refinement may have shrunk droplets below
    # the minimal radius, so the filter is applied again
    emulsion = Emulsion(droplets)
    if minimal_radius > -np.inf:
        emulsion.remove_small(minimal_radius)
    return emulsion
def refine_droplets(
    phase_field: ScalarField,
    candidates: Iterable[DiffuseDroplet],
    *,
    num_processes: int | Literal["auto"] = 1,
    **kwargs,
) -> list[DiffuseDroplet]:
    r"""Refines many droplets by fitting to phase field

    Args:
        phase_field (:class:`~pde.fields.ScalarField`):
            Phase_field that is being used to refine the droplet
        candidates (sequence of :class:`~droplets.droplets.SphericalDroplet`):
            Droplets that should be refined.
        num_processes (int or "auto"):
            Number of processes used for the refinement. If set to "auto", the
            number of processes is chosen automatically.
        \**kwargs (dict):
            Additional keyword arguments are passed on to :func:`refine_droplet`.

    Returns:
        list of :class:`~droplets.droplets.DiffuseDroplet`:
            The refined droplets
    """
    if num_processes == 1:
        # refine droplets serially in this process; droplets for which the
        # refinement fails (`None` result) are dropped
        droplets = [
            drop
            for candidate in candidates
            if (drop := refine_droplet(phase_field, candidate, **kwargs)) is not None
        ]

    else:
        # use multiprocessing to refine droplets
        from concurrent.futures import ProcessPoolExecutor

        _refine_one: Callable[[DiffuseDroplet], DiffuseDroplet] = functools.partial(
            refine_droplet, phase_field, **kwargs
        )
        max_workers = None if num_processes == "auto" else num_processes
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            droplets = [
                candidate_refined
                for candidate_refined in executor.map(_refine_one, candidates)
                if candidate_refined is not None
            ]

    return droplets
def refine_droplet(
    phase_field: ScalarField,
    droplet: DiffuseDroplet,
    *,
    vmin: float = 0.0,
    vmax: float = 1.0,
    adjust_values: bool = False,
    tolerance: float | None = None,
    least_squares_params: dict[str, Any] | None = None,
) -> DiffuseDroplet:
    """Refines droplet parameters by fitting to phase field

    This function varies droplet parameters, like position, size, interface width,
    and potential perturbation amplitudes until the overlap with the respective
    phase field region is maximized. Here, we use a constraint fitting routine.

    Args:
        phase_field (:class:`~pde.fields.ScalarField`):
            Phase_field that is being used to refine the droplet
        droplet (:class:`~droplets.droplets.SphericalDroplet`):
            Droplet that should be refined. This could also be a subclass of
            :class:`SphericalDroplet`
        vmin (float):
            The intensity value of the dilute phase surrounding the droplet. If
            `None`, the value will be determined automatically.
        vmax (float):
            The intensity value inside the droplet. If `None`, the value will be
            determined automatically.
        adjust_values (bool):
            Flag determining whether the intensity values will be included in the
            fitting procedure. The default value `False` implies that the
            intensity values are regarded fixed.
        tolerance (float, optional):
            Sets the three tolerance values `ftol`, `xtol`, and `gtol` of the
            :func:`scipy.optimize.least_squares`, unless they are specified in
            detail by the `least_squares_params` argument.
        least_squares_params (dict):
            Dictionary of parameters that influence the fitting; see the
            documentation of :func:`scipy.optimize.least_squares`.

    Returns:
        :class:`~droplets.droplets.DiffuseDroplet`:
            The refined droplet as an instance of the argument `droplet`

    Raises:
        TypeError: if `phase_field` is not a :class:`~pde.fields.ScalarField`
    """
    if not isinstance(phase_field, ScalarField):
        raise TypeError("`phase_field` must be ScalarField")
    if least_squares_params is None:
        least_squares_params = {}
    if tolerance is not None:
        # apply the common tolerance to all three convergence criteria unless
        # they were set explicitly
        for key in ["ftol", "xtol", "gtol"]:
            least_squares_params.setdefault(key, tolerance)

    if not isinstance(droplet, DiffuseDroplet):
        droplet = DiffuseDroplet.from_droplet(droplet)
    if droplet.interface_width is None:
        droplet.interface_width = phase_field.grid.typical_discretization

    # enlarge the mask to also contain the shape change
    mask = droplet._get_phase_field(phase_field.grid, dtype=bool)
    dilation_iterations = 1 + int(2 * droplet.interface_width)
    mask = ndimage.binary_dilation(mask, iterations=dilation_iterations)

    # apply the mask
    data_mask = phase_field.data[mask]

    # determine the coordinate constraints and only vary the free data points
    data_flat = structured_to_unstructured(droplet.data)  # unstructured data
    dtype = droplet.data.dtype
    free: np.ndarray = np.ones(len(data_flat), dtype=bool)
    free[phase_field.grid.coordinate_constraints] = False

    # determine data bounds
    l, h = droplet.data_bounds
    bounds = l[free], h[free]

    # determine the intensities outside and inside the droplet
    if vmin is None:
        vmin = np.min(data_mask)
    if vmax is None:
        vmax = np.max(data_mask)
    vrng = vmax - vmin

    if adjust_values:
        # fit intensities in addition to all droplet parameters
        # add vmin and vrng as separate fitting parameters
        # FIX: the second intensity parameter is interpreted as `vrng` by the
        # residual function below, so it must be seeded with `vrng` (not `vmax`);
        # the two coincide only for the default `vmin == 0`
        parameters = np.r_[data_flat[free], vmin, vrng]
        bounds = np.r_[bounds[0], vmin - vrng, 0], np.r_[bounds[1], vmax, 3 * vrng]

        def _image_deviation(params):
            """helper function evaluating the residuals"""
            # generate the droplet
            data_flat[free] = params[:-2]
            vmin, vrng = params[-2:]
            droplet.data = unstructured_to_structured(data_flat, dtype=dtype)
            droplet.check_data()
            img = vmin + vrng * droplet._get_phase_field(phase_field.grid)[mask]
            return img - data_mask

        # do the least square optimization
        result = optimize.least_squares(
            _image_deviation, parameters, bounds=bounds, **least_squares_params
        )
        data_flat[free] = result.x[:-2]

    else:
        # fit only droplet parameters and assume all intensities fixed
        def _image_deviation(params):
            """helper function evaluating the residuals"""
            # generate the droplet
            data_flat[free] = params
            droplet.data = unstructured_to_structured(data_flat, dtype=dtype)
            droplet.check_data()
            img = vmin + vrng * droplet._get_phase_field(phase_field.grid)[mask]
            return img - data_mask

        # do the least square optimization
        result = optimize.least_squares(
            _image_deviation, data_flat[free], bounds=bounds, **least_squares_params
        )
        data_flat[free] = result.x

    droplet.data = unstructured_to_structured(data_flat, dtype=dtype)

    # normalize the droplet position
    grid = phase_field.grid
    coords = grid.transform(droplet.position, "cartesian", "grid")
    droplet.position = grid.transform(grid.normalize_point(coords), "grid", "cartesian")

    return droplet
def get_structure_factor(
    scalar_field: ScalarField,
    smoothing: None | float | Literal["auto", "none"] = "auto",
    wave_numbers: Sequence[float] | Literal["auto"] = "auto",
    add_zero: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
    r"""Calculates the structure factor associated with a field

    Here, the structure factor is basically the power spectral density of the
    field `scalar_field` normalized so that re-gridding or rescaling the field
    does not change the result.

    Args:
        scalar_field (:class:`~pde.fields.ScalarField`):
            The scalar_field being analyzed
        smoothing (float, optional):
            Length scale that determines the smoothing of the radially averaged
            structure factor. If omitted, the full data about the discretized
            structure factor is returned. The special value `auto` calculates
            a value automatically.
        wave_numbers (list of floats, optional):
            The magnitude of the wave vectors at which the structure factor is
            evaluated. This only applies when smoothing is used. If `auto`, the
            wave numbers are determined automatically.
        add_zero (bool):
            Determines whether the value at k=0 (defined to be 1) should also be
            returned.

    Returns:
        (numpy.ndarray, numpy.ndarray): Two arrays giving the wave numbers and the
        associated structure factor. Wave numbers :math:`k` are related to
        distances by :math:`2\pi/k`.

    Raises:
        TypeError: if `scalar_field` is not a :class:`~pde.fields.ScalarField`
        NotImplementedError: if the field is not defined on a Cartesian grid
    """
    logger = logging.getLogger(__name__)

    if not isinstance(scalar_field, ScalarField):
        raise TypeError(
            "Length scales can only be calculated for scalar "
            f"fields, not {scalar_field.__class__.__name__}"
        )

    grid = scalar_field.grid
    if not isinstance(grid, CartesianGrid):
        raise NotImplementedError(
            "Structure factor can currently only be calculated for Cartesian grids"
        )
    if not all(grid.periodic):
        logger.warning(
            "Structure factor calculation assumes periodic boundary "
            "conditions, but not all grid dimensions are periodic"
        )

    # do the n-dimensional Fourier transform and calculate the structure factor;
    # the zero mode (`flat[0]`) is excluded
    f1 = np_fftn(scalar_field.data, norm="ortho").flat[1:]
    flat_data = scalar_field.data.flat
    sf = np.abs(f1) ** 2 / np.dot(flat_data, flat_data)

    # an alternative calculation of the structure factor is
    # f2 = np_ifftn(scalar_field.data, norm='ortho').flat[1:]
    # sf = (f1 * f2).real
    # sf /= (scalar_field.data**2).sum()
    # but since this involves two FFT, it is probably slower

    # determine the (squared) components of the wave vectors.
    # Note that `fftfreq` defines the wave number in cycles per unit of the sample
    # spacing, so we need to scale lengths by one over 2π.
    k2s = [
        np.fft.fftfreq(grid.shape[i], d=grid.discretization[i] / (2 * np.pi)) ** 2
        for i in range(grid.dim)
    ]

    # calculate the magnitude (excluding the zero mode, to match `sf`)
    k_mag = np.sqrt(reduce(np.add.outer, k2s)).flat[1:]

    no_wavenumbers = wave_numbers is None or (
        isinstance(wave_numbers, str) and wave_numbers == "auto"
    )

    if smoothing is not None and smoothing != "none":
        # construct the smoothed function of the structure factor
        if smoothing == "auto":
            smoothing = k_mag.max() / 128
        smoothing = float(smoothing)  # type: ignore
        sf_smooth = SmoothData1D(k_mag, sf, sigma=smoothing)

        if no_wavenumbers:
            # determine the wave numbers at which to evaluate it
            k_min = 2 / grid.cuboid.size.max()
            k_max = k_mag.max()
            k_mag = np.linspace(k_min, k_max, 128)
        else:
            k_mag = np.array(wave_numbers)

        # obtain the smoothed values at these points
        sf = sf_smooth(k_mag)

    elif not no_wavenumbers:
        # user-specified wave numbers are ignored without smoothing
        logger.warning(
            "Argument `wave_numbers` is only used when `smoothing` is enabled."
        )

    if add_zero:
        # prepend the k=0 value, which is defined to be 1
        sf = np.r_[1, sf]
        k_mag = np.r_[0, k_mag]

    return k_mag, sf
def get_length_scale(
    scalar_field: ScalarField,
    method: Literal[
        "structure_factor_mean", "structure_factor_maximum", "droplet_detection"
    ] = "structure_factor_maximum",
    **kwargs,
) -> float | tuple[float, Any]:
    """Calculates a length scale associated with a phase field

    Args:
        scalar_field (:class:`~pde.fields.ScalarField`):
            The scalar field to analyze
        method (str):
            A string determining which method is used to calculate the length
            scale. Valid options are `structure_factor_maximum` (numerically
            determine the maximum in the structure factor),
            `structure_factor_mean` (calculate the mean of the structure factor),
            and `droplet_detection` (determine the number of droplets and estimate
            average separation).

        Additional supported keyword arguments depend on the chosen method. For
        instance, the methods involving the structure factor allow for a boolean
        flag `full_output`, which also returns the actual structure factor. The
        method `structure_factor_maximum` also allows for some smoothing of the
        radially averaged structure factor. If the parameter `smoothing` is set
        to `None` the amount of smoothing is determined automatically from the
        typical discretization of the underlying grid. For the method
        `droplet_detection`, additional arguments are forwarded to
        :func:`locate_droplets`.

    Returns:
        float: The determine length scale

    Raises:
        ValueError: if `method` is not one of the supported values

    See Also:
        :class:`~droplets.trackers.LengthScaleTracker`: Tracker measuring length
        scales
    """
    logger = logging.getLogger(__name__)

    if method == "structure_factor_mean" or method == "structure_factor_average":
        # calculate the structure factor
        k_mag, sf = get_structure_factor(scalar_field)
        length_scale = 2 * np.pi * np.sum(sf) / np.sum(k_mag * sf)

        if kwargs.pop("full_output", False):
            return length_scale, sf

    elif method == "structure_factor_maximum" or method == "structure_factor_peak":
        # calculate the structure factor
        k_mag, sf = get_structure_factor(scalar_field, smoothing=None, add_zero=True)

        # smooth the structure factor
        # FIX: capture the user-supplied `smoothing` value; the previous code
        # discarded the popped value and left `smoothing` unbound (NameError)
        # whenever the caller passed an explicit smoothing length
        smoothing = kwargs.pop("smoothing", None)
        if smoothing is None:
            smoothing = 0.01 * scalar_field.grid.typical_discretization
        sf_smooth = SmoothData1D(k_mag, sf, sigma=smoothing)

        # find the maximum
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")

            # determine maximum (excluding k=0)
            max_est = k_mag[1 + np.argmax(sf[1:])]
            bracket = np.array([0.2, 1, 5]) * max_est
            logger.debug(f"Search maximum of structure factor in interval {bracket}")
            try:
                result = optimize.minimize_scalar(
                    lambda x: -sf_smooth(x), bracket=bracket
                )
            except Exception:
                logger.exception("Could not determine maximal structure factor")
                length_scale = math.nan
            else:
                if not result.success:
                    logger.warning(
                        "Maximization of structure factor resulted in the following "
                        f"message: {result.message}"
                    )
                length_scale = 2 * np.pi / result.x

        if kwargs.pop("full_output", False):
            return length_scale, sf_smooth

    elif method == "droplet_detection":
        # calculate the length scale from detected droplets
        droplets = locate_droplets(scalar_field, **kwargs)
        kwargs = {}  # clear kwargs, so no warning is raised

        # get the axes along which droplets can be placed
        grid = scalar_field.grid
        axes = set(range(grid.dim)) - set(grid.coordinate_constraints)
        volume = 1.0
        for ax in axes:
            volume *= grid.axes_bounds[ax][1] - grid.axes_bounds[ax][0]
        # NOTE(review): raises ZeroDivisionError if no droplets were detected —
        # confirm whether a `nan` result would be preferable for callers
        volume_per_droplet = volume / len(droplets)
        length_scale = volume_per_droplet ** (1 / len(axes))

    else:
        raise ValueError(
            f"Method {method} is not defined. Valid values are `structure_factor_mean` "
            "and `structure_factor_maximum`"
        )

    if kwargs:
        # raise warning if keyword arguments remain
        logger.warning("Unused keyword arguments: %s", ", ".join(kwargs))

    # return only the length scale without any additional information
    return length_scale  # type: ignore
# public API of this module; `refine_droplets` is documented in the module
# autosummary and therefore belongs here as well
__all__ = [
    "threshold_otsu",
    "locate_droplets",
    "refine_droplets",
    "refine_droplet",
    "get_structure_factor",
    "get_length_scale",
]