# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.
import warnings
from typing import Optional, Tuple, Callable, Dict, Any, List
from collections.abc import Collection, Mapping
from typing import Union
import cftime
import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr
from xcube.core.gridmapping import GridMapping
from xcube.util.assertions import assert_given
from xcube.util.timeindex import ensure_time_index_compatible
# Spatial bounding box. The element order (x_min, y_min, x_max, y_max) is
# the order in which select_spatial_subset() reads its xy_bbox argument.
Bbox = tuple[float, float, float, float]
# Time range given as a (start, end) pair, either of strings or of
# pandas timestamps. Each of the two bounds may be None.
TimeRange = Union[
    tuple[Optional[str], Optional[str]],
    tuple[Optional[pd.Timestamp], Optional[pd.Timestamp]],
]
def select_subset(
    dataset: xr.Dataset,
    *,
    var_names: Optional[Collection[str]] = None,
    bbox: Optional[Bbox] = None,
    time_range: Optional[TimeRange] = None,
    grid_mapping: Optional[GridMapping] = None,
) -> Optional[xr.Dataset]:
    """Create a subset from *dataset* given *var_names*,
    *bbox*, *time_range*.

    This is a high-level convenience function that may invoke

    * :func:`select_variables_subset`
    * :func:`select_spatial_subset`
    * :func:`select_temporal_subset`

    in that order, each one only if the corresponding keyword
    argument is not None.

    Args:
        dataset: The dataset.
        var_names: Optional variable names.
        bbox: Optional bounding box in the dataset's CRS coordinate
            units.
        time_range: Optional time range
        grid_mapping: Optional dataset grid mapping. Only used
            for the spatial subset.

    Returns:
        a subset of *dataset*, or unchanged *dataset* if no keyword-
        arguments are used. None, if :func:`select_spatial_subset`
        returns None for the given *bbox*.
    """
    if var_names is not None:
        dataset = select_variables_subset(dataset, var_names=var_names)
    if bbox is not None:
        dataset = select_spatial_subset(
            dataset, xy_bbox=bbox, grid_mapping=grid_mapping
        )
        if dataset is None:
            # The spatial selection yielded no dataset. Stop here instead
            # of handing None to the temporal selection, which would fail
            # with a TypeError.
            return None
    if time_range is not None:
        dataset = select_temporal_subset(dataset, time_range=time_range)
    return dataset
def select_variables_subset(
    dataset: xr.Dataset, var_names: Optional[Collection[str]] = None
) -> xr.Dataset:
    """Keep only the data variables of *dataset* listed in *var_names*.

    Args:
        dataset: The dataset from which to select variables.
        var_names: The names of the data variables to keep.

    Returns:
        *dataset* itself, if *var_names* is None or if no data
        variable needs to be dropped. Otherwise a new dataset without
        the data variables that are not named in *var_names*; it has
        no data variables, if *var_names* is empty.
    """
    if var_names is None:
        return dataset
    # Everything that is a data variable but was not asked for.
    unwanted = set(dataset.data_vars) - set(var_names)
    return dataset.drop_vars(unwanted) if unwanted else dataset
def select_spatial_subset(
    dataset: xr.Dataset,
    ij_bbox: Optional[tuple[int, int, int, int]] = None,
    ij_border: int = 0,
    xy_bbox: Optional[tuple[float, float, float, float]] = None,
    xy_border: float = 0.0,
    grid_mapping: Optional[GridMapping] = None,
) -> Optional[xr.Dataset]:
    """Select a spatial subset of *dataset* for the
    bounding box *ij_bbox* or *xy_bbox*.

    *ij_bbox* or *xy_bbox* must not be given both.

    If both spatial coordinate variables are 1-D, the selection is made
    directly on them: by label slices for *xy_bbox*, by positional
    slices for *ij_bbox* (upper index exclusive). Otherwise the grid
    mapping is used to turn *xy_bbox* into a pixel bounding box and
    the selection is made by position (upper index inclusive).

    NOTE(review): for coordinates that are not 1-D, *ij_border* is not
    applied to an explicitly given *ij_bbox* - confirm this is intended.

    Args:
        dataset: Source dataset.
        ij_bbox: Bounding box (i_min, j_min, i_max, j_max) in pixel
            coordinates.
        ij_border: Extra border added to *ij_bbox* in number of pixels
        xy_bbox: The bounding box (x_min, y_min, x_max, y_max)
            in x,y coordinates.
        xy_border: Border in units of the x,y coordinates.
        grid_mapping: Optional dataset grid mapping. If not given,
            it is derived from *dataset*.

    Returns:
        Spatial dataset subset. For coordinates that are not 1-D, this
        is *dataset* itself if the pixel bounding box covers the whole
        grid, and None if the grid mapping reports -1 as first pixel
        index for *xy_bbox*.

    Raises:
        ValueError: if neither or both of *ij_bbox* and *xy_bbox*
            are given.
    """
    if ij_bbox is None and xy_bbox is None:
        raise ValueError("One of ij_bbox and xy_bbox must be given")
    if ij_bbox and xy_bbox:
        raise ValueError("Only one of ij_bbox and xy_bbox can be given")

    if grid_mapping is None:
        grid_mapping = GridMapping.from_dataset(dataset)
    x_name, y_name = grid_mapping.xy_var_names
    x = dataset[x_name]
    y = dataset[y_name]

    if x.ndim == 1 and y.ndim == 1:
        # Hotfix for #981 and #985
        if xy_bbox:
            x_min, y_min, x_max, y_max = xy_bbox
            x_slice = slice(x_min - xy_border, x_max + xy_border)
            # A label slice must run in the same direction as the
            # coordinate. An axis with fewer than two labels has no
            # direction; treat it as ascending, because a reversed
            # slice on it would select nothing.
            y_ascending = y.size < 2 or y.values[0] < y.values[-1]
            if y_ascending:
                y_slice = slice(y_min - xy_border, y_max + xy_border)
            else:
                y_slice = slice(y_max + xy_border, y_min - xy_border)
            return dataset.sel(**{x_name: x_slice, y_name: y_slice})

        # Clamp the start indices at 0: a negative start would be
        # interpreted as an offset from the end of the axis.
        i_start = max(0, ij_bbox[0] - ij_border)
        j_start = max(0, ij_bbox[1] - ij_border)
        return dataset.isel(
            **{
                x_name: slice(i_start, ij_bbox[2] + ij_border),
                y_name: slice(j_start, ij_bbox[3] + ij_border),
            }
        )

    if xy_bbox:
        ij_bbox = grid_mapping.ij_bbox_from_xy_bbox(
            xy_bbox, ij_border=ij_border, xy_border=xy_border
        )
        if ij_bbox[0] == -1:
            return None

    width, height = grid_mapping.size
    i_min, j_min, i_max, j_max = ij_bbox
    # Only slice if the pixel bounding box is smaller than the grid.
    if i_min > 0 or j_min > 0 or i_max < width - 1 or j_max < height - 1:
        x_dim, y_dim = grid_mapping.xy_dim_names
        i_slice = slice(i_min, i_max + 1)
        j_slice = slice(j_min, j_max + 1)
        return dataset.isel({x_dim: i_slice, y_dim: j_slice})
    return dataset
def select_temporal_subset(
    dataset: xr.Dataset, time_range: TimeRange, time_name: str = "time"
) -> xr.Dataset:
    """Select a temporal subset from *dataset* given *time_range*.

    Args:
        dataset: The dataset. Must include time
        time_range: Time range given as two time stamps (start, end)
            that may be (ISO) strings or datetime objects.
            Either of them may be None.
        time_name: optional name of the time coordinate variable.
            Defaults to "time".

    Returns:
        The temporal subset of *dataset*, or *dataset* itself
        if both start and end of *time_range* are None.

    Raises:
        ValueError: if *time_name* is not found in *dataset*.
    """
    assert_given(time_range, "time_range")
    time_name = time_name or "time"
    if time_name not in dataset:
        raise ValueError(
            f"cannot compute temporal subset: variable"
            f' "{time_name}" not found in dataset'
        )

    time_1, time_2 = time_range
    time_1 = pd.to_datetime(time_1) if time_1 is not None else None
    time_2 = pd.to_datetime(time_2) if time_2 is not None else None
    if time_1 is None and time_2 is None:
        return dataset

    if time_2 is not None:
        # An end given without a time of day (exactly midnight) is moved
        # to the next day - presumably so that the end date itself is
        # included in the selection.
        delta = time_2 - time_2.floor("1D")
        if delta == pd.Timedelta("0 days 00:00:00"):
            time_2 += pd.Timedelta("1D")

    try:
        time_slice = ensure_time_index_compatible(
            dataset, slice(time_1, time_2), time_name
        )
        return dataset.sel({time_name: time_slice})
    except TypeError:
        # Retry with cftime objects built for the calendar recorded in
        # the encoding of the time coordinate. Use *time_name* here, as
        # the coordinate is not necessarily called "time".
        calendar = dataset[time_name].encoding.get("calendar")

        def to_cftime(timestamp):
            # Keep open bounds open instead of failing on None.
            # NOTE(review): only year, month and day are carried over,
            # the time of day is dropped - confirm this is intended.
            if timestamp is None:
                return None
            return cftime.datetime(
                timestamp.year, timestamp.month, timestamp.day, calendar=calendar
            )

        time_slice = ensure_time_index_compatible(
            dataset, slice(to_cftime(time_1), to_cftime(time_2)), time_name
        )
        return dataset.sel({time_name: time_slice})
# Human-readable signature of a predicate function. It is embedded in the
# TypeError messages raised by select_label_subset().
_PREDICATE_SIGNATURE = (
    "predicate(" "slice_array: xr.DataArray, " "slice_info: Dict" ") -> bool"
)
# Type of a predicate function: it receives a variable's array slice and
# a dictionary with information about that slice, and returns a boolean.
Predicate = Callable[[xr.DataArray, dict[str, Any]], bool]
def select_label_subset(
    dataset: xr.Dataset,
    dim: str,
    predicate: Union[Predicate, Mapping[str, Predicate]],
    use_dask: bool = False,
) -> xr.Dataset:
    """Select the labels in *dataset* along a given dimension *dim*
    using a predicate function *predicate* that is called for
    all variable slices for a current label.

    The *predicate* can also be provided as a mapping
    from variable names to dedicated predicate functions.

    The predicate function is called for all *dim* labels in *dataset*
    and for every variable that contains *dim*.
    If *predicate* returns False for any given label,
    that label will be dropped from dimension *dim*.

    Predicate functions are defined as follows:::

        def predicate(slice_array: xr.DataArray, slice_info: Dict) -> bool:
            ...

    Here, *slice_array* is a variable's array slice for the given label.
    The argument *slice_info* is a dictionary that contains the
    following keys:

    * var: str - name of the current variable.
    * dim: str - value of *dim*.
    * index: int - value for the current index within dimension *dim*.
    * label: Optional[xr.DataArray] - value for the current label
      within dimension *dim*.

    Note, the value of "label" will be None, if *dataset*
    does not contain a 1D-coordinate variable named *dim*.

    The following example selects only time labels
    from a 3-D (time, y, x) cube where the 2-D (y, x) images
    of variable "CHL" comprises more than 50% valid values:::

        >>> chl_data = np.random.random((5, 10, 20))
        >>> chl_data = np.where(chl_data > 0.5, chl_data, np.nan)
        >>> ds = xr.Dataset({"CHL": (["time", "y", "x"], chl_data)})
        >>>
        >>> def is_valid_slice(slice_array, slice_info):
        ...     return np.sum(np.isnan(slice_array)) / slice_array.size <= 0.5
        >>>
        >>> ds_subset = select_label_subset(ds, "time",
        ...                                 predicate={"CHL": is_valid_slice})

    Args:
        dataset: The dataset.
        dim: The name of the dimension from which to select the labels.
        predicate: The predicate function or a mapping from variable
            names to variable-specific predicate functions.
        use_dask: Whether to use a Dask graph that will compute the
            validity of labels in parallel. For a large number of
            labels, very complex Dask graphs will result (every label is
            a node) whose overhead may outweigh the performance gain.

    Returns:
        A new dataset with labels along *dim* selected by the
        *predicate*. If all labels are selected, *dataset* is returned
        without change.

    Raises:
        TypeError: if *predicate* is neither callable nor a mapping,
            or if a mapping value is not callable.
    """
    if callable(predicate):
        # One predicate for all: apply it to every variable that has *dim*.
        predicate_lookup = {
            var_name: predicate
            for var_name, var in dataset.data_vars.items()
            if dim in var.dims
        }
    elif isinstance(predicate, Mapping):
        for var_name, var_predicate in predicate.items():
            if not callable(var_predicate):
                raise TypeError(
                    f"predicate for variable {var_name!r}"
                    f" must be callable with"
                    f" signature {_PREDICATE_SIGNATURE}"
                )
        predicate_lookup = predicate
    else:
        raise TypeError(
            f"predicate must be callable with signature {_PREDICATE_SIGNATURE}"
        )

    num_labels = dataset.sizes[dim]
    if num_labels == 0:
        # Nothing to drop. Also avoids stacking an empty list below,
        # which dask rejects with a ValueError.
        return dataset

    valid_mask = [
        _is_label_valid(dataset, predicate_lookup, dim, index)
        for index in range(num_labels)
    ]
    if use_dask:
        # Evaluate all labels in a single graph instead of one by one.
        valid_mask = da.stack(valid_mask).compute()

    dropped_indexes = [
        index for index, is_valid in enumerate(valid_mask) if not is_valid
    ]
    if not dropped_indexes:
        return dataset
    return dataset.drop_isel({dim: dropped_indexes})
def _is_label_valid(
    dataset: xr.Dataset, predicate_lookup: Mapping[str, Predicate], dim: str, index: int
) -> da.Array:
    """Evaluate the predicates for the label at position *index* of *dim*.

    For every data variable of *dataset* that contains *dim* and has an
    entry in *predicate_lookup*, the predicate is called with the
    variable's slice at *index* and a dictionary with the keys
    "var", "dim", "index" and "label".

    Args:
        dataset: The dataset.
        predicate_lookup: Mapping from variable names to predicates.
        dim: Name of the dimension.
        index: Position of the label within *dim*.

    Returns:
        A dask array: a constant True if no predicate was called, the
        single predicate result if exactly one was called, otherwise
        the reduction with ``all`` of the stacked predicate results.
    """
    label = dataset[dim][index] if dim in dataset else None

    outcomes: list[da.Array] = []
    for var_name, var in dataset.data_vars.items():
        if dim not in var.dims:
            continue
        predicate = predicate_lookup.get(var_name)
        if predicate is None:
            continue
        outcome = predicate(
            var.isel({dim: index}),
            {"var": var_name, "dim": dim, "index": index, "label": label},
        )
        # Normalize whatever the predicate returned to a dask array.
        if isinstance(outcome, xr.DataArray):
            outcome = outcome.data
        if not isinstance(outcome, da.Array):
            outcome = da.from_array(outcome)
        outcomes.append(outcome)

    if not outcomes:
        return da.from_array(True)
    if len(outcomes) == 1:
        return outcomes[0]
    return da.all(da.stack(outcomes))