Source code for input.esm_catalog_builder

from pathlib import Path
import pandas as pd
import re
import os
import warnings

from valenspy.input.converter import INPUT_CONVERTORS
from valenspy._utilities import load_yml, create_named_regex, parse_string_to_time_period

DATASET_PATHS = load_yml("dataset_info")
CORDEX_VARIABLES = load_yml("CORDEX_variables")

CATALOG_COLS = {
    # Required for unique identification of the dataset
    # - each unique combination of these columns should uniquely define an xarray dataset
    "required_identifiers": [
        "source_id",
        "source_type",
        "domain_id",
        "experiment_id",
        "version",
        "resolution",
        "frequency",
    ],
    # Required, but default values are used if not relevant
    "required_identifiers_with_default": [
        "driving_source_id",
        "institution_id",
        "realization",
        "post_processing",
    ],
    # Filtering identifiers within a unique dataset, allowing the number of files to load to be limited
    "filtering_identifiers": [
        "variable_id",
        "time_period_start",  # Note that time_period_start and time_period_end will be created from time_period or (time_period_start and time_period_end) and time_format
        "time_period_end",
    ],
}
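# Illustrative only - a single catalog row built from these columns could look like
# (the values below are hypothetical and not taken from any shipped dataset):
#   source_id="MY_OBS", source_type="obs", domain_id="EUR-11", experiment_id="obs",
#   version="v1", resolution="0.1deg", frequency="day",
#   driving_source_id="default", institution_id="default", realization="default", post_processing="default",
#   variable_id=["tas"], time_period_start=<start of 2001>, time_period_end=<end of 2001>, path=...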

class CatalogBuilder:
    def __init__(self, catalog_id, datasets_info: dict | str = None):
        """
        Initialize the CatalogBuilder with a catalog ID and dataset information.

        Parameters
        ----------
        catalog_id : str
            The ID of the catalog. If datasets_info is not provided, this is used to load
            pre-existing dataset information shipped with ValEnsPy, if it exists.
        datasets_info : dict | str, optional
            The datasets and their dataset information needed to build the catalog.
            This can be a dictionary or a path to a YAML file. Default is None. If None, the
            built-in dataset information for the provided catalog_id is used if it exists.
            The dictionary should contain dataset names as keys and their dataset information as values.
            The dataset information should contain the following keys:

            - root: The root directory of the dataset.
            - pattern: The regex pattern for matching files in the dataset. This is the relative path
              starting from the root, in the following format:
              <identifier_name>/<identifier_name>/<identifier_name>_fixed_part_<variable_id>/<another_identifier>_<year>.nc
            - meta_data: A dictionary containing metadata for the dataset.
        """
        self.catalog_id = catalog_id
        if datasets_info:
            if isinstance(datasets_info, str):
                datasets_info = load_yml(datasets_info)
            self.datasets_info = datasets_info
        else:
            self.datasets_info = DATASET_PATHS[catalog_id]
        self._validate_dataset_info()
        self.skipped_files = {}
        self.df = self.create_df()
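    # A minimal, illustrative example of datasets_info (the dataset name, paths and
    # identifier values below are hypothetical, not shipped with ValEnsPy):
    #
    #   datasets_info = {
    #       "MY_OBS": {
    #           "root": "/data/my_obs",
    #           "pattern": "<frequency>/<variable_id>/<domain_id>_<time_period>.nc",
    #           "meta_data": {
    #               "source_id": "MY_OBS",
    #               "source_type": "obs",
    #               "experiment_id": "obs",
    #               "version": "v1",
    #               "resolution": "0.1deg",
    #           },
    #       },
    #   }
    #   builder = CatalogBuilder("my_catalog", datasets_info=datasets_info)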
    def add_dataset(self, dataset_name, dataset_info):
        """
        Add a dataset to the catalog, or update the information of an existing dataset.

        Parameters
        ----------
        dataset_name : str
            The name of the dataset to add or update.
        dataset_info : dict
            The dataset information (root, pattern and meta_data) for this dataset.
        """
        # TODO - check if the dataset_name is already in the datasets_info
        self.datasets_info[dataset_name] = dataset_info
        new_data = self._process_dataset_for_catalog(dataset_name, dataset_info)
        new_df = pd.DataFrame(new_data)
        # Cast the variable_id column as a list - hack needed for the intake_esm catalog to guarantee
        # has_multiple_variable_assets in its current version
        new_df["variable_id"] = new_df["variable_id"].apply(lambda x: x if isinstance(x, list) else [x])
        self.df = pd.concat([self.df, new_df], ignore_index=True)
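    # Illustrative usage (the dataset name, paths and identifier values are hypothetical):
    #
    #   builder.add_dataset(
    #       "ANOTHER_OBS",
    #       {
    #           "root": "/data/another_obs",
    #           "pattern": "<variable_id>/<domain_id>_<frequency>_<time_period>.nc",
    #           "meta_data": {"source_id": "ANOTHER_OBS", "source_type": "obs",
    #                         "experiment_id": "obs", "version": "v1", "resolution": "5km"},
    #       },
    #   )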
    def _validate_dataset_info(self):
        """Validate the dataset information to ensure all required identifiers are present."""
        required_identifiers = CATALOG_COLS["required_identifiers"]
        filtering_identifiers = CATALOG_COLS["filtering_identifiers"]
        for dataset_name, dataset_info in self.datasets_info.items():
            key_set = set(dataset_info.get("meta_data", {}).keys())
            pattern = dataset_info.get("pattern", None)
            if pattern:
                for key in re.findall(r"<(.*?)>", pattern):
                    key_set.add(key)
            # Check if all required identifiers are present
            for identifier in required_identifiers:
                if identifier not in key_set:
                    warnings.warn(f"Dataset {dataset_name} is missing the required identifier '{identifier}' in its pattern or metadata.")
            # Check if all required filtering identifiers are present
            for identifier in filtering_identifiers:
                if identifier == "time_period_start" or identifier == "time_period_end":
                    # These are checked separately below
                    continue
                if identifier not in key_set:
                    warnings.warn(f"Dataset {dataset_name} is missing the required identifier '{identifier}' for filtering in its pattern or metadata.")
            # Check if time_period or time_period_start/time_period_end is present
            if ("time_period" not in key_set) and ("time_period_start" not in key_set and "time_period_end" not in key_set):
                warnings.warn(f"Dataset {dataset_name} is missing the required identifier 'time_period' or 'time_period_start/time_period_end' in its pattern or metadata.")
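    # For example, a dataset whose pattern and meta_data mention neither "frequency" nor
    # "time_period" would emit warnings of the form (dataset name is hypothetical):
    #   UserWarning: Dataset MY_OBS is missing the required identifier 'frequency' in its pattern or metadata.
    #   UserWarning: Dataset MY_OBS is missing the required identifier 'time_period' or 'time_period_start/time_period_end' in its pattern or metadata.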
    def create_df(self):
        """Create the catalog DataFrame by scanning dataset paths and extracting metadata."""
        files_with_metadata = []
        for dataset_name, dataset_info in self.datasets_info.items():
            # Process the dataset and extract metadata for each of its files
            print(f"Processing dataset: {dataset_name}")
            grouped_files_with_metadata = self._process_dataset_for_catalog(dataset_name, dataset_info)
            files_with_metadata.extend(grouped_files_with_metadata)
        # Build the catalog DataFrame
        df = pd.DataFrame(files_with_metadata)
        # Cast the variable_id column as a list - hack needed for the intake_esm catalog to have
        # has_multiple_variable_assets in its current version
        df["variable_id"] = df["variable_id"].apply(lambda x: x if isinstance(x, list) else [x])
        return df
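    # The resulting DataFrame has one row per matched file, with one column per identifier plus
    # "path"; files that did not match the pattern, or lacked a parsable time period, are
    # collected in self.skipped_files (dataset name -> list of file paths) rather than in the catalog.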
    def _process_dataset_for_catalog(self, dataset_name, dataset_info):
        """
        Process all files in a dataset and extract metadata for each file.

        Given a dataset name and its information, this function parses every file in the dataset,
        returning the metadata parsed from the file name and the dataset-level metadata.

        Parameters
        ----------
        dataset_name : str
            The name of the dataset to process.
        dataset_info : dict
            The dataset information containing the root directory, regex pattern, and metadata.

        Returns
        -------
        list
            A list of dictionaries containing metadata for each file in the dataset.
        """
        dataset_root = Path(dataset_info.get("root"))
        regex_pattern = create_named_regex(dataset_info.get("pattern", None))
        regex = re.compile(dataset_root.as_posix() + r"/" + regex_pattern)
        dataset_meta_data = dataset_info.get("meta_data", {})
        IC = INPUT_CONVERTORS.get(dataset_name, None)
        if IC:
            CORDEX_variable_set = IC.cordex_variables
            variable_set = IC.raw_variables
            long_name_set = IC.raw_variables_long_names
        files_with_metadata = []
        for root, _, files in os.walk(dataset_root):
            for file in files:
                if file.endswith(".nc"):
                    file_path = os.path.join(root, file)
                    if match := regex.match(file_path):
                        file_metadata = match.groupdict()
                    else:
                        # Add the skipped file to the skipped files dictionary (create the entry if it does not exist)
                        if dataset_name not in self.skipped_files:
                            self.skipped_files[dataset_name] = []
                        self.skipped_files[dataset_name].append(file_path)
                        continue
                    # Add the file path to the metadata
                    file_metadata["path"] = Path(file_path)
                    # Add dataset-level metadata
                    file_metadata = {**dataset_meta_data, **file_metadata}
                    # Translate the variable_id to the CORDEX variable name (if possible)
                    if IC:
                        variable_id = file_metadata.get("variable_id")
                        if not variable_id:
                            file_metadata["raw_variable_id"] = list(variable_set)
                            file_metadata["variable_id"] = list(CORDEX_variable_set)
                        elif variable_id in variable_set or variable_id in long_name_set:
                            file_metadata["raw_variable_id"] = variable_id
                            file_metadata["variable_id"] = IC.get_CORDEX_variable(variable_id)
                    # Create the time_period attributes using time_period or time_period_start/time_period_end and the time_format
                    time_period = file_metadata.pop("time_period", None)
                    time_period_start = file_metadata.pop("time_period_start", None)
                    time_period_end = file_metadata.pop("time_period_end", None)
                    time_format = file_metadata.pop("time_format", None)
                    if time_period and not (time_period_start or time_period_end):
                        start, end = parse_string_to_time_period(time_period, format=time_format)
                    elif time_period_start and time_period_end:
                        start, _ = parse_string_to_time_period(time_period_start, format=time_format)  # The earliest timestamp of that period, e.g. 2024 -> 2024-01-01 00:00:00
                        _, end = parse_string_to_time_period(time_period_end, format=time_format)  # The latest timestamp of that period, e.g. 2024 -> 2024-12-31 23:59:59
                    else:
                        start, end = None, None
                    if start and end:
                        file_metadata["time_period_start"] = start
                        file_metadata["time_period_end"] = end
                    else:
                        if dataset_name not in self.skipped_files:
                            self.skipped_files[dataset_name] = []
                        self.skipped_files[dataset_name].append(file_path)
                        continue
                    for key in CATALOG_COLS["required_identifiers_with_default"]:
                        if key not in file_metadata:
                            file_metadata[key] = "default"
                    files_with_metadata.append(file_metadata)
        return files_with_metadata
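    # Sketch of the matching step (illustrative paths; assumes create_named_regex turns each
    # <identifier> placeholder into a named regex capture group):
    #
    #   pattern:   "<frequency>/<variable_id>/<domain_id>_<time_period>.nc"
    #   file path: "/data/my_obs/day/tas/EUR-11_2001.nc"
    #   -> match.groupdict() == {"frequency": "day", "variable_id": "tas",
    #                            "domain_id": "EUR-11", "time_period": "2001"}
    #
    #   The time identifiers are then popped and converted, e.g. "2001" becomes
    #   time_period_start = start of 2001 and time_period_end = end of 2001
    #   (via parse_string_to_time_period).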