ref: add multithreading for parallel API fetching and caching
B3ns44d committed Sep 15, 2024
1 parent 80b1383 commit ad80718
Showing 6 changed files with 69 additions and 76 deletions.
12 changes: 6 additions & 6 deletions exporter/configs/providers.yaml
@@ -1,7 +1,7 @@
 providers:
-  - name: "Buenos Aires"
-    auto_discovery_url: "https://buenosaires.publicbikesystem.net/ube/gbfs/v1/"
+  - name: "Ecobici"
+    auto_discovery_url: "https://buenosaires.publicbikesystem.net/ube/gbfs/v1/"
-  - name: "Bike Nordelta"
-    auto_discovery_url: "https://nordelta.publicbikesystem.net/ube/gbfs/v1/"
-  - name: "Bicing"
-    auto_discovery_url: "https://barcelona-sp.publicbikesystem.net/customer/gbfs/v2/gbfs.json"
+  - name: "Belfast Bikes"
+    auto_discovery_url: "https://gbfs.nextbike.net/maps/gbfs/v2/nextbike_bu/gbfs.json"
+  - name: "Metropolradruhr Germany"
+    auto_discovery_url: "https://gbfs.nextbike.net/maps/gbfs/v2/nextbike_mr/gbfs.json"
17 changes: 5 additions & 12 deletions exporter/src/collector.py
@@ -20,7 +20,6 @@ def collect(self):
         """This method is called by Prometheus to collect the metrics."""
         try:
             if not self.initialized:
-                self.initialize_fetchers()
                 self.initialized = True
 
             # Use ThreadPoolExecutor to fetch data concurrently from all providers
@@ -36,18 +35,12 @@ def collect(self):
                 except Exception as exc:
                     logging.error(f"Provider {fetcher.provider_name} generated an exception: {exc}")
 
-            # Yield metrics that were updated
-            for metric in self.metrics_manager.collect():
-                yield metric
+            # Yield all metrics
+            yield from self.metrics_manager.collect()
 
         except Exception as e:
             logging.error(f"Error during data fetching: {e}")
 
-    def initialize_fetchers(self):
-        """Initialize all fetchers by retrieving their feed URLs."""
-        for fetcher in self.data_fetchers:
-            fetcher.initialize()  # Synchronous now
-
     def process_provider(self, fetcher: DataFetcher):
         """Fetches data for a single provider and updates the metrics."""
         try:
@@ -58,9 +51,9 @@ def process_provider(self, fetcher: DataFetcher):
                 logging.warning(f"Skipping provider {fetcher.provider_name} due to missing data.")
                 return
 
-            provider_name = result['provider']
-            station_info_raw = result['station_information']
-            station_status_raw = result['station_status']
+            provider_name = result.get('provider', 'unknown_provider')
+            station_info_raw = result.get('station_information', {})
+            station_status_raw = result.get('station_status', {})
 
             # Process and update the metrics
             self.metrics_manager.update_metrics(provider_name, station_info_raw, station_status_raw)
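The region collapsed above is where collect() fans the per-provider work out to a thread pool; only its except arm survives in the visible hunk. A standalone sketch of that pattern, with max_workers and the exact wiring around process_provider as assumptions rather than the hidden code:

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_all(fetchers, process):
    """Run `process(fetcher)` for every provider on its own thread (sketch only)."""
    with ThreadPoolExecutor(max_workers=len(fetchers)) as executor:
        futures = {executor.submit(process, fetcher): fetcher for fetcher in fetchers}
        for future in as_completed(futures):
            fetcher = futures[future]
            try:
                future.result()  # surfaces exceptions raised inside the worker thread
            except Exception as exc:
                logging.error(f"Provider {fetcher.provider_name} generated an exception: {exc}")
```

Because process_provider both fetches and updates metrics, the pool turns N sequential network round-trips per scrape into roughly one slowest-provider round-trip.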
1 change: 0 additions & 1 deletion exporter/src/discovery/__init__.py
@@ -1 +0,0 @@
-from discovery import GBFSDiscovery
102 changes: 53 additions & 49 deletions exporter/src/fetcher/fetcher.py
@@ -1,64 +1,68 @@
 import logging
-from threading import Lock
-from typing import Dict, Any
+import time
+from typing import Optional, Dict
 
 import requests
 
-from discovery import GBFSDiscovery
-
 
 class DataFetcher:
-    def __init__(self, provider_name: str, auto_discovery_url: str):
+    def __init__(self, provider_name: str, auto_discovery_url: str, cache_duration: int = 60):
         self.provider_name = provider_name
         self.auto_discovery_url = auto_discovery_url
-        self.feed_urls = {}
-        self.session = requests.Session()
-        self.initialized = False
-        self.lock = Lock()
-
-    def initialize(self):
-        """Initialize the fetcher by fetching the feed URLs."""
-        with self.lock:
-            if not self.initialized:
-                discovery = GBFSDiscovery(self.auto_discovery_url)
-                self.feed_urls = discovery.fetch_feed_urls(self.session)
+        self.cache_duration = cache_duration  # in seconds
+        self.cache = None
+        self.cache_timestamp = 0
 
-                if not self.feed_urls:
-                    logging.error(f"Skipping provider {self.provider_name} due to missing required feeds.")
-                    self.initialized = False
-                else:
-                    self.initialized = True
+    def fetch_provider_data(self) -> Optional[Dict]:
+        """Fetches data from the GBFS provider's auto-discovery URL with caching."""
+        current_time = time.time()
+        if self.cache and (current_time - self.cache_timestamp) < self.cache_duration:
+            logging.info(f"Using cached data for {self.provider_name}")
+            return self.cache
 
-    def fetch_feed_data(self, url: str) -> Dict[str, Any]:
         try:
-            response = self.session.get(url, timeout=5)
-            response.raise_for_status()
-            return response.json()
-        except requests.exceptions.Timeout:
-            logging.error(f"Timeout fetching data from {url}")
-            raise
-        except Exception as e:
-            logging.error(f"Error fetching data from {url}: {e}")
-            raise
+            response = requests.get(self.auto_discovery_url, timeout=10)
+            if response.status_code != 200:
+                logging.error(f"Failed to fetch auto-discovery for {self.provider_name}: HTTP {response.status_code}")
+                return None
 
+            auto_discovery = response.json()
+            feeds = auto_discovery.get('data', {}).get('en', {}).get('feeds', [])
+
+            # Extract required feeds
+            station_information_url = next((feed['url'] for feed in feeds if feed['name'] == 'station_information'),
+                                           None)
+            station_status_url = next((feed['url'] for feed in feeds if feed['name'] == 'station_status'), None)
+
+            if not station_information_url or not station_status_url:
+                logging.error(f"Missing required feeds for {self.provider_name}")
+                return None
+
+            # Fetch station information
+            station_info_response = requests.get(station_information_url, timeout=10)
+            if station_info_response.status_code != 200:
+                logging.error(
+                    f"Failed to fetch station information for {self.provider_name}: HTTP {station_info_response.status_code}")
+                return None
+
+            station_status_response = requests.get(station_status_url, timeout=10)
+            if station_status_response.status_code != 200:
+                logging.error(
+                    f"Failed to fetch station status for {self.provider_name}: HTTP {station_status_response.status_code}")
+                return None
+
-    def fetch_provider_data(self) -> Dict[str, Any]:
-        """Fetch station information and status data for the provider."""
-        if not self.initialized:
-            logging.warning(f"Provider {self.provider_name} is not initialized due to missing feeds.")
-            return {}
+            data = {
+                'provider': self.provider_name,
+                'station_information': station_info_response.json(),
+                'station_status': station_status_response.json()
+            }
 
-        # Check if 'station_information' and 'station_status' feeds exist
-        if 'station_information' not in self.feed_urls or 'station_status' not in self.feed_urls:
-            logging.error(
-                f"Provider {self.provider_name} is missing required feeds ('station_information' or "
-                f"'station_status'). Skipping...")
-            return {}
+            # Update cache
+            self.cache = data
+            self.cache_timestamp = current_time
 
-        station_info_data = self.fetch_feed_data(self.feed_urls['station_information'])
-        station_status_data = self.fetch_feed_data(self.feed_urls['station_status'])
+            return data
 
-        return {
-            'provider': self.provider_name,
-            'station_information': station_info_data,
-            'station_status': station_status_data
-        }
+        except requests.RequestException as e:
+            logging.error(f"Error fetching data for {self.provider_name}: {e}")
+            return None
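With the cache in place, any call arriving within cache_duration seconds of a successful fetch reuses the stored payload instead of issuing three HTTP requests (auto-discovery plus two feeds). A usage sketch; the provider values are copied from the first providers.yaml entry:

```python
fetcher = DataFetcher(
    provider_name="Ecobici",
    auto_discovery_url="https://buenosaires.publicbikesystem.net/ube/gbfs/v1/",
    cache_duration=60,
)

first = fetcher.fetch_provider_data()   # network: auto-discovery + two feed requests
second = fetcher.fetch_provider_data()  # within 60s of a success: returns self.cache
```

Two properties fall out of the code as written: failures are not cached (self.cache is only assigned after all three requests succeed, so the next scrape retries), and the old Lock is gone, so concurrent calls straddling expiry may both refetch; under the GIL the worst case is a redundant request, not a corrupted cache.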
3 changes: 2 additions & 1 deletion exporter/src/main.py
@@ -38,9 +38,10 @@ def main():
         logging.error(f"Failed to start HTTP server on port {args.port}: {e}")
         sys.exit(1)
 
+    import time
     try:
         while True:
-            pass
+            time.sleep(1)
     except KeyboardInterrupt:
         logging.info("Exporter is shutting down.")
 
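`while True: pass` busy-waits and pins a CPU core; time.sleep(1) parks the main thread while prometheus_client's HTTP server keeps serving from its daemon thread. The standalone shape of that idle loop (the port here is an assumption; the exporter takes it from args.port):

```python
import time

from prometheus_client import start_http_server

start_http_server(8000)  # serves /metrics from a background daemon thread
try:
    while True:
        time.sleep(1)  # keep the main thread alive without spinning a core
except KeyboardInterrupt:
    print("Exporter is shutting down.")
```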
10 changes: 3 additions & 7 deletions exporter/src/metrics/metrics_manager.py
@@ -1,5 +1,4 @@
 import logging
-
 from prometheus_client.core import GaugeMetricFamily
 
 from . import metrics_definitions as md
@@ -15,15 +14,11 @@ def update_metrics(
         """Updates Prometheus metrics based on the fetched data."""
         logging.debug(f"Updating metrics for provider: {provider_name}")
 
-        # Clear previous samples to avoid accumulation
-        for metric in self.metrics.__dict__.values():
-            if isinstance(metric, GaugeMetricFamily):
-                metric.samples.clear()
-
+        # Parse the raw data
         station_info_list = station_info_raw.get('data', {}).get('stations', [])
         station_status_list = station_status_raw.get('data', {}).get('stations', [])
 
         # Initialize accumulators for overall metrics
         total_bikes_available = 0
         total_docks_available = 0
         total_bikes_disabled = 0
@@ -45,7 +40,8 @@
 
             lat = info.get('lat', 0)
             lon = info.get('lon', 0)
-            labels = [provider_name, station_id, info['name'], str(lat), str(lon)]
+            station_name = info.get('name', 'unknown')
+            labels = [provider_name, station_id, station_name, str(lat), str(lon)]
 
             # Get status data
             num_bikes_available = status.get('num_bikes_available', 0)
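Context for the deleted clearing block: a GaugeMetricFamily appends one sample per add_metric() call, so long-lived family objects accumulate stale samples unless cleared between updates; dropping the clear suggests the manager now rebuilds its families each scrape, though that code is outside this diff. A minimal illustration with a hypothetical metric name and label set:

```python
from prometheus_client.core import GaugeMetricFamily

# Illustrative family; the exporter's real ones live in metrics_definitions.
g = GaugeMetricFamily(
    "gbfs_station_bikes_available",
    "Bikes available at a GBFS station",
    labels=["provider", "station_id", "name", "lat", "lon"],
)
g.add_metric(["Ecobici", "42", "unknown", "-34.6", "-58.4"], 7)
g.add_metric(["Ecobici", "43", "unknown", "-34.6", "-58.4"], 3)
print(len(g.samples))  # 2 -- samples accumulate until cleared or the family is rebuilt
```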
