# Source code for xcube.core.mldataset.fs

# The MIT License (MIT)
# Copyright (c) 2023 by the xcube team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

import collections.abc
import json
import math
import pathlib
import warnings
from functools import cached_property
from typing import Any, Optional, List, Union, Mapping, Dict, Sequence

import fsspec
import fsspec.core
import numpy as np
import xarray as xr
import zarr

# noinspection PyUnresolvedReferences
import xcube.core.zarrstore
from xcube.core.gridmapping import GridMapping
from xcube.core.subsampling import AggMethods, AggMethod
from xcube.util.assertions import assert_instance
from xcube.util.fspath import get_fs_path_class
from xcube.util.fspath import resolve_path
from xcube.util.types import ScalarOrPair
from xcube.util.types import normalize_scalar_or_pair
from .abc import MultiLevelDataset
from .base import BaseMultiLevelDataset
from .lazy import LazyMultiLevelDataset

LEVELS_FORMAT_VERSION = "1.0"
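
# For orientation: the ".zlevels" file written by
# FsMultiLevelDataset.write_dataset (below) is a small JSON object.
# An illustrative instance might look like this; the variable name
# "chl" is a made-up example, not part of the format:
#
#     {
#         "version": "1.0",
#         "num_levels": 3,
#         "use_saved_levels": false,
#         "tile_size": [512, 512],
#         "agg_methods": {"chl": "mean"}
#     }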


class FsMultiLevelDataset(LazyMultiLevelDataset):
    _MIN_CACHE_SIZE = 1024 * 1024  # 1 MiB

    def __init__(self,
                 path: str,
                 fs: Optional[fsspec.AbstractFileSystem] = None,
                 fs_root: Optional[str] = None,
                 fs_kwargs: Optional[Mapping[str, Any]] = None,
                 cache_size: Optional[int] = None,
                 consolidate: Optional[bool] = None,
                 **zarr_kwargs):
        if fs is None:
            fs, path = fsspec.core.url_to_fs(path, **(fs_kwargs or {}))
        assert_instance(fs, fsspec.AbstractFileSystem, name="fs")
        assert_instance(path, str, name="data_id")
        super().__init__(ds_id=path)
        self._path = path
        self._fs = fs
        self._fs_root = fs_root
        self._cache_size = cache_size
        self._consolidate = consolidate
        self._zarr_kwargs = zarr_kwargs
        self._path_class = get_fs_path_class(fs)

    @property
    def path(self) -> str:
        return self._path

    @property
    def fs(self) -> fsspec.AbstractFileSystem:
        return self._fs

    @property
    def cache_size(self) -> Optional[int]:
        return self._cache_size

    @cached_property
    def size_weights(self) -> np.ndarray:
        """Size weights are used to distribute the cache size
        over the levels.
        """
        return self.compute_size_weights(self.num_levels)

    @cached_property
    def tile_size(self) -> Optional[Sequence[int]]:
        spec = self._levels_spec
        return spec.get("tile_size")

    @cached_property
    def use_saved_levels(self) -> Optional[bool]:
        spec = self._levels_spec
        return spec.get("use_saved_levels")

    @cached_property
    def base_dataset_path(self) -> Optional[str]:
        spec = self._levels_spec
        return spec.get("base_dataset_path")

    @cached_property
    def agg_methods(self) -> Optional[Mapping[str, AggMethod]]:
        spec = self._levels_spec
        return spec.get("agg_methods")

    @cached_property
    def _levels_spec(self) -> Mapping[str, Any]:
        path = f"{self._path}/.zlevels"
        spec = {}
        if self.fs.exists(path):
            with self.fs.open(path) as fp:
                spec = json.load(fp)
            if not isinstance(spec, collections.abc.Mapping):
                raise TypeError('Unexpected .zlevels file.'
                                ' Must be a JSON object.')
            # TODO (forman): validate JSON object
        return spec

    def _get_dataset_lazily(self, index: int, parameters) -> xr.Dataset:
        cache_size = self._cache_size
        fs = self._fs
        ds_path = self._get_path(self._path)
        link_path = ds_path / f'{index}.link'
        if fs.isfile(str(link_path)):
            # If file "{index}.link" exists, we have a link to
            # a level Zarr and open this instead.
            with fs.open(str(link_path), 'r') as fp:
                level_path = self._get_path(fp.read())
            if not level_path.is_absolute() \
                    and not self._is_path_relative_to_path(level_path,
                                                           ds_path):
                level_path = resolve_path(ds_path / level_path)
        else:
            # Nominal "{index}.zarr" must exist
            level_path = ds_path / f'{index}.zarr'
        level_zarr_store = fs.get_mapper(str(level_path))
        consolidated = self._consolidate if self._consolidate is not None \
            else '.zmetadata' in level_zarr_store
        if isinstance(cache_size, int) \
                and cache_size >= self._MIN_CACHE_SIZE:
            # Compute cache size for this level, weighted by
            # the level's size in pixels
            cache_size = math.ceil(self.size_weights[index] * cache_size)
            if cache_size >= self._MIN_CACHE_SIZE:
                level_zarr_store = zarr.LRUStoreCache(level_zarr_store,
                                                      max_size=cache_size)
        try:
            level_dataset = xr.open_zarr(level_zarr_store,
                                         consolidated=consolidated,
                                         **self._zarr_kwargs)
        except ValueError as e:
            raise FsMultiLevelDatasetError(
                f'Failed to open dataset {level_path!r}: {e}'
            ) from e
        level_dataset.zarr_store.set(level_zarr_store)
        return level_dataset

    @staticmethod
    def _is_path_relative_to_path(level_path, ds_path):
        if hasattr(level_path, 'is_relative_to'):
            # Python >= 3.9
            return level_path.is_relative_to(ds_path)
        try:
            # Python < 3.9
            level_path.relative_to(ds_path)
            return True
        except ValueError:
            return False

    @classmethod
    def compute_size_weights(cls, num_levels: int) -> np.ndarray:
        weights = (2 ** np.arange(0, num_levels, dtype=np.float64)) ** 2
        return weights[::-1] / np.sum(weights)

    def _get_num_levels_lazily(self) -> int:
        spec = self._levels_spec
        num_levels = spec.get("num_levels")
        levels = self._get_levels()
        if num_levels is None:
            num_levels = len(levels)
        expected_levels = list(range(num_levels))
        for level in expected_levels:
            if level != levels[level]:
                raise FsMultiLevelDatasetError(
                    f'Inconsistent multi-level dataset {self.ds_id!r},'
                    f' expected levels {expected_levels!r}'
                    f' found {levels!r}'
                )
        return num_levels

    def _get_levels(self) -> List[int]:
        levels = []
        paths = [self._get_path(entry['name'])
                 for entry in self._fs.listdir(self._path, detail=True)]
        for path in paths:
            # No ext, i.e. dir_name = "<level>", is proposed by
            # https://github.com/zarr-developers/zarr-specs/issues/50.
            # xcube already selected dir_name = "<level>.zarr".
            basename = path.stem
            if path.stem and path.suffix in ('', '.zarr', '.link'):
                try:
                    level = int(basename)
                except ValueError:
                    continue
                levels.append(level)
        levels = sorted(levels)
        return levels

    def _get_path(self, *args) -> pathlib.PurePath:
        return self._path_class(*args)

    @classmethod
    def write_dataset(cls,
                      dataset: Union[xr.Dataset, MultiLevelDataset],
                      path: str,
                      fs: Optional[fsspec.AbstractFileSystem] = None,
                      fs_root: Optional[str] = None,
                      fs_kwargs: Optional[Mapping[str, Any]] = None,
                      replace: bool = False,
                      num_levels: Optional[int] = None,
                      consolidated: bool = True,
                      tile_size: Optional[ScalarOrPair[int]] = None,
                      use_saved_levels: bool = False,
                      base_dataset_path: Optional[str] = None,
                      agg_methods: Optional[AggMethods] = None,
                      **zarr_kwargs) -> str:
        assert_instance(dataset, (xr.Dataset, MultiLevelDataset),
                        name='dataset')
        if fs is None:
            fs, path = fsspec.core.url_to_fs(path, **(fs_kwargs or {}))
        if tile_size is not None:
            tile_size = normalize_scalar_or_pair(tile_size,
                                                 item_type=int,
                                                 name='tile_size')
        assert_instance(path, str, name='path')
        assert_instance(fs, fsspec.AbstractFileSystem, name='fs')
        if isinstance(dataset, MultiLevelDataset):
            ml_dataset = dataset
            if tile_size:
                warnings.warn(
                    'tile_size is ignored for multi-level datasets'
                )
            if agg_methods:
                warnings.warn(
                    'agg_methods is ignored for multi-level datasets'
                )
        else:
            base_dataset: xr.Dataset = dataset
            grid_mapping = None
            if tile_size is not None:
                grid_mapping = GridMapping.from_dataset(base_dataset)
                x_name, y_name = grid_mapping.xy_dim_names
                # noinspection PyTypeChecker
                base_dataset = base_dataset.chunk({x_name: tile_size[0],
                                                   y_name: tile_size[1]})
                # noinspection PyTypeChecker
                grid_mapping = grid_mapping.derive(tile_size=tile_size)
            ml_dataset = BaseMultiLevelDataset(base_dataset,
                                               grid_mapping=grid_mapping,
                                               num_levels=num_levels,
                                               agg_methods=agg_methods)
        if use_saved_levels:
            ml_dataset = BaseMultiLevelDataset(
                ml_dataset.get_dataset(0),
                grid_mapping=ml_dataset.grid_mapping,
                agg_methods=agg_methods
            )
        path_class = get_fs_path_class(fs)
        data_path = path_class(path)
        fs.mkdirs(str(data_path), exist_ok=replace)
        if num_levels is None or num_levels <= 0:
            num_levels_max = ml_dataset.num_levels
        else:
            num_levels_max = min(num_levels, ml_dataset.num_levels)
        with fs.open(str(data_path / '.zlevels'), mode='w') as fp:
            levels_data: Dict[str, Any] = dict(
                version=LEVELS_FORMAT_VERSION,
                num_levels=num_levels_max
            )
            if use_saved_levels is not None:
                levels_data.update(use_saved_levels=bool(use_saved_levels))
            if base_dataset_path:
                levels_data.update(base_dataset_path=base_dataset_path)
            if tile_size is not None:
                levels_data.update(tile_size=list(tile_size))
            if hasattr(ml_dataset, 'agg_methods'):
                levels_data.update(agg_methods=dict(ml_dataset.agg_methods))
            json.dump(levels_data, fp, indent=2)
        for index in range(num_levels_max):
            level_dataset = ml_dataset.get_dataset(index)
            if base_dataset_path and index == 0:
                assert_instance(fs_root, str, name="fs_root")
                # Write file "0.link" instead of copying
                # level zero dataset to "0.zarr".
                # Compute a relative base dataset path first.
                base_dataset_path = path_class(fs_root, base_dataset_path)
                data_parent_path = data_path.parent
                try:
                    base_dataset_path = base_dataset_path.relative_to(
                        data_parent_path
                    )
                except ValueError as e:
                    raise FsMultiLevelDatasetError(
                        f'Invalid base_dataset_id: {base_dataset_path}'
                    ) from e
                base_dataset_path = '..' / base_dataset_path
                # Then write the relative base dataset path
                # into the link file.
                link_path = data_path / f'{index}.link'
                with fs.open(str(link_path), mode='w') as fp:
                    fp.write(base_dataset_path.as_posix())
            else:
                # Write level "{index}.zarr"
                level_path = data_path / f'{index}.zarr'
                level_zarr_store = fs.get_mapper(str(level_path),
                                                 create=True)
                try:
                    level_dataset.to_zarr(
                        level_zarr_store,
                        mode='w' if replace else None,
                        consolidated=consolidated,
                        **zarr_kwargs
                    )
                except ValueError as e:
                    # TODO: remove already written data!
                    raise FsMultiLevelDatasetError(
                        f'Failed to write dataset {path}: {e}'
                    ) from e
                if use_saved_levels:
                    level_dataset = xr.open_zarr(level_zarr_store,
                                                 consolidated=consolidated)
                    level_dataset.zarr_store.set(level_zarr_store)
                    ml_dataset.set_dataset(index, level_dataset)
        return path
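
# A minimal usage sketch (illustrative only; the S3 URL and the cache
# size are assumptions, not taken from this module):
#
#     ml_ds = FsMultiLevelDataset("s3://my-bucket/cube.levels",
#                                 cache_size=64 * 1024 * 1024)
#     level_0 = ml_ds.get_dataset(0)  # full-resolution xr.Dataset
#
# The cache size is split over the levels by compute_size_weights():
# each level has 4x the pixels of the next coarser one, so for
# num_levels=3 the weights are [16/21, 4/21, 1/21] (level 0 first),
# i.e. a 64 MiB cache becomes per-level LRU caches of roughly
# 48.8 MiB, 12.2 MiB, and 3.0 MiB.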


class FsMultiLevelDatasetError(ValueError):
    def __init__(self, message: str):
        super().__init__(message)
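
# Writing works the other way round (sketch; "cube.nc", "out.levels",
# and the tile size are assumptions for illustration):
#
#     import xarray as xr
#     cube = xr.open_dataset("cube.nc")
#     FsMultiLevelDataset.write_dataset(cube, "out.levels",
#                                       tile_size=512, replace=True)
#
# This chunks the base dataset to the tile size, derives the level
# pyramid via BaseMultiLevelDataset, and writes one "<index>.zarr"
# per level plus the ".zlevels" JSON metadata described above.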