Source code for xcube.core.resampling.rectify

# The MIT License (MIT)
# Copyright (c) 2023 by the xcube development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

from typing import Mapping, Optional, Sequence, Tuple, Union

import dask.array as da
import numba as nb
import numpy as np
import xarray as xr

from xcube.core.gridmapping import GridMapping
from xcube.core.select import select_spatial_subset
from xcube.util.dask import compute_array_from_func
from .cf import maybe_encode_grid_mapping


[docs] def rectify_dataset(source_ds: xr.Dataset, *, var_names: Union[str, Sequence[str]] = None, source_gm: GridMapping = None, xy_var_names: Tuple[str, str] = None, target_gm: GridMapping = None, encode_cf: bool = True, gm_name: Optional[str] = None, tile_size: Union[int, Tuple[int, int]] = None, is_j_axis_up: bool = None, output_ij_names: Tuple[str, str] = None, compute_subset: bool = True, uv_delta: float = 1e-3) -> Optional[xr.Dataset]: """ Reproject dataset *source_ds* using its per-pixel x,y coordinates or the given *source_gm*. The function expects *source_ds* or the given *source_gm* to have either one- or two-dimensional coordinate variables that provide spatial x,y coordinates for every data variable with the same spatial dimensions. For example, a dataset may comprise variables with spatial dimensions ``var(..., y_dim, x_dim)``, then one the function expects coordinates to be provided in two forms: 1. One-dimensional ``x_var(x_dim)`` and ``y_var(y_dim)`` (coordinate) variables. 2. Two-dimensional ``x_var(y_dim, x_dim)`` and ``y_var(y_dim, x_dim)`` (coordinate) variables. If *target_gm* is given and it defines a tile size or *tile_size* is given, and the number of tiles is greater than one in the output's x- or y-direction, then the returned dataset will be composed of lazy, chunked dask arrays. Otherwise, the returned dataset will be composed of ordinary numpy arrays. :param source_ds: Source dataset grid mapping. :param var_names: Optional variable name or sequence of variable names. :param source_gm: Source dataset grid mapping. :param xy_var_names: Optional tuple of the x- and y-coordinate variables in *source_ds*. Ignored if *source_gm* is given. :param target_gm: Optional target geometry. If not given, output geometry will be computed to spatially fit *dataset* and to retain its spatial resolution. :param encode_cf: Whether to encode the target grid mapping into the resampled dataset in a CF-compliant way. Defaults to ``True``. :param gm_name: Name for the grid mapping variable. Defaults to "crs". Used only if *encode_cf* is ``True``. :param tile_size: Optional tile size for the output. :param is_j_axis_up: Whether y coordinates are increasing with positive image j axis. :param output_ij_names: If given, a tuple of variable names in which to store the computed source pixel coordinates in the returned output. :param compute_subset: Whether to compute a spatial subset from *dataset* using *output_geom*. If set, the function may return ``None`` in case there is no overlap. :param uv_delta: A normalized value that is used to determine whether x,y coordinates in the output are contained in the triangles defined by the input x,y coordinates. The higher this value, the more inaccurate the rectification will be. :return: a reprojected dataset, or None if the requested output does not intersect with *dataset*. """ if source_gm is None: source_gm = GridMapping.from_dataset(source_ds) src_attrs = dict(source_ds.attrs) if target_gm is None: target_gm = source_gm.to_regular(tile_size=tile_size) elif compute_subset: source_ds_subset = select_spatial_subset( source_ds, xy_bbox=target_gm.xy_bbox, ij_border=1, xy_border=0.5 * (target_gm.x_res + target_gm.y_res), grid_mapping=source_gm ) if source_ds_subset is None: return None if source_ds_subset is not source_ds: # TODO: GridMapping.from_dataset() may be expensive. # Find a more effective way. source_gm = GridMapping.from_dataset(source_ds_subset) source_ds = source_ds_subset # if src_geo_coding.xy_var_names != output_geom.xy_var_names: # output_geom = output_geom.derive( # xy_var_names=src_geo_coding.xy_var_names # ) # if src_geo_coding.xy_dim_names != output_geom.xy_dim_names: # output_geom = output_geom.derive( # xy_dim_names=src_geo_coding.xy_dim_names # ) if tile_size is not None or is_j_axis_up is not None: target_gm = target_gm.derive(tile_size=tile_size, is_j_axis_up=is_j_axis_up) src_vars = _select_variables(source_ds, source_gm, var_names) if target_gm.is_tiled: compute_dst_src_ij_images = _compute_ij_images_xarray_dask compute_dst_var_image = _compute_var_image_xarray_dask else: compute_dst_src_ij_images = _compute_ij_images_xarray_numpy compute_dst_var_image = _compute_var_image_xarray_numpy dst_src_ij_array = compute_dst_src_ij_images(source_gm, target_gm, uv_delta) dst_x_dim, dst_y_dim = target_gm.xy_dim_names dst_dims = dst_y_dim, dst_x_dim dst_ds_coords = target_gm.to_coords() dst_vars = dict() for src_var_name, src_var in src_vars.items(): dst_var_dims = src_var.dims[0:-2] + dst_dims dst_var_coords = {d: src_var.coords[d] for d in dst_var_dims if d in src_var.coords} dst_var_coords.update({d: dst_ds_coords[d] for d in dst_var_dims if d in dst_ds_coords}) dst_var_array = compute_dst_var_image(src_var, dst_src_ij_array, fill_value=np.nan) dst_var = xr.DataArray(dst_var_array, dims=dst_var_dims, coords=dst_var_coords, attrs=src_var.attrs) dst_vars[src_var_name] = dst_var if output_ij_names: output_i_name, output_j_name = output_ij_names dst_ij_coords = {d: dst_ds_coords[d] for d in dst_dims if d in dst_ds_coords} dst_vars[output_i_name] = xr.DataArray(dst_src_ij_array[0], dims=dst_dims, coords=dst_ij_coords) dst_vars[output_j_name] = xr.DataArray(dst_src_ij_array[1], dims=dst_dims, coords=dst_ij_coords) return maybe_encode_grid_mapping(encode_cf, xr.Dataset(dst_vars, coords=dst_ds_coords, attrs=src_attrs), target_gm, gm_name)
def _select_variables( source_ds: xr.Dataset, source_gm: GridMapping, var_names: Union[None, str, Sequence[str]] ) -> Mapping[str, xr.DataArray]: """ Select variables from *dataset*. :param source_ds: Source dataset. :param source_gm: Optional dataset geo-coding. :param var_names: Optional variable name or sequence of variable names. :return: The selected variables as a variable name to ``xr.DataArray`` mapping """ spatial_var_names = source_gm.xy_var_names spatial_shape = tuple(reversed(source_gm.size)) spatial_dims = tuple(reversed(source_gm.xy_dim_names)) if var_names is None: var_names = [var_name for var_name, var in source_ds.data_vars.items() if var_name not in spatial_var_names and _is_2d_spatial_var(var, spatial_shape, spatial_dims)] elif isinstance(var_names, str): var_names = (var_names,) elif len(var_names) == 0: raise ValueError(f'empty var_names') src_vars = {} for var_name in var_names: src_var = source_ds[var_name] if not _is_2d_spatial_var(src_var, spatial_shape, spatial_dims): raise ValueError( f"cannot rectify variable {var_name!r}" f" as its shape or dimensions " f"do not match those of {spatial_var_names[0]!r}" f" and {spatial_var_names[1]!r}" ) src_vars[var_name] = src_var return src_vars def _is_2d_spatial_var(var: xr.DataArray, shape, dims) -> bool: return var.ndim >= 2 and var.shape[-2:] == shape and var.dims[-2:] == dims def _compute_ij_images_xarray_numpy(src_geo_coding: GridMapping, output_geom: GridMapping, uv_delta: float) -> np.ndarray: """ Compute numpy.ndarray destination image with source pixel i,j coords from xarray.DataArray x,y sources. """ dst_width = output_geom.width dst_height = output_geom.height dst_shape = 2, dst_height, dst_width dst_src_ij_images = np.full(dst_shape, np.nan, dtype=np.float64) dst_x_offset = output_geom.x_min dst_y_offset = output_geom.y_min \ if output_geom.is_j_axis_up else output_geom.y_max dst_x_scale = output_geom.x_res dst_y_scale = output_geom.y_res \ if output_geom.is_j_axis_up else -output_geom.y_res # TODO: GridMapping: this will cause out-of-memory problems! x_values, y_values = src_geo_coding.xy_coords.values _compute_ij_images_numpy_parallel(x_values, y_values, 0, 0, dst_src_ij_images, dst_x_offset, dst_y_offset, dst_x_scale, dst_y_scale, uv_delta) return dst_src_ij_images def _compute_ij_images_xarray_dask(src_geo_coding: GridMapping, output_geom: GridMapping, uv_delta: float) -> da.Array: """ Compute dask.array.Array destination image with source pixel i,j coords from xarray.DataArray x,y sources. """ dst_width = output_geom.width dst_height = output_geom.height dst_tile_width = output_geom.tile_width dst_tile_height = output_geom.tile_height dst_var_shape = 2, dst_height, dst_width dst_var_chunks = 2, dst_tile_height, dst_tile_width dst_x_min, dst_y_min, dst_x_max, dst_y_max = output_geom.xy_bbox dst_x_res, dst_y_res = output_geom.xy_res dst_is_j_axis_up = output_geom.is_j_axis_up # Compute an empirical xy_border as a function of the # number of tiles, because the more tiles we have # the smaller the destination xy-bboxes and the higher # the risk to not find any source ij-bbox for a given xy-bbox. # xy_border will not be larger than half of the # coverage of a tile. # num_tiles_x = dst_width / dst_tile_width num_tiles_y = dst_height / dst_tile_height xy_border = min(min(2 * num_tiles_x * output_geom.x_res, 2 * num_tiles_y * output_geom.y_res), min(0.5 * (dst_x_max - dst_x_min), 0.5 * (dst_y_max - dst_y_min))) dst_xy_bboxes = output_geom.xy_bboxes src_ij_bboxes = src_geo_coding.ij_bboxes_from_xy_bboxes( dst_xy_bboxes, xy_border=xy_border, ij_border=1 ) return compute_array_from_func( _compute_ij_images_xarray_dask_block, dst_var_shape, dst_var_chunks, np.float64, ctx_arg_names=[ 'dtype', 'block_id', 'block_shape', 'block_slices', ], args=( src_geo_coding.xy_coords, src_ij_bboxes, dst_x_min, dst_y_min, dst_y_max, dst_x_res, dst_y_res, dst_is_j_axis_up, uv_delta ), name='ij_pixels' ) def _compute_ij_images_xarray_dask_block( dtype: np.dtype, block_id: int, block_shape: Tuple[int, int], block_slices: Tuple[Tuple[int, int], Tuple[int, int], Tuple[int, int]], src_xy_coords: xr.DataArray, src_ij_bboxes: np.ndarray, dst_x_min: float, dst_y_min: float, dst_y_max: float, dst_x_res: float, dst_y_res: float, dst_is_j_axis_up: bool, uv_delta: float ) -> np.ndarray: """ Compute dask.array.Array destination block with source pixel i,j coords from xarray.DataArray x,y sources. """ dst_src_ij_block = np.full(block_shape, np.nan, dtype=dtype) _, (dst_y_slice_start, _), (dst_x_slice_start, _) = block_slices src_ij_bbox = src_ij_bboxes[block_id] src_i_min, src_j_min, src_i_max, src_j_max = src_ij_bbox if src_i_min == -1: return dst_src_ij_block src_xy_values = src_xy_coords[ :, src_j_min:src_j_max + 1, src_i_min:src_i_max + 1 ].values src_x_values = src_xy_values[0] src_y_values = src_xy_values[1] dst_x_offset = dst_x_min + dst_x_slice_start * dst_x_res if dst_is_j_axis_up: dst_y_offset = dst_y_min + dst_y_slice_start * dst_y_res else: dst_y_offset = dst_y_max - dst_y_slice_start * dst_y_res _compute_ij_images_numpy_sequential( src_x_values, src_y_values, src_i_min, src_j_min, dst_src_ij_block, dst_x_offset, dst_y_offset, dst_x_res, dst_y_res if dst_is_j_axis_up else -dst_y_res, uv_delta ) return dst_src_ij_block @nb.njit(nogil=True, parallel=True, cache=True) def _compute_ij_images_numpy_parallel( src_x_image: np.ndarray, src_y_image: np.ndarray, src_i_min: int, src_j_min: int, dst_src_ij_images: np.ndarray, dst_x_offset: float, dst_y_offset: float, dst_x_scale: float, dst_y_scale: float, uv_delta: float ): """ Compute numpy.ndarray destination image with source pixel i,j coords from numpy.ndarray x,y sources in parallel mode. """ src_height = src_x_image.shape[-2] dst_src_ij_images[:, :, :] = np.nan for src_j0 in nb.prange(src_height - 1): _compute_ij_images_for_source_line( src_j0, src_x_image, src_y_image, src_i_min, src_j_min, dst_src_ij_images, dst_x_offset, dst_y_offset, dst_x_scale, dst_y_scale, uv_delta ) # Extra dask version, because if we use parallel=True # and nb.prange, we end up in infinite JIT compilation :( @nb.njit(nogil=True, cache=True) def _compute_ij_images_numpy_sequential( src_x_image: np.ndarray, src_y_image: np.ndarray, src_i_min: int, src_j_min: int, dst_src_ij_images: np.ndarray, dst_x_offset: float, dst_y_offset: float, dst_x_scale: float, dst_y_scale: float, uv_delta: float ): """ Compute numpy.ndarray destination image with source pixel i,j coords from numpy.ndarray x,y sources NOT in parallel mode. """ src_height = src_x_image.shape[-2] dst_src_ij_images[:, :, :] = np.nan for src_j0 in range(src_height - 1): _compute_ij_images_for_source_line( src_j0, src_x_image, src_y_image, src_i_min, src_j_min, dst_src_ij_images, dst_x_offset, dst_y_offset, dst_x_scale, dst_y_scale, uv_delta ) @nb.njit(nogil=True, cache=True) def _compute_ij_images_for_source_line( src_j0: int, src_x_image: np.ndarray, src_y_image: np.ndarray, src_i_min: int, src_j_min: int, dst_src_ij_images: np.ndarray, dst_x_offset: float, dst_y_offset: float, dst_x_scale: float, dst_y_scale: float, uv_delta: float ): """ Compute numpy.ndarray destination image with source pixel i,j coords from a numpy.ndarray x,y source line. """ src_width = src_x_image.shape[-1] dst_width = dst_src_ij_images.shape[-1] dst_height = dst_src_ij_images.shape[-2] dst_px = np.zeros(4, dtype=src_x_image.dtype) dst_py = np.zeros(4, dtype=src_y_image.dtype) u_min = v_min = -uv_delta uv_max = 1.0 + 2 * uv_delta for src_i0 in range(src_width - 1): src_i1 = src_i0 + 1 src_j1 = src_j0 + 1 dst_px[0] = dst_p0x = src_x_image[src_j0, src_i0] dst_px[1] = dst_p1x = src_x_image[src_j0, src_i1] dst_px[2] = dst_p2x = src_x_image[src_j1, src_i0] dst_px[3] = dst_p3x = src_x_image[src_j1, src_i1] dst_py[0] = dst_p0y = src_y_image[src_j0, src_i0] dst_py[1] = dst_p1y = src_y_image[src_j0, src_i1] dst_py[2] = dst_p2y = src_y_image[src_j1, src_i0] dst_py[3] = dst_p3y = src_y_image[src_j1, src_i1] dst_pi = np.floor((dst_px - dst_x_offset) / dst_x_scale).astype(np.int64) dst_pj = np.floor((dst_py - dst_y_offset) / dst_y_scale).astype(np.int64) dst_i_min = np.min(dst_pi) dst_i_max = np.max(dst_pi) dst_j_min = np.min(dst_pj) dst_j_max = np.max(dst_pj) if dst_i_max < 0 \ or dst_j_max < 0 \ or dst_i_min >= dst_width \ or dst_j_min >= dst_height: continue if dst_i_min < 0: dst_i_min = 0 if dst_i_max >= dst_width: dst_i_max = dst_width - 1 if dst_j_min < 0: dst_j_min = 0 if dst_j_max >= dst_height: dst_j_max = dst_height - 1 # u from p0 right to p1, v from p0 down to p2 det_a = _fdet(dst_p0x, dst_p0y, dst_p1x, dst_p1y, dst_p2x, dst_p2y) # u from p3 left to p2, v from p3 up to p1 det_b = _fdet(dst_p3x, dst_p3y, dst_p2x, dst_p2y, dst_p1x, dst_p1y) if np.isnan(det_a) or np.isnan(det_b): # print('no plane at:', src_i0, src_j0) continue for dst_j in range(dst_j_min, dst_j_max + 1): dst_y = dst_y_offset + (dst_j + 0.5) * dst_y_scale for dst_i in range(dst_i_min, dst_i_max + 1): dst_x = dst_x_offset + (dst_i + 0.5) * dst_x_scale # TODO: use two other combinations, # if one of the dst_px<n>,dst_py<n> pairs is missing. src_i = src_j = -1 if det_a != 0.0: u = _fu(dst_x, dst_y, dst_p0x, dst_p0y, dst_p2x, dst_p2y) / det_a v = _fv(dst_x, dst_y, dst_p0x, dst_p0y, dst_p1x, dst_p1y) / det_a if u >= u_min and v >= v_min and u + v <= uv_max: src_i = src_i0 + _fclamp(u, 0.0, 1.0) src_j = src_j0 + _fclamp(v, 0.0, 1.0) if src_i == -1 and det_b != 0.0: u = _fu(dst_x, dst_y, dst_p3x, dst_p3y, dst_p1x, dst_p1y) / det_b v = _fv(dst_x, dst_y, dst_p3x, dst_p3y, dst_p2x, dst_p2y) / det_b if u >= u_min and v >= v_min and u + v <= uv_max: src_i = src_i1 - _fclamp(u, 0.0, 1.0) src_j = src_j1 - _fclamp(v, 0.0, 1.0) if src_i != -1: dst_src_ij_images[0, dst_j, dst_i] = src_i_min + src_i dst_src_ij_images[1, dst_j, dst_i] = src_j_min + src_j def _compute_var_image_xarray_numpy( src_var: xr.DataArray, dst_src_ij_images: np.ndarray, fill_value: Union[int, float, complex] = np.nan ) -> np.ndarray: """ Extract source pixels from xarray.DataArray source with numpy.ndarray data. """ return _compute_var_image_numpy(src_var.values, dst_src_ij_images, fill_value) def _compute_var_image_xarray_dask( src_var: xr.DataArray, dst_src_ij_images: np.ndarray, fill_value: Union[int, float, complex] = np.nan ) -> da.Array: """ Extract source pixels from xarray.DataArray source with dask.array.Array data. """ return da.map_blocks(_compute_var_image_xarray_dask_block, src_var.values, dst_src_ij_images, fill_value, dtype=src_var.dtype, drop_axis=0) @nb.njit(nogil=True, cache=True) def _compute_var_image_numpy( src_var: np.ndarray, dst_src_ij_images: np.ndarray, fill_value: Union[int, float, complex] = np.nan ) -> np.ndarray: """ Extract source pixels from numpy.ndarray source with numba in parallel mode. """ dst_width = dst_src_ij_images.shape[-1] dst_height = dst_src_ij_images.shape[-2] dst_shape = src_var.shape[:-2] + (dst_height, dst_width) dst_values = np.full(dst_shape, fill_value, dtype=src_var.dtype) _compute_var_image_numpy_parallel(src_var, dst_src_ij_images, dst_values) return dst_values @nb.njit(nogil=True, cache=True) def _compute_var_image_xarray_dask_block( src_var_image: np.ndarray, dst_src_ij_images: np.ndarray, fill_value: Union[int, float, complex] = np.nan ) -> np.ndarray: """ Extract source pixels from np.ndarray source and return a block of a dask array. """ dst_width = dst_src_ij_images.shape[-1] dst_height = dst_src_ij_images.shape[-2] dst_shape = src_var_image.shape[:-2] + (dst_height, dst_width) dst_values = np.full(dst_shape, fill_value, dtype=src_var_image.dtype) _compute_var_image_numpy_sequential(src_var_image, dst_src_ij_images, dst_values) return dst_values @nb.njit(nogil=True, parallel=True, cache=True) def _compute_var_image_numpy_parallel( src_var_image: np.ndarray, dst_src_ij_images: np.ndarray, dst_var_image: np.ndarray ): """ Extract source pixels from np.ndarray source using numba parallel mode. """ dst_height = dst_var_image.shape[-2] for dst_j in nb.prange(dst_height): _compute_var_image_for_dest_line( dst_j, src_var_image, dst_src_ij_images, dst_var_image ) # Extra dask version, because if we use parallel=True # and nb.prange, we end up in infinite JIT compilation :( @nb.njit(nogil=True, cache=True) def _compute_var_image_numpy_sequential( src_var_image: np.ndarray, dst_src_ij_images: np.ndarray, dst_var_image: np.ndarray ): """ Extract source pixels from np.ndarray source NOT using numba parallel mode. """ dst_height = dst_var_image.shape[-2] for dst_j in range(dst_height): _compute_var_image_for_dest_line( dst_j, src_var_image, dst_src_ij_images, dst_var_image ) @nb.njit(nogil=True, cache=True) def _compute_var_image_for_dest_line( dst_j: int, src_var_image: np.ndarray, dst_src_ij_images: np.ndarray, dst_var_image: np.ndarray ): """ Extract source pixels from *src_values* np.ndarray and write into dst_values np.ndarray. """ src_width = src_var_image.shape[-1] src_height = src_var_image.shape[-2] dst_width = dst_var_image.shape[-1] src_i_min = 0 src_j_min = 0 src_i_max = src_width - 1 src_j_max = src_height - 1 for dst_i in range(dst_width): src_i_f = dst_src_ij_images[0, dst_j, dst_i] src_j_f = dst_src_ij_images[1, dst_j, dst_i] if np.isnan(src_i_f) or np.isnan(src_j_f): continue # Note int() is 2x faster than math.floor() and should yield the same results for only positive i,j. src_i = int(src_i_f) src_j = int(src_j_f) u = src_i_f - src_i v = src_j_f - src_j if u > 0.5: src_i = _iclamp(src_i + 1, src_i_min, src_i_max) if v > 0.5: src_j = _iclamp(src_j + 1, src_j_min, src_j_max) dst_var_image[..., dst_j, dst_i] = src_var_image[..., src_j, src_i] @nb.njit('float64(float64, float64, float64, float64, float64, float64)', nogil=True, inline='always') def _fdet(px0: float, py0: float, px1: float, py1: float, px2: float, py2: float) -> float: return (px0 - px1) * (py0 - py2) - (px0 - px2) * (py0 - py1) @nb.njit('float64(float64, float64, float64, float64, float64, float64)', nogil=True, inline='always') def _fu(px: float, py: float, px0: float, py0: float, px2: float, py2: float) -> float: return (px0 - px) * (py0 - py2) - (py0 - py) * (px0 - px2) @nb.njit('float64(float64, float64, float64, float64, float64, float64)', nogil=True, inline='always') def _fv(px: float, py: float, px0: float, py0: float, px1: float, py1: float) -> float: return (py0 - py) * (px0 - px1) - (px0 - px) * (py0 - py1) @nb.njit('float64(float64, float64, float64)', nogil=True, inline='always') def _fclamp(x: float, x_min: float, x_max: float) -> float: return x_min if x < x_min else (x_max if x > x_max else x) @nb.njit('int64(int64, int64, int64)', nogil=True, inline='always') def _iclamp(x: int, x_min: int, x_max: int) -> int: return x_min if x < x_min else (x_max if x > x_max else x) def _millis(seconds: float) -> int: return round(1000 * seconds)