Source code for xcube.core.compute

# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

import inspect
import warnings
from typing import Tuple, Sequence, Dict, Any, Callable, Union, AbstractSet

import numpy as np
import xarray as xr

from xcube.core.schema import CubeSchema
from xcube.core.chunkstore import ChunkStore
from xcube.core.verify import assert_cube

CubeFuncOutput = Union[
    xr.DataArray, np.ndarray, Sequence[Union[xr.DataArray, np.ndarray]]
]
CubeFunc = Callable[..., CubeFuncOutput]

_PREDEFINED_KEYWORDS = ["input_params", "dim_coords", "dim_ranges"]


# TODO: support vectorize = all cubes have same variables and cube_func receives variables as vectors (with extra dim)


[docs] def compute_cube( cube_func: CubeFunc, *input_cubes: xr.Dataset, input_cube_schema: CubeSchema = None, input_var_names: Sequence[str] = None, input_params: Dict[str, Any] = None, output_var_name: str = "output", output_var_dtype: Any = np.float64, output_var_attrs: Dict[str, Any] = None, vectorize: bool = None, cube_asserted: bool = False, ) -> xr.Dataset: """Compute a new output data cube with a single variable named *output_var_name* from variables named *input_var_names* contained in zero, one, or more input data cubes in *input_cubes* using a cube factory function *cube_func*. For a more detailed description of the function usage, please refer to :func:`compute_dataset`. Args: cube_func: The cube factory function. input_cubes: An optional sequence of input cube datasets, must be provided if *input_cube_schema* is not. input_cube_schema: An optional input cube schema, must be provided if *input_cubes* is not. Will be ignored if *input_cubes* is provided. input_var_names: A sequence of variable names input_params: Optional dictionary with processing parameters passed to *cube_func*. output_var_name: Optional name of the output variable, defaults to ``'output'``. output_var_dtype: Optional numpy datatype of the output variable, defaults to ``'float32'``. output_var_attrs: Optional metadata attributes for the output variable. vectorize: Whether all *input_cubes* have the same variables which are concatenated and passed as vectors to *cube_func*. Not implemented yet. cube_asserted: If False, *cube* will be verified, otherwise it is expected to be a valid cube. Returns: A new dataset that contains the computed output variable. """ return compute_dataset( cube_func, *input_cubes, input_cube_schema=input_cube_schema, input_var_names=input_var_names, input_params=input_params, output_var_name=output_var_name, output_var_dtype=output_var_dtype, output_var_attrs=output_var_attrs, vectorize=vectorize, cube_asserted=cube_asserted, )
def compute_dataset( cube_func: CubeFunc, *input_cubes: xr.Dataset, input_cube_schema: CubeSchema = None, input_var_names: Sequence[str] = None, input_params: Dict[str, Any] = None, output_var_name: str = "output", output_var_dims: AbstractSet[str] = None, output_var_dtype: Any = np.float64, output_var_attrs: Dict[str, Any] = None, vectorize: bool = None, cube_asserted: bool = False, ) -> xr.Dataset: """Compute a new output dataset with a single variable named *output_var_name* from variables named *input_var_names* contained in zero, one, or more input data cubes in *input_cubes* using a cube factory function *cube_func*. *cube_func* is called concurrently for each of the chunks of the input variables. It is expected to return a chunk block whith is type ``np.ndarray``. If *input_cubes* is not empty, *cube_func* receives variables as specified by *input_var_names*. If *input_cubes* is empty, *input_var_names* must be empty too, and *input_cube_schema* must be given, so that a new cube can be created. The full signature of *cube_func* is::: def cube_func(*input_vars: np.ndarray, input_params: Dict[str, Any] = None, dim_coords: Dict[str, np.ndarray] = None, dim_ranges: Dict[str, Tuple[int, int]] = None) -> np.ndarray: pass The arguments are: * ``input_vars``: the variables according to the given *input_var_names*; * ``input_params``: is this call's *input_params*, a mapping from parameter name to value; * ``dim_coords``: a mapping from dimension names to the current chunk's coordinate arrays; * ``dim_ranges``: a mapping from dimension names to the current chunk's index ranges. Only the ``input_vars`` argument is mandatory. The keyword arguments ``input_params``, ``input_params``, ``input_params`` do need to be present at all. *output_var_dims* my be given in the case, where ... TODO: describe new output_var_dims... Args: cube_func: The cube factory function. *input_cubes: An optional sequence of input cube datasets, must be provided if *input_cube_schema* is not. input_cube_schema: An optional input cube schema, must be provided if *input_cubes* is not. input_var_names: A sequence of variable names input_params: Optional dictionary with processing parameters passed to *cube_func*. output_var_name: Optional name of the output variable, defaults to ``'output'``. output_var_dims: Optional set of names of the output dimensions, used in the case *cube_func* reduces dimensions. output_var_dtype: Optional numpy datatype of the output variable, defaults to ``'float32'``. output_var_attrs: Optional metadata attributes for the output variable. vectorize: Whether all *input_cubes* have the same variables which are concatenated and passed as vectors to *cube_func*. Not implemented yet. cube_asserted: If False, *cube* will be verified, otherwise it is expected to be a valid cube. Returns: A new dataset that contains the computed output variable. """ if vectorize is not None: # TODO: support vectorize = all cubes have same variables and cube_func # receives variables as vectors (with extra dim) raise NotImplementedError("vectorize is not supported yet") if not cube_asserted: for cube in input_cubes: assert_cube(cube) # Check compatibility of inputs if input_cubes: input_cube_schema = CubeSchema.new(input_cubes[0]) for cube in input_cubes: if not cube_asserted: assert_cube(cube) if cube != input_cubes[0]: # noinspection PyUnusedLocal other_schema = CubeSchema.new(cube) # TODO (forman): broadcast all cubes to same shape, rechunk to same chunks elif input_cube_schema is None: raise ValueError("input_cube_schema must be given") output_var_name = output_var_name or "output" # Collect named input variables, raise if not found input_var_names = input_var_names or [] input_vars = [] for var_name in input_var_names: input_var = None for cube in input_cubes: if var_name in cube.data_vars: input_var = cube[var_name] break if input_var is None: raise ValueError(f"variable {var_name!r} not found in any of cubes") input_vars.append(input_var) # Find out, if cube_func uses any of _PREDEFINED_KEYWORDS has_input_params, has_dim_coords, has_dim_ranges = _inspect_cube_func( cube_func, input_var_names ) def cube_func_wrapper(index_chunk, *input_var_chunks): nonlocal input_cube_schema, input_var_names, input_params, input_vars nonlocal has_input_params, has_dim_coords, has_dim_ranges # Note, xarray.apply_ufunc does a test call with empty input arrays, # so index_chunk.size == 0 is a valid case empty_call = index_chunk.size == 0 # TODO: when output_var_dims is given, index_chunk must be reordered # as core dimensions are moved to the and of index_chunk and input_var_chunks if not empty_call: index_chunk = index_chunk.ravel() if index_chunk.size < 2 * input_cube_schema.ndim: if not empty_call: warnings.warn( f"unexpected index_chunk of size {index_chunk.size} received!" ) return None dim_ranges = None if has_dim_ranges or has_dim_coords: dim_ranges = {} for i in range(input_cube_schema.ndim): dim_name = input_cube_schema.dims[i] if not empty_call: start = int(index_chunk[2 * i + 0]) end = int(index_chunk[2 * i + 1]) dim_ranges[dim_name] = start, end else: dim_ranges[dim_name] = () dim_coords = None if has_dim_coords: dim_coords = {} for coord_var_name, coord_var in input_cube_schema.coords.items(): coord_slices = [slice(None)] * coord_var.ndim for i in range(input_cube_schema.ndim): dim_name = input_cube_schema.dims[i] if dim_name in coord_var.dims: j = coord_var.dims.index(dim_name) coord_slices[j] = slice(*dim_ranges[dim_name]) dim_coords[coord_var_name] = coord_var[tuple(coord_slices)].values kwargs = {} if has_input_params: kwargs["input_params"] = input_params if has_dim_ranges: kwargs["dim_ranges"] = dim_ranges if has_dim_coords: kwargs["dim_coords"] = dim_coords return cube_func(*input_var_chunks, **kwargs) index_var = _gen_index_var(input_cube_schema) all_input_vars = [index_var] + input_vars input_core_dims = None if output_var_dims: input_core_dims = [] has_warned = False for i in range(len(all_input_vars)): input_var = all_input_vars[i] var_core_dims = [ dim for dim in input_var.dims if dim not in output_var_dims ] must_rechunk = False if var_core_dims and input_var.chunks: for var_core_dim in var_core_dims: dim_index = input_var.dims.index(var_core_dim) dim_chunk_size = input_var.chunks[dim_index][0] dim_shape_size = input_var.shape[dim_index] if dim_chunk_size != dim_shape_size: must_rechunk = True break if must_rechunk: if not has_warned: warnings.warn( f'Input variables must not be chunked in dimension(s): {", ".join(var_core_dims)}.\n' f"Rechunking applies, which may drastically decrease runtime performance " f"and increase memory usage." ) has_warned = True all_input_vars[i] = input_var.chunk( {var_core_dim: -1 for var_core_dim in var_core_dims} ) input_core_dims.append(var_core_dims) output_var = xr.apply_ufunc( cube_func_wrapper, *all_input_vars, dask="parallelized", input_core_dims=input_core_dims, output_dtypes=[output_var_dtype], ) if output_var_attrs: output_var.attrs.update(output_var_attrs) return xr.Dataset({output_var_name: output_var}, coords=input_cube_schema.coords) def _inspect_cube_func(cube_func: CubeFunc, input_var_names: Sequence[str] = None): ( args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, annotations, ) = inspect.getfullargspec(cube_func) cube_func_name = "?" if hasattr(cube_func, "__name__"): cube_func_name = cube_func.__name__ true_args = [arg not in _PREDEFINED_KEYWORDS for arg in args] if False in true_args and any(true_args[true_args.index(False) :]): raise ValueError( f"invalid cube_func {cube_func_name!r}: " f'any argument must occur before any of {", ".join(_PREDEFINED_KEYWORDS)}, ' f'but got {", ".join(args)}' ) if not all(true_args) and varargs: raise ValueError( f"invalid cube_func {cube_func_name!r}: " f'any argument must occur before any of {", ".join(_PREDEFINED_KEYWORDS)}, ' f'but got {", ".join(args)} before *{varargs}' ) num_input_vars = len(input_var_names) if input_var_names else 0 num_args = sum(true_args) if varargs is None and num_input_vars != num_args: raise ValueError( f"invalid cube_func {cube_func_name!r}: " f"expected {num_input_vars} arguments, " f'but got {", ".join(args)}' ) has_input_params = "input_params" in args or "input_params" in kwonlyargs has_dim_coords = "dim_coords" in args or "dim_coords" in kwonlyargs has_dim_ranges = "dim_ranges" in args or "dim_ranges" in kwonlyargs return has_input_params, has_dim_coords, has_dim_ranges def _gen_index_var(cube_schema: CubeSchema): dims = cube_schema.dims shape = cube_schema.shape chunks = cube_schema.chunks # noinspection PyUnusedLocal def get_chunk(cube_store: ChunkStore, name: str, index: Tuple[int, ...]) -> bytes: data = np.zeros(cube_store.chunks, dtype=np.uint64) data_view = data.ravel() if data_view.base is not data: raise ValueError("view expected") if data_view.size < cube_store.ndim * 2: raise ValueError("size too small") for i in range(cube_store.ndim): j1 = cube_store.chunks[i] * index[i] j2 = j1 + cube_store.chunks[i] data_view[2 * i] = j1 data_view[2 * i + 1] = j2 return data.tobytes() store = ChunkStore(dims, shape, chunks) store.add_lazy_array("__index_var__", "<u8", get_chunk=get_chunk) dataset = xr.open_zarr(store) index_var = dataset.__index_var__ index_var = index_var.assign_coords(**cube_schema.coords) return index_var