# The MIT License (MIT)
# Copyright (c) 2022 by the xcube development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
import collections.abc
import inspect
import itertools
import json
import math
import threading
import warnings
from typing import Iterator, Dict, Tuple, Any, Callable, \
Optional, List, Sequence
from typing import Union
import numcodecs.abc
import numpy as np
import xarray as xr
import zarr.storage
from xcube.util.assertions import assert_instance, assert_true
# Signature of an array's chunk getter. The store invokes it with
# keyword arguments only (the configured ``get_data_params`` plus the
# optional ``chunk_info`` and ``array_info``), hence the open
# parameter list. It returns the chunk data as bytes or numpy array.
GetData = Callable[..., Union[bytes, np.ndarray]]

# Signature of an array's close handler. It receives the array's
# property dictionary and returns nothing.
OnClose = Callable[[Dict[str, Any]], None]
class GenericArray(Dict[str, Any]):
    """
    Represent a generic array in the ``GenericZarrStore`` as
    dictionary of properties.

    Although all properties of this class are optional,
    some of them are mandatory when added to the ``GenericZarrStore``.

    When added to the store using ``GenericZarrStore.add_array()``,
    the array *name* and *dims* must always be present.
    Other mandatory properties depend on
    the *data* and *get_data* properties, which are mutually exclusive:

    * *get_data* is called for a requested data chunk of an array.
      It must return a bytes object or a numpy nd-array.
      *get_data* requires the following properties to be present too:
      *name*, *dims*, *dtype*, *shape*.
      *chunks* is optional and defaults to *shape*.
    * *data* must be a bytes object or a numpy nd-array.
      *data* requires the following properties to be present too:
      *name*, *dims*. If *data* is a numpy nd-array, *dtype*, *shape*
      and *chunks* are taken from the array, hence *chunks* is the
      same as *shape*. If *data* is a bytes object, *dtype* and
      *shape* must be given too.

    The function *get_data* receives only keyword-arguments which
    comprises the ones passed by *get_data_params*, if any, and
    two special ones which may occur in the signature of *get_data*:

    * The keyword argument *chunk_info*, if given, provides a dictionary
      that holds information about the current chunk:

      - ``index: tuple[int, ...]`` - the chunk's index
      - ``shape: tuple[int, ...]`` - the chunk's shape
      - ``slices: tuple[slice, ...]`` - the chunk's array slices

    * The keyword argument *array_info*, if given, provides a dictionary
      that holds information about the overall array. It contains
      all array properties passed to the constructor of ``GenericArray``
      plus

      - ``ndim: int`` - number of dimensions
      - ``num_chunks: tuple[int, ...]`` - number of chunks in every
        dimension

    ``GenericZarrStore`` will convert a Numpy array returned
    by *get_data* or given by *data* into a bytes object.
    It will also be compressed, if a *compressor* is given.
    It is important that the array chunks always have the shape
    given by *chunks*. See also
    https://zarr.readthedocs.io/en/stable/spec/v2.html#chunks

    Note that if the value of a named keyword argument is None,
    it will not be stored.

    :param array: Optional array info dictionary
    :param name: Optional array name
    :param data: Optional array data.
        Mutually exclusive with *get_data*.
        Must be a bytes object or a numpy array.
    :param get_data: Optional array data chunk getter.
        Mutually exclusive with *data*.
        Called for a requested data chunk of an array.
        Must return a bytes object or a numpy array.
    :param get_data_params: Optional keyword-arguments passed
        to *get_data*.
    :param dtype: Optional array data type.
        Either a string using syntax of the Zarr spec or a ``numpy.dtype``.
        For string encoded data types, see
        https://zarr.readthedocs.io/en/stable/spec/v2.html#data-type-encoding
    :param dims: Optional sequence of dimension names.
    :param shape: Optional sequence of shape sizes for each dimension.
    :param chunks: Optional sequence of chunk sizes for each dimension.
    :param fill_value: Optional fill value, see
        https://zarr.readthedocs.io/en/stable/spec/v2.html#fill-value-encoding
    :param compressor: Optional compressor.
        If given, it must be an instance of ``numcodecs.abc.Codec``.
    :param filters: Optional sequence of filters, see
        https://zarr.readthedocs.io/en/stable/spec/v2.html#filters.
    :param order: Optional memory layout of the array's chunks.
        If given, must be "C" or "F". Defaults to "C".
    :param attrs: Optional array attributes.
        If given, must be JSON-serializable.
    :param on_close: Optional array close handler.
        Called if the store is closed.
    :param chunk_encoding: Optional encoding type of the chunk
        data returned for the array. Can be "bytes" (the default)
        or "ndarray" for array chunks that are numpy.ndarray instances.
    :param kwargs: Other keyword arguments passed directly to the
        dictionary constructor.
    """

    def __init__(self,
                 array: Optional[Dict[str, Any]] = None,
                 name: Optional[str] = None,
                 get_data: Optional["GetData"] = None,
                 get_data_params: Optional[Dict[str, Any]] = None,
                 data: Optional[Union[bytes, np.ndarray]] = None,
                 dtype: Optional[Union[str, np.dtype]] = None,
                 dims: Optional[Union[str, Sequence[str]]] = None,
                 shape: Optional[Sequence[int]] = None,
                 chunks: Optional[Sequence[int]] = None,
                 fill_value: Optional[Union[bool, int, float, str]] = None,
                 compressor: Optional["numcodecs.abc.Codec"] = None,
                 filters: Optional[Sequence["numcodecs.abc.Codec"]] = None,
                 order: Optional[str] = None,
                 attrs: Optional[Dict[str, Any]] = None,
                 on_close: Optional["OnClose"] = None,
                 chunk_encoding: Optional[str] = None,
                 **kwargs):
        # Start from a copy so the caller's dictionary is never mutated.
        array = dict(array) if array is not None else dict()
        # Named keyword arguments override entries of *array*,
        # but only if they are actually given (not None).
        array.update({
            k: v
            for k, v in dict(
                name=name,
                dtype=dtype,
                dims=dims,
                shape=shape,
                chunks=chunks,
                fill_value=fill_value,
                compressor=compressor,
                filters=filters,
                order=order,
                attrs=attrs,
                data=data,
                get_data=get_data,
                get_data_params=get_data_params,
                on_close=on_close,
                chunk_encoding=chunk_encoding
            ).items()
            if v is not None
        })
        super().__init__(array, **kwargs)

    def finalize(self) -> "GenericArray":
        """Normalize and validate array properties and return a valid
        array info dictionary to be stored in the `GenericZarrStore`.

        :return: A new ``GenericArray`` that holds all array
            properties (missing ones set to None or their default)
            plus the computed properties *ndim*, *num_chunks*,
            and *get_data_info*.
        :raise ValueError: if a mandatory property is missing
            or a property has an invalid value.
        :raise TypeError: if a property has an invalid type.
        """
        name = self.get("name")
        if not name:
            raise ValueError("missing array name")

        data = self.get("data")
        get_data = self.get("get_data")
        if data is None and get_data is None:
            raise ValueError(f"array {name!r}:"
                             f" either data or get_data must be defined")
        if get_data is not None:
            if data is not None:
                raise ValueError(f"array {name!r}:"
                                 f" data and get_data cannot"
                                 f" be defined together")
            if not callable(get_data):
                raise TypeError(f"array {name!r}:"
                                f" get_data must be a callable")
            # Remember which of the special keyword arguments
            # the getter wants to receive.
            sig = inspect.signature(get_data)
            get_data_info = {
                "has_array_info": "array_info" in sig.parameters,
                "has_chunk_info": "chunk_info" in sig.parameters,
            }
            get_data_params = dict(self.get("get_data_params") or {})
        else:
            get_data_info = None
            get_data_params = None

        dims = self.get("dims")
        dims = [dims] if isinstance(dims, str) else dims
        if dims is None:
            raise ValueError(f"array {name!r}: missing dims")
        ndim = len(dims)

        if isinstance(data, np.ndarray):
            # forman: maybe warn if dtype or shape is given,
            #   but does not match data.dtype and data.shape
            dtype = str(data.dtype.str)
            shape = data.shape
            chunks = data.shape
        else:
            dtype = self.get("dtype")
            shape = self.get("shape")
            chunks = self.get("chunks")
            # Test for numpy.dtype first: instances without fields
            # have length zero and may therefore evaluate to False.
            if isinstance(dtype, np.dtype):
                dtype = dtype.str
            elif not dtype:
                raise ValueError(f"array {name!r}: missing dtype")
            if shape is None:
                raise ValueError(f"array {name!r}: missing shape")
            if chunks is None:
                # Also covers an explicit None entry for "chunks"
                chunks = shape

        if len(shape) != ndim:
            raise ValueError(f"array {name!r}:"
                             f" dims and shape must have same length")
        if len(chunks) != ndim:
            raise ValueError(f"array {name!r}:"
                             f" dims and chunks must have same length")
        if any(c <= 0 for c in chunks):
            # Fail with a clear message instead of a ZeroDivisionError
            raise ValueError(f"array {name!r}:"
                             f" chunk sizes must be positive,"
                             f" was {tuple(chunks)}")

        num_chunks = tuple(math.ceil(s / c)
                           for s, c in zip(shape, chunks))

        filters = self.get("filters")
        if filters:
            filters = list(filters)
            for f in filters:
                if not isinstance(f, numcodecs.abc.Codec):
                    raise TypeError(f"array {name!r}:"
                                    f" filter items must be an"
                                    f" instance of numcodecs.abc.Codec")
        else:
            filters = None

        compressor = self.get("compressor")
        if compressor is not None:
            if not isinstance(compressor, numcodecs.abc.Codec):
                raise TypeError(f"array {name!r}:"
                                f" compressor must be an"
                                f" instance of numcodecs.abc.Codec")

        fill_value = self.get("fill_value")
        if isinstance(fill_value, (np.ndarray, np.generic)):
            # Convert numpy arrays and numpy scalars
            # into plain Python scalars
            fill_value = fill_value.item()
        allowed_fill_value_types = (type(None), bool, int, float, str)
        if not isinstance(fill_value, allowed_fill_value_types):
            raise TypeError(
                f"array {name!r}:"
                f" fill_value type must be one of"
                f" {tuple(t.__name__ for t in allowed_fill_value_types)},"
                f" was {type(fill_value).__name__}"
            )

        order = self.get("order") or "C"
        allowed_orders = ("C", "F")
        if order not in allowed_orders:
            raise ValueError(
                f"array {name!r}:"
                f" order must be one of {allowed_orders},"
                f" was {order!r}"
            )

        chunk_encoding = self.get("chunk_encoding") or "bytes"
        allowed_chunk_encodings = ("bytes", "ndarray")
        if chunk_encoding not in allowed_chunk_encodings:
            raise ValueError(
                f"array {name!r}:"
                f" chunk_encoding must be one of {allowed_chunk_encodings},"
                f" was {chunk_encoding!r}"
            )

        attrs = self.get("attrs")
        if attrs is not None:
            if not isinstance(attrs, dict):
                raise TypeError(
                    f"array {name!r}:"
                    f" attrs must be dict, was {type(attrs).__name__}"
                )

        # Note: passing the properties as dictionary
        # will prevent removing them if their value is None,
        # see GenericArray constructor.
        return GenericArray({
            "name": name,
            "dtype": dtype,
            "dims": tuple(dims),
            "shape": tuple(shape),
            "chunks": tuple(chunks),
            "fill_value": fill_value,
            "filters": filters,
            "compressor": compressor,
            "order": order,
            "attrs": attrs,
            "data": data,
            "get_data": get_data,
            "get_data_params": get_data_params,
            "on_close": self.get("on_close"),
            "chunk_encoding": chunk_encoding,
            # Computed properties
            "ndim": ndim,
            "num_chunks": num_chunks,
            "get_data_info": get_data_info,
        })
# Type of values accepted where array properties are expected:
# either a ``GenericArray`` or a plain dictionary of array properties.
GenericArrayLike = Union[GenericArray, Dict[str, Any]]
class GenericZarrStore(zarr.storage.Store):
    """A Zarr store that maintains generic arrays in a flat, top-level
    hierarchy. The root of the store is a Zarr group
    conforming to the Zarr spec v2.

    It is designed to serve as a Zarr store for xarray datasets
    that compute their data arrays dynamically.

    See class ``GenericArray`` for specifying the arrays' properties.

    The array data of this store's arrays are either retrieved from
    static (numpy) arrays or from a callable that provides the
    array's data chunks as bytes or numpy arrays.

    :param arrays: Arrays to be added.
        Typically, these will be instances of ``GenericArray``.
    :param attrs: Optional attributes of the top-level group.
        If given, it must be JSON serializable.
    :param array_defaults: Optional array defaults for
        array properties not passed to ``add_array``.
        Typically, this will be an instance of ``GenericArray``.
    """

    # Shortcut for GenericArray
    Array = GenericArray

    def __init__(
            self,
            *arrays: GenericArrayLike,
            attrs: Optional[Dict[str, Any]] = None,
            array_defaults: Optional[GenericArrayLike] = None
    ):
        self._attrs = dict(attrs) if attrs is not None else {}
        self._array_defaults = array_defaults
        # Maps dimension names to dimension sizes
        self._dim_sizes: Dict[str, int] = {}
        # Maps array names to finalized arrays
        self._arrays: Dict[str, GenericArray] = {}
        for array in arrays:
            self.add_array(array)

    def add_array(self,
                  array: Optional[GenericArrayLike] = None,
                  **array_kwargs) -> None:
        """
        Add a new array to this store.

        The effective array properties are the store's array defaults,
        updated by *array*, updated by *array_kwargs*.

        :param array: Optional array properties.
            Typically, this will be an instance of ``GenericArray``.
        :param array_kwargs: Keyword arguments form
            for the properties of ``GenericArray``.
        :raise ValueError: if an array of same name already exists
            or if a dimension's size differs from the size
            already used by this store.
        """
        effective_array = GenericArray(self._array_defaults or {})
        if array:
            effective_array.update(array)
        if array_kwargs:
            effective_array.update(array_kwargs)
        effective_array = effective_array.finalize()

        name = effective_array["name"]
        if name in self._arrays:
            raise ValueError(f"array {name!r} is already defined")

        dims = effective_array["dims"]
        shape = effective_array["shape"]
        # Collect new dimension sizes separately so that
        # the store remains unchanged if validation fails.
        new_dim_sizes: Dict[str, int] = {}
        for dim_name, dim_size in zip(dims, shape):
            old_dim_size = self._dim_sizes.get(dim_name,
                                               new_dim_sizes.get(dim_name))
            if old_dim_size is None:
                new_dim_sizes[dim_name] = dim_size
            elif old_dim_size != dim_size:
                # Dimensions must have same lengths for all arrays
                # in this store
                raise ValueError(f"array {name!r}"
                                 f" defines dimension {dim_name!r}"
                                 f" with size {dim_size},"
                                 f" but existing size is {old_dim_size}")

        self._dim_sizes.update(new_dim_sizes)
        self._arrays[name] = effective_array

    ##########################################################################
    # Zarr Store implementation
    ##########################################################################

    def is_writeable(self) -> bool:
        """Return False, because arrays in this store are generative."""
        return False

    def listdir(self, path: str = "") -> List[str]:
        """List a store path.

        For the root path (the empty string) the group's metadata keys
        and the array names are returned. For an array name,
        the keys of that array are returned, each prefixed
        by the array name.

        :param path: The path.
        :return: List of sorted directory entries.
        :raise ValueError: if *path* contains a slash.
        """
        if path == "":
            return sorted([
                ".zmetadata",
                ".zgroup",
                ".zattrs",
                *self._arrays.keys()
            ])
        elif "/" not in path:
            return sorted(self._get_array_keys(path))
        raise ValueError(f"{path} is not a directory")

    def rmdir(self, path: str = "") -> None:
        """The general form removes store paths.
        This implementation can remove entire arrays only.

        :param path: The array's name.
        :raise ValueError: if *path* is not the name
            of an existing array.
        """
        if path not in self._arrays:
            raise ValueError(f"{path}: can only remove existing arrays")
        removed_array = self._arrays.pop(path)
        # Forget dimensions no longer used by any remaining array
        for dim_name in removed_array["dims"]:
            dim_used = any(dim_name in other_array["dims"]
                           for other_array in self._arrays.values())
            if not dim_used:
                # Tolerate absent entries, e.g., if an array
                # uses the same dimension name more than once.
                self._dim_sizes.pop(dim_name, None)

    def rename(self, src_path: str, dst_path: str) -> None:
        """The general form renames store paths.
        This implementation can rename arrays only.

        :param src_path: Source array name.
        :param dst_path: Target array name.
        :raise ValueError: if *src_path* is not an array,
            if *dst_path* already exists, or if *dst_path*
            contains a slash.
        """
        array = self._arrays.get(src_path)
        if array is None:
            raise ValueError(f"can only rename arrays, but {src_path!r}"
                             f" is not an array")
        if dst_path in self._arrays:
            raise ValueError(f"cannot rename array {src_path!r} into"
                             f" {dst_path!r} because it already exists")
        if "/" in dst_path:
            raise ValueError(f"cannot rename array {src_path!r}"
                             f" into {dst_path!r}")
        array["name"] = dst_path
        self._arrays[dst_path] = array
        del self._arrays[src_path]

    def close(self) -> None:
        """Calls the "on_close" handlers, if any, of arrays."""
        for array in self._arrays.values():
            on_close = array.get("on_close")
            if on_close is not None:
                on_close(array)

    # Note, getsize is not implemented by intention as it requires
    # actual computation of arrays.
    #
    # def getsize(self, key: str) -> int:
    #     pass

    ##########################################################################
    # MutableMapping implementation
    ##########################################################################

    def __iter__(self) -> Iterator[str]:
        """Get an iterator of all keys in this store."""
        yield ".zmetadata"
        yield ".zgroup"
        yield ".zattrs"
        for array_name in self._arrays.keys():
            yield from self._get_array_keys(array_name)

    def __len__(self) -> int:
        """Get the number of keys in this store."""
        return sum(1 for _ in iter(self))

    def __contains__(self, key: str) -> bool:
        """Test whether *key* is a metadata key of the group,
        a metadata key of an existing array, or a valid
        chunk key of an existing array.
        """
        if key in (".zmetadata", ".zgroup", ".zattrs"):
            return True
        try:
            array_name, value_id = self._parse_array_key(key)
        except KeyError:
            return False
        if value_id in (".zarray", ".zattrs"):
            return True
        try:
            self._get_array_chunk_index(array_name, value_id)
            return True
        except KeyError:
            return False

    def __getitem__(self, key: str) -> Union[bytes, np.ndarray]:
        """Get the value for *key*. Metadata items are returned
        as UTF-8 encoded JSON, chunk data is returned as is.
        """
        item = self._get_item(key)
        if isinstance(item, dict):
            return dict_to_bytes(item)
        elif isinstance(item, str):
            return str_to_bytes(item)
        return item

    def __setitem__(self, key: str, value: bytes) -> None:
        """Always raise TypeError, because this store is read-only."""
        class_name = self.__module__ + '.' + self.__class__.__name__
        raise TypeError(f'{class_name} is read-only')

    def __delitem__(self, key: str) -> None:
        """Remove the array named *key*, see ``rmdir()``."""
        self.rmdir(key)

    ########################################################################
    # Utilities
    ##########################################################################

    @classmethod
    def from_dataset(cls,
                     dataset: xr.Dataset,
                     array_defaults: Optional[GenericArrayLike] = None) \
            -> "GenericZarrStore":
        """Create a Zarr store for given *dataset*.
        The attributes of the store's top-level group are set
        to the *dataset*'s attributes.

        The following *array_defaults* properties can be provided
        (other properties are prescribed by the *dataset*):

        * ``fill_value``- defaults to None
        * ``compressor``- defaults to None
        * ``filters``- defaults to None
        * ``order``- defaults to "C"
        * ``chunk_encoding`` - defaults to "bytes"

        :param dataset: The dataset
        :param array_defaults: Array default values.
        :return: A new Zarr store instance.
        """

        def _get_dataset_data(ds=None,
                              chunk_info=None,
                              array_info=None) -> np.ndarray:
            # Fetch the values of the requested chunk
            # from the dataset's variable.
            array_name = array_info["name"]
            chunk_slices = chunk_info["slices"]
            return ds[array_name][chunk_slices].values

        arrays = []
        for var_name, var in dataset.variables.items():
            arrays.append(GenericArray(
                name=str(var_name),
                dtype=np.dtype(var.dtype).str,
                dims=[str(dim) for dim in var.dims],
                shape=var.shape,
                # For chunked variables, use the largest chunk size
                # in each dimension.
                chunks=[(max(*c) if len(c) > 1 else c[0])
                        for c in var.chunks] if var.chunks else var.shape,
                attrs={str(k): v for k, v in var.attrs.items()},
                get_data=_get_dataset_data,
                get_data_params=dict(ds=dataset),
            ))
        attrs = {str(k): v for k, v in dataset.attrs.items()}
        # Use cls so that subclasses create instances of their own type
        return cls(*arrays,
                   attrs=attrs,
                   array_defaults=array_defaults)

    ########################################################################
    # Helpers
    ##########################################################################

    def _get_item(self, key: str) -> Union[dict, str, bytes]:
        if key == ".zmetadata":
            return self._get_metadata_item()
        if key == ".zgroup":
            return self._get_group_item()
        if key == ".zattrs":
            return self._get_attrs_item()

        array_name, value_id = self._parse_array_key(key)
        array = self._arrays[array_name]

        if value_id == '.zarray':
            return self._get_array_spec_item(array)
        if value_id == '.zattrs':
            return self._get_array_attrs_item(array)

        chunk_index = self._get_array_chunk_index(array_name, value_id)
        return self._get_array_data_item(array, chunk_index)

    def _get_metadata_item(self):
        # Consolidated metadata: group metadata plus
        # the metadata of all arrays
        metadata = {
            ".zgroup": self._get_item(".zgroup"),
            ".zattrs": self._get_item(".zattrs"),
        }
        for array_name in self._arrays.keys():
            key = array_name + "/.zarray"
            metadata[key] = self._get_item(key)
            key = array_name + "/.zattrs"
            metadata[key] = self._get_item(key)
        return {
            "zarr_consolidated_format": 1,
            "metadata": metadata
        }

    # noinspection PyMethodMayBeStatic
    def _get_group_item(self):
        return {
            "zarr_format": 2
        }

    def _get_attrs_item(self):
        return self._attrs or {}

    # noinspection PyMethodMayBeStatic
    def _get_array_spec_item(self, array: GenericArray):

        # JSON-encode fill_value
        fill_value = array["fill_value"]
        if isinstance(fill_value, float):
            if math.isnan(fill_value):
                fill_value = "NaN"
            elif math.isinf(fill_value):
                if fill_value < 0:
                    fill_value = "-Infinity"
                else:
                    fill_value = "Infinity"

        # JSON-encode compressor
        compressor = array["compressor"]
        if compressor is not None:
            compressor = compressor.get_config()

        # JSON-encode filters
        filters = array["filters"]
        if filters is not None:
            filters = list(f.get_config() for f in filters)

        return {
            "zarr_format": 2,
            "dtype": array["dtype"],
            "shape": list(array["shape"]),
            "chunks": list(array["chunks"]),
            "fill_value": fill_value,
            "compressor": compressor,
            "filters": filters,
            "order": array["order"],
        }

    # noinspection PyMethodMayBeStatic
    def _get_array_attrs_item(self, array: GenericArray):
        dims = array["dims"]
        attrs = array["attrs"]
        # The dimension names are stored in the array's attributes
        # under "_ARRAY_DIMENSIONS". An equally named entry
        # in *attrs* takes precedence.
        return {
            "_ARRAY_DIMENSIONS": dims,
            **(attrs or {})
        }

    # noinspection PyMethodMayBeStatic
    def _get_array_data_item(self,
                             array: Dict[str, Any],
                             chunk_index: Tuple[int]) \
            -> Union[bytes, np.ndarray]:
        # Note, here array is expected to be "finalized",
        # that is, validated and normalized
        shape = array["shape"]
        chunks = array["chunks"]
        chunk_shape = None
        data = array["data"]
        if data is None:
            get_data = array["get_data"]
            assert callable(get_data)  # Has been ensured before
            get_data_params = array["get_data_params"]
            get_data_kwargs = dict(get_data_params)
            get_data_info = array["get_data_info"]
            if get_data_info["has_chunk_info"]:
                chunk_shape = get_chunk_shape(shape, chunks, chunk_index)
                array_slices = get_array_slices(shape, chunks, chunk_index)
                get_data_kwargs["chunk_info"] = {
                    "index": chunk_index,
                    "shape": chunk_shape,
                    "slices": array_slices,
                }
            if get_data_info["has_array_info"]:
                get_data_kwargs["array_info"] = dict(array)

            data = get_data(**get_data_kwargs)

        chunk_encoding = array["chunk_encoding"]
        if isinstance(data, np.ndarray):
            # As of Zarr 2.0, all chunks of an array
            # must have the same shape (= chunks)
            if data.shape != chunks:
                # This commonly happens if array shape sizes
                # are not integer multiple of chunk shape sizes.
                if chunk_shape is None:
                    # Compute expected chunk shape.
                    chunk_shape = get_chunk_shape(shape, chunks, chunk_index)
                # We will only pad the data if the data shape
                # corresponds to the expected chunk's shape.
                if data.shape == chunk_shape:
                    padding = get_chunk_padding(shape, chunks, chunk_index)
                    fill_value = array["fill_value"]
                    data = np.pad(data, padding,
                                  mode="constant",
                                  constant_values=fill_value or 0)
                else:
                    key = format_chunk_key(array["name"], chunk_index)
                    raise ValueError(f"{key}:"
                                     f" data chunk at {chunk_index}"
                                     f" must have shape {chunk_shape},"
                                     f" but was {data.shape}")
            if chunk_encoding == "bytes":
                # Convert to bytes, filter and compress
                data = ndarray_to_bytes(data,
                                        order=array["order"],
                                        filters=array["filters"],
                                        compressor=array["compressor"])

        # Sanity check
        if (chunk_encoding == "bytes"
            and not isinstance(data, bytes)) \
                or (chunk_encoding == "ndarray"
                    and not isinstance(data, np.ndarray)):
            key = format_chunk_key(array["name"],
                                   chunk_index)
            expected_type = "numpy.ndarray" if chunk_encoding == "ndarray" \
                else "bytes"
            raise TypeError(f"{key}:"
                            f" data must be encoded as {expected_type},"
                            f" but was {type(data).__name__}")

        return data

    def _parse_array_key(self, key: str) -> Tuple[str, str]:
        # Split "<array_name>/<value_id>" at the last slash
        array_name_and_value_id = key.rsplit('/', maxsplit=1)
        if len(array_name_and_value_id) != 2:
            raise KeyError(key)
        array_name, value_id = array_name_and_value_id
        if array_name not in self._arrays:
            raise KeyError(key)
        return array_name, value_id

    def _get_array_chunk_index(self,
                               array_name: str,
                               index_id: str) -> Tuple[int]:
        # Parse a chunk identifier such as "0.3.1" and ensure
        # that it addresses an existing chunk of the array.
        try:
            chunk_index = tuple(map(int, index_id.split('.')))
        except (ValueError, TypeError):
            raise KeyError(f"{array_name}/{index_id}")
        array = self._arrays[array_name]
        shape = array["shape"]
        if len(chunk_index) != len(shape):
            raise KeyError(f"{array_name}/{index_id}")
        num_chunks = array["num_chunks"]
        for i, n in zip(chunk_index, num_chunks):
            if not (0 <= i < n):
                raise KeyError(f"{array_name}/{index_id}")
        return chunk_index

    def _get_array_keys(self, array_name: str) -> Iterator[str]:
        yield array_name + "/.zarray"
        yield array_name + "/.zattrs"
        array = self._arrays[array_name]
        num_chunks = array["num_chunks"]
        yield from get_chunk_keys(array_name, num_chunks)
def get_array_slices(shape: Tuple[int, ...],
                     chunks: Tuple[int, ...],
                     chunk_index: Tuple[int, ...]) -> Tuple[slice, ...]:
    """Get the array slices covered by the chunk at *chunk_index*.

    :param shape: The array's shape.
    :param chunks: The chunk sizes for each dimension.
    :param chunk_index: The chunk's index in each dimension.
    :return: One slice per dimension. A chunk that would exceed
        the array's size in a dimension gets the remaining extent.
    """
    slices = []
    for size, chunk_size, index in zip(shape, chunks, chunk_index):
        start = index * chunk_size
        if (index + 1) * chunk_size <= size:
            # Chunk lies entirely inside the array
            extent = chunk_size
        else:
            # Partial chunk at the array's end
            extent = size % chunk_size
        slices.append(slice(start, start + extent))
    return tuple(slices)
def get_chunk_shape(shape: Tuple[int, ...],
                    chunks: Tuple[int, ...],
                    chunk_index: Tuple[int, ...]) -> Tuple[int, ...]:
    """Get the actual shape of the chunk at *chunk_index*.

    :param shape: The array's shape.
    :param chunks: The chunk sizes for each dimension.
    :param chunk_index: The chunk's index in each dimension.
    :return: The chunk's extent in each dimension, which is the
        chunk size, or the remainder for a chunk that would
        exceed the array's size.
    """
    extents = []
    for size, chunk_size, index in zip(shape, chunks, chunk_index):
        fits = (index + 1) * chunk_size <= size
        extents.append(chunk_size if fits else size % chunk_size)
    return tuple(extents)
def get_chunk_padding(shape: Tuple[int, ...],
                      chunks: Tuple[int, ...],
                      chunk_index: Tuple[int, ...]):
    """Get the padding that extends the chunk at *chunk_index*
    to the full chunk size.

    :param shape: The array's shape.
    :param chunks: The chunk sizes for each dimension.
    :param chunk_index: The chunk's index in each dimension.
    :return: For each dimension a pair ``(0, n)``, where *n* is the
        number of elements missing at the end of the chunk.
    """
    padding = []
    for size, chunk_size, index in zip(shape, chunks, chunk_index):
        if (index + 1) * chunk_size <= size:
            pad_after = 0
        else:
            pad_after = chunk_size - size % chunk_size
        padding.append((0, pad_after))
    return tuple(padding)
def get_chunk_indexes(num_chunks: Tuple[int, ...]) \
        -> Iterator[Tuple[int, ...]]:
    """Iterate over all chunk indexes for the given number of
    chunks in each dimension.

    :param num_chunks: Number of chunks in each dimension.
    :return: Iterator of chunk indexes, last dimension varying
        fastest. If *num_chunks* is empty, the single
        index ``(0,)`` is generated.
    """
    if num_chunks:
        index_ranges = [range(int(n)) for n in num_chunks]
        yield from itertools.product(*index_ranges)
    else:
        yield (0,)
def get_chunk_keys(array_name: str,
                   num_chunks: Tuple[int, ...]) -> Iterator[str]:
    """Iterate over the store keys of all chunks of an array.

    :param array_name: The array's name.
    :param num_chunks: Number of chunks in each dimension.
    :return: Iterator of chunk keys as produced
        by ``format_chunk_key()``.
    """
    yield from (format_chunk_key(array_name, index)
                for index in get_chunk_indexes(num_chunks))
def format_chunk_key(array_name: str,
                     chunk_index: Tuple[int, ...]) -> str:
    """Format the store key of an array chunk.

    :param array_name: The array's name.
    :param chunk_index: The chunk's index in each dimension.
    :return: The key ``"<array_name>/<i0>.<i1>..."``.
    """
    index_parts = [str(i) for i in chunk_index]
    return f"{array_name}/{'.'.join(index_parts)}"
def dict_to_bytes(d: Dict) -> bytes:
    """Serialize dictionary *d* into UTF-8 encoded JSON text
    that uses an indentation of two spaces.
    """
    json_text = json.dumps(d, indent=2)
    return json_text.encode("utf-8")
def str_to_bytes(s: str) -> bytes:
    """Encode string *s* into bytes using UTF-8."""
    return str.encode(s, "utf-8")
def ndarray_to_bytes(
        data: np.ndarray,
        order: Optional[str] = None,
        filters: Optional[Sequence[Any]] = None,
        compressor: Optional["numcodecs.abc.Codec"] = None
) -> bytes:
    """Convert a numpy array into a bytes object, then apply
    the given *filters* and *compressor*, if any.

    :param data: The numpy array.
    :param order: Memory layout used to serialize *data*,
        passed to ``numpy.ndarray.tobytes()``. Defaults to "C".
    :param filters: Optional sequence of codecs whose ``encode()``
        methods are applied in the given order.
    :param compressor: Optional codec whose ``encode()`` method
        is applied after the filters.
    :return: The encoded data as bytes object.
    """
    encoded = data.tobytes(order=order or "C")
    for f in filters or ():
        encoded = f.encode(encoded)
    if compressor is not None:
        encoded = compressor.encode(encoded)
    if not isinstance(encoded, bytes):
        # Codecs may return other buffer-like objects,
        # e.g., bytearray or numpy arrays. Always return real bytes.
        encoded = memoryview(encoded).tobytes()
    return encoded