diff --git a/README.md b/README.md index 37e9cd9..5cddd4f 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,64 @@ PORT=8080 MONGODB_URI=mongodb://localhost:27017 go run . The server listens on `:8080` by default and exposes routes under `/v1`. +## Memagent WAL Ingest (Multi-network) + +This backend can receive compressed WAL frames from the standalone `memagent` shipper. + +- Endpoint: `POST /v1/ingest/:network/:node/wal-frames` +- Headers: + - `Authorization: Bearer ` + - `Content-Type: application/vnd.cometbft.wal-frames+gzip` + - `X-WAL-Manifest`: JSON array of frame metadata with fields `{file, frame, off, len, recs, first_ts, last_ts, crc32}` +- Body: the concatenation of gzip members described in the manifest order. +- Storage: frames are appended to `uploads///.gz` with a sidecar `*.ack.json` tracking `next_frame`. + +Environment variables (ingest) +- `INGEST_AUTH_MAP`: Optional per-network key mapping, e.g. `"netA=abc123,netB=def456"`. +- `MEMAGENT_AUTH_KEY` or `INGEST_AUTH_KEY`: Global fallback shared key if a network is not in `INGEST_AUTH_MAP`. +- `INGEST_RATE_PER_MIN`: Per-network/node rate limit (default `3600`). +- `INGEST_BURST`: Burst capacity (default `200`). + +Behavior +- Duplicate frames (frame < next_frame) are skipped and counted. +- Out-of-order frames (frame > next_frame) are rejected (400). +- Batches are streamed from the request body to disk (no full buffering). + +### Prometheus Metrics + +Endpoint: `GET /metrics` + +Exported metrics: +- `ingest_requests_total{network,node,status}` +- `ingest_frames_total{network,node}` +- `ingest_bytes_total{network,node}` +- `ingest_dedupe_skipped_total{network,node}` +- `ingest_out_of_order_total{network,node}` +- `ingest_write_errors_total{network,node}` +- `ingest_duration_seconds{network,node}` (histogram) + +Sample Prometheus scrape config +```yaml +scrape_configs: + - job_name: cometbft-analyzer-backend + metrics_path: /metrics + static_configs: + - targets: ["backend.example.org:8080"] + labels: + env: prod +``` + +### Memagent example + +```bash +./memagent \ + --wal-dir /var/log/cometbft/wal \ + --remote-url http://backend.example.org:8080/v1/ingest///wal-frames \ + --auth-key abc123 \ + --send-interval 5s --hard-interval 10s \ + --iface eth0 --iface-speed 1000 +``` + ## Configuration - `MONGODB_URI`: MongoDB connection string (default: `mongodb://localhost:27017`). diff --git a/go.mod b/go.mod index 1af53a0..4e8229b 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,15 @@ require ( github.com/gin-gonic/gin v1.10.1 github.com/go-playground/validator/v10 v10.20.0 github.com/joho/godotenv v1.5.1 + github.com/prometheus/client_golang v1.23.2 go.mongodb.org/mongo-driver v1.17.4 ) require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect @@ -22,26 +25,32 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/montanaflynn/stats v0.7.1 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/arch v0.8.0 // indirect - golang.org/x/crypto v0.26.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.23.0 // indirect - golang.org/x/text v0.17.0 // indirect - google.golang.org/protobuf v1.36.4 // indirect + golang.org/x/crypto v0.41.0 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sync v0.16.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0ef51cf..3185f8d 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,18 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bft-labs/cometbft-analyzer-types v0.1.0 h1:RWmqFzjCGC5Rd0J9cIOpwY/Vn1agM/BKcOhUXOlLjTU= github.com/bft-labs/cometbft-analyzer-types v0.1.0/go.mod h1:SSQwUNRl9XINCyw2ePORRoCdAkbHM3qn1YXehUXUjag= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -29,19 +34,25 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= @@ -53,10 +64,22 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -67,8 +90,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= @@ -84,23 +108,27 @@ github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfS github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw= go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -108,24 +136,25 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= -golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= -google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handlers/ingest_handler.go b/handlers/ingest_handler.go new file mode 100644 index 0000000..d1a3e7c --- /dev/null +++ b/handlers/ingest_handler.go @@ -0,0 +1,270 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/bft-labs/cometbft-analyzer-backend/metrics" + mw "github.com/bft-labs/cometbft-analyzer-backend/middleware" + "github.com/gin-gonic/gin" +) + +// frameMeta mirrors the memagent FrameMeta schema. +type frameMeta struct { + File string `json:"file"` + Frame uint64 `json:"frame"` + Off uint64 `json:"off"` + Len uint64 `json:"len"` + Recs uint32 `json:"recs"` + FirstTS int64 `json:"first_ts"` + LastTS int64 `json:"last_ts"` + CRC32 uint32 `json:"crc32"` +} + +// IngestWalFramesHandler receives concatenated gzip frames from memagent. +// Route suggestion: POST /v1/ingest/:network/:node/wal-frames +// Headers: +// +// Authorization: Bearer +// Content-Type: application/vnd.cometbft.wal-frames+gzip +// X-WAL-Manifest: JSON array of frameMeta describing each member in payload order +// +// Behavior: +// - Stores frames under uploads///.gz, appending in order +// - Uses sidecar .ack.json to track next expected frame for dedupe/ordering +func IngestWalFramesHandler(uploadRoot string) gin.HandlerFunc { + // Lazy-init per-network auth map and rate limiter from env + var ( + authOnce bool + authMap map[string]string + limiterOnce bool + limiter *mw.RateLimiter + ) + + return func(c *gin.Context) { + start := time.Now() + // Init auth map once + if !authOnce { + authMap = parseAuthMap(os.Getenv("INGEST_AUTH_MAP")) + authOnce = true + } + + network := c.Param("network") + node := c.Param("node") + if network == "" || node == "" { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "bad_request").Inc() + c.JSON(http.StatusBadRequest, gin.H{"error": "missing network or node"}) + return + } + + // Authorization check (per-network preferred, else global shared key) + want := authMap[network] + if want == "" { + want = os.Getenv("MEMAGENT_AUTH_KEY") + if want == "" { + want = os.Getenv("INGEST_AUTH_KEY") + } + } + if want != "" { + ah := c.GetHeader("Authorization") + if !strings.HasPrefix(ah, "Bearer ") || strings.TrimSpace(strings.TrimPrefix(ah, "Bearer ")) != want { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "unauthorized").Inc() + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + return + } + } + + // Per-network rate limiting + if !limiterOnce { + rate := getenvInt("INGEST_RATE_PER_MIN", 3600) + burst := getenvInt("INGEST_BURST", 200) + limiter = mw.NewRateLimiter(rate, burst) + limiterOnce = true + } + if limiter != nil { + key := network + ":" + node + if !limiter.Allow(key) { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "rate_limited").Inc() + c.JSON(http.StatusTooManyRequests, gin.H{"error": "rate limit exceeded"}) + return + } + } + + ct := c.GetHeader("Content-Type") + if !strings.HasPrefix(ct, "application/vnd.cometbft.wal-frames+gzip") { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "unsupported_media_type").Inc() + c.JSON(http.StatusUnsupportedMediaType, gin.H{"error": "invalid content type"}) + return + } + + manHdr := c.GetHeader("X-WAL-Manifest") + if manHdr == "" { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "bad_request").Inc() + c.JSON(http.StatusBadRequest, gin.H{"error": "missing X-WAL-Manifest"}) + return + } + var manifest []frameMeta + if err := json.Unmarshal([]byte(manHdr), &manifest); err != nil || len(manifest) == 0 { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "bad_request").Inc() + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid X-WAL-Manifest"}) + return + } + + // Streaming write: do not hold entire body; iterate by manifest parts + // We still want bytes metric; wrap reader to count bytes + countingR := &countReader{R: c.Request.Body} + + // Prepare upload directory + baseDir := filepath.Join(uploadRoot, sanitize(network), sanitize(node)) + if err := os.MkdirAll(baseDir, 0o755); err != nil { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "server_error").Inc() + c.JSON(http.StatusInternalServerError, gin.H{"error": "mkdir failed"}) + return + } + + // Walk manifest and split payload accordingly + progressed := 0 + + for idx, fm := range manifest { + if fm.Len == 0 { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "bad_request").Inc() + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("manifest[%d] len=0", idx)}) + return + } + lr := &io.LimitedReader{R: countingR, N: int64(fm.Len)} + + // Resolve target file path(s) + gzPath := filepath.Join(baseDir, filepath.Base(fm.File)) + ackPath := gzPath + ".ack.json" + + // Load next expected frame + next := uint64(1) + if b, err := os.ReadFile(ackPath); err == nil { + var m map[string]string + if json.Unmarshal(b, &m) == nil { + if v, ok := m["next_frame"]; ok { + if n, err := strconv.ParseUint(v, 10, 64); err == nil && n > 0 { + next = n + } + } + } + } + if fm.Frame < next { + // duplicate, skip but consume bytes from the stream + metrics.IngestDedupeSkippedTotal.WithLabelValues(network, node).Inc() + if _, err := io.Copy(io.Discard, lr); err != nil { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "server_error").Inc() + c.JSON(http.StatusInternalServerError, gin.H{"error": "stream discard failed"}) + return + } + continue + } + if fm.Frame > next { + metrics.IngestOutOfOrderTotal.WithLabelValues(network, node).Inc() + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("out-of-order frame: got=%d want=%d file=%s", fm.Frame, next, fm.File)}) + return + } + // Append chunk to target gz + f, err := os.OpenFile(gzPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + metrics.IngestWriteErrorsTotal.WithLabelValues(network, node).Inc() + metrics.IngestRequestsTotal.WithLabelValues(network, node, "server_error").Inc() + c.JSON(http.StatusInternalServerError, gin.H{"error": "open target failed"}) + return + } + if _, err := io.Copy(f, lr); err != nil { + _ = f.Close() + metrics.IngestWriteErrorsTotal.WithLabelValues(network, node).Inc() + metrics.IngestRequestsTotal.WithLabelValues(network, node, "server_error").Inc() + c.JSON(http.StatusInternalServerError, gin.H{"error": "write failed"}) + return + } + _ = f.Close() + + // Update ack + nb, _ := json.Marshal(map[string]string{"next_frame": strconv.FormatUint(next+1, 10)}) + tmp := ackPath + ".tmp" + if err := os.WriteFile(tmp, nb, 0o644); err == nil { + _ = os.Rename(tmp, ackPath) + } + progressed++ + } + + // finalize bytes metric + metrics.IngestBytesTotal.WithLabelValues(network, node).Add(float64(countingR.N)) + + if progressed == 0 { + metrics.IngestRequestsTotal.WithLabelValues(network, node, "ok").Inc() + metrics.IngestDurationSeconds.WithLabelValues(network, node).Observe(time.Since(start).Seconds()) + c.JSON(http.StatusOK, gin.H{"status": "ok", "note": "no new frames"}) + return + } + metrics.IngestFramesTotal.WithLabelValues(network, node).Add(float64(progressed)) + metrics.IngestRequestsTotal.WithLabelValues(network, node, "ok").Inc() + metrics.IngestDurationSeconds.WithLabelValues(network, node).Observe(time.Since(start).Seconds()) + log.Printf("ingest ok network=%s node=%s frames=%d bytes=%d", network, node, progressed, countingR.N) + c.JSON(http.StatusOK, gin.H{"status": "ok", "frames": progressed}) + } +} + +func sanitize(s string) string { + // very simple sanitization to avoid path traversal + s = strings.TrimSpace(s) + s = strings.ReplaceAll(s, "..", "_") + s = strings.ReplaceAll(s, "/", "_") + s = strings.ReplaceAll(s, "\\", "_") + return s +} + +// parseAuthMap parses env like: "netA=keyA,netB=keyB" +func parseAuthMap(s string) map[string]string { + res := map[string]string{} + parts := strings.Split(s, ",") + for _, p := range parts { + p = strings.TrimSpace(p) + if p == "" { + continue + } + kv := strings.SplitN(p, "=", 2) + if len(kv) != 2 { + continue + } + net := strings.TrimSpace(kv[0]) + key := strings.TrimSpace(kv[1]) + if net != "" && key != "" { + res[net] = key + } + } + return res +} + +func getenvInt(key string, def int) int { + v := strings.TrimSpace(os.Getenv(key)) + if v == "" { + return def + } + if n, err := strconv.Atoi(v); err == nil && n > 0 { + return n + } + return def +} + +// countReader counts bytes read from an underlying reader +type countReader struct { + R io.Reader + N int64 +} + +func (cr *countReader) Read(p []byte) (int, error) { + n, err := cr.R.Read(p) + cr.N += int64(n) + return n, err +} diff --git a/main.go b/main.go index 51724a8..1a27bc2 100644 --- a/main.go +++ b/main.go @@ -6,9 +6,11 @@ import ( "github.com/bft-labs/cometbft-analyzer-backend/db" "github.com/bft-labs/cometbft-analyzer-backend/handlers" + prommetrics "github.com/bft-labs/cometbft-analyzer-backend/metrics" "github.com/bft-labs/cometbft-analyzer-backend/middleware" "github.com/gin-gonic/gin" "github.com/joho/godotenv" + "github.com/prometheus/client_golang/prometheus/promhttp" ) func main() { @@ -82,6 +84,13 @@ func main() { v1.GET("/simulations/:id/metrics/network/latency/overview", handlers.GetSimulationNetworkLatencyOverviewHandler(client, simulationsColl)) } + // WAL ingestion endpoint (memagent) + v1.POST("/ingest/:network/:node/wal-frames", handlers.IngestWalFramesHandler("uploads")) + + // Prometheus metrics + _ = prommetrics.IngestBytesTotal // touch to ensure package init + router.GET("/metrics", gin.WrapH(promhttp.Handler())) + port := os.Getenv("PORT") if port == "" { port = "8080" diff --git a/metrics/prometheus.go b/metrics/prometheus.go new file mode 100644 index 0000000..e93c682 --- /dev/null +++ b/metrics/prometheus.go @@ -0,0 +1,65 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + IngestRequestsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ingest_requests_total", + Help: "Total number of memagent ingest requests", + }, + []string{"network", "node", "status"}, + ) + + IngestFramesTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ingest_frames_total", + Help: "Total number of frames ingested", + }, + []string{"network", "node"}, + ) + + IngestBytesTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ingest_bytes_total", + Help: "Total number of compressed bytes received", + }, + []string{"network", "node"}, + ) + + IngestDedupeSkippedTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ingest_dedupe_skipped_total", + Help: "Total frames skipped due to dedupe (already acknowledged)", + }, + []string{"network", "node"}, + ) + + IngestOutOfOrderTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ingest_out_of_order_total", + Help: "Total out-of-order frames rejected", + }, + []string{"network", "node"}, + ) + + IngestWriteErrorsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ingest_write_errors_total", + Help: "Total write errors during ingest", + }, + []string{"network", "node"}, + ) + + IngestDurationSeconds = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "ingest_duration_seconds", + Help: "Duration of ingest requests", + Buckets: prometheus.DefBuckets, + }, + []string{"network", "node"}, + ) +) diff --git a/middleware/validation.go b/middleware/validation.go index 9c9659e..a4181ff 100644 --- a/middleware/validation.go +++ b/middleware/validation.go @@ -14,9 +14,12 @@ func RequestValidationMiddleware() gin.HandlerFunc { method := c.Request.Method if method == "POST" || method == "PUT" || method == "PATCH" { contentType := c.GetHeader("Content-Type") + path := c.Request.URL.Path // Allow multipart/form-data for file uploads - if !strings.Contains(contentType, "application/json") && + if strings.HasPrefix(path, "/v1/ingest/") || strings.HasPrefix(contentType, "application/vnd.cometbft.wal-frames") { + // allow vendor media type for WAL frames ingest + } else if !strings.Contains(contentType, "application/json") && !strings.Contains(contentType, "multipart/form-data") { c.JSON(http.StatusUnsupportedMediaType, gin.H{ "error": "Content-Type must be application/json or multipart/form-data",