# Source code for input.manager (Sphinx-rendered page header)
"""Defines the InputManager class for finding, managing, preprocessing and loading input data for ValEnsPy."""
from functools import partial
from pathlib import Path
from valenspy.input.converter import INPUT_CONVERTORS
from valenspy.input.catalog import ValenspyEsmDatastore
from valenspy.input.esm_catalog_builder import CatalogBuilder, CATALOG_COLS
from valenspy._utilities import load_yml
# Default intake-esm catalogue specification ("esmcat") used by InputManager when
# the caller does not supply their own `esmcat_data`.
esmcat_default_data = {
"id" : "test",  # NOTE(review): looks like a placeholder catalog id — confirm intended value
"esmcat_version": "0.1.0", # intake-esm JSON file structure version, as per: https://github.com/NCAR/esm-collection-spec
# Each catalog row ("asset") points to a netCDF file via the "path" column.
"assets": {"column_name": "path", "format": "netcdf"},
"aggregation_control": {
"variable_column_name": "variable_id",
# Group files into datasets by every required identifier column of the catalog.
"groupby_attrs": CATALOG_COLS["required_identifiers"] + CATALOG_COLS["required_identifiers_with_default"],
"aggregations": [
{
# Files covering different time periods are concatenated along the "time" dim.
"type": "join_existing",
"attribute_name": "time_period_start",
"options": {"dim": "time"},
},
{
# Different variables are merged ("union") into a single dataset.
"type": "union",
"attribute_name": "variable_id"},
],
},
"attributes": [],
}
#TODO: Inherit from both ValenspyEsmDatastore and CatalogBuilder? - this may allow:
# - default values for the preprocess, xarray_open_kwargs and xarray_combine_by_coords_kwargs arguments of the to_dataset_dict methods
# - No need for the __getattr__ method
class InputManager:
    """A class to find, manage, preprocess and load input data for ValEnsPy.

    The InputManager combines a ValEnsPy specific intake-esm catalog
    (ValenspyEsmDatastore) with a CatalogBuilder.  The CatalogBuilder creates the
    catalog - a dataframe with dataset information per file - from minimal
    information about the datasets and their path structure.  This catalog is then
    used to create an esm_datastore (ValenspyEsmDatastore) which can be used to
    search and load the datasets.  The InputManager also provides a preprocessing
    function, based on the input convertors, to convert datasets to ValEnsPy format.
    """

    def __init__(
        self,
        machine : str,
        datasets_info : dict = None,
        description=None,
        input_convertors : dict = None,
        esmcat_data : dict = None,
        xarray_open_kwargs : dict = None,
        xarray_combine_by_coords_kwargs : dict = None,
        intake_esm_kwargs : dict = None
    ):
        """
        Initialize an InputManager.

        Parameters
        ----------
        machine : str
            The name of the catalog. If datasets_info is not passed it will be used to load the dataset info from the built-in dataset_paths.yaml file.
        datasets_info : dict | str, optional
            A dictionary containing datasets and their dataset information needed to build the catalog. This can be a dictionary or a path to a YAML file.
            Default is None. If None, the built-in dataset info for the provided catalog_id is used if it exists.
            The dictionary should contain dataset names as keys and their dataset information as values.
            The dataset info should contain the following keys:
            - root: The root directory of the dataset.
            - pattern: The regex pattern for matching files in the dataset. This is the relative path starting from the root and in the following format:
            <identifier_name>/<identifier_name>/<identifier_name>_fixed_part_<variable_id>/<another_identifier>_<year>.nc
            - meta_data: A dictionary containing metadata for the dataset.
        description : str, optional
            A description of the catalog. Default is None. This is used to create the description in the intake-esm catalog.
        input_convertors : dict, optional
            A dictionary containing input convertors for the datasets. The keys are dataset names and the values are functions that convert the dataset to ValEnsPy compliant format.
            Default is None, which uses INPUT_CONVERTORS, the set of predefined input convertors.
        esmcat_data : dict, optional
            A dictionary containing the description of esmcat data to create the intake-esm catalog. Default is None, which uses esmcat_default_data.
            See :func:`intake_esm.esm_datastore.from_dict` for more information.
        xarray_open_kwargs : dict, optional
            Default arguments to pass to xarray_open_kwargs in :func:`intake_esm.esm_datastore.to_dataset_dict`, :func:`intake_esm.esm_datastore.to_dask` and/or :func:`intake_esm.esm_datastore.to_datatree`.
            Default is None (an empty dict).
        xarray_combine_by_coords_kwargs : dict, optional
            Default arguments to pass to xarray_combine_by_coords_kwargs in the same intake-esm loading methods. Default is None (an empty dict).
        intake_esm_kwargs : dict, optional
            Additional arguments for the creation of the intake_esm catalog. Default is None, which uses {"sep": "/"}.
            See :func:`intake_esm.esm_datastore` for more information.
        """
        # Resolve None sentinels.  Mutable objects must never be used directly as
        # default argument values: they are created once at function definition
        # time and would be shared (and possibly mutated) across all instances.
        if input_convertors is None:
            input_convertors = INPUT_CONVERTORS
        if esmcat_data is None:
            esmcat_data = esmcat_default_data
        if xarray_open_kwargs is None:
            xarray_open_kwargs = {}
        if xarray_combine_by_coords_kwargs is None:
            xarray_combine_by_coords_kwargs = {}
        if intake_esm_kwargs is None:
            intake_esm_kwargs = {"sep": "/"}

        self.catalog_builder = CatalogBuilder(
            catalog_id=machine,
            datasets_info=datasets_info
        )
        if self.catalog_builder.skipped_files:
            print("There are files that were skipped during the catalog creation. Please check the skipped_files property for a list of skipped files.")

        # Collection of functions to preprocess the data so that they are ValEnsPy
        # compliant.  Copied so that add_input_convertor() never mutates the
        # module-level INPUT_CONVERTORS (or a caller-owned dict) in place.
        self.input_convertors = dict(input_convertors)

        if description:
            # Work on a copy: assigning into esmcat_data directly would mutate the
            # shared esmcat_default_data module constant (or the caller's dict).
            esmcat_data = {**esmcat_data, "description": description}
        self.esm_datastore = ValenspyEsmDatastore(
            obj={"esmcat": esmcat_data, "df": self.catalog_builder.df},
            **intake_esm_kwargs
        )

        # Default options for xarray.open_dataset and xarray.combine_by_coords.
        self.xarray_open_kwargs = xarray_open_kwargs
        self.xarray_combine_by_coords_kwargs = xarray_combine_by_coords_kwargs

    def __getattr__(self, name):
        """
        Delegate attribute access to the esm_datastore instance (self.esm_datastore).

        This allows all methods and attributes of esm_datastore to be accessed
        directly from the InputManager instance.
        """
        # Guard: if `esm_datastore` itself is missing (e.g. a partially
        # initialised or unpickled instance), looking it up below would re-enter
        # __getattr__ and recurse forever.
        if name == "esm_datastore":
            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
        try:
            return getattr(self.esm_datastore, name)
        except AttributeError:
            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") from None

    @property
    def skipped_files(self):
        """
        The files that were skipped during the catalog creation.
        """
        return self.catalog_builder.skipped_files

    @property
    def intake_to_xarray_kwargs(self):
        """
        Easy access to kwargs that can be passed to :func:`intake_esm.esm_datastore.to_dataset_dict`, :func:`intake_esm.esm_datastore.to_dask` and/or :func:`intake_esm.esm_datastore.to_datatree`.

        Three types of kwargs are created:
        - preprocess: The preprocessor function which applies the input convertor to the dataset if an input convertor exists (i.e. source_id is in this manager's input convertors).
        - xarray_open_kwargs: The kwargs to be passed to xarray.open_dataset (only included when non-empty).
        - xarray_combine_by_coords_kwargs: The kwargs to be passed to xarray.combine_by_coords (only included when non-empty).
        """
        kwargs = {"preprocess": self.preprocess}
        if self.xarray_open_kwargs:
            kwargs["xarray_open_kwargs"] = self.xarray_open_kwargs
        if self.xarray_combine_by_coords_kwargs:
            kwargs["xarray_combine_by_coords_kwargs"] = self.xarray_combine_by_coords_kwargs
        return kwargs

    def add_input_convertor(self, dataset_name, input_convertor):
        """Add (or replace) an input convertor for *dataset_name* on this InputManager."""
        self.input_convertors[dataset_name] = input_convertor

    def _update_catalog(self, dataset_name, dataset_info_dict):
        """
        Update the catalog (df with dataset information per file) with a new dataset.

        The catalog_builder is used to parse the dataset information and update the catalog.

        Parameters
        ----------
        dataset_name : str
            The name of the dataset to add.
        dataset_info_dict : dict
            A dictionary containing dataset information with the following keys:
            - root: The root directory of the dataset.
            - pattern: The regex pattern for matching files in the dataset.
            - meta_data: A dictionary containing metadata for the dataset.
        """
        self.catalog_builder.add_dataset(
            dataset_name,
            dataset_info_dict
        )

    def update_catalog_from_yaml(self, yaml_path):
        """
        Update the catalog from a YAML file.

        For each dataset, parse the dataset information, validate it, add it to the catalog and update the esm_datastore.

        Parameters
        ----------
        yaml_path : Path
            The path to the YAML file containing datasets information: a dictionary of dataset names and their dataset information.
            The dataset info should contain the following keys:
            - root: The root directory of the dataset.
            - pattern: The regex pattern for matching files in the dataset. This is the relative path starting from the root and in the following format:
            <identifier_name>/<identifier_name>/<identifier_name>_fixed_part_<variable_id>/<another_identifier>_<year>.nc
            - meta_data: A dictionary containing metadata for the dataset.
        """
        datasets_info = load_yml(yaml_path)
        for dataset_name, dataset_info in datasets_info.items():
            # Add the new dataset to the catalog
            self._update_catalog(dataset_name, dataset_info)
        self.catalog_builder._validate_dataset_info()
        # NOTE: intake-esm exposes no public setter for the dataframe, so the
        # rebuilt catalog is written into the datastore's private _df attribute.
        self.esm_datastore.esmcat._df = self.catalog_builder.df

    def update_catalog_from_dataset_info(self, dataset_name, dataset_root_dir, dataset_pattern, metadata=None):
        """
        Update the catalog with a new dataset.

        For the dataset, parse the dataset information, validate it, add it to the catalog and update the esm_datastore.

        Parameters
        ----------
        dataset_name : str
            The name of the dataset.
        dataset_root_dir : str
            The root directory of the dataset.
        dataset_pattern : str
            The regex pattern for matching files in the dataset. This is the relative path starting from the root and in the following format:
            <identifier_name>/<identifier_name>/<identifier_name>_fixed_part_<variable_id>/<another_identifier>_<year>.nc
        metadata : dict, optional
            Additional metadata to include in the catalog. Default is None (an empty dictionary).
        """
        dataset_info = {
            "root": dataset_root_dir,
            "pattern": dataset_pattern,
            # Fresh dict per call - a mutable `metadata={}` default would be
            # shared across all calls that omit the argument.
            "meta_data": {} if metadata is None else metadata,
        }
        self._update_catalog(dataset_name, dataset_info)
        self.catalog_builder._validate_dataset_info()
        # NOTE: writes into intake-esm's private _df; no public setter exists.
        self.esm_datastore.esmcat._df = self.catalog_builder.df

    @property
    def preprocess(self):
        """
        A preprocessor function to convert an input dataset to ValEnsPy compliant data.

        The returned function applies the input convertor to the dataset if an input
        convertor exists (i.e. its source_id is in this manager's input convertors);
        otherwise the dataset is returned unchanged.
        """
        def process_IC(ds, IC_dict, df):
            # The file the dataset was opened from identifies its row in the catalog.
            file_name = ds.encoding["source"]
            source_id = df[df["path"] == Path(file_name)]["source_id"].values[0]
            if source_id in IC_dict:
                return IC_dict[source_id](ds)
            return ds

        return partial(process_IC, IC_dict=self.input_convertors, df=self.esm_datastore.df)