Source code for xcube.core.verify

# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

from typing import List

import numpy as np
import xarray as xr

from xcube.core.geom import is_lon_lat_dataset
from xcube.core.schema import get_dataset_xy_var_names, get_dataset_time_var_name


[docs] def assert_cube(dataset: xr.Dataset, name=None) -> xr.Dataset: """Assert that the given *dataset* is a valid xcube dataset. Args: dataset: The dataset to be validated. name: Optional parameter name. Raises: ValueError, if dataset is not a valid xcube dataset """ report = verify_cube(dataset) if report: message = f"Dataset" + (name + " " if name else " ") message += "is not a valid xcube dataset, because:\n" message += "- " + ";\n- ".join(report) + "." raise ValueError(message) return dataset
[docs] def verify_cube(dataset: xr.Dataset) -> List[str]: """Verify the given *dataset* for being a valid xcube dataset. The tool verifies that *dataset* * defines two spatial x,y coordinate variables, that are 1D, non-empty, using correct units; * defines a time coordinate variables, that are 1D, non-empty, using correct units; * has valid bounds variables for spatial x,y and time coordinate variables, if any; * has any data variables and that they are valid, e.g. min. 3-D, all have same dimensions, have at least the dimensions dim(time), dim(y), dim(x) in that order. Returns a list of issues, which is empty if *dataset* is a valid xcube dataset. Args: dataset: A dataset to be verified. Returns: List of issues or empty list. """ report = [] xy_var_names = get_dataset_xy_var_names(dataset, must_exist=False) if xy_var_names is None: report.append(f"missing spatial x,y coordinate variables") time_var_name = get_dataset_time_var_name(dataset, must_exist=False) if time_var_name is None: report.append(f"missing time coordinate variable") if time_var_name: _check_time(dataset, time_var_name, report) if xy_var_names and is_lon_lat_dataset(dataset, xy_var_names=xy_var_names): _check_lon_or_lat(dataset, xy_var_names[0], -180.0, 180.0, report) _check_lon_or_lat(dataset, xy_var_names[1], -90.0, 90.0, report) if xy_var_names and time_var_name: _check_data_variables(dataset, xy_var_names, time_var_name, report) if xy_var_names: _check_coord_equidistance(dataset, xy_var_names[0], xy_var_names[0], report) _check_coord_equidistance(dataset, xy_var_names[1], xy_var_names[1], report) return report
def _check_coord_equidistance(dataset, coord_name, dim_name, report, rtol=None): diff = dataset[coord_name].diff(dim=dim_name) if not _check_equidistance_from_diff(dataset, diff, rtol=rtol): report.append(f"coordinate variable {coord_name!r} is not equidistant") bnds_name = dataset.attrs.get("bounds", f"{coord_name}_bnds") if bnds_name in dataset.coords: diff = dataset[bnds_name].diff(dim=dim_name) if not _check_equidistance_from_diff(dataset, diff[:, 0], rtol=rtol): report.append(f"coordinate variable {bnds_name!r} is not equidistant") elif not _check_equidistance_from_diff(dataset, diff[:, 1], rtol=rtol): report.append(f"coordinate variable {bnds_name!r} is not equidistant") def _check_equidistance_from_diff(dataset, diff, rtol=None): if rtol is None: rtol = np.abs(np.divide(diff[0], 100.00)).values # Check whether the bounding box intersect with the anti-meridian for lon/lat datasets. # This is the case when exactly one difference is negative. if is_lon_lat_dataset(dataset): ct_neg = diff.where(diff < 0).count().values if ct_neg == 1: # If a bounding box intersects with the anti-meridian, compute its correct width diff = xr.where(diff < 0, diff + 360.0, diff) return np.allclose(diff[0], diff, rtol=rtol) def _check_data_variables(dataset, xy_var_names, time_var_name, report): x_var_name, y_var_name = xy_var_names x_var, y_var, time_var = ( dataset[x_var_name], dataset[y_var_name], dataset[time_var_name], ) x_dim, y_dim, time_dim = x_var.dims[0], y_var.dims[0], time_var.dims[0] first_var = None first_dims = None first_chunks = None for var_name, var in dataset.data_vars.items(): dims = var.dims chunks = var.data.chunks if hasattr(var.data, "chunks") else None if ( "grid_mapping_name" in var.attrs or "geographic_crs_name" in var.attrs or "crs_wkt" in var.attrs ): # potential CRS / grid mapping variable continue if ( len(dims) < 3 or dims[0] != time_dim or dims[-2] != y_dim or dims[-1] != x_dim ): report.append( f"dimensions of data variable {var_name!r}" f" must be ({time_dim!r}, ..., {y_dim!r}, {x_dim!r})," f" but were {dims!r} for {var_name!r}" ) if first_var is None: first_var = var first_dims = dims first_chunks = chunks continue if first_dims != dims: report.append( "dimensions of all data variables must be same," f" but found {first_dims!r} for {first_var.name!r} " f"and {dims!r} for {var_name!r}" ) if first_chunks != chunks: report.append( "all data variables must have same chunk sizes," f" but found {first_chunks!r} for {first_var.name!r} " f"and {chunks!r} for {var_name!r}" ) def _check_dim(dataset, name, report): if name not in dataset.dims: report.append(f"missing dimension {name!r}") elif dataset.dims[name] < 0: report.append(f"size of dimension {name!r} must be a positive integer") def _check_coord_var(dataset, var_name, report): if var_name not in dataset.coords: report.append(f"missing coordinate variable {var_name!r}") return None var = dataset.coords[var_name] if var.dims != (var_name,): report.append( f"coordinate variable {var_name!r} must have a single dimension {var_name!r}" ) return None if var.size == 0: report.append(f"coordinate variable {var_name!r} must not be empty") return None bnds_name = var.attrs.get("bounds", f"{var_name}_bnds") if bnds_name in dataset.coords: bnds_var = dataset.coords[bnds_name] expected_shape = var.size, 2 expected_dtype = var.dtype if len(bnds_var.dims) != 2 or bnds_var.dims[0] != var_name: report.append( f"bounds coordinate variable {bnds_name!r}" f" must have dimensions ({var_name!r}, <bounds_dim>)" ) if bnds_var.shape != expected_shape: report.append( f"shape of bounds coordinate variable {bnds_name!r}" f" must be {expected_shape!r} but was {bnds_var.shape!r}" ) if bnds_var.dtype != expected_dtype: report.append( f"type of bounds coordinate variable {bnds_name!r}" f" must be {expected_dtype!r} but was {bnds_var.dtype!r}" ) return None return var def _check_lon_or_lat(dataset, var_name, min_value, max_value, report): var = _check_coord_var(dataset, var_name, report) if var is None: return if not np.all(np.isfinite(var)): report.append(f"values of coordinate variable {var_name!r} must be finite") if np.min(var) < min_value or np.max(var) > max_value: report.append( f"values of coordinate variable {var_name!r}" f" must be in the range {min_value} to {max_value}" ) def _check_time(dataset, name, report): var = _check_coord_var(dataset, name, report) if var is None: return if not np.issubdtype(var.dtype, np.datetime64): report.append(f"type of coordinate variable {name!r} must be datetime64") if not np.all(np.diff(var.astype(np.float64)) > 0): report.append( f"values of coordinate variable {name!r} must be monotonic increasing" )