Source code for ants.analysis._merge

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of ANTS and is released under the BSD 3-Clause license.
# See LICENSE.txt in the root of the repository for full licensing details.
"""
Various tools are available at our disposal for performing analysis on data.
The metadata is also updated to reflect the analysis made.

"""
import abc
import logging
import warnings

import ants
import ants.regrid
import ants.stats
import ants.utils
import iris
import numpy as np
import numpy.lib.stride_tricks as stride
import shapely
from pykdtree.kdtree import KDTree
from shapely.geometry import Polygon
from shapely.vectorized import contains

try:
    from um_spiral_search.um_spiral_search import spiral_search as spiral
except Exception as _SPIRAL_IMPORT_ERROR:
    spiral = None
    _SPIRAL_IMPORT_ERROR_MESSAGE = (
        f'{str(_SPIRAL_IMPORT_ERROR)}\nUnable to import "spiral", proceeding without '
        "the capabilities it provides.  See install.rst"
    )
    warnings.warn(_SPIRAL_IMPORT_ERROR_MESSAGE)


_LOGGER = logging.getLogger(__name__)


def _merge_compatible(cube1, cube2):
    if cube1.attributes.get("STASH", None) != cube2.attributes.get("STASH", None):
        msg = "STASH attributes are not equal, {} != {}"
        raise ValueError(
            msg.format(
                cube1.attributes.get("STASH", None), cube2.attributes.get("STASH", None)
            )
        )
    if cube1.units != cube2.units:
        msg = 'Cube "units" do not match, {} != {}'
        raise ValueError(msg.format(cube1.units, cube2.units))

    std_name_1 = getattr(cube1, "standard_name", None)
    std_name_2 = getattr(cube2, "standard_name", None)
    if std_name_1 != std_name_2:
        msg = "Cube standard_name do not match, {} != {}"
        raise ValueError(msg.format(std_name_1, std_name_2))


[docs] def horizontal_grid_reorder(cube): """ Return a transposed view of the cube's data, so that the outermost dimension corresponds to 'y' while the next corresponds to 'x'. Parameters ---------- cube : :class:`~iris.cube.Cube` Cube to transpose. Returns ------- tuple(:class:`~numpy.ndarray`, :class:`~numpy.ndarray`) A tuple of arrays, both transposed as [y, x, ...]: - The first array corresponds to the cube's data. - The second array corresponds to the cube's mask. .. note:: Both array view and mask views are returned. This minimises memory use by ensuring the mask is a view. We can not take the mask from the masked array view as this mask is not a view. .. warning:: Care should be taken when reordering when there are more than three dimensions as the order of the dimension which are not mapped to the horizontal grid might not agree but be of the same length. """ if cube.ndim > 3: raise RuntimeError( "Grid re-ordering has yet to support cubes of " "dimensionality greater then 3" ) sx, sy = ants.utils.cube.horizontal_grid(cube) dsx, dsy = cube.coord_dims(sx)[0], cube.coord_dims(sy)[0] transpose = [dsy, dsx] transpose += [i for i in range(cube.ndim) if i not in transpose] if not np.ma.isMaskedArray(cube.data): cube.data = np.ma.array(cube.data) cube.data.mask = np.ma.getmaskarray(cube.data) data = cube.data.data.transpose(transpose) mask = cube.data.mask.transpose(transpose) return data, mask
def _add_halo(source, fill_value=None, halo_width=[1, 1]): """Return numpy array by adding halo points to source data.""" if fill_value is None: fill_value = getattr( source.data, "fill_value", np.ma.array(0, source.data.dtype).fill_value ) data = np.empty( np.array(source.shape) + np.array(halo_width) * 2, dtype=source.dtype ) data.fill(fill_value) # Fill interior with original array. data[halo_width[0] : -halo_width[0], halo_width[1] : -halo_width[1]] = source.data if np.ma.count_masked(source.data) != 0: data[halo_width[0] : -halo_width[0], halo_width[1] : -halo_width[1]][ source.data.mask ] = fill_value # Handle wraparound if supported x_coord, y_coord = ants.utils.cube.horizontal_grid(source) circular = getattr(x_coord, "circular", False) if circular: xdim, ydim = source.coord_dims(x_coord), source.coord_dims(y_coord) if len(xdim) != 1 or len(ydim) != 1: warnings.warn( "Currently wraparound only supported for regular " "grids. Continuing without considering " "wraparound." ) else: xdim, ydim = xdim[0], ydim[0] # Set the longitude very left and very right-hand-side edges if xdim == 1: bounds = source.data[:, -halo_width[1] :] unmasked = ~np.ma.getmaskarray(bounds) data[halo_width[0] : -halo_width[0], 0 : halo_width[1]][unmasked] = ( bounds[unmasked] ) bounds = source.data[:, 0 : halo_width[1]] unmasked = ~np.ma.getmaskarray(bounds) data[halo_width[0] : -halo_width[0], -halo_width[1] :][unmasked] = ( bounds[unmasked] ) else: bounds = source.data[-halo_width[0] :, :] unmasked = ~np.ma.getmaskarray(bounds) data[0 : halo_width[0], halo_width[1] : -halo_width[1]][unmasked] = ( bounds[unmasked] ) bounds = source.data[0 : halo_width[0], :] unmasked = ~np.ma.getmaskarray(bounds) data[-halo_width[0] :, halo_width[1] : -halo_width[1]][unmasked] = ( bounds[unmasked] ) return data def _neighbourhoods(data, window_shape=(3, 3), window_step=(1, 1)): """ Generate neighbourhood views of the data. This function is a convenience wrapper around :func:`numpy.lib.stride_tricks` which returns an ndarray representing the neighbourhood views of the data. By default, the default window size (ergo neighbourhood) is 3x3. That is, the neighbourhood is the 8 points around any one point (though in this case the neighbourhood includes the point itself). The first and last rows and columns of the input data do not have neighbourhoods as they are the 'edges'. Parameters ---------- data : 2darray Grid on which to stride. window_shape : tuple Shape of each points neighbourhood. window_step : tuple Window element step, corresponding to window center. Returns ------- ndarray Neighbourhood views of the grid. The simplest case is of data with shape (3, 3). Only the central point has a complete neighbourhood, made up of all the points. This means only one point (the central point) has a neighbourhood - and this neighbourhood is the entire input data. >>> import numpy as np >>> a = np.arange(3 * 3).reshape((3, 3)) >>> b = _neighbourhoods(a) >>> b.shape (1, 1, 3, 3) >>> b[0, 0, :, :] array([[0, 1, 2], [3, 4, 5], [6, 7, 8]]) When there are more points the index [i, j, :, :] of the returned neighbourhoods is the neighbourhood of the i+1, j+1 location in the input data, and has values data[i:i+2, j:j+2]. >>> c = np.arange(4 * 5).reshape((4, 5)) >>> d = _neighbourhoods(c) >>> d.shape (2, 3, 3, 3) >>> np.all(d[0, 0, :, :] == c[0:3, 0:3]) True >>> np.all(d[1, 2, :, :] == c[1:4, 2:5]) True """ if data.ndim != len(window_shape): msg = ( "Window shape specified {} does not match the dimensionality " "of the specified data {}".format(window_shape, data.ndim) ) raise ValueError(msg) if len(window_step) != len(window_shape): msg = ( "Window shape length ({}) and the window step size length ({}) " "does not match".format(len(window_shape), len(window_step)) ) dstrides = np.asarray(data.strides) strides = np.hstack([dstrides * window_step, dstrides]) window = np.hstack( [ (np.asarray(data.shape) - (np.array(window_shape) - 1)) // window_step, window_shape, ] ) views = stride.as_strided(data, window, strides) return views
[docs] class MooreNeighbourhood(object): """Operations on the Moore Neighbourhood of points in a cube."""
[docs] def __init__(self, acube, window_mask=None): """ Each grid box in a cube has a Moore Neighbourhood: the 8 grid boxes around it on the grid. An instance of this class can be used to calculate some of the properties of the neighbourhood of the grid boxes of a single cube. Parameters ---------- acube : :class:`~iris.cube.Cube` Calculations of the Moore neighbourhood will be done on this cube. window_mask : bool :class:`~numpy.ndarray` object, optional Mask array which defines the Moore neighbourhood of each in the window of each grid point. The provided mask must have an odd length for each dimension. If not provided, this is assumed to be a 3x3 with all surrounding points considered. Raises ------ ValueError If the provided mask does not have an odd length for each dimension. ValueError If the cube is not 2 dimensional. This is could be relaxed with a more advanced implementation. ValueError If the axes of the cube are not contiguous, and so do not look like they are neighbours. References ---------- https://en.wikipedia.org/wiki/Moore_neighborhood """ self._exception_on_bad_cube_arg(acube) self._fill_value = getattr( acube.data, "fill_value", np.ma.array(0, acube.data.dtype).fill_value ) if window_mask is not None: self._mask = window_mask.astype(dtype="bool", copy=False) else: self._mask = np.array( [[True, True, True], [True, False, True], [True, True, True]] ) odd_shape = np.array(self._mask.shape) % 2 if not odd_shape.all(): msg = ( "Expecting the provided mask to have an odd length for " "each dimension." ) raise ValueError(msg) halo_width = (np.array(self._mask.shape) - 1) // 2 self._data = _add_halo( acube, fill_value=self._fill_value, halo_width=halo_width ) self._neighbours = self._mask.sum() self._views = _neighbourhoods(self._data, self._mask.shape)
def _exception_on_bad_cube_arg(self, acube): if acube.ndim != 2: raise ValueError("current implementation supports only 2d cubes") sx, sy = ants.utils.cube.horizontal_grid(acube) if not (sx.is_contiguous() and sy.is_contiguous()): raise ValueError("axes must be contiguous")
[docs] def all_equal_value(self, value): """ Returns True for grid boxes where all neighbouring grid boxes have the specified value. Parameters ---------- value : ~numbers.Number Choosing a type that can be compared to the cube data type, the value is compared to neighbourhood values. Returns ------- : 2d :obj:`bool` type :class:`~numpy.ndarray` True where the neighbours are all equal to value. Notes ----- The comparison is exact; a tolerance isn't currently supported. """ missing = self._count_missing() return ((self._views[..., self._mask] == value).sum(2)) == ( self._neighbours - missing )
def _count_missing(self): # Boundary points are included in this missing = 0 if self._fill_value is not None: missing = (self._views[..., self._mask] == self._fill_value).sum(2) return missing
[docs] def all_missing(self): """ Returns True for grid boxes where the neighbourhood contains only the missing value. """ return self._count_missing() == self._neighbours
[docs] def any_non_missing(self): """ Returns True for grid boxes where the Moore neighbourhood contains any valid data. """ return self._count_missing() < self._neighbours
[docs] def any_equal_value(self, value): """ Return True for grid boxes where any of the Moore neighbours have the specified value. Parameters ---------- value : :class:`~numbers.Number` or :class:`~numpy.ndarray` The value(s) to compare neighbourhood to. If an :class:`~numpy.ndarray`, this should match the mask shape. Returns ------- : 2d :obj:`bool` type :class:`~numpy.ndarray` True where the neighbours are all equal to value. Notes ----- The comparison is exact; a tolerance isn't currently supported. """ if hasattr(value, "shape"): value = value.astype(self._views.dtype, copy=False) res = self._views[..., self._mask] == value[self._mask] res = res.sum(2) > 0 else: res = (self._views[..., self._mask] == value).sum(2) > 0 return res
[docs] def neighbourhood_mean(self): """ Return the mean of the values in the Moore neighbourhood. If the neighbourhood contains missing data then the mean is taken over only the neighbours that are not missing. """ value = self._fill_value nn_coasts = (self._views[..., self._mask] != value).sum(2) # Temporarily remove 'value' for the sake of determining the sum mod_mask = self._data == value self._data[mod_mask] = 0 # Capture runtime warnings (where there are no neighbours) with warnings.catch_warnings(): warnings.simplefilter("ignore") coasts_masks = self._views[..., self._mask].sum(2) / nn_coasts.astype( "float", copy=False ) self._data[mod_mask] = value return coasts_masks
def _unified_grid(cube, cube2): """Return a cube which has horizontal grid coverage of both cubes.""" sx, sy = ants.utils.cube.horizontal_grid(cube) tx, ty = ants.utils.cube.horizontal_grid(cube2) if sx == tx and sy == ty: ex = sx.copy() ey = sy.copy() else: # Create a unifying grid which covers the area of both sources try: ex = sx.copy(ants.utils.ndarray.merge_array(sx.points, tx.points)) ey = sy.copy(ants.utils.ndarray.merge_array(sy.points, ty.points)) except ValueError as err: msg = ( "Unable to define a unified grid covering the domain of " "both supplied cubes: " ) err.args = tuple([msg + err.args[0]]) raise err dsx, dsy = cube.coord_dims(sx)[0], cube.coord_dims(sy)[0] shape = list(cube.shape) shape[dsx] = ex.points.size shape[dsy] = ey.points.size # Initialise unified grid as masked. This means that in the case where # both sources have points which do not overlapped, these will remain # masked. data = np.ma.empty(shape, dtype=cube.dtype) try: data.fill(np.nan) except ValueError: # Not all data types accept nan as a fill value. Fill with 0 instead. data.fill(0) data.mask = True merged_cube = iris.cube.Cube(data) merged_cube.add_dim_coord(ex, dsx) merged_cube.add_dim_coord(ey, dsy) ants.utils.cube.guess_horizontal_bounds(merged_cube) # Add any coordinates which are not mapped to the horizontal dimensions dimensions = list(range(cube.ndim)) dimensions.pop(dimensions.index(dsx)) dimensions.pop(dimensions.index(dsy)) for dimension in dimensions: for coord in cube.coords(dimensions=(dimension)): try: merged_cube.add_dim_coord(coord, dimension) except ValueError: merged_cube.add_aux_coord(coord, dimension) # copy the rest of the meta data across to the new cube merged_cube.cell_methods = cube.cell_methods merged_cube.metadata = cube.metadata return merged_cube def merge(primary_cube, alternate_cube, validity_polygon=None): """ Merges data from the alternative cube into the primary cube. The primary cube data is used as a base, then cells from the alternate cube which lay outside the provided polygon, override the values of the primary at those locations. Containment is defined as any cell corner which lies within the polygon. "Within" explicitly does not include those points which exactly lay on the polygon boundary. Points with numpy.NAN values in the source are also overridden by the corresponding alternate cube data values. The presence of a numpy.NAN in the primary cube dataset, overrides that elements validity defined in the cases of a specified validity polygon. Parameters ---------- primary_cube : `~iris.cube.Cube` The primary dataset which has a highest priority i.e. overriding values in the alternate dataset. alternate_cube : `~iris.cube.Cube` The alternate data set which is to be merged, taking a lower priority to values contained within the primary dataset. validity_polygon : iterable or :class:`shapely.Polygon` instance, optional Polygon defining the region of valid data within the primary dataset. Data defined outside this region will not take preference of the alternate dataset. The crs of the polygon is assumed to be the same as the primary dataset to which it describes. If an iterable is provided, then each item should correspond to the x, y point definition. If not provided, the entire primary_cube dataset is considered valid (including masked value). This means that the two datasets are stacked together with the primary_cube taking priority over alternate_cube in the case of an overlap. If a validity polygon is provided and the entire primary_cube dataset is within the polygon then a Runtime error will be raised. Raises ------ RuntimeError If the primary cube is wholly within a provided validity_polygon. .. note:: Limited to datasets on the same grid. .. warning:: Currently assumes that mask and numpy.NAN values are consistent across all horizontal slices i.e. a form of broadcasting is utilised. The first horizontal grid slice is used as reference. """ def _coord_intersect(coord, coord2): """Return a slice object where coord2 intersects coord""" x_cont = ants.utils.ndarray.in1d(coord.points, coord2.points) x_inside = np.where(np.equal(x_cont, True))[0] x_ind = slice(x_inside[0], x_inside[-1] + 1) return x_ind def _overwrite_data(cube, cube2): """ Override the data of cube with that of cube2, where the horizontal domain of cube should contain that of cube2. """ sx, sy = ants.utils.cube.horizontal_grid(cube) sdx, sdy = cube.coord_dims(sx)[0], cube.coord_dims(sy)[0] tx, ty = ants.utils.cube.horizontal_grid(cube2) slices = [slice(None)] * cube.ndim slices[sdx] = _coord_intersect(sx, tx) slices[sdy] = _coord_intersect(sy, ty) # Ensure that both cubes have the same dimension mapping. coord_names = [cube.coord(dimensions=(i)).name() for i in range(cube.ndim)] transpose = [cube2.coord_dims(name)[0] for name in coord_names] cube.data[tuple(slices)] = cube2.data.astype(cube.dtype).transpose(transpose) # Ensure consistency between cube metadata (do they both describe the same # phenomena). _merge_compatible(primary_cube, alternate_cube) ants.utils.cube.guess_horizontal_bounds([primary_cube, alternate_cube]) sx, sy = ants.utils.cube.horizontal_grid(primary_cube) tx, ty = ants.utils.cube.horizontal_grid(alternate_cube) if sy.coord_system != ty.coord_system or sx.coord_system != tx.coord_system: raise RuntimeError("Currently only same coordinate system merging " "supported") # define a polygon for the domain we're merging into domain_polygon = [ [sx.bounds.min(), sy.bounds.min()], [sx.bounds.max(), sy.bounds.min()], [sx.bounds.max(), sy.bounds.max()], [sx.bounds.min(), sy.bounds.max()], ] domain_polygon = Polygon(domain_polygon) if validity_polygon: if isinstance(validity_polygon, (tuple, list)): # Instantiate polygon from x, y pairs. validity_polygon = Polygon(validity_polygon) if validity_polygon.contains(domain_polygon): raise RuntimeError("Target domain is wholly within the provided polygon") else: # Workaround for supporting different shaped sources. validity_polygon = domain_polygon if not np.isreal(primary_cube.dtype): warnings.warn( "The source provided is not of dtype float and cannot " "store nan values, unable to distinguish between " "missing and no values in the case of an overlap" ) merged_cube = _unified_grid(primary_cube, alternate_cube) # Fill the merged cube data with the primary cube data _overwrite_data(merged_cube, primary_cube) # Migrate the alternate cube data to the full grid version full_alternate_cube = merged_cube.copy() _overwrite_data(full_alternate_cube, alternate_cube) if validity_polygon: # Take only the values from the primary cube which are within the # provided polygon. corners = ( (sx.bounds[:, i], sy.bounds[:, j]) for i, j in ((0, 0), (0, 1), (1, 1), (1, 0)) ) # Test all four corners for overlap for containment within polygon mask_inside = np.zeros((sy.points.size, sx.points.size), dtype="bool") for corner in corners: xx, yy = np.meshgrid(corner[0], corner[1]) mask_inside |= contains(validity_polygon, xx, yy) mask_outside = ~mask_inside # Upscale mask to the full-size grid ex, ey = ants.utils.cube.horizontal_grid(merged_cube) x_ind = _coord_intersect(ex, sx) y_ind = _coord_intersect(ey, sy) full_mask_outside = np.ones((ey.points.size, ex.points.size), dtype="bool") full_mask_outside[y_ind, x_ind] = mask_outside # Apply data which is outside of the polygon to the other. # Transpose the data (view) to allow broadcasting pdata, pmask = horizontal_grid_reorder(merged_cube) adata, amask = horizontal_grid_reorder(full_alternate_cube) pdata[full_mask_outside] = adata[full_mask_outside] pmask[full_mask_outside] = amask[full_mask_outside] # Identify overlap priority using np.nan values (these are assigned # when source cells are beyond the extent of the target grid whilst # regridding). pdata, pmask = horizontal_grid_reorder(merged_cube) adata, amask = horizontal_grid_reorder(full_alternate_cube) slices = [slice(None)] * pdata.ndim slices[2:] = [0] * (pdata.ndim - 2) nan_mask = np.isnan(pdata[tuple(slices)]) pdata[nan_mask] = adata[nan_mask] pmask[nan_mask] = amask[nan_mask] # Clear some memory if we can np.ma.MaskedArray.shrink_mask(merged_cube.data) if np.isnan(merged_cube.data).any(): raise RuntimeError( "Coverage of provided sources is not complete, unable to merge datasets." ) return merged_cube def _spiral_wrapper( unresolved_mask, to_fill_mask, lats, lons, target_mask, invert_mask=False, constrained=False, cyclic=False, ): """ Call the underlying UM spiral search. Parameters ---------- unresolved_mask : :class:`~numpy.ndarray` A 2D boolean array representing the missing data values (y, x). to_fill_mask : :class:`~numpy.ndarray` A 2D boolean array representing the input mask (y, x). This often represents the missing data and ocean values in the case of land only fields for example. lats : :class:`~numpy.ndarray` 1D array of latitude values. lons : :class:`~numpy.ndarray` 1D array of longitude values. target_mask : :class:`~numpy.ndarray` A 2D boolean array for type indentification (y, x). 'invert_mask' determines whether True or False is denoted as masked. More often than not, this is the land binary mask field. This maps to the 'lsm' variable of the spiral search algorithm. invert_mask : bool, optional Behavioural modifier for whether the target_mask represent True for masked elements or False for masked elements. This actually maps to 'is_land_field' in the spiral search. By default (False), True denotes masked elements. constrained : bool, optional Constrained search behavioural modifier (200km). cyclic : bool, optional Specifying the field as being cyclic, allows both latitudinal and longitudinal wrap around in distance evaluation. Returns ------- : tuple(:class:`~numpy.ndarray`, :class:`~numpy.ndarray`) First item corresponds to indices of missing data point, while the second item corresponds to indices of those elements which are to populate these missing data points. See Also -------- `<https://code.metoffice.gov.uk/doc/um/latest/papers/umdp_S11.pdf>`_ .. note:: See the reconfiguration spiral search module for further details on these parameters. .. warning:: Performing a spiral search when 'cyclic' is True, may have significant performance implications. """ unresolved_mask = unresolved_mask.reshape(-1) to_fill_mask = to_fill_mask.reshape(-1) target_mask = target_mask.reshape(-1) _missing_index = np.where(np.equal(to_fill_mask, True))[0] missing_index = _missing_index.astype("int64", copy=False) unresolved_mask = unresolved_mask.astype("bool", copy=False) target_mask = target_mask.astype("bool", copy=False) lats = lats.astype("float64", copy=False) lons = lons.astype("float64", copy=False) planet_radius = float(ants.coord_systems.EARTH_RADIUS) # Default constraint distance taken from reconfiguration (this became a # parameter in shumlib). constrained_max_dist = 200000.0 # Default step size modifier for search taken from reconfiguration (this # became a parameter in shumlib). dist_step = 3 msg = "Calling spiral search, with {} missing points to be resolved." _LOGGER.info(msg.format(missing_index.size)) with ants.stats.TimeIt() as timer: replacing_index = spiral( target_mask, missing_index, unresolved_mask, lats, lons, planet_radius, bool(cyclic), bool(invert_mask), bool(constrained), constrained_max_dist, dist_step, ) msg = "{} took {}".format("Spiral search", timer.time_taken) _LOGGER.info(msg) return missing_index, replacing_index
[docs] class FillABC(abc.ABC): """An abstract base class which provides an interface for fill operations. A custom fill class may be created by inheriting this base class, providing a concrete implementation of the :meth:`~ants.analysis.FillABC._calculate_fill_lookup` method. See Also -------- :class:`~ants.analysis.UMSpiralSearch` :class:`~ants.analysis.KDTreeFill` """
[docs] def __init__(self, source, search_mask=None, target_mask=None): """ Fill missing data in the source with their nearest acceptable neighbours. Nearest acceptable neighbours include all locations in the source which are unmasked. Optionally, we can provide a ``search_mask`` to further constrain the search (thereby not having to override the source mask). Missing data is those masked source locations which are unmasked in the target (``target_mask``). All masked source locations which are masked in the target mask, will remain masked. Return a callable which applies pre-calculated (cached) results to the destination, calculated at the point of instantiation of this object with the provided source-target pair. Parameters ---------- source : :class:`~iris.cube.Cube` Source dataset with unresolved points. That is, missing points or perhaps coastlines which do not match a provided target landsea mask. search_mask : :class:`~iris.cube.Cube`, optional A mask used to constrain the set of points which are considered valid by the search in the input source. target_mask : :class:`~iris.cube.Cube`, optional Target mask field. The source will inherit this mask. Often this represents a land or ocean mask. Returns ------- ~collections.abc.Callable Callable object for applying the spiral search algorithm. Raises ------ ValueError Where no suitable neighbour has been found. ValueError Where source, search_mask or target_mask are incompatible (not defined on identical horizontal grids). """ # Cache a copy of the source horizontal grid to protect # against changes to the copied object from outside the class source_x, source_y = ants.utils.cube.horizontal_grid(source) self._source_x = source_x.copy() self._source_y = source_y.copy() if target_mask: self._validate_mask("target_mask", target_mask) if search_mask: self._validate_mask("search_mask", search_mask) inherit_mask = True if target_mask is None: inherit_mask = False # Create a target with all unmasked points. target_mask = next(source.slices((self._source_y, self._source_x))) target_mask = target_mask.copy(np.zeros(target_mask.shape, dtype="bool")) # Generate cache self._source = source.copy() self._target_mask = target_mask self._inherit_mask = inherit_mask self._missing_indices_2d = self._replacing_indices_2d = None self._source_mask = None self._search_mask = search_mask self.search(source)
[docs] def _validate_mask(self, mask_name, mask): """Check a provided mask is consistent with the ``source`` cube. The ``mask`` is validated according to the following criteria: - The ``mask`` must be 2-dimensional. - The ``mask``'s horizontal grid must be consistent with that of the ``source`` cube used to instantiate the class. Parameters ---------- mask_name : str Name to be used in the error message. mask : :class:`~iris.cube.Cube` Mask to validate. Raises ------ ValueError If the ``mask`` is not 2-dimensional. ValueError If the ``mask``'s horizontal grid is inconsistent with that of the ``source`` cube. :meta public: """ if mask.ndim != 2: msg = f"Expecting a 2-dimensional {mask_name}, got {mask.ndim}-dimensions." raise ValueError(msg) self._validate_horizontal_grids(mask_name, mask)
[docs] def _validate_horizontal_grids(self, cube_name, cube): """Check a cube's horizontal grid is consistent with that of the ``source`` cube. Parameters ---------- cube_name : str Name to be used in the error message. cube : :class:`~iris.cube.Cube` Cube to validate. The cube's x and y coordinates are compared against those of the ``source`` cube used to initialise the class. Raises ------ ValueError If either the x or y coordinates do not match those of the original ``source`` cube. :meta public: """ x_coord, y_coord = ants.utils.cube.horizontal_grid(cube) if not ants.utils.coord.relaxed_equality(self._source_x, x_coord): msg = ( f"The provided {cube_name}'s x coordinates do not match " "those of the source." ) raise ValueError(msg) if not ants.utils.coord.relaxed_equality(self._source_y, y_coord): msg = ( f"The provided {cube_name}'s y coordinates do not match " "those of the source." ) raise ValueError(msg)
def _validate_destination(self, destination): destination_data, destination_mask_nd = horizontal_grid_reorder(destination) slices = [slice(None), slice(None)] + ([0] * (destination_data.ndim - 2)) destination_mask = np.isnan(destination_data[tuple(slices)]) destination_mask += destination_mask_nd[tuple(slices)] self._validate_horizontal_grids("destination cube", destination) if not (self._source_mask == destination_mask).all(): raise ValueError( "Destination mask is not compatible with the " "cached nearest neighbours." )
[docs] @abc.abstractmethod def _calculate_fill_lookup(self, unresolved_mask, to_fill_mask, target_mask): """Calculate the cached lookup arrays to be used to fill the destination cube. Parameters ---------- unresolved_mask : :class:`~numpy.ndarray` A 2-dimensional boolean array representing the unresolved points in the source cube. These points are not considered valid fill candidates. Unresolved points are those which are masked in the source, or have been specified as points to ignore in the ``search_mask``. - ``True`` if point is unresolved (not valid fill candidate) - ``False`` if point is resolved (valid fill candidate) The array must be of the same shape as the horizontal dimensions of the source cube. to_fill_mask : :class:`~numpy.ndarray` A 2-dimensional boolean array representing the missing points in the source cube that require filling. Missing points are those which are masked in the source and not masked in the ``target_mask``. - ``True`` if point is missing (requires filling) - ``False`` if point is not missing (does not require filling) The array must be of the same shape as the horizontal dimensions of the source cube. target_mask : :class:`~numpy.ndarray` A 2-dimensional boolean array representing the target mask. The array must be of the same shape as the horizontal dimensions of the source cube. Returns ------- tuple(:class:`~numpy.ndarray`, :class:`~numpy.ndarray`) First item corresponds to indices of missing data point, while the second item corresponds to indices of those elements which are to populate these missing data points. Raises ------ ValueError If the provided source contains no valid data. ValueError If the algorithm cannot identify any valid neighbours for an unresolved point. :meta public: """ pass
[docs] def search(self, source): """Search for points to fill the missing data in the ``source``. Perform pre-calculation steps up front, independent of which underlying search algorithm is used: - Reshape data - Identify points to be filled (``to_fill_mask``) - Identify points that should be excluded from the search (``unresolved_mask``) - Call :meth:`~ants.analysis.FillABC._calculate_fill_lookup`, which wraps the specific underlying search algorithm - Cache the resulting lookup arrays, to be applied when the filler is called Parameters ---------- source : :class:`~iris.cube.Cube` Source cube with unresolved points, identified by being masked or NaN. Raises ------ ValueError If the provided source contains no valid data. ValueError If no valid neighbours can be identified for any cell. """ # Transpose as necessary. source_data, mask = horizontal_grid_reorder(source) target_mask, _ = horizontal_grid_reorder(self._target_mask) target_mask = target_mask.astype("bool", copy=False) if self._search_mask: search_mask, _ = horizontal_grid_reorder(self._search_mask) search_mask = search_mask.astype("bool", copy=False) # Difference between the source and target_mask defines which points # are truly missing data points (NaN values are also considered # missing). # Assumes consistency across all horizontal slices. slices = [slice(None), slice(None)] + ([0] * (source_data.ndim - 2)) source_mask = np.isnan(source_data[tuple(slices)]) source_mask += mask[tuple(slices)] # Cache the source mask. self._source_mask = source_mask to_fill_mask = source_mask & ~target_mask # Points we want resolving if self._search_mask: unresolved_mask = source_mask | search_mask # Points we want excluding. else: unresolved_mask = source_mask if unresolved_mask.any(): if unresolved_mask.all(): msg = "The provided source doesn't appear to have any valid data." raise ValueError(msg) missing_indices_1d, replacing_indices_1d = self._calculate_fill_lookup( unresolved_mask, to_fill_mask, target_mask ) # Unravel indices across horizontal domain so that broadcasting # occurs over all remaining dimensions. self._missing_indices_2d = np.unravel_index( missing_indices_1d, source_data.shape[:2] ) try: self._replacing_indices_2d = np.unravel_index( replacing_indices_1d, source_data.shape[:2] ) except ValueError as err: original_msg = "".join(list(err.args)) new_msg = f"{original_msg}. Are there any valid neighbours?" err.args = [new_msg] raise
[docs] def __call__(self, destination): """ Apply the cached results of the search algorithm to resolve points in the ``destination``. Parameters ---------- destination : :class:`~iris.cube.Cube` Destination dataset with unresolved points. That is, missing points or perhaps coastlines which do not match a provided target landsea mask. This destination need not be the same as the source used to instantiate the class, however it must be compatible (the same grid and with the same points requiring filling). Returns ------- : None In-place operation. Raises ------ ValueError When the provided destination coordinates do not match those cached for the nearest neighbour search. ValueError If the destination mask is not compatible with the cached nearest neighbours. """ self._validate_destination(destination) destination_data, destination_mask = horizontal_grid_reorder(destination) # Apply nearest neighbour cache to the provided source. if self._missing_indices_2d is not None: destination_data[ self._missing_indices_2d[0], self._missing_indices_2d[1] ] = destination_data[ self._replacing_indices_2d[0], self._replacing_indices_2d[1] ] destination_mask[ self._missing_indices_2d[0], self._missing_indices_2d[1] ] = False # Inherit mask from final_mask (reorder dimensions to perform true # broadcasting) transpose_axes = [0, 1] transpose_axes = [ i for i in range(destination_data.ndim) if i not in transpose_axes ] + transpose_axes if self._inherit_mask: target_mask, _ = horizontal_grid_reorder(self._target_mask) target_mask = target_mask.astype("bool", copy=False) destination_mask.transpose(transpose_axes)[:] = target_mask if not (destination.data.mask).any(): destination.data = destination.data.data
def __repr__(self): """Provide a string representation of the class instance.""" return self.__class__.__name__
[docs] class UMSpiralSearch(FillABC): """Spiral search missing data fill and coastal adjustment algorithm. This uses the spiral search algorithm or ``coast_adj_method=3`` as it is known in the community. See Also -------- `<https://code.metoffice.gov.uk/doc/um/latest/papers/umdp_S11.pdf>`_ : for details of the spiral search method. .. warning:: Performing a spiral search for circular fields (wraparound) is known to have performance implications. """ # Constrained search behavioural modifier (200km). See reference. # This is useful in the case where the source has resolved # points over both masked and unmasked points as identified by the # target mask. For example, source fields with resolved points over # both ocean and land. That is, the target mask is also used for type # identification when performing a constrained search. This # constrained search is most commonly done when the target field # represents a landsea mask, while the source has valid values over # both ocean and land. # - Caution should be taken when utilising the constrained search # behaviour. Types identification (land/ocean for example) is # identified by the target mask. This means that the target mask # is assumed to be relevant to the source type identification. # # Dev note: Consider plugging this into the class initialisation if we are # presented with a usecase for using it. _CONSTRAINED = False
[docs] def __init__(self, source, search_mask=None, target_mask=None): super().__init__(source, search_mask, target_mask)
[docs] def __call__(self, destination): return super().__call__(destination)
[docs] def _calculate_fill_lookup(self, unresolved_mask, to_fill_mask, target_mask): """Call the spiral search algorithm to calculate the fill lookup. :meta public: """ if spiral is None: raise ValueError(_SPIRAL_IMPORT_ERROR_MESSAGE) cyclic = self._source_x.circular # When not constrained, ensure that all source points are # considered the same 'type' (all ocean or all land etc.). if not self._CONSTRAINED: target_mask = np.zeros(target_mask.shape, "bool") # We set land_field to False as masks are interpreted as True to # denote land and false to denote ocean (which is generally the # other way around to mask fields). missing_ind, replacing_ind = _spiral_wrapper( unresolved_mask, to_fill_mask, self._source_y.points, self._source_x.points, target_mask, False, self._CONSTRAINED, cyclic, ) return missing_ind, replacing_ind
[docs] class FillMissingPoints(UMSpiralSearch): """ Warning ------- This class is deprecated as of ANTS 2.0. The functionality has been moved to :class:`~ants.analysis.UMSpiralSearch`. """
[docs] def __init__(self, source, search_mask=None, target_mask=None): warnings.warn( "'ants.analysis.FillMissingPoints' is deprecated as of ANTS 2.0. " "The functionality of 'ants.analysis.FillMissingPoints' has been " "moved to 'ants.analysis.UMSpiralSearch'.", FutureWarning, ) super().__init__(source, search_mask, target_mask)
[docs] class KDTreeFill(FillABC): """K-D tree missing data fill and coastal adjustment algorithm. See Also -------- `<https://en.wikipedia.org/wiki/K-d_tree>`_ : for details of the K-D tree search method. """ _default_latitude_scale = 1.0
[docs] def __init__(self, source, search_mask=None, target_mask=None): config_latitude_scale = ants.config.CONFIG["ants_fill"]["kdtree_latitude_scale"] if config_latitude_scale is not None: self.latitude_scale = float(config_latitude_scale) else: self.latitude_scale = self._default_latitude_scale super().__init__(source, search_mask, target_mask)
[docs] def __call__(self, destination): return super().__call__(destination)
[docs] def _calculate_fill_lookup(self, unresolved_mask, to_fill_mask, target_mask): """Call the K-D tree search algorithm to calculate the fill lookup. :meta public: """ cartesian_coordinates = ants.utils.convert_to_cartesian(self._source) # Scale z coordinate by latitude scale factor, # to give higher weight to points of similar latitudes cartesian_coordinates[..., 2] *= self.latitude_scale fill_candidates_coordinates = cartesian_coordinates[~unresolved_mask] points_to_fill_coordinates = cartesian_coordinates[to_fill_mask] kdtree = KDTree(fill_candidates_coordinates) _, valid_index = kdtree.query(points_to_fill_coordinates) to_fill_mask_1D = to_fill_mask.reshape(-1) missing_ind = np.where(to_fill_mask_1D)[0] missing_ind = missing_ind.astype("int64") valid_replacing_indices = np.where(~unresolved_mask.reshape(-1))[0] replacing_ind = valid_replacing_indices[valid_index] return missing_ind, replacing_ind
def moore_neighbourhood_search(source, land_binary_mask, value=None): """ Updates the source cube data, where necessary, to make it consistent with the land_binary_mask. This function is used to ensure that the source has data present on all land points specified by the land_binary_mask. The land_binary_mask should have 1's to signify land points and 0's to signify ocean points. The algorithm used is relatively straight forward: 1. where the land_binary_mask is 0 the source data will be set to missing. 2. where the land_binary_mask is 1 and the source has missing data then one of two strategies are taken. If there are non-missing data points in the Moore neighborhood (8 surrounding points) of source then the missing point is overwritten with the mean of the non-missing neighbours. If the Moore neighborhood contains only missing data then the source data is set to 'value', or the source data mean if 'value' is None. Parameters ---------- source : :class:`~iris.cube.Cube` Source cube which is to have its data updated according to the land binary mask. land_binary_mask : :class:`~iris.cube.Cube` The land_binary_mask indicates the ocean/land classification wanted on the source cube. value : Bool, optional Fill value used where there are no valid neighbours. Returns ------- : :class:`~iris.cube.Cube` Source, with data points modified corresponding to location identified by the provided land binary mask. .. note:: The source and land_binary_mask must be on the same grid and currently must be also defined in the same orientation. .. warning:: This routine should not be used in the decomposition framework without taking care of haloes in the decomposition. """ if (source.ndim != 2) or land_binary_mask.ndim != 2: raise RuntimeError( "Currently, only mask application to 2D grids are " "supported and with no broadcasting." ) sx, sy = ants.utils.cube.horizontal_grid(source) bx, by = ants.utils.cube.horizontal_grid(land_binary_mask) if not ants.utils.coord.relaxed_equality( sx, bx ) or not ants.utils.coord.relaxed_equality(sy, by): raise RuntimeError( "Both source and land_binary_mask must be defined " "on identical grids" ) if (source.coord_dims(sx) != land_binary_mask.coord_dims(bx)) or ( source.coord_dims(sx) != land_binary_mask.coord_dims(bx) ): raise RuntimeError( "Currently, the source and the land_binary_mask " "must be defined in the same orientation" ) # Determine which values need to be filled and apply the supplied mask. if not np.ma.isMaskedArray(source.data): source.data = np.ma.array(source.data) # Difference between the current mask and the land_binary_mask supplied dmask = source.data.mask * (land_binary_mask.data == 1) # Set Ocean points missing source.data.mask = land_binary_mask.data == 0 # Deal with missing land points neighbours = MooreNeighbourhood(source) # Points with some data in neighbourhood and missing data at centre points coasts = neighbours.any_non_missing() * dmask coasts_masks = neighbours.neighbourhood_mean() source.data[coasts] = coasts_masks[coasts] if value is None: value = source.data[source.data != source.data.fill_value].mean() # Islands islands = neighbours.all_missing() * dmask source.data[islands] = value def derive_mask(source, geometries, geometry_crs=None): """ Given a source and one or more geometries, return a mask representing containment with these geometries. Parameters ---------- source : :class:`~iris.cube.Cube` geometries : iterator of :shapely:`shapely.geometry <shapely.geometry>` Geometries used to define containment mask within source domain. See :class:`cartopy.io.shapereader.Reader` for helper functions on reading shapefiles. geometry_crs : :class:`~iris.coord_system.CoordSystem`, optional Geometry of the geometries provided, if not specified, assumed to be equivelent to the source. .. note:: Currently limited to 2D cubes. """ if source.ndim > 2: raise RuntimeError("Currently limited to 2D cubes") # FILTER RELEVANT GEOMETRIES - those that intersect the area under # interest. sx, sy = ants.utils.cube.horizontal_grid(source) minx, maxx = sx.bounds.min(), sx.points.max() miny, maxy = sy.bounds.min(), sy.points.max() src_cartopy_crs = sx.coord_system.as_cartopy_projection() bbox = shapely.geometry.Polygon( ((minx, miny), (minx, maxy), (maxx, maxy), (maxx, miny), (minx, miny)) ) # Project bbox if crs does not match geom_cartopy_crs = None if geometry_crs is not None and sx.coord_system != geometry_crs: geom_cartopy_crs = geometry_crs.as_cartopy_projection() bbox = geom_cartopy_crs.project_geometry(bbox, src_cartopy_crs) geoms = [] for geom in geometries: geom = bbox.intersection(geom) if geom: # Project geometry to source crs if crs does not match if geom_cartopy_crs is not None: geom = src_cartopy_crs.project_geometry(geom, geom_cartopy_crs) geoms.append(geom) if len(geoms) == 0: return False # DERIVE MASK - through containment corners = ( (sy.bounds[:, i], sx.bounds[:, j]) for i, j in ((0, 0), (0, 1), (1, 1), (1, 0)) ) # corners = [(sy.points, sx.points)] # points only mask_inside = np.zeros((sx.points.size, sy.points.size), dtype="bool") for corner in corners: xx, yy = np.meshgrid(corner[1], corner[0]) for geom in geoms: mask_inside |= contains(geom, xx, yy) # Transform the mask to match the source dimension mapping dsx, dsy = source.coord_dims(sx)[0], source.coord_dims(sy)[0] if dsy > dsx: mask_inside = mask_inside.transpose((1, 0)) return mask_inside