Skip to content

feat: Add epoch activity REST endpoints with optional historical storage #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
705 changes: 424 additions & 281 deletions API/openapi.yaml

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions modules/epoch_activity_counter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
config = "0.15.11"
tracing = "0.1.40"
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
hex = "0.4.3"

[lib]
path = "src/epoch_activity_counter.rs"
55 changes: 48 additions & 7 deletions modules/epoch_activity_counter/src/epoch_activity_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use acropolis_common::{
messages::{CardanoMessage, Message},
rest_helper::{handle_rest, handle_rest_with_parameter},
Era,
};
use anyhow::Result;
Expand All @@ -15,10 +16,16 @@ use tracing::{error, info};

mod state;
use state::State;
mod rest;
use rest::{handle_epoch, handle_historical_epoch};

const DEFAULT_SUBSCRIBE_HEADERS_TOPIC: &str = "cardano.block.header";
const DEFAULT_SUBSCRIBE_FEES_TOPIC: &str = "cardano.block.fees";
const DEFAULT_PUBLISH_TOPIC: &str = "cardano.epoch.activity";
const DEFAULT_HANDLE_CURRENT_TOPIC: (&str, &str) = ("handle-topic-current-epoch", "rest.get.epoch");
const DEFAULT_HANDLE_HISTORICAL_TOPIC: (&str, &str) =
("handle-topic-historical-epoch", "rest.get.epochs.*");
const DEFAULT_STORE_HISTORY: (&str, bool) = ("store-history", false);

/// Epoch activity counter module
#[module(
Expand All @@ -33,17 +40,14 @@ impl EpochActivityCounter {
async fn run(
context: Arc<Context<Message>>,
config: Arc<Config>,
state: Arc<Mutex<State>>,
mut headers_subscription: Box<dyn Subscription<Message>>,
mut fees_subscription: Box<dyn Subscription<Message>>,
) -> Result<()> {
let publish_topic =
config.get_string("publish-topic").unwrap_or(DEFAULT_PUBLISH_TOPIC.to_string());
info!("Publishing on '{publish_topic}'");

// Create state
// TODO! Handling rollbacks with StateHistory
let state = Arc::new(Mutex::new(State::new()));

loop {
// Read both topics in parallel
let headers_message_f = headers_subscription.read();
Expand Down Expand Up @@ -118,16 +122,53 @@ impl EpochActivityCounter {
.unwrap_or(DEFAULT_SUBSCRIBE_FEES_TOPIC.to_string());
info!("Creating subscriber for fees on '{subscribe_fees_topic}'");

let store_history =
config.get_bool(DEFAULT_STORE_HISTORY.0).unwrap_or(DEFAULT_STORE_HISTORY.1);

// REST handler topics
let handle_current_topic = config
.get_string(DEFAULT_HANDLE_CURRENT_TOPIC.0)
.unwrap_or(DEFAULT_HANDLE_CURRENT_TOPIC.1.to_string());
info!("Creating request handler on '{}'", handle_current_topic);

let handle_historical_topic = config
.get_string(DEFAULT_HANDLE_HISTORICAL_TOPIC.0)
.unwrap_or(DEFAULT_HANDLE_HISTORICAL_TOPIC.1.to_string());
info!("Creating request handler on '{}'", handle_historical_topic);

// Subscribe
let headers_subscription = context.subscribe(&subscribe_headers_topic).await?;
let fees_subscription = context.subscribe(&subscribe_fees_topic).await?;

// Create state
// TODO! Handling rollbacks with StateHistory
let state = Arc::new(Mutex::new(State::new(store_history)));

handle_rest(context.clone(), &handle_current_topic, {
let state = state.clone();
move || {
let state = state.clone();
async move { handle_epoch(state).await }
}
});

handle_rest_with_parameter(context.clone(), &handle_historical_topic, {
let state = state.clone();
move |param| handle_historical_epoch(state.clone(), param[0].to_string())
});

// Start run task
let run_context = context.clone();
context.run(async move {
Self::run(run_context, config, headers_subscription, fees_subscription)
.await
.unwrap_or_else(|e| error!("Failed: {e}"));
Self::run(
run_context,
config,
state,
headers_subscription,
fees_subscription,
)
.await
.unwrap_or_else(|e| error!("Failed: {e}"));
});

Ok(())
Expand Down
105 changes: 105 additions & 0 deletions modules/epoch_activity_counter/src/rest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use crate::state::State;
use acropolis_common::messages::RESTResponse;
use anyhow::Result;
use serde::Serialize;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Serialize)]
pub struct EpochActivityRest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like this could be the more natural thing to store (without the Rest suffix), and derive the EpochActivityMessage from that? Would save (duplicated) conversions below.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was dumb - VRF keys are encoded strings in the REST version. Please ignore!

pub epoch: u64,
pub total_blocks: usize,
pub total_fees: u64,
pub vrf_vkey_hashes: Vec<VRFKeyCount>,
}
#[derive(Serialize)]
pub struct VRFKeyCount {
pub vrf_key_hash: String,
pub block_count: usize,
}

/// Handles /epoch
pub async fn handle_epoch(state: Arc<Mutex<State>>) -> Result<RESTResponse> {
let locked = state.lock().await;
let epoch_data = locked.get_current_epoch();

let response = EpochActivityRest {
epoch: epoch_data.epoch,
total_blocks: epoch_data.total_blocks,
total_fees: epoch_data.total_fees,
vrf_vkey_hashes: epoch_data
.vrf_vkey_hashes
.iter()
.map(|(key, count)| VRFKeyCount {
vrf_key_hash: hex::encode(key),
block_count: *count,
})
.collect(),
};

let json = match serde_json::to_string(&response) {
Ok(j) => j,
Err(e) => {
return Ok(RESTResponse::with_text(
500,
&format!("Internal server error while retrieving current epoch: {e}"),
));
}
};
Ok(RESTResponse::with_json(200, &json))
}

/// Handles /epochs/{epoch}
pub async fn handle_historical_epoch(
state: Arc<Mutex<State>>,
epoch: String,
) -> Result<RESTResponse> {
let parsed_epoch = match epoch.parse::<u64>() {
Ok(v) => v,
Err(_) => {
return Ok(RESTResponse::with_text(
400,
&format!("Invalid epoch number: {epoch}"),
))
}
};

let locked = state.lock().await;
match locked.get_historical_epoch(parsed_epoch) {
Err(_) => Ok(RESTResponse::with_text(
501,
"Historical epoch storage not enabled",
)),
Ok(Some(epoch_data)) => {
let response = EpochActivityRest {
epoch: epoch_data.epoch,
total_blocks: epoch_data.total_blocks,
total_fees: epoch_data.total_fees,
vrf_vkey_hashes: epoch_data
.vrf_vkey_hashes
.iter()
.map(|(key, count)| VRFKeyCount {
vrf_key_hash: hex::encode(key),
block_count: *count,
})
.collect(),
};
let json = match serde_json::to_string(&response) {
Ok(j) => j,
Err(e) => {
return Ok(RESTResponse::with_text(
500,
&format!(
"Internal server error while retrieving epoch {parsed_epoch}: {e}"
),
));
}
};
Ok(RESTResponse::with_json(200, &json))
}
Ok(None) => Ok(RESTResponse::with_text(
404,
&format!("Epoch {parsed_epoch} not found"),
)),
}
}
Loading