Source code for hydro.checks
from collections import defaultdict
import numpy as np
import xarray as xr
from cchdo.hydro.exchange.exceptions import ExchangeDataFlagPairError
from cchdo.hydro.flags import (
ExchangeBottleFlag,
ExchangeCTDFlag,
ExchangeSampleFlag,
)
from cchdo.hydro.sorting import sort_ds
[docs]
def check_sorted(dataset: xr.Dataset) -> bool:
    """Check that the dataset is sorted by the rules in :func:`sort_ds`.

    A deep copy of ``dataset`` is passed to :func:`sort_ds` and the result is
    compared, position by position, with the input on ``pressure``, ``time``,
    ``latitude`` and ``longitude``.

    :param dataset: the dataset to check; only a deep copy of it is sorted
    :returns: ``True`` when none of the four variables is changed by sorting
    """
    sorted_ds = sort_ds(dataset.copy(deep=True))

    # Work on the raw arrays so the time comparison is strictly positional.
    sorted_time = sorted_ds.time.values
    original_time = dataset.time.values

    # NaT never compares equal to itself, so two positions match when the
    # values are equal OR both are NaT.  Comparing the NaT masks with ``==``
    # instead would also be true whenever both values are ordinary (non-NaT)
    # times, which would make differing times pass the check.
    time_matches = (sorted_time == original_time) | (
        np.isnat(sorted_time) & np.isnat(original_time)
    )

    return all(
        [
            np.allclose(sorted_ds.pressure, dataset.pressure, equal_nan=True),
            np.all(time_matches),
            np.allclose(sorted_ds.latitude, dataset.latitude, equal_nan=True),
            np.allclose(sorted_ds.longitude, dataset.longitude, equal_nan=True),
        ]
    )
[docs]
def check_ancillary_variables(ds: xr.Dataset):
    """Validate the ``ancillary_variables`` cross references of a dataset.

    Two rules are enforced:

    * every name listed in an ``ancillary_variables`` attribute must be a
      variable of ``ds``
    * every variable whose name ends in ``_qc`` or ``_error`` must be listed
      in at least one ``ancillary_variables`` attribute

    :param ds: the dataset whose variables and attributes are inspected
    :raises ValueError: if a variable name is not a string, or, carrying the
        set of offending names, when either rule above is violated
    """
    suffixes = ("_qc", "_error")
    # maps each referenced name -> the variables whose attribute lists it
    referenced_by = defaultdict(list)
    suffixed_names = set()

    for var_name, var in ds.variables.items():
        if not isinstance(var_name, str):
            raise ValueError(f"variable names must be strings not {var_name}")

        # str.endswith accepts a tuple of candidate suffixes
        if var_name.endswith(suffixes):
            suffixed_names.add(var_name)

        listed = var.attrs.get("ancillary_variables")
        if listed is not None:
            for target in listed.split():
                referenced_by[target].append(var_name)

    # names that are referenced but are not variables of the dataset
    dangling = referenced_by.keys() - ds.variables.keys()
    if dangling:
        raise ValueError(dangling)

    # ancillary-looking variables that nothing refers to
    unreferenced = suffixed_names - referenced_by.keys()
    if unreferenced:
        raise ValueError(unreferenced)
[docs]
def check_flags(dataset: xr.Dataset, raises: bool = True) -> xr.Dataset:
    """Check WOCE flag values against their parameters.

    A flag variable is any variable with both a ``standard_name`` attribute
    containing ``status_flag`` and a ``conventions`` attribute naming one of
    the known WOCE flag schemes. For every flag variable two checks are made:

    * every flag value must be one defined by its flag scheme (missing flags
      are treated as ``9`` for this check)
    * for each numeric variable that lists the flag variable in its
      ``ancillary_variables`` attribute, the data must be finite where the
      flag definition says there is a value, and non-finite where the flag
      definition says there is none (or where the flag itself is NaN)

    A flag variable containing illegal values is reported and is not compared
    against its data variables.

    :param dataset: the dataset to check
    :param raises: when true (the default) raise if any problem was found,
        otherwise just return the problems
    :returns: a dataset of boolean arrays named ``{variable}_value_errors``,
        each the same shape as the variable it describes and true where a
        problem exists; it has no data variables when nothing is wrong
    :raises ExchangeDataFlagPairError: if ``raises`` is true and any problem
        was found; the error dataset is passed to the exception
    """
    # NOTE(review): by name, "WOCESAMPLE" paired with ExchangeBottleFlag and
    # "WOCEBOTTLE" paired with ExchangeSampleFlag look swapped. The pairing is
    # kept exactly as it was -- confirm against cchdo.hydro.flags before
    # changing it.
    woce_flags = {
        "WOCESAMPLE": ExchangeBottleFlag,
        "WOCECTD": ExchangeCTDFlag,
        "WOCEBOTTLE": ExchangeSampleFlag,
    }
    # per convention: flag value -> whether data with that flag has a value
    flag_has_value = {
        convention: {flag.value: flag.has_value for flag in flag_enum}
        for convention, flag_enum in woce_flags.items()
    }

    # In some cases, a coordinate variable might have flags, so we are not using filter_by_attrs
    # get all the flag vars (that also have conventions)
    flag_vars = []
    for var_name in dataset.variables:
        # do not replace the above with .items() it will give you xr.Variable objects that you don't want to use
        # the following gets a real xr.DataArray
        data = dataset[var_name]
        if not {"standard_name", "conventions"} <= data.attrs.keys():
            continue
        if not any(flag in data.attrs["conventions"] for flag in woce_flags):
            continue
        if "status_flag" in data.attrs["standard_name"]:
            flag_vars.append(var_name)

    # match flags with their data vars
    # it is legal in CF for one set of flags to apply to multiple vars
    flag_errors = {}
    for flag_var in flag_vars:
        # get the flag and check attrs for defs
        flag_da = dataset[flag_var]
        flag_conventions = flag_da.attrs.get("conventions", "")
        conventions = next(
            (flag for flag in woce_flags if flag_conventions.startswith(flag)),
            None,
        )

        # we don't know these flags, skip the check
        if conventions is None:
            continue

        allowed_values = np.array(list(flag_has_value[conventions]))
        illegal_flags = ~flag_da.fillna(9).isin(allowed_values)
        if np.any(illegal_flags):
            illegal_flags.attrs["comments"] = (
                f"This is a boolean array in the same shape as '{flag_da.name}' which is truthy where invalid values exist"
            )
            flag_errors[f"{flag_da.name}_value_errors"] = illegal_flags
            # the flags themselves are wrong, comparing them to data is meaningless
            continue

        # flag values whose definition says the data has no value; this only
        # depends on the flag scheme so it is built once per flag variable
        no_value_flags = [
            flag
            for flag, has_value in flag_has_value[conventions].items()
            if has_value is False
        ]

        for var_name in dataset.variables:
            data = dataset[var_name]
            if "ancillary_variables" not in data.attrs:
                continue
            # split on any whitespace so repeated spaces, tabs or newlines
            # between names do not hide a reference to the flag variable
            if flag_var not in data.attrs["ancillary_variables"].split():
                continue

            # check data against flags
            has_fill = flag_da.isin(no_value_flags) | np.isnan(flag_da)

            # TODO deal with strs
            # the dtype is read from the DataArray so the values are not
            # materialised just to inspect it
            if not np.issubdtype(data.dtype, np.number):
                continue

            # a mismatch is data present where the flag says fill, or data
            # missing where the flag says there is a value
            fill_value_mismatch: xr.DataArray = ~(np.isfinite(data) ^ has_fill)
            if np.any(fill_value_mismatch):
                fill_value_mismatch.attrs["comments"] = (
                    f"This is a boolean array in the same shape as '{data.name}' which is truthy where invalid values exist"
                )
                # keyed by data variable name: if more than one flag variable
                # reports a mismatch for the same data, the last one is kept
                flag_errors[f"{data.name}_value_errors"] = fill_value_mismatch

    flag_errors_ds = xr.Dataset(flag_errors)

    if raises and flag_errors:
        raise ExchangeDataFlagPairError(flag_errors_ds)

    return flag_errors_ds