diff --git a/Cargo.lock b/Cargo.lock index ef41d001..5bd0fd57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -488,6 +488,10 @@ version = "0.0.0" dependencies = [ "anyhow", "async-trait", + "aws-config", + "aws-sdk-scheduler", + "aws-sdk-sqs", + "aws-smithy-http-client", "axum", "axum-extra", "axum-test", @@ -878,6 +882,43 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-config" +version = "1.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11493b0bad143270fb8ad284a096dd529ba91924c5409adeac856cc1bf047dbc" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 1.4.0", + "time", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f20799b373a1be121fe3005fba0c2090af9411573878f224df44b42727fcaf7" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + [[package]] name = "aws-lc-rs" version = "1.15.1" @@ -900,6 +941,307 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-runtime" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc0651c57e384202e47153c1260b84a9936e19803d747615edf199dc3b98d17" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "bytes-utils", + "fastrand 2.3.0", + "http 1.4.0", + "http-body 1.0.1", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid 1.19.0", +] + +[[package]] +name = "aws-sdk-scheduler" +version = "1.97.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2520efef3114265e4753230775f6b6759df03fb900bc1278d884e530c726c8d" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sqs" +version = "1.97.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d497fb382b906b04167e62848ad00e56c5ad523791a461d5a5611c186531d81" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.101.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab41ad64e4051ecabeea802d6a17845a91e83287e1dd249e6963ea1ba78c428a" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand 2.3.0", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0b660013a6683ab23797778e21f1f854744fdf05f68204b4cca4c8c04b5d1f4" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac 0.12.1", + "http 0.2.12", + "http 1.4.0", + "percent-encoding", + "sha2 0.10.9", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ffcaf626bdda484571968400c326a244598634dc75fd451325a54ad1a59acfc" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.63.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba1ab2dc1c2c3749ead27180d333c42f11be8b0e934058fb4b2258ee8dbe5231" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http-client" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a2f165a7feee6f263028b899d0a181987f4fa7179a6411a32a439fba7c5f769" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "h2 0.3.27", + "h2 0.4.12", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper 1.8.1", + "hyper-rustls 0.24.2", + "hyper-rustls 0.27.7", + "hyper-util", + "pin-project-lite", + "rustls 0.21.12", + "rustls 0.23.35", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.4", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.62.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9648b0bb82a2eedd844052c6ad2a1a822d1f8e3adee5fbf668366717e428856a" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-observability" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06c2315d173edbf1920da8ba3a7189695827002e4c0fc961973ab1c54abca9c" +dependencies = [ + "aws-smithy-runtime-api", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a56d79744fb3edb5d722ef79d86081e121d3b9422cb209eb03aea6aa4f21ebd" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028999056d2d2fd58a697232f9eec4a643cf73a71cf327690a7edad1d2af2110" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-client", + "aws-smithy-observability", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite", + "pin-utils", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876ab3c9c29791ba4ba02b780a3049e21ec63dabda09268b175272c3733a79e6" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.4.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce02add1aa3677d022f8adf81dcbe3046a95f17a1b1e8979c145cd21d3d22b3" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47c8323699dd9b3c8d5b3c13051ae9cdef58fd179957c882f8374dd8725962d9" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version 0.4.1", + "tracing", +] + [[package]] name = "axum" version = "0.8.7" @@ -1034,6 +1376,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.8.0" @@ -1209,13 +1561,23 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" dependencies = [ "serde", ] +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "bytesize" version = "2.3.1" @@ -1611,7 +1973,7 @@ dependencies = [ "rand 0.9.2", "refinery", "reqwest", - "rustls", + "rustls 0.23.35", "rustls-pemfile", "rustls-platform-verifier", "serde", @@ -2711,6 +3073,21 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.7" @@ -2720,10 +3097,11 @@ dependencies = [ "http 1.4.0", "hyper 1.8.1", "hyper-util", - "rustls", + "rustls 0.23.35", + "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower-service", "webpki-roots", ] @@ -3061,9 +3439,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.15" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "jni" @@ -3797,6 +4175,12 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + [[package]] name = "parity-scale-codec" version = "3.7.5" @@ -4234,7 +4618,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls", + "rustls 0.23.35", "socket2 0.6.1", "thiserror 2.0.17", "tokio", @@ -4254,7 +4638,7 @@ dependencies = [ "rand 0.9.2", "ring", "rustc-hash", - "rustls", + "rustls 0.23.35", "rustls-pki-types", "slab", "thiserror 2.0.17", @@ -4507,6 +4891,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cab834c73d247e67f4fae452806d17d3c7501756d98c8808d7c9c7aa7d18f973" + [[package]] name = "regex-syntax" version = "0.8.8" @@ -4531,7 +4921,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.8.1", - "hyper-rustls", + "hyper-rustls 0.27.7", "hyper-tls 0.6.0", "hyper-util", "js-sys", @@ -4542,7 +4932,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.35", "rustls-pki-types", "serde", "serde_json", @@ -4550,7 +4940,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-util", "tower", "tower-http", @@ -4785,6 +5175,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.35" @@ -4796,7 +5198,7 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.8", "subtle", "zeroize", ] @@ -4843,10 +5245,10 @@ dependencies = [ "jni", "log", "once_cell", - "rustls", + "rustls 0.23.35", "rustls-native-certs", "rustls-platform-verifier-android", - "rustls-webpki", + "rustls-webpki 0.103.8", "security-framework 3.5.1", "security-framework-sys", "webpki-root-certs", @@ -4859,6 +5261,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.8" @@ -4891,9 +5303,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.20" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" [[package]] name = "same-file" @@ -4990,6 +5402,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sdd" version = "3.0.10" @@ -5843,9 +6265,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.48.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ "bytes", "libc", @@ -5913,20 +6335,30 @@ checksum = "27d684bad428a0f2481f42241f821db42c54e2dc81d8c00db8536c506b0a0144" dependencies = [ "const-oid", "ring", - "rustls", + "rustls 0.23.35", "tokio", "tokio-postgres", - "tokio-rustls", + "tokio-rustls 0.26.4", "x509-cert", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls", + "rustls 0.23.35", "tokio", ] @@ -5956,9 +6388,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.17" +version = "0.7.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", @@ -6136,9 +6568,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -6159,9 +6591,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.35" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", @@ -6448,6 +6880,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "wait-timeout" version = "0.2.1" @@ -7094,6 +7532,12 @@ dependencies = [ "time", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yansi" version = "1.0.1" diff --git a/Dockerfile b/Dockerfile index 020fcca9..22dd936e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -102,8 +102,8 @@ COPY Cargo.toml Cargo.lock ./ COPY crates/ ./crates/ COPY .cargo/ ./.cargo/ -# Build the application in release mode -RUN cargo build --release --locked --bin api +# Build the application binaries in release mode +RUN cargo build --release --locked --bin api --bin task_worker # Stage 3: Runtime stage @@ -147,8 +147,9 @@ RUN useradd -m -u 1000 app \ # Create app directory WORKDIR /app -# Copy the built binary +# Copy the built binaries COPY --from=backend-builder --chmod=0775 /app/target/release/api /app/api +COPY --from=backend-builder --chmod=0775 /app/target/release/task_worker /app/task_worker # Copy the migration SQL files RUN mkdir -p /app/crates/database/src/migrations/sql diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index fa147333..a6bcf053 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -56,6 +56,10 @@ base64 = "0.22.1" opentelemetry = { version = "0.31", features = ["metrics"] } opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "metrics"] } opentelemetry-otlp = { version = "0.31", features = ["metrics", "grpc-tonic"] } +aws-config = { version = "1.8.15", default-features = false, features = ["rt-tokio", "rustls"] } +aws-sdk-scheduler = "1.97.0" +aws-sdk-sqs = "1.97.0" +aws-smithy-http-client = "1.1.4" [dev-dependencies] serde_json = "1.0" diff --git a/crates/api/src/bin/task_worker.rs b/crates/api/src/bin/task_worker.rs new file mode 100644 index 00000000..bc612465 --- /dev/null +++ b/crates/api/src/bin/task_worker.rs @@ -0,0 +1,233 @@ +use anyhow::{anyhow, Context}; +use async_trait::async_trait; +use aws_smithy_http_client::{ + tls::{self, rustls_provider::CryptoMode}, + Builder as AwsHttpClientBuilder, +}; +use chrono::{Duration, Utc}; +use services::jobs::{CleanupCanceledInstancesTaskPayload, NoopTaskPayload, TaskExecutor}; +use services::{agent::ports::AgentService, UserId}; +use std::sync::Arc; + +struct DefaultTaskExecutor { + db_pool: database::DbPool, + agent_service: Arc, +} + +#[async_trait] +impl TaskExecutor for DefaultTaskExecutor { + async fn execute_noop(&self, _payload: &NoopTaskPayload) -> anyhow::Result<()> { + tracing::info!("noop task received"); + Ok(()) + } + + async fn execute_cleanup_canceled_instances( + &self, + payload: &CleanupCanceledInstancesTaskPayload, + ) -> anyhow::Result<()> { + if payload.grace_days < 0 { + return Err(anyhow!("grace_days must be >= 0")); + } + + let cutoff = Utc::now() - Duration::days(payload.grace_days); + let mut offset: i64 = 0; + let batch_size: i64 = 200; + let mut total_users = 0usize; + let mut total_instances = 0usize; + let mut failed_instances = 0usize; + + loop { + let client = self + .db_pool + .get() + .await + .context("failed to get DB client")?; + let rows = client + .query( + "SELECT DISTINCT s.user_id + FROM subscriptions s + WHERE s.status = 'canceled' + AND s.current_period_end <= $1 + AND NOT EXISTS ( + SELECT 1 + FROM subscriptions active_sub + WHERE active_sub.user_id = s.user_id + AND active_sub.status IN ('active', 'trialing') + ) + ORDER BY s.user_id + LIMIT $2 OFFSET $3", + &[&cutoff, &batch_size, &offset], + ) + .await + .context("failed to query canceled users for cleanup")?; + + if rows.is_empty() { + break; + } + + for row in &rows { + let user_id: UserId = row.get("user_id"); + total_users += 1; + + let (instances, _) = match self.agent_service.list_instances(user_id, 1000, 0).await + { + Ok(result) => result, + Err(err) => { + tracing::error!( + "cleanup task: failed to list instances user_id={} err={}", + user_id, + err + ); + continue; + } + }; + + let mut cleanup_targets = instances + .into_iter() + .filter(|instance| instance.status != "deleted") + .collect::>(); + total_instances += cleanup_targets.len(); + + if payload.dry_run { + for instance in &cleanup_targets { + tracing::info!( + "cleanup task dry-run: would delete instance_id={} user_id={} status={}", + instance.id, + user_id, + instance.status + ); + } + continue; + } + + for instance in cleanup_targets.drain(..) { + if let Err(err) = self.agent_service.delete_instance(instance.id).await { + failed_instances += 1; + tracing::error!( + "cleanup task: delete failed instance_id={} user_id={} status={} err={}", + instance.id, + user_id, + instance.status, + err + ); + } else { + tracing::info!( + "cleanup task: deleted instance_id={} user_id={} previous_status={}", + instance.id, + user_id, + instance.status + ); + } + } + } + + offset += rows.len() as i64; + if rows.len() < batch_size as usize { + break; + } + } + + tracing::info!( + "cleanup task finished grace_days={} dry_run={} users_scanned={} instances_targeted={} delete_failures={}", + payload.grace_days, + payload.dry_run, + total_users, + total_instances, + failed_instances + ); + + if failed_instances > 0 { + return Err(anyhow!( + "cleanup completed with {} failed instance deletions", + failed_instances + )); + } + + Ok(()) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + if let Err(e) = dotenvy::dotenv() { + eprintln!("Warning: Could not load .env file: {e}"); + eprintln!("Continuing with environment variables..."); + } + + let config = config::Config::from_env(); + let tasks = config.tasks.clone(); + + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + if !tasks.enabled { + return Err(anyhow!( + "task worker is disabled: set TASKS_ENABLED=true to run" + )); + } + + let region = tasks + .aws_region + .clone() + .ok_or_else(|| anyhow!("TASKS_AWS_REGION or AWS_REGION is required"))?; + + let queue_url = tasks + .sqs_queue_url + .clone() + .ok_or_else(|| anyhow!("TASKS_SQS_QUEUE_URL is required"))?; + + let db = database::Database::from_config(&config.database) + .await + .context("failed to connect database for task worker")?; + + let system_configs_service = Arc::new( + services::system_configs::service::SystemConfigsServiceImpl::new( + db.system_configs_repository() + as Arc, + ), + ); + + let agent_service = Arc::new(services::agent::AgentServiceImpl::new( + db.agent_repository() as Arc, + config.agent.managers.clone(), + config.agent.nearai_api_url.clone(), + system_configs_service as Arc, + config.agent.channel_relay_url.clone(), + config.agent.non_tee_agent_url_pattern.clone(), + )); + + let http_client = AwsHttpClientBuilder::new() + .tls_provider(tls::Provider::Rustls(CryptoMode::AwsLc)) + .build_https(); + + let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .http_client(http_client) + .region(aws_sdk_sqs::config::Region::new(region)) + .load() + .await; + + let sqs_client = aws_sdk_sqs::Client::new(&aws_config); + let executor = Arc::new(DefaultTaskExecutor { + db_pool: db.pool().clone(), + agent_service, + }); + + let worker = api::tasks::AwsSqsTaskWorker::new( + sqs_client, + queue_url, + tasks.worker_max_concurrency, + tasks.worker_wait_seconds, + tasks.worker_visibility_timeout, + tasks.worker_max_messages, + executor, + ); + + worker + .run_forever() + .await + .context("task worker loop exited unexpectedly") +} diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index ba818880..ad07cbf0 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -9,6 +9,7 @@ pub mod openapi; pub mod routes; pub mod state; pub mod static_files; +pub mod tasks; pub mod usage_parsing; pub mod validation; pub mod web_search_pricing; diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs index 8e95a60b..a265bd92 100644 --- a/crates/api/src/main.rs +++ b/crates/api/src/main.rs @@ -41,6 +41,16 @@ async fn main() -> anyhow::Result<()> { // Initialize tracing based on configuration init_tracing(&config.logging); + if config.tasks.enabled { + if config.tasks.is_scheduler_configured() { + if let Err(err) = api::tasks::ensure_daily_cleanup_task(&config.tasks).await { + tracing::warn!("failed to ensure daily cleanup schedule: {}", err); + } + } else { + tracing::info!("tasks scheduler not configured; skipping daily cleanup schedule setup"); + } + } + tracing::info!("Starting API server..."); tracing::info!( diff --git a/crates/api/src/tasks/mod.rs b/crates/api/src/tasks/mod.rs new file mode 100644 index 00000000..c234594c --- /dev/null +++ b/crates/api/src/tasks/mod.rs @@ -0,0 +1,553 @@ +use anyhow::{anyhow, Context}; +use async_trait::async_trait; +use aws_sdk_scheduler::{ + error::SdkError, + operation::create_schedule::CreateScheduleError, + types::{ActionAfterCompletion, FlexibleTimeWindow, FlexibleTimeWindowMode, Target}, +}; +use services::jobs::{ + daily_cleanup_canceled_instances_request, dispatch_task, ScheduleSpec, ScheduledTaskRequest, + TaskExecutor, TaskId, TaskMessage, TaskScheduler, +}; +use std::sync::Arc; +use tokio::{ + sync::Semaphore, + time::{sleep, Duration}, +}; + +const SQS_RECEIVE_RETRY_DELAY: Duration = Duration::from_secs(5); + +struct ScheduleRequestParts { + target: Target, + expression: String, + flex_window: FlexibleTimeWindow, + delete_after_completion: bool, +} + +enum CreateScheduleOutcome { + Created, + Conflict(Box), +} + +fn is_conflict_error(err: &SdkError) -> bool { + matches!( + err, + SdkError::ServiceError(service_error) + if matches!(service_error.err(), CreateScheduleError::ConflictException(_)) + ) +} + +#[derive(Clone)] +pub struct AwsTaskScheduler { + client: aws_sdk_scheduler::Client, + scheduler_group: String, + queue_arn: String, + scheduler_role_arn: String, +} + +impl AwsTaskScheduler { + pub fn new( + client: aws_sdk_scheduler::Client, + scheduler_group: String, + queue_arn: String, + scheduler_role_arn: String, + ) -> Self { + Self { + client, + scheduler_group, + queue_arn, + scheduler_role_arn, + } + } + + fn build_target(&self, request: &ScheduledTaskRequest) -> anyhow::Result { + let input = serde_json::to_string(&request.to_message()) + .context("failed to serialize scheduled task message")?; + + Target::builder() + .arn(&self.queue_arn) + .role_arn(&self.scheduler_role_arn) + .input(input) + .build() + .map_err(|e| anyhow!("failed to build scheduler target: {}", e)) + } + + fn prepare_schedule_request_parts( + &self, + request: &ScheduledTaskRequest, + ) -> anyhow::Result { + let target = self.build_target(request)?; + let expression = request.schedule.to_aws_expression(); + let flex_window = FlexibleTimeWindow::builder() + .mode(FlexibleTimeWindowMode::Off) + .build() + .map_err(|e| anyhow!("failed to build flexible time window: {}", e))?; + + Ok(ScheduleRequestParts { + target, + expression, + flex_window, + delete_after_completion: matches!(request.schedule, ScheduleSpec::At(_)), + }) + } + + async fn try_create_schedule( + &self, + request: &ScheduledTaskRequest, + ) -> anyhow::Result { + let parts = self.prepare_schedule_request_parts(request)?; + + let mut create = self + .client + .create_schedule() + .name(request.task_id.as_str()) + .group_name(&self.scheduler_group) + .schedule_expression(&parts.expression) + .flexible_time_window(parts.flex_window.clone()) + .target(parts.target.clone()); + + if parts.delete_after_completion { + create = create.action_after_completion(ActionAfterCompletion::Delete); + } + + match create.send().await { + Ok(_) => Ok(CreateScheduleOutcome::Created), + Err(err) if is_conflict_error(&err) => { + Ok(CreateScheduleOutcome::Conflict(Box::new(parts))) + } + Err(err) => Err(anyhow!( + "failed to create schedule for task_id={}: {}", + request.task_id, + err + )), + } + } + + pub async fn create_task_if_absent(&self, request: ScheduledTaskRequest) -> anyhow::Result<()> { + request.validate()?; + + match self.try_create_schedule(&request).await? { + CreateScheduleOutcome::Created => { + tracing::info!( + "created schedule task_id={} group={}", + request.task_id, + self.scheduler_group + ); + Ok(()) + } + CreateScheduleOutcome::Conflict(_) => { + tracing::info!( + "schedule already exists, skipping create task_id={} group={}", + request.task_id, + self.scheduler_group + ); + Ok(()) + } + } + } +} + +#[async_trait] +impl TaskScheduler for AwsTaskScheduler { + async fn upsert_task(&self, request: ScheduledTaskRequest) -> anyhow::Result<()> { + request.validate()?; + + match self.try_create_schedule(&request).await? { + CreateScheduleOutcome::Created => Ok(()), + CreateScheduleOutcome::Conflict(parts) => { + let mut update = self + .client + .update_schedule() + .name(request.task_id.as_str()) + .group_name(&self.scheduler_group) + .schedule_expression(&parts.expression) + .flexible_time_window(parts.flex_window) + .target(parts.target); + + if parts.delete_after_completion { + update = update.action_after_completion(ActionAfterCompletion::Delete); + } + + update.send().await.map_err(|e| { + anyhow!( + "failed to update existing schedule for task_id={}: {:?}", + request.task_id, + e + ) + })?; + Ok(()) + } + } + } + + async fn delete_task(&self, task_id: &TaskId) -> anyhow::Result<()> { + self.client + .delete_schedule() + .name(task_id.as_str()) + .group_name(&self.scheduler_group) + .send() + .await + .map_err(|e| anyhow!("failed to delete schedule task_id={}: {}", task_id, e))?; + Ok(()) + } +} + +pub struct AwsSqsTaskWorker { + client: aws_sdk_sqs::Client, + queue_url: String, + max_concurrency: usize, + wait_seconds: i32, + visibility_timeout: i32, + max_messages: i32, + executor: Arc, +} + +impl AwsSqsTaskWorker { + pub fn new( + client: aws_sdk_sqs::Client, + queue_url: String, + max_concurrency: usize, + wait_seconds: i32, + visibility_timeout: i32, + max_messages: i32, + executor: Arc, + ) -> Self { + Self { + client, + queue_url, + max_concurrency, + wait_seconds, + visibility_timeout, + max_messages, + executor, + } + } + + pub async fn run_forever(&self) -> anyhow::Result<()> { + let semaphore = Arc::new(Semaphore::new(self.max_concurrency.max(1))); + + loop { + let response = match self + .client + .receive_message() + .queue_url(&self.queue_url) + .max_number_of_messages(self.max_messages.clamp(1, 10)) + .wait_time_seconds(self.wait_seconds.clamp(1, 20)) + .visibility_timeout(self.visibility_timeout.max(1)) + .send() + .await + { + Ok(response) => response, + Err(err) => { + tracing::warn!( + "failed to receive SQS messages; retrying in {}s: {}", + SQS_RECEIVE_RETRY_DELAY.as_secs(), + err + ); + sleep(SQS_RECEIVE_RETRY_DELAY).await; + continue; + } + }; + + for message in response.messages() { + let permit = semaphore.clone().acquire_owned().await?; + let client = self.client.clone(); + let queue_url = self.queue_url.clone(); + let executor = self.executor.clone(); + let message = message.clone(); + + tokio::spawn(async move { + let _permit = permit; + if let Err(err) = process_message(client, queue_url, executor, message).await { + tracing::error!("task processing failed: {}", err); + } + }); + } + } + } +} + +async fn delete_message( + client: &aws_sdk_sqs::Client, + queue_url: &str, + receipt_handle: &str, +) -> anyhow::Result<()> { + client + .delete_message() + .queue_url(queue_url) + .receipt_handle(receipt_handle) + .send() + .await + .context("failed to delete SQS message")?; + + Ok(()) +} + +enum ParsedSqsMessage { + Task(TaskMessage), + Drop { reason: String }, +} + +fn parse_sqs_message(message: &aws_sdk_sqs::types::Message) -> ParsedSqsMessage { + let body = match message.body() { + Some(body) => body, + None => { + return ParsedSqsMessage::Drop { + reason: "SQS message missing body".to_string(), + }; + } + }; + + match parse_task_message(body) { + Ok(task_message) => ParsedSqsMessage::Task(task_message), + Err(err) => ParsedSqsMessage::Drop { + reason: err.to_string(), + }, + } +} + +async fn process_message( + client: aws_sdk_sqs::Client, + queue_url: String, + executor: Arc, + message: aws_sdk_sqs::types::Message, +) -> anyhow::Result<()> { + let receipt_handle = message.receipt_handle().map(str::to_owned); + + let task_message = match parse_sqs_message(&message) { + ParsedSqsMessage::Task(task_message) => task_message, + ParsedSqsMessage::Drop { reason } => { + tracing::warn!("dropping invalid SQS task message: {}", reason); + if let Some(receipt_handle) = receipt_handle.as_deref() { + delete_message(&client, &queue_url, receipt_handle).await?; + } + return Ok(()); + } + }; + + dispatch_task(executor.as_ref(), &task_message.payload) + .await + .context("task handler execution failed")?; + + if let Some(receipt_handle) = receipt_handle.as_deref() { + delete_message(&client, &queue_url, receipt_handle).await?; + } + + Ok(()) +} + +fn parse_task_message(body: &str) -> anyhow::Result { + serde_json::from_str(body).context("failed to parse task message") +} + +pub async fn ensure_daily_cleanup_task(task_config: &config::TaskConfig) -> anyhow::Result<()> { + if !task_config.enabled { + return Ok(()); + } + + if !task_config.is_scheduler_configured() { + return Err(anyhow!( + "tasks scheduler is not configured: TASKS_SQS_QUEUE_ARN and TASKS_SCHEDULER_ROLE_ARN are required" + )); + } + + let region = task_config + .aws_region + .clone() + .ok_or_else(|| anyhow!("TASKS_AWS_REGION or AWS_REGION is required for scheduler"))?; + let queue_arn = task_config + .sqs_queue_arn + .clone() + .ok_or_else(|| anyhow!("TASKS_SQS_QUEUE_ARN is required"))?; + let scheduler_role_arn = task_config + .scheduler_role_arn + .clone() + .ok_or_else(|| anyhow!("TASKS_SCHEDULER_ROLE_ARN is required"))?; + + let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(aws_sdk_scheduler::config::Region::new(region)) + .load() + .await; + + let scheduler = AwsTaskScheduler::new( + aws_sdk_scheduler::Client::new(&aws_config), + task_config.scheduler_group.clone(), + queue_arn, + scheduler_role_arn, + ); + + scheduler + .upsert_task(daily_cleanup_canceled_instances_request( + task_config.cleanup_canceled_instances_daily_cron.clone(), + task_config.cleanup_canceled_instances_grace_days, + )?) + .await +} + +#[cfg(test)] +mod tests { + use super::*; + use services::jobs::{ + CleanupCanceledInstancesTaskPayload, NoopTaskPayload, TaskId, TaskPayload, + }; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct TestExecutor { + noop_calls: AtomicUsize, + cleanup_calls: AtomicUsize, + fail_noop: bool, + } + + #[async_trait] + impl TaskExecutor for TestExecutor { + async fn execute_noop(&self, _: &NoopTaskPayload) -> anyhow::Result<()> { + self.noop_calls.fetch_add(1, Ordering::SeqCst); + if self.fail_noop { + anyhow::bail!("noop failed"); + } + Ok(()) + } + + async fn execute_cleanup_canceled_instances( + &self, + _: &CleanupCanceledInstancesTaskPayload, + ) -> anyhow::Result<()> { + self.cleanup_calls.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + } + + #[test] + fn test_parse_task_message_success() { + let body = r#"{ + "task_id":"daily_cleanup", + "payload":{"task_type":"noop","payload":{"note":"ok"}} + }"#; + let message = parse_task_message(body).unwrap(); + assert_eq!(message.task_id.as_str(), "daily_cleanup"); + match message.payload { + TaskPayload::Noop(payload) => assert_eq!(payload.note.as_deref(), Some("ok")), + TaskPayload::CleanupCanceledInstances(_) => panic!("unexpected payload variant"), + } + } + + #[test] + fn test_parse_task_message_invalid_json() { + let body = r#"{"task_id":"bad","payload":{"task_type":"unknown"}}"#; + assert!(parse_task_message(body).is_err()); + } + + #[test] + fn test_parse_sqs_message_drops_missing_body() { + let message = aws_sdk_sqs::types::Message::builder() + .receipt_handle("receipt") + .build(); + + match parse_sqs_message(&message) { + ParsedSqsMessage::Drop { reason } => { + assert!(reason.contains("missing body")); + } + ParsedSqsMessage::Task(_) => panic!("expected drop for missing body"), + } + } + + #[test] + fn test_parse_sqs_message_drops_invalid_json() { + let message = aws_sdk_sqs::types::Message::builder() + .body(r#"{"task_id":"bad","payload":{"task_type":"unknown"}}"#) + .receipt_handle("receipt") + .build(); + + match parse_sqs_message(&message) { + ParsedSqsMessage::Drop { reason } => { + assert!(reason.contains("failed to parse task message")); + } + ParsedSqsMessage::Task(_) => panic!("expected drop for invalid JSON"), + } + } + + #[test] + fn test_parse_sqs_message_parses_valid_message() { + let message = aws_sdk_sqs::types::Message::builder() + .body( + r#"{ + "task_id":"daily_cleanup", + "payload":{"task_type":"noop","payload":{"note":"ok"}} + }"#, + ) + .receipt_handle("receipt") + .build(); + + match parse_sqs_message(&message) { + ParsedSqsMessage::Task(task_message) => { + assert_eq!(task_message.task_id.as_str(), "daily_cleanup"); + } + ParsedSqsMessage::Drop { .. } => panic!("expected parsed task message"), + } + } + + #[tokio::test] + async fn test_dispatch_task_error_bubbles_up() { + let executor = TestExecutor { + noop_calls: AtomicUsize::new(0), + cleanup_calls: AtomicUsize::new(0), + fail_noop: true, + }; + let payload = TaskPayload::Noop(NoopTaskPayload { note: None }); + let result = dispatch_task(&executor, &payload).await; + assert!(result.is_err()); + assert_eq!(executor.noop_calls.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_dispatch_cleanup_hits_expected_handler() { + let executor = TestExecutor { + noop_calls: AtomicUsize::new(0), + cleanup_calls: AtomicUsize::new(0), + fail_noop: false, + }; + let payload = TaskPayload::CleanupCanceledInstances(CleanupCanceledInstancesTaskPayload { + grace_days: 15, + dry_run: true, + }); + dispatch_task(&executor, &payload).await.unwrap(); + assert_eq!(executor.noop_calls.load(Ordering::SeqCst), 0); + assert_eq!(executor.cleanup_calls.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_task_message_roundtrip_json() { + let msg = TaskMessage { + task_id: TaskId::new("roundtrip_1".to_string()).unwrap(), + payload: TaskPayload::Noop(NoopTaskPayload { + note: Some("hello".to_string()), + }), + }; + + let json = serde_json::to_string(&msg).unwrap(); + let parsed: TaskMessage = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.task_id.as_str(), "roundtrip_1"); + } + + #[tokio::test] + async fn test_ensure_daily_cleanup_task_noop_when_disabled() { + let cfg = config::TaskConfig { + enabled: false, + ..Default::default() + }; + assert!(ensure_daily_cleanup_task(&cfg).await.is_ok()); + } + + #[tokio::test] + async fn test_ensure_daily_cleanup_task_requires_scheduler_config() { + let cfg = config::TaskConfig { + enabled: true, + aws_region: Some("us-east-1".to_string()), + ..Default::default() + }; + let err = ensure_daily_cleanup_task(&cfg).await.unwrap_err(); + assert!(err + .to_string() + .contains("TASKS_SQS_QUEUE_ARN and TASKS_SCHEDULER_ROLE_ARN are required")); + } +} diff --git a/crates/api/tests/completions_tests.rs b/crates/api/tests/completions_tests.rs index ec050093..0abe5b33 100644 --- a/crates/api/tests/completions_tests.rs +++ b/crates/api/tests/completions_tests.rs @@ -4,6 +4,7 @@ use api::routes::api::{ AUTO_ROUTE_MAX_TOKENS, AUTO_ROUTE_MODEL, AUTO_ROUTE_TEMPERATURE, AUTO_ROUTE_TOP_P, USER_BANNED_ERROR_MESSAGE, }; +use axum_test::TestServer; use chrono::Duration; use common::{ clear_subscription_plans, create_test_server, create_test_server_and_db, @@ -26,6 +27,39 @@ async fn create_rate_limited_test_server() -> axum_test::TestServer { .await } +async fn wait_for_near_balance_ban_on_chat_completions( + server: &TestServer, + token: &str, + request_body: serde_json::Value, +) -> axum_test::TestResponse { + let mut last_response = None; + + for _ in 0..10 { + sleep(std::time::Duration::from_millis(300)).await; + + let response = server + .post("/v1/chat/completions") + .add_header( + http::HeaderName::from_static("authorization"), + http::HeaderValue::from_str(&format!("Bearer {}", token)).unwrap(), + ) + .json(&request_body) + .await; + + if response.status_code() == 403 { + return response; + } + + last_response = Some(response); + } + + let response = last_response.expect("should have attempted at least one follow-up request"); + panic!( + "expected NEAR balance ban to return 403 for chat completions, got {} after waiting for async check", + response.status_code() + ); +} + /// Test rate limiting for /v1/chat/completions endpoint #[tokio::test] async fn test_chat_completions_rate_limit_first_request_succeeds() { @@ -445,21 +479,15 @@ async fn test_chat_completions_near_balance_blocks_poor_account() { "First request from poor NEAR account should not be synchronously blocked" ); - // Wait long enough to avoid being affected by per-user rate limit (1 req/sec) - sleep(std::time::Duration::from_millis(1100)).await; - - // Second call should be blocked by blacklist (user ban), after async NEAR check has run - let second_response = server - .post("/v1/chat/completions") - .add_header( - http::HeaderName::from_static("authorization"), - http::HeaderValue::from_str(&format!("Bearer {}", token)).unwrap(), - ) - .json(&json!({ + let second_response = wait_for_near_balance_ban_on_chat_completions( + &server, + token, + json!({ "model": "gpt-4o", "messages": [{"role": "user", "content": "Hello again"}] - })) - .await; + }), + ) + .await; assert_eq!( second_response.status_code(), diff --git a/crates/api/tests/near_balance_tests.rs b/crates/api/tests/near_balance_tests.rs index 39635427..91ca4110 100644 --- a/crates/api/tests/near_balance_tests.rs +++ b/crates/api/tests/near_balance_tests.rs @@ -1,6 +1,7 @@ mod common; use api::routes::api::USER_BANNED_ERROR_MESSAGE; +use axum_test::TestServer; use common::{ cleanup_user_subscriptions, create_test_server_and_db, insert_test_subscription, insert_test_subscription_with_price_id, mock_login, set_subscription_plans, @@ -20,6 +21,39 @@ async fn clear_near_balance_bans(db: &database::Database) { .ok(); } +async fn wait_for_near_balance_ban( + server: &TestServer, + token: &str, + request_body: serde_json::Value, +) -> axum_test::TestResponse { + let mut last_response = None; + + for _ in 0..10 { + sleep(std::time::Duration::from_millis(300)).await; + + let response = server + .post("/v1/responses") + .add_header( + http::HeaderName::from_static("authorization"), + http::HeaderValue::from_str(&format!("Bearer {}", token)).unwrap(), + ) + .json(&request_body) + .await; + + if response.status_code() == 403 { + return response; + } + + last_response = Some(response); + } + + let response = last_response.expect("should have attempted at least one follow-up request"); + panic!( + "expected NEAR balance ban to return 403, got {} after waiting for async check", + response.status_code() + ); +} + /// When user has no NEAR-linked account, NEAR balance check should be skipped /// and /v1/responses should not return 403 due to balance. #[tokio::test] @@ -148,20 +182,14 @@ async fn test_near_balance_blocks_poor_account() { "First request from poor NEAR account should not be synchronously blocked" ); - // Wait long enough to avoid being affected by per-user rate limit (1 req/sec) - sleep(std::time::Duration::from_millis(1100)).await; - - // Second call should be blocked by blacklist (user ban), after async NEAR check has run - let second_response = server - .post("/v1/responses") - .add_header( - http::HeaderName::from_static("authorization"), - http::HeaderValue::from_str(&format!("Bearer {}", token)).unwrap(), - ) - .json(&json!({ + let second_response = wait_for_near_balance_ban( + &server, + token, + json!({ "input": "Hello again" - })) - .await; + }), + ) + .await; assert_eq!( second_response.status_code(), @@ -297,24 +325,28 @@ async fn test_near_balance_check_applied_for_free_plan_subscription() { "First request from free plan + poor NEAR should not be synchronously blocked" ); - sleep(std::time::Duration::from_millis(1100)).await; - - let second_response = server - .post("/v1/responses") - .add_header( - http::HeaderName::from_static("authorization"), - http::HeaderValue::from_str(&format!("Bearer {}", token)).unwrap(), - ) - .json(&json!({ + let second_response = wait_for_near_balance_ban( + &server, + token, + json!({ "input": "Hello again" - })) - .await; + }), + ) + .await; assert_eq!( second_response.status_code(), 403, "Free plan user with poor NEAR account should be blocked after async balance check" ); + + let body: serde_json::Value = second_response.json(); + let error = body.get("error").and_then(|v| v.as_str()); + assert_eq!( + error, + Some(USER_BANNED_ERROR_MESSAGE), + "Ban error message should indicate a temporary ban without exposing NEAR balance details" + ); } /// Unknown price_id (not in config) is not treated as paid; NEAR balance check applies. @@ -376,22 +408,26 @@ async fn test_near_balance_check_applied_for_unknown_price_id() { "First request from unknown price_id + poor NEAR should not be synchronously blocked" ); - sleep(std::time::Duration::from_millis(1100)).await; - - let second_response = server - .post("/v1/responses") - .add_header( - http::HeaderName::from_static("authorization"), - http::HeaderValue::from_str(&format!("Bearer {}", token)).unwrap(), - ) - .json(&json!({ + let second_response = wait_for_near_balance_ban( + &server, + token, + json!({ "input": "Hello again" - })) - .await; + }), + ) + .await; assert_eq!( second_response.status_code(), 403, "User with unknown price_id and poor NEAR account should be blocked by NEAR balance check" ); + + let body: serde_json::Value = second_response.json(); + let error = body.get("error").and_then(|v| v.as_str()); + assert_eq!( + error, + Some(USER_BANNED_ERROR_MESSAGE), + "Ban error message should indicate a temporary ban without exposing NEAR balance details" + ); } diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index b16a18ea..108ddb9d 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -481,6 +481,80 @@ impl Default for InfrastructureConfig { } } +pub const TASKS_CLEANUP_CANCELED_INSTANCES_DAILY_CRON_DEFAULT: &str = "cron(0 0 * * ? *)"; +pub const TASKS_CLEANUP_CANCELED_INSTANCES_GRACE_DAYS_DEFAULT: i64 = 15; + +#[derive(Debug, Clone, Deserialize)] +pub struct TaskConfig { + pub enabled: bool, + pub aws_region: Option, + pub sqs_queue_url: Option, + pub sqs_queue_arn: Option, + pub scheduler_role_arn: Option, + pub scheduler_group: String, + pub cleanup_canceled_instances_daily_cron: String, + pub cleanup_canceled_instances_grace_days: i64, + pub worker_wait_seconds: i32, + pub worker_visibility_timeout: i32, + pub worker_max_messages: i32, + pub worker_max_concurrency: usize, +} + +impl Default for TaskConfig { + fn default() -> Self { + Self { + enabled: std::env::var("TASKS_ENABLED") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(false), + aws_region: std::env::var("TASKS_AWS_REGION") + .ok() + .or_else(|| std::env::var("AWS_REGION").ok()), + sqs_queue_url: std::env::var("TASKS_SQS_QUEUE_URL").ok(), + sqs_queue_arn: std::env::var("TASKS_SQS_QUEUE_ARN").ok(), + scheduler_role_arn: std::env::var("TASKS_SCHEDULER_ROLE_ARN").ok(), + scheduler_group: std::env::var("TASKS_SCHEDULER_GROUP") + .unwrap_or_else(|_| "chat-api".to_string()), + cleanup_canceled_instances_daily_cron: std::env::var( + "TASKS_CLEANUP_CANCELED_INSTANCES_DAILY_CRON", + ) + .unwrap_or_else(|_| TASKS_CLEANUP_CANCELED_INSTANCES_DAILY_CRON_DEFAULT.to_string()), + cleanup_canceled_instances_grace_days: std::env::var( + "TASKS_CLEANUP_CANCELED_INSTANCES_GRACE_DAYS", + ) + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(TASKS_CLEANUP_CANCELED_INSTANCES_GRACE_DAYS_DEFAULT), + worker_wait_seconds: std::env::var("TASKS_WORKER_WAIT_SECONDS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(20), + worker_visibility_timeout: std::env::var("TASKS_WORKER_VISIBILITY_TIMEOUT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(60), + worker_max_messages: std::env::var("TASKS_WORKER_MAX_MESSAGES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(10), + worker_max_concurrency: std::env::var("TASKS_WORKER_MAX_CONCURRENCY") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(10), + } + } +} + +impl TaskConfig { + pub fn is_scheduler_configured(&self) -> bool { + self.sqs_queue_arn.is_some() && self.scheduler_role_arn.is_some() + } + + pub fn is_worker_configured(&self) -> bool { + self.sqs_queue_url.is_some() + } +} + #[derive(Debug, Clone, serde::Deserialize)] pub struct LoggingConfig { /// Global log level for the application. @@ -537,6 +611,7 @@ pub struct Config { pub agent: AgentConfig, /// Infrastructure mode (TEE vs non-TEE) pub infrastructure: InfrastructureConfig, + pub tasks: TaskConfig, } impl Config { @@ -555,6 +630,7 @@ impl Config { logging: LoggingConfig::default(), agent: AgentConfig::default(), infrastructure: InfrastructureConfig::default(), + tasks: TaskConfig::default(), } } } @@ -774,6 +850,33 @@ mod tests { assert!(!config.non_tee_infra, "Default should be TEE mode (false)"); } + #[test] + #[serial] + fn test_task_config_defaults() { + std::env::remove_var("TASKS_ENABLED"); + std::env::remove_var("TASKS_AWS_REGION"); + std::env::remove_var("AWS_REGION"); + std::env::remove_var("TASKS_SQS_QUEUE_URL"); + std::env::remove_var("TASKS_SQS_QUEUE_ARN"); + std::env::remove_var("TASKS_SCHEDULER_ROLE_ARN"); + std::env::remove_var("TASKS_SCHEDULER_GROUP"); + std::env::remove_var("TASKS_WORKER_WAIT_SECONDS"); + std::env::remove_var("TASKS_WORKER_VISIBILITY_TIMEOUT"); + std::env::remove_var("TASKS_WORKER_MAX_MESSAGES"); + std::env::remove_var("TASKS_WORKER_MAX_CONCURRENCY"); + + let cfg = TaskConfig::default(); + assert!(!cfg.enabled); + assert!(cfg.aws_region.is_none()); + assert_eq!(cfg.scheduler_group, "chat-api"); + assert_eq!(cfg.worker_wait_seconds, 20); + assert_eq!(cfg.worker_visibility_timeout, 60); + assert_eq!(cfg.worker_max_messages, 10); + assert_eq!(cfg.worker_max_concurrency, 10); + assert!(!cfg.is_scheduler_configured()); + assert!(!cfg.is_worker_configured()); + } + #[test] #[serial] fn test_infrastructure_config_non_tee_enabled() { @@ -992,4 +1095,45 @@ mod tests { "Manager with is_non_tee=true should be non-TEE even with generic URL" ); } + + #[test] + #[serial] + fn test_task_config_reads_env_and_region_fallback() { + std::env::set_var("TASKS_ENABLED", "true"); + std::env::set_var("AWS_REGION", "us-west-2"); + std::env::set_var("TASKS_SQS_QUEUE_URL", "https://example.com/queue"); + std::env::set_var("TASKS_SQS_QUEUE_ARN", "arn:aws:sqs:us-west-2:123:queue"); + std::env::set_var( + "TASKS_SCHEDULER_ROLE_ARN", + "arn:aws:iam::123:role/scheduler", + ); + std::env::set_var("TASKS_SCHEDULER_GROUP", "group-a"); + std::env::set_var("TASKS_WORKER_WAIT_SECONDS", "12"); + std::env::set_var("TASKS_WORKER_VISIBILITY_TIMEOUT", "90"); + std::env::set_var("TASKS_WORKER_MAX_MESSAGES", "7"); + std::env::set_var("TASKS_WORKER_MAX_CONCURRENCY", "22"); + + let cfg = TaskConfig::default(); + assert!(cfg.enabled); + assert_eq!(cfg.aws_region.as_deref(), Some("us-west-2")); + assert_eq!(cfg.scheduler_group, "group-a"); + assert_eq!(cfg.worker_wait_seconds, 12); + assert_eq!(cfg.worker_visibility_timeout, 90); + assert_eq!(cfg.worker_max_messages, 7); + assert_eq!(cfg.worker_max_concurrency, 22); + assert!(cfg.is_scheduler_configured()); + assert!(cfg.is_worker_configured()); + + std::env::remove_var("TASKS_ENABLED"); + std::env::remove_var("TASKS_AWS_REGION"); + std::env::remove_var("AWS_REGION"); + std::env::remove_var("TASKS_SQS_QUEUE_URL"); + std::env::remove_var("TASKS_SQS_QUEUE_ARN"); + std::env::remove_var("TASKS_SCHEDULER_ROLE_ARN"); + std::env::remove_var("TASKS_SCHEDULER_GROUP"); + std::env::remove_var("TASKS_WORKER_WAIT_SECONDS"); + std::env::remove_var("TASKS_WORKER_VISIBILITY_TIMEOUT"); + std::env::remove_var("TASKS_WORKER_MAX_MESSAGES"); + std::env::remove_var("TASKS_WORKER_MAX_CONCURRENCY"); + } } diff --git a/crates/services/src/jobs/mod.rs b/crates/services/src/jobs/mod.rs new file mode 100644 index 00000000..1315e49a --- /dev/null +++ b/crates/services/src/jobs/mod.rs @@ -0,0 +1,326 @@ +use anyhow::anyhow; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde::{de::Error as _, Deserialize, Deserializer, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(transparent)] +pub struct TaskId(String); + +impl TaskId { + pub fn new(value: String) -> anyhow::Result { + let is_valid = !value.is_empty() + && value.len() <= 64 + && value + .chars() + .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.')); + + if !is_valid { + return Err(anyhow!( + "invalid task id: expected 1-64 chars [A-Za-z0-9._-]" + )); + } + + Ok(Self(value)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl<'de> Deserialize<'de> for TaskId { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = String::deserialize(deserializer)?; + TaskId::new(value).map_err(D::Error::custom) + } +} + +impl std::fmt::Display for TaskId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ScheduleSpec { + At(DateTime), + Cron(String), +} + +impl ScheduleSpec { + pub fn validate(&self) -> anyhow::Result<()> { + match self { + Self::At(_) => Ok(()), + Self::Cron(expr) => { + let trimmed = expr.trim(); + if trimmed.starts_with("cron(") && trimmed.ends_with(')') && trimmed.len() > 6 { + Ok(()) + } else { + Err(anyhow!( + "invalid cron expression: expected format cron(...)" + )) + } + } + } + } + + pub fn to_aws_expression(&self) -> String { + match self { + Self::At(at) => format!("at({})", at.format("%Y-%m-%dT%H:%M:%S")), + Self::Cron(expr) => expr.clone(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct NoopTaskPayload { + pub note: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct CleanupCanceledInstancesTaskPayload { + pub grace_days: i64, + pub dry_run: bool, +} + +pub const CLEANUP_CANCELED_INSTANCES_DAILY_TASK_ID: &str = "cleanup.canceled-instances.daily"; +pub const CLEANUP_CANCELED_INSTANCES_DAILY_CRON_UTC: &str = "cron(0 0 * * ? *)"; +pub const CLEANUP_CANCELED_INSTANCES_DEFAULT_GRACE_DAYS: i64 = 15; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "task_type", content = "payload", rename_all = "snake_case")] +pub enum TaskPayload { + Noop(NoopTaskPayload), + CleanupCanceledInstances(CleanupCanceledInstancesTaskPayload), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskMessage { + pub task_id: TaskId, + pub payload: TaskPayload, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScheduledTaskRequest { + pub task_id: TaskId, + pub schedule: ScheduleSpec, + pub payload: TaskPayload, +} + +impl ScheduledTaskRequest { + pub fn validate(&self) -> anyhow::Result<()> { + self.schedule.validate() + } + + pub fn to_message(&self) -> TaskMessage { + TaskMessage { + task_id: self.task_id.clone(), + payload: self.payload.clone(), + } + } +} + +pub fn daily_cleanup_canceled_instances_request( + cron: String, + grace_days: i64, +) -> anyhow::Result { + Ok(ScheduledTaskRequest { + task_id: TaskId::new(CLEANUP_CANCELED_INSTANCES_DAILY_TASK_ID.to_string())?, + schedule: ScheduleSpec::Cron(cron), + payload: TaskPayload::CleanupCanceledInstances(CleanupCanceledInstancesTaskPayload { + grace_days, + dry_run: false, + }), + }) +} + +#[async_trait] +pub trait TaskScheduler: Send + Sync { + async fn upsert_task(&self, request: ScheduledTaskRequest) -> anyhow::Result<()>; + async fn delete_task(&self, task_id: &TaskId) -> anyhow::Result<()>; +} + +#[async_trait] +pub trait TaskExecutor: Send + Sync { + async fn execute_noop(&self, payload: &NoopTaskPayload) -> anyhow::Result<()>; + + async fn execute_cleanup_canceled_instances( + &self, + payload: &CleanupCanceledInstancesTaskPayload, + ) -> anyhow::Result<()>; +} + +pub async fn dispatch_task(executor: &E, payload: &TaskPayload) -> anyhow::Result<()> +where + E: TaskExecutor + ?Sized, +{ + match payload { + TaskPayload::Noop(payload) => executor.execute_noop(payload).await, + TaskPayload::CleanupCanceledInstances(payload) => { + executor.execute_cleanup_canceled_instances(payload).await + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[test] + fn test_task_id_validation() { + let valid = TaskId::new("cleanup.canceled_15d-1".to_string()).unwrap(); + assert_eq!(valid.as_str(), "cleanup.canceled_15d-1"); + + assert!(TaskId::new("".to_string()).is_err()); + assert!(TaskId::new("with space".to_string()).is_err()); + assert!(TaskId::new("slash/not-allowed".to_string()).is_err()); + assert!(TaskId::new("a".repeat(65)).is_err()); + } + + #[test] + fn test_task_id_deserialize_validates_input() { + let valid: TaskId = serde_json::from_str(r#""cleanup.canceled_15d-1""#).unwrap(); + assert_eq!(valid.as_str(), "cleanup.canceled_15d-1"); + + let err = serde_json::from_str::(r#""with space""#).unwrap_err(); + assert!(err.to_string().contains("invalid task id")); + } + + #[test] + fn test_schedule_to_aws_expression() { + let at = chrono::DateTime::parse_from_rfc3339("2026-03-18T10:00:00Z") + .unwrap() + .with_timezone(&Utc); + assert_eq!( + ScheduleSpec::At(at).to_aws_expression(), + "at(2026-03-18T10:00:00)" + ); + assert_eq!( + ScheduleSpec::Cron("cron(0 8 * * ? *)".to_string()).to_aws_expression(), + "cron(0 8 * * ? *)" + ); + } + + #[test] + fn test_schedule_validate_cron() { + assert!(ScheduleSpec::Cron("cron(0 8 * * ? *)".to_string()) + .validate() + .is_ok()); + assert!(ScheduleSpec::Cron("0 8 * * *".to_string()) + .validate() + .is_err()); + assert!(ScheduleSpec::Cron("cron()".to_string()).validate().is_err()); + } + + #[test] + fn test_scheduled_task_request_validate_propagates_schedule_error() { + let req = ScheduledTaskRequest { + task_id: TaskId::new("test_task".to_string()).unwrap(), + schedule: ScheduleSpec::Cron("invalid".to_string()), + payload: TaskPayload::Noop(NoopTaskPayload { note: None }), + }; + assert!(req.validate().is_err()); + } + + #[test] + fn test_scheduled_task_request_to_message_copies_fields() { + let req = ScheduledTaskRequest { + task_id: TaskId::new("test_task".to_string()).unwrap(), + schedule: ScheduleSpec::Cron("cron(0 0 * * ? *)".to_string()), + payload: TaskPayload::Noop(NoopTaskPayload { + note: Some("hello".to_string()), + }), + }; + + let msg = req.to_message(); + assert_eq!(msg.task_id.as_str(), "test_task"); + match msg.payload { + TaskPayload::Noop(payload) => assert_eq!(payload.note.as_deref(), Some("hello")), + TaskPayload::CleanupCanceledInstances(_) => panic!("unexpected payload variant"), + } + } + + #[test] + fn test_daily_cleanup_request_shape() { + let req = daily_cleanup_canceled_instances_request( + CLEANUP_CANCELED_INSTANCES_DAILY_CRON_UTC.to_string(), + CLEANUP_CANCELED_INSTANCES_DEFAULT_GRACE_DAYS, + ) + .unwrap(); + assert_eq!( + req.task_id.as_str(), + CLEANUP_CANCELED_INSTANCES_DAILY_TASK_ID + ); + assert_eq!( + req.schedule, + ScheduleSpec::Cron(CLEANUP_CANCELED_INSTANCES_DAILY_CRON_UTC.to_string()) + ); + match req.payload { + TaskPayload::CleanupCanceledInstances(payload) => { + assert_eq!( + payload.grace_days, + CLEANUP_CANCELED_INSTANCES_DEFAULT_GRACE_DAYS + ); + assert!(!payload.dry_run); + } + TaskPayload::Noop(_) => panic!("unexpected payload variant"), + } + } + + struct TestExecutor { + noop_calls: AtomicUsize, + cleanup_calls: AtomicUsize, + } + + #[async_trait] + impl TaskExecutor for TestExecutor { + async fn execute_noop(&self, _: &NoopTaskPayload) -> anyhow::Result<()> { + self.noop_calls.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + async fn execute_cleanup_canceled_instances( + &self, + _: &CleanupCanceledInstancesTaskPayload, + ) -> anyhow::Result<()> { + self.cleanup_calls.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + } + + #[tokio::test] + async fn test_dispatch_task_by_enum_variant() { + let exec = TestExecutor { + noop_calls: AtomicUsize::new(0), + cleanup_calls: AtomicUsize::new(0), + }; + + dispatch_task( + &exec, + &TaskPayload::Noop(NoopTaskPayload { + note: Some("test".to_string()), + }), + ) + .await + .unwrap(); + + dispatch_task( + &exec, + &TaskPayload::CleanupCanceledInstances(CleanupCanceledInstancesTaskPayload { + grace_days: 15, + dry_run: true, + }), + ) + .await + .unwrap(); + + assert_eq!(exec.noop_calls.load(Ordering::SeqCst), 1); + assert_eq!(exec.cleanup_calls.load(Ordering::SeqCst), 1); + } +} diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index 4c16917f..fe15f24e 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs @@ -5,6 +5,7 @@ pub mod bi_metrics; pub mod consts; pub mod conversation; pub mod file; +pub mod jobs; pub mod metrics; pub mod model; pub mod response; diff --git a/env.example b/env.example index b26fc8af..916ec1c9 100644 --- a/env.example +++ b/env.example @@ -90,3 +90,15 @@ POSTHOG_HOST=https://us.i.posthog.com # Billing STRIPE_SECRET_KEY= STRIPE_WEBHOOK_SECRET= + +# Tasks / Scheduler / Worker +TASKS_ENABLED=false +# TASKS_AWS_REGION=us-east-1 +# TASKS_SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/chat-api-tasks +# TASKS_SQS_QUEUE_ARN=arn:aws:sqs:us-east-1:123456789012:chat-api-tasks +# TASKS_SCHEDULER_ROLE_ARN=arn:aws:iam::123456789012:role/chat-api-scheduler-role +TASKS_SCHEDULER_GROUP=chat-api +TASKS_WORKER_WAIT_SECONDS=20 +TASKS_WORKER_VISIBILITY_TIMEOUT=60 +TASKS_WORKER_MAX_MESSAGES=10 +TASKS_WORKER_MAX_CONCURRENCY=10 diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 004c423e..9256c27a 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.90.0" +channel = "1.91.1" components = ["cargo", "rustfmt", "clippy"]