Source code for input.catalog

import pandas as pd
import intake_esm

from valenspy._utilities._formatting import parse_string_to_time_period

class ValenspyEsmDatastore(intake_esm.esm_datastore):
    """
    Subclass of intake_esm.esm_datastore for ValEnsPy.

    This extends the esm_datastore class with additional search functionality
    for time-based searching using the time_period column.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    def search(self, require_all_on: str | list[str] | None = None, **query):
        """
        Search for entries in the catalog.

        Standard search function of the intake_esm.esm_datastore class, extended
        with time-based searching based on the time_period column.

        Parameters
        ----------
        require_all_on : str or list of str, optional
            A dataframe column or a list of dataframe columns across which all
            entries must satisfy the query criteria. If None, return entries
            that fulfill any of the criteria specified in the query, by default None.
        **query :
            Keyword arguments corresponding to the user's query to execute
            against the dataframe.

        See Also
        --------
        :func:`intake_esm.esm_datastore.search`
        """
        time_query = query.pop("time_period", None)
        if len(query) == 0:
            cat = self.__class__({"esmcat": self.esmcat.dict(), "df": self.esmcat._df})
        else:
            cat = super().search(require_all_on=require_all_on, **query)
        if time_query:
            df = cat.esmcat.df
            if isinstance(time_query, str):
                start, end = parse_string_to_time_period(time_query)
            elif isinstance(time_query, list):
                start, _ = parse_string_to_time_period(time_query[0])
                _, end = parse_string_to_time_period(time_query[1])
            else:
                raise ValueError("time_period should be a string or a list of strings")
            # Keep only files whose time period overlaps the requested time_period
            df = df[(pd.to_datetime(df["time_period_start"]) <= end) & (start <= pd.to_datetime(df["time_period_end"]))]
            cat.esmcat._df = df
        return cat
    def to_datatree(self, levels: list[str] | None = None, **kwargs):
        """Load the catalog into a DataTree, optionally grouping by the given columns (levels)."""
        # Hack to avoid deepcopying the esmcat object, which breaks the search function.
        # The cause is unclear: probably the df is not really updated when the catalog is updated.
        # Maybe overwrite the deepcopy function instead?
        if levels:
            old_agg = self.esmcat.aggregation_control.groupby_attrs
            self.esmcat.aggregation_control.groupby_attrs = levels
        dt = super().to_datatree(**kwargs)
        if levels:
            self.esmcat.aggregation_control.groupby_attrs = old_agg
        return dt
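
A minimal usage sketch of the time-based search. The import path is inferred from the module name, and the catalog file path, the query column ("variable") and its value ("tas") are hypothetical; the exact string format accepted by parse_string_to_time_period is also an assumption here.

    from valenspy.input.catalog import ValenspyEsmDatastore

    # Open an intake-esm catalog with the ValEnsPy subclass (path is hypothetical).
    cat = ValenspyEsmDatastore("catalog.json")

    # Regular intake-esm query plus a time_period filter: keep only entries whose
    # [time_period_start, time_period_end] interval overlaps the requested period
    # (date string format assumed).
    subset = cat.search(variable="tas", time_period=["2000-01-01", "2010-12-31"])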
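
A similar sketch for to_datatree, continuing from the subset above; the column names passed as levels are hypothetical and must exist as columns in the catalog's dataframe.

    # Load the selected datasets into a DataTree, temporarily grouping the tree
    # nodes by the given columns instead of the catalog's default groupby_attrs.
    dt = subset.to_datatree(levels=["source_id", "frequency"])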