# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.
from typing import Type, Dict, Optional, Any, Sequence
import fsspec
from .accessor import FsAccessor
from .accessor import FsDataAccessor
from .impl.dataset import DatasetGeoTiffFsDataAccessor
from .impl.dataset import DatasetNetcdfFsDataAccessor
from .impl.dataset import DatasetZarrFsDataAccessor
from .impl.fs import AzureFsAccessor
from .impl.fs import FileFsAccessor
from .impl.fs import FtpFsAccessor
from .impl.fs import MemoryFsAccessor
from .impl.fs import S3FsAccessor
from .impl.geodataframe import GeoDataFrameGeoJsonFsDataAccessor
from .impl.geodataframe import GeoDataFrameShapefileFsDataAccessor
from .impl.geotiff import MultiLevelDatasetGeoTiffFsDataAccessor
from .impl.mldataset import DatasetLevelsFsDataAccessor
from .impl.mldataset import MultiLevelDatasetLevelsFsDataAccessor
from .store import FsDataStore
from ..assertions import assert_valid_params
from ..error import DataStoreError
############################################
# FsAccessor
# Registry of filesystem accessor classes, keyed by the filesystem protocol
# name that each class reports via ``get_protocol()``.
_FS_ACCESSOR_CLASSES: Dict[str, Type[FsAccessor]] = {}
def register_fs_accessor_class(fs_accessor_class: Type[FsAccessor]):
    """Add a concrete filesystem accessor class to the registry.

    The class is stored under the protocol name returned by its
    ``get_protocol()`` method. Registering another class for the same
    protocol replaces the earlier entry.

    Args:
        fs_accessor_class: a concrete class that extends
            :class:`FsAccessor`.
    """
    _FS_ACCESSOR_CLASSES[fs_accessor_class.get_protocol()] = fs_accessor_class
# Register the filesystem accessors shipped with this package.
# This runs once, when the module is imported.
for cls in [
    AzureFsAccessor,
    FileFsAccessor,
    FtpFsAccessor,
    MemoryFsAccessor,
    S3FsAccessor,
]:
    register_fs_accessor_class(cls)
def get_fs_accessor_class(protocol: str) -> Type[FsAccessor]:
    """Look up or create the filesystem accessor class for a protocol.

    If a class has been registered for *protocol*, it is returned as is.
    Otherwise, *protocol* is checked against ``fsspec`` and, if it is
    known there, a new :class:`FsAccessor` subclass bound to that
    protocol is created and returned. Such a generated class is not
    added to the registry.

    Args:
        protocol: The filesystem protocol, for example "file", "s3",
            "memory".

    Returns:
        A class that derives from :class:`FsAccessor`

    Raises:
        DataStoreError: if ``fsspec`` cannot import the filesystem
            for *protocol*, or does not know *protocol* at all.
    """
    registered_class = _FS_ACCESSOR_CLASSES.get(protocol)
    if registered_class is not None:
        return registered_class

    # Not registered: make sure fsspec can resolve the protocol
    # before generating an accessor class for it. The returned
    # filesystem class itself is not needed here.
    try:
        fsspec.get_filesystem_class(protocol)
    except ImportError as import_error:
        raise DataStoreError(
            f"Filesystem for protocol {protocol!r}"
            f" is not installed or requires additional packages"
        ) from import_error
    except ValueError as value_error:
        raise DataStoreError(
            f"Filesystem not found for protocol {protocol!r}"
        ) from value_error

    class FsAccessorClass(FsAccessor):
        @classmethod
        def get_protocol(cls) -> str:
            # Bound to the protocol passed to the enclosing function.
            return protocol

    return FsAccessorClass
############################################
# FsDataAccessor
# Registry of filesystem data accessor classes, keyed by
# "<data type alias>:<format id>".
_FS_DATA_ACCESSOR_CLASSES: Dict[str, Type[FsDataAccessor]] = {}
def register_fs_data_accessor_class(fs_data_accessor_class: Type[FsDataAccessor]):
    """Add an abstract filesystem data accessor class to the registry.

    Registered data accessor classes are later combined with a
    concrete :class:`FsAccessor` to dynamically build concrete
    data accessor classes.

    The registry key is built from the alias of the class's data type
    and its format identifier, joined by a colon. Registering another
    class under the same key replaces the earlier entry.

    Args:
        fs_data_accessor_class: an abstract class that extends
            :class:`FsDataAccessor`.
    """
    registry_key = (
        f"{fs_data_accessor_class.get_data_type().alias}"
        f":{fs_data_accessor_class.get_format_id()}"
    )
    _FS_DATA_ACCESSOR_CLASSES[registry_key] = fs_data_accessor_class
# Register the filesystem data accessors shipped with this package.
# This runs once, when the module is imported.
for cls in [
    DatasetZarrFsDataAccessor,
    DatasetNetcdfFsDataAccessor,
    DatasetGeoTiffFsDataAccessor,
    DatasetLevelsFsDataAccessor,
    MultiLevelDatasetGeoTiffFsDataAccessor,
    MultiLevelDatasetLevelsFsDataAccessor,
    GeoDataFrameShapefileFsDataAccessor,
    GeoDataFrameGeoJsonFsDataAccessor,
]:
    register_fs_data_accessor_class(cls)
def get_fs_data_accessor_class(
    protocol: str, data_type_alias: str, format_id: str
) -> Type[FsDataAccessor]:
    """Build the filesystem data accessor class for a given protocol,
    data type, and format.

    The data accessor class registered for the combination of
    *data_type_alias* and *format_id* is looked up first. Then it is
    combined with the filesystem accessor class for *protocol* into
    a newly created class. A new class is created on every call.

    Args:
        protocol: The filesystem protocol, for example "file", "s3",
            "memory".
        data_type_alias: The data type alias name, for example
            "dataset", "geodataframe".
        format_id: The format identifier, for example "zarr", "geojson".

    Returns:
        A class that derives from both the filesystem accessor class
        for *protocol* and the registered :class:`FsDataAccessor` class.

    Raises:
        DataStoreError: if no data accessor class is registered for
            the combination of *data_type_alias* and *format_id*.
    """
    registered_class = _FS_DATA_ACCESSOR_CLASSES.get(
        f"{data_type_alias}:{format_id}"
    )
    if registered_class is None:
        raise DataStoreError(
            f"Combination of data type {data_type_alias!r}"
            f" and format {format_id!r} is not supported"
        )

    # Resolved only after the data accessor lookup has succeeded.
    protocol_accessor_class = get_fs_accessor_class(protocol)

    # The filesystem accessor comes first in the base class order.
    # TODO: consider setting __name__ and __doc__ on the generated class.
    class FsDataAccessorClass(protocol_accessor_class, registered_class):
        pass

    return FsDataAccessorClass
############################################
# FsDataStore
def get_fs_data_store_class(protocol: str) -> Type[FsDataStore]:
    """Build the class of a filesystem-based data store for a protocol.

    The filesystem accessor class for *protocol* is combined with
    :class:`FsDataStore` into a newly created class. A new class is
    created on every call.

    Args:
        protocol: The filesystem protocol, for example "file", "s3",
            "memory".

    Returns:
        A class that derives from :class:`FsDataStore`
    """

    # The filesystem accessor comes first in the base class order.
    # TODO: consider setting __name__ and __doc__ on the generated class.
    class FsDataStoreClass(get_fs_accessor_class(protocol), FsDataStore):
        pass

    return FsDataStoreClass
# (The "[docs]" source-link marker from the rendered documentation is not code.)
def new_fs_data_store(
    protocol: str,
    root: str = "",
    max_depth: Optional[int] = 1,
    read_only: bool = False,
    includes: Optional[Sequence[str]] = None,
    excludes: Optional[Sequence[str]] = None,
    storage_options: Optional[Dict[str, Any]] = None,
) -> FsDataStore:
    """Create a new instance of a filesystem-based data store.

    The data store is capable of filtering the data identifiers reported
    by ``get_data_ids()``. For this purpose the optional keywords
    `excludes` and `includes` are used which can both take the form of
    a wildcard pattern or a sequence of wildcard patterns:

    * ``excludes``: if given and if any pattern matches the identifier,
      the identifier is not reported.
    * ``includes``: if not given or if any pattern matches the identifier,
      the identifier is reported.

    Parameters whose value is ``None`` are not passed on to the store.
    The remaining parameters are validated against the parameter schema
    of the store class before the store is instantiated.

    Args:
        protocol: The filesystem protocol, for example "file", "s3",
            "memory".
        root: Root or base directory. Defaults to "".
        max_depth: Maximum recursion depth. None means limitless.
            Defaults to 1.
        read_only: Whether this is a read-only store. Defaults to False.
        includes: Optional sequence of wildcards that include certain
            filesystem paths. Affects the data identifiers (paths)
            returned by `get_data_ids()`. By default, all paths are
            included.
        excludes: Optional sequence of wildcards that exclude certain
            filesystem paths. Affects the data identifiers (paths)
            returned by `get_data_ids()`. By default, no paths are
            excluded.
        storage_options: Options specific to the underlying filesystem
            identified by *protocol*. Used to instantiate the
            filesystem.

    Returns:
        A new data store instance of type :class:`FsDataStore`.

    Raises:
        DataStoreError: if no filesystem can be resolved for *protocol*.
    """
    fs_data_store_class = get_fs_data_store_class(protocol)
    store_params_schema = fs_data_store_class.get_data_store_params_schema()

    candidate_params = dict(
        root=root,
        max_depth=max_depth,
        read_only=read_only,
        includes=includes,
        excludes=excludes,
        storage_options=storage_options,
    )
    # Drop parameters that are None so the store class applies its own
    # defaults for them.
    # NOTE(review): this also drops an explicit ``max_depth=None``, so the
    # store's own default for ``max_depth`` applies in that case. Confirm
    # that this matches the documented "None means limitless".
    store_params = {
        name: value for name, value in candidate_params.items() if value is not None
    }

    assert_valid_params(store_params, name="store_params", schema=store_params_schema)
    return fs_data_store_class(**store_params)