Source code for xcube.core.resampling.rectify

# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

from typing import Mapping, Optional, Sequence, Tuple, Union
import warnings

import dask.array as da
import numba as nb
import numpy as np
import xarray as xr

from xcube.core.gridmapping import GridMapping
from xcube.core.select import select_spatial_subset
from xcube.util.dask import compute_array_from_func
from .cf import maybe_encode_grid_mapping


_INTERPOLATIONS = {"nearest": 0, "triangular": 1, "bilinear": 2}


def rectify_dataset(
    source_ds: xr.Dataset,
    *,
    var_names: Union[str, Sequence[str]] = None,
    source_gm: GridMapping = None,
    target_gm: GridMapping = None,
    encode_cf: bool = True,
    gm_name: Optional[str] = None,
    tile_size: Union[int, Tuple[int, int]] = None,
    is_j_axis_up: bool = None,
    output_ij_names: Tuple[str, str] = None,
    compute_subset: bool = True,
    uv_delta: float = 1e-3,
    interpolation: Optional[str] = None,
    xy_var_names: Tuple[str, str] = None,
) -> Optional[xr.Dataset]:
    """Reproject dataset *source_ds* using its per-pixel
    x,y coordinates or the given *source_gm*.

    The function expects *source_ds* or the given *source_gm* to have
    either one- or two-dimensional coordinate variables that provide
    spatial x,y coordinates for every data variable with the same
    spatial dimensions.

    For example, a dataset may comprise variables with spatial
    dimensions ``var(..., y_dim, x_dim)``; the function then expects
    coordinates to be provided in one of two forms:

    1. One-dimensional ``x_var(x_dim)`` and ``y_var(y_dim)``
       (coordinate) variables.
    2. Two-dimensional ``x_var(y_dim, x_dim)`` and
       ``y_var(y_dim, x_dim)`` (coordinate) variables.

    If *target_gm* is given and it defines a tile size, or if
    *tile_size* is given, and the number of tiles is greater than one
    in the output's x- or y-direction, then the returned dataset will
    be composed of lazy, chunked dask arrays. Otherwise, the returned
    dataset will be composed of ordinary numpy arrays.

    Args:
        source_ds: Source dataset.
        var_names: Optional variable name or sequence of variable
            names.
        source_gm: Source dataset grid mapping.
        target_gm: Optional target grid mapping. If not given, the
            target grid mapping will be computed to spatially fit
            *source_ds* and to retain its spatial resolution.
        encode_cf: Whether to encode the target grid mapping into the
            resampled dataset in a CF-compliant way. Defaults to
            ``True``.
        gm_name: Name for the grid mapping variable. Defaults to
            "crs". Used only if *encode_cf* is ``True``.
        tile_size: Optional tile size for the output.
        is_j_axis_up: Whether y coordinates are increasing with
            positive image j axis.
        output_ij_names: If given, a tuple of variable names in which
            to store the computed source pixel coordinates in the
            returned output.
        compute_subset: Whether to compute a spatial subset from
            *source_ds* using *target_gm*. If set, the function may
            return ``None`` in case there is no overlap.
        uv_delta: A normalized value that is used to determine whether
            x,y coordinates in the output are contained in the
            triangles defined by the input x,y coordinates. The higher
            this value, the more inaccurate the rectification will be.
        interpolation: Interpolation method for computing output
            pixels. If given, must be "nearest", "triangular", or
            "bilinear". The default is "nearest". The "triangular"
            interpolation is performed between 3 and "bilinear"
            between 4 adjacent source pixels. Both are applied only to
            variables of floating point type. If you need to
            interpolate between integer data you should cast it to
            float first.
        xy_var_names: Deprecated. No longer used since 1.0.0,
            no replacement.

    Returns:
        A reprojected dataset, or ``None`` if the requested output
        does not intersect with *source_ds*.
""" if xy_var_names: warnings.warn( "argument 'xy_var_names' has been deprecated in 1.4.2" " and may be removed anytime.", category=DeprecationWarning, ) if source_gm is None: source_gm = GridMapping.from_dataset(source_ds) src_attrs = dict(source_ds.attrs) if target_gm is None: target_gm = source_gm.to_regular(tile_size=tile_size) elif compute_subset: source_ds_subset = select_spatial_subset( source_ds, xy_bbox=target_gm.xy_bbox, ij_border=1, xy_border=0.5 * (target_gm.x_res + target_gm.y_res), grid_mapping=source_gm, ) if source_ds_subset is None: return None if source_ds_subset is not source_ds: source_gm = GridMapping.from_dataset(source_ds_subset) source_ds = source_ds_subset if tile_size is not None or is_j_axis_up is not None: target_gm = target_gm.derive(tile_size=tile_size, is_j_axis_up=is_j_axis_up) src_vars = _select_variables(source_ds, source_gm, var_names) interpolation_mode = _INTERPOLATIONS.get(interpolation or "nearest") if interpolation_mode is None: raise ValueError(f"invalid interpolation: {interpolation!r}") if target_gm.is_tiled: compute_dst_src_ij_images = _compute_ij_images_xarray_dask compute_dst_var_image = _compute_var_image_xarray_dask else: compute_dst_src_ij_images = _compute_ij_images_xarray_numpy compute_dst_var_image = _compute_var_image_xarray_numpy dst_src_ij_array = compute_dst_src_ij_images(source_gm, target_gm, uv_delta) dst_x_dim, dst_y_dim = target_gm.xy_dim_names dst_dims = dst_y_dim, dst_x_dim dst_ds_coords = target_gm.to_coords() dst_vars = dict() for src_var_name, src_var in src_vars.items(): dst_var_dims = src_var.dims[0:-2] + dst_dims dst_var_coords = { d: src_var.coords[d] for d in dst_var_dims if d in src_var.coords } # noinspection PyTypeChecker dst_var_coords.update( {d: dst_ds_coords[d] for d in dst_var_dims if d in dst_ds_coords} ) dst_var_array = compute_dst_var_image( src_var, dst_src_ij_array, fill_value=np.nan, interpolation=interpolation_mode, ) dst_var = xr.DataArray( dst_var_array, dims=dst_var_dims, coords=dst_var_coords, attrs=src_var.attrs, ) dst_vars[src_var_name] = dst_var if output_ij_names: output_i_name, output_j_name = output_ij_names dst_ij_coords = {d: dst_ds_coords[d] for d in dst_dims if d in dst_ds_coords} dst_vars[output_i_name] = xr.DataArray( dst_src_ij_array[0], dims=dst_dims, coords=dst_ij_coords ) dst_vars[output_j_name] = xr.DataArray( dst_src_ij_array[1], dims=dst_dims, coords=dst_ij_coords ) return maybe_encode_grid_mapping( encode_cf, xr.Dataset(dst_vars, coords=dst_ds_coords, attrs=src_attrs), target_gm, gm_name, )
def _select_variables(
    source_ds: xr.Dataset,
    source_gm: GridMapping,
    var_names: Union[None, str, Sequence[str]],
) -> Mapping[str, xr.DataArray]:
    """Select variables from *source_ds*.

    Args:
        source_ds: Source dataset.
        source_gm: Optional dataset geo-coding.
        var_names: Optional variable name or sequence of variable
            names.

    Returns:
        The selected variables as a variable name to ``xr.DataArray``
        mapping.
    """
    spatial_var_names = source_gm.xy_var_names
    spatial_shape = tuple(reversed(source_gm.size))
    spatial_dims = tuple(reversed(source_gm.xy_dim_names))
    if var_names is None:
        var_names = [
            var_name
            for var_name, var in source_ds.data_vars.items()
            if var_name not in spatial_var_names
            and _is_2d_spatial_var(var, spatial_shape, spatial_dims)
        ]
    elif isinstance(var_names, str):
        var_names = (var_names,)
    elif len(var_names) == 0:
        raise ValueError("empty var_names")
    src_vars = {}
    for var_name in var_names:
        src_var = source_ds[var_name]
        if not _is_2d_spatial_var(src_var, spatial_shape, spatial_dims):
            raise ValueError(
                f"cannot rectify variable {var_name!r}"
                f" as its shape or dimensions"
                f" do not match those of {spatial_var_names[0]!r}"
                f" and {spatial_var_names[1]!r}"
            )
        src_vars[var_name] = src_var
    return src_vars


def _is_2d_spatial_var(var: xr.DataArray, shape, dims) -> bool:
    return var.ndim >= 2 and var.shape[-2:] == shape and var.dims[-2:] == dims


def _compute_ij_images_xarray_numpy(
    src_geo_coding: GridMapping, output_geom: GridMapping, uv_delta: float
) -> np.ndarray:
    """Compute numpy.ndarray destination image with source
    pixel i,j coords from xarray.DataArray x,y sources.
    """
    dst_width = output_geom.width
    dst_height = output_geom.height
    dst_shape = 2, dst_height, dst_width
    dst_src_ij_images = np.full(dst_shape, np.nan, dtype=np.float64)
    dst_x_offset = output_geom.x_min
    dst_y_offset = output_geom.y_min if output_geom.is_j_axis_up else output_geom.y_max
    dst_x_scale = output_geom.x_res
    dst_y_scale = output_geom.y_res if output_geom.is_j_axis_up else -output_geom.y_res
    x_values, y_values = src_geo_coding.xy_coords.values
    _compute_ij_images_numpy_parallel(
        x_values,
        y_values,
        0,
        0,
        dst_src_ij_images,
        dst_x_offset,
        dst_y_offset,
        dst_x_scale,
        dst_y_scale,
        uv_delta,
    )
    return dst_src_ij_images


def _compute_ij_images_xarray_dask(
    src_geo_coding: GridMapping, output_geom: GridMapping, uv_delta: float
) -> da.Array:
    """Compute dask.array.Array destination image with source
    pixel i,j coords from xarray.DataArray x,y sources.
    """
    dst_width = output_geom.width
    dst_height = output_geom.height
    dst_tile_width = output_geom.tile_width
    dst_tile_height = output_geom.tile_height
    dst_var_shape = 2, dst_height, dst_width
    dst_var_chunks = 2, dst_tile_height, dst_tile_width

    dst_x_min, dst_y_min, dst_x_max, dst_y_max = output_geom.xy_bbox
    dst_x_res, dst_y_res = output_geom.xy_res
    dst_is_j_axis_up = output_geom.is_j_axis_up

    # Compute an empirical xy_border as a function of the
    # number of tiles, because the more tiles we have
    # the smaller the destination xy-bboxes and the higher
    # the risk to not find any source ij-bbox for a given xy-bbox.
    # xy_border will not be larger than half of the
    # coverage of a tile.
    #
    num_tiles_x = dst_width / dst_tile_width
    num_tiles_y = dst_height / dst_tile_height
    xy_border = min(
        min(2 * num_tiles_x * output_geom.x_res, 2 * num_tiles_y * output_geom.y_res),
        min(0.5 * (dst_x_max - dst_x_min), 0.5 * (dst_y_max - dst_y_min)),
    )

    dst_xy_bboxes = output_geom.xy_bboxes
    src_ij_bboxes = src_geo_coding.ij_bboxes_from_xy_bboxes(
        dst_xy_bboxes, xy_border=xy_border, ij_border=1
    )

    return compute_array_from_func(
        _compute_ij_images_xarray_dask_block,
        dst_var_shape,
        dst_var_chunks,
        np.float64,
        ctx_arg_names=[
            "dtype",
            "block_id",
            "block_shape",
            "block_slices",
        ],
        args=(
            src_geo_coding.xy_coords,
            src_ij_bboxes,
            dst_x_min,
            dst_y_min,
            dst_y_max,
            dst_x_res,
            dst_y_res,
            dst_is_j_axis_up,
            uv_delta,
        ),
        name="ij_pixels",
    )


def _compute_ij_images_xarray_dask_block(
    dtype: np.dtype,
    block_id: int,
    block_shape: Tuple[int, int],
    block_slices: Tuple[Tuple[int, int], Tuple[int, int], Tuple[int, int]],
    src_xy_coords: xr.DataArray,
    src_ij_bboxes: np.ndarray,
    dst_x_min: float,
    dst_y_min: float,
    dst_y_max: float,
    dst_x_res: float,
    dst_y_res: float,
    dst_is_j_axis_up: bool,
    uv_delta: float,
) -> np.ndarray:
    """Compute dask.array.Array destination block with source
    pixel i,j coords from xarray.DataArray x,y sources.
    """
    dst_src_ij_block = np.full(block_shape, np.nan, dtype=dtype)
    _, (dst_y_slice_start, _), (dst_x_slice_start, _) = block_slices
    src_ij_bbox = src_ij_bboxes[block_id]
    src_i_min, src_j_min, src_i_max, src_j_max = src_ij_bbox
    if src_i_min == -1:
        return dst_src_ij_block
    src_xy_values = src_xy_coords[
        :, src_j_min : src_j_max + 1, src_i_min : src_i_max + 1
    ].values
    src_x_values = src_xy_values[0]
    src_y_values = src_xy_values[1]
    dst_x_offset = dst_x_min + dst_x_slice_start * dst_x_res
    if dst_is_j_axis_up:
        dst_y_offset = dst_y_min + dst_y_slice_start * dst_y_res
    else:
        dst_y_offset = dst_y_max - dst_y_slice_start * dst_y_res
    _compute_ij_images_numpy_sequential(
        src_x_values,
        src_y_values,
        src_i_min,
        src_j_min,
        dst_src_ij_block,
        dst_x_offset,
        dst_y_offset,
        dst_x_res,
        dst_y_res if dst_is_j_axis_up else -dst_y_res,
        uv_delta,
    )
    return dst_src_ij_block


@nb.njit(nogil=True, parallel=True, cache=True)
def _compute_ij_images_numpy_parallel(
    src_x_image: np.ndarray,
    src_y_image: np.ndarray,
    src_i_min: int,
    src_j_min: int,
    dst_src_ij_images: np.ndarray,
    dst_x_offset: float,
    dst_y_offset: float,
    dst_x_scale: float,
    dst_y_scale: float,
    uv_delta: float,
):
    """Compute numpy.ndarray destination image with source
    pixel i,j coords from numpy.ndarray x,y sources in
    parallel mode.
    """
    src_height = src_x_image.shape[-2]
    dst_src_ij_images[:, :, :] = np.nan
    for src_j0 in nb.prange(src_height - 1):
        _compute_ij_images_for_source_line(
            src_j0,
            src_x_image,
            src_y_image,
            src_i_min,
            src_j_min,
            dst_src_ij_images,
            dst_x_offset,
            dst_y_offset,
            dst_x_scale,
            dst_y_scale,
            uv_delta,
        )


# Extra dask version, because if we use parallel=True
# and nb.prange, we end up in infinite JIT compilation :(
@nb.njit(nogil=True, cache=True)
def _compute_ij_images_numpy_sequential(
    src_x_image: np.ndarray,
    src_y_image: np.ndarray,
    src_i_min: int,
    src_j_min: int,
    dst_src_ij_images: np.ndarray,
    dst_x_offset: float,
    dst_y_offset: float,
    dst_x_scale: float,
    dst_y_scale: float,
    uv_delta: float,
):
    """Compute numpy.ndarray destination image with source
    pixel i,j coords from numpy.ndarray x,y sources
    NOT in parallel mode.
""" src_height = src_x_image.shape[-2] dst_src_ij_images[:, :, :] = np.nan for src_j0 in range(src_height - 1): _compute_ij_images_for_source_line( src_j0, src_x_image, src_y_image, src_i_min, src_j_min, dst_src_ij_images, dst_x_offset, dst_y_offset, dst_x_scale, dst_y_scale, uv_delta, ) @nb.njit(nogil=True, cache=True) def _compute_ij_images_for_source_line( src_j0: int, src_x_image: np.ndarray, src_y_image: np.ndarray, src_i_min: int, src_j_min: int, dst_src_ij_images: np.ndarray, dst_x_offset: float, dst_y_offset: float, dst_x_scale: float, dst_y_scale: float, uv_delta: float, ): """Compute numpy.ndarray destination image with source pixel i,j coords from a numpy.ndarray x,y source line. """ src_width = src_x_image.shape[-1] dst_width = dst_src_ij_images.shape[-1] dst_height = dst_src_ij_images.shape[-2] dst_px = np.zeros(4, dtype=src_x_image.dtype) dst_py = np.zeros(4, dtype=src_y_image.dtype) u_min = v_min = -uv_delta uv_max = 1.0 + 2 * uv_delta for src_i0 in range(src_width - 1): src_i1 = src_i0 + 1 src_j1 = src_j0 + 1 dst_px[0] = dst_p0x = src_x_image[src_j0, src_i0] dst_px[1] = dst_p1x = src_x_image[src_j0, src_i1] dst_px[2] = dst_p2x = src_x_image[src_j1, src_i0] dst_px[3] = dst_p3x = src_x_image[src_j1, src_i1] dst_py[0] = dst_p0y = src_y_image[src_j0, src_i0] dst_py[1] = dst_p1y = src_y_image[src_j0, src_i1] dst_py[2] = dst_p2y = src_y_image[src_j1, src_i0] dst_py[3] = dst_p3y = src_y_image[src_j1, src_i1] dst_pi = np.floor((dst_px - dst_x_offset) / dst_x_scale).astype(np.int64) dst_pj = np.floor((dst_py - dst_y_offset) / dst_y_scale).astype(np.int64) dst_i_min = np.min(dst_pi) dst_i_max = np.max(dst_pi) dst_j_min = np.min(dst_pj) dst_j_max = np.max(dst_pj) if ( dst_i_max < 0 or dst_j_max < 0 or dst_i_min >= dst_width or dst_j_min >= dst_height ): continue if dst_i_min < 0: dst_i_min = 0 if dst_i_max >= dst_width: dst_i_max = dst_width - 1 if dst_j_min < 0: dst_j_min = 0 if dst_j_max >= dst_height: dst_j_max = dst_height - 1 # u from p0 right to p1, v from p0 down to p2 # noinspection PyTypeChecker det_a = _fdet(dst_p0x, dst_p0y, dst_p1x, dst_p1y, dst_p2x, dst_p2y) if np.isnan(det_a): det_a = 0.0 # u from p3 left to p2, v from p3 up to p1 # noinspection PyTypeChecker det_b = _fdet(dst_p3x, dst_p3y, dst_p2x, dst_p2y, dst_p1x, dst_p1y) if np.isnan(det_b): det_b = 0.0 if det_a == 0.0 and det_b == 0.0: # Both the triangles do not exist. continue for dst_j in range(dst_j_min, dst_j_max + 1): dst_y = dst_y_offset + (dst_j + 0.5) * dst_y_scale for dst_i in range(dst_i_min, dst_i_max + 1): sentinel = dst_src_ij_images[0, dst_j, dst_i] if not np.isnan(sentinel): # If we have a source pixel in dst_i, dst_j already, # there is no need to compute another one. # One is as good as the other. 
                    continue

                dst_x = dst_x_offset + (dst_i + 0.5) * dst_x_scale

                src_i = src_j = -1

                if det_a != 0.0:
                    # noinspection PyTypeChecker
                    u = _fu(dst_x, dst_y, dst_p0x, dst_p0y, dst_p2x, dst_p2y) / det_a
                    # noinspection PyTypeChecker
                    v = _fv(dst_x, dst_y, dst_p0x, dst_p0y, dst_p1x, dst_p1y) / det_a
                    if u >= u_min and v >= v_min and u + v <= uv_max:
                        src_i = src_i0 + _fclamp(u, 0.0, 1.0)
                        src_j = src_j0 + _fclamp(v, 0.0, 1.0)
                if src_i == -1 and det_b != 0.0:
                    # noinspection PyTypeChecker
                    u = _fu(dst_x, dst_y, dst_p3x, dst_p3y, dst_p1x, dst_p1y) / det_b
                    # noinspection PyTypeChecker
                    v = _fv(dst_x, dst_y, dst_p3x, dst_p3y, dst_p2x, dst_p2y) / det_b
                    if u >= u_min and v >= v_min and u + v <= uv_max:
                        src_i = src_i1 - _fclamp(u, 0.0, 1.0)
                        src_j = src_j1 - _fclamp(v, 0.0, 1.0)
                if src_i != -1:
                    dst_src_ij_images[0, dst_j, dst_i] = src_i_min + src_i
                    dst_src_ij_images[1, dst_j, dst_i] = src_j_min + src_j


def _compute_var_image_xarray_numpy(
    src_var: xr.DataArray,
    dst_src_ij_images: np.ndarray,
    fill_value: Union[int, float, complex] = np.nan,
    interpolation: int = 0,
) -> np.ndarray:
    """Extract source pixels from xarray.DataArray source
    with numpy.ndarray data.
    """
    return _compute_var_image_numpy(
        src_var.values, dst_src_ij_images, fill_value, interpolation
    )


def _compute_var_image_xarray_dask(
    src_var: xr.DataArray,
    dst_src_ij_images: np.ndarray,
    fill_value: Union[int, float, complex] = np.nan,
    interpolation: int = 0,
) -> da.Array:
    """Extract source pixels from xarray.DataArray source
    with dask.array.Array data.
    """
    return da.map_blocks(
        _compute_var_image_xarray_dask_block,
        src_var.values,
        dst_src_ij_images,
        fill_value,
        interpolation,
        dtype=src_var.dtype,
        drop_axis=0,
    )


@nb.njit(nogil=True, cache=True)
def _compute_var_image_numpy(
    src_var: np.ndarray,
    dst_src_ij_images: np.ndarray,
    fill_value: Union[int, float, complex],
    interpolation: int,
) -> np.ndarray:
    """Extract source pixels from numpy.ndarray source
    with numba in parallel mode.
    """
    dst_width = dst_src_ij_images.shape[-1]
    dst_height = dst_src_ij_images.shape[-2]
    dst_shape = src_var.shape[:-2] + (dst_height, dst_width)
    dst_values = np.full(dst_shape, fill_value, dtype=src_var.dtype)
    _compute_var_image_numpy_parallel(
        src_var, dst_src_ij_images, dst_values, interpolation
    )
    return dst_values


@nb.njit(nogil=True, cache=True)
def _compute_var_image_xarray_dask_block(
    src_var_image: np.ndarray,
    dst_src_ij_images: np.ndarray,
    fill_value: Union[int, float, complex],
    interpolation: int,
) -> np.ndarray:
    """Extract source pixels from np.ndarray source
    and return a block of a dask array.
    """
    dst_width = dst_src_ij_images.shape[-1]
    dst_height = dst_src_ij_images.shape[-2]
    dst_shape = src_var_image.shape[:-2] + (dst_height, dst_width)
    dst_values = np.full(dst_shape, fill_value, dtype=src_var_image.dtype)
    _compute_var_image_numpy_sequential(
        src_var_image, dst_src_ij_images, dst_values, interpolation
    )
    return dst_values


@nb.njit(nogil=True, parallel=True, cache=True)
def _compute_var_image_numpy_parallel(
    src_var_image: np.ndarray,
    dst_src_ij_images: np.ndarray,
    dst_var_image: np.ndarray,
    interpolation: int,
):
    """Extract source pixels from np.ndarray source
    using numba parallel mode.
""" dst_height = dst_var_image.shape[-2] for dst_j in nb.prange(dst_height): _compute_var_image_for_dest_line( dst_j, src_var_image, dst_src_ij_images, dst_var_image, interpolation, ) # Extra dask version, because if we use parallel=True # and nb.prange, we end up in infinite JIT compilation :( @nb.njit(nogil=True, cache=True) def _compute_var_image_numpy_sequential( src_var_image: np.ndarray, dst_src_ij_images: np.ndarray, dst_var_image: np.ndarray, interpolation: int, ): """Extract source pixels from np.ndarray source NOT using numba parallel mode. """ dst_height = dst_var_image.shape[-2] for dst_j in range(dst_height): _compute_var_image_for_dest_line( dst_j, src_var_image, dst_src_ij_images, dst_var_image, interpolation ) @nb.njit(nogil=True, cache=True) def _compute_var_image_for_dest_line( dst_j: int, src_var_image: np.ndarray, dst_src_ij_images: np.ndarray, dst_var_image: np.ndarray, interpolation: int, ): """Extract source pixels from *src_values* np.ndarray and write into dst_values np.ndarray. """ src_width = src_var_image.shape[-1] src_height = src_var_image.shape[-2] dst_width = dst_var_image.shape[-1] src_i_min = 0 src_j_min = 0 src_i_max = src_width - 1 src_j_max = src_height - 1 for dst_i in range(dst_width): src_i_f = dst_src_ij_images[0, dst_j, dst_i] src_j_f = dst_src_ij_images[1, dst_j, dst_i] if np.isnan(src_i_f) or np.isnan(src_j_f): continue # Note int() is 2x faster than math.floor() and # should yield the same results for only positive i,j. src_i0 = int(src_i_f) src_j0 = int(src_j_f) u = src_i_f - src_i0 v = src_j_f - src_j0 if interpolation == 0: # interpolation == "nearest" if u > 0.5: src_i0 = _iclamp(src_i0 + 1, src_i_min, src_i_max) if v > 0.5: src_j0 = _iclamp(src_j0 + 1, src_j_min, src_j_max) dst_var_value = src_var_image[..., src_j0, src_i0] elif interpolation == 1: # interpolation == "triangular" src_i1 = _iclamp(src_i0 + 1, src_i_min, src_i_max) src_j1 = _iclamp(src_j0 + 1, src_j_min, src_j_max) value_01 = src_var_image[..., src_j0, src_i1] value_10 = src_var_image[..., src_j1, src_i0] if u + v < 1.0: # Closest triangle value_00 = src_var_image[..., src_j0, src_i0] dst_var_value = ( value_00 + u * (value_01 - value_00) + v * (value_10 - value_00) ) else: # Opposite triangle value_11 = src_var_image[..., src_j1, src_i1] dst_var_value = ( value_11 + (1.0 - u) * (value_10 - value_11) + (1.0 - v) * (value_01 - value_11) ) else: # interpolation == "bilinear" src_i1 = _iclamp(src_i0 + 1, src_i_min, src_i_max) src_j1 = _iclamp(src_j0 + 1, src_j_min, src_j_max) value_00 = src_var_image[..., src_j0, src_i0] value_01 = src_var_image[..., src_j0, src_i1] value_10 = src_var_image[..., src_j1, src_i0] value_11 = src_var_image[..., src_j1, src_i1] value_u0 = value_00 + u * (value_01 - value_00) value_u1 = value_10 + u * (value_11 - value_10) dst_var_value = value_u0 + v * (value_u1 - value_u0) dst_var_image[..., dst_j, dst_i] = dst_var_value @nb.njit( "float64(float64, float64, float64, float64, float64, float64)", nogil=True, inline="always", ) def _fdet( px0: float, py0: float, px1: float, py1: float, px2: float, py2: float ) -> float: return (px0 - px1) * (py0 - py2) - (px0 - px2) * (py0 - py1) @nb.njit( "float64(float64, float64, float64, float64, float64, float64)", nogil=True, inline="always", ) def _fu(px: float, py: float, px0: float, py0: float, px2: float, py2: float) -> float: return (px0 - px) * (py0 - py2) - (py0 - py) * (px0 - px2) @nb.njit( "float64(float64, float64, float64, float64, float64, float64)", nogil=True, inline="always", ) def _fv(px: float, 
def _fv(px: float, py: float, px0: float, py0: float, px1: float, py1: float) -> float:
    return (py0 - py) * (px0 - px1) - (px0 - px) * (py0 - py1)


@nb.njit("float64(float64, float64, float64)", nogil=True, inline="always")
def _fclamp(x: float, x_min: float, x_max: float) -> float:
    return x_min if x < x_min else (x_max if x > x_max else x)


@nb.njit("int64(int64, int64, int64)", nogil=True, inline="always")
def _iclamp(x: int, x_min: int, x_max: int) -> int:
    return x_min if x < x_min else (x_max if x > x_max else x)


def _millis(seconds: float) -> int:
    return round(1000 * seconds)
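

# --- Interpolation arithmetic sketch (editorial addition, not part of the
# module source) ---
# A tiny standalone illustration of the three interpolation modes applied in
# _compute_var_image_for_dest_line() above to a single 2x2 source
# neighbourhood. The corner values and the fractional position (u, v) are
# made-up numbers; the formulas mirror the code above.

value_00, value_01 = 10.0, 14.0  # row j0: pixels i0, i1
value_10, value_11 = 20.0, 28.0  # row j1: pixels i0, i1
u, v = 0.25, 0.5                 # fractional offsets within the cell

# "nearest" (mode 0): pick the closest of the four corner pixels.
i_near = 1 if u > 0.5 else 0
j_near = 1 if v > 0.5 else 0
nearest = (value_00, value_01, value_10, value_11)[2 * j_near + i_near]

# "triangular" (mode 1): linear interpolation within one of the two
# triangles of the cell, chosen by u + v.
if u + v < 1.0:
    triangular = value_00 + u * (value_01 - value_00) + v * (value_10 - value_00)
else:
    triangular = (
        value_11 + (1.0 - u) * (value_10 - value_11) + (1.0 - v) * (value_01 - value_11)
    )

# "bilinear" (mode 2): interpolate along i first, then along j.
value_u0 = value_00 + u * (value_01 - value_00)
value_u1 = value_10 + u * (value_11 - value_10)
bilinear = value_u0 + v * (value_u1 - value_u0)

print(nearest, triangular, bilinear)  # 10.0 16.0 16.5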