Source code for xcube.core.store.accessor

# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

from abc import abstractmethod, ABC
from typing import Any, List, Optional

import xarray as xr

from xcube.constants import EXTENSION_POINT_DATA_OPENERS
from xcube.constants import EXTENSION_POINT_DATA_WRITERS
from xcube.util.assertions import assert_given
from xcube.util.extension import Extension
from xcube.util.extension import ExtensionPredicate
from xcube.util.extension import ExtensionRegistry
from xcube.util.jsonschema import JsonObjectSchema
from xcube.util.plugin import get_extension_registry
from .datatype import DataType
from .datatype import DataTypeLike
from .error import DataStoreError


#######################################################
# Data accessor instantiation and registry query
#######################################################


def new_data_opener(
    opener_id: str,
    extension_registry: Optional[ExtensionRegistry] = None,
    **opener_params,
) -> "DataOpener":
    """Get an instance of the data opener identified by *opener_id*.

    The optional, extra opener parameters *opener_params* may
    be used by data store (``xcube.core.store.DataStore``)
    implementations so they can share their internal state with the opener.

    Args:
        opener_id: The data opener identifier.
        extension_registry: Optional extension registry. If not given,
            the global extension registry will be used.
        **opener_params: Extra opener parameters.

    Returns:
        A data opener instance.
    """
    assert_given(opener_id, "opener_id")
    extension_registry = extension_registry or get_extension_registry()
    if not extension_registry.has_extension(EXTENSION_POINT_DATA_OPENERS, opener_id):
        raise DataStoreError(f"A data opener named" f" {opener_id!r} is not registered")
    return extension_registry.get_component(EXTENSION_POINT_DATA_OPENERS, opener_id)(
        **opener_params
    )


def new_data_writer(
    writer_id: str,
    extension_registry: Optional[ExtensionRegistry] = None,
    **writer_params,
) -> "DataWriter":
    """Get an instance of the data writer identified by *writer_id*.

    The optional, extra writer parameters *writer_params* may be used by
    data store (``xcube.core.store.DataStore``) implementations so they
    can share their internal state with the writer.

    Args:
        writer_id: The data writer identifier.
        extension_registry: Optional extension registry. If not given,
            the global extension registry will be used.
        **writer_params: Extra writer parameters.

    Returns:
        A data writer instance.
    """
    assert_given(writer_id, "writer_id")
    extension_registry = extension_registry or get_extension_registry()
    if not extension_registry.has_extension(EXTENSION_POINT_DATA_WRITERS, writer_id):
        raise DataStoreError(f"A data writer named" f" {writer_id!r} is not registered")
    return extension_registry.get_component(EXTENSION_POINT_DATA_WRITERS, writer_id)(
        **writer_params
    )


def find_data_opener_extensions(
    predicate: ExtensionPredicate = None,
    extension_registry: Optional[ExtensionRegistry] = None,
) -> List[Extension]:
    """Get registered data opener extensions using the optional
    filter function *predicate*.

    Args:
        predicate: An optional filter function.
        extension_registry: Optional extension registry. If not given,
            the global extension registry will be used.

    Returns:
        List of matching extensions.
    """
    extension_registry = extension_registry or get_extension_registry()
    return extension_registry.find_extensions(
        EXTENSION_POINT_DATA_OPENERS, predicate=predicate
    )


def find_data_writer_extensions(
    predicate: ExtensionPredicate = None,
    extension_registry: Optional[ExtensionRegistry] = None,
) -> List[Extension]:
    """Get registered data writer extensions using the optional filter
    function *predicate*.

    Args:
        predicate: An optional filter function.
        extension_registry: Optional extension registry. If not given,
            the global extension registry will be used.

    Returns:
        List of matching extensions.
    """
    extension_registry = extension_registry or get_extension_registry()
    return extension_registry.find_extensions(
        EXTENSION_POINT_DATA_WRITERS, predicate=predicate
    )


def get_data_accessor_predicate(
    data_type: DataTypeLike = None, format_id: str = None, storage_id: str = None
) -> ExtensionPredicate:
    """Get a predicate that checks if a data accessor extensions's name is
    compliant with *data_type*, *format_id*, *storage_id*.

    Args:
        data_type: Optional data data type to be supported. May be given
            as type alias name, as a type, or as a DataType instance.
        format_id: Optional data format identifier to be supported.
        storage_id: Optional data storage identifier to be supported.

    Returns:
        A filter function.

    Raises:
        DataStoreError: If an error occurs.
    """
    if any((data_type, format_id, storage_id)):
        data_type = DataType.normalize(data_type) if data_type is not None else None

        def _predicate(extension: Extension) -> bool:
            extension_parts = extension.name.split(":", maxsplit=4)
            if storage_id is not None:
                ext_storage_id = extension_parts[2]
                if ext_storage_id != "*" and ext_storage_id != storage_id:
                    return False
            if format_id is not None:
                ext_format_id = extension_parts[1]
                if ext_format_id != "*" and ext_format_id != format_id:
                    return False
            if data_type is not None:
                ext_data_type = DataType.normalize(extension_parts[0])
                if not data_type.is_super_type_of(ext_data_type):
                    return False
            return True

    else:
        # noinspection PyUnusedLocal
        def _predicate(extension: Extension) -> bool:
            return True

    return _predicate


#######################################################
# Classes
#######################################################


[docs] class DataOpener(ABC): """An interface that specifies a parameterized `open_data()` operation. Possible open parameters are implementation-specific and are described by a JSON Schema. Note this interface uses the term "opener" to underline the expected laziness of the operation. For example, when a xarray.Dataset is returned from a Zarr directory, the actual data is represented by Dask arrays and will be loaded only on-demand. """
[docs] @abstractmethod def get_open_data_params_schema(self, data_id: str = None) -> JsonObjectSchema: """Get the schema for the parameters passed as *open_params* to :meth:`open_data`. If *data_id* is given, the returned schema will be tailored to the constraints implied by the identified data resource. Some openers might not support this, therefore *data_id* is optional, and if it is omitted, the returned schema will be less restrictive. Args: data_id: An optional data resource identifier. Returns: The schema for the parameters in *open_params*. Raises: DataStoreError: If an error occurs. """
[docs] @abstractmethod def open_data(self, data_id: str, **open_params) -> Any: """Open the data resource given by the data resource identifier *data_id* using the supplied *open_params*. Raises if *data_id* does not exist. Args: data_id: The data resource identifier. **open_params: Opener-specific parameters. Returns: An xarray.Dataset instance. Raises: DataStoreError: If an error occurs. """
class DataDeleter(ABC): """An interface that specifies a parameterized `delete_data()` operation. Possible delete parameters are implementation-specific and are described by a JSON Schema. """ @abstractmethod def get_delete_data_params_schema(self, data_id: str = None) -> JsonObjectSchema: """Get the schema for the parameters passed as *delete_params* to :meth:`delete_data`. If *data_id* is given, the returned schema will be tailored to the constraints implied by the identified data resource. Some deleters might not support this, therefore *data_id* is optional, and if it is omitted, the returned schema will be less restrictive. Args: data_id: An optional data resource identifier. Returns: The schema for the parameters in *delete_params*. Raises: DataStoreError: If an error occurs. """ @abstractmethod def delete_data(self, data_id: str, **delete_params): """Delete a data resource. Raises if *data_id* does not exist. Args: data_id: A data resource identifier known to exist. **delete_params: Deleter-specific parameters. Raises: DataStoreError: If an error occurs. """
[docs] class DataWriter(DataDeleter, ABC): """An interface that specifies a parameterized `write_data()` operation. Possible write parameters are implementation-specific and are described by a JSON Schema. """
[docs] @abstractmethod def get_write_data_params_schema(self) -> JsonObjectSchema: """Get the schema for the parameters passed as *write_params* to :meth:`write_data`. Returns: The schema for the parameters in *write_params*. Raises: DataStoreError: If an error occurs. """
[docs] @abstractmethod def write_data( self, data: Any, data_id: str, replace: bool = False, **write_params ) -> str: """Write a data resource using the supplied *data_id* and *write_params*. Args: data: The data resource's in-memory representation to be written. data_id: A unique data resource identifier. replace: Whether to replace an existing data resource. **write_params: Writer-specific parameters. Returns: The data resource identifier used to write the data resource. Raises: DataStoreError: If an error occurs. """
class DataTimeSliceUpdater(DataWriter, ABC): """An interface that specifies writing of time slice data.""" @abstractmethod def append_data_time_slice(self, data_id: str, time_slice: xr.Dataset): """Append a time slice to the identified data resource. Args: data_id: The data resource identifier. time_slice: The time slice data to be inserted. Must be compatible with the data resource. Raises: DataStoreError: If an error occurs. """ @abstractmethod def insert_data_time_slice(self, data_id: str, time_slice: Any, time_index: int): """Insert a time slice into the identified data resource at given index. Args: data_id: The data resource identifier. time_slice: The time slice data to be inserted. Must be compatible with the data resource. time_index: The time index. Raises: DataStoreError: If an error occurs. """ @abstractmethod def replace_data_time_slice(self, data_id: str, time_slice: Any, time_index: int): """Replace a time slice in the identified data resource at given index. Args: data_id: The data resource identifier. time_slice: The time slice data to be inserted. Must be compatible with the data resource. time_index: The time index. Raises: DataStoreError: If an error occurs. """