Source code for xcube.core.extract

# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

from typing import Any, Optional, Union
from collections.abc import Mapping, Sequence, Hashable

import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr

from xcube.core.verify import assert_cube

DEFAULT_INDEX_NAME_PATTERN = "{name}_index"
DEFAULT_REF_NAME_PATTERN = "{name}_ref"
INDEX_DIM_NAME = "idx"

POINT_INTERP_METHOD_NEAREST = "nearest"
POINT_INTERP_METHOD_LINEAR = "linear"
DEFAULT_INTERP_POINT_METHOD = POINT_INTERP_METHOD_NEAREST

SeriesLike = Union[
    np.ndarray, da.Array, xr.DataArray, pd.Series, Sequence[Union[int, float]]
]

PointsLike = Union[xr.Dataset, pd.DataFrame, Mapping[str, SeriesLike]]
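
# Illustrative sketch (not part of the module): any of the following forms
# is a valid ``PointsLike`` value for the functions below, assuming a cube
# with dimensions "time", "lat", and "lon". All names are hypothetical.
#
#   points = {
#       "time": np.array(["2020-01-01", "2020-01-02"], dtype="datetime64[ns]"),
#       "lat": np.array([53.1, 53.2]),
#       "lon": np.array([10.4, 10.5]),
#   }
#   points = pd.DataFrame(...)  # with the same column names
#   points = xr.Dataset(...)    # with the same variable names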


def get_cube_values_for_points(
    cube: xr.Dataset,
    points: PointsLike,
    var_names: Sequence[str] = None,
    include_coords: bool = False,
    include_bounds: bool = False,
    include_indexes: bool = False,
    index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN,
    include_refs: bool = False,
    ref_name_pattern: str = DEFAULT_REF_NAME_PATTERN,
    method: str = DEFAULT_INTERP_POINT_METHOD,
    cube_asserted: bool = False,
) -> xr.Dataset:
    """Extract values from *cube* variables at given coordinates in *points*.

    Returns a new dataset with values of variables from *cube* selected by
    the coordinate columns provided in *points*. All variables will be 1-D
    and have the same order as the rows in *points*.

    Args:
        cube: The cube dataset.
        points: A dataset, data frame, or mapping that maps cube dimension
            names to coordinate arrays.
        var_names: An optional list of names of data variables in *cube*
            whose values shall be extracted.
        include_coords: Whether to include the cube coordinates for each
            point in return value.
        include_bounds: Whether to include the cube coordinate boundaries
            (if any) for each point in return value.
        include_indexes: Whether to include computed indexes into the cube
            for each point in return value.
        index_name_pattern: A naming pattern for the computed index columns.
            Must include "{name}" which will be replaced by the index's
            dimension name.
        include_refs: Whether to include point (reference) values from
            *points* in return value.
        ref_name_pattern: A naming pattern for the computed point data
            columns. Must include "{name}" which will be replaced by the
            point's attribute name.
        method: "nearest" or "linear".
        cube_asserted: If False, *cube* will be verified, otherwise it is
            expected to be a valid cube.

    Returns:
        A new dataset whose variables hold the values of the *cube*
        variables at the given *points*.
    """
    if not cube_asserted:
        assert_cube(cube)

    index_dtype = np.int64 if method == POINT_INTERP_METHOD_NEAREST else np.float64
    point_indexes = get_cube_point_indexes(
        cube,
        points,
        index_name_pattern=index_name_pattern,
        index_dtype=index_dtype,
        cube_asserted=True,
    )

    cube_values = get_cube_values_for_indexes(
        cube,
        point_indexes,
        include_coords,
        include_bounds,
        data_var_names=var_names,
        index_name_pattern=index_name_pattern,
        method=method,
        cube_asserted=True,
    )

    if include_indexes:
        cube_values.update(point_indexes)

    if include_refs:
        point_refs = xr.Dataset(
            {
                ref_name_pattern.format(name=name): xr.DataArray(
                    points[name], dims=[INDEX_DIM_NAME]
                )
                for name in points.keys()
            }
        )
        cube_values.update(point_refs)

    # The (coordinate) variable named INDEX_DIM_NAME should not exist,
    # otherwise cube_values.to_dataframe() will use an empty string
    # to name that series "" in the output DataFrame instance.
    # This seems to be an issue of xarray 0.18.2+.
    if INDEX_DIM_NAME in cube_values:
        cube_values = cube_values.drop_vars(INDEX_DIM_NAME)

    return cube_values
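
# Usage sketch (not part of the module): extract values of a hypothetical
# variable "chl" at three points. The cube, variable, and dimension names
# are assumptions for illustration only.
#
#   points = pd.DataFrame({
#       "time": pd.to_datetime(["2020-01-01", "2020-01-05", "2020-01-09"]),
#       "lat": [53.1, 53.6, 54.0],
#       "lon": [10.4, 10.9, 11.2],
#   })
#   values = get_cube_values_for_points(
#       cube, points, var_names=["chl"], include_indexes=True
#   )
#   df = values.to_dataframe()  # one row per point, in input order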
def get_cube_values_for_indexes(
    cube: xr.Dataset,
    indexes: Union[xr.Dataset, pd.DataFrame, Mapping[str, Any]],
    include_coords: bool = False,
    include_bounds: bool = False,
    data_var_names: Sequence[str] = None,
    index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN,
    method: str = DEFAULT_INTERP_POINT_METHOD,
    cube_asserted: bool = False,
) -> xr.Dataset:
    """Get values from the *cube* at given *indexes*.

    Args:
        cube: A cube dataset.
        indexes: A mapping from column names to index and fraction arrays
            for all cube dimensions.
        include_coords: Whether to include the cube coordinates for each
            point in return value.
        include_bounds: Whether to include the cube coordinate boundaries
            (if any) for each point in return value.
        data_var_names: An optional list of names of data variables in
            *cube* whose values shall be extracted.
        index_name_pattern: A naming pattern for the computed index columns.
            Must include "{name}" which will be replaced by the dimension
            name.
        method: "nearest" or "linear".
        cube_asserted: If False, *cube* will be verified, otherwise it is
            expected to be a valid cube.

    Returns:
        A new dataset whose variables hold the values of the *cube*
        variables at the given *indexes*.
    """
    if not cube_asserted:
        assert_cube(cube)

    if method not in {POINT_INTERP_METHOD_NEAREST, POINT_INTERP_METHOD_LINEAR}:
        raise ValueError(f"invalid method {method!r}")
    if method != POINT_INTERP_METHOD_NEAREST:
        raise NotImplementedError(f"method {method!r} not yet implemented")

    all_data_var_names = tuple(cube.data_vars.keys())
    if len(all_data_var_names) == 0:
        raise ValueError("cube is empty")

    if data_var_names is not None:
        if len(data_var_names) == 0:
            return _empty_dataset_from_points(indexes)
        for var_name in data_var_names:
            if var_name not in cube.data_vars:
                raise ValueError(f"variable {var_name!r} not found in cube")
    else:
        data_var_names = all_data_var_names

    dim_names = cube[data_var_names[0]].dims
    num_dims = len(dim_names)
    index_names = [index_name_pattern.format(name=dim_name) for dim_name in dim_names]

    indexes, num_points = _normalize_series(
        indexes, index_names, force_dataset=True, param_name="indexes"
    )
    if num_points == 0:
        return _empty_dataset_from_points(indexes)

    cube = xr.Dataset(
        {var_name: cube[var_name] for var_name in data_var_names}, coords=cube.coords
    )

    new_bounds_vars = {}
    bounds_var_names = _get_coord_bounds_var_names(cube)
    drop_coords = None
    if bounds_var_names:
        if include_bounds:
            # Flatten any coordinate bounds variables
            for var_name, bnds_var_name in bounds_var_names.items():
                bnds_var = cube[bnds_var_name]
                new_bounds_vars[f"{var_name}_lower"] = bnds_var[:, 0]
                new_bounds_vars[f"{var_name}_upper"] = bnds_var[:, 1]
            cube = cube.assign_coords(**new_bounds_vars)
        cube = cube.drop_vars(bounds_var_names.values())
        if not include_coords:
            drop_coords = set(cube.coords).difference(new_bounds_vars.keys())
    else:
        if not include_coords:
            drop_coords = set(cube.coords)

    # Generate a validation condition so we can filter out invalid rows,
    # i.e. rows where any index is -1 (integer dtype) or NaN (float dtype)
    is_valid_point = None
    for index_name in index_names:
        col = indexes[index_name]
        if np.issubdtype(col.dtype, np.integer):
            condition = col >= 0
        else:
            condition = np.logical_not(np.isnan(col))
        if is_valid_point is None:
            is_valid_point = condition
        else:
            is_valid_point = np.logical_and(is_valid_point, condition)

    num_valid_points = np.count_nonzero(is_valid_point)
    if num_valid_points == num_points:
        # All indexes are valid
        cube_selector = {
            dim_names[i]: indexes[index_names[i]] for i in range(num_dims)
        }
        cube_values = cube.isel(cube_selector)
    elif num_valid_points == 0:
        # All indexes are invalid
        new_point_vars = {}
        for var_name in cube.variables:
            new_point_vars[var_name] = _empty_points_var(cube[var_name], num_points)
        cube_values = xr.Dataset(new_point_vars)
    else:
        # Some indexes are invalid
        idx = np.arange(num_points)
        good_idx = idx[is_valid_point.values]
        idx_dim_name = indexes[index_names[0]].dims[0]
        good_indexes = indexes.isel({idx_dim_name: good_idx})

        cube_selector = {
            dim_names[i]: good_indexes[index_names[i]] for i in range(num_dims)
        }
        cube_values = cube.isel(cube_selector)

        new_point_vars = {}
        for var_name in cube.variables:
            var = cube_values[var_name]
            new_var = _empty_points_var(var, num_points)
            new_var[good_idx] = var
            new_point_vars[var_name] = new_var
        cube_values = xr.Dataset(new_point_vars)

    if drop_coords:
        cube_values = cube_values.drop_vars(drop_coords)

    return cube_values
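
# Usage sketch (not part of the module): select values with precomputed
# integer indexes. Index column names follow DEFAULT_INDEX_NAME_PATTERN;
# the cube and its dimension names are assumptions for illustration only.
#
#   indexes = xr.Dataset({
#       "time_index": xr.DataArray(np.array([0, 3, -1]), dims=["idx"]),
#       "lat_index": xr.DataArray(np.array([10, 20, 5]), dims=["idx"]),
#       "lon_index": xr.DataArray(np.array([15, 25, 7]), dims=["idx"]),
#   })
#   values = get_cube_values_for_indexes(cube, indexes)
#   # The third point has an invalid time index (-1), so its output values
#   # are filled with NaN (or 0 for integer variables).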
def get_cube_point_indexes(
    cube: xr.Dataset,
    points: PointsLike,
    dim_name_mapping: Mapping[str, str] = None,
    index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN,
    index_dtype=np.float64,
    cube_asserted: bool = False,
) -> xr.Dataset:
    """Get indexes of given point coordinates *points* into the given *cube*.

    Args:
        cube: The cube dataset.
        points: A mapping from column names to column data arrays, which
            must all have the same length.
        dim_name_mapping: A mapping from dimension names in *cube* to
            column names in *points*.
        index_name_pattern: A naming pattern for the computed index columns.
            Must include "{name}" which will be replaced by the dimension
            name.
        index_dtype: Numpy data type for the indexes. If it is a floating
            point type (default), then *indexes* will contain fractions,
            which may be used for interpolation. For out-of-range
            coordinates in *points*, indexes will be -1 if *index_dtype*
            is an integer type, and NaN if *index_dtype* is a floating
            point type.
        cube_asserted: If False, *cube* will be verified, otherwise it is
            expected to be a valid cube.

    Returns:
        A dataset containing the index columns.
    """
    if not cube_asserted:
        assert_cube(cube)

    dim_name_mapping = dim_name_mapping if dim_name_mapping is not None else {}
    dim_names = _get_cube_data_var_dims(cube)
    col_names = [
        dim_name_mapping.get(str(dim_name), dim_name) for dim_name in dim_names
    ]

    points, _ = _normalize_series(
        points, col_names, force_dataset=False, param_name="points"
    )

    indexes = []
    for dim_name, col_name in zip(dim_names, col_names):
        col = points[col_name]
        coord_indexes = get_dataset_indexes(
            cube, str(dim_name), col, index_dtype=index_dtype
        )
        indexes.append(
            (
                index_name_pattern.format(name=dim_name),
                xr.DataArray(coord_indexes, dims=[INDEX_DIM_NAME]),
            )
        )

    return xr.Dataset(dict(indexes))
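
# Usage sketch (not part of the module), reusing the hypothetical *points*
# data frame from the sketch after get_cube_values_for_points():
#
#   indexes = get_cube_point_indexes(cube, points, index_dtype=np.int64)
#   # -> dataset with variables "time_index", "lat_index", "lon_index"
#   #    along dimension "idx"; out-of-range points get index -1.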
def get_dataset_indexes(
    dataset: xr.Dataset,
    coord_var_name: str,
    coord_values: SeriesLike,
    index_dtype=np.float64,
) -> Union[xr.DataArray, np.ndarray]:
    """Compute the indexes and their fractions into a coordinate variable
    *coord_var_name* of a *dataset* for the given coordinate values
    *coord_values*.

    The coordinate variable's labels must be monotonically increasing or
    decreasing, otherwise the result will be nonsense.

    For any value in *coord_values* that is out of the bounds of the
    coordinate variable's values, the returned index depends on the value
    of *index_dtype*. If *index_dtype* is an integer type, invalid indexes
    are encoded as -1; for floating point types, NaN will be used.

    Args:
        dataset: A cube dataset.
        coord_var_name: Name of a coordinate variable.
        coord_values: Array-like coordinate values.
        index_dtype: Numpy data type for the indexes. If it is a floating
            point type (default), then the returned indexes contain
            fractions, which may be used for interpolation. If
            *index_dtype* is an integer type, out-of-range coordinates
            are indicated by index -1, and by NaN if it is a floating
            point type.

    Returns:
        The indexes as a numpy array of type *index_dtype*.
    """
    coord_var = dataset[coord_var_name]
    n1 = coord_var.size
    n2 = n1 + 1

    coord_bounds_var = _get_bounds_var(dataset, coord_var_name)
    if coord_bounds_var is not None:
        # Derive cell boundaries from the coordinate bounds variable
        coord_bounds = coord_bounds_var.values
        if np.issubdtype(coord_bounds.dtype, np.datetime64):
            coord_bounds = coord_bounds.astype(np.uint64)
        is_inverse = (coord_bounds[0, 1] - coord_bounds[0, 0]) < 0
        if is_inverse:
            coord_bounds = coord_bounds[::-1, ::-1]
        coords = np.zeros(n2, dtype=coord_bounds.dtype)
        coords[0:-1] = coord_bounds[:, 0]
        coords[-1] = coord_bounds[-1, 1]
    elif coord_var.size > 1:
        # Derive cell boundaries from the cell center coordinates
        center_coords = coord_var.values
        if np.issubdtype(center_coords.dtype, np.datetime64):
            center_coords = center_coords.astype(np.uint64)
        is_inverse = (center_coords[-1] - center_coords[0]) < 0
        if is_inverse:
            center_coords = center_coords[::-1]
        deltas = np.zeros(n2, dtype=center_coords.dtype)
        deltas[0:-2] = np.diff(center_coords)
        deltas[-2] = deltas[-3]
        deltas[-1] = deltas[-3]
        coords = np.zeros(n2, dtype=center_coords.dtype)
        coords[0:-1] = center_coords
        coords[-1] = coords[-2] + deltas[-1]
        if np.issubdtype(deltas.dtype, np.integer):
            coords -= deltas // 2
        else:
            coords -= 0.5 * deltas
    else:
        raise ValueError(
            f"cannot determine cell boundaries for"
            f" coordinate variable {coord_var_name!r}"
            f" of size {coord_var.size}"
        )

    if np.issubdtype(coord_values.dtype, np.datetime64):
        try:
            coord_values = coord_values.astype(np.uint64)
        except TypeError:
            # Fixes https://github.com/dcs4cop/xcube/issues/95
            coord_values = coord_values.values.astype(np.uint64)

    indexes = np.linspace(0.0, n1, n2, dtype=np.float64)
    interp_indexes = np.interp(coord_values, coords, indexes, left=-1, right=-1)
    if is_inverse:
        i = interp_indexes >= 0
        interp_indexes[i] = n1 - interp_indexes[i]
    interp_indexes[interp_indexes >= n1] = n1 - 1e-9
    if np.issubdtype(index_dtype, np.integer):
        interp_indexes = interp_indexes.astype(index_dtype)
    else:
        interp_indexes[interp_indexes < 0] = np.nan
    return interp_indexes
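
# Usage sketch (not part of the module): fractional indexes into a 1-D
# "lon" coordinate with hypothetical cell centers 0.5, 1.5, 2.5, which
# imply cell boundaries 0, 1, 2, 3.
#
#   ds = xr.Dataset(coords={"lon": [0.5, 1.5, 2.5]})
#   get_dataset_indexes(ds, "lon", np.array([0.5, 2.0, 9.0]))
#   # -> array([0.5, 2. , nan]); 9.0 is out of range, hence NaN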
def _empty_points_var(var: xr.DataArray, num_points: int):
    fill_value = 0 if np.issubdtype(var.dtype, np.integer) else np.nan
    return xr.DataArray(
        np.full(num_points, fill_value, dtype=var.dtype),
        dims=[INDEX_DIM_NAME],
        attrs=var.attrs,
    )


def _normalize_series(
    points: PointsLike,
    col_names: Sequence[str],
    force_dataset: bool = False,
    param_name: str = "points",
) -> tuple[PointsLike, int]:
    if not isinstance(points, (xr.Dataset, pd.DataFrame)):
        new_points = {}
        for col_name in col_names:
            if col_name in points:
                col = points[col_name]
                if not isinstance(col, xr.DataArray):
                    col = xr.DataArray(col)
                new_points[col_name] = col
        points = new_points

    num_points = None
    for col_name in col_names:
        if col_name not in points:
            raise ValueError(f"column {col_name!r} not found in {param_name}")
        col = points[col_name]
        if len(col.shape) != 1:
            raise ValueError(
                f"column {col_name!r} in {param_name} must be"
                f" one-dimensional, but has shape {col.shape!r}"
            )
        if num_points is None:
            num_points = len(col)
        elif num_points != len(col):
            raise ValueError(
                f"column sizes in {param_name} must all be"
                f" the same, but found {num_points}"
                f" for column {col_names[0]!r} and {len(col)}"
                f" for column {col_name!r}"
            )

    if num_points is None:
        raise ValueError(f"{param_name} has no valid columns")

    if force_dataset:
        if isinstance(points, pd.DataFrame):
            points = xr.Dataset.from_dataframe(points)
        elif not isinstance(points, xr.Dataset):
            points = xr.Dataset(points)

    return points, num_points


def _get_coord_bounds_var_names(dataset: xr.Dataset) -> dict[Hashable, Any]:
    bnds_var_names = {}
    for var_name in dataset.coords:
        var = dataset[var_name]
        bnds_var_name = var.attrs.get("bounds", f"{var_name}_bnds")
        if bnds_var_name not in bnds_var_names and bnds_var_name in dataset:
            bnds_var = dataset[bnds_var_name]
            if bnds_var.shape == (var.shape[0], 2):
                bnds_var_names[var_name] = bnds_var_name
    return bnds_var_names


def _get_cube_data_var_dims(cube: xr.Dataset) -> tuple[Hashable, ...]:
    for var in cube.data_vars.values():
        return var.dims
    raise ValueError("cube dataset is empty")


def _get_bounds_var(dataset: xr.Dataset, var_name: str) -> Optional[xr.DataArray]:
    var = dataset[var_name]
    if len(var.shape) == 1:
        bounds_var_name = var.attrs.get("bounds", f"{var_name}_bnds")
        if bounds_var_name in dataset:
            bounds_var = dataset[bounds_var_name]
            if bounds_var.dtype == var.dtype and bounds_var.shape == (var.size, 2):
                return bounds_var
    return None


def _empty_dataset_from_points(data: Any) -> xr.Dataset:
    return xr.Dataset(coords=data.coords if hasattr(data, "coords") else None)
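
# Illustrative sketch (not part of the module): the bounds lookup above
# follows the CF convention, where a coordinate names its bounds variable
# via a "bounds" attribute, falling back to "<name>_bnds". All names here
# are hypothetical.
#
#   ds = xr.Dataset(
#       coords={"lon": ("lon", [0.5, 1.5, 2.5], {"bounds": "lon_bnds"})},
#   )
#   ds["lon_bnds"] = (("lon", "bnds"), [[0.0, 1.0], [1.0, 2.0], [2.0, 3.0]])
#   _get_bounds_var(ds, "lon")  # -> the "lon_bnds" DataArray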