# The MIT License (MIT)
# Copyright (c) 2023 by the xcube development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
import math
from typing import Union, Callable, Optional, \
Sequence, Tuple, Mapping, Hashable, Any
import numpy as np
import xarray as xr
from dask import array as da
from dask_image import ndinterp
from xcube.core.gridmapping import GridMapping
from xcube.core.gridmapping.helpers import AffineTransformMatrix
from xcube.util.assertions import assert_true
from .cf import maybe_encode_grid_mapping
NDImage = Union[np.ndarray, da.Array]
Aggregator = Callable[[NDImage], NDImage]
def resample_dataset(
dataset: xr.Dataset,
matrix: AffineTransformMatrix,
size: Tuple[int, int],
tile_size: Tuple[int, int],
xy_dim_names: Tuple[str, str],
var_configs: Mapping[Hashable, Mapping[str, Any]] = None,
) -> xr.Dataset:
"""
Resample dataset according to an affine transformation.
:param dataset: The source dataset
:param matrix: Affine transformation matrix.
:param size: Target image size.
:param tile_size: Target image tile size.
:param xy_dim_names: Names of the spatial dimensions.
:param var_configs: Optional resampling configurations
for individual variables.
:return: The resampled target dataset.
"""
((i_scale, _, i_off), (_, j_scale, j_off)) = matrix
width, height = size
tile_width, tile_height = tile_size
x_dim, y_dim = xy_dim_names
yx_dims = (y_dim, x_dim)
coords = dict()
var_configs = var_configs or {}
data_vars = dict()
for k, var in dataset.variables.items():
new_var = None
if var.ndim >= 2 and var.dims[-2:] == yx_dims:
var_config = var_configs.get(k, dict())
if np.issubdtype(var.dtype, np.integer) \
or np.issubdtype(var.dtype, bool):
spline_order = 0
aggregator = None
recover_nan = False
else:
spline_order = 1
aggregator = np.nanmean
recover_nan = True
var_data = resample_ndimage(
var.data,
scale=(j_scale, i_scale),
offset=(j_off, i_off),
shape=(height, width),
chunks=(tile_height, tile_width),
spline_order=var_config.get('spline_order', spline_order),
aggregator=var_config.get('aggregator', aggregator),
recover_nan=var_config.get('recover_nan', recover_nan),
)
new_var = xr.DataArray(var_data,
dims=var.dims,
attrs=var.attrs)
elif x_dim not in var.dims and y_dim not in var.dims:
new_var = var.copy()
if new_var is not None:
if k in dataset.coords:
coords[k] = new_var
elif k in dataset.data_vars:
data_vars[k] = new_var
return xr.Dataset(data_vars=data_vars,
coords=coords,
attrs=dataset.attrs)
[docs]
def resample_ndimage(
image: NDImage,
scale: Union[float, Tuple[float, float]] = 1,
offset: Union[float, Tuple[float, float]] = None,
shape: Union[int, Tuple[int, int]] = None,
chunks: Sequence[int] = None,
spline_order: int = 1,
aggregator: Optional[Aggregator] = np.nanmean,
recover_nan: bool = False
) -> da.Array:
image = da.asarray(image)
offset = _normalize_offset(offset, image.ndim)
scale = _normalize_scale(scale, image.ndim)
if shape is None:
shape = resize_shape(image.shape, scale)
else:
shape = _normalize_shape(shape, image)
chunks = _normalize_chunks(chunks, shape)
scale_y, scale_x = scale[-2], scale[-1]
divisor_x = math.ceil(abs(scale_x))
divisor_y = math.ceil(abs(scale_y))
if (divisor_x >= 2 or divisor_y >= 2) and aggregator is not None:
# Downsampling
# ------------
axes = {image.ndim - 2: divisor_y, image.ndim - 1: divisor_x}
elongation = _normalize_scale((scale_y / divisor_y,
scale_x / divisor_x), image.ndim)
larger_shape = resize_shape(shape, (divisor_y, divisor_x),
divisor_x=divisor_x,
divisor_y=divisor_y)
# print('Downsampling: ', scale)
# print(' divisor:', (divisor_y, divisor_x))
# print(' elongation:', elongation)
# print(' shape:', shape)
# print(' larger_shape:', larger_shape)
divisible_chunks = _make_divisible_tiles(larger_shape,
divisor_x, divisor_y)
image = _transform_array(image,
elongation, offset,
larger_shape, divisible_chunks,
spline_order, recover_nan)
image = da.coarsen(aggregator, image, axes)
if shape != image.shape:
image = image[..., 0:shape[-2], 0:shape[-1]]
if chunks is not None:
image = image.rechunk(chunks)
else:
# Upsampling
# ----------
# print('Upsampling: ', scale)
image = _transform_array(image,
scale, offset,
shape, chunks,
spline_order, recover_nan)
return image
def _transform_array(image: da.Array,
scale: Tuple[float, ...],
offset: Tuple[float, ...],
shape: Tuple[int, ...],
chunks: Optional[Tuple[int, ...]],
spline_order: int,
recover_nan: bool) -> da.Array:
"""
Apply affine transformation to ND-image.
:param image: ND-image with shape (..., size_y, size_x)
:param scale: Scaling factors (1, ..., 1, sy, sx)
:param offset: Offset values (0, ..., 0, oy, ox)
:param shape: (..., size_y, size_x)
:param chunks: (..., chunk_size_y, chunk_size_x)
:param spline_order: 0 ... 5
:param recover_nan: True/False
:return: Transformed ND-image.
"""
assert_true(len(scale) == image.ndim, 'invalid scale')
assert_true(len(offset) == image.ndim, 'invalid offset')
assert_true(len(shape) == image.ndim, 'invalid shape')
assert_true(chunks is None or len(chunks) == image.ndim,
'invalid chunks')
if _is_no_op(image, scale, offset, shape):
return image
# As of scipy 0.18, matrix = scale is no longer supported.
# Therefore we use the diagonal matrix form here,
# where scale is the diagonal.
matrix = np.diag(scale)
at_kwargs = dict(
offset=offset,
order=spline_order,
output_shape=shape,
output_chunks=chunks,
mode='constant',
)
if recover_nan and spline_order > 0:
# We can "recover" values that are neighbours to NaN values
# that would otherwise become NaN too.
mask = da.isnan(image)
# First check if there are NaN values ar all
if da.any(mask):
# Yes, then
# 1. replace NaN by zero
filled_im = da.where(mask, 0.0, image)
# 2. transform the zeo-filled image
scaled_im = ndinterp.affine_transform(filled_im,
matrix,
**at_kwargs,
cval=0.0)
# 3. transform the inverted mask
scaled_norm = ndinterp.affine_transform(1.0 - mask,
matrix,
**at_kwargs,
cval=0.0)
# 4. put back NaN where there was zero,
# otherwise decode using scaled mask
return da.where(da.isclose(scaled_norm, 0.0),
np.nan, scaled_im / scaled_norm)
# No dealing with NaN required
return ndinterp.affine_transform(image, matrix, **at_kwargs, cval=np.nan)
def resize_shape(shape: Sequence[int],
scale: Union[float, Tuple[float, ...]],
divisor_x: int = 1,
divisor_y: int = 1) -> Tuple[int, ...]:
scale = _normalize_scale(scale, len(shape))
height, width = shape[-2], shape[-1]
scale_y, scale_x = scale[-2], scale[-1]
wf = width * abs(scale_x)
hf = height * abs(scale_y)
w = divisor_x * math.ceil(wf / divisor_x)
h = divisor_y * math.ceil(hf / divisor_y)
return tuple(shape[0:-2]) + (h, w)
def _make_divisible_tiles(larger_shape: Tuple[int, ...],
divisor_x: int,
divisor_y: int) -> Tuple[int, ...]:
w = min(larger_shape[-1],
divisor_x * ((2048 + divisor_x - 1) // divisor_x))
h = min(larger_shape[-2],
divisor_y * ((2048 + divisor_y - 1) // divisor_y))
return (len(larger_shape) - 2) * (1,) + (h, w)
def _normalize_image(im: NDImage) -> da.Array:
return da.asarray(im)
def _normalize_offset(offset: Optional[Sequence[float]],
ndim: int) -> Tuple[int, ...]:
return _normalize_pair(offset, 0.0, ndim, 'offset')
def _normalize_scale(scale: Optional[Sequence[float]],
ndim: int) -> Tuple[int, ...]:
return _normalize_pair(scale, 1.0, ndim, 'scale')
def _normalize_pair(pair: Optional[Sequence[float]],
default: float,
ndim: int,
name: str) -> Tuple[int, ...]:
if pair is None:
pair = [default, default]
elif isinstance(pair, (int, float)):
pair = [pair, pair]
elif len(pair) != 2:
raise ValueError(f'illegal image {name}')
return (ndim - 2) * (default,) + tuple(pair)
def _normalize_shape(shape: Optional[Sequence[int]],
im: NDImage) -> Tuple[int, ...]:
if shape is None:
return im.shape
if len(shape) != 2:
raise ValueError('illegal image shape')
return im.shape[0:-2] + tuple(shape)
def _normalize_chunks(chunks: Optional[Sequence[int]],
shape: Tuple[int, ...]) -> Optional[Tuple[int, ...]]:
if chunks is None:
return None
if len(chunks) < 2 or len(chunks) > len(shape):
raise ValueError('illegal image chunks')
return (len(shape) - len(chunks)) * (1,) + tuple(chunks)
def _is_no_op(im: NDImage,
scale: Sequence[float],
offset: Sequence[float],
shape: Tuple[int, ...]):
return shape == im.shape \
and all(math.isclose(s, 1) for s in scale) \
and all(math.isclose(o, 0) for o in offset)