This repository has been archived by the owner on Nov 7, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ega_meta_mirror.py
173 lines (144 loc) · 6.69 KB
/
ega_meta_mirror.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""Mirror EGA Metadata using the REST endpoints.
Starting with the list of datasets mirror all data related to that specific dataset.
"""
from typing import Generator, Iterable, Dict, Union
import requests
import itertools
import json
import logging
from pathlib import Path
import click
import sys
# Log format: timestamp, logger name, process info, level, source location, message.
FORMAT = '[%(asctime)s][%(name)s][%(process)d %(processName)s][%(levelname)-8s] (L:%(lineno)s) %(funcName)s: %(message)s'
# Route log output to stdout. The original created a bare
# ``logging.StreamHandler(sys.stdout)`` and discarded it (never attached to
# any logger, so it had no effect); ``basicConfig(stream=...)`` does it properly.
logging.basicConfig(format=FORMAT, datefmt='%Y-%m-%d %H:%M:%S', stream=sys.stdout)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
# Base URL of the EGA metadata REST API (v2).
BASE_URL = 'https://ega-archive.org/metadata/v2/'
# Object types that can be queried per dataset, in mirroring order.
ENDPOINTS = ["analyses", "dacs", "runs", "samples", "studies", "files"]
# Shared HTTP session so connections are re-used across requests.
SESSION = requests.Session()
def get_policy(policy_id: str) -> Union[Dict, None]:
    """Retrieve a single policy by its stable ID.

    Exactly one policy is expected per ID; any other result count is treated
    as an error.

    :param policy_id: EGA policy stable ID.
    :returns: the policy ``result`` payload, or ``None`` on any error.
    """
    r = SESSION.get(f'{BASE_URL}policies/{policy_id}')
    if r.status_code != 200:
        LOG.error(f"Error retrieving policy {policy_id}. API call returned a {r.status_code}")
        return None
    response = r.json()
    results_nb = response["response"]["numTotalResults"]
    if results_nb != 1:
        # Fixed typo ("more thatn 1") and report the actual count.
        LOG.error(f"Expected exactly 1 policy for {policy_id}, got {results_nb}.")
        return None
    LOG.info(f"Retrieving policy {policy_id}.")
    return response["response"]["result"]
def get_dataset_object(data_type: str, dataset_id: str) -> Generator:
    """Yield every object of ``data_type`` linked to ``dataset_id``, page by page.

    :param data_type: one of the EGA endpoints (see ``ENDPOINTS``).
    :param dataset_id: EGA dataset stable ID.
    """
    skip: int = 0
    limit: int = 10
    while True:
        # Rebuild the query parameters on every page. The original built the
        # payload once before the loop, so incrementing skip/limit never
        # reached the API and every request re-fetched the first page.
        payload = {"queryBy": "dataset",
                   "queryId": dataset_id,
                   "skip": str(skip),
                   "limit": str(limit)}
        r = SESSION.get(f'{BASE_URL}{data_type}', params=payload)
        if r.status_code != 200:
            LOG.error(f"Error retrieving {data_type} for {dataset_id}. API call returned a {r.status_code}")
            return
        response = r.json()
        results_nb = int(response["response"]["numTotalResults"])
        LOG.info(f"Retrieving {limit} {data_type} for {dataset_id} from {results_nb} results.")
        for res in response["response"]["result"]:
            yield res
        # Advance by one page; stop once the reported total is covered.
        skip += limit
        if skip >= results_nb:
            return
def get_datasets(start_limit: int = 0, defined_limit: int = 10) -> Generator:
    """Yield datasets from the EGA ``datasets`` endpoint.

    :param start_limit: offset of the first dataset to fetch.
    :param defined_limit: number of datasets to fetch. A falsy value (0)
        means fetch everything, paginating in pages of 10.
    """
    skip: int = start_limit
    # NOTE(review): in the fetch-all case the original started with limit=0,
    # which would ask the API for zero results; a page size of 10 is used
    # instead — confirm against the API's pagination semantics.
    limit: int = defined_limit if defined_limit else 10
    while True:
        # Rebuild the payload each iteration. The original reused a payload
        # built before the loop, so pagination never advanced past page one.
        payload = {"skip": str(skip), "limit": str(limit)}
        r = SESSION.get(f'{BASE_URL}datasets', params=payload)
        if r.status_code != 200:
            LOG.error(f"Error retrieving datasets. API call returned a {r.status_code}")
            return
        response = r.json()
        results_nb = int(response["response"]["numTotalResults"])
        LOG.info(f"Retrieving {defined_limit} from {results_nb} results starting from {start_limit} results.")
        for res in response["response"]["result"]:
            yield res
        skip += limit
        # A truthy defined_limit means exactly one page was requested.
        if defined_limit or skip >= results_nb:
            return
def get_dataset_objects(dataset_id: str) -> Generator:
    """Yield one lazy generator per endpoint in ``ENDPOINTS`` for ``dataset_id``.

    The order of yielded generators matches ``ENDPOINTS``, which callers rely
    on to name their output files.

    :param dataset_id: EGA dataset stable ID.
    """
    for endpoint in ENDPOINTS:
        LOG.info(f"Processing {endpoint} for {dataset_id} ...")
        # The original chained each generator onto a shared empty iterator
        # (itertools.chain(raw_events, ...)), which had no effect; yield the
        # per-endpoint generator directly.
        yield get_dataset_object(endpoint, dataset_id)
def mirror_dataset(dataset: str) -> None:
    """Mirror one dataset and all its associated objects to JSON files.

    Creates a ``<dataset>/`` directory containing one JSON file for the
    dataset itself, one for its policy, and one per endpoint in ``ENDPOINTS``.

    :param dataset: EGA dataset stable ID (must start with ``EGAD``).
    :raises ValueError: if ``dataset`` does not look like an EGA dataset ID.
    """
    if not dataset.startswith('EGAD'):
        raise ValueError(f"{dataset} does not appear to be a valid EGA dataset.")
    r = SESSION.get(f'{BASE_URL}datasets/{dataset}')
    if r.status_code != 200:
        # The original silently did nothing on failure; log it like every
        # other API call in this module.
        LOG.error(f"Error retrieving dataset {dataset}. API call returned a {r.status_code}")
        return
    response = r.json()
    # Hoist the repeated ["response"]["result"][0] lookup.
    dataset_info = response["response"]["result"][0]
    LOG.info(f"Retrieving dataset {dataset}.")
    LOG.info(f"Processing {dataset} ...")
    Path(dataset).mkdir(parents=True, exist_ok=True)
    with open(f'{dataset}/dataset_{dataset}.json', 'w') as datasetfile:
        json.dump(dataset_info, datasetfile, indent=4)
    with open(f'{dataset}/data_{dataset}_policy.json', 'w') as policy_datafile:
        policy_data = get_policy(dataset_info["policyStableId"])
        json.dump(policy_data, policy_datafile, indent=4)
    for idx, val in enumerate(get_dataset_objects(dataset)):
        with open(f'{dataset}/data_{dataset}_{ENDPOINTS[idx]}.json', 'w') as datafile:
            json.dump(list(val), datafile, indent=4)
def mirror_pipeline(start: int = 0, limit: int = 1) -> None:
    """Mirror metadata for a batch of datasets.

    For each dataset returned by :func:`get_datasets`, writes the dataset
    record, its policy, and one JSON file per endpoint in ``ENDPOINTS`` into
    a directory named after the dataset's stable ID.

    :param start: number of datasets to skip (pipeline resume point).
    :param limit: number of datasets to mirror in this run.
    """
    # The original pre-initialised ``objects = iter(())`` — dead code, removed.
    for dataset in get_datasets(start_limit=start, defined_limit=limit):
        # Hoist the repeated egaStableId lookups into one local.
        dataset_id = dataset["egaStableId"]
        LOG.info(f"Processing {dataset_id} ...")
        Path(dataset_id).mkdir(parents=True, exist_ok=True)
        with open(f'{dataset_id}/dataset_{dataset_id}.json', 'w') as datasetfile:
            json.dump(dataset, datasetfile, indent=4)
        with open(f'{dataset_id}/data_{dataset_id}_policy.json', 'w') as policy_datafile:
            policy_data = get_policy(dataset["policyStableId"])
            json.dump(policy_data, policy_datafile, indent=4)
        for idx, val in enumerate(get_dataset_objects(dataset_id)):
            with open(f'{dataset_id}/data_{dataset_id}_{ENDPOINTS[idx]}.json', 'w') as datafile:
                json.dump(list(val), datafile, indent=4)
@click.command()
@click.option('-l', '--limit-results', default=1, help='Number of results.')
@click.option('-s', '--skip-results', default=0, help="Skip the first n results.")
@click.option('-d', '--dataset', help="Download a specific dataset, will ignore limit and skip options.")
def cli(limit_results: int, skip_results: int, dataset: str):
    """Mirror EGA dataset metadata information.

    The limit option caps the number of datasets queried per run, and the
    skip option resumes querying from a specific point in the dataset list,
    so successive runs can be chained into a pipeline. The -d option mirrors
    one specific dataset and ignores the limit and skip options.
    """
    # f-string prefixes removed below: the messages contain no placeholders.
    LOG.info("Start ==== >")
    if dataset:
        mirror_dataset(dataset)
    else:
        mirror_pipeline(start=skip_results, limit=limit_results)
    LOG.info("< ==== End")
# Script entry point: delegate argument parsing and dispatch to the click CLI.
if __name__ == "__main__":
    cli()