Commit
ADD: more project-agnostic (CMIP3 and CMIP5 tested) (#32)
nocollier authored Mar 4, 2024
1 parent 4406b92 commit 76fdb8e
Showing 8 changed files with 529 additions and 187 deletions.
163 changes: 135 additions & 28 deletions intake_esgf/base.py
@@ -1,11 +1,12 @@
"""General functions used in various parts of intake-esgf."""

import hashlib
import logging
import re
import time
from functools import partial
from pathlib import Path
from typing import Any, Union
from typing import Any, Literal, Union

import pandas as pd
import requests
@@ -28,41 +29,22 @@
bar_format = "{desc:>20}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s}{postfix}]"


def get_dataset_pattern() -> str:
"""Return the dataset id regular expression pattern.
This function is used to get facet information in a dictionary from the dataset_id.
"""
COLUMNS = [
"mip_era",
"activity_id",
"institution_id",
"source_id",
"experiment_id",
"member_id",
"table_id",
"variable_id",
"grid_label",
"version",
"data_node",
]
pattern = r"\.".join([rf"(?P<{c}>\S[^.|]+)" for c in COLUMNS[:-1]])
pattern += rf"\|(?P<{COLUMNS[-1]}>\S+)"
return pattern


def combine_results(dfs: list[pd.DataFrame]) -> pd.DataFrame:
"""Return a combined dataframe where ids are now a list."""
# combine and remove duplicate entries
logger = setup_logging()
combine_time = time.time()
df = pd.concat(dfs).drop_duplicates(subset="id").reset_index(drop=True)
df = pd.concat(dfs)
if len(df) == 0:
logger.info("\x1b[36;32msearch end \x1b[91;20mno results\033[0m")
raise NoSearchResults()
combine_time = time.time()
variable_facet = get_facet_by_type(df, "variable")
df = df.drop_duplicates(subset=[variable_facet, "id"]).reset_index(drop=True)
# now convert groups to list
for _, grp in df.groupby(list(df.columns[:-3])):
group_cols = [
col for col in df.columns if col not in ["version", "data_node", "id"]
]
for _, grp in df.groupby(group_cols, dropna=False):
df = df.drop(grp.iloc[1:].index)
df.loc[grp.index[0], "id"] = grp.id.to_list()
df = df.drop(columns="data_node")
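Note: a minimal sketch of what the new grouping logic does, using invented rows; the handling of the `version`, `data_node`, and `id` columns mirrors the loop above:

```python
import pandas as pd

# Two records for the same dataset served from two data nodes (invented values).
df = pd.DataFrame(
    {
        "source_id": ["CanESM5", "CanESM5"],
        "variable_id": ["tas", "tas"],
        "version": ["20190429", "20190429"],
        "data_node": ["esgf.node1", "esgf.node2"],
        "id": ["d1|esgf.node1", "d1|esgf.node2"],
    }
)
# Group on every facet except version/data_node/id, keep the first row of each
# group, and collapse the ids of the whole group into a list on that row.
group_cols = [col for col in df.columns if col not in ["version", "data_node", "id"]]
for _, grp in df.groupby(group_cols, dropna=False):
    df = df.drop(grp.iloc[1:].index)
    df.loc[grp.index[0], "id"] = grp.id.to_list()
print(df)  # one row whose `id` is a two-element list
```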
@@ -347,3 +329,128 @@ def get_cell_measure(var: str, ds: xr.Dataset) -> Union[xr.DataArray, None]:
ds[vid] *= 0.01
measure *= ds[vid]
return measure


def get_dataframe_columns(content: dict[str, Any]) -> list[str]:
"""Get the columns to be populated in a pandas dataframe.
We determine the columns that will be part of the search dataframe by parsing out
facets from the `dataset_id_template_` found in the query response. We look for
facets between the sentinels `%(...)s` and then assume that they will have values in
the response. CMIP5 has many inconsistencies and so we hard code it here. We also
postpend `version` and `data_node` to the facets. Any facets that do not appear in
the content will show up as `nan` in the dataframe.
Parameters
----------
content
The content (Globus) or document (Solr) returned from the query.
"""

# CMIP5 is a disaster so...
if "project" in content and content["project"][0] == "CMIP5":
return [
"institute",
"model",
"experiment",
"time_frequency",
"realm",
"cmor_table",
"ensemble",
"variable",
"version",
"data_node",
]
# ...everything else (so far) behaves nicely so...
if "dataset_id_template_" not in content:
raise ValueError(f"No `dataset_id_template_` in {content[id]}")
columns = re.findall(
r"%\((\w+)\)s",
content["dataset_id_template_"][0],
)
columns = list(set(columns).union(["version", "data_node"]))
return columns
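Note: a quick sketch of the template parsing; the template string below is an assumption modeled on typical ESGF responses:

```python
# Hypothetical CMIP6-style content, shaped like an ESGF query response.
content = {
    "dataset_id_template_": [
        "%(mip_era)s.%(activity_drs)s.%(institution_id)s.%(source_id)s"
    ]
}
columns = get_dataframe_columns(content)
# The parsed facets unioned with `version` and `data_node` (order not guaranteed):
print(sorted(columns))
# ['activity_drs', 'data_node', 'institution_id', 'mip_era', 'source_id', 'version']
```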


def expand_cmip5_record(
search_vars: list[str], content_vars: list[str], record: dict[str, Any]
) -> list[dict[str, Any]]:
"""Expand the CMIP5 record to include variables."""
assert record["project"] == "CMIP5"
variables = list(set(search_vars).intersection(content_vars))
if not variables:
variables = content_vars.copy()
records = []
for var in variables:
r = record.copy()
r["variable"] = var
records.append(r)
return records
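Note: a usage sketch with an invented record; the search asked for `tas` while the document lists several variables:

```python
record = {"project": "CMIP5", "model": "CanESM2", "experiment": "historical"}
expanded = expand_cmip5_record(["tas"], ["tas", "pr", "psl"], record)
# Only the intersection of searched and available variables is expanded.
print([r["variable"] for r in expanded])  # ['tas']
```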


def get_facet_by_type(
df: pd.DataFrame, ftype: Literal["variable", "model", "variant", "grid"]
) -> str:
"""Get the facet name by the type.
Across projects, facets may have different names but serve similar functions. Here
we provide a method of defining equivalence so functions like `model_groups()` can
work for all projects. We may have to expand this collection or make this a more
general and public function.
"""
possible = {
"variable": ["variable", "variable_id"],
"model": ["model", "source_id"],
"variant": ["ensemble", "ensemble_member", "member_id", "variant_label"],
"grid": ["grid", "grid_label", "grid_resolution"],
}
facet = [col for col in df.columns if col in possible[ftype]]
if not facet:
raise ValueError(f"Could not find a '{ftype}' facet in {list(df.columns)}")
if len(facet) > 1: # relax this to handle multi-project searches
raise ValueError(
f"Ambiguous '{ftype}' facet in {list(df.columns)}, found {facet}"
)
return facet[0]
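Note: a small sketch of the equivalence lookup, using an empty dataframe with CMIP6-style columns:

```python
import pandas as pd

# CMIP6-style columns resolve to `variable_id`; CMIP5-style would give `variable`.
df = pd.DataFrame(columns=["source_id", "variable_id", "version", "data_node", "id"])
print(get_facet_by_type(df, "variable"))  # 'variable_id'
print(get_facet_by_type(df, "model"))     # 'source_id'
```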


def get_content_path(content: dict[str, Any]) -> Path:
"""Get the local path where the data is to be stored.
In CMIP6 we get a directory template, we just fill in values from the content. In
older projects we do not, but can search for the project name and grab all the text
following it. In the end, as long as we are consistent it does not matter.
"""

def _form_from_template(content) -> Path:
template = re.findall(r"%\((\w+)\)s", content["directory_format_template_"][0])
template = [
content[t][0] if isinstance(content[t], list) else content[t]
for t in template
if t in content
]
return Path("/".join(template)) / content["title"]

# the file `_version_` is not the same as the dataset `version` so we parse it out
# of the `dataset_id`
content["version"] = [content["dataset_id"].split("|")[0].split(".")[-1]]
if "directory_format_template_" in content:
return _form_from_template(content)

# otherwise we look for the project text in the url and return everything following
# it
    urls = [url for url in content["url"] if "application/netcdf|HTTPServer" in url]
project = (
content["project"][0]
if isinstance(content["project"], list)
else content["project"]
)
if not urls:
raise ValueError(f"Could not find a http link in {content['url']}")
    match = re.search(rf".*({project.lower()}.*\.nc)", urls[0])
if not match:
raise ValueError(f"Could not parse out the path from {urls[0]}")
# try to fix records with case-insensitive paths
path = match.group(1).replace(project.lower(), project)
return Path(path)
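Note: a minimal sketch of the template branch; the content dict below is invented but shaped like a Globus/Solr record:

```python
content = {
    "dataset_id": "CMIP6.CMIP.CCCma.CanESM5.historical.v20190429|esgf.node1",
    "directory_format_template_": [
        "%(root)s/%(mip_era)s/%(source_id)s/%(version)s"
    ],
    "mip_era": ["CMIP6"],
    "source_id": ["CanESM5"],
    "title": "tas_Amon_CanESM5_historical.nc",
}
# `version` is parsed from `dataset_id`; `root` is absent and thus skipped.
print(get_content_path(content))
# CMIP6/CanESM5/v20190429/tas_Amon_CanESM5_historical.nc
```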