Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/edge/api/intercom/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ rivet-env.workspace = true
rivet-health-checks.workspace = true
rivet-operation.workspace = true
rivet-pools.workspace = true
upload-get.workspace = true
s3-util.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
20 changes: 17 additions & 3 deletions packages/edge/api/intercom/src/route/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,19 @@ pub async fn prewarm_image(
let dc = unwrap!(dc_res.datacenters.first());
let build = unwrap!(builds_res.builds.first());

let fallback_artifact_url =
resolve_image_fallback_artifact_url(&ctx, dc.build_delivery_method, &build).await?;
// Only prewarm if using ATS
let BuildDeliveryMethod::TrafficServer = dc.build_delivery_method else {
tracing::debug!("skipping prewarm since we're not using ats build delivery method");
return Ok(json!({}));
};

// Get the artifact size
let uploads_res = op!([ctx] upload_get {
upload_ids: vec![build.upload_id.into()],
})
.await?;
let upload = unwrap!(uploads_res.uploads.first());
let artifact_size_bytes = upload.content_length;

let res = ctx
.signal(pegboard::workflows::client::PrewarmImage2 {
Expand All @@ -83,7 +94,10 @@ pub async fn prewarm_image(
build.upload_id,
&build::utils::file_name(build.kind, build.compression),
)?,
fallback_artifact_url,
// We will never need to fall back to fetching directly from S3. This short
// circuits earlier in the fn.
fallback_artifact_url: None,
artifact_size_bytes,
kind: build.kind.into(),
compression: build.compression.into(),
},
Expand Down
91 changes: 22 additions & 69 deletions packages/edge/infra/client/manager/src/image_download_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,28 +99,28 @@ impl ImageDownloadHandler {
let mut conn = ctx.sql().await?;
let mut tx = conn.begin().await?;

let ((cache_count, images_dir_size), image_download_size) = tokio::try_join!(
async {
// Get total size of images directory. Note that it doesn't matter if this doesn't
// match the actual fs size because it should either be exactly at or below actual fs
// size. Also calculating fs size manually is expensive.
sqlx::query_as::<_, (i64, i64)>(indoc!(
"
SELECT COUNT(size), COALESCE(SUM(size), 0) FROM images_cache
",
))
.fetch_one(&mut *tx)
.await
.map_err(Into::<anyhow::Error>::into)
},
// NOTE: The image size here is somewhat misleading because it's only the size of the
// downloaded archive and not the total disk usage after it is unpacked. However, this is
// good enough
self.fetch_image_download_size(ctx, image_config),
)?;
// Get total size of images directory. Note that it doesn't matter if this doesn't
// match the actual fs size because it should either be exactly at or below actual fs
// size. Also calculating fs size manually is expensive.
let (cache_count, images_dir_size) = sqlx::query_as::<_, (i64, i64)>(indoc!(
"
SELECT COUNT(size), COALESCE(SUM(size), 0) FROM images_cache
",
))
.fetch_one(&mut *tx)
.await
.map_err(Into::<anyhow::Error>::into)?;

// Prune images
let (removed_count, removed_bytes) = if images_dir_size as u64 + image_download_size
//
// HACK: The artifact_size_bytes here is somewhat misleading because it's only the size of the
// downloaded archive and not the total disk usage after it is unpacked. However, this size
// is recalculated later once decompressed, so this will only ever exceed the cache
// size limit in edge cases by `actual size - compressed size`. In this situation,
// that extra difference is already reserved on the file system by the actor
// itself.
let (removed_count, removed_bytes) = if images_dir_size as u64
+ image_config.artifact_size_bytes
> ctx.config().images.max_cache_size()
{
// Fetch as many images as it takes to clear up enough space for this new image.
Expand Down Expand Up @@ -157,7 +157,7 @@ impl ImageDownloadHandler {
.bind(image_config.id)
.bind(
(images_dir_size as u64)
.saturating_add(image_download_size)
.saturating_add(image_config.artifact_size_bytes)
.saturating_sub(ctx.config().images.max_cache_size()) as i64,
)
.fetch_all(&mut *tx)
Expand Down Expand Up @@ -202,7 +202,7 @@ impl ImageDownloadHandler {

metrics::IMAGE_CACHE_COUNT.set(cache_count + 1 - removed_count);
metrics::IMAGE_CACHE_SIZE
.set(images_dir_size + image_download_size as i64 - removed_bytes);
.set(images_dir_size + image_config.artifact_size_bytes as i64 - removed_bytes);

sqlx::query(indoc!(
"
Expand Down Expand Up @@ -487,51 +487,4 @@ impl ImageDownloadHandler {

Ok(addresses)
}

/// Attempts to fetch HEAD for the image download url and determine the image's download size.
async fn fetch_image_download_size(
&self,
ctx: &Ctx,
image_config: &protocol::Image,
) -> Result<u64> {
let addresses = self.get_image_addresses(ctx, image_config).await?;

let mut iter = addresses.into_iter();
while let Some(artifact_url) = iter.next() {
// Log the full URL we're attempting to download from
tracing::info!(image_id=?image_config.id, %artifact_url, "attempting to download image");

match reqwest::Client::new()
.head(&artifact_url)
.send()
.await
.and_then(|res| res.error_for_status())
{
Ok(res) => {
tracing::info!(image_id=?image_config.id, %artifact_url, "successfully fetched image HEAD");

// Read Content-Length header from response
let image_size = res
.headers()
.get(reqwest::header::CONTENT_LENGTH)
.context("no Content-Length header")?
.to_str()?
.parse::<u64>()
.context("invalid Content-Length header")?;

return Ok(image_size);
}
Err(err) => {
tracing::warn!(
image_id=?image_config.id,
%artifact_url,
%err,
"failed to fetch image HEAD"
);
}
}
}

bail!("artifact url could not be resolved");
}
}
2 changes: 2 additions & 0 deletions packages/edge/services/pegboard/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub struct Image {
pub artifact_url_stub: String,
/// Direct S3 url to download the image from without ATS.
pub fallback_artifact_url: Option<String>,
/// Size in bytes of the artifact.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a typo in the documentation comment: artfiact should be spelled artifact.

Suggested change
/// Size in bytes of the artfiact.
/// Size in bytes of the artifact.

Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.

pub artifact_size_bytes: u64,
pub kind: ImageKind,
pub compression: ImageCompression,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use foundationdb::{
options::{ConflictRangeType, StreamingMode},
};
use futures_util::{FutureExt, TryStreamExt};
use rivet_api::models::actors_endpoint_type;
use sqlx::Acquire;

use super::{
Expand Down Expand Up @@ -685,6 +686,7 @@ pub async fn spawn_actor(
id: actor_setup.image_id,
artifact_url_stub: actor_setup.artifact_url_stub.clone(),
fallback_artifact_url: actor_setup.fallback_artifact_url.clone(),
artifact_size_bytes: actor_setup.artifact_size_bytes,
kind: actor_setup.meta.build_kind.into(),
compression: actor_setup.meta.build_compression.into(),
},
Expand Down
68 changes: 39 additions & 29 deletions packages/edge/services/pegboard/src/workflows/actor/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ pub struct ActorSetupCtx {
pub resources: protocol::Resources,
pub artifact_url_stub: String,
pub fallback_artifact_url: Option<String>,
pub artifact_size_bytes: u64,
}

pub async fn setup(
Expand Down Expand Up @@ -630,6 +631,7 @@ pub async fn setup(
resources,
artifact_url_stub: artifacts_res.artifact_url_stub,
fallback_artifact_url: artifacts_res.fallback_artifact_url,
artifact_size_bytes: artifacts_res.artifact_size_bytes,
})
}

Expand Down Expand Up @@ -707,47 +709,54 @@ struct ResolveArtifactsInput {
struct ResolveArtifactsOutput {
artifact_url_stub: String,
fallback_artifact_url: Option<String>,
artifact_size_bytes: u64,
}

#[activity(ResolveArtifacts)]
async fn resolve_artifacts(
ctx: &ActivityCtx,
input: &ResolveArtifactsInput,
) -> GlobalResult<ResolveArtifactsOutput> {
let fallback_artifact_url =
if let BuildDeliveryMethod::S3Direct = input.dc_build_delivery_method {
tracing::debug!("using s3 direct delivery");

// Build client
let s3_client = s3_util::Client::with_bucket_and_endpoint(
ctx.config(),
"bucket-build",
s3_util::EndpointKind::EdgeInternal,
// Get the fallback URL
let fallback_artifact_url = {
tracing::debug!("using s3 direct delivery");

// Build client
let s3_client = s3_util::Client::with_bucket_and_endpoint(
ctx.config(),
"bucket-build",
s3_util::EndpointKind::EdgeInternal,
)
.await?;

let presigned_req = s3_client
.get_object()
.bucket(s3_client.bucket())
.key(format!(
"{upload_id}/{file_name}",
upload_id = input.build_upload_id,
file_name = input.build_file_name,
))
.presigned(
s3_util::aws_sdk_s3::presigning::PresigningConfig::builder()
.expires_in(std::time::Duration::from_secs(15 * 60))
.build()?,
)
.await?;

let presigned_req = s3_client
.get_object()
.bucket(s3_client.bucket())
.key(format!(
"{upload_id}/{file_name}",
upload_id = input.build_upload_id,
file_name = input.build_file_name,
))
.presigned(
s3_util::aws_sdk_s3::presigning::PresigningConfig::builder()
.expires_in(std::time::Duration::from_secs(15 * 60))
.build()?,
)
.await?;
let addr_str = presigned_req.uri().to_string();
tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request");

let addr_str = presigned_req.uri().to_string();
tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request");
Some(addr_str)
};

Some(addr_str)
} else {
None
};
// Get the artifact size
let uploads_res = op!([ctx] upload_get {
upload_ids: vec![input.build_upload_id.into()],
})
.await?;
let upload = unwrap!(uploads_res.uploads.first());
let artifact_size_bytes = upload.content_length;

Ok(ResolveArtifactsOutput {
artifact_url_stub: crate::util::image_artifact_url_stub(
Expand All @@ -756,5 +765,6 @@ async fn resolve_artifacts(
&input.build_file_name,
)?,
fallback_artifact_url,
artifact_size_bytes,
})
}
Loading