Source code for xcube.core.extract

from typing import Dict, Union, Tuple, Any, Optional, \
    Mapping, Sequence, Hashable

import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr

from xcube.core.verify import assert_cube

DEFAULT_INDEX_NAME_PATTERN = '{name}_index'
DEFAULT_REF_NAME_PATTERN = '{name}_ref'
INDEX_DIM_NAME = "idx"

POINT_INTERP_METHOD_NEAREST = "nearest"
POINT_INTERP_METHOD_LINEAR = "linear"
DEFAULT_INTERP_POINT_METHOD = POINT_INTERP_METHOD_NEAREST

SeriesLike = Union[
    np.ndarray,
    da.Array,
    xr.DataArray,
    pd.Series,
    Sequence[Union[int, float]]
]

PointsLike = Union[
    xr.Dataset,
    pd.DataFrame,
    Mapping[str, SeriesLike]
]
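

# The three accepted *PointsLike* forms are interchangeable. A minimal sketch,
# not part of the original module (the column names are assumptions chosen for
# the example): the same two points expressed as a plain mapping, a pandas
# DataFrame, and an xarray Dataset.
def _example_points_like():
    lat = np.array([50.6, 52.2])
    lon = np.array([10.3, 14.1])
    as_mapping = {"lat": lat, "lon": lon}
    as_frame = pd.DataFrame(as_mapping)
    as_dataset = xr.Dataset({
        "lat": xr.DataArray(lat, dims=[INDEX_DIM_NAME]),
        "lon": xr.DataArray(lon, dims=[INDEX_DIM_NAME]),
    })
    return as_mapping, as_frame, as_dataset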


def get_cube_values_for_points(
        cube: xr.Dataset,
        points: PointsLike,
        var_names: Sequence[str] = None,
        include_coords: bool = False,
        include_bounds: bool = False,
        include_indexes: bool = False,
        index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN,
        include_refs: bool = False,
        ref_name_pattern: str = DEFAULT_REF_NAME_PATTERN,
        method: str = DEFAULT_INTERP_POINT_METHOD,
        cube_asserted: bool = False
) -> xr.Dataset:
    """
    Extract values from *cube* variables at given coordinates in *points*.

    Returns a new dataset with values of variables from *cube* selected
    by the coordinate columns provided in *points*. All variables will
    be 1-D and have the same order as the rows in *points*.

    :param cube: The cube dataset.
    :param points: A table-like structure that maps dimension names to
        coordinate arrays; see *PointsLike* for the accepted forms.
    :param var_names: An optional list of names of data variables in *cube*
        whose values shall be extracted.
    :param include_coords: Whether to include the cube coordinates for each
        point in the return value.
    :param include_bounds: Whether to include the cube coordinate boundaries
        (if any) for each point in the return value.
    :param include_indexes: Whether to include computed indexes into the
        cube for each point in the return value.
    :param index_name_pattern: A naming pattern for the computed index
        columns. Must include "{name}", which will be replaced by the
        index's dimension name.
    :param include_refs: Whether to include point (reference) values from
        *points* in the return value.
    :param ref_name_pattern: A naming pattern for the computed point data
        columns. Must include "{name}", which will be replaced by the
        point's attribute name.
    :param method: "nearest" or "linear".
    :param cube_asserted: If False, *cube* will be verified, otherwise it
        is expected to be a valid cube.
    :return: A new dataset whose variables are values from *cube* variables
        at the given *points*.
    """
    if not cube_asserted:
        assert_cube(cube)

    # Nearest-neighbour lookup uses integer indexes,
    # interpolation requires fractional (float) indexes.
    index_dtype = np.int64 \
        if method == POINT_INTERP_METHOD_NEAREST else np.float64
    point_indexes = get_cube_point_indexes(
        cube,
        points,
        index_name_pattern=index_name_pattern,
        index_dtype=index_dtype,
        cube_asserted=True
    )

    cube_values = get_cube_values_for_indexes(
        cube,
        point_indexes,
        include_coords,
        include_bounds,
        data_var_names=var_names,
        index_name_pattern=index_name_pattern,
        method=method,
        cube_asserted=True
    )

    if include_indexes:
        cube_values.update(point_indexes)

    if include_refs:
        point_refs = xr.Dataset({
            ref_name_pattern.format(name=name):
                xr.DataArray(points[name], dims=[INDEX_DIM_NAME])
            for name in points.keys()
        })
        cube_values.update(point_refs)

    # A (coordinate) variable named INDEX_DIM_NAME should not exist,
    # otherwise cube_values.to_dataframe() will use an empty string ""
    # to name that series in the output DataFrame instance.
    # This seems to be an issue of xarray 0.18.2+.
    if INDEX_DIM_NAME in cube_values:
        cube_values = cube_values.drop_vars(INDEX_DIM_NAME)

    return cube_values
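

# Illustrative usage sketch, not part of the original module: build a tiny
# synthetic cube and extract values at two points. The dimension and variable
# names ("time", "lat", "lon", "precipitation") are assumptions chosen for
# this example; cube_asserted=True skips cube verification for the toy data.
def _example_get_cube_values_for_points() -> xr.Dataset:
    cube = xr.Dataset(
        {"precipitation": (("time", "lat", "lon"),
                           np.random.default_rng(0).random((2, 4, 8)))},
        coords={
            "time": np.array(["2020-01-01", "2020-01-02"],
                             dtype="datetime64[ns]"),
            "lat": np.linspace(50.0, 53.0, 4),
            "lon": np.linspace(10.0, 17.0, 8),
        }
    )
    points = pd.DataFrame({
        "time": np.array(["2020-01-01", "2020-01-02"],
                         dtype="datetime64[ns]"),
        "lat": [50.6, 52.2],
        "lon": [10.3, 14.1],
    })
    # One row per point; "*_index" columns hold the computed cube indexes,
    # "*_ref" columns repeat the original point coordinates.
    return get_cube_values_for_points(cube, points,
                                      include_indexes=True,
                                      include_refs=True,
                                      cube_asserted=True)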


def get_cube_values_for_indexes(
        cube: xr.Dataset,
        indexes: Union[xr.Dataset, pd.DataFrame, Mapping[str, Any]],
        include_coords: bool = False,
        include_bounds: bool = False,
        data_var_names: Sequence[str] = None,
        index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN,
        method: str = DEFAULT_INTERP_POINT_METHOD,
        cube_asserted: bool = False
) -> xr.Dataset:
    """
    Get values from the *cube* at given *indexes*.

    :param cube: A cube dataset.
    :param indexes: A mapping from column names to index and fraction
        arrays for all cube dimensions.
    :param include_coords: Whether to include the cube coordinates for each
        point in the return value.
    :param include_bounds: Whether to include the cube coordinate boundaries
        (if any) for each point in the return value.
    :param data_var_names: An optional list of names of data variables in
        *cube* whose values shall be extracted.
    :param index_name_pattern: A naming pattern for the computed index
        columns. Must include "{name}", which will be replaced by the
        dimension name.
    :param method: "nearest" or "linear".
    :param cube_asserted: If False, *cube* will be verified, otherwise it
        is expected to be a valid cube.
    :return: A new dataset whose variables are values from *cube* variables
        at the given *indexes*.
    """
    if not cube_asserted:
        assert_cube(cube)

    if method not in {POINT_INTERP_METHOD_NEAREST,
                      POINT_INTERP_METHOD_LINEAR}:
        raise ValueError(f"invalid method {method!r}")
    if method != POINT_INTERP_METHOD_NEAREST:
        raise NotImplementedError(f"method {method!r} not yet implemented")

    all_data_var_names = tuple(cube.data_vars.keys())
    if len(all_data_var_names) == 0:
        raise ValueError("cube is empty")

    if data_var_names is not None:
        if len(data_var_names) == 0:
            return _empty_dataset_from_points(indexes)
        for var_name in data_var_names:
            if var_name not in cube.data_vars:
                raise ValueError(f"variable {var_name!r} not found in cube")
    else:
        data_var_names = all_data_var_names

    dim_names = cube[data_var_names[0]].dims
    num_dims = len(dim_names)
    index_names = [index_name_pattern.format(name=dim_name)
                   for dim_name in dim_names]

    indexes, num_points = _normalize_series(indexes,
                                            index_names,
                                            force_dataset=True,
                                            param_name="indexes")
    if num_points == 0:
        return _empty_dataset_from_points(indexes)

    cube = xr.Dataset({var_name: cube[var_name]
                       for var_name in data_var_names},
                      coords=cube.coords)

    new_bounds_vars = {}
    bounds_var_names = _get_coord_bounds_var_names(cube)
    drop_coords = None
    if bounds_var_names:
        if include_bounds:
            # Flatten any coordinate bounds variables
            for var_name, bnds_var_name in bounds_var_names.items():
                bnds_var = cube[bnds_var_name]
                new_bounds_vars[f"{var_name}_lower"] = bnds_var[:, 0]
                new_bounds_vars[f"{var_name}_upper"] = bnds_var[:, 1]
            cube = cube.assign_coords(**new_bounds_vars)
        cube = cube.drop_vars(bounds_var_names.values())
        if not include_coords:
            drop_coords = set(cube.coords).difference(new_bounds_vars.keys())
    else:
        if not include_coords:
            drop_coords = set(cube.coords)

    # Generate a validation condition so we can filter out
    # invalid rows (where any index is -1 or NaN)
    is_valid_point = None
    for index_name in index_names:
        col = indexes[index_name]
        if np.issubdtype(col.dtype, np.integer):
            condition = col >= 0
        else:
            # Invalid floating-point indexes are encoded as NaN
            condition = np.logical_not(np.isnan(col))
        if is_valid_point is None:
            is_valid_point = condition
        else:
            is_valid_point = np.logical_and(is_valid_point, condition)

    num_valid_points = np.count_nonzero(is_valid_point)
    if num_valid_points == num_points:
        # All indexes are valid
        cube_selector = {dim_names[i]: indexes[index_names[i]]
                         for i in range(num_dims)}
        cube_values = cube.isel(cube_selector)
    elif num_valid_points == 0:
        # All indexes are invalid
        new_vars = {}
        for var_name in cube.variables:
            new_vars[var_name] = _empty_points_var(cube[var_name],
                                                   num_points)
        cube_values = xr.Dataset(new_vars)
    else:
        # Some indexes are invalid
        idx = np.arange(num_points)
        good_idx = idx[is_valid_point.values]
        idx_dim_name = indexes[index_names[0]].dims[0]
        good_indexes = indexes.isel({idx_dim_name: good_idx})
        cube_selector = {dim_names[i]: good_indexes[index_names[i]]
                         for i in range(num_dims)}
        cube_values = cube.isel(cube_selector)
        new_vars = {}
        for var_name in cube.variables:
            var = cube_values[var_name]
            new_var = _empty_points_var(var, num_points)
            new_var[good_idx] = var
            new_vars[var_name] = new_var
        cube_values = xr.Dataset(new_vars)

    if drop_coords:
        cube_values = cube_values.drop_vars(drop_coords)

    return cube_values
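

# Illustrative sketch, not part of the original module: select cube values by
# precomputed integer indexes, given a cube like the one built in
# _example_get_cube_values_for_points() above. The index -1 marks an
# out-of-range point; its row is filled with NaN (or NaT) in the result.
def _example_get_cube_values_for_indexes(cube: xr.Dataset) -> xr.Dataset:
    indexes = {
        "time_index": np.array([0, 1, 1]),
        "lat_index": np.array([2, 0, -1]),
        "lon_index": np.array([5, 7, 3]),
    }
    return get_cube_values_for_indexes(cube, indexes, cube_asserted=True)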


def get_cube_point_indexes(
        cube: xr.Dataset,
        points: PointsLike,
        dim_name_mapping: Mapping[str, str] = None,
        index_name_pattern: str = DEFAULT_INDEX_NAME_PATTERN,
        index_dtype=np.float64,
        cube_asserted: bool = False
) -> xr.Dataset:
    """
    Get indexes of given point coordinates *points* into the given *cube*.

    :param cube: The cube dataset.
    :param points: A mapping from column names to column data arrays,
        which must all have the same length.
    :param dim_name_mapping: A mapping from dimension names in *cube* to
        column names in *points*.
    :param index_name_pattern: A naming pattern for the computed index
        columns. Must include "{name}", which will be replaced by the
        dimension name.
    :param index_dtype: Numpy data type for the indexes. If it is a
        floating point type (default), then the indexes will contain
        fractions, which may be used for interpolation. For out-of-range
        coordinates in *points*, indexes will be -1 if *index_dtype* is
        an integer type, and NaN if *index_dtype* is a floating point
        type.
    :param cube_asserted: If False, *cube* will be verified, otherwise it
        is expected to be a valid cube.
    :return: A dataset containing the index columns.
    """
    if not cube_asserted:
        assert_cube(cube)

    dim_name_mapping = dim_name_mapping \
        if dim_name_mapping is not None else {}
    dim_names = _get_cube_data_var_dims(cube)
    col_names = [dim_name_mapping.get(str(dim_name), dim_name)
                 for dim_name in dim_names]

    points, _ = _normalize_series(points,
                                  col_names,
                                  force_dataset=False,
                                  param_name="points")

    indexes = []
    for dim_name, col_name in zip(dim_names, col_names):
        col = points[col_name]
        coord_indexes = get_dataset_indexes(cube,
                                            str(dim_name),
                                            col,
                                            index_dtype=index_dtype)
        indexes.append((index_name_pattern.format(name=dim_name),
                        xr.DataArray(coord_indexes,
                                     dims=[INDEX_DIM_NAME])))

    return xr.Dataset(dict(indexes))
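

# Illustrative sketch, not part of the original module: compute fractional
# point indexes for a cube like the one built in
# _example_get_cube_values_for_points() above, mapping the cube's "lat"/"lon"
# dimensions to differently named point columns. All names are assumptions
# chosen for the example.
def _example_get_cube_point_indexes(cube: xr.Dataset) -> xr.Dataset:
    points = {
        "time": np.array(["2020-01-01"], dtype="datetime64[ns]"),
        "latitude": np.array([50.6]),
        "longitude": np.array([10.3]),
    }
    return get_cube_point_indexes(
        cube,
        points,
        dim_name_mapping={"lat": "latitude", "lon": "longitude"},
        index_dtype=np.float64,  # fractional indexes; NaN marks out-of-range
        cube_asserted=True,
    )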


def get_dataset_indexes(
        dataset: xr.Dataset,
        coord_var_name: str,
        coord_values: SeriesLike,
        index_dtype=np.float64
) -> Union[xr.DataArray, np.ndarray]:
    """
    Compute the indexes and their fractions into a coordinate variable
    *coord_var_name* of a *dataset* for the given coordinate values
    *coord_values*.

    The coordinate variable's labels must be monotonically increasing or
    decreasing, otherwise the result will be nonsense.

    For any value in *coord_values* that is out of the bounds of the
    coordinate variable's values, the returned index depends on
    *index_dtype*: if *index_dtype* is an integer type, invalid indexes
    are encoded as -1; for floating point types, NaN is used.

    :param dataset: A cube dataset.
    :param coord_var_name: Name of a coordinate variable.
    :param coord_values: Array-like coordinate values.
    :param index_dtype: Numpy data type for the indexes. If it is a
        floating point type (default), then the returned indexes contain
        fractions, which may be used for interpolation.
    :return: The indexes as a numpy array of type *index_dtype*, with the
        same length as *coord_values*.
    """
    coord_var = dataset[coord_var_name]
    n1 = coord_var.size
    n2 = n1 + 1

    coord_bounds_var = _get_bounds_var(dataset, coord_var_name)
    if coord_bounds_var is not None:
        # Use the cell boundaries given by the bounds variable
        coord_bounds = coord_bounds_var.values
        if np.issubdtype(coord_bounds.dtype, np.datetime64):
            coord_bounds = coord_bounds.astype(np.uint64)
        is_inverse = (coord_bounds[0, 1] - coord_bounds[0, 0]) < 0
        if is_inverse:
            coord_bounds = coord_bounds[::-1, ::-1]
        coords = np.zeros(n2, dtype=coord_bounds.dtype)
        coords[0:-1] = coord_bounds[:, 0]
        coords[-1] = coord_bounds[-1, 1]
    elif coord_var.size > 1:
        # Derive the cell boundaries from the cell centers
        center_coords = coord_var.values
        if np.issubdtype(center_coords.dtype, np.datetime64):
            center_coords = center_coords.astype(np.uint64)
        is_inverse = (center_coords[-1] - center_coords[0]) < 0
        if is_inverse:
            center_coords = center_coords[::-1]
        deltas = np.zeros(n2, dtype=center_coords.dtype)
        deltas[0:-2] = np.diff(center_coords)
        deltas[-2] = deltas[-3]
        deltas[-1] = deltas[-3]
        coords = np.zeros(n2, dtype=center_coords.dtype)
        coords[0:-1] = center_coords
        coords[-1] = coords[-2] + deltas[-1]
        if np.issubdtype(deltas.dtype, np.integer):
            coords -= deltas // 2
        else:
            coords -= 0.5 * deltas
    else:
        raise ValueError(f"cannot determine cell boundaries for"
                         f" coordinate variable {coord_var_name!r}"
                         f" of size {coord_var.size}")

    if np.issubdtype(coord_values.dtype, np.datetime64):
        try:
            coord_values = coord_values.astype(np.uint64)
        except TypeError:
            # Fixes https://github.com/dcs4cop/xcube/issues/95
            coord_values = coord_values.values.astype(np.uint64)

    indexes = np.linspace(0.0, n1, n2, dtype=np.float64)
    interp_indexes = np.interp(coord_values, coords, indexes,
                               left=-1, right=-1)
    if is_inverse:
        i = interp_indexes >= 0
        interp_indexes[i] = n1 - interp_indexes[i]
    # Clamp indexes at the upper boundary so they stay within [0, n1)
    interp_indexes[interp_indexes >= n1] = n1 - 1e-9
    if np.issubdtype(index_dtype, np.integer):
        interp_indexes = interp_indexes.astype(index_dtype)
    else:
        interp_indexes[interp_indexes < 0] = np.nan
    return interp_indexes
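

# Illustrative sketch, not part of the original module: fractional indexes
# into a simple 1-D coordinate. With cell centers 0.5, 1.5, 2.5, the derived
# cell boundaries are 0, 1, 2, 3, so the value 1.25 maps to index 1.25, and
# the out-of-range value -5.0 maps to NaN (it would be -1 for an integer
# *index_dtype*).
def _example_get_dataset_indexes() -> np.ndarray:
    dataset = xr.Dataset(coords={"x": np.array([0.5, 1.5, 2.5])})
    return get_dataset_indexes(dataset, "x",
                               np.array([1.25, -5.0]),
                               index_dtype=np.float64)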


def _empty_points_var(var: xr.DataArray, num_points: int) -> xr.DataArray:
    # Create a 1-D points variable filled with 0 (integers) or NaN (floats)
    fill_value = 0 if np.issubdtype(var.dtype, np.integer) else np.nan
    return xr.DataArray(np.full(num_points, fill_value, dtype=var.dtype),
                        dims=[INDEX_DIM_NAME],
                        attrs=var.attrs)


def _normalize_series(
        points: PointsLike,
        col_names: Sequence[str],
        force_dataset: bool = False,
        param_name: str = "points"
) -> Tuple[PointsLike, int]:
    if not isinstance(points, (xr.Dataset, pd.DataFrame)):
        # Turn a plain mapping of series into a mapping of DataArrays
        new_points = {}
        for col_name in col_names:
            if col_name in points:
                col = points[col_name]
                if not isinstance(col, xr.DataArray):
                    col = xr.DataArray(col)
                new_points[col_name] = col
        points = new_points

    num_points = None
    for col_name in col_names:
        if col_name not in points:
            raise ValueError(f"column {col_name!r} not found"
                             f" in {param_name}")
        col = points[col_name]
        if len(col.shape) != 1:
            raise ValueError(f"column {col_name!r} in {param_name} must be"
                             f" one-dimensional, but has shape {col.shape!r}")
        if num_points is None:
            num_points = len(col)
        elif num_points != len(col):
            raise ValueError(f"column sizes in {param_name} must all be the"
                             f" same, but found {num_points}"
                             f" for column {col_names[0]!r} and {len(col)}"
                             f" for column {col_name!r}")

    if num_points is None:
        raise ValueError(f"{param_name} has no valid columns")

    if force_dataset:
        if isinstance(points, pd.DataFrame):
            points = xr.Dataset.from_dataframe(points)
        elif not isinstance(points, xr.Dataset):
            points = xr.Dataset(points)

    return points, num_points


def _get_coord_bounds_var_names(dataset: xr.Dataset) -> Dict[Hashable, str]:
    bnds_var_names = {}
    for var_name in dataset.coords:
        var = dataset[var_name]
        bnds_var_name = var.attrs.get("bounds", f"{var_name}_bnds")
        if bnds_var_name not in bnds_var_names and bnds_var_name in dataset:
            bnds_var = dataset[bnds_var_name]
            if bnds_var.shape == (var.shape[0], 2):
                bnds_var_names[var_name] = bnds_var_name
    return bnds_var_names


def _get_cube_data_var_dims(cube: xr.Dataset) -> Tuple[Hashable, ...]:
    for var in cube.data_vars.values():
        return var.dims
    raise ValueError("cube dataset is empty")


def _get_bounds_var(dataset: xr.Dataset,
                    var_name: str) -> Optional[xr.DataArray]:
    var = dataset[var_name]
    if len(var.shape) == 1:
        bounds_var_name = var.attrs.get("bounds", f"{var_name}_bnds")
        if bounds_var_name in dataset:
            bounds_var = dataset[bounds_var_name]
            if bounds_var.dtype == var.dtype \
                    and bounds_var.shape == (var.size, 2):
                return bounds_var
    return None


def _empty_dataset_from_points(data: Any) -> xr.Dataset:
    return xr.Dataset(
        coords=data.coords if hasattr(data, "coords") else None
    )