Source code for ugants.analysis.fill
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of UG-ANTS and is released under the BSD 3-Clause license.
# See LICENSE.txt in the root of the repository for full licensing details.
"""Provides functionality for filling missing data on UGrid."""
import abc
import warnings
import numpy as np
from iris.cube import Cube
from pykdtree.kdtree import KDTree
from ugants.analysis.coord_transforms import convert_to_cartesian
from ugants.utils.cube import get_connectivity_indices
[docs]
class FillABC(abc.ABC):
"""An abstract base class which provides an interface for fill operations.
The abstract methods to be provided when subclassing the :class:`FillABC` are:
* :meth:`calculate_fill_lookup` - Perform precalculation prior to filling
missing data.
* :meth:`__call__` - Perform the missing data fill on the supplied cube.
Attributes
----------
source : :class:`iris.cube.Cube`
UGrid cube with missing data to be filled, identified by masked data.
target_mask : :class:`iris.cube.Cube`
Target mask to be applied to constrain the search, by default None.
The filled cube will inherit this target mask.
indices_to_be_filled : :class:`numpy.ndarray`
A boolean array specifying the cells in the source data that need to be filled.
indices_to_fill_from : :class:`numpy.ndarray`
A boolean array specifying the cells in the source data that are
considered valid fill candidates.
"""
[docs]
def __init__(self, source: Cube, target_mask: Cube = None):
"""Identify cells to fill, cells to fill from, and calculate_fill_lookup.
Missing points are identified by either masked or nan values.
The ``target_mask`` is used to constrain the valid fill points,
so that only unmasked points are considered valid fill candidates.
In addition, the ``target_mask`` is applied to the filled cube.
The following truth table shows the behaviour of a given cell for the
various combinations of being masked in ``source`` and ``target_mask``:
==================== ======================== ===================== ====================
Masked in ``source`` Value in ``target_mask`` Cell requires filling Valid fill candidate
==================== ======================== ===================== ====================
False False False True
False True False False
True False True False
True True False False
==================== ======================== ===================== ====================
Parameters
----------
source
UGrid cube with missing data to be filled, identified by either NaN or masked data.
target_mask
Target mask to be applied to constrain the search, by default None.
Cells that are ``True`` in the target mask are considered invalid fill candidates.
""" # noqa: E501
if (target_mask is not None) and (source.mesh != target_mask.mesh):
raise ValueError("Source and target mask have different meshes")
self.source = convert_nan_to_masked(source)
self.target_mask = None
if target_mask is not None:
self._validate_target_mask(target_mask)
self.target_mask = target_mask.copy(target_mask.data.astype(bool))
self.indices_to_be_filled: np.ndarray
self.identify_cells_to_fill()
self.indices_to_fill_from: np.ndarray
self.identify_valid_fill_cells()
self.calculate_fill_lookup()
@staticmethod
def _validate_target_mask(target_mask):
# Only integer or boolean dtypes allowed in target mask
if (dtype := target_mask.data.dtype).kind not in ("i", "b"):
raise TypeError(
f"Unexpected target mask dtype: {dtype}, "
"expected integer or boolean."
)
# No masked data allowed in target mask
if np.ma.is_masked(target_mask.data):
raise ValueError("Unexpected masked data present in target mask.")
# Target mask should only contain values 0 or 1 (False or True).
# Check the actual unique values set is a subset of the expected set {0, 1}.
# This accounts for the case where the target mask is all 0, in which case
# unique_values would be {0}.
# Note this works even if the dtype is bool rather than int.
if not ((unique_values := set(target_mask.data)) <= {0, 1}):
raise ValueError(f"Unexpected values in target mask, got {unique_values}.")
[docs]
@abc.abstractmethod
def calculate_fill_lookup(self):
"""Perform precalculation for filling missing data.
Initialise any data structures used in the :meth:`__call__` method,
and return ``None``.
"""
pass
[docs]
@abc.abstractmethod
def __call__(self, cube: Cube) -> Cube:
"""Perform the missing data fill on the supplied cube.
Make use of any data structures created during the :meth:`calculate_fill_lookup`
method.
Return a new :class:`iris.cube.Cube` with the missing data filled.
"""
pass
[docs]
def identify_cells_to_fill(self):
"""Identify cells within :attr:`source` cube to be filled.
Sets the :attr:`indices_to_be_filled` attribute.
Returns
-------
None
In-place operation.
"""
missing_in_source = np.ma.getmaskarray(self.source.data)
if self.target_mask is not None:
# find points which are masked in source and not 1 in target_mask
not_target_mask = ~self.target_mask.data
search_mask = missing_in_source & not_target_mask
else:
search_mask = missing_in_source
if np.all(~search_mask):
warnings.warn("No cells in the source cube require filling.", stacklevel=1)
self.indices_to_be_filled = search_mask
return None
[docs]
def identify_valid_fill_cells(self):
"""Identify cells within :attr:`source` cube which are valid fill candidates.
Sets the :attr:`indices_to_fill_from` attribute.
Returns
-------
None
In-place operation.
"""
valid_in_source = ~np.ma.getmaskarray(self.source.data)
if self.target_mask is not None:
valid_in_target = ~self.target_mask.data
if np.all(valid_in_target):
warnings.warn(
"All target mask data is unmasked, target mask has no effect.",
stacklevel=1,
)
valid_mask = valid_in_source & valid_in_target
else:
valid_mask = valid_in_source
if np.all(~valid_mask):
raise ValueError("No valid fill candidates in source cube.")
self.indices_to_fill_from = valid_mask
return None
[docs]
def __repr__(self) -> str:
"""Provide a string representation of the class instance."""
classname = self.__class__.__name__
msg = f"{classname}(source={self.source!r}, target_mask={self.target_mask!r})"
return msg
[docs]
class KDTreeFill(FillABC):
"""Fill points with data from nearest neighbour, identified using a KDTree.
Warning
-------
The KDTree fill assumes that the data is located on a spherical geocentric
coordinate system.
Attempting to use this fill algorithm with a different coordinate system may lead
to unexpected results.
"""
[docs]
def calculate_fill_lookup(self):
"""Train and query a KDTree to identify nearest neighbours.
The location of every face of the source cube is converted to a 3D vector.
This 'point cloud' is then split into two sets: 'points to be filled' and
'points to fill from'. The 'points to fill from' are used to train a KDTree,
which is then queried using the 'points to be filled'. This provides a mapping
from every missing point in the source cube to the index of its nearest
neighbour in the valid subset.
"""
point_cloud = convert_to_cartesian(self.source)
self.points_to_be_filled = point_cloud[self.indices_to_be_filled]
self.points_to_fill_from = point_cloud[self.indices_to_fill_from]
self.kdtree = KDTree(self.points_to_fill_from)
_, self.nearest_neighbour_indices = self.kdtree.query(self.points_to_be_filled)
return None
[docs]
def __call__(self, cube: Cube) -> Cube:
"""
Fill missing points in ``cube``.
Missing data are filled with nearest neighbour data using the precalculated
KDTree to look up valid data.
Parameters
----------
cube
Cube with missing points to be filled.
Returns
-------
:class:`iris.cube.Cube`
Cube with missing points filled with data from nearest neighbour.
"""
if cube.mesh != self.source.mesh:
raise ValueError("Provided cube and source cube have different meshes")
target = convert_nan_to_masked(cube)
# NOTE: nearest_neighbour_indices references the points_to_fill_from array,
# NOT the data array of the full cube
valid_fill_data = target.data[self.indices_to_fill_from]
target.data[self.indices_to_be_filled] = valid_fill_data[
self.nearest_neighbour_indices
]
if self.target_mask is not None:
target.data.mask = self.target_mask.data
return target
[docs]
def flood_fill(cube: Cube, seed_point: int, fill_value: float):
"""Fill a contiguous region in cube, starting at seed point.
The flood fill algorithm identifies a contiguous region, starting
at the ``seed_point`` and extending outwards to neighbouring cells
with the same data value as the seed point. Neighbours are identified
using the mesh's :obj:`~iris.experimental.ugrid.mesh.Mesh.face_face_connectivity`.
Parameters
----------
cube :
Cube to flood fill.
seed_point :
Index of the cube's data array to start the flood fill.
fill_value :
Value with which to fill all cells in the contiguous region.
To mask out the region, set ``fill_value=np.ma.masked``.
Returns
-------
:class:`iris.cube.Cube`
Flood filled copy of the original cube.
Raises
------
ValueError
If the provided cube has a non-horizontal dimension.
ValueError
If the specified seed point already has the specified fill value.
Note
----
There is no way to specify an extended neighbourhood in the same way as regular
ANTS. Only immediate neighbours as identified by the cube's
:obj:`~iris.experimental.ugrid.mesh.Mesh.face_face_connectivity`
are considered, not diagonals.
Wraparound is always enabled since there is no
meaningful boundary in the unstructured grid.
Currently the flood fill can only be used on horizontal data. Cubes with
vertical levels, time coordinates or other non-mesh coordinates are not supported.
"""
cube_dimensions_number = len(cube.shape)
if cube_dimensions_number != 1:
raise ValueError(
f"The provided cube has {cube_dimensions_number} data dimensions, "
"expected only 1 horizontal dimension."
)
filled_cube = cube.copy()
# connectivity_indices is a numpy array specifying the nodes connected to each face.
# shape = (n_faces, 4) since each face is quadrilateral.
connectivity_indices = get_connectivity_indices(cube, "face_face_connectivity")
seed_value = cube.data[seed_point]
if seed_value == fill_value:
raise ValueError(
f"The value at location {seed_point} already has the "
f"fill value {fill_value}."
)
## MASKS
# Starting setup: contiguous region = boundary = seed cell
contiguous_region = np.zeros(cube.shape, dtype=bool)
contiguous_region[seed_point] = True
boundary = contiguous_region.copy()
# All cells that contain valid data
equal_to_seed_value = cube.data == seed_value
while boundary.any():
# Mask indentifying all cells that border the current boundary
adjacent_to_boundary = np.zeros(cube.shape, dtype=bool)
indices_adjacent_to_boundary = connectivity_indices[boundary].flatten()
adjacent_to_boundary[indices_adjacent_to_boundary] = True
# Next layer of cells to add to the contiguous region satisfies three criteria
# 1. Adjacent to the current boundary of the contriguous region
# 2. Data value equal to that of the seed cell
# 3. Not already part of the contiguous region
# The reason for this last criterion is so that we can use the next_layer
# from this iteration as the boundary for the next iteration
next_layer = adjacent_to_boundary & equal_to_seed_value & ~contiguous_region
contiguous_region = contiguous_region | next_layer
boundary = next_layer
filled_cube.data[contiguous_region] = fill_value
return filled_cube
[docs]
def convert_nan_to_masked(cube: Cube) -> Cube:
"""Mask NaN values in the provided ``cube``.
Parameters
----------
cube
Source cube with missing data identified by ``np.nan`` values.
Returns
-------
:class:`iris.cube.Cube`
Cube with NaN values masked.
"""
target = cube.copy()
target.data = np.ma.masked_where(np.isnan(target.data), target.data)
return target