Source code for xcube.core.store.descriptor

# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

import warnings
from typing import Tuple, Sequence, Mapping, Optional, Dict, Any, Union, Hashable

import dask.array
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr

from xcube.core.geom import get_dataset_bounds
from xcube.core.mldataset import MultiLevelDataset
from xcube.core.timecoord import get_end_time_from_attrs
from xcube.core.timecoord import get_start_time_from_attrs
from xcube.core.timecoord import get_time_range_from_data
from xcube.core.timecoord import remove_time_part_from_isoformat
from xcube.util.assertions import assert_given, assert_true
from xcube.util.assertions import assert_not_none
from xcube.util.ipython import register_json_formatter
from xcube.util.jsonschema import JsonArraySchema
from xcube.util.jsonschema import JsonDateSchema
from xcube.util.jsonschema import JsonIntegerSchema
from xcube.util.jsonschema import JsonNumberSchema
from xcube.util.jsonschema import JsonObject
from xcube.util.jsonschema import JsonObjectSchema
from xcube.util.jsonschema import JsonStringSchema
from .datatype import ANY_TYPE
from .datatype import DATASET_TYPE
from .datatype import DataType
from .datatype import DataTypeLike
from .datatype import GEO_DATA_FRAME_TYPE
from .datatype import MULTI_LEVEL_DATASET_TYPE


# TODO: IMPORTANT: replace, reuse, or align with
#   xcube.core.schema.CubeSchema class
#   xcube.webapi.context.DatasetDescriptor type
#   responses of xcube.webapi.controllers.catalogue
# TODO: write tests
# TODO: validate params


def new_data_descriptor(
    data_id: str, data: Any, require: bool = False
) -> "DataDescriptor":
    """Create a data descriptor for the given *data* object.

    Returns a :class:`MultiLevelDatasetDescriptor`,
    :class:`DatasetDescriptor`, or :class:`GeoDataFrameDescriptor`
    depending on the type of *data*. For unsupported types, a generic
    :class:`DataDescriptor` is returned if *require* is false;
    otherwise a ``NotImplementedError`` is raised.
    """
    if isinstance(data, MultiLevelDataset):
        dataset_descriptor_kwargs = _get_common_dataset_descriptor_props(
            data_id,
            # Note: the highest level should have the same metadata
            # and may load faster.
            # data.get_dataset(data.num_levels - 1)
            data.get_dataset(0),
        )
        return MultiLevelDatasetDescriptor(
            num_levels=data.num_levels, **dataset_descriptor_kwargs
        )

    if isinstance(data, xr.Dataset):
        dataset_descriptor_kwargs = _get_common_dataset_descriptor_props(data_id, data)
        return DatasetDescriptor(**dataset_descriptor_kwargs)

    if isinstance(data, gpd.GeoDataFrame):
        # TODO: implement me: data -> GeoDataFrameDescriptor
        return GeoDataFrameDescriptor(data_id=data_id)

    if not require:
        return DataDescriptor(data_id=data_id, data_type=ANY_TYPE)

    raise NotImplementedError()
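

# Illustrative usage sketch (not part of the original module): creating a
# descriptor from an in-memory dataset. The identifier "demo_cube" and the
# dataset contents are hypothetical.
#
#     import numpy as np
#     import xarray as xr
#
#     ds = xr.Dataset(
#         {"sst": (("time", "lat", "lon"), np.zeros((2, 3, 4)))},
#         coords={
#             "time": np.array(["2024-01-01", "2024-01-02"],
#                              dtype="datetime64[ns]"),
#             "lat": [50.0, 50.5, 51.0],
#             "lon": [10.0, 10.5, 11.0, 11.5],
#         },
#     )
#     descriptor = new_data_descriptor("demo_cube", ds)
#     # -> a DatasetDescriptor with dims, coords, data_vars, bbox,
#     #    time_range, and spatial_res derived from the dataset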


def _get_common_dataset_descriptor_props(
    data_id: str, dataset: Union[xr.Dataset, MultiLevelDataset]
) -> Dict[str, Any]:
    dims = {str(k): v for k, v in dataset.dims.items()}
    coords = _build_variable_descriptor_dict(dataset.coords)
    data_vars = _build_variable_descriptor_dict(dataset.data_vars)
    spatial_res = _determine_spatial_res(dataset)
    bbox = _determine_bbox(dataset)
    time_range = _determine_time_coverage(dataset)
    time_period = _determine_time_period(dataset)
    return dict(
        data_id=data_id,
        dims=dims,
        coords=coords,
        data_vars=data_vars,
        bbox=bbox,
        time_range=time_range,
        time_period=time_period,
        spatial_res=spatial_res,
        attrs=dataset.attrs,
    )


class DataDescriptor(JsonObject):
    """A generic descriptor for any data.
    Also serves as a base class for more specific data descriptors.

    Args:
        data_id: An identifier for the data
        data_type: A type specifier for the data
        crs: A coordinate reference system identifier,
            as an EPSG, PROJ or WKT string
        bbox: A bounding box of the data
        time_range: Start and end time delimiting this data's temporal extent
        time_period: The data's periodicity if it is evenly temporally resolved
        open_params_schema: A JSON schema describing the parameters
            that may be used to open this data
    """

    def __init__(
        self,
        data_id: str,
        data_type: DataTypeLike,
        *,
        crs: str = None,
        bbox: Tuple[float, float, float, float] = None,
        time_range: Tuple[Optional[str], Optional[str]] = None,
        time_period: str = None,
        open_params_schema: JsonObjectSchema = None,
        **additional_properties,
    ):
        assert_given(data_id, "data_id")
        if additional_properties:
            warnings.warn(
                f"Additional properties received;"
                f" will be ignored: {additional_properties}"
            )
        self.data_id = data_id
        self.data_type = DataType.normalize(data_type)
        self.crs = crs
        self.bbox = tuple(bbox) if bbox else None
        self.time_range = tuple(time_range) if time_range else None
        self.time_period = time_period
        self.open_params_schema = open_params_schema

    @classmethod
    def get_schema(cls) -> JsonObjectSchema:
        return JsonObjectSchema(
            properties=dict(
                data_id=JsonStringSchema(min_length=1),
                data_type=DataType.get_schema(),
                crs=JsonStringSchema(min_length=1),
                bbox=JsonArraySchema(
                    items=[
                        JsonNumberSchema(),
                        JsonNumberSchema(),
                        JsonNumberSchema(),
                        JsonNumberSchema(),
                    ]
                ),
                time_range=JsonDateSchema.new_range(nullable=True),
                time_period=JsonStringSchema(min_length=1),
                open_params_schema=JsonObjectSchema(additional_properties=True),
            ),
            required=["data_id", "data_type"],
            additional_properties=True,
            factory=cls,
        )
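

# Illustrative sketch (not part of the original module): constructing a
# generic descriptor directly. All values below are hypothetical.
#
#     descriptor = DataDescriptor(
#         data_id="my_data",
#         data_type=ANY_TYPE,
#         crs="EPSG:4326",
#         bbox=(10.0, 50.0, 11.5, 51.0),
#         time_range=("2024-01-01", "2024-01-02"),
#     )
#     descriptor.to_dict()  # JSON-serializable dict via the JsonObject base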


class DatasetDescriptor(DataDescriptor):
    """A descriptor for a gridded, N-dimensional dataset represented by
    xarray.Dataset.
    Comprises a description of the data variables contained in the dataset.

    Regarding the *time_range* and *time_period* parameters, please refer to
    https://github.com/dcs4cop/xcube/blob/main/docs/source/storeconv.md#date-time-and-duration-specifications

    Args:
        data_id: An identifier for the data
        data_type: The data type of the data described
        crs: A coordinate reference system identifier,
            as an EPSG, PROJ or WKT string
        bbox: A bounding box of the data
        time_range: Start and end time delimiting this data's temporal extent
        time_period: The data's periodicity if it is evenly temporally resolved
        spatial_res: The spatial extent of a pixel in CRS units
        dims: A mapping of the dataset's dimensions to their sizes
        coords: A mapping of the dataset's coordinate names
            to instances of :class:`VariableDescriptor`
        data_vars: A mapping of the dataset's variable names
            to instances of :class:`VariableDescriptor`
        attrs: A mapping containing arbitrary attributes of the dataset
        open_params_schema: A JSON schema describing the parameters
            that may be used to open this data
    """

    def __init__(
        self,
        data_id: str,
        *,
        data_type: DataTypeLike = DATASET_TYPE,
        crs: str = None,
        bbox: Tuple[float, float, float, float] = None,
        time_range: Tuple[Optional[str], Optional[str]] = None,
        time_period: str = None,
        spatial_res: float = None,
        dims: Mapping[str, int] = None,
        coords: Mapping[str, "VariableDescriptor"] = None,
        data_vars: Mapping[str, "VariableDescriptor"] = None,
        attrs: Mapping[Hashable, Any] = None,
        open_params_schema: JsonObjectSchema = None,
        **additional_properties,
    ):
        super().__init__(
            data_id=data_id,
            data_type=data_type,
            crs=crs,
            bbox=bbox,
            time_range=time_range,
            time_period=time_period,
            open_params_schema=open_params_schema,
        )
        assert_true(
            DATASET_TYPE.is_super_type_of(data_type)
            or MULTI_LEVEL_DATASET_TYPE.is_super_type_of(data_type),
            f"illegal data_type,"
            f" must be compatible with {DATASET_TYPE!r}"
            f" or {MULTI_LEVEL_DATASET_TYPE!r}",
        )
        if additional_properties:
            warnings.warn(
                f"Additional properties received;"
                f" will be ignored: {additional_properties}"
            )
        self.dims = dict(dims) if dims else None
        self.spatial_res = spatial_res
        self.coords = coords if coords else None
        self.data_vars = data_vars if data_vars else None
        self.attrs = _attrs_to_json(attrs) if attrs else None

    @classmethod
    def get_schema(cls) -> JsonObjectSchema:
        schema = super().get_schema()
        schema.properties.update(
            dims=JsonObjectSchema(additional_properties=JsonIntegerSchema(minimum=0)),
            spatial_res=JsonNumberSchema(exclusive_minimum=0.0),
            coords=JsonObjectSchema(
                additional_properties=VariableDescriptor.get_schema()
            ),
            data_vars=JsonObjectSchema(
                additional_properties=VariableDescriptor.get_schema()
            ),
            attrs=JsonObjectSchema(additional_properties=True),
        )
        schema.required = ["data_id", "data_type"]
        schema.additional_properties = False
        schema.factory = cls
        return schema
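

# Illustrative sketch (not part of the original module): since get_schema()
# sets factory=cls, a descriptor can be re-created from plain JSON data,
# assuming the from_dict() helper inherited from JsonObject. The dict below
# is hypothetical.
#
#     d = {
#         "data_id": "demo_cube",
#         "data_type": "dataset",
#         "dims": {"time": 2, "lat": 3, "lon": 4},
#         "spatial_res": 0.5,
#     }
#     descriptor = DatasetDescriptor.from_dict(d)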


class VariableDescriptor(JsonObject):
    """A descriptor for a dataset variable represented by an
    xarray.DataArray instance. It is part of a dataset descriptor
    for a gridded, N-dimensional dataset represented by xarray.Dataset.

    Args:
        name: The variable name
        dtype: The data type of the variable
        dims: A list of the names of the variable's dimensions
        chunks: A list of the chunk sizes of the variable's dimensions
        attrs: A mapping containing arbitrary attributes of the variable
    """

    def __init__(
        self,
        name: str,
        dtype: str,
        dims: Sequence[str],
        *,
        chunks: Sequence[int] = None,
        attrs: Mapping[Hashable, Any] = None,
        **additional_properties,
    ):
        assert_given(name, "name")
        assert_given(dtype, "dtype")
        assert_not_none(dims, "dims")
        if additional_properties:
            warnings.warn(
                f"Additional properties received;"
                f" will be ignored: {additional_properties}"
            )
        self.name = name
        self.dtype = dtype
        self.dims = tuple(dims)
        self.chunks = tuple(chunks) if chunks else None
        self.attrs = _attrs_to_json(attrs) if attrs else None

    @property
    def ndim(self) -> int:
        """Number of dimensions."""
        return len(self.dims)

    @classmethod
    def get_schema(cls) -> JsonObjectSchema:
        return JsonObjectSchema(
            properties=dict(
                name=JsonStringSchema(min_length=1),
                dtype=JsonStringSchema(min_length=1),
                dims=JsonArraySchema(items=JsonStringSchema(min_length=1)),
                chunks=JsonArraySchema(items=JsonIntegerSchema(minimum=0)),
                attrs=JsonObjectSchema(additional_properties=True),
            ),
            required=["name", "dtype", "dims"],
            additional_properties=False,
            factory=cls,
        )
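

# Illustrative sketch (not part of the original module); the variable shown
# is hypothetical.
#
#     vd = VariableDescriptor(
#         name="sst",
#         dtype="float32",
#         dims=("time", "lat", "lon"),
#         chunks=(1, 512, 512),
#         attrs={"units": "kelvin"},
#     )
#     vd.ndim  # -> 3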


class MultiLevelDatasetDescriptor(DatasetDescriptor):
    """A descriptor for a gridded, N-dimensional, multi-level,
    multi-resolution dataset represented by
    xcube.core.mldataset.MultiLevelDataset.

    Args:
        data_id: An identifier of the multi-level dataset
        num_levels: The number of levels of this multi-level dataset
        data_type: A type specifier for the multi-level dataset
    """

    def __init__(
        self,
        data_id: str,
        num_levels: int,
        *,
        data_type: DataTypeLike = MULTI_LEVEL_DATASET_TYPE,
        **kwargs,
    ):
        assert_given(data_id, "data_id")
        assert_given(num_levels, "num_levels")
        super().__init__(data_id=data_id, data_type=data_type, **kwargs)
        assert_true(
            MULTI_LEVEL_DATASET_TYPE.is_super_type_of(data_type),
            f"illegal data_type,"
            f" must be compatible with {MULTI_LEVEL_DATASET_TYPE!r}",
        )
        self.num_levels = num_levels

    @classmethod
    def get_schema(cls) -> JsonObjectSchema:
        schema = super().get_schema()
        schema.properties.update(
            num_levels=JsonIntegerSchema(minimum=1),
        )
        schema.required.append("num_levels")
        schema.additional_properties = False
        schema.factory = cls
        return schema
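

# Illustrative sketch (not part of the original module): a multi-level
# descriptor is a DatasetDescriptor plus the required num_levels property.
# All values below are hypothetical.
#
#     mld = MultiLevelDatasetDescriptor(
#         data_id="demo_pyramid.levels",
#         num_levels=5,
#         bbox=(10.0, 50.0, 11.5, 51.0),
#     )
#     mld.to_dict()  # includes "num_levels": 5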


class GeoDataFrameDescriptor(DataDescriptor):
    """A descriptor for a geo-vector dataset represented by
    a geopandas.GeoDataFrame instance.

    Args:
        data_id: An identifier of the geopandas.GeoDataFrame
        feature_schema: A schema describing the properties of the vector data
        kwargs: Parameters passed to the super class :class:`DataDescriptor`
    """

    def __init__(
        self,
        data_id: str,
        *,
        data_type: DataTypeLike = GEO_DATA_FRAME_TYPE,
        feature_schema: JsonObjectSchema = None,
        **kwargs,
    ):
        super().__init__(data_id=data_id, data_type=data_type, **kwargs)
        assert_true(
            GEO_DATA_FRAME_TYPE.is_super_type_of(data_type),
            f"illegal data_type,"
            f" must be compatible with {GEO_DATA_FRAME_TYPE!r}",
        )
        self.feature_schema = feature_schema

    @classmethod
    def get_schema(cls) -> JsonObjectSchema:
        schema = super().get_schema()
        schema.properties.update(
            feature_schema=JsonObjectSchema(additional_properties=True),
        )
        schema.required = ["data_id"]
        schema.additional_properties = False
        schema.factory = cls
        return schema
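

# Illustrative sketch (not part of the original module): describing vector
# data with a hypothetical feature schema.
#
#     gdf_descriptor = GeoDataFrameDescriptor(
#         data_id="demo_places.geojson",
#         feature_schema=JsonObjectSchema(
#             properties=dict(
#                 name=JsonStringSchema(),
#                 population=JsonNumberSchema(),
#             )
#         ),
#     )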


register_json_formatter(DataDescriptor)
register_json_formatter(DatasetDescriptor)
register_json_formatter(VariableDescriptor)
register_json_formatter(MultiLevelDatasetDescriptor)
register_json_formatter(GeoDataFrameDescriptor)


#############################################################################
# Implementation helpers


def _build_variable_descriptor_dict(variables) -> Mapping[str, "VariableDescriptor"]:
    return {
        str(var_name): VariableDescriptor(
            name=str(var_name),
            dtype=str(var.dtype),
            dims=var.dims,
            chunks=(
                tuple(max(chunk) for chunk in var.chunks) if var.chunks else None
            ),
            attrs=var.attrs,
        )
        for var_name, var in variables.items()
    }


def _determine_bbox(data: xr.Dataset) -> Optional[Tuple[float, float, float, float]]:
    try:
        return get_dataset_bounds(data)
    except ValueError:
        if (
            "geospatial_lon_min" in data.attrs
            and "geospatial_lat_min" in data.attrs
            and "geospatial_lon_max" in data.attrs
            and "geospatial_lat_max" in data.attrs
        ):
            return (
                data.geospatial_lon_min,
                data.geospatial_lat_min,
                data.geospatial_lon_max,
                data.geospatial_lat_max,
            )


def _determine_spatial_res(data: xr.Dataset):
    # TODO: get rid of these hard-coded coord names as soon as
    #   new resampling is available
    lat_dimensions = ["lat", "latitude", "y"]
    for lat_dimension in lat_dimensions:
        if lat_dimension in data:
            lat_diff = data[lat_dimension].diff(dim=data[lat_dimension].dims[0]).values
            lat_res = lat_diff[0]
            lat_regular = np.allclose(lat_res, lat_diff, 1e-8)
            if lat_regular:
                return float(abs(lat_res))


def _determine_time_coverage(data: xr.Dataset):
    start_time, end_time = get_time_range_from_data(data)
    if start_time is not None:
        try:
            start_time = remove_time_part_from_isoformat(
                pd.to_datetime(start_time).isoformat()
            )
        except TypeError:
            start_time = None
    if start_time is None:
        start_time = get_start_time_from_attrs(data)
    if end_time is not None:
        try:
            end_time = remove_time_part_from_isoformat(
                pd.to_datetime(end_time).isoformat()
            )
        except TypeError:
            end_time = None
    if end_time is None:
        end_time = get_end_time_from_attrs(data)
    return start_time, end_time


def _determine_time_period(data: xr.Dataset):
    if "time" in data and len(data["time"].values) > 1:
        time_diff = (
            data["time"].diff(dim=data["time"].dims[0]).values.astype(np.float64)
        )
        time_res = time_diff[0]
        time_regular = np.allclose(time_res, time_diff, 1e-8)
        if time_regular:
            time_period = pd.to_timedelta(time_res).isoformat()
            # remove leading "P"
            time_period = time_period[1:]
            # remove sub-day precision
            return time_period.split("T")[0]


def _attrs_to_json(attrs: Mapping[Hashable, Any]) -> Optional[Dict[str, Any]]:
    new_attrs: Dict[str, Any] = {}
    for k, v in attrs.items():
        if isinstance(v, np.ndarray):
            v = v.tolist()
        elif isinstance(v, dask.array.Array):
            v = np.array(v).tolist()
        if isinstance(v, float) and np.isnan(v):
            v = None
        new_attrs[str(k)] = v
    return new_attrs
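

# Illustrative sketch (not part of the original module) of the JSON-safety
# conversion performed by _attrs_to_json(): numpy and dask arrays become
# plain lists, and float NaN becomes None, so descriptor attrs remain
# JSON-serializable.
#
#     _attrs_to_json({"scale": float("nan"), "shape": np.array([3, 4])})
#     # -> {"scale": None, "shape": [3, 4]}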