Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions engine/packages/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ pub async fn get_epoxy_key_debug(
.await?;

let instances = local_value
.value
.map(|_| {
vec![EpoxyKeyInstance {
replica_id,
Expand Down Expand Up @@ -499,10 +498,8 @@ pub async fn get_epoxy_kv_local(
.await?;

Ok(GetEpoxyKvResponse {
exists: output.value.is_some(),
value: output
.value
.map(|v| base64::engine::general_purpose::STANDARD.encode(&v)),
exists: output.is_some(),
value: output.map(|v| base64::engine::general_purpose::STANDARD.encode(&v.value)),
})
}

Expand All @@ -526,6 +523,7 @@ pub async fn get_epoxy_kv_optimistic(
replica_id,
key: key_bytes,
caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
save_empty: false,
})
.await?;

Expand Down
57 changes: 29 additions & 28 deletions engine/packages/api-peer/src/runner_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
]
.concat();

let runner_configs = if !runner_names.is_empty() {
let (runner_configs, cursor) = if !runner_names.is_empty() {
let runner_configs = ctx
.op(pegboard::ops::runner_config::get::Input {
runners: runner_names
Expand All @@ -42,7 +42,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
(
runner_configs
.into_iter()
.map(|c| (c.name, c.config))
.map(|c| (c.name, c.config, c.protocol_version))
.collect::<Vec<_>>(),
None,
)
Expand Down Expand Up @@ -76,48 +76,49 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List

let cursor = runner_configs
.last()
.map(|(name, config)| format!("{}:{}", runner_config_variant(&config), name));
.map(|c| format!("{}:{}", runner_config_variant(&c.config), c.name));

(runner_configs, cursor)
(
runner_configs
.into_iter()
.map(|c| (c.name, c.config, c.protocol_version))
.collect::<Vec<_>>(),
cursor,
)
};

// Fetch pool errors
let runner_pool_errors: HashMap<String, _> = if !runner_configs.0.is_empty() {
let runner_pool_errors = if !runner_configs.is_empty() {
let runners = runner_configs
.0
.iter()
.map(|(name, _)| (namespace.namespace_id, name.clone()))
.map(|(name, _, _)| (namespace.namespace_id, name.clone()))
.collect();
ctx.op(pegboard::ops::runner_config::get_error::Input { runners })
.await?
.into_iter()
.map(|e| (e.runner_name, e.error))
.collect()
.collect::<HashMap<_, _>>()
} else {
HashMap::new()
};

// Build response with pool errors
let runner_configs_with_errors: HashMap<String, RunnerConfigResponse> = runner_configs
.0
.into_iter()
.map(|(name, config)| {
let runner_pool_error = runner_pool_errors.get(&name).cloned();
(
name,
RunnerConfigResponse {
config,
runner_pool_error,
},
)
})
.collect();

Ok(ListResponse {
runner_configs: runner_configs_with_errors,
pagination: Pagination {
cursor: runner_configs.1,
},
// Build response with pool errors
runner_configs: runner_configs
.into_iter()
.map(|(name, config, protocol_version)| {
let runner_pool_error = runner_pool_errors.get(&name).cloned();
(
name,
RunnerConfigResponse {
config,
runner_pool_error,
protocol_version,
},
)
})
.collect(),
pagination: Pagination { cursor },
})
}

Expand Down
58 changes: 0 additions & 58 deletions engine/packages/api-public/src/runner_configs/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use rivet_api_util::request_remote_datacenter;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use super::utils;
use crate::ctx::ApiCtx;

#[derive(Deserialize, Serialize, ToSchema)]
Expand Down Expand Up @@ -58,34 +57,6 @@ async fn upsert_inner(
) -> Result<UpsertResponse> {
ctx.auth().await?;

tracing::debug!(runner_name = ?path.runner_name, datacenters_count = body.datacenters.len(), "starting upsert");

// Resolve namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Store serverless config before processing (since we'll remove from body.datacenters)
let serverless_config = body
.datacenters
.iter()
.filter_map(|(_dc_name, runner_config)| {
if let rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless {
url,
headers,
..
} = &runner_config.kind
{
Some((url.clone(), headers.clone().unwrap_or_default()))
} else {
None
}
})
.next();

let dcs = ctx
.config()
.topology()
Expand Down Expand Up @@ -162,35 +133,6 @@ async fn upsert_inner(
.into_iter()
.any(|endpoint_config_changed| endpoint_config_changed);

// Update runner metadata
//
// This allows us to populate the actor names immediately upon configuring a serverless runner
if let Some((url, metadata_headers)) = serverless_config {
if any_endpoint_config_changed {
tracing::debug!("endpoint config changed, refreshing metadata");
if let Err(err) = utils::refresh_runner_config_metadata(
ctx.clone(),
namespace.namespace_id,
path.runner_name.clone(),
url,
metadata_headers,
)
.await
{
tracing::warn!(?err, runner_name=?path.runner_name, "failed to refresh runner config metadata");
}
} else {
tracing::debug!("endpoint config unchanged, skipping metadata refresh");
}
}

pegboard::utils::purge_runner_config_caches(
ctx.cache(),
namespace.namespace_id,
&path.runner_name,
)
.await?;

Ok(UpsertResponse {
endpoint_config_changed: any_endpoint_config_changed,
})
Expand Down
4 changes: 4 additions & 0 deletions engine/packages/api-types/src/namespaces/runner_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ pub enum RunnerConfigKind {
max_concurrent_actors: Option<u64>,
/// Seconds.
drain_grace_period: Option<u32>,
/// Deprecated.
slots_per_runner: u32,
/// Deprecated.
min_runners: Option<u32>,
/// Deprecated.
max_runners: u32,
/// Deprecated.
runners_margin: Option<u32>,
/// Milliseconds between metadata polling. If not set, uses the global default.
metadata_poll_interval: Option<u64>,
Expand Down
1 change: 1 addition & 0 deletions engine/packages/api-types/src/runner_configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod list;
pub struct RunnerConfigResponse {
#[serde(flatten)]
pub config: rivet_types::runner_configs::RunnerConfig,
pub protocol_version: Option<u16>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<Object>, additional_properties = true)]
pub runner_pool_error: Option<rivet_types::actor::RunnerPoolError>,
Expand Down
13 changes: 13 additions & 0 deletions engine/packages/bootstrap/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ pub async fn run(ctx: &StandaloneCtx) -> Result<()> {
.await?;
}

// Runner config backfill
if !is_complete(
ctx,
pegboard::workflows::runner_pool_backfill::BACKFILL_NAME,
)
.await?
{
ctx.workflow(pegboard::workflows::runner_pool_backfill::Input {})
.unique()
.dispatch()
.await?;
}

Ok(())
}

Expand Down
28 changes: 28 additions & 0 deletions engine/packages/config/src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,34 @@ impl<'a> IntoIterator for &'a DatacentersRepr {
}
}

pub enum DatacentersIntoIter {
Map(std::collections::hash_map::IntoValues<String, Datacenter>),
List(std::vec::IntoIter<Datacenter>),
}

impl Iterator for DatacentersIntoIter {
type Item = Datacenter;

fn next(&mut self) -> Option<Self::Item> {
match self {
DatacentersIntoIter::Map(iter) => iter.next(),
DatacentersIntoIter::List(iter) => iter.next(),
}
}
}

impl IntoIterator for DatacentersRepr {
type Item = Datacenter;
type IntoIter = DatacentersIntoIter;

fn into_iter(self) -> Self::IntoIter {
match self {
DatacentersRepr::Map(map) => DatacentersIntoIter::Map(map.into_values()),
DatacentersRepr::List(vec) => DatacentersIntoIter::List(vec.into_iter()),
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Datacenter {
Expand Down
2 changes: 2 additions & 0 deletions engine/packages/epoxy/src/keys/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use universaldb::tuple::Versionstamp;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct CommittedValue {
// NOTE: An empty value may exist for cached entries to denote the value was not found on any datacenter
// and cached as such.
pub value: Vec<u8>,
pub version: u64,
pub mutable: bool,
Expand Down
10 changes: 10 additions & 0 deletions engine/packages/epoxy/src/ops/kv/get_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ pub(crate) async fn read_local_value(
}

if let Some(value) = cache_value {
let cache_value = cache_key.deserialize(&value)?;

// Special case with empty values. These are inserted in kv_get_optimistic with `save_empty`
if cache_value.value.is_empty() {
return Ok(LocalValueRead {
value: None,
cache_value: None,
});
}

return Ok(LocalValueRead {
value: None,
cache_value: Some(cache_key.deserialize(&value)?),
Expand Down
Loading
Loading