Source code for xcube.core.resample

# The MIT License (MIT)
# Copyright (c) 2019 by the xcube development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from typing import Dict, Any, Sequence, Union

import xarray as xr
import numpy as np

from xcube.core.schema import CubeSchema
from xcube.core.select import select_variables_subset
from xcube.core.verify import assert_cube


def resample_in_time(cube: xr.Dataset,
                     frequency: str,
                     method: Union[str, Sequence[str]],
                     offset=None,
                     base: int = 0,
                     tolerance=None,
                     interp_kind=None,
                     time_chunk_size=None,
                     var_names: Sequence[str] = None,
                     metadata: Dict[str, Any] = None,
                     cube_asserted: bool = False) -> xr.Dataset:
    """
    Resample an xcube dataset in the time dimension.

    The argument *method* may be one or a sequence of ``'all'``, ``'any'``,
    ``'argmax'``, ``'argmin'``, ``'count'``, ``'first'``, ``'last'``,
    ``'max'``, ``'min'``, ``'mean'``, ``'median'``, ``'percentile_<p>'``,
    ``'std'``, ``'sum'``, ``'var'``.

    In the value ``'percentile_<p>'``, ``'<p>'`` is a placeholder that must be
    replaced by an integer percentage value, e.g. ``'percentile_90'`` is the
    90%-percentile.

    *Important note:* As of xarray 0.14 and dask 2.8, the methods ``'median'``
    and ``'percentile_<p>'`` cannot be used if the variables in *cube* comprise
    chunked dask arrays. In this case, use the ``compute()`` or ``load()``
    method to convert dask arrays into numpy arrays.

    :param cube: The xcube dataset.
    :param frequency: Temporal aggregation frequency. Use format "<count><offset>"
        where <offset> is one of 'H', 'D', 'W', 'M', 'Q', 'Y'.
    :param method: Resampling method or sequence of resampling methods.
    :param offset: Offset used to adjust the resampled time labels.
        Uses same syntax as *frequency*.
    :param base: For frequencies that evenly subdivide 1 day, the "origin" of the
        aggregated intervals. For example, for '24H' frequency, *base* could
        range from 0 through 23.
    :param time_chunk_size: If not None, the chunk size to be used for the
        "time" dimension.
    :param var_names: Variable names to include.
    :param tolerance: Time tolerance for selective upsampling methods.
        Defaults to *frequency*.
    :param interp_kind: Kind of interpolation if *method* is 'interpolate'.
    :param metadata: Output metadata.
    :param cube_asserted: If False, *cube* will be verified, otherwise it is
        expected to be a valid cube.
    :return: A new xcube dataset resampled in time.
""" if not cube_asserted: assert_cube(cube) if frequency == 'all': time_gap = np.array(cube.time[-1]) - np.array(cube.time[0]) days = int((np.timedelta64(time_gap, 'D') / np.timedelta64(1, 'D')) + 1) frequency = f'{days}D' if var_names: cube = select_variables_subset(cube, var_names) resampler = cube.resample(skipna=True, closed='left', label='left', keep_attrs=True, time=frequency, loffset=offset, base=base) if isinstance(method, str): methods = [method] else: methods = list(method) percentile_prefix = 'percentile_' resampled_cubes = [] for method in methods: method_args = [] method_postfix = method if method.startswith(percentile_prefix): p = int(method[len(percentile_prefix):]) q = p / 100.0 method_args = [q] method_postfix = f'p{p}' method = 'quantile' resampling_method = getattr(resampler, method) method_kwargs = get_method_kwargs(method, frequency, interp_kind, tolerance) resampled_cube = resampling_method(*method_args, **method_kwargs) resampled_cube = resampled_cube.rename( {var_name: f'{var_name}_{method_postfix}' for var_name in resampled_cube.data_vars}) resampled_cubes.append(resampled_cube) if len(resampled_cubes) == 1: resampled_cube = resampled_cubes[0] else: resampled_cube = xr.merge(resampled_cubes) # TODO: add time_bnds to resampled_ds time_coverage_start = '%s' % cube.time[0] time_coverage_end = '%s' % cube.time[-1] resampled_cube.attrs.update(metadata or {}) # TODO: add other time_coverage_ attributes resampled_cube.attrs.update(time_coverage_start=time_coverage_start, time_coverage_end=time_coverage_end) schema = CubeSchema.new(cube) chunk_sizes = {schema.dims[i]: schema.chunks[i] for i in range(schema.ndim)} if isinstance(time_chunk_size, int) and time_chunk_size >= 0: chunk_sizes['time'] = time_chunk_size return resampled_cube.chunk(chunk_sizes)

def get_method_kwargs(method, frequency, interp_kind, tolerance):
    if method == 'interpolate':
        kwargs = {'kind': interp_kind or 'linear'}
    elif method in {'nearest', 'bfill', 'ffill', 'pad'}:
        kwargs = {'tolerance': tolerance or frequency}
    elif method in {'first', 'last', 'sum', 'min', 'max',
                    'mean', 'median', 'std', 'var'}:
        kwargs = {'dim': 'time', 'keep_attrs': True, 'skipna': True}
    elif method == 'prod':
        kwargs = {'dim': 'time', 'skipna': True}
    elif method == 'count':
        kwargs = {'dim': 'time', 'keep_attrs': True}
    else:
        kwargs = {}
    return kwargs
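
For illustration, get_method_kwargs resolves the keyword arguments purely from the method name; a small hypothetical self-check of three of the branches above:

    assert get_method_kwargs('mean', '1M', None, None) == \
        {'dim': 'time', 'keep_attrs': True, 'skipna': True}
    assert get_method_kwargs('nearest', '1M', None, None) == {'tolerance': '1M'}
    assert get_method_kwargs('interpolate', '1M', None, None) == {'kind': 'linear'}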