diff --git a/Cargo.lock b/Cargo.lock index 29e970c310..3000c5b0f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -745,6 +745,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "upload-get", "url", "uuid", ] diff --git a/packages/edge/api/intercom/Cargo.toml b/packages/edge/api/intercom/Cargo.toml index f3965ecfb3..f4e2b5d3c5 100644 --- a/packages/edge/api/intercom/Cargo.toml +++ b/packages/edge/api/intercom/Cargo.toml @@ -26,6 +26,7 @@ rivet-env.workspace = true rivet-health-checks.workspace = true rivet-operation.workspace = true rivet-pools.workspace = true +upload-get.workspace = true s3-util.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/packages/edge/api/intercom/src/route/pegboard.rs b/packages/edge/api/intercom/src/route/pegboard.rs index f6dc8fd9aa..1ee77b81f5 100644 --- a/packages/edge/api/intercom/src/route/pegboard.rs +++ b/packages/edge/api/intercom/src/route/pegboard.rs @@ -71,8 +71,19 @@ pub async fn prewarm_image( let dc = unwrap!(dc_res.datacenters.first()); let build = unwrap!(builds_res.builds.first()); - let fallback_artifact_url = - resolve_image_fallback_artifact_url(&ctx, dc.build_delivery_method, &build).await?; + // Only prewarm if using ATS + let BuildDeliveryMethod::TrafficServer = dc.build_delivery_method else { + tracing::debug!("skipping prewarm since we're not using ats build delivery method"); + return Ok(json!({})); + }; + + // Get the artifact size + let uploads_res = op!([ctx] upload_get { + upload_ids: vec![build.upload_id.into()], + }) + .await?; + let upload = unwrap!(uploads_res.uploads.first()); + let artifact_size_bytes = upload.content_length; let res = ctx .signal(pegboard::workflows::client::PrewarmImage2 { @@ -83,7 +94,10 @@ pub async fn prewarm_image( build.upload_id, &build::utils::file_name(build.kind, build.compression), )?, - fallback_artifact_url, + // We will never need to fall back to fetching directly from S3. 
This short + // circuits earlier in the fn. + fallback_artifact_url: None, + artifact_size_bytes, kind: build.kind.into(), compression: build.compression.into(), }, diff --git a/packages/edge/infra/client/manager/src/image_download_handler.rs b/packages/edge/infra/client/manager/src/image_download_handler.rs index 6deed48840..63ba9174ae 100644 --- a/packages/edge/infra/client/manager/src/image_download_handler.rs +++ b/packages/edge/infra/client/manager/src/image_download_handler.rs @@ -99,28 +99,28 @@ impl ImageDownloadHandler { let mut conn = ctx.sql().await?; let mut tx = conn.begin().await?; - let ((cache_count, images_dir_size), image_download_size) = tokio::try_join!( - async { - // Get total size of images directory. Note that it doesn't matter if this doesn't - // match the actual fs size because it should either be exactly at or below actual fs - // size. Also calculating fs size manually is expensive. - sqlx::query_as::<_, (i64, i64)>(indoc!( - " - SELECT COUNT(size), COALESCE(SUM(size), 0) FROM images_cache - ", - )) - .fetch_one(&mut *tx) - .await - .map_err(Into::::into) - }, - // NOTE: The image size here is somewhat misleading because its only the size of the - // downloaded archive and not the total disk usage after it is unpacked. However, this is - // good enough - self.fetch_image_download_size(ctx, image_config), - )?; + // Get total size of images directory. Note that it doesn't matter if this doesn't + // match the actual fs size because it should either be exactly at or below actual fs + // size. Also calculating fs size manually is expensive. 
+ let (cache_count, images_dir_size) = sqlx::query_as::<_, (i64, i64)>(indoc!( + " + SELECT COUNT(size), COALESCE(SUM(size), 0) FROM images_cache + ", + )) + .fetch_one(&mut *tx) + .await + .map_err(Into::::into)?; // Prune images - let (removed_count, removed_bytes) = if images_dir_size as u64 + image_download_size + // + // HACK: The artifact_size_bytes here is somewhat misleading because it's only the size of the + // downloaded archive and not the total disk usage after it is unpacked. However, this size + // is recalculated later once decompressed, so this will only ever exceed the cache + // size limit in edge cases by `actual size - compressed size`. In this situation, + // that extra difference is already reserved on the file system by the actor + // itself. + let (removed_count, removed_bytes) = if images_dir_size as u64 + + image_config.artifact_size_bytes > ctx.config().images.max_cache_size() { // Fetch as many images as it takes to clear up enough space for this new image. @@ -157,7 +157,7 @@ impl ImageDownloadHandler { .bind(image_config.id) .bind( (images_dir_size as u64) - .saturating_add(image_download_size) + .saturating_add(image_config.artifact_size_bytes) .saturating_sub(ctx.config().images.max_cache_size()) as i64, ) .fetch_all(&mut *tx) @@ -202,7 +202,7 @@ impl ImageDownloadHandler { metrics::IMAGE_CACHE_COUNT.set(cache_count + 1 - removed_count); metrics::IMAGE_CACHE_SIZE - .set(images_dir_size + image_download_size as i64 - removed_bytes); + .set(images_dir_size + image_config.artifact_size_bytes as i64 - removed_bytes); sqlx::query(indoc!( " @@ -487,51 +487,4 @@ impl ImageDownloadHandler { Ok(addresses) } - - /// Attempts to fetch HEAD for the image download url and determine the image's download size. 
- async fn fetch_image_download_size( - &self, - ctx: &Ctx, - image_config: &protocol::Image, - ) -> Result { - let addresses = self.get_image_addresses(ctx, image_config).await?; - - let mut iter = addresses.into_iter(); - while let Some(artifact_url) = iter.next() { - // Log the full URL we're attempting to download from - tracing::info!(image_id=?image_config.id, %artifact_url, "attempting to download image"); - - match reqwest::Client::new() - .head(&artifact_url) - .send() - .await - .and_then(|res| res.error_for_status()) - { - Ok(res) => { - tracing::info!(image_id=?image_config.id, %artifact_url, "successfully fetched image HEAD"); - - // Read Content-Length header from response - let image_size = res - .headers() - .get(reqwest::header::CONTENT_LENGTH) - .context("no Content-Length header")? - .to_str()? - .parse::() - .context("invalid Content-Length header")?; - - return Ok(image_size); - } - Err(err) => { - tracing::warn!( - image_id=?image_config.id, - %artifact_url, - %err, - "failed to fetch image HEAD" - ); - } - } - } - - bail!("artifact url could not be resolved"); - } } diff --git a/packages/edge/services/pegboard/src/protocol.rs b/packages/edge/services/pegboard/src/protocol.rs index d1b1b609e4..b6f9dcf1c2 100644 --- a/packages/edge/services/pegboard/src/protocol.rs +++ b/packages/edge/services/pegboard/src/protocol.rs @@ -117,6 +117,8 @@ pub struct Image { pub artifact_url_stub: String, /// Direct S3 url to download the image from without ATS. pub fallback_artifact_url: Option, + /// Size in bytes of the artifact. 
+ pub artifact_size_bytes: u64, pub kind: ImageKind, pub compression: ImageCompression, } diff --git a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs index dd4c6594c1..7018e90673 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs @@ -8,6 +8,7 @@ use foundationdb::{ options::{ConflictRangeType, StreamingMode}, }; use futures_util::{FutureExt, TryStreamExt}; +use rivet_api::models::actors_endpoint_type; use sqlx::Acquire; use super::{ @@ -685,6 +686,7 @@ pub async fn spawn_actor( id: actor_setup.image_id, artifact_url_stub: actor_setup.artifact_url_stub.clone(), fallback_artifact_url: actor_setup.fallback_artifact_url.clone(), + artifact_size_bytes: actor_setup.artifact_size_bytes, kind: actor_setup.meta.build_kind.into(), compression: actor_setup.meta.build_compression.into(), }, diff --git a/packages/edge/services/pegboard/src/workflows/actor/setup.rs b/packages/edge/services/pegboard/src/workflows/actor/setup.rs index 6e4cf28e52..d37580ed84 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/setup.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/setup.rs @@ -559,6 +559,7 @@ pub struct ActorSetupCtx { pub resources: protocol::Resources, pub artifact_url_stub: String, pub fallback_artifact_url: Option, + pub artifact_size_bytes: u64, } pub async fn setup( @@ -630,6 +631,7 @@ pub async fn setup( resources, artifact_url_stub: artifacts_res.artifact_url_stub, fallback_artifact_url: artifacts_res.fallback_artifact_url, + artifact_size_bytes: artifacts_res.artifact_size_bytes, }) } @@ -707,6 +709,7 @@ struct ResolveArtifactsInput { struct ResolveArtifactsOutput { artifact_url_stub: String, fallback_artifact_url: Option, + artifact_size_bytes: u64, } #[activity(ResolveArtifacts)] @@ -714,40 +717,46 @@ async fn resolve_artifacts( ctx: &ActivityCtx, input: 
&ResolveArtifactsInput, ) -> GlobalResult { - let fallback_artifact_url = - if let BuildDeliveryMethod::S3Direct = input.dc_build_delivery_method { - tracing::debug!("using s3 direct delivery"); - - // Build client - let s3_client = s3_util::Client::with_bucket_and_endpoint( - ctx.config(), - "bucket-build", - s3_util::EndpointKind::EdgeInternal, + // Get the fallback URL + let fallback_artifact_url = { + tracing::debug!("using s3 direct delivery"); + + // Build client + let s3_client = s3_util::Client::with_bucket_and_endpoint( + ctx.config(), + "bucket-build", + s3_util::EndpointKind::EdgeInternal, + ) + .await?; + + let presigned_req = s3_client + .get_object() + .bucket(s3_client.bucket()) + .key(format!( + "{upload_id}/{file_name}", + upload_id = input.build_upload_id, + file_name = input.build_file_name, + )) + .presigned( + s3_util::aws_sdk_s3::presigning::PresigningConfig::builder() + .expires_in(std::time::Duration::from_secs(15 * 60)) + .build()?, ) .await?; - let presigned_req = s3_client - .get_object() - .bucket(s3_client.bucket()) - .key(format!( - "{upload_id}/{file_name}", - upload_id = input.build_upload_id, - file_name = input.build_file_name, - )) - .presigned( - s3_util::aws_sdk_s3::presigning::PresigningConfig::builder() - .expires_in(std::time::Duration::from_secs(15 * 60)) - .build()?, - ) - .await?; + let addr_str = presigned_req.uri().to_string(); + tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request"); - let addr_str = presigned_req.uri().to_string(); - tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request"); + Some(addr_str) + }; - Some(addr_str) - } else { - None - }; + // Get the artifact size + let uploads_res = op!([ctx] upload_get { + upload_ids: vec![input.build_upload_id.into()], + }) + .await?; + let upload = unwrap!(uploads_res.uploads.first()); + let artifact_size_bytes = upload.content_length; Ok(ResolveArtifactsOutput { artifact_url_stub: crate::util::image_artifact_url_stub( @@ -756,5 
+765,6 @@ async fn resolve_artifacts( &input.build_file_name, )?, fallback_artifact_url, + artifact_size_bytes, }) }