diff --git a/Cargo.lock b/Cargo.lock index 8ffea85..c03ef27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,17 +11,11 @@ dependencies = [ "gimli", ] -[[package]] -name = "adler2" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" - [[package]] name = "aho-corasick" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" dependencies = [ "memchr", ] @@ -143,28 +137,13 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "backon" -version = "1.5.2" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" dependencies = [ "fastrand", ] -[[package]] -name = "backtrace" -version = "0.3.76" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" -dependencies = [ - "addr2line", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", - "windows-link", -] - [[package]] name = "base64" version = "0.22.1" @@ -194,9 +173,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.9.4" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" +checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" [[package]] name = "block-buffer" @@ -216,15 +195,6 @@ dependencies = [ "cfg_aliases", ] -[[package]] -name = "bstr" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4" -dependencies = [ - "memchr", -] - [[package]] name = "bumpalo" version = "3.19.0" @@ -268,9 +238,9 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "bytesize" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5c434ae3cf0089ca203e9019ebe529c47ff45cefe8af7c85ecb734ef541822f" +checksum = "c99fa31e08a43eaa5913ef68d7e01c37a2bdce6ed648168239ad33b7d30a9cd8" [[package]] name = "candle-core" @@ -330,9 +300,9 @@ dependencies = [ [[package]] name = "cap-fs-ext" -version = "3.4.4" +version = "3.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e41cc18551193fe8fa6f15c1e3c799bc5ec9e2cfbfaa8ed46f37013e3e6c173c" +checksum = "d5528f85b1e134ae811704e41ef80930f56e795923f866813255bc342cc20654" dependencies = [ "cap-primitives", "cap-std", @@ -342,9 +312,9 @@ dependencies = [ [[package]] name = "cap-net-ext" -version = "3.4.4" +version = "3.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f83833816c66c986e913b22ac887cec216ea09301802054316fc5301809702c" +checksum = "20a158160765c6a7d0d8c072a53d772e4cb243f38b04bfcf6b4939cfbe7482e7" dependencies = [ "cap-primitives", "cap-std", @@ -354,9 +324,9 @@ dependencies = [ [[package]] name = "cap-primitives" -version = "3.4.4" +version = "3.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a1e394ed14f39f8bc26f59d4c0c010dbe7f0a1b9bafff451b1f98b67c8af62a" +checksum = "b6cf3aea8a5081171859ef57bc1606b1df6999df4f1110f8eef68b30098d1d3a" dependencies = [ "ambient-authority", "fs-set-times", @@ -372,9 +342,9 @@ dependencies = [ [[package]] name = "cap-rand" -version = "3.4.4" +version = "3.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0acb89ccf798a28683f00089d0630dfaceec087234eae0d308c05ddeaa941b40" +checksum = "d8144c22e24bbcf26ade86cb6501a0916c46b7e4787abdb0045a467eb1645a1d" dependencies = [ "ambient-authority", "rand 0.8.5", @@ -382,9 +352,9 @@ dependencies = [ [[package]] name = "cap-std" -version = "3.4.4" +version = "3.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07c0355ca583dd58f176c3c12489d684163861ede3c9efa6fd8bba314c984189" +checksum = "b6dc3090992a735d23219de5c204927163d922f42f575a0189b005c62d37549a" dependencies = [ "cap-primitives", "io-extras", @@ -394,9 +364,9 @@ dependencies = [ [[package]] name = "cap-time-ext" -version = "3.4.4" +version = "3.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "491af520b8770085daa0466978c75db90368c71896523f2464214e38359b1a5b" +checksum = "def102506ce40c11710a9b16e614af0cde8e76ae51b1f48c04b8d79f4b671a80" dependencies = [ "ambient-authority", "cap-primitives", @@ -408,9 +378,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.40" +version = "1.2.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d05d92f4b1fd76aad469d46cdd858ca761576082cd37df81416691e50199fb" +checksum = "35900b6c8d709fb1d854671ae27aeaa9eec2f8b01b364e1619a40da3e6fe2afe" dependencies = [ "find-msvc-tools", "jobserver", @@ -420,9 +390,9 @@ dependencies = [ [[package]] name = "cfg-if" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" [[package]] name = "cfg_aliases" @@ -444,12 +414,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "cityhash-rs" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" - [[package]] name = "claims" version = "0.8.0" @@ -458,9 +422,9 @@ checksum = "bba18ee93d577a8428902687bcc2b6b45a56b1981a1f6d779731c86cc4c5db18" [[package]] name = "clap" -version = "4.5.48" +version = "4.5.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" +checksum = "4c26d721170e0295f191a69bd9a1f93efcdb0aff38684b61ab5750468972e5f5" dependencies = [ "clap_builder", "clap_derive", @@ -468,9 +432,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.48" +version = "4.5.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" +checksum = "75835f0c7bf681bfd05abe44e965760fea999a5286c6eb2d59883634fd02011a" dependencies = [ "anstream", "anstyle", @@ -480,9 +444,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.47" +version = "4.5.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c" +checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671" dependencies = [ "heck", "proc-macro2", @@ -492,46 +456,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" - -[[package]] -name = "clickhouse" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a9a81a1dffadd762ee662635ce409232258ce9beebd7cc0fa227df0b5e7efc0" -dependencies = [ - "bstr", - "bytes", - "cityhash-rs", - "clickhouse-derive", - "futures", - "futures-channel", - "http-body-util", - "hyper", - "hyper-util", - "lz4_flex", - "replace_with", - "sealed", - "serde", - "static_assertions", - "thiserror 1.0.69", - "tokio", - "url", -] - -[[package]] -name = "clickhouse-derive" -version = "0.2.0" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d70f3e2893f7d3e017eeacdc9a708fbc29a10488e3ebca21f9df6a5d2b616dbb" -dependencies = [ - "proc-macro2", - "quote", - "serde_derive_internals", - "syn", -] +checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cobs" @@ -816,14 +743,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dictionary" -version = "0.13.2" -dependencies = [ - "async-trait", - "reactor", -] - [[package]] name = "digest" version = "0.10.7" @@ -888,9 +807,9 @@ dependencies = [ [[package]] name = "dyn-stack-macros" -version = "0.1.0" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05dbec7076f432bb132db738df90d87a4f5789e99f59e7b1219a6b8ef61eaa68" +checksum = "e1d926b4d407d372f141f93bb444696142c29d32962ccbd3531117cf3aa0bfa9" [[package]] name = "either" @@ -985,7 +904,6 @@ dependencies = [ "async-trait", "bytesize", "clap", - "dictionary", "http", "http-backend", "http-body-util", @@ -1001,6 +919,7 @@ dependencies = [ "smol_str", "tempfile", "tokio", + "utils", "wasmtime", ] @@ -1023,9 +942,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0399f9d26e5191ce32c498bebd31e7a3ceabc2745f0ac54af3f335126c3f24b3" +checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" [[package]] name = "fnv" @@ -1178,7 +1097,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27d12c0aed7f1e24276a241aadc4cb8ea9f83000f34bc062b7cc2d51e3b0fabd" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "debugid", "fxhash", "serde", @@ -1424,9 +1343,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "0.14.7" +version = "0.14.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" dependencies = [ "typenum", "version_check", @@ -1440,19 +1359,19 @@ checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", "libc", - "wasi 0.11.1+wasi-snapshot-preview1", + "wasi", ] [[package]] name = "getrandom" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", "libc", "r-efi", - "wasi 0.14.7+wasi-0.2.4", + "wasip2", ] [[package]] @@ -1487,9 +1406,9 @@ dependencies = [ [[package]] name = "half" -version = "2.7.0" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e54c115d4f30f52c67202f079c5f9d8b49db4691f460fdb0b4c2e838261b2ba5" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" dependencies = [ "bytemuck", "cfg-if", @@ -1594,9 +1513,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", - "bytesize", "claims", - "dictionary", "http", "http-backend", "http-body-util", @@ -1613,6 +1530,7 @@ dependencies = [ "tokio", "tracing", "tracing-test", + "utils", "wasi-common", "wasmtime", "wasmtime-wasi", @@ -1724,22 +1642,22 @@ dependencies = [ [[package]] name = "icu_collections" -version = "2.0.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" dependencies = [ "displaydoc", "potential_utf", - "yoke 0.8.0", + "yoke 0.8.1", "zerofrom", "zerovec", ] [[package]] name = "icu_locale_core" -version = "2.0.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" dependencies = [ "displaydoc", "litemap", @@ -1750,11 +1668,10 @@ dependencies = [ [[package]] name = "icu_normalizer" -version = "2.0.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" dependencies = [ - "displaydoc", "icu_collections", "icu_normalizer_data", "icu_properties", @@ -1765,44 +1682,40 @@ dependencies = [ [[package]] name = "icu_normalizer_data" -version = "2.0.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" [[package]] name = "icu_properties" -version = "2.0.1" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b" +checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" dependencies = [ - "displaydoc", "icu_collections", "icu_locale_core", "icu_properties_data", "icu_provider", - "potential_utf", "zerotrie", "zerovec", ] [[package]] name = "icu_properties_data" -version = "2.0.1" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" +checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" [[package]] name = "icu_provider" -version = "2.0.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" dependencies = [ "displaydoc", "icu_locale_core", - "stable_deref_trait", - "tinystr", "writeable", - "yoke 0.8.0", + "yoke 0.8.1", "zerofrom", "zerotrie", "zerovec", @@ -1837,9 +1750,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.11.4" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" +checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f" dependencies = [ "equivalent", "hashbrown 0.16.0", @@ -1863,17 +1776,6 @@ version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06432fb54d3be7964ecd3649233cddf80db2832f47fec34c01f65b3d9d774983" -[[package]] -name = "io-uring" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" -dependencies = [ - "bitflags 2.9.4", - "cfg-if", - "libc", -] - [[package]] name = "ipnet" version = "2.11.0" @@ -1882,20 +1784,20 @@ checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" [[package]] name = "is-terminal" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" +checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] name = "is_terminal_polyfill" -version = "1.70.1" +version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" [[package]] name = "itertools" @@ -1947,15 +1849,15 @@ version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" dependencies = [ - "getrandom 0.3.3", + "getrandom 0.3.4", "libc", ] [[package]] name = "js-sys" -version = "0.3.81" +version = "0.3.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec48937a97411dcb524a265206ccd4c90bb711fca92b2792c407f268825b9305" +checksum = "b011eec8cc36da2aab2d5cff675ec18454fad408585853910a202391cf9f8e65" dependencies = [ "once_cell", "wasm-bindgen", @@ -1970,6 +1872,7 @@ dependencies = [ "redis", "slab", "smol_str", + "tokio", "tracing", "wasmtime", ] @@ -1994,9 +1897,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.176" +version = "0.2.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" [[package]] name = "libloading" @@ -2020,7 +1923,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "libc", ] @@ -2038,9 +1941,9 @@ checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" [[package]] name = "litemap" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" +checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" [[package]] name = "lock_api" @@ -2057,12 +1960,6 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" -[[package]] -name = "lz4_flex" -version = "0.11.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" - [[package]] name = "mach2" version = "0.4.3" @@ -2104,9 +2001,9 @@ dependencies = [ [[package]] name = "memmap2" -version = "0.9.8" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7" +checksum = "744133e4a0e0a658e1374cf3bf8e415c4052a15a111acd372764c55b4177d490" dependencies = [ "libc", "stable_deref_trait", @@ -2121,24 +2018,15 @@ dependencies = [ "autocfg", ] -[[package]] -name = "miniz_oxide" -version = "0.8.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" -dependencies = [ - "adler2", -] - [[package]] name = "mio" -version = "1.0.4" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" dependencies = [ "libc", - "wasi 0.11.1+wasi-snapshot-preview1", - "windows-sys 0.59.0", + "wasi", + "windows-sys 0.61.2", ] [[package]] @@ -2218,11 +2106,11 @@ dependencies = [ [[package]] name = "nu-ansi-term" -version = "0.50.1" +version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2312,9 +2200,9 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a973b4e44ce6cad84ce69d797acf9a044532e4184c4f267913d1b546a0727b7a" +checksum = "b1207a7e20ad57b847bbddc6776b968420d38292bbfe2089accff5e19e82454c" dependencies = [ "num_enum_derive", "rustversion", @@ -2322,9 +2210,9 @@ dependencies = [ [[package]] name = "num_enum_derive" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77e878c846a8abae00dd069496dbe8751b16ac1c3d6bd2a7283a938e8228f90d" +checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2352,17 +2240,17 @@ checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "once_cell_polyfill" -version = "1.70.1" +version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openssl" -version = "0.10.73" +version = "0.10.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -2390,9 +2278,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "openssl-sys" -version = "0.9.109" +version = "0.9.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" dependencies = [ "cc", "libc", @@ -2526,9 +2414,9 @@ dependencies = [ [[package]] name = "potential_utf" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84df19adbe5b5a0782edcab45899906947ab039ccf4573713735ee7de1e6b08a" +checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" dependencies = [ "zerovec", ] @@ -2558,14 +2446,14 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" dependencies = [ - "toml_edit 0.23.6", + "toml_edit 0.23.7", ] [[package]] name = "proc-macro2" -version = "1.0.101" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" dependencies = [ "unicode-ident", ] @@ -2576,7 +2464,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "hex", "procfs-core", "rustix 0.38.44", @@ -2588,7 +2476,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "hex", ] @@ -2676,9 +2564,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.41" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -2745,7 +2633,7 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ - "getrandom 0.3.3", + "getrandom 0.3.4", ] [[package]] @@ -2773,7 +2661,7 @@ version = "11.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", ] [[package]] @@ -2842,7 +2730,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", ] [[package]] @@ -2872,9 +2760,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.3" +version = "1.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b5288124840bee7b386bc413c487869b360b2b4ec421ea56425128692f2a82c" +checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" dependencies = [ "aho-corasick", "memchr", @@ -2884,9 +2772,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.11" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "833eb9ce86d40ef33cb1306d8accf7bc8ec2bfea4355cbdebb3df68b40925cad" +checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" dependencies = [ "aho-corasick", "memchr", @@ -2895,15 +2783,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" - -[[package]] -name = "replace_with" -version = "0.1.8" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884" +checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" [[package]] name = "ring" @@ -2929,8 +2811,6 @@ dependencies = [ "bytesize", "chrono", "claims", - "clickhouse", - "dictionary", "http", "http-backend", "key-value-store", @@ -2944,6 +2824,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "utils", "wasi-common", "wasmtime", "wasmtime-environ 36.0.2 (git+https://github.com/G-Core/wasmtime.git?branch=release-36.0.0)", @@ -2981,7 +2862,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.4.15", @@ -2994,7 +2875,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.11.0", @@ -3027,9 +2908,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" +checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" dependencies = [ "zeroize", ] @@ -3097,17 +2978,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cd08a21f852bd2fe42e3b2a6c76a0db6a95a5b5bd29c0521dd0b30fa1712ec8" -[[package]] -name = "sealed" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "secret" version = "0.13.2" @@ -3123,7 +2993,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "core-foundation", "core-foundation-sys", "libc", @@ -3186,17 +3056,6 @@ dependencies = [ "syn", ] -[[package]] -name = "serde_derive_internals" -version = "0.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "serde_json" version = "1.0.145" @@ -3307,22 +3166,22 @@ dependencies = [ [[package]] name = "smol_str" -version = "0.3.2" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9676b89cd56310a87b93dec47b11af744f34d5fc9f367b829474eec0a891350d" +checksum = "3498b0a27f93ef1402f20eefacfaa1691272ac4eca1cdc8c596cb0a245d6cbf5" dependencies = [ "borsh", - "serde", + "serde_core", ] [[package]] name = "socket2" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -3331,12 +3190,6 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "strsim" version = "0.11.1" @@ -3351,9 +3204,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.106" +version = "2.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" dependencies = [ "proc-macro2", "quote", @@ -3377,7 +3230,7 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec7dddc5f0fee506baf8b9fdb989e242f17e4b11c61dfbb0635b705217199eea" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "byteorder", "enum-as-inner", "libc", @@ -3391,7 +3244,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01198a2debb237c62b6826ec7081082d951f46dbb64b0e8c7649a452230d1dfc" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "byteorder", "enum-as-inner", "libc", @@ -3405,7 +3258,7 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc4592f674ce18521c2a81483873a49596655b179f71c5e05d10c1fe66c78745" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "cap-fs-ext", "cap-std", "fd-lock", @@ -3434,7 +3287,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" dependencies = [ "fastrand", - "getrandom 0.3.3", + "getrandom 0.3.4", "once_cell", "rustix 1.1.2", "windows-sys 0.61.2", @@ -3533,9 +3386,9 @@ dependencies = [ [[package]] name = "tinystr" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" +checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" dependencies = [ "displaydoc", "zerovec", @@ -3543,29 +3396,26 @@ dependencies = [ [[package]] name = "tokio" -version = "1.47.1" +version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ - "backtrace", "bytes", - "io-uring", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", - "slab", "socket2", "tokio-macros", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] name = "tokio-macros" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", @@ -3606,9 +3456,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.16" +version = "0.7.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", @@ -3642,9 +3492,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f1085dec27c2b6632b04c80b3bb1b4300d6495d1e129693bdda7d91e72eec1" +checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" dependencies = [ "serde_core", ] @@ -3665,21 +3515,21 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.23.6" +version = "0.23.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3effe7c0e86fdff4f69cdd2ccc1b96f933e24811c5441d44904e8683e27184b" +checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" dependencies = [ "indexmap", - "toml_datetime 0.7.2", + "toml_datetime 0.7.3", "toml_parser", "winnow", ] [[package]] name = "toml_parser" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cf893c33be71572e0e9aa6dd15e6677937abd686b066eac3f8cd3531688a627" +checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" dependencies = [ "winnow", ] @@ -3830,9 +3680,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.19" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" [[package]] name = "unicode-width" @@ -3876,13 +3726,20 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "utils" +version = "0.13.2" +dependencies = [ + "reactor", +] + [[package]] name = "uuid" version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ - "getrandom 0.3.3", + "getrandom 0.3.4", "js-sys", "wasm-bindgen", ] @@ -3930,22 +3787,13 @@ version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" -[[package]] -name = "wasi" -version = "0.14.7+wasi-0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c" -dependencies = [ - "wasip2", -] - [[package]] name = "wasi-common" version = "36.0.2" source = "git+https://github.com/G-Core/wasmtime.git?branch=release-36.0.0#649569feed89cb1f1d5bf4ce7bbbad09b6c0d3c0" dependencies = [ "anyhow", - "bitflags 2.9.4", + "bitflags 2.10.0", "cap-fs-ext", "cap-rand", "cap-std", @@ -3974,9 +3822,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.104" +version = "0.2.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1da10c01ae9f1ae40cbfac0bac3b1e724b320abfcf52229f80b547c0d250e2d" +checksum = "da95793dfc411fbbd93f5be7715b0578ec61fe87cb1a42b12eb625caa5c5ea60" dependencies = [ "cfg-if", "once_cell", @@ -3985,25 +3833,11 @@ dependencies = [ "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.104" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "671c9a5a66f49d8a47345ab942e2cb93c7d1d0339065d4f8139c486121b43b19" -dependencies = [ - "bumpalo", - "log", - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-shared", -] - [[package]] name = "wasm-bindgen-macro" -version = "0.2.104" +version = "0.2.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ca60477e4c59f5f2986c50191cd972e3a50d8a95603bc9434501cf156a9a119" +checksum = "04264334509e04a7bf8690f2384ef5265f05143a4bff3889ab7a3269adab59c2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4011,22 +3845,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.104" +version = "0.2.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7" +checksum = "420bc339d9f322e562942d52e115d57e950d12d88983a14c79b86859ee6c7ebc" dependencies = [ + "bumpalo", "proc-macro2", "quote", "syn", - "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.104" +version = "0.2.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bad67dc8b2a1a6e5448428adec4c3e84c43e561d8c9ee8a9e5aabeb193ec41d1" +checksum = "76f218a38c84bcb33c25ec7059b07847d465ce0e0a76b995e134a45adcb6af76" dependencies = [ "unicode-ident", ] @@ -4079,7 +3913,7 @@ version = "0.228.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4abf1132c1fdf747d56bbc1bb52152400c70f336870f968b85e89ea422198ae3" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "hashbrown 0.15.5", "indexmap", "semver", @@ -4091,7 +3925,7 @@ version = "0.236.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9b1e81f3eb254cf7404a82cee6926a4a3ccc5aad80cc3d43608a070c67aa1d7" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "hashbrown 0.15.5", "indexmap", "semver", @@ -4104,7 +3938,7 @@ version = "0.240.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b722dcf61e0ea47440b53ff83ccb5df8efec57a69d150e4f24882e4eba7e24a4" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "indexmap", "semver", ] @@ -4127,7 +3961,7 @@ dependencies = [ "addr2line", "anyhow", "async-trait", - "bitflags 2.9.4", + "bitflags 2.10.0", "bumpalo", "cc", "cfg-if", @@ -4370,7 +4204,7 @@ name = "wasmtime-internal-wit-bindgen" version = "36.0.2" dependencies = [ "anyhow", - "bitflags 2.9.4", + "bitflags 2.10.0", "heck", "indexmap", "wit-parser 0.236.1", @@ -4382,7 +4216,7 @@ version = "36.0.2" dependencies = [ "anyhow", "async-trait", - "bitflags 2.9.4", + "bitflags 2.10.0", "bytes", "cap-fs-ext", "cap-net-ext", @@ -4505,14 +4339,14 @@ version = "0.26.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" dependencies = [ - "webpki-roots 1.0.3", + "webpki-roots 1.0.4", ] [[package]] name = "webpki-roots" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b130c0d2d49f8b6889abc456e795e82525204f27c42cf767cf0d7734e089b8" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" dependencies = [ "rustls-pki-types", ] @@ -4523,7 +4357,7 @@ version = "36.0.2" dependencies = [ "anyhow", "async-trait", - "bitflags 2.9.4", + "bitflags 2.10.0", "thiserror 2.0.17", "tracing", "wasmtime", @@ -4537,7 +4371,7 @@ source = "git+https://github.com/G-Core/wasmtime.git?branch=release-36.0.0#64956 dependencies = [ "anyhow", "async-trait", - "bitflags 2.9.4", + "bitflags 2.10.0", "thiserror 2.0.17", "tracing", "wasmtime", @@ -4921,7 +4755,7 @@ version = "0.36.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f3fd376f71958b862e7afb20cfe5a22830e1963462f3a17f49d82a6c1d1f42d" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "windows-sys 0.59.0", ] @@ -4938,7 +4772,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfb53295365b9500e17bc41c40229337183244f0d6185a5b028c587837c3370f" dependencies = [ "anyhow", - "bitflags 2.9.4", + "bitflags 2.10.0", "indexmap", "log", "serde", @@ -5000,9 +4834,9 @@ dependencies = [ [[package]] name = "writeable" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" +checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" [[package]] name = "yoke" @@ -5018,13 +4852,12 @@ dependencies = [ [[package]] name = "yoke" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" +checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" dependencies = [ - "serde", "stable_deref_trait", - "yoke-derive 0.8.0", + "yoke-derive 0.8.1", "zerofrom", ] @@ -5042,9 +4875,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" +checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", @@ -5101,31 +4934,31 @@ checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" [[package]] name = "zerotrie" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" +checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" dependencies = [ "displaydoc", - "yoke 0.8.0", + "yoke 0.8.1", "zerofrom", ] [[package]] name = "zerovec" -version = "0.11.4" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b" +checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" dependencies = [ - "yoke 0.8.0", + "yoke 0.8.1", "zerofrom", "zerovec-derive", ] [[package]] name = "zerovec-derive" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" +checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 3f18856..615a25f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,7 +62,7 @@ pretty_env_logger = "0.5" runtime = { path = "crates/runtime", default-features = false } http-service = { path = "crates/http-service" } http-backend = { path = "crates/http-backend" } -dictionary = { path = "crates/dictionary" } +utils = { path = "crates/utils" } secret = { path = "crates/secret" } key-value-store = { path = "crates/key-value-store" } hyper-tls = "0.6" diff --git a/crates/http-backend/src/lib.rs b/crates/http-backend/src/lib.rs index bfb3618..e187749 100644 --- a/crates/http-backend/src/lib.rs +++ b/crates/http-backend/src/lib.rs @@ -1,6 +1,9 @@ +pub mod stats; + use std::fmt::Debug; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -17,6 +20,7 @@ use tokio::net::TcpStream; use tower_service::Service; use tracing::{debug, trace, warn}; +use crate::stats::{ExtRequestStats, ExtStatsTimer}; use reactor::gcore::fastedge::http::Headers; use reactor::gcore::fastedge::{ http::{Error as HttpError, Method, Request, Response}, @@ -40,13 +44,13 @@ pub struct Connection { /// A custom Hyper client connector, which is needed to override Hyper's default behavior of /// connecting to host specified by the request's URI; we instead want to connect to the host /// specified by our backend configuration, regardless of what the URI says. -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct FastEdgeConnector { inner: HttpConnector, backend: Uri, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct Backend { client: Client>, uri: Uri, @@ -54,6 +58,7 @@ pub struct Backend { propagate_header_names: HeaderNameList, max_sub_requests: usize, pub strategy: BackendStrategy, + ext_http_stats: Option>, } pub struct Builder { @@ -81,7 +86,7 @@ impl Builder { where C: Connect + Clone, { - let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) + let client = Client::builder(TokioExecutor::new()) .set_host(false) .pool_idle_timeout(Duration::from_secs(30)) .build(connector); @@ -90,9 +95,10 @@ impl Builder { client, uri: self.uri.to_owned(), propagate_headers: HeaderMap::new(), - propagate_header_names: self.propagate_header_names.to_owned(), + propagate_header_names: self.propagate_header_names.clone(), max_sub_requests: self.max_sub_requests, strategy: self.strategy, + ext_http_stats: None, } } } @@ -111,6 +117,11 @@ impl Backend { self.uri.to_owned() } + /// Set external request stats + pub fn set_ext_http_stats(&mut self, stats: Arc) { + self.ext_http_stats.replace(stats); + } + pub fn propagate_header_names(&self) -> HeaderNameList { self.propagate_header_names.clone() } @@ -150,6 +161,7 @@ impl Backend { fn make_request(&self, req: Request) -> Result>> { trace!("strategy: {:?}", self.strategy); + let builder = match self.strategy { BackendStrategy::Direct => { let mut headers = req.headers.into_iter().collect::>(); @@ -282,6 +294,13 @@ where warn!(cause=?error, "making request to backend"); HttpError::RequestError })?; + + // start external request stats timer + let _stats_timer = self + .ext_http_stats + .as_ref() + .map(|s| ExtStatsTimer::new(s.clone())); + let res = self.client.request(request).await.map_err(|error| { warn!(cause=?error, "sending request to backend"); HttpError::RequestError diff --git a/crates/http-backend/src/stats.rs b/crates/http-backend/src/stats.rs new file mode 100644 index 0000000..87f29eb --- /dev/null +++ b/crates/http-backend/src/stats.rs @@ -0,0 +1,50 @@ +use std::sync::Arc; +use std::time::{Duration, Instant}; + +pub trait ExtRequestStats: Sync + Send { + /// Observe elapsed time + fn observe_ext(&self, elapsed: Duration); +} + +pub struct ExtStatsTimer { + /// A stats ref for automatic recording of observations. + stats: Arc, + /// Whether the timer has already been observed once. + observed: bool, + /// Starting instant for the timer. + start: Instant, +} + +impl ExtStatsTimer { + pub fn new(stats: Arc) -> Self { + Self { + stats, + observed: false, + start: Instant::now(), + } + } + + /// Discard timer without recording duration + pub fn discard(mut self) { + self.observed = true; + } + + /// Observe and record timer duration + pub fn observe_duration(mut self) { + self.observe(); + } + + fn observe(&mut self) { + let v = self.start.elapsed(); + self.observed = true; + self.stats.observe_ext(v); + } +} + +impl Drop for ExtStatsTimer { + fn drop(&mut self) { + if !self.observed { + self.observe() + } + } +} diff --git a/crates/http-service/Cargo.toml b/crates/http-service/Cargo.toml index 11d00e6..b5051c7 100644 --- a/crates/http-service/Cargo.toml +++ b/crates/http-service/Cargo.toml @@ -8,7 +8,6 @@ authors.workspace = true [features] default = [] metrics = ["runtime/metrics"] -stats = ["runtime/stats"] [dependencies] anyhow = { workspace = true } @@ -25,10 +24,9 @@ smol_str = { workspace = true } reactor = { path = "../reactor" } runtime = { path = "../runtime" } http-backend = { path = "../http-backend" } -dictionary = { path = "../dictionary" } +utils = { path = "../utils" } secret = { path = "../secret" } nanoid = "0.4" -bytesize = { workspace = true } async-trait = "0.1" hyper-util = { version = "0.1", features = ["server", "server-graceful"] } http-body-util = "0.1" diff --git a/crates/http-service/src/executor/http.rs b/crates/http-service/src/executor/http.rs index 4907a8b..16e4f36 100644 --- a/crates/http-service/src/executor/http.rs +++ b/crates/http-service/src/executor/http.rs @@ -1,15 +1,17 @@ +use crate::executor; use crate::executor::HttpExecutor; use crate::state::HttpState; use anyhow::{anyhow, bail, Context}; use async_trait::async_trait; -use bytesize::ByteSize; use http::{Method, Request, Response, StatusCode}; use http_backend::Backend; use http_body_util::{BodyExt, Full}; use hyper::body::Body; use reactor::gcore::fastedge; +use runtime::util::stats::{StatsTimer, StatsVisitor}; use runtime::{store::StoreBuilder, InstancePre}; -use std::time::{Duration, Instant}; +use std::sync::Arc; +use std::time::Duration; use wasmtime_wasi_http::body::HyperOutgoingBody; /// Execute context used by ['HttpService'] @@ -25,18 +27,18 @@ impl HttpExecutor for HttpExecutorImpl where C: Clone + Send + Sync + 'static, { - async fn execute( - &self, + async fn execute( + self, req: Request, - on_response: R, + stats: Arc, ) -> anyhow::Result> where - R: FnOnce(StatusCode, ByteSize, Duration) + Send + 'static, B: BodyExt + Send, ::Data: Send, { tracing::trace!("start execute"); - let start_ = Instant::now(); + // Start timing for stats + let stats_timer = StatsTimer::new(stats.clone()); let (parts, body) = req.into_parts(); let method = to_fastedge_http_method(&parts.method)?; @@ -70,15 +72,17 @@ where body, }; - let properties = crate::executor::get_properties(&parts.headers); + let properties = executor::get_properties(&parts.headers); - let store_builder = self.store_builder.clone().with_properties(properties); - let mut http_backend = self.backend.clone(); + let store_builder = self.store_builder.with_properties(properties); + let mut http_backend = self.backend; http_backend .propagate_headers(parts.headers.clone()) .context("propagate headers")?; + http_backend.set_ext_http_stats(stats.clone()); + let propagate_header_names = http_backend.propagate_header_names(); let backend_uri = http_backend.uri(); let state = HttpState { @@ -86,6 +90,7 @@ where uri: backend_uri, propagate_headers: parts.headers, propagate_header_names, + stats: stats.clone(), }; let mut store = store_builder.build(state)?; @@ -115,8 +120,8 @@ where return Err(error); } }; - let status_code = ::http::StatusCode::try_from(resp.status)?; - let builder = ::http::Response::builder().status(status_code); + let status_code = StatusCode::try_from(resp.status)?; + let builder = Response::builder().status(status_code); let builder = if let Some(headers) = resp.headers { headers .iter() @@ -125,12 +130,9 @@ where builder }; - let elapsed = Instant::now().duration_since(start_); - on_response( - status_code, - ByteSize::b(store.memory_used() as u64), - elapsed, - ); + drop(stats_timer); // Stop timing for stats + stats.status_code(status_code.as_u16()); + stats.memory_used(store.memory_used() as u64); let body = resp .body @@ -180,14 +182,14 @@ mod tests { }; use bytes::Bytes; use claims::*; - use dictionary::Dictionary; + use http_backend::stats::ExtRequestStats; use http_backend::{Backend, BackendStrategy, FastEdgeConnector}; use http_body_util::Empty; - use key_value_store::KeyValueStore; + use key_value_store::ReadStats; use runtime::app::{KvStoreOption, SecretOption, Status}; use runtime::logger::{Logger, NullAppender}; use runtime::service::ServiceBuilder; - use runtime::util::stats::{StatRow, StatsWriter}; + use runtime::util::stats::CdnPhase; use runtime::{ componentize_if_necessary, App, ContextT, PreCompiledLoader, Router, WasiVersion, WasmConfig, WasmEngine, @@ -195,9 +197,47 @@ mod tests { use secret::SecretStore; use smol_str::{SmolStr, ToSmolStr}; use std::collections::HashMap; + use utils::{Dictionary, UserDiagStats}; use wasmtime::component::Component; use wasmtime::{Engine, Module}; + #[derive(Clone)] + struct TestStats; + + impl ReadStats for TestStats { + fn count_kv_read(&self, _value: i32) {} + + fn count_kv_byod_read(&self, _value: i32) {} + } + + impl UserDiagStats for TestStats { + fn set_user_diag(&self, _diag: &str) {} + } + + impl ExtRequestStats for TestStats { + fn observe_ext(&self, _elapsed: Duration) {} + } + + impl StatsVisitor for TestStats { + fn status_code(&self, _status_code: u16) {} + + fn memory_used(&self, _memory_used: u64) {} + + fn fail_reason(&self, _fail_reason: u32) {} + + fn observe(&self, _elapsed: Duration) {} + + fn get_time_elapsed(&self) -> u64 { + 0 + } + + fn get_memory_used(&self) -> u64 { + 0 + } + + fn cdn_phase(&self, _phase: CdnPhase) {} + } + #[derive(Clone)] struct TestContext { geo: HashMap, @@ -231,9 +271,18 @@ mod tests { todo!() } - fn make_key_value_store(&self, _stores: &Vec) -> KeyValueStore { + fn make_key_value_store(&self, _stores: &Vec) -> key_value_store::Builder { todo!() } + + fn new_stats_row( + &self, + _request_id: &SmolStr, + _app: &SmolStr, + _cfg: &App, + ) -> Arc { + Arc::new(TestStats) + } } static DUMMY_SAMPLE: &[u8] = include_bytes!("../fixtures/dummy.wasm"); @@ -255,10 +304,6 @@ mod tests { } } - impl StatsWriter for TestContext { - fn write_stats(&self, _stat: StatRow) {} - } - impl Router for TestContext { async fn lookup_by_name(&self, _name: &str) -> Option { self.app.clone() @@ -330,6 +375,7 @@ mod tests { debug_until: None, secrets: vec![], kv_stores: vec![], + plan_id: 0, }) } @@ -354,7 +400,7 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn test_success() { - let req = assert_ok!(http::Request::builder() + let req = assert_ok!(Request::builder() .method("GET") .uri("http://www.rust-lang.org/") .header("server_name", "success.test.com") @@ -370,10 +416,10 @@ mod tests { engine: make_engine(), }; - let http_service: HttpService = + let http_service: HttpService = assert_ok!(ServiceBuilder::new(context).build()); - let res = assert_ok!(http_service.handle_request("1", req).await); + let res = assert_ok!(http_service.handle_request("1".to_smolstr(), req).await); assert_eq!(StatusCode::OK, res.status()); let headers = res.headers(); assert_eq!(4, headers.len()); @@ -389,7 +435,7 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn test_timeout() { - let req = assert_ok!(http::Request::builder() + let req = assert_ok!(Request::builder() .method("GET") .uri("http://www.rust-lang.org/") .header("server_name", "timeout.test.com") @@ -413,6 +459,7 @@ mod tests { debug_until: None, secrets: vec![], kv_stores: vec![], + plan_id: 0, }); let context = TestContext { @@ -421,10 +468,10 @@ mod tests { engine: make_engine(), }; - let http_service: HttpService = + let http_service: HttpService = assert_ok!(ServiceBuilder::new(context).build()); - let res = assert_ok!(http_service.handle_request("2", req).await); + let res = assert_ok!(http_service.handle_request("2".to_smolstr(), req).await); assert_eq!(FASTEDGE_EXECUTION_TIMEOUT, res.status()); let headers = res.headers(); assert_eq!(3, headers.len()); @@ -439,7 +486,7 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn test_insufficient_memory() { - let req = assert_ok!(http::Request::builder() + let req = assert_ok!(Request::builder() .method("GET") .uri("http://www.rust-lang.org/?size=200000") .header("server_name", "insufficient_memory.test.com") @@ -463,6 +510,7 @@ mod tests { debug_until: None, secrets: vec![], kv_stores: vec![], + plan_id: 0, }); let context = TestContext { @@ -471,10 +519,10 @@ mod tests { engine: make_engine(), }; - let http_service: HttpService = + let http_service: HttpService = assert_ok!(ServiceBuilder::new(context).build()); - let res = assert_ok!(http_service.handle_request("3", req).await); + let res = assert_ok!(http_service.handle_request("3".to_smolstr(), req).await); assert_eq!(FASTEDGE_OUT_OF_MEMORY, res.status()); let headers = res.headers(); assert_eq!(3, headers.len()); @@ -489,7 +537,7 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn draft_app() { - let req = assert_ok!(http::Request::builder() + let req = assert_ok!(Request::builder() .method("GET") .uri("http://www.rust-lang.org/") .header("server_name", "draft.test.com") @@ -505,9 +553,9 @@ mod tests { engine: make_engine(), }; - let http_service: HttpService = + let http_service: HttpService = assert_ok!(ServiceBuilder::new(context).build()); - let res = assert_ok!(http_service.handle_request("4", req).await); + let res = assert_ok!(http_service.handle_request("4".to_smolstr(), req).await); assert_eq!(StatusCode::NOT_FOUND, res.status()); assert_eq!(0, res.headers().len()); } @@ -515,7 +563,7 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn disabled_app() { - let req = assert_ok!(http::Request::builder() + let req = assert_ok!(Request::builder() .method("GET") .uri("http://www.rust-lang.org/") .header("server_name", "draft.test.com") @@ -531,9 +579,9 @@ mod tests { engine: make_engine(), }; - let http_service: HttpService = + let http_service: HttpService = assert_ok!(ServiceBuilder::new(context).build()); - let res = assert_ok!(http_service.handle_request("5", req).await); + let res = assert_ok!(http_service.handle_request("5".to_smolstr(), req).await); assert_eq!(StatusCode::NOT_FOUND, res.status()); assert_eq!(0, res.headers().len()); } @@ -541,7 +589,7 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn rate_limit_app() { - let req = assert_ok!(http::Request::builder() + let req = assert_ok!(Request::builder() .method("GET") .uri("http://www.rust-lang.org/") .header("server_name", "draft.test.com") @@ -557,9 +605,9 @@ mod tests { engine: make_engine(), }; - let http_service: HttpService = + let http_service: HttpService = assert_ok!(ServiceBuilder::new(context).build()); - let res = assert_ok!(http_service.handle_request("6", req).await); + let res = assert_ok!(http_service.handle_request("6".to_smolstr(), req).await); assert_eq!(StatusCode::TOO_MANY_REQUESTS, res.status()); assert_eq!(0, res.headers().len()); } @@ -567,7 +615,7 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn suspended_app() { - let req = assert_ok!(http::Request::builder() + let req = assert_ok!(Request::builder() .method("GET") .uri("http://www.rust-lang.org/") .header("server_name", "draft.test.com") @@ -583,9 +631,9 @@ mod tests { engine: make_engine(), }; - let http_service: HttpService = + let http_service: HttpService = assert_ok!(ServiceBuilder::new(context).build()); - let res = assert_ok!(http_service.handle_request("7", req).await); + let res = assert_ok!(http_service.handle_request("7".to_smolstr(), req).await); assert_eq!(StatusCode::NOT_ACCEPTABLE, res.status()); assert_eq!(0, res.headers().len()); } diff --git a/crates/http-service/src/executor/mod.rs b/crates/http-service/src/executor/mod.rs index ed46490..7160bae 100644 --- a/crates/http-service/src/executor/mod.rs +++ b/crates/http-service/src/executor/mod.rs @@ -1,13 +1,13 @@ mod http; mod wasi_http; +use runtime::util::stats::StatsVisitor; use std::collections::HashMap; -use std::time::Duration; +use std::sync::Arc; -use ::http::{HeaderMap, HeaderValue, StatusCode}; +use ::http::{HeaderMap, HeaderValue}; use anyhow::Result; use async_trait::async_trait; -use bytesize::ByteSize; use http_body_util::BodyExt; use hyper::body::Body; use runtime::{App, WasmEngine}; @@ -23,13 +23,12 @@ pub(crate) static X_CDN_REQUESTOR: &str = "x-cdn-requestor"; #[async_trait] pub trait HttpExecutor { - async fn execute( - &self, + async fn execute( + self, req: hyper::Request, - on_response: R, + stats: Arc, ) -> Result> where - R: FnOnce(StatusCode, ByteSize, Duration) + Send + 'static, B: BodyExt + Send, ::Data: Send; } diff --git a/crates/http-service/src/executor/wasi_http.rs b/crates/http-service/src/executor/wasi_http.rs index 4c687a3..75f22c9 100644 --- a/crates/http-service/src/executor/wasi_http.rs +++ b/crates/http-service/src/executor/wasi_http.rs @@ -1,14 +1,16 @@ -use std::time::{Duration, Instant}; +use std::sync::Arc; +use std::time::Duration; +use crate::executor; use crate::executor::HttpExecutor; use crate::state::HttpState; -use ::http::{header, HeaderMap, Request, Response, StatusCode, Uri}; +use ::http::{header, HeaderMap, Request, Response, Uri}; use anyhow::{anyhow, bail, Context}; use async_trait::async_trait; -use bytesize::ByteSize; use http_backend::Backend; use http_body_util::{BodyExt, Full}; use hyper::body::Body; +use runtime::util::stats::{StatsTimer, StatsVisitor}; use runtime::{store::StoreBuilder, InstancePre}; use wasmtime_wasi_http::bindings::http::types::Scheme; use wasmtime_wasi_http::bindings::ProxyPre; @@ -27,18 +29,18 @@ impl HttpExecutor for WasiHttpExecutorImpl where C: Clone + Send + Sync + 'static, { - async fn execute( - &self, + async fn execute( + self, req: Request, - on_response: R, + stats: Arc, ) -> anyhow::Result> where - R: FnOnce(StatusCode, ByteSize, Duration) + Send + 'static, B: BodyExt + Send, ::Data: Send, { tracing::trace!("start execute"); - let start_ = Instant::now(); + // Start timing for stats + let stats_timer = StatsTimer::new(stats.clone()); let (sender, receiver) = tokio::sync::oneshot::channel(); let (mut parts, body) = req.into_parts(); @@ -68,9 +70,9 @@ where let body = Full::new(body).map_err(|never| match never {}); let body = body.boxed(); - let properties = crate::executor::get_properties(&parts.headers); - let store_builder = self.store_builder.to_owned().with_properties(properties); - let mut http_backend = self.backend.to_owned(); + let properties = executor::get_properties(&parts.headers); + let store_builder = self.store_builder.with_properties(properties); + let mut http_backend = self.backend; http_backend .propagate_headers(parts.headers.clone()) @@ -97,6 +99,7 @@ where uri: backend_uri, propagate_headers, propagate_header_names, + stats: stats.clone(), }; let mut store = store_builder.build(state).context("store build")?; @@ -115,14 +118,9 @@ where let proxy = proxy_pre.instantiate_async(&mut store).await?; - let duration = Duration::from_millis(store.data().timeout); - - /* - Channel to receive http status code for asynchronious response processing of WASI-HTTP. - */ - let (status_code_tx, status_code_rx) = tokio::sync::oneshot::channel(); - + let task_stats = stats.clone(); let task = tokio::task::spawn(async move { + let duration = Duration::from_millis(store.data().timeout); if let Err(e) = tokio::time::timeout( duration, proxy @@ -134,39 +132,16 @@ where tracing::warn!(cause=?e, "incoming handler"); return Err(e); }; - let elapsed = Instant::now().duration_since(start_); - /* - Used by WASI-HTTP to send status code prior response processing. - If there is no status code then default value is returned. - For synchronious HTTP processing the status_code parameter is set and no value in the channel. - */ - let status_code = status_code_rx.await.unwrap_or_else(|error| { - tracing::trace!(cause=?error, "unknown status code"); - StatusCode::default() - }); - - on_response( - status_code, - ByteSize::b(store.memory_used() as u64), - elapsed, - ); + + drop(stats_timer); // Stop timing for stats + task_stats.memory_used(store.memory_used() as u64); + Ok(()) }); match receiver.await { Ok(Ok(response)) => { - /* - Status code sender is closed if response handler processing was done. - */ - if !status_code_tx.is_closed() { - if let Err(error) = status_code_tx.send(response.status()) { - tracing::warn!(cause=?error, "sending status code") - } - tracing::debug!("returned status code: '{}'", response.status(),); - } else { - tracing::warn!("status code sender is closed"); - } - + stats.status_code(response.status().as_u16()); Ok(response) } Ok(Err(e)) => Err(e.into()), diff --git a/crates/http-service/src/lib.rs b/crates/http-service/src/lib.rs index 59bc852..92c58eb 100644 --- a/crates/http-service/src/lib.rs +++ b/crates/http-service/src/lib.rs @@ -1,3 +1,4 @@ +use std::marker::PhantomData; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -8,7 +9,6 @@ pub use crate::executor::ExecutorFactory; use crate::executor::HttpExecutor; use anyhow::{bail, Error, Result}; use bytes::Bytes; -use bytesize::ByteSize; use http::{ header::{ACCESS_CONTROL_ALLOW_ORIGIN, CACHE_CONTROL}, HeaderMap, HeaderName, HeaderValue, StatusCode, @@ -18,15 +18,11 @@ use hyper::{body::Body, server::conn::http1, service::service_fn}; use hyper_util::{client::legacy::connect::Connect, rt::TokioIo}; #[cfg(feature = "metrics")] use runtime::util::metrics; -#[cfg(feature = "stats")] -use runtime::util::stats::StatRow; +use runtime::util::stats::StatsVisitor; use runtime::{ - app::Status, service::Service, util::stats::StatsWriter, App, AppResult, ContextT, Router, - WasmEngine, WasmEngineBuilder, + app::Status, service::Service, App, AppResult, ContextT, Router, WasmEngine, WasmEngineBuilder, }; -use smol_str::SmolStr; -#[cfg(feature = "stats")] -use smol_str::ToSmolStr; +use smol_str::{SmolStr, ToSmolStr}; use state::HttpState; use tokio::{net::TcpListener, time::error::Elapsed}; pub use wasmtime_wasi_http::body::HyperOutgoingBody; @@ -59,19 +55,19 @@ pub struct HttpConfig { pub backoff: u64, } -pub struct HttpService { +pub struct HttpService { engine: WasmEngine>, context: T, + _stats: PhantomData, } pub trait ContextHeaders { fn append_headers(&self) -> impl Iterator; } -impl Service for HttpService +impl Service for HttpService where T: ContextT - + StatsWriter + Router + ContextHeaders + ExecutorFactory> @@ -81,13 +77,18 @@ where + 'static, T::BackendConnector: Connect + Clone + Send + Sync + 'static, T::Executor: HttpExecutor + Send + Sync, + S: StatsVisitor + Send + Sync + 'static, { type State = HttpState; type Config = HttpConfig; type Context = T; fn new(engine: WasmEngine, context: Self::Context) -> Result { - Ok(Self { engine, context }) + Ok(Self { + engine, + context, + _stats: Default::default(), + }) } /// Run hyper http service @@ -143,8 +144,8 @@ where let request_id = remote_traceparent(&req); async move { self_ - .handle_request(&request_id, req) - .instrument(tracing::debug_span!("http", request_id)) + .handle_request(request_id.clone(), req) + .instrument(tracing::debug_span!("http", ?request_id)) .await } }); @@ -206,14 +207,17 @@ where &mut data.key_value_store })?; + reactor::gcore::fastedge::utils::add_to_linker::<_, HasSelf<_>>(linker, |data| { + &mut data.utils + })?; + Ok(()) } } -impl HttpService +impl HttpService where T: ContextT - + StatsWriter + Router + ContextHeaders + ExecutorFactory> @@ -223,23 +227,18 @@ where + Clone, T::BackendConnector: Clone + Send + Sync + 'static, T::Executor: HttpExecutor + Send + Sync, + S: StatsVisitor + Send + 'static, { /// handle HTTP request. async fn handle_request( &self, - request_id: &str, + request_id: SmolStr, mut request: hyper::Request, ) -> Result> where B: BodyExt + Send, ::Data: Send, { - #[cfg(feature = "stats")] - let timestamp = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .expect("current time") - .as_secs(); - request .headers_mut() .extend(app_req_headers(self.context.append_headers())); @@ -298,6 +297,7 @@ where Some(cfg) => cfg, }; + // get cached execute context for this application let executor = match self .context @@ -314,92 +314,33 @@ where } }; - let start_ = std::time::Instant::now(); - - let response_handler = { - let app_name = app_name.clone(); - #[cfg(feature = "stats")] - let billing_plan = cfg.plan.clone(); - #[cfg(feature = "stats")] - let request_id = request_id.to_smolstr(); - #[cfg(feature = "stats")] - let context = self.context.clone(); + let stats = self.context.new_stats_row(&request_id, &app_name, &cfg); - move |status_code: StatusCode, mem_used: ByteSize, time_elapsed: Duration| { - tracing::info!( - "'{}' completed with status code: '{}' in {:.0?} using {} of WebAssembly heap", - app_name, - status_code, - time_elapsed, - mem_used - ); - #[cfg(feature = "stats")] - { - let stat_row = StatRow { - app_id: cfg.app_id, - client_id: cfg.client_id, - timestamp: timestamp as u32, - app_name, - status_code: status_code.as_u16() as u32, - fail_reason: 0, // TODO: use AppResult - billing_plan, - time_elapsed: time_elapsed.as_micros() as u64, - memory_used: mem_used.as_u64(), - request_id, - }; - context.write_stats(stat_row); - } + let response = match executor.execute(request, stats.clone()).await { + Ok(mut response) => { #[cfg(feature = "metrics")] metrics::metrics( AppResult::SUCCESS, &["http"], - Some(time_elapsed.as_micros() as u64), - Some(mem_used.as_u64()), + Some(stats.get_time_elapsed()), + Some(stats.get_memory_used()), ); - } - }; - let response = match executor.execute(request, response_handler).await { - Ok(mut response) => { response.headers_mut().extend(app_res_headers(cfg)); response } Err(error) => { tracing::warn!(cause=?error, "execute"); - let time_elapsed = std::time::Instant::now().duration_since(start_); - let (status_code, fail_reason, msg) = map_err(error); - tracing::info!( - "'{}' failed with status code: '{}' in {:.0?}", - app_name, - status_code, - time_elapsed - ); - - #[cfg(feature = "stats")] - { - let stat_row = StatRow { - app_id: cfg.app_id, - client_id: cfg.client_id, - timestamp: timestamp as u32, - app_name: app_name, - status_code: status_code as u32, - fail_reason: fail_reason as u32, - billing_plan: cfg.plan.clone(), - time_elapsed: time_elapsed.as_micros() as u64, - memory_used: 0, - request_id: request_id.to_smolstr(), - }; - self.context.write_stats(stat_row); - } - #[cfg(not(feature = "stats"))] - tracing::debug!(?fail_reason, request_id, "stats"); + stats.status_code(status_code); + stats.fail_reason(fail_reason as u32); + tracing::debug!(?fail_reason, ?request_id, "stats"); #[cfg(feature = "metrics")] metrics::metrics( fail_reason, HTTP_LABEL, - Some(time_elapsed.as_micros() as u64), + Some(stats.get_time_elapsed()), None, ); @@ -487,12 +428,12 @@ fn map_err(error: Error) -> (u16, AppResult, HyperOutgoingBody) { (status_code, fail_reason, msg) } -fn remote_traceparent(req: &hyper::Request) -> String { +fn remote_traceparent(req: &hyper::Request) -> SmolStr { req.headers() .get(TRACEPARENT) .and_then(|v| v.to_str().ok()) - .map(|s| s.to_string()) - .unwrap_or(nanoid::nanoid!()) + .map(|s| s.to_smolstr()) + .unwrap_or(nanoid::nanoid!().to_smolstr()) } /// Creates an HTTP 500 response. diff --git a/crates/http-service/src/state.rs b/crates/http-service/src/state.rs index 9941ee0..d5d8940 100644 --- a/crates/http-service/src/state.rs +++ b/crates/http-service/src/state.rs @@ -3,18 +3,20 @@ use http::request::Parts; use http::uri::Scheme; use http::{header, HeaderMap, HeaderName, Uri}; use http_backend::Backend; +use runtime::store::HasStats; +use runtime::util::stats::StatsVisitor; use runtime::BackendRequest; -use tracing::instrument; +use std::sync::Arc; pub struct HttpState { pub(super) http_backend: Backend, pub(super) uri: Uri, pub(super) propagate_headers: HeaderMap, pub(super) propagate_header_names: Vec, + pub(super) stats: Arc, } impl BackendRequest for HttpState { - #[instrument(skip(self), ret, err)] fn backend_request(&mut self, mut head: Parts) -> anyhow::Result { match self.http_backend.strategy { http_backend::BackendStrategy::Direct => { @@ -128,3 +130,9 @@ fn canonical_url( .build() .map_err(Error::msg) } + +impl HasStats for HttpState { + fn get_stats(&self) -> Arc { + self.stats.clone() + } +} diff --git a/crates/key-value-store/Cargo.toml b/crates/key-value-store/Cargo.toml index 76e3d5d..8c89111 100644 --- a/crates/key-value-store/Cargo.toml +++ b/crates/key-value-store/Cargo.toml @@ -19,5 +19,8 @@ smol_str = {workspace = true} tracing = "0.1" redis = { version = "0.32", features = ["aio", "tokio-comp", "connection-manager", "tokio-native-tls-comp", "safe_iterators"], optional = true} +[dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } + [lints] workspace = true diff --git a/crates/key-value-store/src/lib.rs b/crates/key-value-store/src/lib.rs index 1257bbd..a7a75be 100644 --- a/crates/key-value-store/src/lib.rs +++ b/crates/key-value-store/src/lib.rs @@ -4,7 +4,6 @@ mod redis_impl; use reactor::gcore::fastedge::key_value; use slab::Slab; use smol_str::SmolStr; -use std::collections::HashMap; use std::sync::Arc; use tracing::instrument; use wasmtime::component::Resource; @@ -35,19 +34,55 @@ pub trait Store: Sync + Send { #[async_trait::async_trait] pub trait StoreManager: Sync + Send { /// Get a store by db url. - async fn get_store(&self, param: &str) -> Result, Error>; + async fn get_store( + &self, + param: &str, + metric: Arc, + ) -> Result, Error>; +} + +/// Key-Value store read metrics +pub trait ReadStats: Sync + Send { + /// Increment key-value read count and size + fn count_kv_read(&self, value: i32); + /// Increment key-value read count and size for BYOD + fn count_kv_byod_read(&self, value: i32); } #[derive(Clone)] -pub struct KeyValueStore { - allowed_stores: HashMap, +pub struct Builder { + allowed_stores: Vec<(SmolStr, SmolStr)>, + manager: Arc, +} + +pub struct StoreImpl { + allowed_stores: Vec<(SmolStr, SmolStr)>, manager: Arc, stores: Slab>, + stats: Arc, } -impl key_value::HostStore for KeyValueStore { +impl Builder { + pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: Arc) -> Self { + Self { + allowed_stores, + manager, + } + } + + pub fn build(self, stats: Arc) -> StoreImpl { + StoreImpl { + allowed_stores: self.allowed_stores, + manager: self.manager, + stores: Slab::new(), + stats, + } + } +} + +impl key_value::HostStore for StoreImpl { async fn open(&mut self, name: String) -> Result, Error> { - let store_id = KeyValueStore::open(self, &name).await?; + let store_id = StoreImpl::open(self, &name).await?; Ok(Resource::new_own(store_id)) } @@ -57,7 +92,7 @@ impl key_value::HostStore for KeyValueStore { key: String, ) -> Result>, Error> { let store_id = store.rep(); - KeyValueStore::get(self, store_id, &key).await + StoreImpl::get(self, store_id, &key).await } async fn scan( @@ -66,7 +101,7 @@ impl key_value::HostStore for KeyValueStore { pattern: String, ) -> Result, Error> { let store_id = store.rep(); - KeyValueStore::scan(self, store_id, &pattern).await + StoreImpl::scan(self, store_id, &pattern).await } async fn zrange_by_score( @@ -77,7 +112,7 @@ impl key_value::HostStore for KeyValueStore { max: f64, ) -> Result, Error> { let store_id = store.rep(); - KeyValueStore::zrange_by_score(self, store_id, &key, min, max).await + StoreImpl::zrange_by_score(self, store_id, &key, min, max).await } async fn zscan( @@ -87,7 +122,7 @@ impl key_value::HostStore for KeyValueStore { pattern: String, ) -> Result, Error> { let store_id = store.rep(); - KeyValueStore::zscan(self, store_id, &key, &pattern).await + StoreImpl::zscan(self, store_id, &key, &pattern).await } async fn bf_exists( @@ -97,7 +132,7 @@ impl key_value::HostStore for KeyValueStore { item: String, ) -> Result { let store_id = store.rep(); - KeyValueStore::bf_exists(self, store_id, &key, &item).await + StoreImpl::bf_exists(self, store_id, &key, &item).await } async fn drop(&mut self, store: Resource) -> Result<(), wasmtime::Error> { @@ -106,23 +141,18 @@ impl key_value::HostStore for KeyValueStore { } } -impl key_value::Host for KeyValueStore {} - -impl KeyValueStore { - #[instrument(skip(manager), level = "trace")] - pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: Arc) -> Self { - Self { - allowed_stores: allowed_stores.into_iter().collect(), - manager, - stores: Slab::new(), - } - } +impl key_value::Host for StoreImpl {} +impl StoreImpl { /// Open a store by name. Return the store ID. #[instrument(skip(self), level = "trace", ret, err)] pub async fn open(&mut self, name: &str) -> Result { - if let Some(param) = self.allowed_stores.get(name) { - let store = self.manager.get_store(¶m).await?; + if let Some(param) = self + .allowed_stores + .iter() + .find_map(|s| (s.0 == name).then_some(&s.1)) + { + let store = self.manager.get_store(param, self.stats.clone()).await?; Ok(self.stores.insert(store) as u32) } else { Err(Error::AccessDenied) @@ -184,12 +214,11 @@ impl KeyValueStore { } } -impl Default for KeyValueStore { +impl Default for Builder { fn default() -> Self { Self { allowed_stores: Default::default(), manager: Arc::new(NoSuchStoreManager), - stores: Slab::new(), } } } @@ -198,7 +227,465 @@ pub struct NoSuchStoreManager; #[async_trait::async_trait] impl StoreManager for NoSuchStoreManager { - async fn get_store(&self, _name: &str) -> Result, Error> { + async fn get_store( + &self, + _name: &str, + _metric: Arc, + ) -> Result, Error> { Err(Error::NoSuchStore) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::sync::atomic::AtomicI32; + + // Mock implementation of Store + struct MockStore { + data: HashMap, + zset_data: HashMap>, + bloom_filters: HashMap>, + } + + impl MockStore { + fn new() -> Self { + Self { + data: HashMap::new(), + zset_data: HashMap::new(), + bloom_filters: HashMap::new(), + } + } + + fn with_data(mut self, key: &str, value: Value) -> Self { + self.data.insert(key.to_string(), value); + self + } + + fn with_zset(mut self, key: &str, items: Vec<(Value, f64)>) -> Self { + self.zset_data.insert(key.to_string(), items); + self + } + + fn with_bloom(mut self, key: &str, items: Vec) -> Self { + self.bloom_filters.insert(key.to_string(), items); + self + } + } + + #[async_trait::async_trait] + impl Store for MockStore { + async fn get(&self, key: &str) -> Result, Error> { + Ok(self.data.get(key).cloned()) + } + + async fn zrange_by_score( + &self, + key: &str, + min: f64, + max: f64, + ) -> Result, Error> { + if let Some(items) = self.zset_data.get(key) { + let filtered: Vec<_> = items + .iter() + .filter(|(_, score)| *score >= min && *score <= max) + .cloned() + .collect(); + Ok(filtered) + } else { + Ok(vec![]) + } + } + + async fn scan(&self, pattern: &str) -> Result, Error> { + let keys: Vec = self + .data + .keys() + .filter(|k| k.contains(pattern)) + .cloned() + .collect(); + Ok(keys) + } + + async fn zscan(&self, key: &str, pattern: &str) -> Result, Error> { + if let Some(items) = self.zset_data.get(key) { + let filtered: Vec<_> = items + .iter() + .filter(|(val, _)| { + if let Some(s) = String::from_utf8(val.clone()).ok() { + s.contains(pattern) + } else { + false + } + }) + .cloned() + .collect(); + Ok(filtered) + } else { + Ok(vec![]) + } + } + + async fn bf_exists(&self, key: &str, item: &str) -> Result { + if let Some(items) = self.bloom_filters.get(key) { + Ok(items.iter().any(|i| i == item)) + } else { + Ok(false) + } + } + } + + // Mock implementation of ReadStats + #[derive(Default)] + struct MockReadStats { + kv_reads: AtomicI32, + byod_reads: AtomicI32, + } + + impl MockReadStats { + fn new() -> Self { + Self::default() + } + + fn get_kv_reads(&self) -> i32 { + self.kv_reads.load(std::sync::atomic::Ordering::Relaxed) + } + + fn get_byod_reads(&self) -> i32 { + self.byod_reads.load(std::sync::atomic::Ordering::Relaxed) + } + } + + impl ReadStats for MockReadStats { + fn count_kv_read(&self, value: i32) { + self.kv_reads + .fetch_add(value, std::sync::atomic::Ordering::Relaxed); + } + + fn count_kv_byod_read(&self, value: i32) { + self.byod_reads + .fetch_add(value, std::sync::atomic::Ordering::Relaxed); + } + } + + // Mock implementation of StoreManager + struct MockStoreManager { + stores: HashMap>, + } + + impl MockStoreManager { + fn new() -> Self { + Self { + stores: HashMap::new(), + } + } + + fn with_store(mut self, param: &str, store: Arc) -> Self { + self.stores.insert(param.to_string(), store); + self + } + } + + #[async_trait::async_trait] + impl StoreManager for MockStoreManager { + async fn get_store( + &self, + param: &str, + _metric: Arc, + ) -> Result, Error> { + self.stores.get(param).cloned().ok_or(Error::NoSuchStore) + } + } + + #[tokio::test] + async fn test_builder_default() { + let builder = Builder::default(); + assert_eq!(builder.allowed_stores.len(), 0); + } + + #[tokio::test] + async fn test_builder_new() { + let allowed_stores = vec![( + SmolStr::new("store1"), + SmolStr::new("redis://localhost:6379"), + )]; + let manager = Arc::new(NoSuchStoreManager); + let builder = Builder::new(allowed_stores.clone(), manager); + + assert_eq!(builder.allowed_stores.len(), 1); + assert_eq!(builder.allowed_stores[0].0, "store1"); + } + + #[tokio::test] + async fn test_store_impl_open_allowed_store() { + let mock_store = Arc::new(MockStore::new()); + let manager = Arc::new( + MockStoreManager::new().with_store("redis://localhost:6379", mock_store.clone()), + ); + + let allowed_stores = vec![( + SmolStr::new("mystore"), + SmolStr::new("redis://localhost:6379"), + )]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let result = store_impl.open("mystore").await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 0); + } + + #[tokio::test] + async fn test_store_impl_open_denied_store() { + let manager = Arc::new(NoSuchStoreManager); + let allowed_stores = vec![( + SmolStr::new("allowed"), + SmolStr::new("redis://localhost:6379"), + )]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let result = store_impl.open("notallowed").await; + assert!(matches!(result, Err(Error::AccessDenied))); + } + + #[tokio::test] + async fn test_store_impl_open_multiple_stores() { + let mock_store1 = Arc::new(MockStore::new()); + let mock_store2 = Arc::new(MockStore::new()); + let manager = Arc::new( + MockStoreManager::new() + .with_store("redis://store1", mock_store1.clone()) + .with_store("redis://store2", mock_store2.clone()), + ); + + let allowed_stores = vec![ + (SmolStr::new("store1"), SmolStr::new("redis://store1")), + (SmolStr::new("store2"), SmolStr::new("redis://store2")), + ]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let store1_id = store_impl.open("store1").await.unwrap(); + let store2_id = store_impl.open("store2").await.unwrap(); + + assert_eq!(store1_id, 0); + assert_eq!(store2_id, 1); + } + + #[tokio::test] + async fn test_get_value_success() { + let mock_store = Arc::new(MockStore::new().with_data("key1", b"value1".to_vec())); + let manager = Arc::new(MockStoreManager::new().with_store("redis://localhost", mock_store)); + + let allowed_stores = vec![(SmolStr::new("mystore"), SmolStr::new("redis://localhost"))]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let store_id = store_impl.open("mystore").await.unwrap(); + let result = store_impl.get(store_id, "key1").await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), Some(b"value1".to_vec())); + } + + #[tokio::test] + async fn test_get_value_not_found() { + let mock_store = Arc::new(MockStore::new()); + let manager = Arc::new(MockStoreManager::new().with_store("redis://localhost", mock_store)); + + let allowed_stores = vec![(SmolStr::new("mystore"), SmolStr::new("redis://localhost"))]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let store_id = store_impl.open("mystore").await.unwrap(); + let result = store_impl.get(store_id, "nonexistent").await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), None); + } + + #[tokio::test] + async fn test_get_invalid_store_id() { + let manager = Arc::new(NoSuchStoreManager); + let stats = Arc::new(MockReadStats::new()); + let store_impl = Builder::new(vec![], manager).build(stats.clone()); + + let result = store_impl.get(999, "key1").await; + assert!(matches!(result, Err(Error::NoSuchStore))); + } + + #[tokio::test] + async fn test_zrange_by_score() { + let items = vec![ + (b"item1".to_vec(), 1.0), + (b"item2".to_vec(), 2.5), + (b"item3".to_vec(), 5.0), + (b"item4".to_vec(), 7.5), + ]; + let mock_store = Arc::new(MockStore::new().with_zset("sorted_set", items)); + let manager = Arc::new(MockStoreManager::new().with_store("redis://localhost", mock_store)); + + let allowed_stores = vec![(SmolStr::new("mystore"), SmolStr::new("redis://localhost"))]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let store_id = store_impl.open("mystore").await.unwrap(); + let result = store_impl + .zrange_by_score(store_id, "sorted_set", 2.0, 6.0) + .await; + + assert!(result.is_ok()); + let items = result.unwrap(); + assert_eq!(items.len(), 2); + assert_eq!(items[0].1, 2.5); + assert_eq!(items[1].1, 5.0); + } + + #[tokio::test] + async fn test_zrange_by_score_invalid_store() { + let manager = Arc::new(NoSuchStoreManager); + let stats = Arc::new(MockReadStats::new()); + let store_impl = Builder::new(vec![], manager).build(stats.clone()); + + let result = store_impl.zrange_by_score(999, "key", 0.0, 10.0).await; + assert!(matches!(result, Err(Error::NoSuchStore))); + } + + #[tokio::test] + async fn test_scan_pattern() { + let mock_store = Arc::new( + MockStore::new() + .with_data("user:1", b"alice".to_vec()) + .with_data("user:2", b"bob".to_vec()) + .with_data("post:1", b"hello".to_vec()), + ); + let manager = Arc::new(MockStoreManager::new().with_store("redis://localhost", mock_store)); + + let allowed_stores = vec![(SmolStr::new("mystore"), SmolStr::new("redis://localhost"))]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let store_id = store_impl.open("mystore").await.unwrap(); + let result = store_impl.scan(store_id, "user:").await; + + assert!(result.is_ok()); + let keys = result.unwrap(); + assert_eq!(keys.len(), 2); + assert!(keys.contains(&"user:1".to_string())); + assert!(keys.contains(&"user:2".to_string())); + } + + #[tokio::test] + async fn test_scan_invalid_store() { + let manager = Arc::new(NoSuchStoreManager); + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(vec![], manager).build(stats.clone()); + + let result = store_impl.scan(999, "pattern").await; + assert!(matches!(result, Err(Error::NoSuchStore))); + } + + #[tokio::test] + async fn test_zscan() { + let items = vec![ + (b"apple".to_vec(), 1.0), + (b"apricot".to_vec(), 2.0), + (b"banana".to_vec(), 3.0), + ]; + let mock_store = Arc::new(MockStore::new().with_zset("fruits", items)); + let manager = Arc::new(MockStoreManager::new().with_store("redis://localhost", mock_store)); + + let allowed_stores = vec![(SmolStr::new("mystore"), SmolStr::new("redis://localhost"))]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let store_id = store_impl.open("mystore").await.unwrap(); + let result = store_impl.zscan(store_id, "fruits", "ap").await; + + assert!(result.is_ok()); + let items = result.unwrap(); + assert_eq!(items.len(), 2); + } + + #[tokio::test] + async fn test_zscan_invalid_store() { + let manager = Arc::new(NoSuchStoreManager); + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(vec![], manager).build(stats.clone()); + + let result = store_impl.zscan(999, "key", "pattern").await; + assert!(matches!(result, Err(Error::NoSuchStore))); + } + + #[tokio::test] + async fn test_bf_exists_true() { + let mock_store = Arc::new( + MockStore::new() + .with_bloom("bloom_key", vec!["item1".to_string(), "item2".to_string()]), + ); + let manager = Arc::new(MockStoreManager::new().with_store("redis://localhost", mock_store)); + + let allowed_stores = vec![(SmolStr::new("mystore"), SmolStr::new("redis://localhost"))]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let store_id = store_impl.open("mystore").await.unwrap(); + let result = store_impl.bf_exists(store_id, "bloom_key", "item1").await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), true); + } + + #[tokio::test] + async fn test_bf_exists_false() { + let mock_store = + Arc::new(MockStore::new().with_bloom("bloom_key", vec!["item1".to_string()])); + let manager = Arc::new(MockStoreManager::new().with_store("redis://localhost", mock_store)); + + let allowed_stores = vec![(SmolStr::new("mystore"), SmolStr::new("redis://localhost"))]; + let stats = Arc::new(MockReadStats::new()); + let mut store_impl = Builder::new(allowed_stores, manager).build(stats.clone()); + + let store_id = store_impl.open("mystore").await.unwrap(); + let result = store_impl + .bf_exists(store_id, "bloom_key", "nonexistent") + .await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), false); + } + + #[tokio::test] + async fn test_bf_exists_invalid_store() { + let manager = Arc::new(NoSuchStoreManager); + let stats = Arc::new(MockReadStats::new()); + let store_impl = Builder::new(vec![], manager).build(stats.clone()); + + let result = store_impl.bf_exists(999, "key", "item").await; + assert!(matches!(result, Err(Error::NoSuchStore))); + } + + #[tokio::test] + async fn test_no_such_store_manager() { + let manager = NoSuchStoreManager; + let stats = Arc::new(MockReadStats::new()); + + let result = manager.get_store("any", stats.clone()).await; + assert!(matches!(result, Err(Error::NoSuchStore))); + } + + #[tokio::test] + async fn test_read_stats() { + let stats = MockReadStats::new(); + + stats.count_kv_read(10); + stats.count_kv_read(5); + assert_eq!(stats.get_kv_reads(), 15); + + stats.count_kv_byod_read(3); + stats.count_kv_byod_read(7); + assert_eq!(stats.get_byod_reads(), 10); + } +} diff --git a/crates/reactor/wit b/crates/reactor/wit index 561aa99..9a18af2 160000 --- a/crates/reactor/wit +++ b/crates/reactor/wit @@ -1 +1 @@ -Subproject commit 561aa99135425fb2a7a01feb989614fcbd083a50 +Subproject commit 9a18af2195424b94f4473d48e0f5e9c000c161e4 diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 2fed407..bbbae57 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -9,7 +9,6 @@ authors.workspace = true default = ["kafka_log"] kafka_log = [] metrics = ["prometheus", "lazy_static"] -stats = ["clickhouse"] [dependencies] anyhow = { workspace = true } @@ -30,7 +29,7 @@ tracing = { workspace = true } bytesize = { workspace = true } http-backend = { path = "../http-backend" } key-value-store = { path = "../key-value-store" , features = ["redis"]} -dictionary = { path = "../dictionary" } +utils = { path = "../utils" } secret = { path = "../secret" } async-trait = "0.1" bytes = "1.10" @@ -38,7 +37,6 @@ serde = "1.0" serde_json = "1.0" chrono = { version = "0.4", features = ["serde"] } prometheus = { version = "0.14.0", features = ["process"], optional = true } -clickhouse = { version = "0.13", optional = true } lazy_static = { version = "1.5.0", optional = true } [dev-dependencies] diff --git a/crates/runtime/src/app.rs b/crates/runtime/src/app.rs index 0973d90..04af430 100644 --- a/crates/runtime/src/app.rs +++ b/crates/runtime/src/app.rs @@ -29,6 +29,7 @@ pub struct App { pub secrets: Vec, #[serde(default)] pub kv_stores: Vec, + pub plan_id: u64, } #[derive(Debug, Clone, PartialEq, Deserialize)] @@ -156,6 +157,7 @@ mod tests { "app_id": 12345, "client_id": 23456, "plan": "test_plan", + "plan_id": 0, "status": 1, "debug_until": "2037-01-01T12:00:27.87Z", "secrets":[{"name":"SECRET","secret_values":[{"effective_from":0,"value":"encrypted"}]}] @@ -183,6 +185,7 @@ mod tests { }], }], kv_stores: vec![], + plan_id: 0, }; assert_eq!(expected, assert_ok!(serde_json::from_str(&json))); diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 6a64b2f..31b583e 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -1,7 +1,9 @@ use crate::app::KvStoreOption; -use dictionary::Dictionary; -use key_value_store::KeyValueStore; +use crate::store::HasStats; +use http_backend::stats::ExtStatsTimer; +use std::sync::Arc; use std::{fmt::Debug, ops::Deref}; +use utils::{Dictionary, Utils}; use wasmtime_wasi::ResourceTable; use wasmtime_wasi::WasiCtxView; use wasmtime_wasi_http::{HttpResult, WasiHttpCtx, WasiHttpView}; @@ -28,6 +30,7 @@ pub mod util; use crate::app::SecretOption; use crate::logger::Logger; +use crate::util::stats::StatsVisitor; use anyhow::{anyhow, bail}; pub use app::{App, SecretValue, SecretValues}; use http::request::Parts; @@ -37,10 +40,9 @@ use smol_str::SmolStr; use std::borrow::Cow; use wasmtime_environ::wasmparser::{Encoding, Parser, Payload}; use wasmtime_wasi_http::body::HyperOutgoingBody; -use wasmtime_wasi_http::types::OutgoingRequestConfig; use wasmtime_wasi_http::{ bindings::http::types::ErrorCode, - types::{default_send_request, HostFutureIncomingResponse}, + types::{default_send_request_handler, HostFutureIncomingResponse, OutgoingRequestConfig}, }; use wasmtime_wasi_nn::wit::WasiNnCtx; @@ -91,8 +93,9 @@ pub struct Data { pub logger: Option, http: WasiHttpCtx, pub secret_store: SecretStore, - pub key_value_store: KeyValueStore, + pub key_value_store: key_value_store::StoreImpl, pub dictionary: Dictionary, + pub utils: Utils, } pub trait BackendRequest { @@ -117,7 +120,7 @@ impl IoView for Data { } } -impl WasiHttpView for Data { +impl WasiHttpView for Data { fn ctx(&mut self) -> &mut WasiHttpCtx { &mut self.http } @@ -137,10 +140,17 @@ impl WasiHttpView for Data { })?; let use_tls = matches!(head.uri.scheme_str(), Some("https")); let request = Request::from_parts(head, body); - Ok(default_send_request( - request, - OutgoingRequestConfig { use_tls, ..config }, - )) + // start external request stats timer + let stats = self.inner.get_stats(); + + let handle = wasmtime_wasi::runtime::spawn(async move { + let _stats_timer = ExtStatsTimer::new(stats); // keep timer alive until request is done + Ok( + default_send_request_handler(request, OutgoingRequestConfig { use_tls, ..config }) + .await, + ) + }); + Ok(HostFutureIncomingResponse::pending(handle)) } fn table(&mut self) -> &mut ResourceTable { @@ -162,14 +172,6 @@ impl Data { Wasi::Preview2(ctx) => ctx, } } - - pub fn secret_store_ref(&self) -> &SecretStore { - &self.secret_store - } - - pub fn key_value_store_ref(&self) -> &KeyValueStore { - &self.key_value_store - } } /// Global Engine configuration for `WasmEngineBuilder`. @@ -405,7 +407,14 @@ pub trait ContextT { fn make_secret_store(&self, secrets: &Vec) -> anyhow::Result; - fn make_key_value_store(&self, stores: &Vec) -> KeyValueStore; + fn make_key_value_store(&self, stores: &Vec) -> key_value_store::Builder; + + fn new_stats_row( + &self, + request_id: &SmolStr, + app: &SmolStr, + cfg: &App, + ) -> Arc; } pub trait ExecutorCache { diff --git a/crates/runtime/src/store.rs b/crates/runtime/src/store.rs index 7b88b11..2782e90 100644 --- a/crates/runtime/src/store.rs +++ b/crates/runtime/src/store.rs @@ -1,17 +1,18 @@ use crate::limiter::ProxyLimiter; use crate::logger::Logger; use crate::registry::CachedGraphRegistry; +use crate::util::stats::StatsVisitor; use crate::{Data, Wasi, WasiVersion, DEFAULT_EPOCH_TICK_INTERVAL}; use anyhow::Result; -use dictionary::Dictionary; -use key_value_store::KeyValueStore; use secret::SecretStore; +use std::sync::Arc; use std::{ collections::HashMap, fmt::{Debug, Formatter}, ops::{Deref, DerefMut}, }; use tracing::{debug, instrument, trace}; +use utils::{Dictionary, Utils}; use wasmtime::component::ResourceTable; use wasmtime_wasi::WasiCtxBuilder; use wasmtime_wasi_http::WasiHttpCtx; @@ -70,6 +71,10 @@ impl DerefMut for Store { } } +pub trait HasStats { + fn get_stats(&self) -> Arc; +} + /// A builder interface for configuring a new [`Store`]. /// /// A new [`StoreBuilder`] can be obtained with [`crate::Engine::store_builder`]. @@ -84,7 +89,7 @@ pub struct StoreBuilder { properties: HashMap, registry: CachedGraphRegistry, secret_store: SecretStore, - key_value_store: KeyValueStore, + key_value_store: key_value_store::Builder, dictionary: Dictionary, } @@ -101,7 +106,7 @@ impl StoreBuilder { properties: Default::default(), registry: CachedGraphRegistry::new(), secret_store: Default::default(), - key_value_store: KeyValueStore::default(), + key_value_store: key_value_store::Builder::default(), dictionary: Default::default(), } } @@ -164,7 +169,7 @@ impl StoreBuilder { } /// Set key value store - pub fn key_value_store(self, key_value_store: KeyValueStore) -> Self { + pub fn key_value_store(self, key_value_store: key_value_store::Builder) -> Self { Self { key_value_store, ..self @@ -208,7 +213,10 @@ impl StoreBuilder { } /// Builds a [`Store`] from this builder with `Default` WasiCtxBuilder - pub fn build(self, inner: T) -> Result> { + pub fn build(self, inner: T) -> Result> + where + T: HasStats, + { let wasi_builder = WasiCtxBuilder::new(); self.build_with_wasi(wasi_builder, inner) } @@ -219,7 +227,10 @@ impl StoreBuilder { self, mut wasi_ctx_builder: WasiCtxBuilder, inner: T, - ) -> Result> { + ) -> Result> + where + T: HasStats, + { let table = ResourceTable::new(); wasi_ctx_builder.envs(&self.env); @@ -242,6 +253,9 @@ impl StoreBuilder { WasiVersion::Preview2 => Wasi::Preview2(wasi_ctx_builder.build()), }; + let key_value_store = self.key_value_store.build(inner.get_stats()); + let utils = Utils::new(inner.get_stats()); + let mut inner = wasmtime::Store::new( &self.engine, Data { @@ -254,8 +268,9 @@ impl StoreBuilder { logger, http: WasiHttpCtx::new(), secret_store: self.secret_store, - key_value_store: self.key_value_store, + key_value_store, dictionary: self.dictionary, + utils, }, ); inner.limiter(|state| &mut state.store_limits); diff --git a/crates/runtime/src/util/stats.rs b/crates/runtime/src/util/stats.rs index 6da1c6c..0e3e199 100644 --- a/crates/runtime/src/util/stats.rs +++ b/crates/runtime/src/util/stats.rs @@ -1,28 +1,413 @@ -#[cfg(feature = "stats")] -use clickhouse::Row; -#[cfg(feature = "stats")] use serde::Serialize; -#[cfg(feature = "stats")] -use smol_str::SmolStr; - -#[cfg(feature = "stats")] -#[derive(Row, Debug, Serialize, Default)] -pub struct StatRow { - pub app_id: u64, - pub client_id: u64, - pub timestamp: u32, - pub app_name: SmolStr, - pub status_code: u32, - pub fail_reason: u32, - pub billing_plan: SmolStr, - pub time_elapsed: u64, - pub memory_used: u64, - pub request_id: SmolStr, +use std::sync::Arc; + +use http_backend::stats::ExtRequestStats; +pub use key_value_store::ReadStats; +use std::time::{Duration, Instant}; +use utils::UserDiagStats; + +#[repr(i32)] +pub enum CdnPhase { + Http = 0, + RequestHeaders = 1, + RequestBody = 2, + ResponseHeaders = 3, + ResponseBody = 4, + Log = 5, +} + +pub trait StatsWriter { + fn write_stats(&self, stat: T); +} + +pub struct StatsTimer { + /// A stats ref for automatic recording of observations. + stats: Arc, + /// Whether the timer has already been observed once. + observed: bool, + /// Starting instant for the timer. + start: Instant, +} + +pub trait StatsVisitor: ReadStats + UserDiagStats + ExtRequestStats + Send + Sync { + /// Register http execution status code + fn status_code(&self, status_code: u16); + /// Register memory used by wasm execution + fn memory_used(&self, memory_used: u64); + /// Register failure reason code + fn fail_reason(&self, fail_reason: u32); + /// Observe elapsed time + fn observe(&self, elapsed: Duration); + /// Get elapsed time in microseconds + fn get_time_elapsed(&self) -> u64; + /// Get memory used in bytes + fn get_memory_used(&self) -> u64; + /// Register cdn phase + fn cdn_phase(&self, phase: CdnPhase); +} + +impl StatsTimer { + pub fn new(stats: Arc) -> Self { + Self { + stats, + observed: false, + start: Instant::now(), + } + } + + /// Discard timer without recording duration + pub fn discard(mut self) { + self.observed = true; + } + + /// Observe and record timer duration + pub fn observe_duration(mut self) { + self.observe(); + } + + fn observe(&mut self) { + let v = self.start.elapsed(); + self.observed = true; + self.stats.observe(v); + } +} + +impl Drop for StatsTimer { + fn drop(&mut self) { + if !self.observed { + self.observe() + } + } } -#[cfg(not(feature = "stats"))] -pub struct StatRow; +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU16, AtomicU32, AtomicU64, Ordering}; + use std::sync::Mutex; + use std::thread; + + // Mock implementation of StatsVisitor for testing + #[derive(Default)] + struct MockStatsVisitor { + status_code: AtomicU16, + memory_used: AtomicU64, + fail_reason: AtomicU32, + observed_duration: Mutex>, + time_elapsed: AtomicU64, + cdn_phase: AtomicI32, + user_diag: Mutex, + kv_reads: AtomicI32, + byod_reads: AtomicI32, + observe_called: AtomicBool, + } + + impl MockStatsVisitor { + fn new() -> Self { + Self::default() + } + + fn get_status_code(&self) -> u16 { + self.status_code.load(Ordering::Relaxed) + } + + fn get_memory_used_internal(&self) -> u64 { + self.memory_used.load(Ordering::Relaxed) + } + + fn get_fail_reason(&self) -> u32 { + self.fail_reason.load(Ordering::Relaxed) + } + + fn get_observed_duration(&self) -> Option { + self.observed_duration.lock().unwrap().clone() + } + + fn get_cdn_phase(&self) -> i32 { + self.cdn_phase.load(Ordering::Relaxed) + } + + fn get_user_diag(&self) -> String { + self.user_diag.lock().unwrap().clone() + } + + fn was_observe_called(&self) -> bool { + self.observe_called.load(Ordering::Relaxed) + } + } + + impl ReadStats for MockStatsVisitor { + fn count_kv_read(&self, value: i32) { + self.kv_reads.fetch_add(value, Ordering::Relaxed); + } + + fn count_kv_byod_read(&self, value: i32) { + self.byod_reads.fetch_add(value, Ordering::Relaxed); + } + } + + impl UserDiagStats for MockStatsVisitor { + fn set_user_diag(&self, diag: &str) { + *self.user_diag.lock().unwrap() = diag.to_string(); + } + } + + impl ExtRequestStats for MockStatsVisitor { + fn observe_ext(&self, _: std::time::Duration) {} + } + + impl StatsVisitor for MockStatsVisitor { + fn status_code(&self, status_code: u16) { + self.status_code.store(status_code, Ordering::Relaxed); + } + + fn memory_used(&self, memory_used: u64) { + self.memory_used.store(memory_used, Ordering::Relaxed); + } + + fn fail_reason(&self, fail_reason: u32) { + self.fail_reason.store(fail_reason, Ordering::Relaxed); + } + + fn observe(&self, elapsed: Duration) { + self.observe_called.store(true, Ordering::Relaxed); + *self.observed_duration.lock().unwrap() = Some(elapsed); + } + + fn get_time_elapsed(&self) -> u64 { + self.time_elapsed.load(Ordering::Relaxed) + } + + fn get_memory_used(&self) -> u64 { + self.memory_used.load(Ordering::Relaxed) + } + + fn cdn_phase(&self, phase: CdnPhase) { + self.cdn_phase.store(phase as i32, Ordering::Relaxed); + } + } + + #[test] + fn test_cdn_phase_values() { + assert_eq!(CdnPhase::Http as i32, 0); + assert_eq!(CdnPhase::RequestHeaders as i32, 1); + assert_eq!(CdnPhase::RequestBody as i32, 2); + assert_eq!(CdnPhase::ResponseHeaders as i32, 3); + assert_eq!(CdnPhase::ResponseBody as i32, 4); + assert_eq!(CdnPhase::Log as i32, 5); + } + + #[test] + fn test_stats_visitor_status_code() { + let stats = MockStatsVisitor::new(); + + stats.status_code(200); + assert_eq!(stats.get_status_code(), 200); + + stats.status_code(404); + assert_eq!(stats.get_status_code(), 404); + + stats.status_code(500); + assert_eq!(stats.get_status_code(), 500); + } + + #[test] + fn test_stats_visitor_memory_used() { + let stats = MockStatsVisitor::new(); + + stats.memory_used(1024); + assert_eq!(stats.get_memory_used_internal(), 1024); + + stats.memory_used(2048); + assert_eq!(stats.get_memory_used_internal(), 2048); + } + + #[test] + fn test_stats_visitor_fail_reason() { + let stats = MockStatsVisitor::new(); + + stats.fail_reason(1); + assert_eq!(stats.get_fail_reason(), 1); + + stats.fail_reason(42); + assert_eq!(stats.get_fail_reason(), 42); + } + + #[test] + fn test_stats_visitor_cdn_phase() { + let stats = MockStatsVisitor::new(); + + stats.cdn_phase(CdnPhase::Http); + assert_eq!(stats.get_cdn_phase(), 0); + + stats.cdn_phase(CdnPhase::RequestHeaders); + assert_eq!(stats.get_cdn_phase(), 1); + + stats.cdn_phase(CdnPhase::ResponseBody); + assert_eq!(stats.get_cdn_phase(), 4); + } + + #[test] + fn test_stats_visitor_user_diag() { + let stats = MockStatsVisitor::new(); + + stats.set_user_diag("test diagnostic"); + assert_eq!(stats.get_user_diag(), "test diagnostic"); + + stats.set_user_diag("another message"); + assert_eq!(stats.get_user_diag(), "another message"); + } + + #[test] + fn test_stats_visitor_read_stats() { + let stats = MockStatsVisitor::new(); + + stats.count_kv_read(10); + stats.count_kv_read(5); + assert_eq!(stats.kv_reads.load(Ordering::Relaxed), 15); + + stats.count_kv_byod_read(3); + stats.count_kv_byod_read(7); + assert_eq!(stats.byod_reads.load(Ordering::Relaxed), 10); + } + + #[test] + fn test_stats_timer_new() { + let stats = Arc::new(MockStatsVisitor::new()); + let timer = StatsTimer::new(stats.clone()); + + assert!(!timer.observed); + assert!(!stats.was_observe_called()); + } + + #[test] + fn test_stats_timer_observe_duration() { + let stats = Arc::new(MockStatsVisitor::new()); + let timer = StatsTimer::new(stats.clone()); + + thread::sleep(Duration::from_millis(10)); + timer.observe_duration(); + + assert!(stats.was_observe_called()); + let duration = stats.get_observed_duration(); + assert!(duration.is_some()); + assert!(duration.unwrap() >= Duration::from_millis(10)); + } + + #[test] + fn test_stats_timer_discard() { + let stats = Arc::new(MockStatsVisitor::new()); + let timer = StatsTimer::new(stats.clone()); + + timer.discard(); + + assert!(!stats.was_observe_called()); + assert!(stats.get_observed_duration().is_none()); + } + + #[test] + fn test_stats_timer_drop_auto_observe() { + let stats = Arc::new(MockStatsVisitor::new()); + + { + let _timer = StatsTimer::new(stats.clone()); + thread::sleep(Duration::from_millis(10)); + // Timer goes out of scope here, should auto-observe + } + + assert!(stats.was_observe_called()); + let duration = stats.get_observed_duration(); + assert!(duration.is_some()); + assert!(duration.unwrap() >= Duration::from_millis(10)); + } + + #[test] + fn test_stats_timer_drop_after_observe_no_double_observe() { + let stats = Arc::new(MockStatsVisitor::new()); + let first_duration; + + { + let timer = StatsTimer::new(stats.clone()); + thread::sleep(Duration::from_millis(10)); + timer.observe_duration(); + first_duration = stats.get_observed_duration().unwrap(); + // Even though timer goes out of scope, observe should not be called again + } + + let final_duration = stats.get_observed_duration().unwrap(); + assert_eq!(first_duration, final_duration); + } + + #[test] + fn test_stats_timer_drop_after_discard_no_observe() { + let stats = Arc::new(MockStatsVisitor::new()); + + { + let timer = StatsTimer::new(stats.clone()); + thread::sleep(Duration::from_millis(10)); + timer.discard(); + // Timer goes out of scope, but should not observe since discarded + } + + assert!(!stats.was_observe_called()); + assert!(stats.get_observed_duration().is_none()); + } + + #[test] + fn test_stats_timer_multiple_instances() { + let stats = Arc::new(MockStatsVisitor::new()); + + let timer1 = StatsTimer::new(stats.clone()); + let timer2 = StatsTimer::new(stats.clone()); + + thread::sleep(Duration::from_millis(10)); + timer1.observe_duration(); + + assert!(stats.was_observe_called()); + + thread::sleep(Duration::from_millis(5)); + timer2.observe_duration(); + + // Last observed duration should be from timer2 + let duration = stats.get_observed_duration().unwrap(); + assert!(duration >= Duration::from_millis(5)); + } + + #[test] + fn test_stats_timer_measures_elapsed_time() { + let stats = Arc::new(MockStatsVisitor::new()); + let timer = StatsTimer::new(stats.clone()); + + let sleep_duration = Duration::from_millis(50); + thread::sleep(sleep_duration); + timer.observe_duration(); + + let observed = stats.get_observed_duration().unwrap(); + // Allow some tolerance for timing variations + assert!(observed >= sleep_duration); + assert!(observed < sleep_duration + Duration::from_millis(50)); + } + + #[test] + fn test_mock_stats_visitor_concurrent_access() { + let stats = Arc::new(MockStatsVisitor::new()); + let mut handles = vec![]; + + for i in 0..10 { + let stats_clone = stats.clone(); + let handle = thread::spawn(move || { + stats_clone.status_code(200 + i); + stats_clone.memory_used(1024 * (i as u64 + 1)); + stats_clone.count_kv_read(1); + }); + handles.push(handle); + } + + for handle in handles { + handle.join().unwrap(); + } -pub trait StatsWriter { - fn write_stats(&self, stat: StatRow); + // Should have 10 kv reads + assert_eq!(stats.kv_reads.load(Ordering::Relaxed), 10); + } } diff --git a/crates/dictionary/Cargo.toml b/crates/utils/Cargo.toml similarity index 66% rename from crates/dictionary/Cargo.toml rename to crates/utils/Cargo.toml index 94e2cfa..735f1dc 100644 --- a/crates/dictionary/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -1,14 +1,13 @@ [package] -name = "dictionary" +name = "utils" version.workspace = true edition.workspace = true publish.workspace = true authors.workspace = true -description = "dictionary host function" +description = "utils(dictionary) host function" [dependencies] reactor = { path = "../reactor" } -async-trait = {workspace = true} [lints] workspace = true diff --git a/crates/dictionary/src/lib.rs b/crates/utils/src/dictionary.rs similarity index 100% rename from crates/dictionary/src/lib.rs rename to crates/utils/src/dictionary.rs diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs new file mode 100644 index 0000000..955c20b --- /dev/null +++ b/crates/utils/src/lib.rs @@ -0,0 +1,25 @@ +mod dictionary; + +pub use dictionary::Dictionary; +use std::sync::Arc; + +pub trait UserDiagStats: Sync + Send { + /// Set user defined diagnostic information + fn set_user_diag(&self, diag: &str); +} + +pub struct Utils { + stats: Arc, +} + +impl Utils { + pub fn new(stats: Arc) -> Self { + Self { stats } + } +} + +impl reactor::gcore::fastedge::utils::Host for Utils { + async fn set_user_diag(&mut self, value: String) { + self.stats.set_user_diag(value.as_str()); + } +} diff --git a/src/context.rs b/src/context.rs index 59b264b..a5fb5aa 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,17 +1,17 @@ use crate::executor::RunExecutor; use crate::key_value::CliStoreManager; use crate::secret::SecretImpl; -use dictionary::Dictionary; +use http_backend::stats::ExtRequestStats; use http_backend::Backend; use http_service::executor::{HttpExecutorImpl, WasiHttpExecutorImpl}; use http_service::state::HttpState; use http_service::{ContextHeaders, ExecutorFactory}; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; -use key_value_store::KeyValueStore; +use key_value_store::ReadStats; use runtime::app::{KvStoreOption, SecretOption}; use runtime::logger::{Console, Logger}; -use runtime::util::stats::{StatRow, StatsWriter}; +use runtime::util::stats::{CdnPhase, StatsVisitor}; use runtime::{ componentize_if_necessary, App, ContextT, ExecutorCache, PreCompiledLoader, Router, WasiVersion, WasmEngine, @@ -19,7 +19,10 @@ use runtime::{ use secret::SecretStore; use smol_str::SmolStr; use std::collections::HashMap; +use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::time::Duration; +use utils::{Dictionary, UserDiagStats}; use wasmtime::component::Component; use wasmtime::{Engine, Module}; @@ -73,15 +76,24 @@ impl ContextT for Context { Ok(SecretStore::new(Arc::new(secret_impl))) } - fn make_key_value_store(&self, stores: &Vec) -> KeyValueStore { + fn make_key_value_store(&self, stores: &Vec) -> key_value_store::Builder { let allowed_stores = stores .iter() .map(|s| (s.name.clone(), s.param.clone())) .collect(); let manager = CliStoreManager { - stores: stores.to_owned(), + stores: stores.clone(), }; - KeyValueStore::new(allowed_stores, Arc::new(manager)) + key_value_store::Builder::new(allowed_stores, Arc::new(manager)) + } + + fn new_stats_row( + &self, + _request_id: &SmolStr, + _app: &SmolStr, + _cfg: &App, + ) -> Arc { + Arc::new(StatsStub::default()) } } @@ -160,6 +172,50 @@ impl Router for Context { } } -impl StatsWriter for Context { - fn write_stats(&self, _stat: StatRow) {} +#[derive(Default)] +pub struct StatsStub { + elapsed: AtomicU64, + memory_used: AtomicU64, +} + +impl ReadStats for StatsStub { + fn count_kv_read(&self, _value: i32) {} + + fn count_kv_byod_read(&self, _value: i32) {} +} + +impl UserDiagStats for StatsStub { + fn set_user_diag(&self, _diag: &str) {} +} + +impl ExtRequestStats for StatsStub { + fn observe_ext(&self, _elapsed: Duration) {} +} + +impl StatsVisitor for StatsStub { + fn status_code(&self, _status_code: u16) {} + + fn memory_used(&self, memory_used: u64) { + self.memory_used + .store(memory_used, std::sync::atomic::Ordering::Relaxed); + } + + fn fail_reason(&self, _fail_reason: u32) {} + + fn observe(&self, elapsed: Duration) { + self.elapsed.store( + elapsed.as_micros() as u64, + std::sync::atomic::Ordering::Relaxed, + ); + } + + fn get_time_elapsed(&self) -> u64 { + self.elapsed.load(std::sync::atomic::Ordering::Relaxed) + } + + fn get_memory_used(&self) -> u64 { + self.memory_used.load(std::sync::atomic::Ordering::Relaxed) + } + + fn cdn_phase(&self, _phase: CdnPhase) {} } diff --git a/src/executor.rs b/src/executor.rs index 6a153b4..e5dc907 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,13 +1,13 @@ use async_trait::async_trait; -use bytesize::ByteSize; -use http::{Request, Response, StatusCode}; +use http::{Request, Response}; use http_body_util::BodyExt; use http_service::executor::{HttpExecutor, HttpExecutorImpl, WasiHttpExecutorImpl}; use http_service::HyperOutgoingBody; use hyper::body::Body; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; -use std::time::Duration; +use runtime::util::stats::StatsVisitor; +use std::sync::Arc; pub enum RunExecutor { Http(HttpExecutorImpl>), @@ -16,19 +16,18 @@ pub enum RunExecutor { #[async_trait] impl HttpExecutor for RunExecutor { - async fn execute( - &self, + async fn execute( + self, req: Request, - on_response: R, + stats: Arc, ) -> anyhow::Result> where - R: FnOnce(StatusCode, ByteSize, Duration) + Send + 'static, B: BodyExt + Send, ::Data: Send, { match self { - RunExecutor::Http(ref executor) => executor.execute(req, on_response).await, - RunExecutor::Wasi(ref executor) => executor.execute(req, on_response).await, + RunExecutor::Http(executor) => executor.execute(req, stats).await, + RunExecutor::Wasi(executor) => executor.execute(req, stats).await, } } } diff --git a/src/key_value.rs b/src/key_value.rs index 88b4a6c..be81bd8 100644 --- a/src/key_value.rs +++ b/src/key_value.rs @@ -1,5 +1,5 @@ -use key_value_store::RedisStore; use key_value_store::{Error, Store, StoreManager}; +use key_value_store::{ReadStats, RedisStore}; use runtime::app::KvStoreOption; use std::sync::Arc; @@ -9,7 +9,11 @@ pub(crate) struct CliStoreManager { #[async_trait::async_trait] impl StoreManager for CliStoreManager { - async fn get_store(&self, name: &str) -> Result, Error> { + async fn get_store( + &self, + name: &str, + _stats: Arc, + ) -> Result, Error> { let Some(opts) = self.stores.iter().find(|store| store.name == name) else { return Err(Error::NoSuchStore); }; diff --git a/src/main.rs b/src/main.rs index 9e1e736..abb3e17 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ mod executor; mod key_value; mod secret; +use crate::context::StatsStub; use bytesize::MB; use clap::{Args, Parser, Subcommand}; use context::Context; @@ -171,6 +172,7 @@ async fn main() -> anyhow::Result<()> { debug_until: None, secrets, kv_stores, + plan_id: 0, }; let mut headers = dotenv_injector.merge_with_dotenv_variables( @@ -190,7 +192,7 @@ async fn main() -> anyhow::Result<()> { wasi_http: run.wasi_http.unwrap_or_default(), }; - let http: HttpService = ServiceBuilder::new(context).build()?; + let http: HttpService = ServiceBuilder::new(context).build()?; let http = http.run(HttpConfig { all_interfaces: false, port: run.port,