# Copyright (c) 2018-2024 by xcube team and contributors
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.
import threading
import uuid
from abc import abstractmethod, ABCMeta
from typing import Any, Dict, Mapping, Optional
import xarray as xr
from xcube.core.gridmapping import GridMapping
from xcube.util.assertions import assert_instance
from .abc import MultiLevelDataset
[docs]
class LazyMultiLevelDataset(MultiLevelDataset, metaclass=ABCMeta):
"""A multi-level dataset where each level dataset is lazily retrieved,
i.e. read or computed by the abstract method
``get_dataset_lazily(index, **kwargs)``.
Args:
ds_id: Optional dataset identifier.
parameters: Optional keyword arguments that will be passed to
the ``get_dataset_lazily`` method.
"""
def __init__(
self,
grid_mapping: Optional[GridMapping] = None,
num_levels: Optional[int] = None,
ds_id: Optional[str] = None,
parameters: Optional[Mapping[str, Any]] = None,
):
if grid_mapping is not None:
assert_instance(grid_mapping, GridMapping, name="grid_mapping")
if ds_id is not None:
assert_instance(ds_id, str, name="ds_id")
self._grid_mapping = grid_mapping
self._num_levels = num_levels
self._ds_id = ds_id
self._level_datasets: Dict[int, xr.Dataset] = {}
self._parameters = parameters or {}
self._lock = threading.RLock()
@property
def ds_id(self) -> str:
if self._ds_id is None:
with self._lock:
self._ds_id = str(uuid.uuid4())
return self._ds_id
@ds_id.setter
def ds_id(self, ds_id: str):
assert_instance(ds_id, str, name="ds_id")
self._ds_id = ds_id
@property
def grid_mapping(self) -> GridMapping:
if self._grid_mapping is None:
with self._lock:
self._grid_mapping = self._get_grid_mapping_lazily()
return self._grid_mapping
@property
def num_levels(self) -> int:
if self._num_levels is None:
with self._lock:
self._num_levels = self._get_num_levels_lazily()
return self._num_levels
@property
def lock(self) -> threading.RLock:
"""Get the reentrant lock used by this object to synchronize
lazy instantiation of properties.
"""
return self._lock
[docs]
def get_dataset(self, index: int) -> xr.Dataset:
"""Get or compute the dataset for the level at given *index*.
Args:
index: the level index
Returns:
the dataset for the level at *index*.
"""
if index not in self._level_datasets:
with self._lock:
# noinspection PyTypeChecker
level_dataset = self._get_dataset_lazily(index, self._parameters)
self.set_dataset(index, level_dataset)
# noinspection PyTypeChecker
return self._level_datasets[index]
[docs]
def set_dataset(self, index: int, level_dataset: xr.Dataset):
"""Set the dataset for the level at given *index*.
Callers need to ensure that the given *level_dataset*
has the correct spatial dimension sizes for the
given level at *index*.
Args:
index: the level index
level_dataset: the dataset for the level at *index*.
"""
with self._lock:
self._level_datasets[index] = level_dataset
@abstractmethod
def _get_num_levels_lazily(self) -> int:
"""Retrieve, i.e. read or compute, the number of levels.
Returns:
the number of dataset levels.
"""
@abstractmethod
def _get_dataset_lazily(self, index: int, parameters: Dict[str, Any]) -> xr.Dataset:
"""Retrieve, i.e. read or compute, the dataset for the
level at given *index*.
Args:
index: the level index
parameters: *parameters* keyword argument that was passed to
constructor.
Returns:
the dataset for the level at *index*.
"""
def _get_grid_mapping_lazily(self) -> GridMapping:
"""Retrieve, i.e. read or compute, the tile grid used
by the multi-level dataset.
Returns:
the dataset for the level at *index*.
"""
return GridMapping.from_dataset(self.get_dataset(0))
[docs]
def close(self):
with self._lock:
for dataset in self._level_datasets.values():
if dataset is not None:
dataset.close()