# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.
import collections.abc
import json
import math
import pathlib
import warnings
from functools import cached_property
from typing import Any, Optional, List, Union, Mapping, Dict, Sequence
import fsspec
import fsspec.core
import numpy as np
import xarray as xr
import zarr
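# Imported for its side effect: it provides the ``zarr_store`` dataset
# accessor used below via ``level_dataset.zarr_store.set(...)``.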
# noinspection PyUnresolvedReferences
import xcube.core.zarrstore
from xcube.core.gridmapping import GridMapping
from xcube.core.subsampling import AggMethods, AggMethod
from xcube.util.assertions import assert_instance
from xcube.util.fspath import get_fs_path_class
from xcube.util.fspath import resolve_path
from xcube.util.types import ScalarOrPair
from xcube.util.types import normalize_scalar_or_pair
from .abc import MultiLevelDataset
from .base import BaseMultiLevelDataset
from .lazy import LazyMultiLevelDataset
LEVELS_FORMAT_VERSION = "1.0"
class FsMultiLevelDataset(LazyMultiLevelDataset):
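    """A multi-level dataset whose level datasets are lazily opened
    from the filesystem location given by ``path``.

    If ``fs`` is not given, it is derived from ``path`` using
    ``fsspec.core.url_to_fs()``, passing ``fs_kwargs`` along. An
    optional ``cache_size`` (in bytes) is distributed over the levels
    and used to wrap each level's Zarr store in a ``zarr.LRUStoreCache``.
    Any extra ``zarr_kwargs`` are passed to ``xarray.open_zarr()``.
    """
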
_MIN_CACHE_SIZE = 1024 * 1024 # 1 MiB
def __init__(
self,
path: str,
fs: Optional[fsspec.AbstractFileSystem] = None,
fs_root: Optional[str] = None,
fs_kwargs: Optional[Mapping[str, Any]] = None,
cache_size: Optional[int] = None,
consolidate: Optional[bool] = None,
**zarr_kwargs,
):
if fs is None:
fs, path = fsspec.core.url_to_fs(path, **(fs_kwargs or {}))
assert_instance(fs, fsspec.AbstractFileSystem, name="fs")
        assert_instance(path, str, name="path")
super().__init__(ds_id=path)
self._path = path
self._fs = fs
self._fs_root = fs_root
self._cache_size = cache_size
self._consolidate = consolidate
self._zarr_kwargs = zarr_kwargs
self._path_class = get_fs_path_class(fs)
@property
def path(self) -> str:
return self._path
@property
def fs(self) -> fsspec.AbstractFileSystem:
return self._fs
@property
def cache_size(self) -> Optional[int]:
return self._cache_size
@cached_property
def size_weights(self) -> np.ndarray:
"""Size weights are used to distribute the cache size
over the levels.
"""
return self.compute_size_weights(self.num_levels)
@cached_property
def tile_size(self) -> Optional[Sequence[int]]:
spec = self._levels_spec
return spec.get("tile_size")
@cached_property
def use_saved_levels(self) -> Optional[bool]:
spec = self._levels_spec
return spec.get("use_saved_levels")
@cached_property
def base_dataset_path(self) -> Optional[str]:
spec = self._levels_spec
return spec.get("base_dataset_path")
@cached_property
def agg_methods(self) -> Optional[Mapping[str, AggMethod]]:
spec = self._levels_spec
return spec.get("agg_methods")
@cached_property
def _levels_spec(self) -> Mapping[str, Any]:
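        """The JSON object read from the dataset's ``.zlevels`` file,
        or an empty mapping if that file does not exist.
        """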
path = f"{self._path}/.zlevels"
spec = {}
if self.fs.exists(path):
            with self.fs.open(path) as fp:
spec = json.load(fp)
if not isinstance(spec, collections.abc.Mapping):
                raise TypeError("Unexpected .zlevels file. Must be a JSON object.")
# TODO (forman): validate JSON object
return spec
    def _get_dataset_lazily(
        self, index: int, parameters: Dict[str, Any]
    ) -> xr.Dataset:
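        """Open the dataset for level ``index``.

        If a ``{index}.link`` file exists, it contains the (possibly
        relative) path of the level Zarr to open; otherwise the level
        is read from ``{index}.zarr``. The level's Zarr store is
        optionally wrapped in a ``zarr.LRUStoreCache`` using this
        level's share of the configured cache size.
        """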
cache_size = self._cache_size
fs = self._fs
ds_path = self._get_path(self._path)
link_path = ds_path / f"{index}.link"
if fs.isfile(str(link_path)):
            # If file "{index}.link" exists, it contains a link to
            # a level Zarr, which we open instead.
with fs.open(str(link_path), "r") as fp:
level_path = self._get_path(fp.read())
if not level_path.is_absolute() and not self._is_path_relative_to_path(
level_path, ds_path
):
level_path = resolve_path(ds_path / level_path)
else:
# Nominal "{index}.zarr" must exist
level_path = ds_path / f"{index}.zarr"
level_zarr_store = fs.get_mapper(str(level_path))
consolidated = (
self._consolidate
if self._consolidate is not None
else (".zmetadata" in level_zarr_store)
)
if isinstance(cache_size, int) and cache_size >= self._MIN_CACHE_SIZE:
# compute cache size for level weighted by
# size in pixels for each level
cache_size = math.ceil(self.size_weights[index] * cache_size)
if cache_size >= self._MIN_CACHE_SIZE:
level_zarr_store = zarr.LRUStoreCache(
level_zarr_store, max_size=cache_size
)
try:
level_dataset = xr.open_zarr(
level_zarr_store, consolidated=consolidated, **self._zarr_kwargs
)
except ValueError as e:
raise FsMultiLevelDatasetError(
f"Failed to open" f" dataset {level_path!r}:" f" {e}"
) from e
level_dataset.zarr_store.set(level_zarr_store)
return level_dataset
@staticmethod
def _is_path_relative_to_path(level_path, ds_path):
if hasattr(level_path, "is_relative_to"):
# Python >=3.9
return level_path.is_relative_to(ds_path)
try:
# Python <3.9
level_path.relative_to(ds_path)
return True
except ValueError:
return False
@classmethod
def compute_size_weights(cls, num_levels: int) -> np.ndarray:
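        """Compute normalized cache-size weights for ``num_levels``
        pyramid levels. Each level's weight is proportional to its
        pixel count (level ``i`` has ``4**-i`` times the pixels of
        level 0), so level 0 receives the largest share; the weights
        sum to 1.
        """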
weights = (2 ** np.arange(0, num_levels, dtype=np.float64)) ** 2
return weights[::-1] / np.sum(weights)
def _get_num_levels_lazily(self) -> int:
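        """Determine the number of levels from the ``.zlevels`` spec if
        present, otherwise by counting the level entries in the dataset
        directory; verifies that levels 0 to num_levels - 1 exist.
        """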
spec = self._levels_spec
num_levels = spec.get("num_levels")
levels = self._get_levels()
if num_levels is None:
num_levels = len(levels)
        expected_levels = list(range(num_levels))
        if levels[:num_levels] != expected_levels:
            raise FsMultiLevelDatasetError(
                f"Inconsistent multi-level dataset {self.ds_id!r}:"
                f" expected levels {expected_levels!r},"
                f" found {levels!r}"
            )
return num_levels
def _get_levels(self) -> List[int]:
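        """Collect the sorted list of level indices found as
        ``<level>``, ``<level>.zarr``, or ``<level>.link`` entries in
        the dataset directory.
        """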
levels = []
paths = [
self._get_path(entry["name"])
for entry in self._fs.listdir(self._path, detail=True)
]
for path in paths:
# No ext, i.e. dir_name = "<level>", is proposed by
# https://github.com/zarr-developers/zarr-specs/issues/50.
# xcube already selected dir_name = "<level>.zarr".
basename = path.stem
            if basename and path.suffix in ("", ".zarr", ".link"):
try:
level = int(basename)
except ValueError:
continue
levels.append(level)
levels = sorted(levels)
return levels
def _get_path(self, *args) -> pathlib.PurePath:
return self._path_class(*args)
@classmethod
def write_dataset(
cls,
dataset: Union[xr.Dataset, MultiLevelDataset],
path: str,
fs: Optional[fsspec.AbstractFileSystem] = None,
fs_root: Optional[str] = None,
fs_kwargs: Optional[Mapping[str, Any]] = None,
replace: bool = False,
num_levels: Optional[int] = None,
consolidated: bool = True,
tile_size: Optional[ScalarOrPair[int]] = None,
use_saved_levels: bool = False,
base_dataset_path: Optional[str] = None,
agg_methods: Optional[AggMethods] = None,
**zarr_kwargs,
) -> str:
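        """Write ``dataset`` to ``path`` in the multi-level (``.levels``)
        format.

        If ``dataset`` is a plain ``xarray.Dataset``, a pyramid is first
        derived from it (optionally re-chunked to ``tile_size``). A
        ``.zlevels`` JSON file with format version and level metadata is
        written, then each level is written as ``{index}.zarr``; if
        ``base_dataset_path`` is given, level zero is instead written as
        a ``0.link`` file pointing to the existing base dataset relative
        to ``fs_root``.

        Returns the (possibly normalized) ``path``.
        """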
assert_instance(dataset, (xr.Dataset, MultiLevelDataset), name="dataset")
if fs is None:
fs, path = fsspec.core.url_to_fs(path, **(fs_kwargs or {}))
if tile_size is not None:
tile_size = normalize_scalar_or_pair(
tile_size, item_type=int, name="tile_size"
)
assert_instance(path, str, name="path")
assert_instance(fs, fsspec.AbstractFileSystem, name="fs")
if isinstance(dataset, MultiLevelDataset):
ml_dataset = dataset
if tile_size:
warnings.warn("tile_size is ignored for multi-level datasets")
if agg_methods:
warnings.warn("agg_methods is ignored for multi-level datasets")
else:
base_dataset: xr.Dataset = dataset
grid_mapping = None
if tile_size is not None:
grid_mapping = GridMapping.from_dataset(base_dataset)
x_name, y_name = grid_mapping.xy_dim_names
# noinspection PyTypeChecker
base_dataset = base_dataset.chunk(
{x_name: tile_size[0], y_name: tile_size[1]}
)
# noinspection PyTypeChecker
grid_mapping = grid_mapping.derive(tile_size=tile_size)
ml_dataset = BaseMultiLevelDataset(
base_dataset,
grid_mapping=grid_mapping,
num_levels=num_levels,
agg_methods=agg_methods,
)
if use_saved_levels:
ml_dataset = BaseMultiLevelDataset(
ml_dataset.get_dataset(0),
grid_mapping=ml_dataset.grid_mapping,
agg_methods=agg_methods,
)
path_class = get_fs_path_class(fs)
data_path = path_class(path)
fs.mkdirs(str(data_path), exist_ok=replace)
if num_levels is None or num_levels <= 0:
num_levels_max = ml_dataset.num_levels
else:
num_levels_max = min(num_levels, ml_dataset.num_levels)
with fs.open(str(data_path / ".zlevels"), mode="w") as fp:
levels_data: Dict[str, Any] = dict(
version=LEVELS_FORMAT_VERSION, num_levels=num_levels_max
)
if use_saved_levels is not None:
levels_data.update(use_saved_levels=bool(use_saved_levels))
if base_dataset_path:
levels_data.update(base_dataset_path=base_dataset_path)
if tile_size is not None:
levels_data.update(tile_size=list(tile_size))
if hasattr(ml_dataset, "agg_methods"):
levels_data.update(agg_methods=dict(ml_dataset.agg_methods))
json.dump(levels_data, fp, indent=2)
for index in range(num_levels_max):
level_dataset = ml_dataset.get_dataset(index)
if base_dataset_path and index == 0:
assert_instance(fs_root, str, name="fs_root")
# Write file "0.link" instead of copying
# level zero dataset to "0.zarr".
# Compute a relative base dataset path first
base_dataset_path = path_class(fs_root, base_dataset_path)
data_parent_path = data_path.parent
try:
base_dataset_path = base_dataset_path.relative_to(data_parent_path)
except ValueError as e:
raise FsMultiLevelDatasetError(
f"Invalid base_dataset_id: {base_dataset_path}"
) from e
base_dataset_path = ".." / base_dataset_path
# Then write relative base dataset path into link file
link_path = data_path / f"{index}.link"
with fs.open(str(link_path), mode="w") as fp:
fp.write(base_dataset_path.as_posix())
else:
# Write level "{index}.zarr"
level_path = data_path / f"{index}.zarr"
level_zarr_store = fs.get_mapper(str(level_path), create=True)
try:
level_dataset.to_zarr(
level_zarr_store,
mode="w" if replace else None,
consolidated=consolidated,
**zarr_kwargs,
)
except ValueError as e:
# TODO: remove already written data!
raise FsMultiLevelDatasetError(
f"Failed to write dataset {path}: {e}"
) from e
if use_saved_levels:
level_dataset = xr.open_zarr(
level_zarr_store, consolidated=consolidated
)
level_dataset.zarr_store.set(level_zarr_store)
ml_dataset.set_dataset(index, level_dataset)
return path
class FsMultiLevelDatasetError(ValueError):
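    """Raised when a multi-level dataset cannot be read or written."""
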
def __init__(self, message: str):
super().__init__(message)