# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of ANTS and is released under the BSD 3-Clause license.
# See LICENSE.txt in the root of the repository for full licensing details.
"""
Many of the routines you will need to perform the processing of
source data into an ancillary field are found in :mod:`ants.analysis`.
You can also call iris routines to help your ancillary field
processing. Some ANTS functions are relatively minimal wrappers around
iris functions. Where appropriate, call the ANTS functions as these
provide additional benefits such as updating meta-data on the cubes.
Some of the most common routines you will want to use are:
* Calculating the area weighted mean of a field
using :func:`~ants.analysis.mean`
* Filling a field to ensure it has values on every
land/ocean/both point using :func:`~ants.analysis.UMSpiralSearch`.
* Merging two datasets using :func:`~ants.analysis.merge`.
Note, ANTS merge is used in a different context to iris. In ANTS it means
combining two data sources either
1. to build a larger coverage than the source data or
2. to embed a high quality local source within a dataset with greater
coverage.
The meaning in iris is discussed in the `iris documentation
<https://scitools-iris.readthedocs.io/en/latest/userguide/merge_and_concat.html>`_.
"""
import warnings
import ants
import ants.regrid
import ants.utils
import iris
import iris.analysis.calculus
import numpy as np
from . import _merge, cover_mapping
from ._merge import (
FillABC,
FillMissingPoints,
KDTreeFill,
MooreNeighbourhood,
UMSpiralSearch,
horizontal_grid_reorder,
)
__all__ = [
"MooreNeighbourhood",
"UMSpiralSearch",
"calc_grad",
"mean",
"merge",
"standard_deviation",
"floodfill",
"find_similar_region",
"make_consistent_with_lsm",
"horizontal_grid_reorder",
"FillABC",
"KDTreeFill",
"FillMissingPoints",
"find_small_feature_seed_points",
"cover_mapping",
]
[docs]
def calc_grad(source):
"""
.. attention::
The calc_grad routine has been removed from the core ants library at
version 2.2. It has been moved to Apps/Orography/orography_utils.py in
the ancillary-file-science repository. Attempting to use this function
will result in an ImportError.
"""
raise ImportError(
"The calc_grad routine has been removed from the core ants library. It has been"
" moved to Apps/Orography/orography_utils.py in the ancillary-file-science "
"repository."
)
def _update_metadata(source, cube, operation):
"""
Set the cell_methods on cube for an area weighted 'operation' and
inherit certain metadata from source.
"""
if source.long_name is not None:
cube.long_name = "{} {}".format(operation.replace("_", " "), source.long_name)
if "grid_staggering" in source.attributes:
cube.attributes["grid_staggering"] = source.attributes["grid_staggering"]
cube.add_cell_method(
iris.coords.CellMethod("area: {} (area-weighted)".format(operation))
)
[docs]
def mean(source, target):
"""
Calculate the source mean :math:`\\bar{x}` from the area mean of the
source :math:`(x_i)` over each target grid box.
.. math::
\\bar{x} = \\sum_i {x_i}{w_i}
where :math:`w_i` is the area of the source grid box (given an index i)
that overlaps the target grid box divided by the area of the target grid
box.
Parameters
----------
source : :class:`~iris.cube.Cube`
Source cube.
target : :class:`~iris.cube.Cube`
Target cube.
Returns
-------
: :class:`~iris.cube.Cube`
The area-weighted mean of the source over the target grid.
"""
mean_cube = source.regrid(
target, ants.regrid.GeneralRegridScheme(horizontal_scheme="TwoStage")
)
_update_metadata(source, mean_cube, "mean")
return mean_cube
[docs]
def standard_deviation(source, src_mean):
"""
Calculate the standard deviation of the source within the target grid box.
This is calculated as the square root of the variance. The variance is
defined here as the mean of the squares minus the square of the mean.
.. math::
\\sigma^2 = \\sum_i {x_i}^2{w_i} - \\bar{x}^2
Parameters
----------
source : :class:`~iris.cube.Cube`
Source cube.
src_mean : :class:`~iris.cube.Cube`
Mean of the source field on the target grid.
Returns
-------
: :class:`~iris.cube.Cube`
The sub-grid standard deviation of the source over the target grid.
Notes
-----
There is no correction for biases in the standard deviation (Bessel's
correction). This could be an issue when the target grid
is much lower resolution than the source grid.
"""
src_meta = source.metadata
if "grid_staggering" in src_mean.attributes:
src_meta.attributes["grid_staggering"] = src_mean.attributes["grid_staggering"]
source **= 2
src_mean **= 2
awm = source.regrid(
src_mean, ants.regrid.GeneralRegridScheme(horizontal_scheme="TwoStage")
)
awm -= src_mean
# Account for precision issues resulting in negative values.
awm.data = np.abs(awm.data)
awm **= 1 / 2.0
_update_metadata(src_meta, awm, "standard_deviation")
# History attribute gets removed during processing - add it back from the original
# source metadata.
if "history" in src_meta.attributes and "history" not in awm.attributes:
ants.utils.cube.update_history(
awm, src_meta.attributes["history"], add_date=False
)
return awm
[docs]
def merge(primary_cube, alternate_cube, validity_polygon=None):
"""
Merges data from the alternative cube into the primary cube.
The primary cube data is used as a base, then cells from the alternate
cube which lay outside the provided polygon, override the values of the
primary at those locations. Containment is defined as any cell corner
which lies within the polygon. "Within" explicitly does not include
those points which exactly lay on the polygon boundary. Where multiple
primary and alternate cubes are provided, then these are paired
appropriately where possible. Where these datasets are not defined on the
same grid, the user should consider a regrid first to then utilise merge.
Parameters
----------
primary_cube : :class:`~iris.cube.Cube` or :class:`~iris.cube.CubeList`
The primary dataset which has a highest priority i.e. overriding values
in the alternate dataset.
alternate_cube : :class:`~iris.cube.Cube` or :class:`~iris.cube.CubeList`
The alternate data set which is to be merged, taking a lower priority
to values contained within the primary dataset.
validity_polygon : :class:`~collections.abc.Iterable` or \
:class:`shapely.Polygon` instance, optional
Polygon defining the region of valid data within the primary dataset.
Data defined outside this region will not take preference of the
alternate dataset. The crs of the polygon is assumed to be the same as
the primary dataset to which it describes. If an iterable is provided,
then each item should correspond to the x, y point definition.
If not provided, the entire primary_cube dataset is considered valid
(including masked value). This means that the two datasets are
stacked together with the primary_cube taking priority over
alternate_cube in the case of an overlap. A runtime error will be
raised if the primary_cube is wholly within the validity_polygon.
Returns
-------
: :class:`~iris.cube.CubeList`
One or more :class:`~iris.cube.Cube` objects
Raises
------
RuntimeError
If the primary cube is wholly within a provided ``validity_polygon``.
"""
primary_cubes = ants.utils.cube.as_cubelist(primary_cube)
alternate_cubes = ants.utils.cube.as_cubelist(alternate_cube)
# Group (sort) cubes so they are ordered in a way suitable for merging.
primary_cubes, alternate_cubes = ants.utils.cube.sort_cubes(
primary_cubes, alternate_cubes
)
result = iris.cube.CubeList([])
for src1, src2 in zip(primary_cubes, alternate_cubes):
nsource = _merge.merge(src1, src2, validity_polygon)
result.append(nsource)
if isinstance(primary_cube, iris.cube.Cube):
result = result[0]
return result
def _floodfill_neighbour_identify(
shape, coords, seed_point, extended_neighbourhood, wraparound
):
(yy, xx) = seed_point
if yy > 0:
coords.add((yy - 1, xx))
if yy < (shape[0] - 1):
coords.add((yy + 1, xx))
if xx > 0 or wraparound:
coords.add((yy, (xx - 1) % shape[1]))
if xx < (shape[1] - 1) or wraparound:
coords.add((yy, (xx + 1) % shape[1]))
if extended_neighbourhood:
if yy > 0 and (xx > 0 or wraparound):
coords.add((yy - 1, (xx - 1) % shape[1]))
if yy > 0 and (xx < (shape[1] - 1) or wraparound):
coords.add((yy - 1, (xx + 1) % shape[1]))
if yy < (shape[0] - 1) and (xx > 0 or wraparound):
coords.add((yy + 1, (xx - 1) % shape[1]))
if yy < (shape[0] - 1) and (xx < (shape[1] - 1) or wraparound):
coords.add((yy + 1, (xx + 1) % shape[1]))
[docs]
def floodfill(
array, seed_point, fill_value, extended_neighbourhood=False, wraparound=False
):
"""
Floodfill via an iterative algorithm.
Parameters
----------
array : :class:`~numpy.ndarray`
The array to apply the floodfill.
seed_point : tuple
The starting (y, x) index (the seed point).
fill_value : int or float
The value which results in the 'flooded area'.
extended_neighbourhood : :obj:`bool`, optional
In the extended neighbourhood case, also consider the diagonals in
each locations neighbourhood:
| Default neighbourhood:
| [False, True, False]
| [True, True, True ]
| [False, True, False]
| Extended Neighbourhood:
| [True, True, True]
| [True, True, True]
| [True, True, True]
wraparound : :obj:`bool`, optional
When True, support wraparound in 'x', otherwise stop at the boundary.
"""
(y, x) = seed_point
if array.ndim != 2:
msg = "The provided array should be 2D but that provided is {}D"
raise ValueError(msg.format(array.ndim))
if array[y, x] == fill_value:
raise ants.exceptions.FloodfillError(
"The value at location {}x{} already has this fill " "value.".format(y, x)
)
value_at_seed = array[y, x]
coords = set(((y, x),))
while coords:
yy, xx = coords.pop()
if array[yy, xx] == value_at_seed:
array[yy, xx] = fill_value
_floodfill_neighbour_identify(
array.shape, coords, (yy, xx), extended_neighbourhood, wraparound
)
[docs]
def find_small_feature_seed_points(
array,
min_feature_size,
land_features=False,
extended_neighbourhood=False,
wraparound=False,
):
"""
Return a set of indices for seed points for filling regions of less than a
specified number of points in a provided land sea mask.
.. deprecated:: 2.2
Planned for removal due to no known use cases. Please contact
miao@metoffice.gov.uk if you do need this functionality.
Parameters
----------
array : :class:`~numpy.ndarray`
The array to search.
min_feature_size : int
Find features with fewer grid boxes than this
land_features : bool
Set to True to identify land seed points, False (default) for sea seed points.
extended_neighbourhood : :obj:`bool`, optional
In the extended neighbourhood case, also consider the diagonals in
each locations neighbourhood:
| Default neighbourhood:
| [False, True, False]
| [True, True, True ]
| [False, True, False]
| Extended Neighbourhood:
| [True, True, True]
| [True, True, True]
| [True, True, True]
wraparound : :obj:`bool`, optional
When True, support wraparound in 'x', otherwise stop at the boundary.
Returns
-------
: :class:`~numpy.ndarray`
2D array containing the row and column indices of those locations
identified as potential fill seed points for features smaller than the
specified feature_size.
"""
message = (
"This function does not appear to be used, and hence has been "
"deprecated. Please contact miao@metoffice.gov.uk if you do "
"require this function, and we can discuss options (potentially "
"including keeping this function and removing the deprecation)."
)
warnings.warn(message, FutureWarning)
# candidate indices to evaluate
if land_features:
candidate_inds = np.where(array == 1)
else:
candidate_inds = np.where(array == 0)
# covert to list of pairs for convenience
candidate_inds = list(zip(list(candidate_inds[0]), list(candidate_inds[1])))
# list for holding set of seed point tuples to use
seed_points = []
# set fill value as outside data range
fill_value = np.min(array) - 1
while candidate_inds:
seed_point = candidate_inds.pop()
filled = array.copy()
floodfill(filled, seed_point, fill_value, extended_neighbourhood, wraparound)
filled_inds = np.where(filled == fill_value)
filled_inds = list(zip(list(filled_inds[0]), list(filled_inds[1])))
# if the number of points that has been filled is less than the min
# feature size then record the seed point used
if len(filled_inds) < min_feature_size:
seed_points.append(seed_point)
# remove points from the list so we don't reconsider them again
filled_inds.remove(seed_point)
for point in filled_inds:
candidate_inds.remove(point)
return seed_points
[docs]
def find_similar_region(
array, seed_point, extended_neighbourhood=False, wraparound=False
):
"""
Return a set of indices where the connecting neighbours have the same value
This function is functionaly equivelent :func:`floodfill`, except that
here the fill locations are returned rather than filled.
Parameters
----------
array : :class:`~numpy.ndarray`
The array to search.
seed_point : tuple
The starting (y, x) index (the seed point).
extended_neighbourhood : :obj:`bool`, optional
In the extended neighbourhood case, also consider the diagonals in
each locations neighbourhood:
| Default neighbourhood:
| [False, True, False]
| [True, True, True ]
| [False, True, False]
| Extended Neighbourhood:
| [True, True, True]
| [True, True, True]
| [True, True, True]
wraparound : :obj:`bool`, optional
When True, support wraparound in 'x', otherwise stop at the boundary.
Returns
-------
: :class:`~numpy.ndarray`
2D array containing the row and column indices of those locations
identified as similar.
"""
(y, x) = seed_point
if array.ndim != 2:
msg = "The provided array should be 2D but that provided is {}D"
raise ValueError(msg.format(array.ndim))
visited = np.zeros(array.shape, dtype="bool")
src_value = array[y, x]
indices = set(((y, x),))
coords = set(((y, x),))
while coords:
yy, xx = coords.pop()
if not visited[yy, xx] and array[yy, xx] == src_value:
visited[yy, xx] = True
indices.add((yy, xx))
_floodfill_neighbour_identify(
array.shape, coords, (yy, xx), extended_neighbourhood, wraparound
)
return tuple([[ind[i] for ind in list(indices)] for i in range(array.ndim)])
[docs]
def make_consistent_with_lsm(sources, lsm, invert_mask, method="spiral"):
"""
Make the provided source(s) consistent with the provided land sea mask.
Replaces missing data values with valid data, where missing is defined as
data that is either masked or NaN. Missing land points are filled with
nearest valid land values, and missing sea points are filled with nearest
valid sea values. Land and sea points are defined by the provided
landsea mask.
Additional coordinates on the source(s), such as time and pseudo levels are
handled. However, if the dimensions are not already in the order of
(<other dimensions>, y, x), they will be rearranged to this order.
Parameters
----------
sources : :class:`~collections.abc.Iterable` of :class:`~iris.cube.Cube` objects
Source cubes.
lsm : :class:`~iris.cube.Cube`
Landsea mask field.
invert_mask : bool
Invert the mask (land field) or not (ocean field). The landsea mask
has True values to denote land.
method : :obj:`str`, optional
Select the search method to be used when filling missing points. The methods
currently supported are "spiral" and "kdtree".
Returns
-------
: None
In-place operation.
Raises
------
NotImplementedError
If an unsupported search method is specified.
See Also
--------
:class:`ants.analysis.UMSpiralSearch` : for more details on how the
nearest valid points are determined when the "spiral" method is used.
:class:`ants.analysis.KDTreeFill` : for more details on how the
nearest valid points are determined when the "kdtree" method is used.
"""
# Check whether there are any values in the lsm that are masked or are not 0 or 1.
# (This can occur where there is missing data in the source used to derive the
# lsm.)
if np.ma.is_masked(lsm.data):
warnings.warn(
"The land sea mask has values that are masked. These may cause unexpected "
"results when calling ants.analysis.make_consistent_with_lsm."
)
if np.any(np.isin(lsm.data, [0, 1], invert=True)):
warnings.warn(
"The land sea mask has values that are not 0 or 1. These may cause "
"unexpected results when calling ants.analysis.make_consistent_with_lsm."
)
if ants.utils.cube._is_ugrid(lsm):
raise ValueError("ANTS doesn't support ugrid data. Please use UG-ANTS instead.")
mask = lsm.copy(lsm.data.astype("bool", copy=False))
ants.utils.cube.guess_horizontal_bounds(mask)
if invert_mask:
mask.data = np.logical_not(mask.data)
sources = ants.utils.cube.as_cubelist(sources)
match method.lower():
case "spiral":
DefaultFiller = UMSpiralSearch
case "kdtree":
DefaultFiller = KDTreeFill
case _:
raise RuntimeError(f"Unknown search method: {method}")
for cube in sources:
if ants.utils.cube._is_ugrid(cube):
raise ValueError(
"ANTS doesn't support ugrid data. Please use UG-ANTS instead."
)
else:
Filler = DefaultFiller
ants.utils.cube.guess_horizontal_bounds(cube)
filler = Filler(cube, target_mask=mask)
filler(cube)