Source code for xcube.core.extract

from typing import Dict, Union, Tuple, Any, Optional, Mapping, Sequence, Hashable

import numpy as np
import pandas as pd
import xarray as xr

from xcube.core.verify import assert_cube

DEFAULT_INDEX_NAME_PATTERN = '{name}_index'
DEFAULT_REF_NAME_PATTERN = '{name}_ref'
INDEX_DIM_NAME = "idx"

POINT_INTERP_METHOD_NEAREST = "nearest"
POINT_INTERP_METHOD_LINEAR = "linear"
DEFAULT_INTERP_POINT_METHOD = POINT_INTERP_METHOD_NEAREST


[docs]def get_cube_values_for_points(cube: xr.Dataset, points: Union[xr.Dataset, pd.DataFrame, Mapping[str, Any]], var_names: Sequence[str] = None, include_coords: bool = False, include_bounds: bool = False, include_indexes: bool = False, index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN, include_refs: bool = False, ref_name_pattern: str = DEFAULT_REF_NAME_PATTERN, method: str = DEFAULT_INTERP_POINT_METHOD, cube_asserted: bool = False) -> xr.Dataset: """ Extract values from *cube* variables at given coordinates in *points*. Returns a new dataset with values of variables from *cube* selected by the coordinate columns provided in *points*. All variables will be 1-D and have the same order as the rows in points. :param cube: The cube dataset. :param points: Dictionary that maps dimension name to coordinate arrays. :param var_names: An optional list of names of data variables in *cube* whose values shall be extracted. :param include_coords: Whether to include the cube coordinates for each point in return value. :param include_bounds: Whether to include the cube coordinate boundaries (if any) for each point in return value. :param include_indexes: Whether to include computed indexes into the cube for each point in return value. :param index_name_pattern: A naming pattern for the computed index columns. Must include "{name}" which will be replaced by the index' dimension name. :param include_refs: Whether to include point (reference) values from *points* in return value. :param ref_name_pattern: A naming pattern for the computed point data columns. Must include "{name}" which will be replaced by the point's attribute name. :param method: "nearest" or "linear". :param cube_asserted: If False, *cube* will be verified, otherwise it is expected to be a valid cube. :return: A new data frame whose columns are values from *cube* variables at given *points*. """ if not cube_asserted: assert_cube(cube) point_indexes = get_cube_point_indexes(cube, points, index_name_pattern=index_name_pattern, index_dtype=np.int64 if method == POINT_INTERP_METHOD_NEAREST else np.float64, cube_asserted=True) cube_values = get_cube_values_for_indexes(cube, point_indexes, include_coords, include_bounds, data_var_names=var_names, index_name_pattern=index_name_pattern, method=method, cube_asserted=True) if include_indexes: cube_values.update(point_indexes) if include_refs: point_refs = xr.Dataset({ref_name_pattern.format(name=name): xr.DataArray(points[name], dims=[INDEX_DIM_NAME]) for name in points.keys()}) cube_values.update(point_refs) return cube_values
[docs]def get_cube_values_for_indexes(cube: xr.Dataset, indexes: Union[xr.Dataset, pd.DataFrame, Mapping[str, Any]], include_coords: bool = False, include_bounds: bool = False, data_var_names: Sequence[str] = None, index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN, method: str = DEFAULT_INTERP_POINT_METHOD, cube_asserted: bool = False) -> xr.Dataset: """ Get values from the *cube* at given *indexes*. :param cube: A cube dataset. :param indexes: A mapping from column names to index and fraction arrays for all cube dimensions. :param include_coords: Whether to include the cube coordinates for each point in return value. :param include_bounds: Whether to include the cube coordinate boundaries (if any) for each point in return value. :param data_var_names: An optional list of names of data variables in *cube* whose values shall be extracted. :param index_name_pattern: A naming pattern for the computed indexes columns. Must include "{name}" which will be replaced by the dimension name. :param method: "nearest" or "linear". :param cube_asserted: If False, *cube* will be verified, otherwise it is expected to be a valid cube. :return: A new data frame whose columns are values from *cube* variables at given *indexes*. """ if not cube_asserted: assert_cube(cube) if method not in {POINT_INTERP_METHOD_NEAREST, POINT_INTERP_METHOD_LINEAR}: raise ValueError(f"invalid method {method!r}") if method != POINT_INTERP_METHOD_NEAREST: raise NotImplementedError(f"method {method!r} not yet implemented") all_data_var_names = tuple(cube.data_vars.keys()) if len(all_data_var_names) == 0: raise ValueError("cube is empty") if data_var_names is not None: if len(data_var_names) == 0: return xr.Dataset(coords=indexes.coords if hasattr(indexes, "coords") else None) for var_name in data_var_names: if var_name not in cube.data_vars: raise ValueError(f"variable {var_name!r} not found in cube") else: data_var_names = all_data_var_names dim_names = cube[data_var_names[0]].dims num_dims = len(dim_names) index_names = [index_name_pattern.format(name=dim_name) for dim_name in dim_names] num_points = _validate_points(indexes, index_names, param_name="indexes") indexes = _normalize_points(indexes) cube = xr.Dataset({var_name: cube[var_name] for var_name in data_var_names}, coords=cube.coords) new_bounds_vars = {} bounds_var_names = _get_coord_bounds_var_names(cube) drop_coords = None if bounds_var_names: if include_bounds: # Flatten any coordinate bounds variables for var_name, bnds_var_name in bounds_var_names.items(): bnds_var = cube[bnds_var_name] new_bounds_vars[f"{var_name}_lower"] = bnds_var[:, 0] new_bounds_vars[f"{var_name}_upper"] = bnds_var[:, 1] cube = cube.assign_coords(**new_bounds_vars) cube = cube.drop_vars(bounds_var_names.values()) if not include_coords: drop_coords = set(cube.coords).difference(new_bounds_vars.keys()) else: if not include_coords: drop_coords = set(cube.coords) # Generate a validation condition so we can filter out invalid rows (where any index == -1) is_valid_point = None for index_name in index_names: col = indexes[index_name] condition = col >= 0 if np.issubdtype(col.dtype, np.integer) else np.isnan(col) if is_valid_point is None: is_valid_point = condition else: is_valid_point = np.logical_and(is_valid_point, condition) num_valid_points = np.count_nonzero(is_valid_point) if num_valid_points == num_points: # All indexes valid cube_selector = {dim_names[i]: indexes[index_names[i]] for i in range(num_dims)} cube_values = cube.isel(cube_selector) elif num_valid_points == 0: # All indexes are invalid new_bounds_vars = {} for var_name in cube.variables: new_bounds_vars[var_name] = _empty_points_var(cube[var_name], num_points) cube_values = xr.Dataset(new_bounds_vars) else: # Some invalid indexes idx = np.arange(num_points) good_idx = idx[is_valid_point.values] idx_dim_name = indexes[index_names[0]].dims[0] good_indexes = indexes.isel({idx_dim_name: good_idx}) cube_selector = {dim_names[i]: good_indexes[index_names[i]] for i in range(num_dims)} cube_values = cube.isel(cube_selector) new_bounds_vars = {} for var_name in cube.variables: var = cube_values[var_name] new_var = _empty_points_var(var, num_points) new_var[good_idx] = var new_bounds_vars[var_name] = new_var cube_values = xr.Dataset(new_bounds_vars) if drop_coords: cube_values = cube_values.drop_vars(drop_coords) return cube_values
[docs]def get_cube_point_indexes(cube: xr.Dataset, points: Union[xr.Dataset, pd.DataFrame, Mapping[str, Any]], dim_name_mapping: Mapping[str, str] = None, index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN, index_dtype=np.float64, cube_asserted: bool = False) -> xr.Dataset: """ Get indexes of given point coordinates *points* into the given *dataset*. :param cube: The cube dataset. :param points: A mapping from column names to column data arrays, which must all have the same length. :param dim_name_mapping: A mapping from dimension names in *cube* to column names in *points*. :param index_name_pattern: A naming pattern for the computed indexes columns. Must include "{name}" which will be replaced by the dimension name. :param index_dtype: Numpy data type for the indexes. If it is a floating point type (default), then *indexes* will contain fractions, which may be used for interpolation. For out-of-range coordinates in *points*, indexes will be -1 if *index_dtype* is an integer type, and NaN, if *index_dtype* is a floating point types. :param cube_asserted: If False, *cube* will be verified, otherwise it is expected to be a valid cube. :return: A dataset containing the index columns. """ if not cube_asserted: assert_cube(cube) dim_name_mapping = dim_name_mapping if dim_name_mapping is not None else {} dim_names = _get_cube_data_var_dims(cube) col_names = [dim_name_mapping.get(dim_name, dim_name) for dim_name in dim_names] _validate_points(points, col_names, param_name="points") indexes = [] for dim_name, col_name in zip(dim_names, col_names): col = points[col_name] coord_indexes = get_dataset_indexes(cube, str(dim_name), col, index_dtype=index_dtype) indexes.append((index_name_pattern.format(name=dim_name), xr.DataArray(coord_indexes, dims=[INDEX_DIM_NAME]))) return xr.Dataset(dict(indexes))
[docs]def get_dataset_indexes(dataset: xr.Dataset, coord_var_name: str, coord_values: Union[xr.DataArray, np.ndarray], index_dtype=np.float64) -> Union[xr.DataArray, np.ndarray]: """ Compute the indexes and their fractions into a coordinate variable *coord_var_name* of a *dataset* for the given coordinate values *coord_values*. The coordinate variable's labels must be monotonic increasing or decreasing, otherwise the result will be nonsense. For any value in *coord_values* that is out of the bounds of the coordinate variable's values, the index depends on the value of *index_dtype*. If *index_dtype* is an integer type, invalid indexes are encoded as -1 while for floating point types, NaN will be used. Returns a tuple of indexes as int64 array and fractions as float64 array. :param dataset: A cube dataset. :param coord_var_name: Name of a coordinate variable. :param coord_values: Array-like coordinate values. :param index_dtype: Numpy data type for the indexes. If it is floating point type (default), then *indexes* contain fractions, which may be used for interpolation. If *dtype* is an integer type out-of-range coordinates are indicated by index -1, and NaN if it is is a floating point type. :return: The indexes and their fractions as a tuple of numpy int64 and float64 arrays. """ coord_var = dataset[coord_var_name] n1 = coord_var.size n2 = n1 + 1 coord_bounds_var = _get_bounds_var(dataset, coord_var_name) if coord_bounds_var is not None: coord_bounds = coord_bounds_var.values if np.issubdtype(coord_bounds.dtype, np.datetime64): coord_bounds = coord_bounds.astype(np.uint64) is_inverse = (coord_bounds[0, 1] - coord_bounds[0, 0]) < 0 if is_inverse: coord_bounds = coord_bounds[::-1, ::-1] coords = np.zeros(n2, dtype=coord_bounds.dtype) coords[0:-1] = coord_bounds[:, 0] coords[-1] = coord_bounds[-1, 1] elif coord_var.size > 1: center_coords = coord_var.values if np.issubdtype(center_coords.dtype, np.datetime64): center_coords = center_coords.astype(np.uint64) is_inverse = (center_coords[-1] - center_coords[0]) < 0 if is_inverse: center_coords = center_coords[::-1] deltas = np.zeros(n2, dtype=center_coords.dtype) deltas[0:-2] = np.diff(center_coords) deltas[-2] = deltas[-3] deltas[-1] = deltas[-3] coords = np.zeros(n2, dtype=center_coords.dtype) coords[0:-1] = center_coords coords[-1] = coords[-2] + deltas[-1] if np.issubdtype(deltas.dtype, np.integer): coords -= deltas // 2 else: coords -= 0.5 * deltas else: raise ValueError(f"cannot determine cell boundaries for" f" coordinate variable {coord_var_name!r} of size {coord_var.size}") if np.issubdtype(coord_values.dtype, np.datetime64): try: coord_values = coord_values.astype(np.uint64) except TypeError: # Fixes https://github.com/dcs4cop/xcube/issues/95 coord_values = coord_values.values.astype(np.uint64) indexes = np.linspace(0.0, n1, n2, dtype=np.float64) interp_indexes = np.interp(coord_values, coords, indexes, left=-1, right=-1) if is_inverse: i = interp_indexes >= 0 interp_indexes[i] = n1 - interp_indexes[i] interp_indexes[interp_indexes >= n1] = n1 - 1e-9 if np.issubdtype(index_dtype, np.integer): interp_indexes = interp_indexes.astype(index_dtype) else: interp_indexes[interp_indexes < 0] = np.nan return interp_indexes
def _empty_points_var(var: xr.DataArray, num_points): fill_value = 0 if np.issubdtype(var.dtype, np.integer) else np.nan return xr.DataArray(np.full(num_points, fill_value, dtype=var.dtype), dims=[INDEX_DIM_NAME], attrs=var.attrs) def _normalize_points(points: Union[xr.Dataset, pd.DataFrame, Mapping[str, Any]]) -> xr.Dataset: if isinstance(points, pd.DataFrame): points = xr.Dataset.from_dataframe(points) elif not isinstance(points, xr.DataArray): points = xr.Dataset(points) return points def _validate_points(points: Union[xr.Dataset, pd.DataFrame, Mapping[str, Any]], col_names: Sequence[str], param_name="points") -> Optional[int]: num_points = None for col_name in col_names: if col_name not in points: raise ValueError(f"column {col_name!r} not found in {param_name}") col = points[col_name] if len(col.shape) != 1: raise ValueError( f"column {col_name!r} in {param_name} must be one-dimensional, but has shape {col.shape!r}") if num_points is None: num_points = len(col) elif num_points != len(col): raise ValueError(f"column sizes in {param_name} must be all the same," f" but found {len(points[col_names[0]])} for column {col_names[0]!r}" f" and {num_points} for column {col_name!r}") return num_points def _get_coord_bounds_var_names(dataset: xr.Dataset) -> Dict[Hashable, Any]: bnds_var_names = {} for var_name in dataset.coords: var = dataset[var_name] bnds_var_name = var.attrs.get("bounds", f"{var_name}_bnds") if bnds_var_name not in bnds_var_names and bnds_var_name in dataset: bnds_var = dataset[bnds_var_name] if bnds_var.shape == (var.shape[0], 2): bnds_var_names[var_name] = bnds_var_name return bnds_var_names def _get_cube_data_var_dims(cube: xr.Dataset) -> Tuple[Hashable, ...]: for var in cube.data_vars.values(): return var.dims raise ValueError("cube dataset is empty") def _get_bounds_var(dataset: xr.Dataset, var_name: str) -> Optional[xr.DataArray]: var = dataset[var_name] if len(var.shape) == 1: bounds_var_name = var.attrs.get("bounds", f"{var_name}_bnds") if bounds_var_name in dataset: bounds_var = dataset[bounds_var_name] if bounds_var.dtype == var.dtype and bounds_var.shape == (var.size, 2): return bounds_var return None