Source code for xcube.core.store.accessor

# The MIT License (MIT)
# Copyright (c) 2020 by the xcube development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from abc import abstractmethod, ABC
from typing import Any, List, Optional

import xarray as xr

from xcube.constants import EXTENSION_POINT_DATA_OPENERS
from xcube.constants import EXTENSION_POINT_DATA_WRITERS
from xcube.util.assertions import assert_given
from xcube.util.extension import Extension
from xcube.util.extension import ExtensionPredicate
from xcube.util.extension import ExtensionRegistry
from xcube.util.jsonschema import JsonObjectSchema
from xcube.util.plugin import get_extension_registry
from .datatype import DataType
from .datatype import DataTypeLike
from .error import DataStoreError


#######################################################
# Data accessor instantiation and registry query
#######################################################

def new_data_opener(opener_id: str,
                    extension_registry: Optional[ExtensionRegistry] = None,
                    **opener_params) -> 'DataOpener':
    """
    Get an instance of the data opener identified by *opener_id*.

    The optional, extra opener parameters *opener_params* may
    be used by data store (``xcube.core.store.DataStore``)
    implementations so they can share their internal state with the opener.

    :param opener_id: The data opener identifier.
    :param extension_registry: Optional extension registry.
        If not given, the global extension registry will be used.
    :param opener_params: Extra opener parameters.
    :return: A data opener instance.
    """
    assert_given(opener_id, 'opener_id')
    extension_registry = extension_registry or get_extension_registry()
    if not extension_registry.has_extension(EXTENSION_POINT_DATA_OPENERS,
                                            opener_id):
        raise DataStoreError(f'A data opener named'
                             f' {opener_id!r} is not registered')
    return extension_registry.get_component(EXTENSION_POINT_DATA_OPENERS,
                                            opener_id)(**opener_params)
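
# A minimal usage sketch (note: the opener identifier "dataset:zarr:file"
# and the path below are assumptions; which openers are actually registered
# depends on the installed xcube plugins):
#
#   opener = new_data_opener('dataset:zarr:file')
#   schema = opener.get_open_data_params_schema()  # a JsonObjectSchema
#   dataset = opener.open_data('/path/to/my_cube.zarr')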


def new_data_writer(writer_id: str,
                    extension_registry: Optional[ExtensionRegistry] = None,
                    **writer_params) -> 'DataWriter':
    """
    Get an instance of the data writer identified by *writer_id*.

    The optional, extra writer parameters *writer_params* may be used by
    data store (``xcube.core.store.DataStore``) implementations so they
    can share their internal state with the writer.

    :param writer_id: The data writer identifier.
    :param extension_registry: Optional extension registry.
        If not given, the global extension registry will be used.
    :param writer_params: Extra writer parameters.
    :return: A data writer instance.
    """
    assert_given(writer_id, 'writer_id')
    extension_registry = extension_registry or get_extension_registry()
    if not extension_registry.has_extension(EXTENSION_POINT_DATA_WRITERS,
                                            writer_id):
        raise DataStoreError(f'A data writer named'
                             f' {writer_id!r} is not registered')
    return extension_registry.get_component(EXTENSION_POINT_DATA_WRITERS,
                                            writer_id)(**writer_params)
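
# Analogous usage sketch for writers (again, the writer identifier is an
# assumption and depends on the installed xcube plugins):
#
#   writer = new_data_writer('dataset:zarr:file')
#   written_id = writer.write_data(dataset, '/path/to/output.zarr',
#                                  replace=True)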


def find_data_opener_extensions(
        predicate: ExtensionPredicate = None,
        extension_registry: Optional[ExtensionRegistry] = None
) -> List[Extension]:
    """
    Get registered data opener extensions using the optional
    filter function *predicate*.

    :param predicate: An optional filter function.
    :param extension_registry: Optional extension registry.
        If not given, the global extension registry will be used.
    :return: List of matching extensions.
    """
    extension_registry = extension_registry or get_extension_registry()
    return extension_registry.find_extensions(
        EXTENSION_POINT_DATA_OPENERS,
        predicate=predicate
    )
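
# Example: list the names of all registered data opener extensions
# (the resulting names depend on the installed xcube plugins):
#
#   for extension in find_data_opener_extensions():
#       print(extension.name)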


def find_data_writer_extensions(
        predicate: ExtensionPredicate = None,
        extension_registry: Optional[ExtensionRegistry] = None
) -> List[Extension]:
    """
    Get registered data writer extensions using the optional filter
    function *predicate*.

    :param predicate: An optional filter function.
    :param extension_registry: Optional extension registry.
        If not given, the global extension registry will be used.
    :return: List of matching extensions.
    """
    extension_registry = extension_registry or get_extension_registry()
    return extension_registry.find_extensions(
        EXTENSION_POINT_DATA_WRITERS,
        predicate=predicate
    )


def get_data_accessor_predicate(
        data_type: DataTypeLike = None,
        format_id: str = None,
        storage_id: str = None
) -> ExtensionPredicate:
    """
    Get a predicate that checks if a data accessor extension's name is
    compliant with *data_type*, *format_id*, *storage_id*.

    :param data_type: Optional data type to be supported.
        May be given as type alias name, as a type,
        or as a DataType instance.
    :param format_id: Optional data format identifier to be supported.
    :param storage_id: Optional data storage identifier to be supported.
    :return: A filter function.
    :raise DataStoreError: If an error occurs.
    """
    if any((data_type, format_id, storage_id)):
        data_type = DataType.normalize(data_type) \
            if data_type is not None else None

        def _predicate(extension: Extension) -> bool:
            extension_parts = extension.name.split(':', maxsplit=4)
            if storage_id is not None:
                ext_storage_id = extension_parts[2]
                if ext_storage_id != '*' and ext_storage_id != storage_id:
                    return False
            if format_id is not None:
                ext_format_id = extension_parts[1]
                if ext_format_id != '*' and ext_format_id != format_id:
                    return False
            if data_type is not None:
                ext_data_type = DataType.normalize(extension_parts[0])
                if not data_type.is_super_type_of(ext_data_type):
                    return False
            return True
    else:
        # noinspection PyUnusedLocal
        def _predicate(extension: Extension) -> bool:
            return True

    return _predicate
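
# Example: combine the predicate with one of the find_* functions above to
# discover matching accessors (a sketch; the "dataset" type alias and "zarr"
# format identifier are assumptions about registered extension names):
#
#   predicate = get_data_accessor_predicate(data_type='dataset',
#                                           format_id='zarr')
#   zarr_writer_ids = [ext.name
#                      for ext in find_data_writer_extensions(predicate)]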


#######################################################
# Classes
#######################################################


class DataOpener(ABC):
    """
    An interface that specifies a parameterized `open_data()` operation.

    Possible open parameters are implementation-specific and
    are described by a JSON Schema.

    Note this interface uses the term "opener" to underline the expected
    laziness of the operation. For example, when an xarray.Dataset is
    returned from a Zarr directory, the actual data is represented by
    Dask arrays and will be loaded only on demand.
    """
    @abstractmethod
    def get_open_data_params_schema(self, data_id: str = None) \
            -> JsonObjectSchema:
        """
        Get the schema for the parameters passed as *open_params* to
        :meth:open_data(data_id, open_params).

        If *data_id* is given, the returned schema will be tailored
        to the constraints implied by the identified data resource.
        Some openers might not support this, therefore *data_id* is
        optional, and if it is omitted, the returned schema will be
        less restrictive.

        :param data_id: An optional data resource identifier.
        :return: The schema for the parameters in *open_params*.
        :raise DataStoreError: If an error occurs.
        """
    @abstractmethod
    def open_data(self, data_id: str, **open_params) -> Any:
        """
        Open the data resource given by the data resource identifier
        *data_id* using the supplied *open_params*.

        Raises if *data_id* does not exist.

        :param data_id: The data resource identifier.
        :param open_params: Opener-specific parameters.
        :return: An xarray.Dataset instance.
        :raise DataStoreError: If an error occurs.
        """

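# A minimal sketch of a DataOpener implementation (illustrative only; a real
# opener would additionally be registered under EXTENSION_POINT_DATA_OPENERS
# via an xcube plugin):
#
#   class MyZarrOpener(DataOpener):
#
#       def get_open_data_params_schema(self, data_id: str = None) \
#               -> JsonObjectSchema:
#           # No extra open parameters supported in this sketch.
#           return JsonObjectSchema(additional_properties=False)
#
#       def open_data(self, data_id: str, **open_params) -> xr.Dataset:
#           # Lazily open a Zarr dataset; data is loaded on demand.
#           return xr.open_zarr(data_id, **open_params)
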
class DataDeleter(ABC):
    """
    An interface that specifies a parameterized `delete_data()` operation.

    Possible delete parameters are implementation-specific and
    are described by a JSON Schema.
    """

    @abstractmethod
    def get_delete_data_params_schema(self, data_id: str = None) \
            -> JsonObjectSchema:
        """
        Get the schema for the parameters passed as *delete_params* to
        :meth:delete_data.

        If *data_id* is given, the returned schema will be tailored
        to the constraints implied by the identified data resource.
        Some deleters might not support this, therefore *data_id* is
        optional, and if it is omitted, the returned schema will be
        less restrictive.

        :param data_id: An optional data resource identifier.
        :return: The schema for the parameters in *delete_params*.
        :raise DataStoreError: If an error occurs.
        """

    @abstractmethod
    def delete_data(self, data_id: str, **delete_params):
        """
        Delete a data resource.

        Raises if *data_id* does not exist.

        :param data_id: A data resource identifier known to exist.
        :param delete_params: Deleter-specific parameters.
        :raise DataStoreError: If an error occurs.
        """

class DataWriter(DataDeleter, ABC):
    """
    An interface that specifies a parameterized `write_data()` operation.

    Possible write parameters are implementation-specific and
    are described by a JSON Schema.
    """
    @abstractmethod
    def get_write_data_params_schema(self) -> JsonObjectSchema:
        """
        Get the schema for the parameters passed as *write_params* to
        :meth:write_data(data, data_id, write_params).

        :return: The schema for the parameters in *write_params*.
        :raise DataStoreError: If an error occurs.
        """
    @abstractmethod
    def write_data(self, data: Any, data_id: str, replace: bool = False,
                   **write_params) -> str:
        """
        Write a data resource using the supplied *data_id* and
        *write_params*.

        :param data: The data resource's in-memory representation
            to be written.
        :param data_id: A unique data resource identifier.
        :param replace: Whether to replace an existing data resource.
        :param write_params: Writer-specific parameters.
        :return: The data resource identifier used to write the
            data resource.
        :raise DataStoreError: If an error occurs.
        """

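# A minimal sketch of a corresponding DataWriter implementation (illustrative
# only; parameter schemas and error handling are reduced to the bare minimum):
#
#   class MyZarrWriter(DataWriter):
#
#       def get_write_data_params_schema(self) -> JsonObjectSchema:
#           return JsonObjectSchema(additional_properties=False)
#
#       def write_data(self, data: xr.Dataset, data_id: str,
#                      replace: bool = False, **write_params) -> str:
#           data.to_zarr(data_id, mode='w' if replace else 'w-',
#                        **write_params)
#           return data_id
#
#       def get_delete_data_params_schema(self, data_id: str = None) \
#               -> JsonObjectSchema:
#           return JsonObjectSchema(additional_properties=False)
#
#       def delete_data(self, data_id: str, **delete_params):
#           import shutil
#           shutil.rmtree(data_id)
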
class DataTimeSliceUpdater(DataWriter, ABC):
    """
    An interface that specifies writing of time slice data.
    """

    @abstractmethod
    def append_data_time_slice(self, data_id: str, time_slice: xr.Dataset):
        """
        Append a time slice to the identified data resource.

        :param data_id: The data resource identifier.
        :param time_slice: The time slice data to be appended.
            Must be compatible with the data resource.
        :raise DataStoreError: If an error occurs.
        """

    @abstractmethod
    def insert_data_time_slice(self, data_id: str, time_slice: Any,
                               time_index: int):
        """
        Insert a time slice into the identified data resource
        at the given index.

        :param data_id: The data resource identifier.
        :param time_slice: The time slice data to be inserted.
            Must be compatible with the data resource.
        :param time_index: The time index.
        :raise DataStoreError: If an error occurs.
        """

    @abstractmethod
    def replace_data_time_slice(self, data_id: str, time_slice: Any,
                                time_index: int):
        """
        Replace a time slice in the identified data resource
        at the given index.

        :param data_id: The data resource identifier.
        :param time_slice: The time slice data that replaces the
            existing slice. Must be compatible with the data resource.
        :param time_index: The time index.
        :raise DataStoreError: If an error occurs.
        """

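# Usage sketch for a hypothetical DataTimeSliceUpdater instance "updater"
# (no such instance is provided by this module; "my_cube.zarr" and
# "new_slice" are placeholders):
#
#   updater.append_data_time_slice('my_cube.zarr', new_slice)
#   updater.insert_data_time_slice('my_cube.zarr', new_slice, time_index=0)
#   updater.replace_data_time_slice('my_cube.zarr', new_slice, time_index=5)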