Source code for ants.fileformats.ancil.template

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of ANTS and is released under the BSD 3-Clause license.
# See LICENSE.txt in the root of the repository for full licensing details.

"""
Tools for generating the template required for saving ancillary files.

"""
import itertools
import re

import ants
import iris
import numpy as np

from . import time_headers

GRIDS = {"newdynamics": 3, "endgame": 6}
"""dict: Map of model formulation to grid staggering fixed length header value.

Used in saving ancillaries, to set the grid staggering in the fixed length header."""


def _ancil_version():
    """
    Parse the ants version into a form suitable for the model version
    element of the fixed length header.

    The convention is as follows:
        (100 * release_number) + subrelease
    For example, ants version 1.2 would become 102.

    """
    version_identity = [int(v) for v in re.findall(r"(\d+).{0,1}", ants.__version__)]
    version = 0
    if version_identity[0] > 0:
        version = 100 * version_identity[0]
    if len(version_identity) > 1:
        version += version_identity[1]

    return version


def _get_base_headers(cubes, field):
    # Base template
    headers = {}
    headers["fixed_length_header"] = {
        "data_set_format_version": 20,  # for MASS storage: always 20
        # for ancillaries
        "sub_model": 1,  # Atmosphere
        "dataset_type": 4,  # Ancillary
        "vert_coord_type": 1,  # Hybrid heights
        "model_version": _ancil_version(),  # version of ancillary program
    }
    headers["integer_constants"] = {
        "num_times": 1,  # Workaround for xconv
        "num_field_types": len(cubes),
        "num_cols": field.lbnpt,
        "num_rows": field.lbrow,
    }
    headers["real_constants"] = {}
    return headers


def _unvertical_coords(cube):
    """Returns iterator over the non-vertical coordinates of cube."""
    return (
        coord for coord in cube.coords() if iris.util.guess_coord_axis(coord) != "Z"
    )


def _check_non_vertical_coords_f03_compatible(cubes):
    """Raise exception if cubes look like they are incompatible with F03."""
    refcube = cubes[0]
    refcube_coords_names = set(coord.name() for coord in _unvertical_coords(refcube))
    msg = "Currently, only support writing cubes with identical coordinates."

    for cube in cubes:
        cube_coords_names = set(coord.name() for coord in _unvertical_coords(cube))
        missing_coords = cube_coords_names.symmetric_difference(refcube_coords_names)
        if missing_coords:
            msg = "{}  {} coordinates not common to all cubes.".format(
                msg, " ".join(missing_coords)
            )
            raise RuntimeError(msg)
        for coord in _unvertical_coords(cube):
            try:
                if not ants.utils.coord.relaxed_equality(
                    coord, refcube.coord(coord.name())
                ):
                    msg = "{}  {} not identical in all cubes.".format(msg, coord.name())
                    raise RuntimeError(msg)
            except TypeError:
                raise RuntimeError(
                    f"Invalid values found in {coord.name()} coordinate."
                ) from None


def _get_grid(cubes):
    _check_non_vertical_coords_f03_compatible(cubes)

    grids = [
        cube.attributes["grid_staggering"]
        for cube in cubes
        if "grid_staggering" in cube.attributes
    ]
    unique_grids = set(grids)
    # Check grid staggering is consistent across all cubes
    grid_check = len(grids) != 0 and len(grids) != len(cubes)
    if grid_check or len(unique_grids) > 1:
        msg = "Cubes provided are defined with different grid " "staggering: ({})"
        raise RuntimeError(msg.format(grids))
    try:
        grid = unique_grids.pop()
    except KeyError:
        grid = GRIDS["endgame"]
    return grid


def _set_grid_definition(headers, grid, field):
    """
    Sets the grid definition for standard grids or rotated pole grids.

    Uses grid to specify the grid staggering and the provided field to
    determine whether the grid is rotated, and to populate grid
    information.

    """
    headers["fixed_length_header"]["grid_staggering"] = grid
    # Horizontal grid type
    horiz_grid_type = field.lbhem
    if field.is_rotated:
        horiz_grid_type += 100

    headers["fixed_length_header"]["horiz_grid_type"] = horiz_grid_type

    # REAL CONSTANTS
    (regular_x, regular_y) = field.is_regular
    if regular_x:
        headers["real_constants"]["col_spacing"] = field.bdx
        # Longitude of first column in degrees (longitudes in range 0-360)
        start_lon = field.bzx + field.bdx
        headers["real_constants"]["start_lon"] = start_lon
    else:
        if grid in GRIDS.values():
            headers["column_dependent_constants"] = {
                "dims": (len(field.x), None),
                "lambda_p": list(field.x),
            }
        else:
            msg = (
                'Can only derive "x" values for variable resolution '
                "grids for NewDynamics/EndGame.  Got grid staggering: "
                "{}"
            )
            raise RuntimeError(msg.format(grid))

    if regular_y:
        # NS (y) grid spacing in degrees (RMDI for variable resolution
        # grids)
        headers["real_constants"]["row_spacing"] = field.bdy
        # Latitude of first row in degrees (latitudes in range 90 to -90)
        start_lat = field.bzy + field.bdy
        headers["real_constants"]["start_lat"] = start_lat
    else:
        if grid == GRIDS["newdynamics"]:
            headers["row_dependent_constants"] = {
                "dims": (len(field.y), None),
                "phi_p": list(field.y),
            }
        elif grid == GRIDS["endgame"]:
            row_data = np.zeros(len(field.y) + 1)
            row_data[:-1] = list(field.y)
            headers["row_dependent_constants"] = {
                "dims": (len(field.y) + 1, None),
                "phi_p": row_data,
            }
        else:
            msg = (
                'Can only derive "y" values for variable resolution '
                "grids for NewDynamics/EndGame.  Got grid stagerring: "
                "{}"
            )
            raise RuntimeError(msg.format(grid))

    # Real latitude of 'pseudo' N pole in degrees
    headers["real_constants"]["north_pole_lat"] = field.bplat
    # Real longitude of 'pseudo' N pole in degrees
    headers["real_constants"]["north_pole_lon"] = field.bplon


def _nlevels(cube):
    result = 1
    vert_coords = cube.coords(axis="z")
    if len(vert_coords) > 0:
        result = len(vert_coords[0].points)
    return result


def _vert_coords_eq(cube1, cube2):
    vcs1 = cube1.coords(axis="z")
    vcs2 = cube2.coords(axis="z")
    if len(vcs1) != len(vcs2):
        result = False
    else:
        result = all(
            ants.utils.coord.relaxed_equality(vc1, vc2) for vc1, vc2 in zip(vcs1, vcs2)
        )
    return result


def _check_vert_coords_f03_compatible(cubes):
    """
    Raise exception if cubes have vertical coordinates incompabile with f03.
    """
    multis = (cube for cube in cubes if _nlevels(cube) > 1)
    for cube1, cube2 in itertools.combinations(multis, 2):
        if not (_vert_coords_eq(cube1, cube2)):
            raise RuntimeError(
                "Multi-level ancillary fields must share the "
                "same vertical coordinate."
            )


def _cube_with_max_levels(cubes):
    cmax = cubes[0]
    for cube in cubes[1:]:
        cmax = cube if _nlevels(cube) > _nlevels(cmax) else cmax
    return cmax


def _set_levels(cubes, headers):
    _check_vert_coords_f03_compatible(cubes)
    cube = _cube_with_max_levels(cubes)
    vert_coords = cube.coords(axis="z")
    if len(vert_coords) > 0:
        for coord in vert_coords:
            # Check for depth (otherwise, default mule vertical coord type is
            # used).  Note that the depth coordinates we support are defined
            # in ants.fileformats.ancil._cubes_to_ancilfile\
            # ._reject_unsupported_coords
            if coord.name() == "depth":
                headers["fixed_length_header"]["vert_coord_type"] = 4
    headers["integer_constants"]["num_levels"] = _nlevels(cube)


[docs] def create(cubes, field): """ Creates a template set of headers. The template is designed to be compatible with "from_template" method of :class:`mule.ancil.AncilFile`. Parameters ---------- cubes : :class:`iris.cube.CubeList` The cubes used to derive the some of the header values. field : :class:`mule.Field3` A realised reference field used to derive the remaining header values. Returns ------- : dict Contains the derived header values in a format suitable to be used to generate a mule AncilFile via the "from_template" method of :class:`mule.ancil.AncilFile`. """ headers = _get_base_headers(cubes, field) _set_levels(cubes, headers) _set_grid_definition(headers, _get_grid(cubes), field) time_headers.set_headers_time_information(cubes[0], headers) return headers