"""Core operations on a CCHDO CF/netCDF file."""
from collections.abc import Hashable
import numpy as np
import numpy.typing as npt
import xarray as xr
from cchdo.params import WHPName, WHPNames
from .exchange import (
FileType,
add_geometry_var,
add_profile_type,
combine_dt,
set_axis_attrs,
set_coordinate_encoding_fill,
)
from .exchange.flags import (
ExchangeBottleFlag,
ExchangeCTDFlag,
ExchangeFlag,
ExchangeSampleFlag,
)
[docs]
DIMS = ("N_PROF", "N_LEVELS")
[docs]
FILLS_MAP = {"string": "", "integer": np.nan, "decimal": np.nan}
[docs]
dtype_map = {"string": "U", "integer": "float32", "decimal": "float64"}
[docs]
EXPOCODE = WHPNames["EXPOCODE"]
[docs]
STNNBR = WHPNames["STNNBR"]
[docs]
CASTNO = WHPNames["CASTNO"]
[docs]
SAMPNO = WHPNames["SAMPNO"]
[docs]
DATE = WHPNames["DATE"]
[docs]
TIME = WHPNames["TIME"]
[docs]
LATITUDE = WHPNames["LATITUDE"]
[docs]
LONGITUDE = WHPNames["LONGITUDE"]
[docs]
CTDPRS = WHPNames[("CTDPRS", "DBAR")]
[docs]
BTLNBR = WHPNames["BTLNBR"]
[docs]
COORDS = [
EXPOCODE,
STNNBR,
CASTNO,
SAMPNO,
DATE,
TIME,
LATITUDE,
LONGITUDE,
CTDPRS,
]
[docs]
FLAG_SCHEME: dict[str, type[ExchangeFlag]] = {
"woce_bottle": ExchangeBottleFlag,
"woce_discrete": ExchangeSampleFlag,
"woce_ctd": ExchangeCTDFlag,
}
[docs]
def _dataarray_factory(
param: WHPName, ctype="data", N_PROF=0, N_LEVELS=0
) -> xr.DataArray:
dtype = dtype_map[param.dtype]
fill = FILLS_MAP[param.dtype]
if ctype == "flag":
dtype = dtype_map["integer"]
fill = FILLS_MAP["integer"]
if param.scope == "profile":
arr = np.full((N_PROF), fill_value=fill, dtype=dtype)
if param.scope == "sample":
arr = np.full((N_PROF, N_LEVELS), fill_value=fill, dtype=dtype)
attrs = param.get_nc_attrs()
if "C_format" in attrs:
attrs["C_format_source"] = "database"
if ctype == "error":
attrs = param.get_nc_attrs(error=True)
if ctype == "flag" and param.flag_w in FLAG_SCHEME:
flag_defs = FLAG_SCHEME[param.flag_w]
flag_values = []
flag_meanings = []
for flag in flag_defs:
flag_values.append(int(flag))
flag_meanings.append(flag.cf_def)
odv_conventions_map = {
"woce_bottle": "WOCESAMPLE - WOCE Quality Codes for the sampling device itself",
"woce_ctd": "WOCECTD - WOCE Quality Codes for CTD instrument measurements",
"woce_discrete": "WOCEBOTTLE - WOCE Quality Codes for water sample (bottle) measurements",
}
attrs = {
"standard_name": "status_flag",
"flag_values": np.array(flag_values, dtype="int8"),
"flag_meanings": " ".join(flag_meanings),
"conventions": odv_conventions_map[param.flag_w],
}
var_da = xr.DataArray(arr, dims=DIMS[: arr.ndim], attrs=attrs)
if param.dtype == "string":
var_da.encoding["dtype"] = "S1"
if param.dtype == "integer":
var_da.encoding["dtype"] = "int32"
var_da.encoding["_FillValue"] = -999 # classic
if param in COORDS and param != CTDPRS:
var_da.encoding["_FillValue"] = None
if param.dtype == "integer":
var_da = var_da.fillna(-999).astype("int32")
if ctype == "flag":
var_da.encoding["dtype"] = "int8"
var_da.encoding["_FillValue"] = 9
var_da.encoding["zlib"] = True
return var_da
[docs]
def add_param(ds: xr.Dataset, param: WHPName, with_flag=False) -> xr.Dataset:
return ds
[docs]
def add_profile_level(ds: xr.Dataset, idx, levels) -> xr.Dataset:
return ds
[docs]
def add_level(ds: xr.Dataset, n_levels=1) -> xr.Dataset:
return ds
[docs]
def add_profile(
ds: xr.Dataset,
expocode: npt.ArrayLike,
station: npt.ArrayLike,
cast: npt.ArrayLike,
time: npt.ArrayLike,
latitude: npt.ArrayLike,
longitude: npt.ArrayLike,
profile_type: npt.ArrayLike,
) -> xr.Dataset:
ds = ds.reset_coords()
(
expocode,
station,
cast,
time,
latitude,
longitude,
profile_type,
) = np.broadcast_arrays(
np.atleast_1d(expocode), station, cast, time, latitude, longitude, profile_type
)
new_profs: dict[Hashable, npt.NDArray] = {
"expocode": expocode,
"station": station,
"cast": cast,
"time": time.astype("datetime64[ns]"), # ensure ns precision for now
"latitude": latitude,
"longitude": longitude,
"profile_type": profile_type,
}
dataarrays: dict[Hashable, tuple[tuple[Hashable, ...], npt.ArrayLike]] = {}
for name, variable in ds.variables.items():
if name in new_profs:
data = new_profs[name].astype(variable.dtype.kind)
if len(variable.dims) == 0:
dataarrays[name] = (variable.dims, np.nan)
elif len(variable.dims) == 1:
dataarrays[name] = (variable.dims, data)
elif len(variable.dims) == 2:
dataarrays[name] = (
variable.dims,
np.empty((1, ds.sizes["N_LEVELS"]), dtype=variable.dtype),
)
ds = xr.concat([ds, xr.Dataset(dataarrays)], dim="N_PROF")
# scalar var is expanded... squish it
ds["geometry_container"] = ds.geometry_container.squeeze()
ds = ds.set_coords([coord.nc_name for coord in COORDS if coord.nc_name in ds])
return ds
[docs]
def create_new() -> xr.Dataset:
"""Create an empty CF Dataset with the minimum required contents."""
dataarrays = {}
for param in COORDS:
dataarrays[param.nc_name] = _dataarray_factory(param)
ds = xr.Dataset(dataarrays)
ds = set_coordinate_encoding_fill(ds)
ds = combine_dt(ds)
ds = ds.set_coords([coord.nc_name for coord in COORDS if coord.nc_name in ds])
ds = add_profile_type(ds, FileType.BOTTLE) # just adds the var if no dims > 0
ds = set_axis_attrs(ds)
ds = add_geometry_var(ds)
return ds