Source code for xcube.core.chunk

import itertools
from typing import Dict, Tuple, Iterable, Iterator

import numpy as np
import xarray as xr

from xcube.core.update import update_dataset_chunk_encoding


[docs] def chunk_dataset( dataset: xr.Dataset, chunk_sizes: Dict[str, int] = None, format_name: str = None ) -> xr.Dataset: """Chunk *dataset* using *chunk_sizes* and optionally update encodings for given *format_name*. Args: dataset: input dataset chunk_sizes: mapping from dimension name to new chunk size format_name: optional format, e.g. "zarr" or "netcdf4" Returns: the (re)chunked dataset """ dataset = dataset.chunk(chunks=chunk_sizes) if format_name: dataset = update_dataset_chunk_encoding( dataset, chunk_sizes=chunk_sizes, format_name=format_name ) return dataset
def get_empty_dataset_chunks( dataset: xr.Dataset, ) -> Iterator[Tuple[str, Iterator[Tuple[int, ...]]]]: """Identify empty dataset chunks and return their indices. Args: dataset: The dataset. Returns: An iterator that provides a stream of (variable name, block indices tuple) tuples. """ return ( (str(var_name), get_empty_var_chunks(dataset[var_name])) for var_name in dataset.data_vars ) def get_empty_var_chunks(var: xr.DataArray) -> Iterator[Tuple[int, ...]]: """Identify empty variable chunks and return their indices. Args: var: The variable. Returns: A list of block indices. """ chunks = var.chunks if chunks is None: return None for chunk_index, chunk_slice in compute_chunk_slices(chunks): data_index = tuple(slice(start, end) for start, end in chunk_slice) data = var[data_index] if np.all(np.isnan(data)): # print(f'empty: {var.name}/{".".join(map(str, chunk_index))}') yield chunk_index def compute_chunk_slices(chunks: Tuple[Tuple[int, ...], ...]) -> Iterable: chunk_indices = [] for c in chunks: chunk_indices.append(tuple(i for i in range(len(c)))) chunk_slices = [] for c in chunks: x = [] o = 0 for s in c: x.append((o, o + s)) o += s chunk_slices.append(tuple(x)) return zip(itertools.product(*chunk_indices), itertools.product(*chunk_slices))