Source code for xcube.core.zarrstore.generic

# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

import collections.abc
import inspect
import itertools
import json
import math
import threading
import warnings
from typing import Dict, Tuple, Any, Callable, Optional, List
from collections.abc import Iterator, Sequence
from typing import Union

import numcodecs.abc
import numpy as np
import xarray as xr
import zarr.storage

from xcube.util.assertions import assert_instance, assert_true

# Type of the ``get_data`` property of a ``GenericArray``: a callable
# that returns the data of one chunk as bytes or as numpy array.
# NOTE(review): the declared parameter list (a single index tuple) does
# not reflect the actual call; ``GenericZarrStore`` invokes the callable
# with keyword arguments only (``get_data(**get_data_kwargs)``).
GetData = Callable[[tuple[int]], Union[bytes, np.ndarray]]

# Type of the ``on_close`` property of a ``GenericArray``: a callable
# that receives the array's property dictionary and returns nothing.
OnClose = Callable[[dict[str, Any]], None]


class GenericArray(dict[str, Any]):
    """Represent a generic array in the ``GenericZarrStore`` as
    dictionary of properties.

    Although all properties of this class are optional,
    some of them are mandatory when added to the ``GenericZarrStore``.

    When added to the store using ``GenericZarrStore.add_array()``,
    the array *name* and *dims* must always be present.
    Other mandatory properties depend on
    the *data* and *get_data* properties, which are mutually exclusive:

    * *get_data* is called for a requested data chunk of an array.
      It must return a bytes object or a numpy nd-array.
      *get_data* requires the following properties to be present too:
      *name*, *dims*, *dtype*, *shape*.
      *chunks* is optional and defaults to *shape*.
    * *data* must be a bytes object or a numpy nd-array.
      *data* requires the following properties to be present too:
      *name*, *dims*. If *data* is a numpy nd-array, *dtype*, *shape*,
      and *chunks* are derived from it (*chunks* is same as *shape*).
      Otherwise, *dtype* and *shape* must be given as well.

    The function *get_data* receives only keyword-arguments which
    comprises the ones passed by *get_data_params*, if any, and
    two special ones which may occur in the signature of *get_data*:

    * The keyword argument *chunk_info*, if given, provides a dictionary
      that holds information about the current chunk:

      - ``index: tuple[int, ...]`` - the chunk's index
      - ``shape: tuple[int, ...]`` - the chunk's shape
      - ``slices: tuple[slice, ...]`` - the chunk's array slices

    * The keyword argument *array_info*, if given, provides a dictionary
      that holds information about the overall array. It contains
      all array properties passed to the constructor of
      ``GenericArray`` plus

      - ``ndim: int`` - number of dimensions
      - ``num_chunks: tuple[int, ...]`` - number of chunks in every
        dimension

    ``GenericZarrStore`` will convert a Numpy array returned
    by *get_data* or given by *data* into a bytes object.
    It will also be compressed, if a *compressor* is given.
    It is important that the array chunks always have the same shape,
    namely *chunks*, because Zarr requires all chunks of an array to be
    equally shaped.
    See also https://zarr.readthedocs.io/en/stable/spec/v2.html#chunks

    Note that if the value of a named keyword argument is None,
    it will not be stored.

    Args:
        array: Optional array info dictionary
        name: Optional array name
        data: Optional array data. Mutually exclusive with *get_data*.
            Must be a bytes object or a numpy array.
        get_data: Optional array data chunk getter. Mutually exclusive
            with *data*. Called for a requested data chunk of an array.
            Must return a bytes object or a numpy array.
        get_data_params: Optional keyword-arguments passed to
            *get_data*.
        dtype: Optional array data type. Either a string using syntax
            of the Zarr spec or a ``numpy.dtype``. For string encoded
            data types, see
            https://zarr.readthedocs.io/en/stable/spec/v2.html#data-type-encoding
        dims: Optional sequence of dimension names.
        shape: Optional sequence of shape sizes for each dimension.
        chunks: Optional sequence of chunk sizes for each dimension.
        fill_value: Optional fill value, see
            https://zarr.readthedocs.io/en/stable/spec/v2.html#fill-value-encoding
        compressor: Optional compressor. If given, it must be an
            instance of ``numcodecs.abc.Codec``.
        filters: Optional sequence of filters, see
            https://zarr.readthedocs.io/en/stable/spec/v2.html#filters.
        order: Optional memory layout of the chunk data.
            If given, must be "C" (row-major) or "F" (column-major).
            Defaults to "C".
        attrs: Optional array attributes. If given, must be
            JSON-serializable.
        on_close: Optional array close handler. Called if the store is
            closed.
        chunk_encoding: Optional encoding type of the chunk data
            returned for the array. Can be "bytes" (the default) or
            "ndarray" for array chunks that are numpy.ndarray instances.
        kwargs: Other keyword arguments passed directly to the
            dictionary constructor.
    """

    def __init__(
        self,
        array: Optional[dict[str, Any]] = None,
        name: Optional[str] = None,
        get_data: Optional[GetData] = None,
        get_data_params: Optional[dict[str, Any]] = None,
        data: Optional[np.ndarray] = None,
        dtype: Optional[Union[str, np.dtype]] = None,
        dims: Optional[Union[str, Sequence[str]]] = None,
        shape: Optional[Sequence[int]] = None,
        chunks: Optional[Sequence[int]] = None,
        fill_value: Optional[Union[bool, int, float, str]] = None,
        compressor: Optional[numcodecs.abc.Codec] = None,
        filters: Optional[Sequence[numcodecs.abc.Codec]] = None,
        order: Optional[str] = None,
        attrs: Optional[dict[str, Any]] = None,
        on_close: Optional[OnClose] = None,
        chunk_encoding: Optional[str] = None,
        **kwargs,
    ):
        # Work on a copy so the caller's dictionary is never mutated.
        array = dict(array) if array is not None else dict()
        # Named keyword arguments override entries of *array*,
        # but only if they are actually given (not None).
        array.update(
            {
                k: v
                for k, v in dict(
                    name=name,
                    dtype=dtype,
                    dims=dims,
                    shape=shape,
                    chunks=chunks,
                    fill_value=fill_value,
                    compressor=compressor,
                    filters=filters,
                    order=order,
                    attrs=attrs,
                    data=data,
                    get_data=get_data,
                    get_data_params=get_data_params,
                    on_close=on_close,
                    chunk_encoding=chunk_encoding,
                ).items()
                if v is not None
            }
        )
        super().__init__(array, **kwargs)

    def finalize(self) -> "GenericArray":
        """Normalize and validate array properties and return a valid
        array info dictionary to be stored in the `GenericZarrStore`.

        Returns:
            A new ``GenericArray`` that contains all known properties
            (also those whose value is None) plus the computed
            properties ``ndim``, ``num_chunks``, and ``get_data_info``.

        Raises:
            ValueError: If a mandatory property is missing or
                if a property value is invalid or inconsistent.
            TypeError: If a property value has an unexpected type.
        """
        name = self.get("name")
        if not name:
            raise ValueError("missing array name")

        data = self.get("data")
        get_data = self.get("get_data")
        if data is None and get_data is None:
            raise ValueError(
                f"array {name!r}: either data or get_data must be defined"
            )
        if get_data is not None:
            if data is not None:
                raise ValueError(
                    f"array {name!r}: data and get_data cannot be defined together"
                )
            if not callable(get_data):
                raise TypeError(f"array {name!r}: get_data must be a callable")
            # Remember which of the special keyword arguments
            # the getter wants to receive.
            sig = inspect.signature(get_data)
            get_data_info = {
                "has_array_info": "array_info" in sig.parameters,
                "has_chunk_info": "chunk_info" in sig.parameters,
            }
            get_data_params = dict(self.get("get_data_params") or {})
        else:
            get_data_info = None
            get_data_params = None

        dims = self.get("dims")
        dims = [dims] if isinstance(dims, str) else dims
        if dims is None:
            raise ValueError(f"array {name!r}: missing dims")
        ndim = len(dims)

        if isinstance(data, np.ndarray):
            # forman: maybe warn if dtype or shape is given,
            #   but does not match data.dtype and data.shape
            dtype = str(data.dtype.str)
            shape = data.shape
            chunks = data.shape
        else:
            dtype = self.get("dtype")
            shape = self.get("shape")
            chunks = self.get("chunks")

        # The numpy.dtype check must come first: dtype objects without
        # fields have length zero and are therefore falsy.
        if isinstance(dtype, np.dtype):
            dtype = dtype.str
        elif not dtype:
            raise ValueError(f"array {name!r}: missing dtype")
        if shape is None:
            raise ValueError(f"array {name!r}: missing shape")
        if chunks is None:
            # Also covers an explicit chunks=None given by a dictionary
            chunks = shape
        if len(shape) != ndim:
            raise ValueError(f"array {name!r}: dims and shape must have same length")
        if len(chunks) != ndim:
            raise ValueError(
                f"array {name!r}: dims and chunks must have same length"
            )
        if any(c < 1 for c in chunks):
            # Avoids ZeroDivisionError or negative chunk counts below
            raise ValueError(
                f"array {name!r}:"
                f" chunk sizes must be positive, was {tuple(chunks)}"
            )

        num_chunks = tuple(math.ceil(s / c) for s, c in zip(shape, chunks))

        filters = self.get("filters")
        if filters:
            filters = list(filters)
            for f in filters:
                if not isinstance(f, numcodecs.abc.Codec):
                    raise TypeError(
                        f"array {name!r}:"
                        f" filter items must be an"
                        f" instance of numcodecs.abc.Codec"
                    )
        else:
            filters = None

        compressor = self.get("compressor")
        if compressor is not None:
            if not isinstance(compressor, numcodecs.abc.Codec):
                raise TypeError(
                    f"array {name!r}:"
                    f" compressor must be an"
                    f" instance of numcodecs.abc.Codec"
                )

        fill_value = self.get("fill_value")
        if isinstance(fill_value, (np.ndarray, np.generic)):
            # Convert numpy scalars and 0-d arrays into Python scalars
            fill_value = fill_value.item()
        allowed_fill_value_types = (type(None), bool, int, float, str)
        if not isinstance(fill_value, allowed_fill_value_types):
            raise TypeError(
                f"array {name!r}:"
                f" fill_value type must be one of"
                f" {tuple(t.__name__ for t in allowed_fill_value_types)},"
                f" was {type(fill_value).__name__}"
            )

        order = self.get("order") or "C"
        allowed_orders = ("C", "F")
        if order not in allowed_orders:
            raise ValueError(
                f"array {name!r}:"
                f" order must be one of {allowed_orders},"
                f" was {order!r}"
            )

        chunk_encoding = self.get("chunk_encoding") or "bytes"
        allowed_chunk_encodings = ("bytes", "ndarray")
        if chunk_encoding not in allowed_chunk_encodings:
            raise ValueError(
                f"array {name!r}:"
                f" chunk_encoding must be one of {allowed_chunk_encodings},"
                f" was {chunk_encoding!r}"
            )

        attrs = self.get("attrs")
        if attrs is not None:
            if not isinstance(attrs, dict):
                raise TypeError(
                    f"array {name!r}:"
                    f" attrs must be dict, was {type(attrs).__name__}"
                )

        # Note: passing the properties as dictionary
        # will prevent removing them if their value is None,
        # see GenericArray constructor.
        return GenericArray(
            {
                "name": name,
                "dtype": dtype,
                "dims": tuple(dims),
                "shape": tuple(shape),
                "chunks": tuple(chunks),
                "fill_value": fill_value,
                "filters": filters,
                "compressor": compressor,
                "order": order,
                "attrs": attrs,
                "data": data,
                "get_data": get_data,
                "get_data_params": get_data_params,
                "on_close": self.get("on_close"),
                "chunk_encoding": chunk_encoding,
                # Computed properties
                "ndim": ndim,
                "num_chunks": num_chunks,
                "get_data_info": get_data_info,
            }
        )
# Anything accepted where array properties are expected:
# a ``GenericArray`` or a plain dictionary of array properties.
GenericArrayLike = Union[GenericArray, dict[str, Any]]
class GenericZarrStore(zarr.storage.Store):
    """A Zarr store that maintains generic arrays in a flat,
    top-level hierarchy. The root of the store is a Zarr group
    conforming to the Zarr spec v2.

    It is designed to serve as a Zarr store for xarray datasets
    that compute their data arrays dynamically.

    See class ``GenericArray`` for specifying the arrays' properties.

    The array data of this store's arrays are either retrieved from
    static (numpy) arrays or from a callable that provides the
    array's data chunks as bytes or numpy arrays.

    Args:
        arrays: Arrays to be added. Typically, these will be instances
            of ``GenericArray``.
        attrs: Optional attributes of the top-level group. If given, it
            must be JSON serializable.
        array_defaults: Optional array defaults for array properties
            not passed to ``add_array``. Typically, this will be an
            instance of ``GenericArray``.
    """

    # Shortcut for GenericArray
    Array = GenericArray

    def __init__(
        self,
        *arrays: GenericArrayLike,
        attrs: Optional[dict[str, Any]] = None,
        array_defaults: Optional[GenericArrayLike] = None,
    ):
        self._attrs = dict(attrs) if attrs is not None else {}
        self._array_defaults = array_defaults
        # Maps dimension names to the dimension sizes
        # shared by all arrays of this store.
        self._dim_sizes: dict[str, int] = {}
        # Maps array names to finalized array properties.
        self._arrays: dict[str, GenericArray] = {}
        for array in arrays:
            self.add_array(array)

    def add_array(
        self, array: Optional[GenericArrayLike] = None, **array_kwargs
    ) -> None:
        """Add a new array to this store.

        Args:
            array: Optional array properties. Typically, this will be an
                instance of ``GenericArray``.
            array_kwargs: Keyword arguments form for the properties of
                ``GenericArray``.

        Raises:
            ValueError: If an array of the same name already exists,
                if a dimension size differs from the size already
                registered for that dimension, or if the array
                properties are invalid.
            TypeError: If an array property has an unexpected type.
        """
        effective_array = GenericArray(self._array_defaults or {})
        if array:
            effective_array.update(array)
        if array_kwargs:
            effective_array.update(array_kwargs)
        effective_array = effective_array.finalize()

        name = effective_array["name"]
        if name in self._arrays:
            raise ValueError(f"array {name!r} is already defined")

        dims = effective_array["dims"]
        shape = effective_array["shape"]
        # Collect new dimension sizes first, so that the store's state
        # stays untouched if validation fails for a later dimension.
        new_dim_sizes: dict[str, int] = {}
        for dim_name, dim_size in zip(dims, shape):
            old_dim_size = self._dim_sizes.get(
                dim_name, new_dim_sizes.get(dim_name)
            )
            if old_dim_size is None:
                new_dim_sizes[dim_name] = dim_size
            elif old_dim_size != dim_size:
                # Dimensions must have same lengths for all arrays
                # in this store
                raise ValueError(
                    f"array {name!r}"
                    f" defines dimension {dim_name!r}"
                    f" with size {dim_size},"
                    f" but existing size is {old_dim_size}"
                )

        self._dim_sizes.update(new_dim_sizes)
        self._arrays[name] = effective_array

    ##########################################################################
    # Zarr Store implementation
    ##########################################################################

    def is_writeable(self) -> bool:
        """Return False, because arrays in this store are generative."""
        return False

    def listdir(self, path: str = "") -> list[str]:
        """List a store path.

        Args:
            path: The path. Either the empty string, which denotes
                the root group, or an array name.

        Returns:
            List of sorted directory entries.

        Raises:
            ValueError: If *path* contains a slash and hence cannot
                be a directory of this flat store.
            KeyError: If *path* is not the name of an existing array.
        """
        if path == "":
            return sorted([".zmetadata", ".zgroup", ".zattrs", *self._arrays.keys()])
        elif "/" not in path:
            return sorted(self._get_array_keys(path))
        raise ValueError(f"{path} is not a directory")

    def rmdir(self, path: str = "") -> None:
        """The general form removes store paths.
        This implementation can remove entire arrays only.

        Dimensions that are no longer used by any remaining array
        are forgotten, so they may be redefined with another size.

        Args:
            path: The array's name.

        Raises:
            ValueError: If *path* is not the name of an existing array.
        """
        if path not in self._arrays:
            raise ValueError(f"{path}: can only remove existing arrays")
        removed_array = self._arrays.pop(path)
        for dim_name in removed_array["dims"]:
            dim_used = any(
                dim_name in other_array["dims"]
                for other_array in self._arrays.values()
            )
            if not dim_used:
                # Tolerate names that are already gone,
                # e.g., for arrays that repeat a dimension.
                self._dim_sizes.pop(dim_name, None)

    def rename(self, src_path: str, dst_path: str) -> None:
        """The general form renames store paths.
        This implementation can rename arrays only.

        Args:
            src_path: Source array name.
            dst_path: Target array name.

        Raises:
            ValueError: If *src_path* is not an existing array,
                if *dst_path* already exists, or if *dst_path*
                contains a slash.
        """
        array = self._arrays.get(src_path)
        if array is None:
            raise ValueError(
                f"can only rename arrays, but {src_path!r}" f" is not an array"
            )
        if dst_path in self._arrays:
            raise ValueError(
                f"cannot rename array {src_path!r} into"
                f" {dst_path!r} because it already exists"
            )
        if "/" in dst_path:
            raise ValueError(f"cannot rename array {src_path!r}" f" into {dst_path!r}")
        array["name"] = dst_path
        self._arrays[dst_path] = array
        del self._arrays[src_path]

    def close(self) -> None:
        """Calls the "on_close" handlers, if any, of arrays."""
        for array in self._arrays.values():
            on_close = array.get("on_close")
            if on_close is not None:
                on_close(array)

    # Note, getsize is not implemented by intention as it requires
    # actual computation of arrays.
    #
    # def getsize(self, key: str) -> int:
    #     pass

    ##########################################################################
    # MutableMapping implementation
    ##########################################################################

    def __iter__(self) -> Iterator[str]:
        """Get an iterator of all keys in this store."""
        yield ".zmetadata"
        yield ".zgroup"
        yield ".zattrs"
        for array_name in self._arrays.keys():
            yield from self._get_array_keys(array_name)

    def __len__(self) -> int:
        """Get the number of keys in this store."""
        return sum(1 for _ in iter(self))

    def __contains__(self, key: str) -> bool:
        """Test whether *key* is a metadata key or the key of
        an array's metadata or of one of its chunks.
        """
        if key in (".zmetadata", ".zgroup", ".zattrs"):
            return True
        try:
            array_name, value_id = self._parse_array_key(key)
        except KeyError:
            return False
        if value_id in (".zarray", ".zattrs"):
            return True
        try:
            self._get_array_chunk_index(array_name, value_id)
            return True
        except KeyError:
            return False

    def __getitem__(self, key: str) -> Union[bytes, np.ndarray]:
        """Get the value for *key*. Metadata items are returned as
        UTF-8 encoded JSON, chunk data is returned according to the
        array's chunk encoding.
        """
        item = self._get_item(key)
        if isinstance(item, dict):
            return dict_to_bytes(item)
        elif isinstance(item, str):
            return str_to_bytes(item)
        return item

    def __setitem__(self, key: str, value: bytes) -> None:
        """Always raise ``TypeError``, because this store is read-only."""
        class_name = self.__module__ + "." + self.__class__.__name__
        raise TypeError(f"{class_name} is read-only")

    def __delitem__(self, key: str) -> None:
        """Remove the array named *key*, see ``rmdir()``."""
        self.rmdir(key)

    ##########################################################################
    # Utilities
    ##########################################################################

    @classmethod
    def from_dataset(
        cls, dataset: xr.Dataset, array_defaults: Optional[GenericArrayLike] = None
    ) -> "GenericZarrStore":
        """Create a Zarr store for given *dataset*.
        The attributes of the store's top-level group are set
        to the *dataset*'s attributes.

        The following *array_defaults* properties can be provided
        (other properties are prescribed by the *dataset*):

        * ``fill_value``- defaults to None
        * ``compressor``- defaults to None
        * ``filters``- defaults to None
        * ``order``- defaults to "C"
        * ``chunk_encoding`` - defaults to "bytes"

        Args:
            dataset: The dataset
            array_defaults: Array default values.

        Returns:
            A new Zarr store instance.
        """

        def _get_dataset_data(ds=None, chunk_info=None, array_info=None) -> np.ndarray:
            array_name = array_info["name"]
            chunk_slices = chunk_info["slices"]
            return ds[array_name][chunk_slices].values

        arrays = []
        for var_name, var in dataset.variables.items():
            arrays.append(
                GenericArray(
                    name=str(var_name),
                    dtype=np.dtype(var.dtype).str,
                    dims=[str(dim) for dim in var.dims],
                    shape=var.shape,
                    # Zarr chunks are regular: use the largest
                    # chunk size found in every dimension.
                    chunks=(
                        [max(c) for c in var.chunks] if var.chunks else var.shape
                    ),
                    attrs={str(k): v for k, v in var.attrs.items()},
                    get_data=_get_dataset_data,
                    get_data_params=dict(ds=dataset),
                )
            )
        attrs = {str(k): v for k, v in dataset.attrs.items()}
        # Use cls so that subclasses get instances of their own type
        return cls(*arrays, attrs=attrs, array_defaults=array_defaults)

    ##########################################################################
    # Helpers
    ##########################################################################

    def _get_item(self, key: str) -> Union[dict, str, bytes]:
        """Get the raw item for *key*: a dictionary for metadata keys,
        otherwise the chunk data. Raises ``KeyError`` for unknown keys.
        """
        if key == ".zmetadata":
            return self._get_metadata_item()
        if key == ".zgroup":
            return self._get_group_item()
        if key == ".zattrs":
            return self._get_attrs_item()

        array_name, value_id = self._parse_array_key(key)
        array = self._arrays[array_name]

        if value_id == ".zarray":
            return self._get_array_spec_item(array)
        if value_id == ".zattrs":
            return self._get_array_attrs_item(array)

        chunk_index = self._get_array_chunk_index(array_name, value_id)
        return self._get_array_data_item(array, chunk_index)

    def _get_metadata_item(self):
        """Get the consolidated metadata of the group and all arrays."""
        metadata = {
            ".zgroup": self._get_item(".zgroup"),
            ".zattrs": self._get_item(".zattrs"),
        }
        for array_name in self._arrays.keys():
            key = array_name + "/.zarray"
            metadata[key] = self._get_item(key)
            key = array_name + "/.zattrs"
            metadata[key] = self._get_item(key)
        return {"zarr_consolidated_format": 1, "metadata": metadata}

    # noinspection PyMethodMayBeStatic
    def _get_group_item(self):
        """Get the metadata of the top-level group."""
        return {"zarr_format": 2}

    def _get_attrs_item(self):
        """Get the attributes of the top-level group."""
        return self._attrs or {}

    # noinspection PyMethodMayBeStatic
    def _get_array_spec_item(self, array: GenericArray):
        """Get the ".zarray" metadata of the given finalized *array*."""
        # JSON-encode fill_value
        fill_value = array["fill_value"]
        if isinstance(fill_value, float):
            if math.isnan(fill_value):
                fill_value = "NaN"
            elif math.isinf(fill_value):
                if fill_value < 0:
                    fill_value = "-Infinity"
                else:
                    fill_value = "Infinity"

        # JSON-encode compressor
        compressor = array["compressor"]
        if compressor is not None:
            compressor = compressor.get_config()

        # JSON-encode filters
        filters = array["filters"]
        if filters is not None:
            filters = list(f.get_config() for f in filters)

        return {
            "zarr_format": 2,
            "dtype": array["dtype"],
            "shape": list(array["shape"]),
            "chunks": list(array["chunks"]),
            "fill_value": fill_value,
            "compressor": compressor,
            "filters": filters,
            "order": array["order"],
        }

    # noinspection PyMethodMayBeStatic
    def _get_array_attrs_item(self, array: GenericArray):
        """Get the ".zattrs" metadata of the given finalized *array*,
        that is, its attributes plus the dimension names.
        """
        dims = array["dims"]
        attrs = array["attrs"]
        return {"_ARRAY_DIMENSIONS": dims, **(attrs or {})}

    # noinspection PyMethodMayBeStatic
    def _get_array_data_item(
        self, array: dict[str, Any], chunk_index: tuple[int, ...]
    ) -> Union[bytes, np.ndarray]:
        """Get the data of the chunk at *chunk_index* of *array*.

        Raises:
            ValueError: If a numpy array chunk has an unexpected shape.
            TypeError: If the chunk data's type does not correspond
                to the array's chunk encoding.
        """
        # Note, here array is expected to be "finalized",
        # that is, validated and normalized
        shape = array["shape"]
        chunks = array["chunks"]
        chunk_shape = None

        data = array["data"]
        if data is None:
            get_data = array["get_data"]
            assert callable(get_data)  # Has been ensured before
            get_data_params = array["get_data_params"]
            get_data_kwargs = dict(get_data_params)
            get_data_info = array["get_data_info"]
            if get_data_info["has_chunk_info"]:
                chunk_shape = get_chunk_shape(shape, chunks, chunk_index)
                array_slices = get_array_slices(shape, chunks, chunk_index)
                get_data_kwargs["chunk_info"] = {
                    "index": chunk_index,
                    "shape": chunk_shape,
                    "slices": array_slices,
                }
            if get_data_info["has_array_info"]:
                get_data_kwargs["array_info"] = dict(array)

            data = get_data(**get_data_kwargs)

        chunk_encoding = array["chunk_encoding"]
        if isinstance(data, np.ndarray):
            # As of Zarr 2.0, all chunks of an array
            # must have the same shape (= chunks)
            if data.shape != chunks:
                # This commonly happens if array shape sizes
                # are not integer multiple of chunk shape sizes.
                if chunk_shape is None:
                    # Compute expected chunk shape.
                    chunk_shape = get_chunk_shape(shape, chunks, chunk_index)
                # We will only pad the data if the data shape
                # corresponds to the expected chunk's shape.
                if data.shape == chunk_shape:
                    padding = get_chunk_padding(shape, chunks, chunk_index)
                    fill_value = array["fill_value"]
                    data = np.pad(
                        data, padding, mode="constant", constant_values=fill_value or 0
                    )
                else:
                    key = format_chunk_key(array["name"], chunk_index)
                    raise ValueError(
                        f"{key}:"
                        f" data chunk at {chunk_index}"
                        f" must have shape {chunk_shape},"
                        f" but was {data.shape}"
                    )
            if chunk_encoding == "bytes":
                # Convert to bytes, filter and compress
                data = ndarray_to_bytes(
                    data,
                    order=array["order"],
                    filters=array["filters"],
                    compressor=array["compressor"],
                )

        # Sanity check
        if (chunk_encoding == "bytes" and not isinstance(data, bytes)) or (
            chunk_encoding == "ndarray" and not isinstance(data, np.ndarray)
        ):
            key = format_chunk_key(array["name"], chunk_index)
            expected_type = "numpy.ndarray" if chunk_encoding == "ndarray" else "bytes"
            raise TypeError(
                f"{key}:"
                f" data must be encoded as {expected_type},"
                f" but was {type(data).__name__}"
            )

        return data

    def _parse_array_key(self, key: str) -> tuple[str, str]:
        """Split *key* into the name of an existing array and
        a value identifier. Raises ``KeyError`` if this fails.
        """
        array_name_and_value_id = key.rsplit("/", maxsplit=1)
        if len(array_name_and_value_id) != 2:
            raise KeyError(key)
        array_name, value_id = array_name_and_value_id
        if array_name not in self._arrays:
            raise KeyError(key)
        return array_name, value_id

    def _get_array_chunk_index(self, array_name: str, index_id: str) -> tuple[int, ...]:
        """Parse the chunk identifier *index_id*, e.g. "0.2.1", into
        a chunk index and ensure it addresses an existing chunk of
        the array named *array_name*. Raises ``KeyError`` otherwise.
        """
        try:
            chunk_index = tuple(map(int, index_id.split(".")))
        except (ValueError, TypeError):
            raise KeyError(f"{array_name}/{index_id}")
        array = self._arrays[array_name]
        ndim = len(array["shape"])
        if ndim == 0:
            # A 0-d array has exactly one chunk whose key is "0",
            # which is also what _get_array_keys() reports for it.
            if chunk_index != (0,):
                raise KeyError(f"{array_name}/{index_id}")
            return chunk_index
        if len(chunk_index) != ndim:
            raise KeyError(f"{array_name}/{index_id}")
        num_chunks = array["num_chunks"]
        for i, n in zip(chunk_index, num_chunks):
            if not (0 <= i < n):
                raise KeyError(f"{array_name}/{index_id}")
        return chunk_index

    def _get_array_keys(self, array_name: str) -> Iterator[str]:
        """Yield all keys of the array named *array_name*:
        its two metadata keys followed by its chunk keys.
        """
        yield array_name + "/.zarray"
        yield array_name + "/.zattrs"
        array = self._arrays[array_name]
        num_chunks = array["num_chunks"]
        yield from get_chunk_keys(array_name, num_chunks)
def get_array_slices(
    shape: tuple[int, ...], chunks: tuple[int, ...], chunk_index: tuple[int, ...]
) -> tuple[slice, ...]:
    """Get the slices into an array of given *shape* that are covered
    by the chunk at *chunk_index*. Slices of chunks at the upper
    array bounds are cut off at the array size.
    """
    return tuple(
        slice(i * c, i * c + (c if (i + 1) * c <= s else s % c))
        for s, c, i in zip(shape, chunks, chunk_index)
    )


def get_chunk_shape(
    shape: tuple[int, ...], chunks: tuple[int, ...], chunk_index: tuple[int, ...]
) -> tuple[int, ...]:
    """Get the actual shape of the chunk at *chunk_index*.
    It equals *chunks* except for chunks at the upper array bounds,
    which may be smaller.
    """
    return tuple(
        c if (i + 1) * c <= s else s % c for s, c, i in zip(shape, chunks, chunk_index)
    )


def get_chunk_padding(
    shape: tuple[int, ...], chunks: tuple[int, ...], chunk_index: tuple[int, ...]
) -> tuple[tuple[int, int], ...]:
    """Get, for every dimension, the ``(before, after)`` padding that
    extends the chunk at *chunk_index* to the full size *chunks*.
    The "before" padding is always zero.
    """
    return tuple(
        (0, 0 if (i + 1) * c <= s else c - s % c)
        for s, c, i in zip(shape, chunks, chunk_index)
    )


def get_chunk_indexes(num_chunks: tuple[int, ...]) -> Iterator[tuple[int, ...]]:
    """Yield the indexes of all chunks given the number of chunks in
    every dimension. For an empty *num_chunks* (0-d array), the single
    index ``(0,)`` is yielded.
    """
    if not num_chunks:
        yield 0,
    else:
        yield from itertools.product(*tuple(map(range, map(int, num_chunks))))


def get_chunk_keys(array_name: str, num_chunks: tuple[int, ...]) -> Iterator[str]:
    """Yield the store keys of all chunks of the array *array_name*."""
    for chunk_index in get_chunk_indexes(num_chunks):
        yield format_chunk_key(array_name, chunk_index)


def format_chunk_key(array_name: str, chunk_index: tuple[int, ...]) -> str:
    """Format the store key of a chunk, e.g. ``"a/1.2"``
    for array "a" and chunk index ``(1, 2)``.
    """
    chunk_id = ".".join(map(str, chunk_index))
    return f"{array_name}/{chunk_id}"


def _json_default(value: Any) -> Any:
    """Convert numpy scalars and arrays, which the ``json`` module
    cannot serialize, into their plain Python equivalents.
    """
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


def dict_to_bytes(d: dict) -> bytes:
    """Convert dictionary *d* into UTF-8 encoded JSON.
    Numpy scalars and arrays in *d* are converted into
    Python scalars and lists.

    Raises:
        TypeError: If *d* contains other values that
            are not JSON-serializable.
    """
    return str_to_bytes(json.dumps(d, indent=2, default=_json_default))


def str_to_bytes(s: str) -> bytes:
    """Encode string *s* using UTF-8."""
    return bytes(s, encoding="utf-8")


def ndarray_to_bytes(
    data: np.ndarray,
    order: Optional[str] = None,
    filters: Optional[Sequence[Any]] = None,
    # Forward-reference string, only evaluated by type checkers
    compressor: "Optional[numcodecs.abc.Codec]" = None,
) -> bytes:
    """Convert the numpy array *data* into bytes.

    Args:
        data: The array.
        order: Memory layout of the resulting bytes,
            "C" or "F". Defaults to "C".
        filters: Optional filters whose ``encode()`` methods
            are applied in the given order.
        compressor: Optional compressor whose ``encode()`` method
            is applied after the filters.

    Returns:
        The result of the last encoding applied, or the
        array's raw bytes if there are no filters and no compressor.
    """
    data = data.tobytes(order=order or "C")
    if filters:
        for f in filters:
            data = f.encode(data)
    if compressor is not None:
        data = compressor.encode(data)
    return data