# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of ANTS and is released under the BSD 3-Clause license.
# See LICENSE.txt in the root of the repository for full licensing details.
"""
This module supports saving cubes as UM ancillary files.
UM documentation paper `UM input and output file formats (F03)
<https://code.metoffice.gov.uk/doc/um/latest/papers/umdp_F03.pdf>`_
defines the ancillary file format.
Loading ancillary data
----------------------
The following additional functionality is provided on load by ANTS,
independent of which load function is used:
1. Pseudo-level order from the fieldsfile is preserved.
2. Grid staggering is stored on the cube and made available via
cube.attributes['grid_staggering'].
"""
import warnings
import ants
import iris
try:
import mule
except ImportError as _mule_import_error:
mule = None
message = (
f"Unable to import mule: {_mule_import_error}\nProceeding "
"without capabilities provided by mule."
)
warnings.warn(message)
import numpy as np
from ants.fileformats import pp
from . import template
# Dev note: within this file, we have both pp field and mule field instances.
# Be careful with the indexing when accessing elements by index: pp field
# headers are indexed from 0 while one dimensional mule field headers
# (i.e. lookup headers) are indexed from 1. Where possible, access the header
# elements by name rather than index.
IMDI = -32768 # As defined by UMDP F03 at UM version 13.9.
RMDI = -1073741824.0 # As defined by UMDP F03 at UM version 13.9.
class _CallbackUM(pp._CallbackPP):
"""Callback to preserve pseudo level order and grid staggering."""
def __init__(self, grid_staggering):
super(_CallbackUM, self).__init__()
self.grid_staggering = grid_staggering
def __call__(self, cube, field, filename):
"""
ANTS callback to add grid staggering and maintain pseudo level order.
Used as a callback when loading fields files for all ants.io.load
operations (e.g. :func:`~ants.io.load.load`, :func:`~ants.io.load.load_cube`
etc).
Parameters
----------
cube : :class:`iris.cube.Cube`
The cube generated from the field.
field: ppfield
Not used in this implementation, but kept for consistency
with the iris load callback protocol.
filename: str
The name of the ancillary file.
"""
cube.attributes["grid_staggering"] = self.grid_staggering[filename]
super(_CallbackUM, self).__call__(cube, field, filename)
class _IrisPPFieldDataProvider(object):
def __init__(self, ppfield):
self.ppfield = ppfield
def _data_array(self):
# Data type conversions and mask handling
data = self.ppfield.data
if isinstance(data, np.ma.core.MaskedArray):
data = data.filled(fill_value=RMDI)
data = data.astype(
{"f": ">f8", "u": ">i8", "b": ">i8", "i": ">i8"}[data.dtype.kind],
copy=False,
)
return data
class _GuardField3:
"""This class enables mule to be an optional import.
Any attempt to instantiate this class will trigger a ValueError. This
means that we can import this package safely, but any attempt to use
mule functionality will trigger an error.
Using the `from_pp` class method or the `_get_headers_from_pp` static
method will also trigger an error.
"""
def __init__(self, *args, **kwargs):
self._error()
@staticmethod
def _get_headers_from_pp(*args, **kwargs):
_GuardField3._error()
@classmethod
def from_pp(cls, *args, **kwargs):
cls._error()
@staticmethod
def _error():
raise ValueError(
"Mule cannot be imported, but an attempt has been "
"made to use mule functionality through the "
"_Field3 class"
)
def _get_Field3(mule_present):
if mule_present:
class _ActualField3(mule.Field3):
"""
Provides conveniences for ancillary generation on top of the mule Field3.
"""
_NUM_LOOKUP_INTS = mule.Field.NUM_LOOKUP_INTS
_NUM_LOOKUP_REALS = mule.Field.NUM_LOOKUP_REALS
@staticmethod
def _get_headers_from_pp(ppfield):
"""
Gets headers in a format suitable for use for generating a mule Field
from the ppfield.
Parameters
----------
ppfield : :class:`iris.fileformats.pp.PPField`
The field from which to extract the headers.
Returns
-------
: tuple(:class:`numpy.ndarray`, :class:`numpy.ndarray`)
First array is the int_headers and the second array is the real
headers
"""
# Following pp.PPField.save we make sure the data is big-endian
int_headers = np.empty(
shape=_ActualField3._NUM_LOOKUP_INTS,
dtype=np.dtype(">i%d" % mule._DEFAULT_WORD_SIZE),
)
int_headers.fill(IMDI)
real_headers = np.empty(
shape=_ActualField3._NUM_LOOKUP_REALS,
dtype=np.dtype(">f%d" % mule._DEFAULT_WORD_SIZE),
)
real_headers.fill(RMDI)
word_count = 0
for name, word_no in ppfield.HEADER_DEFN:
ppfield_value = getattr(ppfield, name)
for sub_index, num in enumerate(word_no):
if len(word_no) > 1:
value = ppfield_value[sub_index]
else:
value = ppfield_value
# First sort out the integer headers
if word_count >= _ActualField3._NUM_LOOKUP_INTS:
word_count = 0
# Cast as special types include pp.SplittableInt and pp._LBProc
if num < _ActualField3._NUM_LOOKUP_INTS:
int_headers[word_count] = int(value)
else:
real_headers[word_count] = float(value)
word_count += 1
return (int_headers, real_headers)
@classmethod
def from_pp(cls, ppfield):
"""
Generates a mule Field3 from an iris PPField3.
Data type conversion is done to ensure that the data is in the correct
format for writing as an ancillary. This also corrects the missing
data indicator if needed.
Parameters
----------
ppfield: :class:`iris.fileformats.pp.PPField3`
The PP field from which to create a mule Field.
Returns
-------
: :class:`mule.Field3`
The ppfield translated to a mule Field3.
.. warning::
This is not lazy - i.e. if the data attached to the ppfield is
lazy, the conversion to a mule field will realise it.
"""
# pp field with 64 word length header to ancillary field conversion.
if not isinstance(ppfield, iris.fileformats.pp.PPField3):
raise TypeError(
"pp header version not supported for ancillary "
"field generation."
)
int_headers, real_headers = cls._get_headers_from_pp(ppfield)
# Ensure consistent RMDI across all fields
if ppfield.bmdi != RMDI:
bmdi_offset = [
offset[0]
for (name, offset) in ppfield.HEADER_DEFN
if name == "bmdi"
][0]
bmdi_ind = bmdi_offset - _ActualField3._NUM_LOOKUP_INTS
real_headers[bmdi_ind] = RMDI
# LBVC - should always be set (surface/model level).
if ppfield.lbvc == 0:
int_headers[25] = 129
field = cls(
int_headers=int_headers,
real_headers=real_headers,
data_provider=_IrisPPFieldDataProvider(ppfield),
)
field.x = field.y = None
if getattr(ppfield, "x", None) is not None:
field.bdx = field.bzx = RMDI
field.x = ppfield.x
if getattr(ppfield, "y", None) is not None:
field.bdy = field.bzy = RMDI
field.y = ppfield.y
return field
@property
def is_regular(self):
"""
Determine whether the field has regular x or y.
Returns
-------
: tuple of bool
Regularity of the dimensions in the order (x, y).
"""
if not hasattr(self, "x"):
self.x = None
if not hasattr(self, "y"):
self.y = None
return (self.x is None, self.y is None)
@property
def is_rotated(self):
"""
Returns
-------
: bool
True if the pp header lbcode.ix is 1
"""
try:
ix = int(str(self.lbcode)[-3])
res = ix == 1
except IndexError:
res = False
return res
_Field3 = _ActualField3
else:
_Field3 = _GuardField3
return _Field3
_Field3 = _get_Field3(mule)
def _cubes_to_ancilfile(cubes):
"""
Converts iris cubes into headers and fields for saving with mule.
Returns
----------
: :class:`mule.AncilFile`
AncilFile generated from the cubes.
Parameters
----------
cubes : :class:`iris.cube.Cube` or :class:`iris.cube.CubeList`
Cubes from which to derive the information necessary for creating an
ancillary.
Raises
------
RuntimeError
If a cube with an unsupported coordinate is used.
"""
def _reject_unsupported_coords(cubes):
# Some coordinates we reject in all circumstances:
unsupported = [
"level_pressure",
]
for cube in cubes:
rejected = [c.name() for c in cube.coords() if c.name() in unsupported]
if rejected:
msg = (
"Coordinates {!s} are presently unsupported for "
"saving as F03 ancillary files."
).format(sorted(rejected))
raise RuntimeError(msg)
# For depths, we only support the case where depth has a "positive:
# down" attribute at this time:
depth_coords = [coord for coord in cube.coords() if "depth" in coord.name()]
if len(depth_coords) > 0:
for coord in depth_coords:
attributes = coord.attributes
if "positive" not in attributes or attributes["positive"] != "down":
msg = (
'Unsupported depth coordinate "{}". Currently, '
'only depths with the "positive" attribute defined '
'as "down" are supported.'
).format(coord.name())
raise ValueError(msg)
cubes = ants.utils.cube.as_cubelist(cubes)
_reject_unsupported_coords(cubes)
ants.utils.cube.derive_circular_status(cubes)
# _ppfields guaranteed to be a list due to sort, but could be either lazy
# or realised data
_ppfields = pp._sorted_ppfields(cubes)
_reference_field = _Field3.from_pp(_ppfields[0])
# But _Field3 is not lazy, so use a generator to avoid excessive memory
# usage
fields = (_Field3.from_pp(ppfield) for ppfield in _ppfields)
_template = template.create(cubes, _reference_field)
ancilfile = mule.AncilFile.from_template(_template)
ancilfile.fields.extend(fields)
return ancilfile
def _fetch_grid_staggering_from_file(filenames, mule_present):
"""Fetch grid filename: staggering mapping.
Parameters
----------
filename: str or iterable of str
The name of the ancillary file from which to fetch the grid staggering
attribute.
mule_present: obj
If this evaluates to True, use mule to get the grid staggering from
the file. Otherwise, raise a meaningful error.
Returns
-------
: dict
Mapping between filename and grid staggering.
"""
if mule_present is False:
raise ValueError(
"Mule cannot be imported, but an attempt has been "
"made to use mule functionality through loading"
"an F03 ancillary file."
)
mapping = {}
for filename in filenames:
ffv = mule.AncilFile.from_file(filename)
mapping.update({filename: ffv.fixed_length_header.grid_staggering})
return mapping
[docs]
def load_cubes(*args, **kwargs):
"""
Loads cubes from a list of fields files filenames.
This function acts as a wrapper to :func:`iris.fileformats.um.load_cubes`.
Adds grid staggering to the loaded data.
See Also
--------
:func:`iris.fileformats.um.load_cubes`
"""
grid_staggering = _fetch_grid_staggering_from_file(args[0], mule)
args, kwargs = pp._add_callback(_CallbackUM(grid_staggering), *args, **kwargs)
return iris.fileformats.um.load_cubes(*args, **kwargs)
[docs]
def load_cubes_32bit_ieee(*args, **kwargs):
"""
Loads cubes from a list of 32bit ieee converted fieldsfiles filenames.
Adds grid staggering to the loaded data.
See Also
--------
:func:`load_cubes` for keyword details
"""
grid_staggering = _fetch_grid_staggering_from_file(args[0], mule)
args, kwargs = pp._add_callback(_CallbackUM(grid_staggering), *args, **kwargs)
return iris.fileformats.um.load_cubes_32bit_ieee(*args, **kwargs)
def _mule_set_lbuser2(ancilfile):
# Set lbuser2 as some downstream applications are dependent on it and mule
# doesn't currently derive it for us.
# https://code.metoffice.gov.uk/trac/um/ticket/4656
# Size of the field in 64-bit words and adjusted for 512 word sector
# boundary described by Steve Wardle.
# Assumes that output is 64-bit and unpacked (something we enforce
# anyway for our ancillaries).
lbnrec = ancilfile.integer_constants.num_rows * ancilfile.integer_constants.num_cols
lbnrec = lbnrec - (lbnrec % -512)
for ind, field in enumerate(ancilfile.fields):
field.lbnrec = lbnrec
if ind == 0:
field.lbuser2 = 1
continue
prev_field = ancilfile.fields[ind - 1]
field.lbuser2 = prev_field.lbnrec + prev_field.lbuser2