# The MIT License (MIT)
# Copyright (c) 2019 by the xcube development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from typing import List
import numpy as np
import xarray as xr
from xcube.core.geom import is_lon_lat_dataset
from xcube.core.schema import get_dataset_xy_var_names, get_dataset_time_var_name
[docs]def assert_cube(dataset: xr.Dataset, name=None) -> xr.Dataset:
"""
Assert that the given *dataset* is a valid xcube dataset.
:param dataset: The dataset to be validated.
:param name: Optional parameter name.
:raise: ValueError, if dataset is not a valid xcube dataset
"""
report = verify_cube(dataset)
if report:
message = f"Dataset" + (name + " " if name else " ")
message += "is not a valid xcube dataset, because:\n"
message += "- " + ";\n- ".join(report) + "."
raise ValueError(message)
return dataset
[docs]def verify_cube(dataset: xr.Dataset) -> List[str]:
"""
Verify the given *dataset* for being a valid xcube dataset.
The tool verifies that *dataset*
* defines two spatial x,y coordinate variables, that are 1D, non-empty, using correct units;
* defines a time coordinate variables, that are 1D, non-empty, using correct units;
* has valid bounds variables for spatial x,y and time coordinate variables, if any;
* has any data variables and that they are valid, e.g. min. 3-D, all have
same dimensions, have at least the dimensions dim(time), dim(y), dim(x) in that order.
Returns a list of issues, which is empty if *dataset* is a valid xcube dataset.
:param dataset: A dataset to be verified.
:return: List of issues or empty list.
"""
report = []
xy_var_names = get_dataset_xy_var_names(dataset, must_exist=False)
if xy_var_names is None:
report.append(f"missing spatial x,y coordinate variables")
time_var_name = get_dataset_time_var_name(dataset, must_exist=False)
if time_var_name is None:
report.append(f"missing time coordinate variable")
if time_var_name:
_check_time(dataset, time_var_name, report)
if xy_var_names and is_lon_lat_dataset(dataset, xy_var_names=xy_var_names):
_check_lon_or_lat(dataset, xy_var_names[0], -180., 180., report)
_check_lon_or_lat(dataset, xy_var_names[1], -90., 90., report)
if xy_var_names and time_var_name:
_check_data_variables(dataset, xy_var_names, time_var_name, report)
if xy_var_names:
_check_coord_equidistance(dataset, xy_var_names[0], xy_var_names[0], report)
_check_coord_equidistance(dataset, xy_var_names[1], xy_var_names[1], report)
return report
def _check_coord_equidistance(dataset, coord_name, dim_name, report, rtol=None):
diff = dataset[coord_name].diff(dim=dim_name)
if not _check_equidistance_from_diff(dataset, diff, rtol=rtol):
report.append(f"coordinate variable {coord_name!r} is not equidistant")
bnds_name = dataset.attrs.get('bounds', f'{coord_name}_bnds')
if bnds_name in dataset.coords:
diff = dataset[bnds_name].diff(dim=dim_name)
if not _check_equidistance_from_diff(dataset, diff[:, 0], rtol=rtol):
report.append(f"coordinate variable {bnds_name!r} is not equidistant")
elif not _check_equidistance_from_diff(dataset, diff[:, 1], rtol=rtol):
report.append(f"coordinate variable {bnds_name!r} is not equidistant")
def _check_equidistance_from_diff(dataset, diff, rtol=None):
if rtol is None:
rtol = np.abs(np.divide(diff[0], 100.00)).values
# Check whether the bounding box intersect with the anti-meridian for lon/lat datasets.
# This is the case when exactly one difference is negative.
if is_lon_lat_dataset(dataset):
ct_neg = diff.where(diff < 0).count().values
if ct_neg == 1:
# If a bounding box intersects with the anti-meridian, compute its correct width
diff = xr.where(diff < 0, diff + 360., diff)
return np.allclose(diff[0], diff, rtol=rtol)
def _check_data_variables(dataset, xy_var_names, time_var_name, report):
x_var_name, y_var_name = xy_var_names
x_var, y_var, time_var = dataset[x_var_name], dataset[y_var_name], dataset[time_var_name]
x_dim, y_dim, time_dim = x_var.dims[0], y_var.dims[0], time_var.dims[0]
first_var = None
first_dims = None
first_chunks = None
for var_name, var in dataset.data_vars.items():
dims = var.dims
chunks = var.data.chunks if hasattr(var.data, "chunks") else None
if len(dims) < 3 or dims[0] != time_dim or dims[-2] != y_dim or dims[-1] != x_dim:
report.append(f"dimensions of data variable {var_name!r}"
f" must be ({time_dim!r}, ..., {y_dim!r}, {x_dim!r}),"
f" but were {dims!r} for {var_name!r}")
if first_var is None:
first_var = var
first_dims = dims
first_chunks = chunks
continue
if first_dims != dims:
report.append("dimensions of all data variables must be same,"
f" but found {first_dims!r} for {first_var.name!r} "
f"and {dims!r} for {var_name!r}")
if first_chunks != chunks:
report.append("all data variables must have same chunk sizes,"
f" but found {first_chunks!r} for {first_var.name!r} "
f"and {chunks!r} for {var_name!r}")
def _check_dim(dataset, name, report):
if name not in dataset.dims:
report.append(f"missing dimension {name!r}")
elif dataset.dims[name] < 0:
report.append(f"size of dimension {name!r} must be a positive integer")
def _check_coord_var(dataset, var_name, report):
if var_name not in dataset.coords:
report.append(f"missing coordinate variable {var_name!r}")
return None
var = dataset.coords[var_name]
if var.dims != (var_name,):
report.append(f"coordinate variable {var_name!r} must have a single dimension {var_name!r}")
return None
if var.size == 0:
report.append(f"coordinate variable {var_name!r} must not be empty")
return None
bnds_name = var.attrs.get('bounds', f'{var_name}_bnds')
if bnds_name in dataset.coords:
bnds_var = dataset.coords[bnds_name]
expected_shape = var.size, 2
expected_dtype = var.dtype
if len(bnds_var.dims) != 2 or bnds_var.dims[0] != var_name:
report.append(f"bounds coordinate variable {bnds_name!r}"
f" must have dimensions ({var_name!r}, <bounds_dim>)")
if bnds_var.shape != expected_shape:
report.append(
f"shape of bounds coordinate variable {bnds_name!r}"
f" must be {expected_shape!r} but was {bnds_var.shape!r}")
if bnds_var.dtype != expected_dtype:
report.append(
f"type of bounds coordinate variable {bnds_name!r}"
f" must be {expected_dtype!r} but was {bnds_var.dtype!r}")
return None
return var
def _check_lon_or_lat(dataset, var_name, min_value, max_value, report):
var = _check_coord_var(dataset, var_name, report)
if var is None:
return
if not np.all(np.isfinite(var)):
report.append(f"values of coordinate variable {var_name!r} must be finite")
if np.min(var) < min_value or np.max(var) > max_value:
report.append(f"values of coordinate variable {var_name!r}"
f" must be in the range {min_value} to {max_value}")
def _check_time(dataset, name, report):
var = _check_coord_var(dataset, name, report)
if var is None:
return
if not np.issubdtype(var.dtype, np.datetime64):
report.append(f"type of coordinate variable {name!r} must be datetime64")
if not np.all(np.diff(var.astype(np.float64)) > 0):
report.append(f"values of coordinate variable {name!r} must be monotonic increasing")