Source code for xcube.core.select

# The MIT License (MIT)
# Copyright (c) 2021-2022 by the xcube team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import warnings
from typing import Collection, Optional, Tuple, Callable, Dict, Any, \
    List, Mapping
from typing import Union

import cftime
import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr

from xcube.core.gridmapping import GridMapping
from xcube.util.assertions import assert_given
from xcube.util.timeindex import ensure_time_index_compatible

Bbox = Tuple[float, float, float, float]
TimeRange = Union[Tuple[Optional[str], Optional[str]],
                  Tuple[Optional[pd.Timestamp], Optional[pd.Timestamp]]]


def select_subset(
        dataset: xr.Dataset,
        *,
        var_names: Optional[Collection[str]] = None,
        bbox: Optional[Bbox] = None,
        time_range: Optional[TimeRange] = None,
        grid_mapping: Optional[GridMapping] = None
):
    """
    Create a subset from *dataset* given *var_names*,
    *bbox*, *time_range*.

    This is a high-level convenience function that may invoke

    * :func:select_variables_subset
    * :func:select_spatial_subset
    * :func:select_temporal_subset

    :param dataset: The dataset.
    :param var_names: Optional variable names.
    :param bbox: Optional bounding box in the dataset's
        CRS coordinate units.
    :param time_range: Optional time range
    :param grid_mapping: Optional dataset grid mapping.
    :return: a subset of *dataset*, or unchanged *dataset*
        if no keyword-arguments are used.
    """
    if var_names is not None:
        dataset = select_variables_subset(
            dataset, var_names=var_names
        )
    if bbox is not None:
        dataset = select_spatial_subset(
            dataset, xy_bbox=bbox, grid_mapping=grid_mapping
        )
    if time_range is not None:
        dataset = select_temporal_subset(
            dataset, time_range=time_range
        )
    return dataset


[docs] def select_variables_subset( dataset: xr.Dataset, var_names: Optional[Collection[str]] = None ) -> xr.Dataset: """ Select data variable from given *dataset* and create new dataset. :param dataset: The dataset from which to select variables. :param var_names: The names of data variables to select. :return: A new dataset. It is empty, if *var_names* is empty. It is *dataset*, if *var_names* is None. """ if var_names is None: return dataset dropped_variables = set(dataset.data_vars.keys()).difference(var_names) if not dropped_variables: return dataset return dataset.drop_vars(dropped_variables)
def select_spatial_subset( dataset: xr.Dataset, ij_bbox: Optional[Tuple[int, int, int, int]] = None, ij_border: int = 0, xy_bbox: Optional[Tuple[float, float, float, float]] = None, xy_border: float = 0., grid_mapping: Optional[GridMapping] = None, ) -> Optional[xr.Dataset]: """ Select a spatial subset of *dataset* for the bounding box *ij_bbox* or *xy_bbox*. *ij_bbox* or *xy_bbox* must not be given both. :param xy_bbox: Bounding box in coordinates of the dataset's CRS. :param xy_border: Extra border added to *xy_bbox*. :param dataset: Source dataset. :param ij_bbox: Bounding box (i_min, i_min, j_max, j_max) in pixel coordinates. :param ij_border: Extra border added to *ij_bbox* in number of pixels :param xy_bbox: The bounding box in x,y coordinates. :param xy_border: Border in units of the x,y coordinates. :param grid_mapping: Optional dataset grid mapping. :return: Spatial dataset subset """ if ij_bbox is None and xy_bbox is None: raise ValueError('One of ij_bbox and xy_bbox must be given') if ij_bbox and xy_bbox: raise ValueError('Only one of ij_bbox and xy_bbox can be given') if grid_mapping is None: grid_mapping = GridMapping.from_dataset(dataset) x_name, y_name = grid_mapping.xy_var_names x = dataset[x_name] y = dataset[y_name] if x.ndim == 1 and y.ndim == 1: # Hotfix für #981 and #985 if xy_bbox: if y.values[0] < y.values[-1]: ds = dataset.sel(**{ x_name: slice(xy_bbox[0] - xy_border, xy_bbox[2] + xy_border), y_name: slice(xy_bbox[1] - xy_border, xy_bbox[3] + xy_border) }) else: ds = dataset.sel(**{ x_name: slice(xy_bbox[0] - xy_border, xy_bbox[2] + xy_border), y_name: slice(xy_bbox[3] + xy_border, xy_bbox[1] - xy_border) }) return ds else: return dataset.isel(**{ x_name: slice(ij_bbox[0] - ij_border, ij_bbox[2] + ij_border), y_name: slice(ij_bbox[1] - ij_border, ij_bbox[3] + ij_border) }) else: if xy_bbox: ij_bbox = grid_mapping.ij_bbox_from_xy_bbox(xy_bbox, ij_border=ij_border, xy_border=xy_border) if ij_bbox[0] == -1: return None width, height = grid_mapping.size i_min, j_min, i_max, j_max = ij_bbox if i_min > 0 or j_min > 0 or i_max < width - 1 or j_max < height - 1: x_dim, y_dim = grid_mapping.xy_dim_names i_slice = slice(i_min, i_max + 1) j_slice = slice(j_min, j_max + 1) return dataset.isel({x_dim: i_slice, y_dim: j_slice}) return dataset def select_temporal_subset(dataset: xr.Dataset, time_range: TimeRange, time_name: str = 'time') -> xr.Dataset: """ Select a temporal subset from *dataset* given *time_range*. :param dataset: The dataset. Must include time :param time_range: Time range given as two time stamps (start, end) that may be (ISO) strings or datetime objects. :param time_name: optional name of the time coordinate variable. Defaults to "time". :return: """ assert_given(time_range, 'time_range') time_name = time_name or 'time' if time_name not in dataset: raise ValueError(f'cannot compute temporal subset: variable' f' "{time_name}" not found in dataset') time_1, time_2 = time_range time_1 = pd.to_datetime(time_1) if time_1 is not None else None time_2 = pd.to_datetime(time_2) if time_2 is not None else None if time_1 is None and time_2 is None: return dataset if time_2 is not None: delta = time_2 - time_2.floor('1D') if delta == pd.Timedelta('0 days 00:00:00'): time_2 += pd.Timedelta('1D') try: time_slice = ensure_time_index_compatible(dataset, slice(time_1, time_2), time_name) return dataset.sel({time_name or 'time': time_slice}) except TypeError: calendar = dataset.time.encoding.get('calendar') time_1 = cftime.datetime(time_1.year, time_1.month, time_1.day, calendar=calendar) time_2 = cftime.datetime(time_2.year, time_2.month, time_2.day, calendar=calendar) time_slice = ensure_time_index_compatible(dataset, slice(time_1, time_2), time_name) return dataset.sel({time_name or 'time': time_slice}) _PREDICATE_SIGNATURE = "predicate(" \ "slice_array: xr.DataArray, " \ "slice_info: Dict" \ ") -> bool" Predicate = Callable[ [ xr.DataArray, Dict[str, Any] ], bool ] def select_label_subset(dataset: xr.Dataset, dim: str, predicate: Union[Predicate, Mapping[str, Predicate]], use_dask: bool = False): """Select the labels in *dataset* along a given dimension *dim* using a predicate function *predicate* that is called for all variable slices for a current label. The *predicate* can also be provided as a mapping from variable names to dedicated predicate functions. The predicate function is called for all *dim* labels in *dataset* and for every variable that contains *dim*. If *predicate* returns False for any given label, that label will be dropped from dimension *dim*. Predicate functions are defined as follows: ```python def predicate(slice_array: xr.DataArray, slice_info: Dict) -> bool: ... ``` Here, *slice_array* is a variable's array slice for the given label. The argument *slice_info* is a dictionary that contains the following keys: * var: str - name of the current variable. * dim: str - value of *dim*. * index: int - value for the current index within dimension *dim*. * label: Optional[xr.DataArray] - value for the current label within dimension *dim*. Note, the value of "label" will be None, if *dataset* does not contain a 1D-coordinate variable named *dim*. The following example selects only time labels from a 3-D (time, y, x) cube where the 2-D (y, x) images of variable "CHL" comprises more than 50% valid values: ``` >>> chl_data = np.random.random((5, 10, 20)) >>> chl_data = np.where(chl_data > 0.5, chl_data, np.nan) >>> ds = xr.Dataset({"CHL": (["time", "y", "x"], chl_data)}) >>> >>> def is_valid_slice(slice_array, slice_label): >>> return np.sum(np.isnan(slice_array)) / slice_array.size <= 0.5 >>> >>> ds_subset = select_label_subset(ds, "time", >>> predicate={"CHL": is_valid_slice}) ``` :param dataset: The dataset. :param dim: The name of the dimension from which to select the labels. :param predicate: The predicate function or a mapping from variable names to variable-specific predicate functions. :param use_dask: Whether to use a Dask graph that will compute the validity of labels in parallel. For a large number of labels, very complex Dask graphs will result (every label is a node) whose overhead may compensate the performance gain. :return: A new dataset with labels along *dim* selected by the *predicate*. If all labels are selected, *dataset* is returned without change. """ if callable(predicate): predicate_lookup = {var_name: predicate for var_name, var in dataset.data_vars.items() if dim in var.dims} elif isinstance(predicate, Mapping): predicate_lookup = predicate for var_name, var_predicate in predicate_lookup.items(): if not callable(var_predicate): raise TypeError(f'predicate for variable {var_name!r}' f' must be callable with' f' signature {_PREDICATE_SIGNATURE}') else: raise TypeError(f'predicate' f' must be callable with' f' signature {_PREDICATE_SIGNATURE}') num_labels = dataset.dims[dim] valid_mask = [_is_label_valid(dataset, predicate_lookup, dim, index) for index in range(num_labels)] if use_dask: valid_mask = da.stack(valid_mask).compute() dropped_indexes = [i for i in range(num_labels) if not valid_mask[i]] if not dropped_indexes: return dataset return dataset.drop_isel({dim: dropped_indexes}) def _is_label_valid(dataset: xr.Dataset, predicate_lookup: Mapping[str, Predicate], dim: str, index: int) -> da.Array: label = dataset[dim][index] if dim in dataset else None results: List[da.Array] = [] for var_name, var in dataset.data_vars.items(): if dim in var.dims: predicate = predicate_lookup.get(var_name) if predicate is not None: slice_array = var.isel({dim: index}) slice_info = dict(var=var_name, dim=dim, index=index, label=label) result = predicate(slice_array, slice_info) if isinstance(result, xr.DataArray): result = result.data if isinstance(result, da.Array): results.append(result) else: results.append(da.from_array(result)) if len(results) == 0: return da.from_array(True) elif len(results) == 1: return results[0] else: return da.all(da.stack(results))