Source code for xcube.core.chunk

import itertools
from typing import Dict, Tuple, Iterable, Iterator

import numpy as np
import xarray as xr

from xcube.core.update import update_dataset_chunk_encoding

[docs]def chunk_dataset(dataset: xr.Dataset, chunk_sizes: Dict[str, int] = None, format_name: str = None) -> xr.Dataset: """ Chunk *dataset* using *chunk_sizes* and optionally update encodings for given *format_name*. :param dataset: input dataset :param chunk_sizes: mapping from dimension name to new chunk size :param format_name: optional format, e.g. "zarr" or "netcdf4" :return: the (re)chunked dataset """ dataset = dataset.chunk(chunks=chunk_sizes) if format_name: dataset = update_dataset_chunk_encoding(dataset, chunk_sizes=chunk_sizes, format_name=format_name) return dataset
def get_empty_dataset_chunks(dataset: xr.Dataset) \ -> Iterator[Tuple[str, Iterator[Tuple[int, ...]]]]: """ Identify empty dataset chunks and return their indices. :param dataset: The dataset. :return: An iterator that provides a stream of (variable name, block indices tuple) tuples. """ return ((str(var_name), get_empty_var_chunks(dataset[var_name])) for var_name in dataset.data_vars) def get_empty_var_chunks(var: xr.DataArray) -> Iterator[Tuple[int, ...]]: """ Identify empty variable chunks and return their indices. :param var: The variable. :return: A list of block indices. """ chunks = var.chunks if chunks is None: return None for chunk_index, chunk_slice in compute_chunk_slices(chunks): data_index = tuple(slice(start, end) for start, end in chunk_slice) data = var[data_index] if np.all(np.isnan(data)): # print(f'empty: {}/{".".join(map(str, chunk_index))}') yield chunk_index def compute_chunk_slices(chunks: Tuple[Tuple[int, ...], ...]) -> Iterable: chunk_indices = [] for c in chunks: chunk_indices.append(tuple(i for i in range(len(c)))) chunk_slices = [] for c in chunks: x = [] o = 0 for s in c: x.append((o, o + s)) o += s chunk_slices.append(tuple(x)) return zip(itertools.product(*chunk_indices), itertools.product(*chunk_slices))