Source code for ants.fileformats.pp

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of ANTS and is released under the BSD 3-Clause license.
# See LICENSE.txt in the root of the repository for full licensing details.

"""
This module includes utility functions for working with PP files for
ancillary generation.

Loading PP data
---------------

The following additional functionality is provided on load by ANTS:

1. Pseudo-level order from the PP file is preserved.

"""
import collections
import itertools

import ants
import cf_units
import iris
import iris.fileformats.pp as ipp
import numpy as np

RMDI = -1.073741824e9


class _CallbackPP(object):
    """
    Callback to preserve pseudo level order.

    Returns a callable object used as a callback when loading pp files.

    Parameters
    ----------
    user_callback : function(cube, field, filename), optional
        Additional callback function to be run after this callback.

    See Also
    --------
    :meth:`_Callback.__call__`
    :func:`iris.io.run_callback`

    """

    def __init__(self):
        self._index = 0
        self._mapping = {}
        self._user_callback = None

    def __call__(self, cube, field, filename):
        """
        ANTS callback to maintain pseudo level order.

        Used as a callback when loading pp files for all ants.io.load
        operations (e.g. :func:`~ants.io.load.load`, :func:`~ants.io.load.load_cube`
        etc).

        Parameters
        ----------
        cube : :class:`iris.cube.Cube`
            The cube generated from the field.
        field: ppfield
            Not used in this implementation, but kept for consistency
            with the iris load callback protocol.
        filename: str
            The name of the ancillary file.

        """
        if len(cube.coords("pseudo_level")) > 0:
            self._freeze_pseudo_level(cube)
        if self._user_callback is not None:
            self._user_callback(cube, field, filename)

    def _freeze_pseudo_level(self, cube):
        """
        Add pseudo-level ordering coordinate.

        All scalar coordinates are converted to aux coords and an additional
        scalar dim coord representing pseudo-level order is added to ensure
        that pseudo level order from the fields file is maintained after all
        fields are loaded.

        Parameters
        ----------
        cube : :class:`iris.cube.Cube`
            The cube containing a pseudo level coordinate to be fixed.

        """
        self._demote_scalar_dim_coords(cube)
        pseudo_level_coord = cube.coord("pseudo_level")
        _order = self._get_order(pseudo_level_coord.points[0])
        cube.add_aux_coord(
            iris.coords.DimCoord(_order, long_name="_pseudo_level_order")
        )

    def _demote_scalar_dim_coords(self, cube):
        for dim_coord in cube.coords(dim_coords=True):
            if len(dim_coord.points) == 1:
                aux_coord = iris.coords.AuxCoord.from_coord(dim_coord)
                cube.remove_coord(dim_coord)
                cube.add_aux_coord(aux_coord)

    def _get_order(self, value):
        _order = self._mapping.get(value, None)
        if _order is None:
            self._index += 1
            self._mapping[value] = self._index
            _order = self._index
        return _order

    def append_user_callback(self, callback):
        self._user_callback = callback


[docs] def field_filter_strict(*args, **kwargs): """ Return the sole field matching the given stash code. Arguments are passed straight to :func:`field_filter`. Returns ------- :class:`iris.fileformats.pp.PPField` Single field with requested stash code. Raises ------ RuntimeError if more than one field has the given stash code. See Also -------- :func:`field_filter` """ fields = field_filter(*args, **kwargs) if fields in [None, []]: raise RuntimeError("No fields found matching the filter parameters") if len(fields) > 1: raise RuntimeError( "More than one field matches the desired filter " "parameters" ) return fields[0]
[docs] def field_filter(fields, stash): """ Return only those fields with the given stash code. Parameters ---------- fields : iterator of :class:`iris.fileformats.pp.PPField` Fields to filter. stash : :class:`iris.fileformats.pp.STASH` Stash code to filter on. Returns ------- :class:`iris.fileformats.pp.PPField` List of fields with requested stash code. Notes ----- The model identifier is ignored in the comparison when it has a value of 0. This is due to the commonly missing model identifier in existing fields. """ try: fields = list(fields) except TypeError: fields = [fields] # Note that some fields have an invalid model identifier '0' so we allow # these fields. try: fields = [ field for field in fields if field.lbuser[3] == ((stash.section * 1000) + stash.item) and field.lbuser[6] in (stash.model, 0) ] except AttributeError: msg = "Type {} is not a recognised valid field type" raise TypeError(msg.format(type(fields[0]))) return fields
def _iris_workarounds(cube, field, field_index): x_coord = cube.coord(axis="x") # Set LBLEV to model level number rather than 0 for depth fields when an # absolute depth coordinate and bounds have been provided. # See https://github.com/SciTools/iris/issues/4082. try: z_coord = cube.coord("depth") number_of_depth_levels = len(z_coord.points) if field.lblev == 0: field.lblev = (field_index % number_of_depth_levels) + 1 except iris.exceptions.CoordinateNotFoundError: pass # Set LBLEV to 9999 for surface fields, rather than 0 as currently # set in iris. See https://github.com/SciTools/iris/issues/3820 if field.lblev == 0: field.lblev = 9999 # Override lbnpt bdx as iris does not support cubes with zonal # mean. See https://github.com/SciTools/iris/issues/2054 if field.lbnpt == 0: field.lbnpt = len(x_coord.points) if field.bdy != RMDI and field.bdx == 0.0: field.bdx = 360.0 # Overide bplon as iris doesn not support properly support unrotated pp # LAMs. See https://github.com/SciTools/iris/issues/3560 if not ants.utils.cube.is_global(cube): if x_coord.coord_system == ants.coord_systems.UM_SPHERE.crs: field.bplon = 180
[docs] def cube2pp(cube, field_coords=None): """ Generates pp fields from a cube. This is a wrapper around :func:`iris.fileformats.pp.as_fields`. In addition to that function, this ensures the horizontal coordinates are defined in the directions anticipated by the UM, identifies logical data, and applies fixes required for zonal mean data. Parameters ---------- cube : :class:`iris.cube.Cube` Cube to convert into pp fields. field_coords : list of 2 :class:`iris.coords.Coord` instances or list of \ 2 str coordinate names, optional. The coordinates to use to reduce the cube into 2D slices. Passed directly to :func:`iris.fileformats.pp.as_fields`. If not provided, this is automatically determined from the cube dimension coordinates if possible. Returns ------- : list of :class:`iris.fileformats.pp.PPField` The fields generated from the cube. Note that due to processing requirements, this is a list, rather than the generator returned by :func:`iris.fileformats.pp.as_fields`. Raises ------ RuntimeError If the cube has more than time based coordinate. ValueError If the cube has a time coordinate with a year of zero. ValueError If the cube has contradictory valid_range and valid_min/valid_max attributes. """ # Ensure a south-north definition - required by the UM x_coord, y_coord = ants.utils.cube.horizontal_grid(cube) if y_coord.points[-1] < y_coord.points[0] and isinstance( y_coord, iris.coords.DimCoord ): # Ensure that we do not invert the original - restrict this # transformation to the ancillary output. cube = cube.copy(cube.lazy_data()) cube = ants.utils.cube.reverse_coordinate(cube, y_coord) # TODO: Iris ignores proleptic_gregorian calendar time coordinates... # https://github.com/SciTools/iris/issues/3561 time_coords = ants.utils.cube.find_time_coordinates(cube) if len(time_coords) == 1: if time_coords[0].units.calendar in [ "proleptic_gregorian", "standard", "360_day", ]: units = time_coords[0].units.cftime_unit datetime = units.split("since")[1] year = datetime.split("-")[0] if int(year) == 0: raise ValueError( "\nZero not allowed as a reference year.\n" "Ensure that dates are representative of the data.\n" "See https://metoffice.github.io/ANTS/\n" "appendixA_time_handling.html#date-information" ) if cube.coord("time").units.calendar in ["proleptic_gregorian"]: cube = cube.copy(cube.lazy_data()) time_coord = cube.coord("time") new_units = cf_units.Unit(time_coord.units.name, calendar="standard") time_coord.units = new_units elif len(time_coords) > 1: msg = "More than one time based coordinate: {}" time_coord_names = [] for coord in time_coords: time_coord_names.append(coord.name()) raise RuntimeError(msg.format(time_coord_names)) if field_coords is None: field_coords = [y_coord, x_coord] try: fields = list(ipp.as_fields(cube, field_coords)) except ValueError as err: iris_msg = ( "zero not allowed as a reference year, does not exist in " "Julian or Gregorian calendars" ) err_msg = list(err.args) if iris_msg in list(err.args): msg = ( "\nEnsure that dates are representative of the data. See " "https://metoffice.github.io/ANTS/" "appendixA_time_handling.html#date-information" ) err_msg[0] += msg err.args = err_msg raise # Determine whether the cube represents logical data even if not a boolean # dtype. # http://www.unidata.ucar.edu/software/netcdf/docs/ # attribute_conventions.html logical_field = 0 dtype_kind = cube.lazy_data().dtype.kind if dtype_kind in ["i", "u"]: valid_range = [] valid_range.append(cube.attributes.get("valid_range")) valid_range.append( [cube.attributes.get("valid_min"), cube.attributes.get("valid_max")] ) valid_range = [x for x in valid_range if x is not None] valid_range = [x for x in valid_range if None not in x] for vrange in valid_range: if np.allclose([0, 1], vrange): logical_field += 1 if logical_field > 0 and len(valid_range) != logical_field: msg = ( "Cube attribute valid range is overspecified and " "contradictory. valid_range:{}, valid_min, valid_max: {}" ) raise ValueError(msg.format(valid_range[0], valid_range[1])) for field_index, field in enumerate(fields): _iris_workarounds(cube, field, field_index) if logical_field > 0: field.lbuser[0] = 3 else: # Populate lbuser0 now as iris only populates on save so that the # field becomes more self contained. field.lbuser[0] = {"b": 3, "u": 2, "i": 2, "f": 1}[dtype_kind] return fields
def _sorted_ppfields(cubes): """ Converts a Cubelist into a list of fields in the expected order. The field order is defined as: for month in month_list: for STASH in STASH_list: for level in level_pseudolevel_list: for zlevel in z_level_list: write out field where the level_pseudolevel_list and STASH_list are orders in which the pseudo levels, and STASH codes are first encountered in the cubes, while the month list and the z_level_list is ascending order of time or model level number. Parameters ---------- cubes : :class:`iris.cube.CubeList` Cube to convert into pp fields. Returns ------- : list of :class:`iris.fileformats.pp.PPField` The fields generated from the cube in the required order for ancillaries. Raises ------ KeyError If one or more of the cubes in the cubelist does not have a STASH code. ValueError If the cubes do not have identical pseudolevel coordinates. See Also -------- http://fcm2/projects/UM/ticket/4612 """ def get_stash_order(cubes): stash_order = [] for cube in cubes: try: stash_order.append(cube.attributes["STASH"]) except KeyError: raise ValueError( "Cube {} does not have a stash code".format(cube.name()) ) return stash_order def sort_keys(ppfield): return int_time(ppfield.t1), stash_order.index(ppfield.stash), ppfield.blev def int_time(time): try: # Converts a time of "1-02-03 04:05:06" to 10203040506 result = int(time.strftime("%Y%m%d%H%M%S").strip()) except ValueError: # To cover cases where no time value needs to be provided. # If the cftime strftime attempts to format a time of 0 # (i.e. no time information), this creates a string of 0000-00-00, # which is invalid for the cftime api. It requires a valid calendar # based date, e.g. 0000-01-01 for year 0, day 1, month 1. # In this case, we want to show that no time information has been # provided, so return the original 0 for the time field. result = 0 return result ppfields = [] stash_order = get_stash_order(cubes) for cube in cubes: if not isinstance(cube.attributes["STASH"], ipp.STASH): cube.attributes["STASH"] = ipp.STASH.from_msi(cube.attributes["STASH"]) ppfields.extend(cube2pp(cube)) ppfields = sorted(ppfields, key=sort_keys) return ppfields def _add_callback(callback, *args, **kwargs): """ Adds both the ants callback and the user provided callback (if any) to the load. """ args = list(args) if len(args) == 1: args += [callback] else: callback.append_user_callback(args[1]) args[1] = callback args = tuple(args) return args, kwargs
[docs] def load_cubes(*args, **kwargs): """ Loads cubes from a list of pp filenames. This function acts as a wrapper to :func:`iris.fileformats.pp.load_cubes`. See Also -------- :func:`iris.fileformats.pp.load_cubes` """ args, kwargs = _add_callback(_CallbackPP(), *args, **kwargs) return iris.fileformats.pp.load_cubes(*args, **kwargs)
[docs] def load_cubes_little_endian(*args, **kwargs): """ Loads cubes from a list of pp filenames. This function acts as a wrapper to :func:`iris.fileformats.pp.load_cubes`. See Also -------- :func:`iris.fileformats.pp.load_cubes` """ args, kwargs = _add_callback(_CallbackPP(), *args, **kwargs) return iris.fileformats.pp.load_cubes_little_endian(*args, **kwargs)
[docs] def pp2cubes(fields): """ Convert ppfields to cube(s). Parameters ---------- fields : :class:`iris.fileformats.pp.PPField` A single pp field or an iterator of pp fields to convert to one or more cubes. Returns ------- :class:`iris.cube.CubeList` The cubes derived from the pp fields. Notes ----- At time of writing there was no iris implementation of this function. This implementation will be retired when the iris implementation is complete. """ if not isinstance(fields, collections.abc.Iterable): fields = [fields] cube_field_pairs = iris.fileformats.pp.load_pairs_from_fields(fields) cubes = iris.cube.CubeList( [cube_field_pair[0] for cube_field_pair in cube_field_pairs] ).merge() ants.utils.cube.guess_horizontal_bounds(cubes) return cubes
[docs] def load_ppfields(*args, **kwargs): """ Read PPFields from on or more files. Parameters ---------- filenames : str or ~collections.abc.Iterable of str One or more filenames to load. Returns ------- ~collections.abc.Iterator The pp fields from all files read. Notes ----- This function differs from :func:`iris.fileformats.pp.load` by supporting multiple file names on input. """ args = list(args) filenames = args.pop(0) if isinstance(filenames, str): filenames = [filenames] return itertools.chain.from_iterable( (ipp.load(fnme, *args, **kwargs) for fnme in filenames) )