Source code for xcube.core.dsio

# The MIT License (MIT)
# Copyright (c) 2019 by the xcube development team and contributors
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

import os
import shutil
import warnings
from abc import ABCMeta, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union, Mapping

import pandas as pd
import s3fs
import urllib3.util
import xarray as xr
import zarr

from xcube.constants import EXTENSION_POINT_DATASET_IOS
from xcube.core.timeslice import append_time_slice, insert_time_slice, replace_time_slice
from xcube.core.verify import assert_cube
from xcube.util.plugin import ExtensionComponent, get_extension_registry

[docs]def open_cube(input_path: str, format_name: str = None, **kwargs) -> xr.Dataset: """ Open a xcube dataset from *input_path*. If *format* is not provided it will be guessed from *input_path*. :param input_path: input path :param format_name: format, e.g. "zarr" or "netcdf4" :param kwargs: format-specific keyword arguments :return: xcube dataset """ return open_dataset(input_path, format_name=format_name, is_cube=True, **kwargs)
[docs]def write_cube(cube: xr.Dataset, output_path: str, format_name: str = None, cube_asserted: bool = False, **kwargs) -> xr.Dataset: """ Write a xcube dataset to *output_path*. If *format* is not provided it will be guessed from *output_path*. :param cube: xcube dataset to be written. :param output_path: output path :param format_name: format, e.g. "zarr" or "netcdf4" :param kwargs: format-specific keyword arguments :param cube_asserted: If False, *cube* will be verified, otherwise it is expected to be a valid cube. :return: xcube dataset *cube* """ if not cube_asserted: assert_cube(cube) return write_dataset(cube, output_path, format_name=format_name, **kwargs)
def open_dataset(input_path: str, format_name: str = None, is_cube: bool = False, **kwargs) -> xr.Dataset: """ Open a dataset from *input_path*. If *format* is not provided it will be guessed from *output_path*. :param input_path: input path :param format_name: format, e.g. "zarr" or "netcdf4" :param is_cube: Whether a ValueError will be raised, if the dataset read from *input_path* is not a xcube dataset. :param kwargs: format-specific keyword arguments :return: dataset object """ format_name = format_name if format_name else guess_dataset_format(input_path) if format_name is None: raise ValueError("Unknown input format") dataset_io = find_dataset_io(format_name, modes=["r"]) if dataset_io is None: raise ValueError(f"Unknown input format {format_name!r} for {input_path}") dataset =, **kwargs) if is_cube: assert_cube(dataset) return dataset def write_dataset(dataset: xr.Dataset, output_path: str, format_name: str = None, **kwargs) -> xr.Dataset: """ Write dataset to *output_path*. If *format* is not provided it will be guessed from *output_path*. :param dataset: Dataset to be written. :param output_path: output path :param format_name: format, e.g. "zarr" or "netcdf4" :param kwargs: format-specific keyword arguments :return: the input dataset """ format_name = format_name if format_name else guess_dataset_format(output_path) if format_name is None: raise ValueError("Unknown output format") dataset_io = find_dataset_io(format_name, modes=["w"]) if dataset_io is None: raise ValueError(f"Unknown output format {format_name!r} for {output_path}") dataset_io.write(dataset, output_path, **kwargs) return dataset class DatasetIO(ExtensionComponent, metaclass=ABCMeta): """ An abstract base class that represents dataset input/output. :param name: A unique dataset I/O identifier. """ def __init__(self, name: str): super().__init__(EXTENSION_POINT_DATASET_IOS, name) @property def description(self) -> str: """ :return: A description for this input processor """ return self.get_metadata_attr('description', '') @property def ext(self) -> str: """The primary filename extension used by this dataset I/O.""" return self.get_metadata_attr('ext', '') @property def modes(self) -> Set[str]: """ A set describing the modes of this dataset I/O. Must be one or more of "r" (read), "w" (write), and "a" (append). """ return self.get_metadata_attr('modes', set()) @abstractmethod def fitness(self, path: str, path_type: str = None) -> float: """ Compute a fitness of this dataset I/O in the interval [0 to 1] for reading/writing from/to the given *path*. :param path: The path or URL. :param path_type: Either "file", "dir", "url", or None. :return: the chance in range [0 to 1] """ return 0.0 def read(self, input_path: str, **kwargs) -> xr.Dataset: """Read a dataset from *input_path* using format-specific read parameters *kwargs*.""" raise NotImplementedError() def write(self, dataset: xr.Dataset, output_path: str, **kwargs): """"Write *dataset* to *output_path* using format-specific write parameters *kwargs*.""" raise NotImplementedError() def append(self, dataset: xr.Dataset, output_path: str, **kwargs): """"Append *dataset* to existing *output_path* using format-specific write parameters *kwargs*.""" raise NotImplementedError() def insert(self, dataset: xr.Dataset, index: int, output_path: str, **kwargs): """"Insert *dataset* at *index* into existing *output_path* using format-specific write parameters *kwargs*.""" raise NotImplementedError() def replace(self, dataset: xr.Dataset, index: int, output_path: str, **kwargs): """"Replace *dataset* at *index* in existing *output_path* using format-specific write parameters *kwargs*.""" raise NotImplementedError() def update(self, output_path: str, global_attrs: Dict[str, Any] = None, **kwargs): """"Update *dataset* at *output_path* using format-specific open parameters *kwargs*.""" raise NotImplementedError() def get_extension(name: str): return get_extension_registry().get_extension(EXTENSION_POINT_DATASET_IOS, name) def find_dataset_io_by_name(name: str): extension = get_extension(name) if not extension: return None return extension.component def find_dataset_io(format_name: str, modes: Iterable[str] = None, default: DatasetIO = None) -> Optional[DatasetIO]: modes = set(modes) if modes else None format_name = format_name.lower() dataset_ios = get_extension_registry().find_components(EXTENSION_POINT_DATASET_IOS) for dataset_io in dataset_ios: # noinspection PyUnresolvedReferences if format_name == # noinspection PyTypeChecker if not modes or modes.issubset(dataset_io.modes): return dataset_io for dataset_io in dataset_ios: # noinspection PyUnresolvedReferences if format_name == dataset_io.ext.lower(): # noinspection PyTypeChecker if not modes or modes.issubset(dataset_io.modes): return dataset_io return default def guess_dataset_format(path: str) -> Optional[str]: """ Guess a dataset format for a file system path or URL given by *path*. :param path: A file system path or URL. :return: The name of a dataset format guessed from *path*. """ dataset_io_fitness_list = guess_dataset_ios(path) if dataset_io_fitness_list: return dataset_io_fitness_list[0][0].name return None def guess_dataset_ios(path: str) -> List[Tuple[DatasetIO, float]]: """ Guess suitable DatasetIO objects for a file system path or URL given by *path*. Returns a list of (DatasetIO, fitness) tuples, sorted by descending fitness values. Fitness values are in the interval (0, 1]. The first entry is the most appropriate DatasetIO object. :param path: A file system path or URL. :return: A list of (DatasetIO, fitness) tuples. """ if os.path.isfile(path): input_type = "file" elif os.path.isdir(path): input_type = "dir" elif path.find("://") > 0: input_type = "url" else: input_type = None dataset_ios = get_extension_registry().find_components(EXTENSION_POINT_DATASET_IOS) dataset_io_fitness_list = [] for dataset_io in dataset_ios: fitness =, path_type=input_type) if fitness > 0.0: dataset_io_fitness_list.append((dataset_io, fitness)) dataset_io_fitness_list.sort(key=lambda item: -item[1]) return dataset_io_fitness_list def _get_ext(path: str) -> Optional[str]: _, ext = os.path.splitext(path) return ext.lower() def query_dataset_io(filter_fn: Callable[[DatasetIO], bool] = None) -> List[DatasetIO]: dataset_ios = get_extension_registry().find_components(EXTENSION_POINT_DATASET_IOS) if filter_fn is None: return dataset_ios return list(filter(filter_fn, dataset_ios)) # noinspection PyAbstractClass class MemDatasetIO(DatasetIO): """ An in-memory dataset I/O. Keeps all datasets in a dictionary. :param datasets: The initial datasets as a path to dataset mapping. """ def __init__(self, datasets: Dict[str, xr.Dataset] = None): super().__init__(FORMAT_NAME_MEM) self._datasets = datasets or {} @property def datasets(self) -> Dict[str, xr.Dataset]: return self._datasets def fitness(self, path: str, path_type: str = None) -> float: if path in self._datasets: return 1.0 ext_value = _get_ext(path) == ".mem" type_value = 0.0 return (3 * ext_value + type_value) / 4 def read(self, path: str, **kwargs) -> xr.Dataset: if path in self._datasets: return self._datasets[path] raise FileNotFoundError(path) def write(self, dataset: xr.Dataset, path: str, **kwargs): self._datasets[path] = dataset def append(self, dataset: xr.Dataset, path: str, **kwargs): if path in self._datasets: old_ds = self._datasets[path] # noinspection PyTypeChecker self._datasets[path] = xr.concat([old_ds, dataset], dim='time', data_vars='minimal', coords='minimal', compat='equals') else: self._datasets[path] = dataset.copy() def update(self, output_path: str, global_attrs: Dict[str, Any] = None, **kwargs): if global_attrs: ds = self._datasets[output_path] ds.attrs.update(global_attrs) class Netcdf4DatasetIO(DatasetIO): """ A dataset I/O that reads from / writes to NetCDF files. """ def __init__(self): super().__init__(FORMAT_NAME_NETCDF4) def fitness(self, path: str, path_type: str = None) -> float: ext = _get_ext(path) ext_value = ext in {'.nc', '.hdf', '.h5'} type_value = 0.0 if path_type == "file": type_value = 1.0 elif path_type is None: type_value = 0.5 else: ext_value = 0.0 return (3 * ext_value + type_value) / 4 def read(self, input_path: str, **kwargs) -> xr.Dataset: return xr.open_dataset(input_path, **kwargs) def write(self, dataset: xr.Dataset, output_path: str, **kwargs): dataset.to_netcdf(output_path) def append(self, dataset: xr.Dataset, output_path: str, **kwargs): import os temp_path = output_path + '' os.rename(output_path, temp_path) old_ds = xr.open_dataset(temp_path) new_ds = xr.concat([old_ds, dataset], dim='time', data_vars='minimal', coords='minimal', compat='equals') # noinspection PyUnresolvedReferences new_ds.to_netcdf(output_path) old_ds.close() rimraf(temp_path) def update(self, output_path: str, global_attrs: Dict[str, Any] = None, **kwargs): if global_attrs: import netCDF4 ds = netCDF4.Dataset(output_path, 'r+') ds.setncatts(global_attrs) ds.close() class ZarrDatasetIO(DatasetIO): """ A dataset I/O that reads from / writes to Zarr directories or archives. """ def __init__(self): super().__init__(FORMAT_NAME_ZARR) def fitness(self, path: str, path_type: str = None) -> float: ext = _get_ext(path) ext_value = 0.0 type_value = 0.0 if ext == ".zarr": ext_value = 1.0 if path_type == "dir": type_value = 1.0 elif path_type == "url" or path_type is None: type_value = 0.5 else: ext_value = 0.0 else: lower_path = path.lower() if lower_path.endswith(""): ext_value = 1.0 if path_type == "file": type_value = 1.0 elif path_type is None: type_value = 0.5 else: ext_value = 0.0 else: if path_type == "dir": type_value = 1.0 elif path_type == "url": type_value = 0.5 return (3 * ext_value + type_value) / 4 def read(self, path: str, s3_kwargs: Dict[str, Any] = None, s3_client_kwargs: Dict[str, Any] = None, max_cache_size: int = None, **kwargs) -> xr.Dataset: """ Read dataset from some Zarr storage. :param path: File path or object storage URL. :param s3_kwargs: if *path* is an object storage URL, keyword-arguments passed to S3 file system, that is ``s3fs.S3FileSystem(**s3_kwargs, ...)``. :param s3_client_kwargs: if *path* is an object storage URL, keyword-arguments passed to S3 (boto3) client, that is ``s3fs.S3FileSystem(..., client_kwargs=s3_client_kwargs)``. :param max_cache_size: if this is a positive integer, the store will be wrapped in an in-memory cache, that is ``store = zarr.LRUStoreCache(store, max_size=max_cache_size)``. :param kwargs: Keyword-arguments passed to xarray Zarr adapter, that is ``xarray.open_zarr(..., **kwargs)``. In addition, the parameter ** :return: """ path_or_store = path consolidated = False if isinstance(path, str): path_or_store, consolidated = get_path_or_s3_store(path_or_store, s3_kwargs=s3_kwargs, s3_client_kwargs=s3_client_kwargs, mode='r') if max_cache_size is not None and max_cache_size > 0: path_or_store = zarr.LRUStoreCache(path_or_store, max_size=max_cache_size) return xr.open_zarr(path_or_store, consolidated=consolidated, **kwargs) def write(self, dataset: xr.Dataset, output_path: str, compressor: Dict[str, Any] = None, chunksizes: Dict[str, int] = None, packing: Dict[str, Dict[str, Any]] = None, s3_kwargs: Dict[str, Any] = None, s3_client_kwargs: Dict[str, Any] = None, **kwargs): path_or_store, consolidated = get_path_or_s3_store(output_path, s3_kwargs=s3_kwargs, s3_client_kwargs=s3_client_kwargs, mode='w') encoding = self._get_write_encodings(dataset, compressor, chunksizes, packing) dataset.to_zarr(path_or_store, mode='w', encoding=encoding, **kwargs) @classmethod def _get_write_encodings(cls, dataset, compressor, chunksizes, packing): encoding = None if chunksizes: encoding = {} for var_name in dataset.data_vars: var = dataset[var_name] chunks: List[int] = [] for i in range(len(var.dims)): dim_name = var.dims[i] if dim_name in chunksizes: chunks.append(chunksizes[dim_name]) else: chunks.append(var.shape[i]) encoding[var_name] = dict(chunks=chunks) if packing: if encoding: for var_name in packing.keys(): if var_name in encoding.keys(): encoding[var_name].update(dict(packing[var_name])) else: encoding[var_name] = dict(packing[var_name]) else: encoding = {} for var_name in packing.keys(): encoding[var_name] = dict(packing[var_name]) if compressor: compressor = zarr.Blosc(**compressor) if encoding: for var_name in encoding.keys(): encoding[var_name].update(compressor=compressor) else: encoding = {var_name: dict(compressor=compressor) for var_name in dataset.data_vars} return encoding def append(self, dataset: xr.Dataset, output_path: str, **kwargs): append_time_slice(output_path, dataset) def insert(self, dataset: xr.Dataset, index: int, output_path: str, **kwargs): insert_time_slice(output_path, index, dataset) def replace(self, dataset: xr.Dataset, index: int, output_path: str, **kwargs): replace_time_slice(output_path, index, dataset) def update(self, output_path: str, global_attrs: Dict[str, Any] = None, **kwargs): if global_attrs: import zarr ds = zarr.open_group(output_path, mode='r+', **kwargs) ds.attrs.update(global_attrs) # noinspection PyAbstractClass class CsvDatasetIO(DatasetIO): """ A dataset I/O that reads from / writes to CSV files. """ def __init__(self): super().__init__(FORMAT_NAME_CSV) def fitness(self, path: str, path_type: str = None) -> float: if path_type == "dir": return 0.0 ext = _get_ext(path) ext_value = {".csv": 1.0, ".txt": 0.5, ".dat": 0.2}.get(ext, 0.1) type_value = {"file": 1.0, "url": 0.5, None: 0.5}.get(path_type, 0.0) return (3 * ext_value + type_value) / 4 def read(self, path: str, **kwargs) -> xr.Dataset: return xr.Dataset.from_dataframe(pd.read_csv(path, **kwargs)) def write(self, dataset: xr.Dataset, output_path: str, **kwargs): dataset.to_dataframe().to_csv(output_path, **kwargs) def rimraf(path): """ The UNIX command `rm -rf` for xcube. Recursively remove directory or single file. :param path: directory or single file """ if os.path.isdir(path): try: shutil.rmtree(path, ignore_errors=False) except OSError: warnings.warn(f"failed to remove file {path}") elif os.path.isfile(path): try: os.remove(path) except OSError: warnings.warn(f'failed to remove file {path}') pass def get_path_or_s3_store(path_or_url: str, s3_kwargs: Mapping[str, Any] = None, s3_client_kwargs: Mapping[str, Any] = None, mode: str = 'r') -> Tuple[Union[str, Dict], bool]: """ If *path_or_url* is an object storage URL, return a object storage Zarr store (mapping object) using *s3_client_kwargs* and *mode* and a flag indicating whether the Zarr datasets is consolidated. Otherwise *path_or_url* is interpreted as a local file system path, retured as-is plus a flag indicating whether the Zarr datasets is consolidated. :param path_or_url: A path or a URL. :param s3_kwargs: keyword arguments for S3 file system. :param s3_client_kwargs: keyword arguments for S3 boto3 client. :param mode: "r" or "w" :return: A tuple (path_or_obs_store, consolidated). """ if is_s3_url(path_or_url) or s3_kwargs is not None or s3_client_kwargs is not None: root, s3_kwargs, s3_client_kwargs = parse_s3_url_and_kwargs(path_or_url, s3_kwargs=s3_kwargs, s3_client_kwargs=s3_client_kwargs) s3 = s3fs.S3FileSystem(**s3_kwargs, client_kwargs=s3_client_kwargs) consolidated = mode == "r" and s3.exists(f'{root}/.zmetadata') return s3fs.S3Map(root=root, s3=s3, check=False, create=mode == "w"), consolidated else: consolidated = os.path.exists(os.path.join(path_or_url, '.zmetadata')) return path_or_url, consolidated def parse_s3_url_and_kwargs(s3_url: str, s3_kwargs: Mapping[str, Any] = None, s3_client_kwargs: Mapping[str, Any] = None) \ -> Tuple[str, Dict[str, Any], Dict[str, Any]]: """ Parses *obs_url*, *s3_kwargs*, *s3_client_kwargs* and returns a new tuple (*root*, *s3_kwargs*, *s3_client_kwargs*) with updated kwargs whose elements can be passed to the s3fs.S3FileSystem and s3fs.S3Map constructors as follows::: obs_fs = s3fs.S3FileSystem(**s3_kwargs, client_kwargs=s3_client_kwargs) obs_map = s3fs.S3Map(root=root, s3=obs_fs) :param s3_url: Object storage URL, e.g. "s3://bucket/root", or "". :param s3_kwargs: keyword arguments for S3 file system. :param s3_client_kwargs: keyword arguments for S3 boto3 client. :return: A tuple (root, s3_kwargs, s3_client_kwargs). """ endpoint_url, root = split_s3_url(s3_url) new_s3_client_kwargs = dict(s3_client_kwargs) if s3_client_kwargs else dict() if endpoint_url: new_s3_client_kwargs['endpoint_url'] = endpoint_url # The following key + secret kwargs are no longer supported in client_kwargs and are now moved into s3_kwargs key = secret = None if 'provider_access_key_id' in new_s3_client_kwargs: key = new_s3_client_kwargs.pop('provider_access_key_id') if 'aws_access_key_id' in new_s3_client_kwargs: key = new_s3_client_kwargs.pop('aws_access_key_id') if 'provider_secret_access_key' in new_s3_client_kwargs: secret = new_s3_client_kwargs.pop('provider_secret_access_key') if 'aws_secret_access_key' in new_s3_client_kwargs: secret = new_s3_client_kwargs.pop('aws_secret_access_key') new_s3_kwargs = dict(key=key, secret=secret) if s3_kwargs: new_s3_kwargs.update(**s3_kwargs) return root, new_s3_kwargs, new_s3_client_kwargs def split_s3_url(path: str) -> Tuple[Optional[str], str]: """ If *path* is a URL, return tuple (endpoint_url, root), otherwise (None, *path*) """ url = urllib3.util.parse_url(path) if all((url.scheme,, url.path)) and url.scheme != 's3': if url.port is not None: endpoint_url = f'{url.scheme}://{}:{url.port}' else: endpoint_url = f'{url.scheme}://{}' root = url.path if root.startswith('/'): root = root[1:] return endpoint_url, root return None, path def is_s3_url(path_or_url: str) -> bool: """ Test if *path_or_url* is a potential object storage URL. :param path_or_url: Path or URL to test. :return: True, if so. """ return path_or_url.startswith("https://") \ or path_or_url.startswith("http://") \ or path_or_url.startswith("s3://")