Source code for ugants.utils.cube

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of UG-ANTS and is released under the BSD 3-Clause license.
# See LICENSE.txt in the root of the repository for full licensing details.
"""Utility functions for handling UGrid cubes."""

import os
import sys
import warnings
from collections import Counter
from datetime import datetime
from pathlib import Path

import dask
import iris.cube
import numpy as np

from ugants.exceptions import PROVISIONAL_WARNING_MESSAGE, ProvisionalWarning


[docs] def as_cubelist(cubes): """ Return a CubeList, irrespective of whether a Cube or a CubeList has been provided. Parameters ---------- cubes : :class:`iris.cube.Cube` or :class:`iris.cube.CubeList` Raises ------ TypeError If provided with anything other than a :class:`iris.cube.Cube` or :class:`iris.cube.CubeList` Returns ------- :class:`iris.cube.CubeList` """ if isinstance(cubes, iris.cube.Cube): cubes = iris.cube.CubeList([cubes]) elif not isinstance(cubes, iris.cube.CubeList): raise TypeError( f"Expected iris.cube.Cube or iris.cube.CubeList, got {type(cubes)}" ) return cubes
[docs] def is_ugrid(cube): """Check if the provided cube contains UGrid data. Parameters ---------- cube : :class:`iris.cube.Cube` Cube to check. Returns ------- bool """ return cube.mesh is not None
[docs] def get_connectivity_indices(cube, connectivity): """Get the connectivity indices array for a cube's mesh. The ``start_index`` is subtracted from the connectivity array, so that the returned array is guaranteed to have zero-based indexing. For more information about zero- and one-based indexing, see the `UGRID conventions <http://ugrid-conventions.github.io/ugrid-conventions/#zero-or-one-based-indexing>`_. Parameters ---------- cube : iris.cube.Cube The cube from which to extract the connectivity indices. connectivity : str The connectivity to extract. Returns ------- connectivity_indices : numpy.ndarray The cube's zero-indexed connectivity array. """ connectivity = cube.mesh.connectivity(cf_role=connectivity) connectivity_indices = connectivity.indices - connectivity.start_index return connectivity_indices
[docs] def prepare_for_save(cubes: iris.cube.Cube | iris.cube.CubeList): """Add or amend attributes on a cube or cubes prior to saving to disk. Each cube is updated so that its history attribute acquires a new entry, specifying the operation performed by the invoked application. A history entry records a timestamp and the application (i.e. command-line) name and arguments of the current operation. History "entries" are delimited by newlines: the most recent appears first. The `CF Conventions <https://cfconventions.org/Data/cf-conventions/cf-conventions-1.11/cf-conventions.html#description-of-file-contents>`_ provide further description of the history attribute. A "ugants_status" attribute is added to indicate whether the version of UG-ANTS is stable. A "suite_provenance" attribute is added to provide information on the suite (if applicable) taken from the ``SUITE_PROVENANCE`` environment variable. Note ---- A copy of the provided cube(s) is returned, the operation is *not* performed in place. Parameters ---------- cubes : iris.cube.Cube | iris.cube.CubeList Cube(s) to be saved. Returns ------- iris.cube.Cube | iris.cube.CubeList A copy of the cube(s) with relevant metadata updated. If a single :class:`~iris.cube.Cube` is provided, then a single :class:`~iris.cube.Cube` will be returned. If a :class:`~iris.cube.CubeList` is provided, then a :class:`~iris.cube.CubeList` will be returned. """ updated_cubes = iris.cube.CubeList() operation_entry_str = _create_history_entry() for cube in as_cubelist(cubes): updated_cube = _update_single_cube_history(cube.copy(), operation_entry_str) updated_cube.attributes["ugants_status"] = PROVISIONAL_WARNING_MESSAGE updated_cube.attributes["suite_provenance"] = os.environ.get( "SUITE_PROVENANCE", "None" ) updated_cubes.append(updated_cube) warnings.warn(PROVISIONAL_WARNING_MESSAGE, ProvisionalWarning, stacklevel=1) if isinstance(cubes, iris.cube.Cube): updated_cubes = updated_cubes[0] return updated_cubes
def _create_history_entry(): """Create a new entry to be appended to an existing history attribute. The new history entry is a timestamped record of the command that was run to create or modify the cube. """ date = datetime.today().replace(microsecond=0) date_str = date.isoformat() app_path, app_args = sys.argv[0], sys.argv[1:] app_name = Path(app_path).name app_args.insert(0, app_name) app_str = " ".join(app_args) operation_entry_str = f"{date_str}: {app_str}" return operation_entry_str def _update_single_cube_history(cube: iris.cube.Cube, operation_entry_str: str): cube_history_attr = cube.attributes.get("history") if isinstance(cube_history_attr, str): new_history_str = f"{operation_entry_str}\n{cube_history_attr}" elif cube_history_attr is None: new_history_str = operation_entry_str else: raise TypeError( f"Cube '{cube.name()}' has a 'history' attribute of non-string " f"type : {cube_history_attr!r}." ) updated_cube = cube.copy() updated_cube.attributes["history"] = new_history_str return updated_cube
[docs] class Stencil: """Get indices of all cells in the neighbourhood of a given central cell. The algorithm follows that outlined in section 4.1 of `this paper <https://doi.org/10.5194/gmd-16-1265-2023>`_. Examples -------- The following table shows three panels of the C4 cubed sphere, which is used here to illustrate creating stencils on an unstructured grid. The numbers in the table represent the cell indices. In the example below, cell 0 neighbours cells 1, 4, 51, and 64. Cell 50 neighbours cells 49, 51, 54, and 65. +---+---+---+---+---+---+---+---+ | |67 |71 |75 |79 | + +---+---+---+---+ | |66 |70 |74 |78 | + +---+---+---+---+ | |65 |69 |73 |77 | + +---+---+---+---+ | |64 |68 |72 |76 | +---+---+---+---+---+---+---+---+ |48 |49 |50 |51 |0 |1 |2 |3 | +---+---+---+---+---+---+---+---+ |52 |53 |54 |55 |4 |5 |6 |7 | +---+---+---+---+---+---+---+---+ |56 |57 |58 |59 |8 |9 |10 |11 | +---+---+---+---+---+---+---+---+ |60 |61 |62 |63 |12 |13 |14 |15 | +---+---+---+---+---+---+---+---+ By default the stencil will only include the four immediate neighbours, plus the central cell itself: >>> stencil = Stencil(cube) >>> stencil[0] [0, 64, 1, 51, 4] >>> stencil[10] [6, 9, 10, 11, 14] Negative indices are permitted, and will be interpreted in the usual Python way, i.e. for a positive integer ``M``, the index ``-M`` will be replaced with ``N - M`` where ``N`` is the length of the array. For example, index ``-1`` refers to the final element of the array, ``-2`` the penultimate element, etc. If the provided central index is negative, then it will appear in the returned list as its positive counterpart. >>> cube.shape (96,) >>> stencil[-96] [0, 64, 1, 51, 4] >>> stencil[-86] [6, 9, 10, 11, 14] Passing ``iterations=2`` generates an extended stencil with the eight surrounding cells, or seven if the central cell is on the corner of a face: >>> extended_stencil = Stencil(cube, iterations=2) >>> extended_stencil[0] [0, 64, 1, 51, 4, 68, 5, 55] >>> extended_stencil[10] [5, 6, 7, 9, 10, 11, 13, 14, 15] Further iterations can be specified if a larger neighbourhood is required: >>> third_iteration_stencil = Stencil(cube, iterations=3) >>> third_iteration_stencil[0] [0, 64, 1, 65, 4, 68, 5, 2, 69, 6, 8, 72, 9, 50, 51, 54, 55, 59] >>> third_iteration_stencil[10] [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 80, 20, 84, 24, 88, 28] """
[docs] def __init__(self, cube, iterations=1): """Generate a stencil of a given size on a cube. Parameters ---------- cube : iris.cube.Cube Cube on which to generate stencil. iterations : :obj:`int`, optional The number of iterations for which to run the stencil generation algorithm. * If ``1`` (the default), use only the four immediate neighbours around the central cell. * If ``2``, use the extended neighbourhood by filling the corner cells, i.e. those which are connected to two cells in the immediate neighbourhood. Raises ------ TypeError If the provided ``iterations`` is not an :obj:`int`. ValueError If the provided ``iterations`` is not a positive integer. """ if not isinstance(iterations, int): raise TypeError(f"iterations must be an int, got {type(iterations)}.") if iterations < 1: raise ValueError( f"iterations must be a positive integer, got {iterations}." ) # Get the zero-indexed connectivity array self.connectivity_indices = get_connectivity_indices( cube, "face_face_connectivity", ) self._num_faces = cube.shape[0] self.iterations = iterations
[docs] def __getitem__(self, central_cell_index): """Retrieve the indices of the neighbourhood of ``central_cell_index``. Parameters ---------- central_cell_index : int Index of the central cell around which the stencil is defined. Negative indices are permitted, and will be interpreted in the usual Python way, i.e. index ``-1`` refers to the final element of the array, ``-2`` the penultimate element, etc. Returns ------- list[int] All indices in the stencil, including the central cell index. The list is unordered, and contains only positive indices. Note ---- Negative indices are permitted, and will be interpreted in the usual Python way, i.e. index ``-1`` refers to the final element of the array, ``-2`` the penultimate element, etc. If the provided central index is negative, then it will appear in the returned list as its positive counterpart, e.g. ``-1`` will be replaced with ``N - 1`` where ``N`` is the number of faces of the cube. """ # Allow negative indices as long as they are greater than -len(data) if (central_cell_index < -self._num_faces) or ( central_cell_index >= self._num_faces ): raise IndexError( f"Cannot index face {central_cell_index} for array " f"of length {self._num_faces}." ) # Rebase to convert negative index to equivalent positive index. # The reason for this is to avoid double counting in the neighbourhood, # e.g. -1 and 95 would be counted as separate indices for a C4 cubed-sphere. central_cell_index = central_cell_index % self._num_faces # Initialise neighbourhood with central cell only neighbourhood = {central_cell_index} for _ in range(self.iterations): # get faces connected to the current neighbourhood connected_faces = [ face for face in self.connectivity_indices[list(neighbourhood)].flatten() if face not in neighbourhood ] # count how many faces connect to each of these outer faces face_counts = Counter(connected_faces) # get the cells that are the most connected to the current neighbourhood new_faces = { face for face, count in face_counts.items() if count == max(face_counts.values()) } neighbourhood |= new_faces return list(neighbourhood)
[docs] def align_mask(cube_input): """.. versionadded:: 0.4.0 Adjust a cube's mask so that it is of the same shape as the associated data. Tests the input to see if it should be handled as a cube or cubelist and uses ```_expand_cube_mask(cube)``` to carry out the work of adjusting the mask(s). Parameters ---------- cube_input : :class:`iris.cube.Cube` or :class:`iris.cube.CubeList` Returns ------- : None In-place operation. """ # noqa: D400 if isinstance(cube_input, iris.cube.CubeList): for cube in cube_input: _expand_cube_mask(cube) else: _expand_cube_mask(cube_input)
def _expand_cube_mask(cube): """ If the input cube has no mask, this routine returns an mask array of False values that matches the shape of the input core data array. It is designed to address cases where a single False numpy boolean is being returned as a mask rather than a data sized array of False values. It maintains unrealised data if input is lazy. """ # noqa: D205 lazy = cube.has_lazy_data() cube_core_data = cube.core_data() if lazy: mask_values = dask.array.ma.getmaskarray(cube_core_data) else: mask_values = np.ma.getmask(cube_core_data) if cube_core_data.shape != mask_values.shape: if lazy: cube.data = dask.array.ma.masked_array( cube_core_data, mask=np.zeros(cube_core_data.shape, dtype=bool) ) else: cube.data = np.ma.masked_array( cube_core_data, mask=np.zeros(cube_core_data.shape, dtype=bool) ) return