diff --git a/.env.example b/.env.example index 2de88d0..1decff2 100644 --- a/.env.example +++ b/.env.example @@ -57,6 +57,14 @@ KURU_USE_ACCESS_LIST=true # KURU_RECONCILIATION_INTERVAL=3.0 # KURU_RECONCILIATION_THRESHOLD=5.0 +# ======================================== +# INFLUXDB METRICS (optional — omit to disable) +# ======================================== + +# INFLUX_URL=http://localhost:8181 +# INFLUX_TOKEN=your_token_here +# INFLUX_DATABASE=mm_bot + # ======================================== # OPERATIONAL STRATEGY SETTINGS # ======================================== diff --git a/CLAUDE.md b/CLAUDE.md index 49fd96c..a768487 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -52,33 +52,58 @@ Key tracking dicts on `Bot` (all set/cleared together in callbacks): **Invariant:** A cloid in `active_cloids` always has a matching entry in both `active_orders` and `order_sizes`. These three are always set and cleared together. -### PropMaintain logic (`_generate_orders_with_prop_maintain`) +### Pluggable quoter system -For each quoter level, each side independently checks whether the existing order's edge is above the cancel threshold (`baseline_edge × (1 - PROP_MAINTAIN)`). The check chain: +The quoter layer is split into: -1. Order found in REST API result → use API price for edge check -2. Order in `preregistered_orders` → just sent, awaiting confirmation → hold -3. Order in `active_orders` → confirmed via callback but REST API hasn't indexed it yet → use callback price for full edge check (logs `[callback]`) -4. Order in `active_cloids` but not `active_orders` → unknown state → hold +- `mm_bot/quoter/base.py` — `BaseQuoter` ABC. Implement `decide(ctx: QuoterContext) -> QuoterDecision`. +- `mm_bot/quoter/context.py` — `QuoterContext` (frozen snapshot of market state) and `QuoterDecision` (cancels + new orders). +- `mm_bot/quoter/skew_quoter.py` — `SkewQuoter`, the built-in strategy (position-skew + PropMaintain). +- `mm_bot/quoter/registry.py` — `register_quoter(name, cls)` / `get_quoter_class(name)`. +- `mm_bot/quoter/quoter.py` — backward-compat shim (`SkewQuoter as Quoter`). -**Coupling:** when one side of a quoter is replaced, the other is force-replaced too (lines ~1090–1100 in `bot.py`). This keeps both sides priced off the same reference. +The bot creates quoters via `_initialize_quoters()` using the registry. For each iteration it: +1. Calls `_resolve_existing_orders(quoter, on_chain_by_cloid)` to build `ExistingOrder` objects from tracking dicts +2. Constructs a `QuoterContext` snapshot +3. Calls `quoter.decide(ctx)` to get cancels + new orders +4. Processes the `QuoterDecision` (discard cancelled cloids from `active_cloids`, batch into `place_orders()`) + +This replaces the old `_generate_orders_with_prop_maintain` method. All per-quoter strategy logic now lives in the quoter's `decide()` method. + +### Order generation flow (`_generate_orders`) + +For each quoter, `_resolve_existing_orders` resolves the existing bid/ask from tracking dicts using this priority: + +1. Found in `on_chain_by_cloid` (REST API) → source `"on_chain"`, price from API +2. Found in `preregistered_orders` → source `"preregistered"`, price `None` +3. Found in `active_orders` → source `"callback"`, price from callback +4. Found in `active_cloids` only → source `"unknown"`, price `None` + +The `SkewQuoter.decide()` check chain: +1. source `"preregistered"` → hold (awaiting confirmation) +2. source `"unknown"` → hold +3. source `"on_chain"` or `"callback"` → edge check against cancel threshold; keep or cancel +4. Coupling: if one side replaced, force-replace the other (uses same reference price/skew) ### Cloid format ``` -{side}-{baseline_edge_bps}-{timestamp_ms} +{side}-{quoter_id}-{timestamp_ms} # e.g. bid-1.0-1771500973306, ask-15.0-1771500975944 ``` -Quoter-to-order matching uses cloid prefix (`bid-{bps}-` / `ask-{bps}-`). Do not change this format without updating the matching logic. +For `SkewQuoter`, `quoter_id = str(Decimal(str(baseline_edge_bps)))`, preserving the original format. +For custom quoters, `quoter_id` is set in `BaseQuoter.__init__` and must be unique and stable across restarts. + +Quoter-to-order matching uses `cloid_prefix_bid` / `cloid_prefix_ask` properties on `BaseQuoter`. Do not change the format without updating matching logic. ### Quoter skew formula -`Quoter.get_bid_ask_edges()` adjusts edges based on `position / max_position` (capped ±1): +`SkewQuoter._get_skewed_edges()` adjusts edges based on `position / max_position` (capped ±1): - Long position → widen bids (slow to buy more), tighten asks (eager to sell) - Short position → tighten bids (eager to buy), widen asks (slow to sell more) -Skew magnitude is controlled by `PROP_SKEW_ENTRY` and `PROP_SKEW_EXIT`. +Skew magnitude is controlled by `prop_skew_entry` and `prop_skew_exit`. ### Shutdown diff --git a/README.md b/README.md index 2ee8b40..301edcd 100644 --- a/README.md +++ b/README.md @@ -1,101 +1,246 @@ # Kuru Market Making Bot -A market-making bot for Kuru DEX using the refactored `kuru-sdk-py` client. +An async market-making bot for [Kuru DEX](https://kuru.io) on Monad. Maintains bid/ask quotes using a **PropMaintain** strategy — only cancels and replaces orders whose edge has drifted below a threshold, minimizing gas costs. -## What Changed +## Quick Start -This repo now tracks SDK v0.1.9+ behavior: +```bash +git clone https://github.com/kuru-labs/mm-example.git +cd mm-example +python3 -m venv venv && source venv/bin/activate +pip install -r requirements.txt +``` -- Decimal-native order/position math in bot state and fill handling -- Full `ConfigManager.load_all_configs(...)` bootstrap path -- Typed SDK error handling (`Kuru*Error`) for retries and recovery -- Cancel-all flow aligned with SDK semantics (no tx-hash return assumptions) +```bash +cp .env.example .env +cp bot_config.example.toml bot_config.toml +``` -## Architecture +Set your credentials in `.env`: +``` +PRIVATE_KEY=your_private_key_without_0x +``` -Core modules: +Set your market and oracle in `bot_config.toml`. Available market symbols and their `market_address` values are listed in [kuru-exchange-server/config/markets.toml](https://github.com/Kuru-Labs/kuru-exchange-server/blob/master/config/markets.toml): +```toml +market_address = "0x..." # orderbookAddress from markets.toml +oracle_source = "kuru" +kuru_symbol = "mon_ausd" # symbol from markets.toml +``` -- `mm_bot/main.py`: process startup, logging, signal handling -- `mm_bot/config/config.py`: loads operational config (`bot_config.toml`) and SDK config bundle -- `mm_bot/bot/bot.py`: quoting loop, order lifecycle callbacks, cancellation/reconciliation, typed recovery -- `mm_bot/quoter/quoter.py`: skewed bid/ask generation -- `mm_bot/position/position_tracker.py`: Decimal-native position persistence (`tracking/position_state.json`) -- `mm_bot/pricing/oracle.py`: oracle sources (`kuru` websocket or `coinbase` REST) -- `mm_bot/pnl/tracker.py`: Decimal-native PnL display +**Deposit tokens into your margin account** before running. The bot places orders from your Kuru margin balance — a zero balance means no orders will go through: +```bash +# Check your current margin balance +python deposit.py --token 0x... --check -## Install +# Deposit tokens (use the token address for your market's base or quote token) +python deposit.py --token 0x... --amount 100 +``` +Then start the bot: ```bash -git clone https://github.com/kuru-labs/mm-example.git -cd mm-example -python3 -m venv venv -source venv/bin/activate -pip install -r requirements.txt +python -m mm_bot.main ``` +Logs appear under `tracking/`. + +--- + ## Configuration -The bot uses: +The bot uses two config files: -1. `bot_config.toml` for strategy/operational settings -2. `.env` for secrets and SDK runtime settings +- **`bot_config.toml`** — strategy and operational settings (hot-reloadable) +- **`.env`** — secrets and SDK runtime settings -### Required `.env` +### Required `.env` keys -- `PRIVATE_KEY` -- `MARKET_ADDRESS` (required unless `strategy.market_address` is set in `bot_config.toml`) +| Key | Notes | +|-----|-------| +| `PRIVATE_KEY` | No `0x` prefix | +| `MARKET_ADDRESS` | Can also be set in `bot_config.toml` as `strategy.market_address` | ### Common SDK `.env` settings -- `RPC_URL` (default `https://rpc.monad.xyz`) -- `RPC_WS_URL` (default `wss://rpc.monad.xyz`) -- `KURU_WS_URL` (default `wss://ws.kuru.io/`) -- `KURU_API_URL` (default `https://api.kuru.io/`) -- `KURU_RPC_LOGS_SUBSCRIPTION` (default `monadLogs`) -- `KURU_GAS_BUFFER_MULTIPLIER` (default from SDK) -- `KURU_USE_ACCESS_LIST` (`true`/`false`) -- `KURU_POST_ONLY` (`true`/`false`) -- `KURU_RPC_WS_MAX_RECONNECT_ATTEMPTS` -- `KURU_RPC_WS_RECONNECT_DELAY` -- `KURU_RPC_WS_MAX_RECONNECT_DELAY` -- `KURU_RECONCILIATION_INTERVAL` -- `KURU_RECONCILIATION_THRESHOLD` +| Key | Default | +|-----|---------| +| `RPC_URL` | `https://rpc.monad.xyz` | +| `RPC_WS_URL` | `wss://rpc.monad.xyz` | +| `KURU_WS_URL` | `wss://ws.kuru.io/` | +| `KURU_API_URL` | `https://api.kuru.io/` | +| `KURU_RPC_LOGS_SUBSCRIPTION` | `monadLogs` | +| `KURU_GAS_BUFFER_MULTIPLIER` | SDK default | +| `KURU_USE_ACCESS_LIST` | `true`/`false` | +| `KURU_POST_ONLY` | `true`/`false` | + +### Oracle: Kuru exchange server + +When `oracle_source = "kuru"`, the bot connects to `wss://exchange.kuru.io` and subscribes to live orderbook updates for the given symbol, computing a mid-price from the best bid/ask. + +```toml +oracle_source = "kuru" +kuru_symbol = "mon_ausd" + +# Which Monad block state to read prices from (default: "committed") +# proposed → freshest prices, can revert on reorg +# voted → validators voted +# finalized → finalized by validators +# committed → highest finality, slightly lagging +kuru_depth_state = "proposed" +``` + +Available market symbols are defined in the Kuru exchange server config: +[kuru-exchange-server/config/markets.toml](https://github.com/Kuru-Labs/kuru-exchange-server/blob/master/config/markets.toml) -See `.env.example` and `bot_config.example.toml`. +--- -## Run +### Key `bot_config.toml` parameters + +Hot-reloadable (no restart needed): + +| Parameter | Description | +|-----------|-------------| +| `prop_maintain` | Cancel threshold — `0.2` means cancel if edge drops below 80% of target | +| `reconcile_interval` | Seconds between position reconciliation (`0` = disabled) | + +Full reinit on change (brief trading pause): + +| Parameter | Description | +|-----------|-------------| +| `quoters_bps` | Spread levels in bps — `[1, 10, 15]` creates 3 bid/ask pairs | +| `quoter_type` | Quoter strategy — built-in: `"skew"` | +| `quantity` | Order size per level per side (base token units) | +| `max_position` | Inventory cap — bot skews quotes to stay within | +| `prop_skew_entry` | How aggressively to slow down position accumulation | +| `prop_skew_exit` | How aggressively to accelerate position unwind | + +See `bot_config.example.toml` for a fully annotated reference, and **[TUNING.md](TUNING.md)** for a detailed guide on how to tune each parameter. + +--- + +## Architecture -```bash -./run.sh +``` +mm_bot/ +├── main.py # Entry point, logging, signal handling +├── bot/bot.py # Quoting loop, order lifecycle, callbacks +├── quoter/ # Pluggable quoter system (see below) +├── config/ +│ ├── config.py # BotConfig dataclass, TOML + .env loading +│ └── config_watcher.py # Hot-reload (watches bot_config.toml every 5s) +├── position/ # Position tracking, persistence +├── pricing/oracle.py # Oracle sources (Kuru exchange server WS or Coinbase REST) +└── pnl/tracker.py # PnL display ``` -or: +The bot tracks all order state from **WebSocket callbacks**, not the REST API. The REST API lags ~2 seconds behind on-chain events and is only used for startup cleanup, orphan detection, and shutdown. + +### Quoter system + +Each quoter manages one bid/ask pair at one spread level. On every iteration the bot: + +1. Resolves existing order state from its tracking dicts into a frozen `QuoterContext` snapshot +2. Calls `quoter.decide(ctx)` — the quoter returns cancels + new orders +3. Batches everything into a single `place_orders()` transaction + +| Module | Role | +|--------|------| +| `quoter/base.py` | `BaseQuoter` ABC — implement `decide(ctx) -> QuoterDecision` | +| `quoter/context.py` | `QuoterContext` (frozen snapshot) and `QuoterDecision` | +| `quoter/skew_quoter.py` | Built-in `SkewQuoter` — position skew + PropMaintain cancel logic | +| `quoter/registry.py` | `register_quoter()` / `get_quoter_class()` | + +### Writing a custom quoter + +```python +from decimal import Decimal +from mm_bot.quoter.base import BaseQuoter +from mm_bot.quoter.context import QuoterContext, QuoterDecision +from mm_bot.quoter.registry import register_quoter +from mm_bot.kuru_imports import Order, OrderType, OrderSide + +class MyQuoter(BaseQuoter): + def __init__(self, edge_bps: float, quantity: Decimal): + super().__init__(quoter_id=f"my-{edge_bps}", quantity=quantity) + self.edge = Decimal(str(edge_bps)) + + def decide(self, ctx: QuoterContext) -> QuoterDecision: + cancels = [] + if ctx.existing_bid and ctx.existing_bid.source not in ("preregistered", "unknown"): + cancels.append(ctx.existing_bid.cloid) + if ctx.existing_ask and ctx.existing_ask.source not in ("preregistered", "unknown"): + cancels.append(ctx.existing_ask.cloid) + + new_orders = [] + if not ctx.stop_bids: + new_orders.append(Order( + cloid=self.make_cloid("bid"), order_type=OrderType.LIMIT, + side=OrderSide.BUY, size=self.quantity, post_only=False, + price=self.price_from_edge(self.edge, OrderSide.BUY, ctx.reference_price), + )) + if not ctx.stop_asks: + new_orders.append(Order( + cloid=self.make_cloid("ask"), order_type=OrderType.LIMIT, + side=OrderSide.SELL, size=self.quantity, post_only=False, + price=self.price_from_edge(self.edge, OrderSide.SELL, ctx.reference_price), + )) + return QuoterDecision(cancels=cancels, new_orders=new_orders) + + @classmethod + def from_config(cls, config_section: dict) -> "MyQuoter": + return cls( + edge_bps=float(config_section["baseline_edge_bps"]), + quantity=Decimal(str(config_section["quantity"])), + ) + +register_quoter("my_quoter", MyQuoter) +``` -```bash -source venv/bin/activate -PYTHONPATH=. python3 mm_bot/main.py +Then in `bot_config.toml`: +```toml +[[strategy.quoters]] +type = "my_quoter" +baseline_edge_bps = 10.0 +quantity = 500 ``` -## Runtime Notes +`QuoterContext` fields: + +| Field | Type | Description | +|-------|------|-------------| +| `reference_price` | `Decimal` | Current fair price from oracle | +| `current_position` | `Decimal` | Net position (positive = long) | +| `max_position` | `Decimal` | Position limit from config | +| `existing_bid` | `ExistingOrder?` | Bot's current bid for this quoter | +| `existing_ask` | `ExistingOrder?` | Bot's current ask for this quoter | +| `stop_bids` | `bool` | `True` if position ≥ max_position | +| `stop_asks` | `bool` | `True` if position ≤ -max_position | +| `prop_maintain` | `float` | Cancel threshold factor from config | +| `price_precision` | `Decimal` | Market price precision | + +`ExistingOrder.source`: `"on_chain"` · `"callback"` · `"preregistered"` · `"unknown"` -- Position state is persisted as Decimal-safe values in `tracking/position_state.json`. -- Terminal order states now include `ORDER_TIMEOUT` and `ORDER_FAILED` handling. -- SDK typed errors are classified for retry vs skip behavior: - - execution errors: `KuruInsufficientFundsError`, `KuruContractError`, `KuruOrderError` - - connectivity errors: `KuruConnectionError`, `KuruWebSocketError`, `KuruTimeoutError` - - API/auth errors: `KuruAuthorizationError` +--- ## Troubleshooting -- Orders not placing: - - Check margin balances and wallet gas balance - - Confirm market address and token decimals from on-chain market config -- Frequent retries: - - Check RPC/WebSocket health - - Tune `KURU_RPC_WS_*` and `KURU_RECONCILIATION_*` -- No reference price: - - Verify selected oracle source in `bot_config.toml` - - Check connectivity to Kuru WS or Coinbase API +**Orders not placing** +- Check margin balances and wallet gas balance +- Confirm market address and token decimals from on-chain market config + +**Frequent retries / connectivity issues** +- Check RPC/WebSocket health +- Tune `KURU_RPC_WS_*` reconnect settings in `.env` + +**No reference price** +- Verify `oracle_source` in `bot_config.toml` +- Check connectivity to Kuru WS or Coinbase API + +**Position drift after restart** +- Position is persisted in `tracking/position_state.json` +- Use `override_start_position` in `bot_config.toml` to force a specific starting value + +--- ## License diff --git a/TUNING.md b/TUNING.md new file mode 100644 index 0000000..7918ef5 --- /dev/null +++ b/TUNING.md @@ -0,0 +1,190 @@ +# Parameter Tuning Guide + +This guide explains what each strategy parameter does and how to tune it. All parameters live in `bot_config.toml` under `[strategy]`. + +--- + +## Baseline edge: `quoters_bps` + +```toml +quoters_bps = [7, 15] +``` + +Each value is the **baseline edge in basis points** for one quoter — the target distance from the oracle mid-price at which that quoter places its bid and ask (before any position skew is applied). `7` = 0.07% each side. + +Each entry in the list creates one independent bid/ask pair. You can run multiple quoters at different edge levels simultaneously: + +```toml +quoters_bps = [5, 20] +``` + +The inner quoter (5 bps) captures most of the flow. The outer quoter (20 bps) earns more edge per fill and only triggers on larger price moves. + +**Starting point:** baseline edge should at least cover on-chain fee costs (~2–5 bps) plus your target profit margin. On a liquid market, 5–15 bps is common. On a thinner market, 20–50 bps may be needed to avoid adverse selection. + +--- + +## Order size: `quantity` and `quantity_bps_per_level` + +```toml +quantity = 500 +``` + +Fixed order size per side, per quoter level, in base token units. + +Alternatively, size as a fraction of `max_position`: +```toml +quantity_bps_per_level = 500 # 5% of max_position per level +``` + +This keeps order size proportional to your inventory limit — useful when tuning `max_position` frequently. + +**Rule of thumb:** size each level so that a full fill on one side doesn't push you uncomfortably close to `max_position`. With two levels and `max_position = 10000`, sizing each at 500–1000 leaves room for multiple fills before skew kicks in hard. + +--- + +## Inventory cap: `max_position` + +```toml +max_position = 10000 +``` + +Maximum net position (in base token units) the bot will hold. When position reaches this limit: +- `stop_bids = True` — bids are cancelled and not replaced +- `stop_asks = True` (on the short side) — asks are cancelled + +This is a hard cap. The skew parameters (below) are the soft mechanism that slows accumulation before the cap is hit. + +--- + +## Inventory skew: `prop_skew_entry` and `prop_skew_exit` + +These control how aggressively the bot adjusts its quoted edge as inventory builds up. + +```toml +prop_skew_entry = 0.1 # slow to build more inventory +prop_skew_exit = 0.5 # eager to unwind +``` + +The skew formula scales edge up (wider, less competitive) or down (tighter, more competitive) based on `position / max_position`: + +- **Long position →** bid edge widens (slow to buy more), ask edge tightens (eager to sell) +- **Short position →** ask edge widens (slow to sell more), bid edge tightens (eager to buy) + +`prop_skew_entry` controls the widening on the side that would increase inventory. +`prop_skew_exit` controls the tightening on the side that would reduce inventory. + +At `position = max_position` the multipliers are at full effect: +- `prop_skew_entry = 0.5` → entry edge is `baseline * 1.5` (50% wider) +- `prop_skew_exit = 0.5` → exit edge is `baseline * 0.5` (50% tighter) + +**Aggressive unwind example:** +```toml +prop_skew_entry = 0.2 +prop_skew_exit = 0.8 +``` +Barely slows down accumulation but aggressively undercuts to unwind. Use this on assets you don't want to hold overnight. + +**Passive / symmetric example:** +```toml +prop_skew_entry = 0.5 +prop_skew_exit = 0.5 +``` +Symmetric adjustment — equally reluctant to accumulate as to unwind. + +--- + +## PropMaintain cancel threshold: `prop_maintain` + +```toml +prop_maintain = 0.2 +``` + +Controls when an existing order is cancelled and replaced. An order is kept if its current edge is at least `(1 - prop_maintain)` of the target edge. + +`prop_maintain = 0.2` → cancel if edge has drifted below 80% of target. +`prop_maintain = 0.05` → very patient — only cancel if edge has drifted below 95% of target. +`prop_maintain = 0.5` → aggressive — cancel and replace if edge drops below 50% of target. + +**Lower values** = fewer cancels/replacements = less gas, but quotes may sit stale longer. +**Higher values** = quotes track the oracle more tightly = more gas. + +On a fast-moving market, use a higher value (0.3–0.5). On a slow market, 0.05–0.1 is fine. + +--- + +## Oracle depth state: `kuru_depth_state` + +```toml +kuru_depth_state = "proposed" +``` + +Which Monad block state to read orderbook prices from. The four states reflect Monad's block lifecycle: + +| State | Latency | Finality | Use when | +|-------|---------|----------|----------| +| `proposed` | Lowest | Can reorg | Fast-moving markets, tight spreads | +| `voted` | Low | Unlikely to reorg | Good balance | +| `finalized` | Medium | Very safe | Conservative | +| `committed` | Highest | Guaranteed | Maximum safety | + +For most MM strategies, `proposed` or `voted` gives the freshest reference price and is fine — reorgs on Monad are rare. Use `committed` if you're seeing inconsistent fills that suggest stale pricing. + +--- + +## Reconciliation interval: `reconcile_interval` + +```toml +reconcile_interval = 10 +``` + +Seconds between position reconciliation checks (REST API call to detect orphaned orders). Set to `0` to disable. + +**Lower values** = more frequent orphan detection, more API calls. +Recommended: `10`–`30` seconds. No need to go below 5. + +--- + +## Multi-level vs per-quoter config + +**Flat config** (all levels share the same skew params): +```toml +quoters_bps = [5, 15, 30] +quantity = 500 +prop_skew_entry = 0.3 +prop_skew_exit = 0.6 +``` + +**Per-quoter config** (different params per level — more control): +```toml +[[strategy.quoters]] +type = "skew" +baseline_edge_bps = 5 +quantity = 200 +prop_skew_entry = 0.1 +prop_skew_exit = 0.8 + +[[strategy.quoters]] +type = "skew" +baseline_edge_bps = 25 +quantity = 1000 +prop_skew_entry = 0.5 +prop_skew_exit = 0.3 +``` + +The inner level is small and quick to unwind. The outer level is large and patient — it only fills on big moves, so inventory risk is lower. + +--- + +## Hot-reload vs restart + +| Parameter | Reload behaviour | +|-----------|-----------------| +| `prop_maintain` | Hot — applies immediately | +| `reconcile_interval` | Hot — applies immediately | +| `prop_skew_entry` / `prop_skew_exit` | Reinit — brief trading pause | +| `quoters_bps` / `quoters` | Reinit — brief trading pause | +| `quantity` / `quantity_bps_per_level` | Reinit — brief trading pause | +| `max_position` | Reinit — brief trading pause | +| `oracle_source` / `kuru_symbol` / `kuru_depth_state` | Restart required | +| `market_address` | Restart required | diff --git a/bot_config.example.toml b/bot_config.example.toml index 2a54b98..3d51396 100644 --- a/bot_config.example.toml +++ b/bot_config.example.toml @@ -1,69 +1,47 @@ -# Operational Configuration for MM Bot -# Hot-reload enabled: changes detected every 5 seconds +# MM Bot — example config. Copy to bot_config.toml and edit. +# For parameter explanations and tuning advice see TUNING.md. [strategy] -# --- HOT-RELOADABLE (changes apply immediately) --- -# Cancel threshold factor (0.2 = keep orders with edge >= 80% of target) -# Range: 0.0 to 1.0 -# Higher values = more aggressive replacement, lower values = more patient -prop_maintain = 0.2 - -# Seconds between position reconciliation (0 = disabled) -# Reconciliation validates placed orders via REST API -# Range: >= 0.0 -reconcile_interval = 300 - -# --- FULL REINIT REQUIRED (brief trading pause) --- - -# Maximum position size (base token units) -# Bot will skew quotes to avoid exceeding this limit -# Range: > 0.0 -max_position = 10000 - -# Skew multiplier when entering position (acquiring inventory) -# Range: >= 0.0 -# Higher values = more aggressive skewing when taking on position -prop_skew_entry = 0.5 - -# Skew multiplier when exiting position (reducing inventory) -# Range: >= 0.0 -# Higher values = more aggressive skewing when reducing position -prop_skew_exit = 0.5 - -# Base order quantity (base token units) -# Used when quantity_bps_per_level is null -# Range: > 0.0 -quantity = 400 - -# Optional: quantity as bps of max_position (overrides quantity if set) -# Example: 100 = 1% of max_position per level -# Comment out or omit to use fixed quantity instead -# quantity_bps_per_level = 2000 - -# Quoter levels in basis points from mid -# Each value creates a bid/ask pair at that spread -# Example: [1, 10, 15] creates quotes at 1bps, 10bps, and 15bps from mid -# Range: non-empty list of positive numbers -quoters_bps = [1, 10, 15] - -# --- RESTART REQUIRED (cannot hot-reload) --- - -# Market address (Kuru DEX market contract) -# Comment out or omit to use MARKET_ADDRESS from .env -# Changing requires restart to apply -market_address = "0x065c9d28e428a0db40191a54d33d5b7c71a9c394" - -# Price oracle source: "kuru" (orderbook mid) or "coinbase" (Coinbase API) -# Changing requires connection restart -oracle_source = "coinbase" - -# Coinbase symbol (required when oracle_source = "coinbase") -# Example: "MON-USD", "BTC-USD" -# Changing requires connection restart -coinbase_symbol = "MON-USD" - -# Manual position override (only used at initial startup) -# Comment out or omit to start from saved position state -# Changing requires restart to apply -# override_start_position = 1000.0 +prop_maintain = 0.2 # cancel threshold (see TUNING.md) +reconcile_interval = 30 # seconds between orphan checks + +max_position = 10000 # max inventory in base token units +prop_skew_entry = 0.5 # skew aggressiveness when building position +prop_skew_exit = 0.5 # skew aggressiveness when unwinding position +quantity = 400 # order size per side per level (base token units) + +quoter_type = "skew" +quoters_bps = [5, 15] # baseline edge levels in bps — one bid/ask pair each + +# Market address — find yours at: +# https://github.com/Kuru-Labs/kuru-exchange-server/blob/master/config/markets.toml +market_address = "0x..." + +# Oracle: "kuru" (recommended) or "coinbase" +oracle_source = "kuru" +kuru_symbol = "mon_ausd" # symbol from markets.toml above +kuru_depth_state = "proposed" # proposed | voted | finalized | committed (see TUNING.md) + +# oracle_source = "coinbase" +# coinbase_symbol = "MON-USD" + +# override_start_position = 0.0 # force a starting position on restart + + +# --- Per-quoter config (optional, overrides quoters_bps above) --- +# Use [[strategy.quoters]] for independent params per level or mixed quoter types. +# +# [[strategy.quoters]] +# type = "skew" +# baseline_edge_bps = 5.0 +# quantity = 500 +# prop_skew_entry = 0.3 +# prop_skew_exit = 0.7 +# +# [[strategy.quoters]] +# type = "skew" +# baseline_edge_bps = 20.0 +# quantity = 1000 +# prop_skew_entry = 0.5 +# prop_skew_exit = 0.3 diff --git a/mm_bot/bot/bot.py b/mm_bot/bot/bot.py index ef815e1..d5dae5c 100644 --- a/mm_bot/bot/bot.py +++ b/mm_bot/bot/bot.py @@ -23,9 +23,12 @@ KuruOrderError, KuruTimeoutError, ) -from mm_bot.config.config import BotConfig +from mm_bot.config.config import BotConfig, load_influx_config from mm_bot.config.config_watcher import ConfigWatcher -from mm_bot.quoter.quoter import Quoter +from mm_bot.monitoring.influx import InfluxWriter, _extract_quoter_id +from mm_bot.quoter.base import BaseQuoter +from mm_bot.quoter.context import ExistingOrder, QuoterContext +from mm_bot.quoter.registry import get_quoter_class from mm_bot.position.position_tracker import PositionTracker from mm_bot.pricing.oracle import OracleService, KuruPriceSource, CoinbasePriceSource from mm_bot.pnl.tracker import PnlTracker @@ -111,6 +114,17 @@ def __init__(self, sdk_configs: dict, bot_config: BotConfig): # Validation counter for periodic API checks self._validation_counter: int = 0 + # InfluxDB metrics writer (None until start()) + self.influx: Optional[InfluxWriter] = None + + # TPS tracking + self._tx_count: int = 0 + self._last_state_write_time: float = time.monotonic() + + # Cumulative edge PnL (quote units captured from spread) + self._cumulative_edge_pnl: float = 0.0 + self._cumulative_edge_pnl_by_quoter: Dict[str, float] = {} + # Initialize components (position tracker will be initialized in start()) self.position_tracker: Optional[PositionTracker] = None @@ -119,21 +133,19 @@ def __init__(self, sdk_configs: dict, bot_config: BotConfig): self.oracle_source = bot_config.oracle_source # "kuru" or "coinbase" # Set up the configured price source + self.kuru_price_source = None if self.oracle_source == "kuru": - # Kuru WebSocket price source (will be started in start() method) - self.kuru_price_source = KuruPriceSource() + self.kuru_price_source = KuruPriceSource(depth_state=bot_config.kuru_depth_state) self.oracle_service.add_price_source("kuru", self.kuru_price_source) else: - # Coinbase API price source (default) self.coinbase_price_source = CoinbasePriceSource(symbol=self.bot_config.coinbase_symbol) self.oracle_service.add_price_source("coinbase", self.coinbase_price_source) - self.kuru_price_source = None # Not used # PnL tracker (will be initialized after position tracker in start()) self.pnl_tracker: Optional[PnlTracker] = None # Quoters will be created after position tracker is initialized - self.quoters: List[Quoter] = [] + self.quoters: List[BaseQuoter] = [] # Config watcher for hot-reload self.config_watcher: Optional[ConfigWatcher] = None @@ -260,8 +272,20 @@ async def order_callback(self, order: Order) -> None: f"(ID: {order.kuru_order_id}, size: {float(order_size):.8f})" ) + if self.influx: + side_str = "buy" if order.side == OrderSide.BUY else "sell" + self.influx.write_order( + side=side_str, + price=float(order_price), + size=float(order_size), + quoter_id=_extract_quoter_id(order.cloid), + event="placed", + ) + # Track cancellations elif order.status == OrderStatus.ORDER_CANCELLED: + # Capture price/size before cleanup clears active_orders + cancelled_order = self.active_orders.get(order.cloid) self._cleanup_order_tracking( order.cloid, order.kuru_order_id, @@ -270,6 +294,16 @@ async def order_callback(self, order: Order) -> None: self._debug_log(f"[INVENTORY] Removed cancelled order: {order.cloid}") logger.debug(f"✗ Order {order.cloid} cancelled") + if self.influx and cancelled_order: + side_str = "buy" if cancelled_order.side == OrderSide.BUY else "sell" + self.influx.write_order( + side=side_str, + price=float(cancelled_order.price), + size=float(cancelled_order.size), + quoter_id=_extract_quoter_id(order.cloid), + event="cancelled", + ) + # Track fills and update position elif order.status == OrderStatus.ORDER_FULLY_FILLED: self._debug_log(f"[ORDER] FULLY_FILLED event - cloid: {order.cloid}") @@ -330,6 +364,37 @@ async def order_callback(self, order: Order) -> None: f"Price: {order.price}" ) + if self.influx and previous_size is not None and order.side is not None: + oracle_price_raw = self.oracle_service.get_price( + self.market_config.market_address, self.oracle_source + ) + if oracle_price_raw is not None: + oracle_f = float(oracle_price_raw) + fill_f = float(order_price) + size_f = float(filled_size) + is_buy = order.side == OrderSide.BUY + realized_edge_bps = ( + (oracle_f - fill_f) / oracle_f * 10000 if is_buy + else (fill_f - oracle_f) / oracle_f * 10000 + ) + edge_pnl = size_f * (oracle_f - fill_f if is_buy else fill_f - oracle_f) + self._cumulative_edge_pnl += edge_pnl + qid = _extract_quoter_id(order.cloid) + self._cumulative_edge_pnl_by_quoter[qid] = ( + self._cumulative_edge_pnl_by_quoter.get(qid, 0.0) + edge_pnl + ) + self.influx.write_fill( + side="buy" if is_buy else "sell", + price=fill_f, + oracle_price=oracle_f, + realized_edge_bps=realized_edge_bps, + edge_pnl=edge_pnl, + filled_size=size_f, + remaining_size=0.0, + fill_type="full", + quoter_id=_extract_quoter_id(order.cloid), + ) + elif order.status == OrderStatus.ORDER_PARTIALLY_FILLED: self._debug_log(f"[ORDER] PARTIALLY_FILLED event - cloid: {order.cloid}") self._debug_log(f"[ORDER] side: {order.side.value if order.side else 'None'}") @@ -389,6 +454,37 @@ async def order_callback(self, order: Order) -> None: # Keep in active_cloids since it's still on the book logger.info(f"⚡ Order {order.cloid} partially filled") + + if self.influx and previous_size is not None and order.side is not None: + oracle_price_raw = self.oracle_service.get_price( + self.market_config.market_address, self.oracle_source + ) + if oracle_price_raw is not None: + oracle_f = float(oracle_price_raw) + fill_f = float(order_price) + size_f = float(filled_size) + is_buy = order.side == OrderSide.BUY + realized_edge_bps = ( + (oracle_f - fill_f) / oracle_f * 10000 if is_buy + else (fill_f - oracle_f) / oracle_f * 10000 + ) + edge_pnl = size_f * (oracle_f - fill_f if is_buy else fill_f - oracle_f) + self._cumulative_edge_pnl += edge_pnl + qid = _extract_quoter_id(order.cloid) + self._cumulative_edge_pnl_by_quoter[qid] = ( + self._cumulative_edge_pnl_by_quoter.get(qid, 0.0) + edge_pnl + ) + self.influx.write_fill( + side="buy" if is_buy else "sell", + price=fill_f, + oracle_price=oracle_f, + realized_edge_bps=realized_edge_bps, + edge_pnl=edge_pnl, + filled_size=size_f, + remaining_size=float(order_size), + fill_type="partial", + quoter_id=_extract_quoter_id(order.cloid), + ) elif order.status == OrderStatus.ORDER_TIMEOUT: self._cleanup_order_tracking(order.cloid, order.kuru_order_id) logger.warning( @@ -423,9 +519,9 @@ async def start(self) -> None: # Start price feed based on configured oracle if self.oracle_source == "kuru": logger.info("Connecting to Kuru orderbook WebSocket...") - self.kuru_price_source.start(self.market_config.market_address) + self.kuru_price_source.start(self.bot_config.kuru_symbol) else: - logger.info(f"Using Coinbase API as oracle (symbol: MON-USD)") + logger.info(f"Using Coinbase API as oracle (symbol: {self.bot_config.coinbase_symbol})") # ONE-TIME cleanup: Cancel any leftover orders from previous runs await self._cancel_all_existing_orders() @@ -444,6 +540,26 @@ async def start(self) -> None: # Create quoters with calculated quantity self._initialize_quoters() + # Start InfluxDB metrics writer (no-op if INFLUX_URL not set) + influx_cfg = load_influx_config() + if influx_cfg: + self.influx = InfluxWriter( + url=influx_cfg["url"], + token=influx_cfg["token"], + database=influx_cfg["database"], + market=self.market_config.market_address, + oracle_source=self.oracle_source, + ) + await self.influx.start() + self._cumulative_edge_pnl = await self.influx.query_last_cumulative_edge_pnl( + self.market_config.market_address + ) + self._cumulative_edge_pnl_by_quoter = await self.influx.query_last_cumulative_edge_pnl_by_quoter( + self.market_config.market_address + ) + else: + logger.info("InfluxDB metrics disabled (INFLUX_URL not set)") + # Start config watcher (if bot_config.toml exists) config_path = Path("bot_config.toml") if config_path.exists(): @@ -517,36 +633,54 @@ async def _initialize_position_tracker(self) -> None: def _initialize_quoters(self) -> None: """ - Initialize quoters with calculated quantity based on config. - Uses quantity_bps_per_level if set, otherwise uses fixed quantity. + Initialize quoters from config. + + Supports two config formats: + 1. Flat: quoters_bps list + shared params (backward-compatible) + 2. Per-quoter: [[strategy.quoters]] with type + per-quoter params (mixed types) """ - for baseline_edge_bps in self.bot_config.quoters_bps: - # Calculate quantity - if self.bot_config.quantity_bps_per_level is not None: - # Use BPS of max position - quantity = (self.bot_config.max_position * self.bot_config.quantity_bps_per_level) / 10000 - logger.info( - f"Quoter {baseline_edge_bps}bps: Using quantity_bps={self.bot_config.quantity_bps_per_level} " - f"→ quantity={quantity:.2f}" - ) - else: - # Use fixed quantity - quantity = self.bot_config.quantity - logger.info(f"Quoter {baseline_edge_bps}bps: Using fixed quantity={quantity}") - - quoter = Quoter( - oracle_service=self.oracle_service, - position_tracker=self.position_tracker, - source_name="kuru", - market_id=self.market_config.market_address, - baseline_edge_bps=baseline_edge_bps, - max_position=self.bot_config.max_position, - prop_skew_entry=self.bot_config.prop_skew_entry, - prop_skew_exit=self.bot_config.prop_skew_exit, - quantity=quantity, - market_config=self.market_config - ) - self.quoters.append(quoter) + # Ensure registry is populated (importing __init__ triggers registration) + import mm_bot.quoter # noqa: F401 + + if self.bot_config.quoters_config: + # Per-quoter config: [[strategy.quoters]] + for i, q_conf in enumerate(self.bot_config.quoters_config): + quoter_type = q_conf["type"] + quoter_cls = get_quoter_class(quoter_type) + + # Allow quoter-level quantity override, fall back to strategy-level + if "quantity" not in q_conf and self.bot_config.quantity: + q_conf = {**q_conf, "quantity": self.bot_config.quantity} + + quoter = quoter_cls.from_config(q_conf) + self.quoters.append(quoter) + logger.info(f"Quoter [{i}] type={quoter_type} id={quoter.quoter_id} quantity={float(quoter.quantity)}") + else: + # Flat config: quoters_bps + shared skew params (backward-compatible) + quoter_type = self.bot_config.quoter_type + quoter_cls = get_quoter_class(quoter_type) + + for baseline_edge_bps in self.bot_config.quoters_bps: + # Calculate quantity + if self.bot_config.quantity_bps_per_level is not None: + quantity = (self.bot_config.max_position * self.bot_config.quantity_bps_per_level) / 10000 + logger.info( + f"Quoter {baseline_edge_bps}bps: Using quantity_bps={self.bot_config.quantity_bps_per_level} " + f"→ quantity={quantity:.2f}" + ) + else: + quantity = self.bot_config.quantity + logger.info(f"Quoter {baseline_edge_bps}bps: Using fixed quantity={quantity}") + + # Build config section for from_config + q_conf = { + "baseline_edge_bps": baseline_edge_bps, + "quantity": quantity, + "prop_skew_entry": self.bot_config.prop_skew_entry, + "prop_skew_exit": self.bot_config.prop_skew_exit, + } + quoter = quoter_cls.from_config(q_conf) + self.quoters.append(quoter) logger.success(f"✓ Initialized {len(self.quoters)} quoters") @@ -731,6 +865,19 @@ async def _reconcile_position(self, block_number: Optional[int] = None) -> None: # Periodic API validation (every 10th reconciliation) await self._validate_against_api() + # Write reconcile metrics to InfluxDB + if self.influx: + self.influx.write_reconcile( + tracked_position=float(tracked_position), + drift=float(drift), + free_base=float(free_base), + locked_base=float(locked_base), + free_quote=float(free_quote), + locked_quote=float(locked_quote), + num_active_orders=len(self.active_orders), + block_number=int(block_number), + ) + # Show clean summary with drift delta drift_status = f"Δ{float(drift_delta):+.2f}" if previous_drift is not None else "baseline" logger.info( @@ -1014,12 +1161,12 @@ async def run_main_loop(self) -> None: # Get current on-chain active orders on_chain_orders = self.client.user.get_active_orders() - # Use PropMaintain logic to generate orders (only cancels orders below cancel threshold) - all_orders, num_cancels, num_new_orders = self._generate_orders_with_prop_maintain( + # Delegate to quoters to decide what to cancel and place + all_orders, num_cancels, num_new_orders = self._generate_orders( reference_price, on_chain_orders ) - logger.info(f"PropMaintain: {num_cancels} cancels, {num_new_orders} new orders") + logger.info(f"Quoters: {num_cancels} cancels, {num_new_orders} new orders") # Validate balance before placing orders if all_orders: @@ -1054,6 +1201,7 @@ async def run_main_loop(self) -> None: price_rounding="default", ) logger.info(f"Transaction hash: {txhash}") + self._tx_count += 1 except ( KuruInsufficientFundsError, KuruContractError, @@ -1074,7 +1222,55 @@ async def run_main_loop(self) -> None: # Print PnL self.pnl_tracker.print_pnl() else: - logger.debug("No order updates needed (PropMaintain kept existing orders)") + logger.debug("No order updates needed (quoters kept existing orders)") + + # Write iteration metrics to InfluxDB + if self.influx: + current_position = self.position_tracker.get_current_position() + stop_bids = current_position > self.bot_config.max_position + stop_asks = current_position < -self.bot_config.max_position + + # Compute TPS + now = time.monotonic() + elapsed = now - self._last_state_write_time + tps = self._tx_count / elapsed if elapsed > 0 else 0.0 + self._tx_count = 0 + self._last_state_write_time = now + + # Build order price snapshot + order_prices: dict = {} + bids = sorted( + [o for o in self.active_orders.values() if o.side == OrderSide.BUY], + key=lambda o: o.price, reverse=True, + ) + asks = sorted( + [o for o in self.active_orders.values() if o.side == OrderSide.SELL], + key=lambda o: o.price, + ) + for i, o in enumerate(bids): + order_prices[f"bid_{i}"] = float(o.price) + for i, o in enumerate(asks): + order_prices[f"ask_{i}"] = float(o.price) + best_bid = float(bids[0].price) if bids else None + best_ask = float(asks[0].price) if asks else None + + pnl_val = self.pnl_tracker.get_pnl() + self.influx.write_state( + reference_price=float(reference_price), + position=float(current_position), + pnl=float(pnl_val) if pnl_val is not None else None, + num_active_orders=len(self.active_cloids), + num_cancels=num_cancels, + num_new_orders=num_new_orders, + stop_bids=stop_bids, + stop_asks=stop_asks, + tps=tps, + best_bid=best_bid, + best_ask=best_ask, + order_prices=order_prices, + cumulative_edge_pnl=self._cumulative_edge_pnl, + cumulative_edge_pnl_by_quoter=self._cumulative_edge_pnl_by_quoter, + ) # Sleep 1 second try: @@ -1099,13 +1295,78 @@ async def run_main_loop(self) -> None: logger.error(f"Error in main loop: {e}") await asyncio.sleep(1.0) - def _generate_orders_with_prop_maintain( + def _resolve_order_price( + self, + cloid: str, + on_chain_by_cloid: dict, + ) -> tuple[Optional[Decimal], str]: + """ + Resolve the price and source of an existing order from tracking dicts. + + Priority: on_chain > callback > preregistered > unknown + + Returns: + (price_or_none, source_string) + """ + if cloid in on_chain_by_cloid: + order = on_chain_by_cloid[cloid] + price = _to_decimal(order.get('price', 0)) / Decimal(self.market_config.price_precision) + return price, "on_chain" + elif cloid in self.preregistered_orders: + return None, "preregistered" + elif cloid in self.active_orders: + return self.active_orders[cloid].price, "callback" + else: + return None, "unknown" + + def _resolve_existing_orders( + self, + quoter: BaseQuoter, + on_chain_by_cloid: dict, + ) -> tuple[Optional[ExistingOrder], Optional[ExistingOrder]]: + """ + Find and resolve this quoter's existing bid and ask orders + from the bot's tracking dicts. + + Returns: + (existing_bid, existing_ask) — either may be None if no order found + """ + existing_bid = None + existing_ask = None + + # Primary: search active_cloids (callback-driven, no RPC lag) + for cloid in list(self.active_cloids): + if cloid.startswith(quoter.cloid_prefix_bid): + price, source = self._resolve_order_price(cloid, on_chain_by_cloid) + existing_bid = ExistingOrder(cloid=cloid, side=OrderSide.BUY, price=price, source=source) + elif cloid.startswith(quoter.cloid_prefix_ask): + price, source = self._resolve_order_price(cloid, on_chain_by_cloid) + existing_ask = ExistingOrder(cloid=cloid, side=OrderSide.SELL, price=price, source=source) + + # Fallback: order just sent but ORDER_PLACED callback not yet received + if existing_bid is None: + for cloid in self.preregistered_orders: + if cloid.startswith(quoter.cloid_prefix_bid): + existing_bid = ExistingOrder(cloid=cloid, side=OrderSide.BUY, price=None, source="preregistered") + break + if existing_ask is None: + for cloid in self.preregistered_orders: + if cloid.startswith(quoter.cloid_prefix_ask): + existing_ask = ExistingOrder(cloid=cloid, side=OrderSide.SELL, price=None, source="preregistered") + break + + return existing_bid, existing_ask + + def _generate_orders( self, reference_price: Decimal, on_chain_orders: list, ) -> tuple[list[Order], int, int]: """ - Generate orders using PropMaintain logic: only cancel orders below cancel threshold. + Generate orders by delegating to each quoter's decide() method. + + The bot resolves order state from its tracking dicts into a QuoterContext, + then lets each quoter decide what to cancel and place. Args: reference_price: Current fair/reference price @@ -1130,241 +1391,43 @@ def _generate_orders_with_prop_maintain( # Build map of on-chain orders by cloid (for our orders only) on_chain_by_cloid = {} - on_chain_order_ids = set() - for order in on_chain_orders: order_id = int(order.get('orderid', 0)) if order_id in self.order_id_to_cloid: cloid = self.order_id_to_cloid[order_id] on_chain_by_cloid[cloid] = order - on_chain_order_ids.add(order_id) - # For each quoter, check if existing orders meet cancel threshold for quoter in self.quoters: - # Get cancel thresholds (edges below which we cancel existing orders) - bid_cancel_edge, ask_cancel_edge = quoter.get_cancel_edges(self.bot_config.prop_maintain) - - # Find existing orders for this quoter (by baseline_edge_bps in cloid) - quoter_prefix_bid = f"bid-{quoter.baseline_edge_bps}-" - quoter_prefix_ask = f"ask-{quoter.baseline_edge_bps}-" - - existing_bid_cloid = None - existing_ask_cloid = None - need_bid = True - need_ask = True - - # Primary: search active_cloids (callback-driven, no RPC lag). - # This is the reliable source - it's updated immediately when callbacks fire, - # unlike on_chain_by_cloid which requires RPC to reflect the mined block. - for cloid in list(self.active_cloids): - if cloid.startswith(quoter_prefix_bid): - existing_bid_cloid = cloid - elif cloid.startswith(quoter_prefix_ask): - existing_ask_cloid = cloid - - # Fallback: order just sent but ORDER_PLACED callback not yet received - if existing_bid_cloid is None: - for cloid in self.preregistered_orders: - if cloid.startswith(quoter_prefix_bid): - existing_bid_cloid = cloid - break - if existing_ask_cloid is None: - for cloid in self.preregistered_orders: - if cloid.startswith(quoter_prefix_ask): - existing_ask_cloid = cloid - break - - # Check bid: calculate edge and compare to cancel threshold - if existing_bid_cloid and existing_bid_cloid in on_chain_by_cloid: - order = on_chain_by_cloid[existing_bid_cloid] - order_price = _to_decimal(order.get('price', 0)) / Decimal(self.market_config.price_precision) - order_edge = quoter.calculate_order_edge(order_price, OrderSide.BUY, reference_price) - - # Cancel bid if position limit exceeded - if stop_bids: - all_orders.append(Order(cloid=existing_bid_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - self.active_cloids.discard(existing_bid_cloid) - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Cancelling bid @ {float(order_price):.6f} " - f"(position limit exceeded)" - ) - elif order_edge >= bid_cancel_edge: - # Edge is good, keep the order - need_bid = False - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Keeping bid @ {float(order_price):.6f} " - f"(edge={float(order_edge):.1f} >= cancel_threshold={float(bid_cancel_edge):.1f})" - ) - else: - # Edge too low, cancel it - all_orders.append(Order(cloid=existing_bid_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - # Proactively remove from active_cloids so the next iteration doesn't - # find this stale cloid and mistakenly think the slot is still filled. - self.active_cloids.discard(existing_bid_cloid) - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Cancelling bid @ {float(order_price):.6f} " - f"(edge={float(order_edge):.1f} < cancel_threshold={float(bid_cancel_edge):.1f})" - ) - elif existing_bid_cloid and existing_bid_cloid in self.preregistered_orders: - # Order just sent, awaiting on-chain confirmation - don't place another - need_bid = False - logger.debug(f"Quoter {quoter.baseline_edge_bps}bps: Bid pending confirmation, holding") - elif existing_bid_cloid and existing_bid_cloid in self.active_orders: - # Confirmed on-chain (ORDER_PLACED received) but REST API hasn't indexed it yet. - # Use callback-tracked price for full edge evaluation — eliminates the blind window. - order_price = self.active_orders[existing_bid_cloid].price - order_edge = quoter.calculate_order_edge(order_price, OrderSide.BUY, reference_price) - - # Cancel bid if position limit exceeded - if stop_bids: - all_orders.append(Order(cloid=existing_bid_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - self.active_cloids.discard(existing_bid_cloid) - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Cancelling bid @ {float(order_price):.6f} " - f"(position limit exceeded) [callback]" - ) - elif order_edge >= bid_cancel_edge: - need_bid = False - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Keeping bid @ {float(order_price):.6f} " - f"(edge={float(order_edge):.1f} >= cancel_threshold={float(bid_cancel_edge):.1f}) [callback]" - ) - else: - all_orders.append(Order(cloid=existing_bid_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - self.active_cloids.discard(existing_bid_cloid) - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Cancelling bid @ {float(order_price):.6f} " - f"(edge={float(order_edge):.1f} < cancel_threshold={float(bid_cancel_edge):.1f}) [callback]" - ) - elif existing_bid_cloid: - # In active_cloids but not in active_orders — unexpected state, hold. - need_bid = False - logger.debug(f"Quoter {quoter.baseline_edge_bps}bps: Bid in unknown state, holding") - - # Check ask: calculate edge and compare to cancel threshold - if existing_ask_cloid and existing_ask_cloid in on_chain_by_cloid: - order = on_chain_by_cloid[existing_ask_cloid] - order_price = _to_decimal(order.get('price', 0)) / Decimal(self.market_config.price_precision) - order_edge = quoter.calculate_order_edge(order_price, OrderSide.SELL, reference_price) - - # Cancel ask if position limit exceeded - if stop_asks: - all_orders.append(Order(cloid=existing_ask_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - self.active_cloids.discard(existing_ask_cloid) - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Cancelling ask @ {float(order_price):.6f} " - f"(position limit exceeded)" - ) - elif order_edge >= ask_cancel_edge: - # Edge is good, keep the order - need_ask = False - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Keeping ask @ {float(order_price):.6f} " - f"(edge={float(order_edge):.1f} >= cancel_threshold={float(ask_cancel_edge):.1f})" - ) - else: - # Edge too low, cancel it - all_orders.append(Order(cloid=existing_ask_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - # Proactively remove from active_cloids (same reason as bid above) - self.active_cloids.discard(existing_ask_cloid) - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Cancelling ask @ {float(order_price):.6f} " - f"(edge={float(order_edge):.1f} < cancel_threshold={float(ask_cancel_edge):.1f})" - ) - elif existing_ask_cloid and existing_ask_cloid in self.preregistered_orders: - # Order just sent, awaiting on-chain confirmation - don't place another - need_ask = False - logger.debug(f"Quoter {quoter.baseline_edge_bps}bps: Ask pending confirmation, holding") - elif existing_ask_cloid and existing_ask_cloid in self.active_orders: - # Confirmed on-chain (ORDER_PLACED received) but REST API hasn't indexed it yet. - # Use callback-tracked price for full edge evaluation — eliminates the blind window. - order_price = self.active_orders[existing_ask_cloid].price - order_edge = quoter.calculate_order_edge(order_price, OrderSide.SELL, reference_price) - - # Cancel ask if position limit exceeded - if stop_asks: - all_orders.append(Order(cloid=existing_ask_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - self.active_cloids.discard(existing_ask_cloid) - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Cancelling ask @ {float(order_price):.6f} " - f"(position limit exceeded) [callback]" - ) - elif order_edge >= ask_cancel_edge: - need_ask = False - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Keeping ask @ {float(order_price):.6f} " - f"(edge={float(order_edge):.1f} >= cancel_threshold={float(ask_cancel_edge):.1f}) [callback]" - ) - else: - all_orders.append(Order(cloid=existing_ask_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - self.active_cloids.discard(existing_ask_cloid) - logger.debug( - f"Quoter {float(quoter.baseline_edge_bps):.2f}bps: " - f"Cancelling ask @ {float(order_price):.6f} " - f"(edge={float(order_edge):.1f} < cancel_threshold={float(ask_cancel_edge):.1f}) [callback]" - ) - elif existing_ask_cloid: - # In active_cloids but not in active_orders — unexpected state, hold. - need_ask = False - logger.debug(f"Quoter {quoter.baseline_edge_bps}bps: Ask in unknown state, holding") - - # Coupling block: if one side needs replacement, force-replace the other too. - # This ensures both sides are re-priced with the current prop_of_max after a fill. - if need_bid and not need_ask: - # Bid is being replaced. Check if ask is still preregistered (mid-block race). - if existing_ask_cloid and existing_ask_cloid in self.preregistered_orders: - logger.debug(f"Coupling: ask {existing_ask_cloid} still preregistered, skipping quoter this iteration") - continue - need_ask = True - if existing_ask_cloid: - all_orders.append(Order(cloid=existing_ask_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - self.active_cloids.discard(existing_ask_cloid) - logger.debug(f"Coupling: cancelling ask {existing_ask_cloid} because bid was replaced") - elif need_ask and not need_bid: - # Ask is being replaced. Check if bid is still preregistered (mid-block race). - if existing_bid_cloid and existing_bid_cloid in self.preregistered_orders: - logger.debug(f"Coupling: bid {existing_bid_cloid} still preregistered, skipping quoter this iteration") - continue - need_bid = True - if existing_bid_cloid: - all_orders.append(Order(cloid=existing_bid_cloid, order_type=OrderType.CANCEL)) - total_cancels += 1 - self.active_cloids.discard(existing_bid_cloid) - logger.debug(f"Coupling: cancelling bid {existing_bid_cloid} because ask was replaced") - - # Generate new orders for sides that need updating - # Apply position limits: stop quoting bids if position > max, stop asks if position < -max - final_need_bid = need_bid and not stop_bids - final_need_ask = need_ask and not stop_asks - - new_quoter_orders = quoter.generate_orders(reference_price, need_bid=final_need_bid, need_ask=final_need_ask) - if new_quoter_orders: - all_orders.extend(new_quoter_orders) - total_new_orders += len(new_quoter_orders) - logger.debug( - f"Quoter {quoter.baseline_edge_bps}bps: Generating {len(new_quoter_orders)} new orders " - f"(bid={'yes' if final_need_bid else 'no'}, ask={'yes' if final_need_ask else 'no'})" - ) + # Resolve existing orders for this quoter from tracking dicts + existing_bid, existing_ask = self._resolve_existing_orders(quoter, on_chain_by_cloid) + + # Build context snapshot for the quoter + ctx = QuoterContext( + reference_price=reference_price, + current_position=current_position, + max_position=_to_decimal(self.bot_config.max_position), + existing_bid=existing_bid, + existing_ask=existing_ask, + stop_bids=stop_bids, + stop_asks=stop_asks, + prop_maintain=self.bot_config.prop_maintain, + price_precision=Decimal(self.market_config.price_precision), + ) + + # Let the quoter decide what to cancel and place + decision = quoter.decide(ctx) + + # Process cancels + for cloid in decision.cancels: + all_orders.append(Order(cloid=cloid, order_type=OrderType.CANCEL)) + # Proactively remove from active_cloids so the next iteration doesn't + # find this stale cloid and mistakenly think the slot is still filled. + self.active_cloids.discard(cloid) + total_cancels += 1 + + # Collect new orders + all_orders.extend(decision.new_orders) + total_new_orders += len(decision.new_orders) return all_orders, total_cancels, total_new_orders @@ -1574,7 +1637,9 @@ def _on_config_reload(self, new_config: BotConfig): new_config.prop_skew_exit != self.bot_config.prop_skew_exit or new_config.quantity != self.bot_config.quantity or new_config.quantity_bps_per_level != self.bot_config.quantity_bps_per_level or - new_config.quoters_bps != self.bot_config.quoters_bps + new_config.quoters_bps != self.bot_config.quoters_bps or + new_config.quoter_type != self.bot_config.quoter_type or + new_config.quoters_config != self.bot_config.quoters_config ) restart_required = ( @@ -1685,6 +1750,10 @@ async def stop(self) -> None: """ logger.info("\n🛑 Stopping bot...") + # Flush and stop InfluxDB writer before anything else + if self.influx: + await self.influx.stop() + # Cancel all active orders with exponential backoff await self._cancel_all_orders_with_retry() diff --git a/mm_bot/config/config.py b/mm_bot/config/config.py index 5959178..28a52d4 100644 --- a/mm_bot/config/config.py +++ b/mm_bot/config/config.py @@ -27,9 +27,28 @@ class BotConfig: quantity_bps_per_level: Optional[float] = None # If set, overrides quantity override_start_position: Optional[float] = None # Manual position override reconcile_interval: float = 300 # Seconds between reconciliation (0=disabled) - oracle_source: str = "coinbase" # "kuru" for Kuru orderbook mid, "coinbase" for Coinbase API + oracle_source: str = "coinbase" # "kuru" | "coinbase" coinbase_symbol: Optional[str] = None # Required when oracle_source="coinbase" + kuru_symbol: Optional[str] = None # Required when oracle_source="kuru" (e.g. "mon_ausd") + kuru_depth_state: str = "committed" # "proposed"|"voted"|"finalized"|"committed" market_address: Optional[str] = None # Market address (restart required to change) + quoter_type: str = "skew" # Default quoter type for flat config (used with quoters_bps) + quoters_config: Optional[List[dict]] = None # Per-quoter config for mixed types ([[strategy.quoters]]) + + +def load_influx_config() -> Optional[dict]: + """ + Load InfluxDB connection config from environment variables. + Returns None if INFLUX_URL is not set (metrics disabled). + """ + url = os.getenv("INFLUX_URL") + if not url: + return None + return { + "url": url, + "token": os.getenv("INFLUX_TOKEN", ""), + "database": os.getenv("INFLUX_DATABASE", "mm_bot"), + } def load_secrets_from_env(market_address: Optional[str] = None) -> SDKConfigs: @@ -77,22 +96,41 @@ def load_operational_config(toml_path: Path) -> BotConfig: strategy = config_dict["strategy"] + # Check if using per-quoter config ([[strategy.quoters]]) or flat config + has_quoters_config = "quoters" in strategy and isinstance(strategy["quoters"], list) + # Validate required parameters - required_params = [ - "prop_maintain", - "reconcile_interval", - "max_position", - "prop_skew_entry", - "prop_skew_exit", - "quantity", - "quoters_bps", - "oracle_source", - ] + # When using [[strategy.quoters]], quoters_bps and per-quoter params in [strategy] are optional + if has_quoters_config: + required_params = [ + "prop_maintain", + "reconcile_interval", + "oracle_source", + ] + else: + required_params = [ + "prop_maintain", + "reconcile_interval", + "max_position", + "prop_skew_entry", + "prop_skew_exit", + "quantity", + "quoters_bps", + "oracle_source", + ] for param in required_params: if param not in strategy: raise ValueError(f"Missing required parameter: {param}") + # Validate per-quoter configs + if has_quoters_config: + for i, q in enumerate(strategy["quoters"]): + if "type" not in q: + raise ValueError(f"quoters[{i}] missing required field 'type'") + if "quantity" not in q and "quantity" not in strategy: + raise ValueError(f"quoters[{i}] missing 'quantity' (not in quoter config or [strategy])") + # Validate oracle_source and coinbase_symbol if strategy["oracle_source"] not in ["kuru", "coinbase"]: raise ValueError(f"Invalid oracle_source: {strategy['oracle_source']} (must be 'kuru' or 'coinbase')") @@ -101,28 +139,40 @@ def load_operational_config(toml_path: Path) -> BotConfig: if "coinbase_symbol" not in strategy or not strategy["coinbase_symbol"]: raise ValueError("coinbase_symbol required when oracle_source='coinbase'") + if strategy["oracle_source"] == "kuru": + if "kuru_symbol" not in strategy or not strategy["kuru_symbol"]: + raise ValueError("kuru_symbol required when oracle_source='kuru' (e.g. 'mon_ausd')") + depth_state = strategy.get("kuru_depth_state", "committed") + valid_states = ("proposed", "voted", "finalized", "committed") + if depth_state not in valid_states: + raise ValueError(f"kuru_depth_state must be one of {valid_states}, got '{depth_state}'") + # Build BotConfig return BotConfig( prop_maintain=float(strategy["prop_maintain"]), reconcile_interval=float(strategy["reconcile_interval"]), - max_position=float(strategy["max_position"]), - prop_skew_entry=float(strategy["prop_skew_entry"]), - prop_skew_exit=float(strategy["prop_skew_exit"]), - quantity=float(strategy["quantity"]), + max_position=float(strategy.get("max_position", 0)), + prop_skew_entry=float(strategy.get("prop_skew_entry", 0.5)), + prop_skew_exit=float(strategy.get("prop_skew_exit", 0.5)), + quantity=float(strategy.get("quantity", 0)), quantity_bps_per_level=( float(strategy["quantity_bps_per_level"]) if strategy.get("quantity_bps_per_level") is not None else None ), - quoters_bps=[float(x) for x in strategy["quoters_bps"]], + quoters_bps=[float(x) for x in strategy["quoters_bps"]] if "quoters_bps" in strategy else [], oracle_source=strategy["oracle_source"], coinbase_symbol=strategy.get("coinbase_symbol"), + kuru_symbol=strategy.get("kuru_symbol"), + kuru_depth_state=strategy.get("kuru_depth_state", "committed"), market_address=strategy.get("market_address"), override_start_position=( float(strategy["override_start_position"]) if strategy.get("override_start_position") is not None else None ), + quoter_type=strategy.get("quoter_type", "skew"), + quoters_config=strategy.get("quoters") if has_quoters_config else None, ) diff --git a/mm_bot/config/config_watcher.py b/mm_bot/config/config_watcher.py index 569f889..90cf1d9 100644 --- a/mm_bot/config/config_watcher.py +++ b/mm_bot/config/config_watcher.py @@ -164,6 +164,7 @@ def _load_and_validate(self) -> Optional[BotConfig]: return None strategy = config_dict["strategy"] + has_quoters_config = "quoters" in strategy and isinstance(strategy["quoters"], list) # Validate all parameters errors = [] @@ -181,33 +182,45 @@ def _load_and_validate(self) -> Optional[BotConfig]: f"Invalid reconcile_interval: {strategy['reconcile_interval']} (must be >= 0.0)" ) - # Reinit-required params - if "max_position" not in strategy: - errors.append("Missing max_position") - elif not validate_max_position(strategy["max_position"]): - errors.append(f"Invalid max_position: {strategy['max_position']} (must be > 0.0)") - - if "prop_skew_entry" not in strategy: - errors.append("Missing prop_skew_entry") - elif not validate_prop_skew(strategy["prop_skew_entry"]): - errors.append(f"Invalid prop_skew_entry: {strategy['prop_skew_entry']} (must be >= 0.0)") - - if "prop_skew_exit" not in strategy: - errors.append("Missing prop_skew_exit") - elif not validate_prop_skew(strategy["prop_skew_exit"]): - errors.append(f"Invalid prop_skew_exit: {strategy['prop_skew_exit']} (must be >= 0.0)") - - if "quantity" not in strategy: - errors.append("Missing quantity") - elif not validate_quantity(strategy["quantity"]): - errors.append(f"Invalid quantity: {strategy['quantity']} (must be > 0.0)") - - if "quoters_bps" not in strategy: - errors.append("Missing quoters_bps") - elif not validate_quoters_bps(strategy["quoters_bps"]): - errors.append( - f"Invalid quoters_bps: {strategy['quoters_bps']} (must be non-empty list of positive numbers)" - ) + # Reinit-required params (only required for flat config, not per-quoter config) + if not has_quoters_config: + if "max_position" not in strategy: + errors.append("Missing max_position") + elif not validate_max_position(strategy["max_position"]): + errors.append(f"Invalid max_position: {strategy['max_position']} (must be > 0.0)") + + if "prop_skew_entry" not in strategy: + errors.append("Missing prop_skew_entry") + elif not validate_prop_skew(strategy["prop_skew_entry"]): + errors.append(f"Invalid prop_skew_entry: {strategy['prop_skew_entry']} (must be >= 0.0)") + + if "prop_skew_exit" not in strategy: + errors.append("Missing prop_skew_exit") + elif not validate_prop_skew(strategy["prop_skew_exit"]): + errors.append(f"Invalid prop_skew_exit: {strategy['prop_skew_exit']} (must be >= 0.0)") + + if "quantity" not in strategy: + errors.append("Missing quantity") + elif not validate_quantity(strategy["quantity"]): + errors.append(f"Invalid quantity: {strategy['quantity']} (must be > 0.0)") + + if "quoters_bps" not in strategy: + errors.append("Missing quoters_bps") + elif not validate_quoters_bps(strategy["quoters_bps"]): + errors.append( + f"Invalid quoters_bps: {strategy['quoters_bps']} (must be non-empty list of positive numbers)" + ) + else: + # Validate per-quoter configs + if len(strategy["quoters"]) == 0: + errors.append("[[strategy.quoters]] must have at least one entry") + for i, q in enumerate(strategy["quoters"]): + if "type" not in q: + errors.append(f"quoters[{i}] missing required field 'type'") + + # Validate optional top-level params if present + if "max_position" in strategy and not validate_max_position(strategy["max_position"]): + errors.append(f"Invalid max_position: {strategy['max_position']} (must be > 0.0)") # Optional params if "oracle_source" not in strategy: @@ -222,6 +235,11 @@ def _load_and_validate(self) -> Optional[BotConfig]: if "coinbase_symbol" not in strategy or not strategy["coinbase_symbol"]: errors.append("coinbase_symbol required when oracle_source='coinbase'") + # Kuru symbol required when using kuru oracle + if strategy.get("oracle_source") == "kuru": + if "kuru_symbol" not in strategy or not strategy["kuru_symbol"]: + errors.append("kuru_symbol required when oracle_source='kuru' (e.g. 'mon_ausd')") + # Report all validation errors if errors: for error in errors: @@ -232,16 +250,16 @@ def _load_and_validate(self) -> Optional[BotConfig]: return BotConfig( prop_maintain=float(strategy["prop_maintain"]), reconcile_interval=float(strategy["reconcile_interval"]), - max_position=float(strategy["max_position"]), - prop_skew_entry=float(strategy["prop_skew_entry"]), - prop_skew_exit=float(strategy["prop_skew_exit"]), - quantity=float(strategy["quantity"]), + max_position=float(strategy.get("max_position", 0)), + prop_skew_entry=float(strategy.get("prop_skew_entry", 0.5)), + prop_skew_exit=float(strategy.get("prop_skew_exit", 0.5)), + quantity=float(strategy.get("quantity", 0)), quantity_bps_per_level=( float(strategy["quantity_bps_per_level"]) if strategy.get("quantity_bps_per_level") is not None else None ), - quoters_bps=[float(x) for x in strategy["quoters_bps"]], + quoters_bps=[float(x) for x in strategy["quoters_bps"]] if "quoters_bps" in strategy else [], oracle_source=strategy["oracle_source"], coinbase_symbol=strategy.get("coinbase_symbol"), market_address=strategy.get("market_address"), @@ -250,6 +268,8 @@ def _load_and_validate(self) -> Optional[BotConfig]: if strategy.get("override_start_position") is not None else None ), + quoter_type=strategy.get("quoter_type", "skew"), + quoters_config=strategy.get("quoters") if has_quoters_config else None, ) except Exception as e: diff --git a/mm_bot/monitoring/__init__.py b/mm_bot/monitoring/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mm_bot/monitoring/influx.py b/mm_bot/monitoring/influx.py new file mode 100644 index 0000000..e1d3e70 --- /dev/null +++ b/mm_bot/monitoring/influx.py @@ -0,0 +1,312 @@ +""" +InfluxDB 3 metrics writer for the market making bot. + +All write methods are no-ops when INFLUX_URL is not configured. +Points are buffered in-memory and flushed every 250 ms (or eagerly at 100 points) +by a background asyncio task. The InfluxDB3 client write call is synchronous and +is offloaded to a thread executor to avoid blocking the event loop. +""" +import asyncio +import time +from typing import Optional + +from loguru import logger + +try: + from influxdb_client_3 import InfluxDBClient3, Point + _INFLUX_AVAILABLE = True +except ImportError: + _INFLUX_AVAILABLE = False + + +def _extract_quoter_id(cloid: str) -> str: + """Parse quoter_id from cloid format '{side}-{quoter_id}-{timestamp_ms}'.""" + parts = cloid.split("-", 2) + return parts[1] if len(parts) >= 2 else "unknown" + + +class InfluxWriter: + """ + Buffers InfluxDB data points and flushes them asynchronously. + + Usage: + writer = InfluxWriter(url, token, database, market, oracle_source) + await writer.start() + writer.write_state(...) # non-blocking, queued + await writer.stop() # flushes remaining points + """ + + def __init__( + self, + url: str, + token: str, + database: str, + market: str, + oracle_source: str, + ): + self._url = url + self._token = token + self._database = database + self._market = market + self._oracle_source = oracle_source + self._enabled = _INFLUX_AVAILABLE + self._client: Optional[object] = None + self._buffer: list = [] + self._flush_task: Optional[asyncio.Task] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + + if not _INFLUX_AVAILABLE: + logger.warning("influxdb3-python not installed — metrics disabled") + + async def start(self) -> None: + if not self._enabled: + return + self._loop = asyncio.get_running_loop() + self._client = InfluxDBClient3( + host=self._url, + token=self._token, + database=self._database, + ) + self._flush_task = asyncio.create_task(self._flush_loop()) + logger.info(f"InfluxDB metrics enabled → {self._url} db={self._database}") + + async def query_last_cumulative_edge_pnl(self, market: str) -> float: + """ + Query InfluxDB for the last written cumulative_edge_pnl for this market. + Used on startup to resume from where the previous run left off. + Returns 0.0 if no data found or InfluxDB is unavailable. + """ + if not self._enabled: + return 0.0 + sql = ( + f"SELECT cumulative_edge_pnl FROM mm_state " + f"WHERE market = '{market}' " + f"ORDER BY time DESC LIMIT 1" + ) + try: + loop = self._loop or asyncio.get_running_loop() + result = await loop.run_in_executor( + None, + lambda: self._client.query(sql, language="sql"), + ) + # result is a pyarrow Table + if result is not None and result.num_rows > 0: + value = result.column("cumulative_edge_pnl")[0].as_py() + if value is not None: + logger.info(f"Resumed cumulative_edge_pnl from InfluxDB: {value:.6f}") + return float(value) + except Exception as e: + logger.warning(f"Could not restore cumulative_edge_pnl from InfluxDB: {e}") + return 0.0 + + async def stop(self) -> None: + if not self._enabled: + return + if self._flush_task: + self._flush_task.cancel() + try: + await self._flush_task + except asyncio.CancelledError: + pass + await self._flush_now() + + # ------------------------------------------------------------------ + # Internal flush machinery + # ------------------------------------------------------------------ + + async def query_last_cumulative_edge_pnl_by_quoter(self, market: str) -> dict: + """ + Query InfluxDB for the last written per-quoter cumulative edge PnL fields. + Returns a dict of {quoter_id: float}, e.g. {"1.0": 0.42, "10.0": 1.83}. + Returns {} if no data found or InfluxDB is unavailable. + """ + if not self._enabled: + return {} + sql = ( + f"SELECT * FROM mm_state " + f"WHERE market = '{market}' " + f"ORDER BY time DESC LIMIT 1" + ) + try: + loop = self._loop or asyncio.get_running_loop() + result = await loop.run_in_executor( + None, + lambda: self._client.query(sql, language="sql"), + ) + if result is None or result.num_rows == 0: + return {} + by_quoter: dict = {} + for col in result.schema.names: + if col.startswith("edge_pnl_q"): + raw_id = col[len("edge_pnl_q"):].replace("_", ".", 1) + val = result.column(col)[0].as_py() + if val is not None: + by_quoter[raw_id] = float(val) + if by_quoter: + logger.info(f"Resumed per-quoter edge PnL from InfluxDB: {by_quoter}") + return by_quoter + except Exception as e: + logger.warning(f"Could not restore per-quoter edge PnL from InfluxDB: {e}") + return {} + + async def _flush_loop(self) -> None: + while True: + await asyncio.sleep(0.25) + await self._flush_now() + + async def _flush_now(self) -> None: + if not self._buffer: + return + points, self._buffer = self._buffer, [] + try: + loop = self._loop or asyncio.get_running_loop() + await loop.run_in_executor(None, self._write_sync, points) + except Exception as e: + logger.warning(f"InfluxDB write error: {e}") + + def _write_sync(self, points: list) -> None: + self._client.write(record=points) + + def _enqueue(self, point: "Point") -> None: + self._buffer.append(point) + if len(self._buffer) >= 100: + asyncio.ensure_future(self._flush_now()) + + @staticmethod + def _ts() -> int: + return int(time.time() * 1_000_000_000) + + # ------------------------------------------------------------------ + # Public write methods + # ------------------------------------------------------------------ + + def write_state( + self, + reference_price: float, + position: float, + pnl: Optional[float], + num_active_orders: int, + num_cancels: int, + num_new_orders: int, + stop_bids: bool, + stop_asks: bool, + tps: float, + best_bid: Optional[float], + best_ask: Optional[float], + order_prices: dict, + cumulative_edge_pnl: float, + cumulative_edge_pnl_by_quoter: dict, + ) -> None: + """Write mm_state measurement (one per main loop iteration).""" + if not self._enabled: + return + p = ( + Point("mm_state") + .tag("market", self._market) + .tag("oracle_source", self._oracle_source) + .field("reference_price", float(reference_price)) + .field("position", float(position)) + .field("num_active_orders", int(num_active_orders)) + .field("num_cancels", int(num_cancels)) + .field("num_new_orders", int(num_new_orders)) + .field("stop_bids", 1 if stop_bids else 0) + .field("stop_asks", 1 if stop_asks else 0) + .field("tps", float(tps)) + .field("cumulative_edge_pnl", float(cumulative_edge_pnl)) + .time(self._ts()) + ) + for qid, val in cumulative_edge_pnl_by_quoter.items(): + field_name = f"edge_pnl_q{qid.replace('.', '_')}" + p = p.field(field_name, float(val)) + if pnl is not None: + p = p.field("pnl", float(pnl)) + if best_bid is not None: + p = p.field("best_bid", float(best_bid)) + if best_ask is not None: + p = p.field("best_ask", float(best_ask)) + for name, price in order_prices.items(): + p = p.field(name, float(price)) + self._enqueue(p) + + def write_fill( + self, + side: str, + price: float, + oracle_price: float, + realized_edge_bps: float, + edge_pnl: float, + filled_size: float, + remaining_size: float, + fill_type: str, + quoter_id: str, + ) -> None: + """Write mm_fill measurement (one per fill callback).""" + if not self._enabled: + return + p = ( + Point("mm_fill") + .tag("market", self._market) + .tag("side", side) + .tag("quoter_id", quoter_id) + .tag("fill_type", fill_type) + .field("price", float(price)) + .field("oracle_price", float(oracle_price)) + .field("realized_edge_bps", float(realized_edge_bps)) + .field("edge_pnl", float(edge_pnl)) + .field("filled_size", float(filled_size)) + .field("remaining_size", float(remaining_size)) + .time(self._ts()) + ) + self._enqueue(p) + + def write_order( + self, + side: str, + price: float, + size: float, + quoter_id: str, + event: str, + ) -> None: + """Write mm_order measurement (placed or cancelled).""" + if not self._enabled: + return + p = ( + Point("mm_order") + .tag("market", self._market) + .tag("side", side) + .tag("quoter_id", quoter_id) + .tag("event", event) + .field("price", float(price)) + .field("size", float(size)) + .time(self._ts()) + ) + self._enqueue(p) + + def write_reconcile( + self, + tracked_position: float, + drift: float, + free_base: float, + locked_base: float, + free_quote: float, + locked_quote: float, + num_active_orders: int, + block_number: int, + ) -> None: + """Write mm_reconcile measurement (one per reconcile cycle).""" + if not self._enabled: + return + p = ( + Point("mm_reconcile") + .tag("market", self._market) + .field("tracked_position", float(tracked_position)) + .field("drift", float(drift)) + .field("free_base", float(free_base)) + .field("locked_base", float(locked_base)) + .field("free_quote", float(free_quote)) + .field("locked_quote", float(locked_quote)) + .field("num_active_orders", int(num_active_orders)) + .field("block_number", int(block_number)) + .time(self._ts()) + ) + self._enqueue(p) diff --git a/mm_bot/pricing/oracle.py b/mm_bot/pricing/oracle.py index 28cc40a..d498f75 100644 --- a/mm_bot/pricing/oracle.py +++ b/mm_bot/pricing/oracle.py @@ -56,31 +56,47 @@ def get_price(self, market_id: str) -> Optional[float]: return None +# Monad block lifecycle states, from freshest to most final. +# "proposed" — newest prices, can revert on reorg +# "voted" — validators voted +# "finalized" — finalized by validators +# "committed" — committed to chain, highest finality (slightly lagging) +KURU_DEPTH_STATES = ("proposed", "voted", "finalized", "committed") + + class KuruPriceSource(PriceSource): """ Fetch real-time price from Kuru WebSocket orderbook. - Maintains a WebSocket connection to wss://ws.kuru.io and subscribes to - the frontendOrderbook channel. Calculates mid-price from best bid/ask. + Maintains a WebSocket connection to wss://exchange.kuru.io and subscribes to + the @monadDepth channel. Calculates mid-price from best bid/ask. + + Args: + depth_state: Which Monad block state to read prices from. + One of "proposed", "voted", "finalized", "committed". + Defaults to "committed" (safest). Use "proposed" for freshest prices. """ - def __init__(self): + def __init__(self, depth_state: str = "committed"): + if depth_state not in KURU_DEPTH_STATES: + raise ValueError(f"depth_state must be one of {KURU_DEPTH_STATES}, got '{depth_state}'") + self._depth_state = depth_state self._best_bid: Optional[float] = None self._best_ask: Optional[float] = None - self._market_id: Optional[str] = None + self._symbol: Optional[str] = None self._ws_task: Optional[asyncio.Task] = None self._stop_event = asyncio.Event() self._loop: Optional[asyncio.AbstractEventLoop] = None self._ready_event = threading.Event() - def start(self, market_id: str) -> None: + def start(self, symbol: str) -> None: """ Start WebSocket connection in background. Args: - market_id: Market address to subscribe to + symbol: Market symbol to subscribe to (e.g. "mon_ausd") """ - self._market_id = market_id + self._symbol = symbol # Start WebSocket in background thread with its own event loop def run_ws(): @@ -99,19 +115,18 @@ def run_ws(): async def _run_websocket(self) -> None: """Run WebSocket connection (internal)""" - uri = "wss://ws.kuru.io" + uri = "wss://exchange.kuru.io" while not self._stop_event.is_set(): try: async with websockets.connect(uri) as websocket: - # Subscribe to orderbook subscribe_msg = { - "type": "subscribe", - "channel": "frontendOrderbook", - "market": self._market_id + "method": "SUBSCRIBE", + "params": [f"{self._symbol}@monadDepth"], + "id": 1 } await websocket.send(json.dumps(subscribe_msg)) - logger.debug(f"Subscribed to Kuru orderbook for {self._market_id}") + logger.debug(f"Subscribed to Kuru orderbook for {self._symbol}@monadDepth") # Process messages while not self._stop_event.is_set(): @@ -130,29 +145,41 @@ async def _run_websocket(self) -> None: await asyncio.sleep(5) # Retry after 5s def _process_message(self, data: dict) -> None: - """Process WebSocket message and update prices""" + """Process WebSocket message and update prices. + + Message format from exchange.kuru.io @monadDepth: + {"e": "monadDepthUpdate", "s": "
", "states": {"committed": {"b": [["price_wei", "size"], ...], "a": [...]}, "proposed": {...}}} + Prices are 10^18-scaled integer strings. + """ try: - # Check for orderbook data - if "b" in data and "a" in data: - bids = data["b"] - asks = data["a"] + if data.get("e") != "monadDepthUpdate": + return + + states = data.get("states", {}) + state = states.get(self._depth_state) + if not state: + return + + bids = state.get("b") + asks = state.get("a") + if not bids or not asks: + return + + best_bid = int(bids[0][0]) / (10 ** 18) + best_ask = int(asks[0][0]) / (10 ** 18) - if bids and asks: - # Best bid/ask are first entries - best_bid_raw = bids[0][0] # [price, size] - best_ask_raw = asks[0][0] + if best_bid <= 0 or best_ask <= 0: + return - # Convert from 10^18 to actual price - self._best_bid = best_bid_raw / (10 ** 18) - self._best_ask = best_ask_raw / (10 ** 18) + self._best_bid = best_bid + self._best_ask = best_ask - # Signal ready on first valid data - if not self._ready_event.is_set(): - self._ready_event.set() + if not self._ready_event.is_set(): + self._ready_event.set() - logger.debug(f"Kuru orderbook updated: bid={self._best_bid:.6f}, ask={self._best_ask:.6f}") + logger.debug(f"Kuru orderbook updated: bid={self._best_bid:.6f}, ask={self._best_ask:.6f}") - except (KeyError, IndexError, TypeError) as e: + except (KeyError, IndexError, TypeError, ValueError) as e: logger.warning(f"Failed to parse Kuru orderbook: {e}") def get_price(self, market_id: str) -> Optional[float]: diff --git a/mm_bot/quoter/__init__.py b/mm_bot/quoter/__init__.py index e69de29..90fe72e 100644 --- a/mm_bot/quoter/__init__.py +++ b/mm_bot/quoter/__init__.py @@ -0,0 +1,7 @@ +from mm_bot.quoter.base import BaseQuoter +from mm_bot.quoter.context import ExistingOrder, QuoterContext, QuoterDecision +from mm_bot.quoter.registry import register_quoter, get_quoter_class +from mm_bot.quoter.skew_quoter import SkewQuoter + +# Register built-in quoter types +register_quoter("skew", SkewQuoter) diff --git a/mm_bot/quoter/base.py b/mm_bot/quoter/base.py new file mode 100644 index 0000000..813cea2 --- /dev/null +++ b/mm_bot/quoter/base.py @@ -0,0 +1,106 @@ +import time +from abc import ABC, abstractmethod +from decimal import Decimal + +from mm_bot.kuru_imports import OrderSide +from mm_bot.quoter.context import QuoterContext, QuoterDecision + + +class BaseQuoter(ABC): + """ + Abstract base class for all quoter implementations. + + A quoter is responsible for one "level" of quoting -- typically + one bid/ask pair at a particular spread from mid. + + Implementers must define a single method: decide(ctx) -> QuoterDecision. + The Bot handles all infrastructure (order tracking, pre-registration, + callback handling, balance filtering). The quoter handles strategy + (pricing, cancel/maintain decisions, order construction). + """ + + def __init__(self, quoter_id: str, quantity: Decimal): + """ + Args: + quoter_id: Unique identifier for this quoter instance. + Used in cloid prefixes for order matching. + Must be stable across restarts for the same config. + Examples: "10.0", "always-25.0", "vwap-1" + quantity: Order size for this quoter (base token units). + """ + self.quoter_id = quoter_id + self.quantity = quantity + + @property + def cloid_prefix_bid(self) -> str: + """Prefix for matching bid cloids back to this quoter.""" + return f"bid-{self.quoter_id}-" + + @property + def cloid_prefix_ask(self) -> str: + """Prefix for matching ask cloids back to this quoter.""" + return f"ask-{self.quoter_id}-" + + def owns_cloid(self, cloid: str) -> bool: + """Check if a cloid belongs to this quoter.""" + return cloid.startswith(self.cloid_prefix_bid) or cloid.startswith(self.cloid_prefix_ask) + + def make_cloid(self, side: str) -> str: + """Generate a new cloid for this quoter. side is 'bid' or 'ask'.""" + timestamp = int(time.time() * 1000) + return f"{side}-{self.quoter_id}-{timestamp}" + + @abstractmethod + def decide(self, ctx: QuoterContext) -> QuoterDecision: + """ + Given the current market context, decide what to do. + + This is the single method that quoter implementers must define. + It receives full context and returns cancel/place instructions. + + The Bot handles: + - Resolving existing orders from its tracking state + - Executing the cancels and placements + - Pre-registration and callback tracking + - Balance filtering + + The quoter handles: + - Pricing logic (edges, skew, signals, etc.) + - Cancel/maintain decisions + - Order construction + """ + ... + + @classmethod + def from_config(cls, config_section: dict) -> "BaseQuoter": + """ + Construct a quoter from a config dict (e.g., a [[strategy.quoters]] section). + Override this in subclasses to parse type-specific config fields. + """ + raise NotImplementedError(f"{cls.__name__} must implement from_config()") + + # --- Static helpers (quoters can use or ignore) --- + + @staticmethod + def calculate_order_edge( + order_price: Decimal, + order_side: OrderSide, + reference_price: Decimal, + ) -> Decimal: + """Calculate edge in bps of an existing order relative to reference price.""" + if order_side == OrderSide.BUY: + return (reference_price - order_price) / reference_price * Decimal("10000") + else: + return (order_price - reference_price) / reference_price * Decimal("10000") + + @staticmethod + def price_from_edge( + edge_bps: Decimal, + side: OrderSide, + reference_price: Decimal, + ) -> Decimal: + """Convert an edge in bps to a price.""" + if side == OrderSide.BUY: + return reference_price * (Decimal("1") - edge_bps / Decimal("10000")) + else: + return reference_price * (Decimal("1") + edge_bps / Decimal("10000")) diff --git a/mm_bot/quoter/context.py b/mm_bot/quoter/context.py new file mode 100644 index 0000000..82f5721 --- /dev/null +++ b/mm_bot/quoter/context.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass, field +from decimal import Decimal +from typing import Optional + +from mm_bot.kuru_imports import Order, OrderSide + + +@dataclass(frozen=True) +class ExistingOrder: + """An order the bot knows about, resolved from multiple tracking sources.""" + cloid: str + side: OrderSide + price: Optional[Decimal] # None if price unknown (preregistered / unknown state) + source: str # "on_chain" | "callback" | "preregistered" | "unknown" + + +@dataclass(frozen=True) +class QuoterContext: + """ + Snapshot of market state passed to a quoter each iteration. + Assembled by the Bot -- quoters never touch Bot internals. + + Frozen (immutable) to prevent quoters from mutating shared state. + New Optional fields with defaults can be added without breaking existing quoters. + """ + # Market state + reference_price: Decimal + + # Position state + current_position: Decimal + max_position: Decimal + + # This quoter's existing orders (resolved by Bot from its tracking dicts) + existing_bid: Optional[ExistingOrder] = None + existing_ask: Optional[ExistingOrder] = None + + # Position limit flags (computed by Bot) + stop_bids: bool = False # Position > max_position + stop_asks: bool = False # Position < -max_position + + # Strategy params from config (quoters may or may not use these) + prop_maintain: float = 0.2 + + # Market config (for price precision, etc.) + price_precision: Optional[Decimal] = None + + +@dataclass +class QuoterDecision: + """ + A quoter's output: which orders to cancel and which to place. + """ + cancels: list[str] = field(default_factory=list) # cloids to cancel + new_orders: list[Order] = field(default_factory=list) # orders to place diff --git a/mm_bot/quoter/quoter.py b/mm_bot/quoter/quoter.py index d459c08..45442aa 100644 --- a/mm_bot/quoter/quoter.py +++ b/mm_bot/quoter/quoter.py @@ -1,192 +1,13 @@ -from typing import Optional -from decimal import Decimal -import time -from loguru import logger +""" +Backward-compatibility shim. -from mm_bot.pricing.oracle import OracleService -from mm_bot.position.position_tracker import PositionTracker -from mm_bot.kuru_imports import Order, OrderType, OrderSide +The Quoter class has been refactored into the pluggable quoter system: +- BaseQuoter (mm_bot.quoter.base) — abstract base class +- SkewQuoter (mm_bot.quoter.skew_quoter) — the original strategy +This file preserves the old import path: + from mm_bot.quoter.quoter import Quoter +""" +from mm_bot.quoter.skew_quoter import SkewQuoter as Quoter -class Quoter: - def __init__( - self, - oracle_service: OracleService, - position_tracker: PositionTracker, - source_name: str, - market_id: str, - baseline_edge_bps: float, - max_position: float, - prop_skew_entry: float, - prop_skew_exit: float, - quantity: float, - market_config=None, - ): - self.oracle_service = oracle_service - self.position_tracker = position_tracker - self.source_name = source_name - self.market_id = market_id - self.market_config = market_config - self.baseline_edge_bps = Decimal(str(baseline_edge_bps)) - self.max_position = Decimal(str(max_position)) - self.prop_skew_entry = Decimal(str(prop_skew_entry)) - self.prop_skew_exit = Decimal(str(prop_skew_exit)) - self.quantity = Decimal(str(quantity)) - - def _to_decimal(self, value) -> Decimal: - if isinstance(value, Decimal): - return value - return Decimal(str(value)) - - def _cap_value(self, value: Decimal, min_val: Decimal, max_val: Decimal) -> Decimal: - """Cap a value between min_val and max_val""" - return max(min_val, min(max_val, value)) - - def _calculate_prop_of_max_position(self) -> Decimal: - """Calculate the proportional position relative to max position, capped at [-1, 1]""" - current_position = self.position_tracker.get_current_position() - prop_of_max = current_position / self.max_position if self.max_position != 0 else Decimal("0") - return self._cap_value(prop_of_max, Decimal("-1"), Decimal("1")) - - def get_bid_ask_edges(self) -> tuple[Optional[Decimal], Optional[Decimal]]: - """ - Calculate bid and ask edges in bps based on the strategy type and position - - Returns: - tuple[Optional[float], Optional[float]]: (bid_edge_bps, ask_edge_bps) - """ - prop_of_max = self._calculate_prop_of_max_position() - - if prop_of_max > 0: - # Currently long: widen asks, tighten bids to mean-revert - bid_edge_bps = self.baseline_edge_bps * (Decimal("1") + prop_of_max * self.prop_skew_entry) - ask_edge_bps = self.baseline_edge_bps * (Decimal("1") - prop_of_max * self.prop_skew_exit) - else: - # Currently short: widen bids, tighten asks to mean-revert - bid_edge_bps = self.baseline_edge_bps * (Decimal("1") - prop_of_max * self.prop_skew_exit) - ask_edge_bps = self.baseline_edge_bps * (Decimal("1") + prop_of_max * self.prop_skew_entry) - - return bid_edge_bps, ask_edge_bps - - def get_cancel_edges(self, prop_maintain: float) -> tuple[Decimal, Decimal]: - """ - Get the cancel edge thresholds (edges below which orders should be cancelled). - - Args: - prop_maintain: Proportion to maintain (e.g. 0.2 means cancel if edge < 80% of target) - - Returns: - tuple[float, float]: (bid_cancel_edge_bps, ask_cancel_edge_bps) - """ - bid_edge_bps, ask_edge_bps = self.get_bid_ask_edges() - - # Cancel edges are reduced by prop_maintain factor - maintain = self._to_decimal(prop_maintain) - bid_cancel_edge_bps = bid_edge_bps * (Decimal("1") - maintain) - ask_cancel_edge_bps = ask_edge_bps * (Decimal("1") - maintain) - - return bid_cancel_edge_bps, ask_cancel_edge_bps - - def calculate_order_edge(self, order_price: Decimal, order_side: OrderSide, reference_price: Decimal) -> Decimal: - """ - Calculate the edge (in bps) of an existing order. - - Args: - order_price: Price of the existing order - order_side: Side of the order (BUY or SELL) - reference_price: Current reference/fair price - - Returns: - float: Edge in basis points - """ - if order_side == OrderSide.BUY: - # Bid edge: how much below fair value - edge = (reference_price - order_price) / reference_price * Decimal("10000") - else: # SELL - # Ask edge: how much above fair value - edge = (order_price - reference_price) / reference_price * Decimal("10000") - - return edge - - def get_reference_price(self) -> Optional[Decimal]: - """Get the reference price from the oracle service""" - reference_price = self.oracle_service.get_price(self.market_id, self.source_name) - if reference_price is None: - return None - return self._to_decimal(reference_price) - - def generate_orders(self, reference_price: Decimal, need_bid: bool = True, need_ask: bool = True) -> list[Order]: - """ - Generate orders for specified sides based on current market conditions. - - Args: - reference_price: Current reference/fair price - need_bid: Whether to generate a bid order - need_ask: Whether to generate an ask order - - Returns: - list[Order]: List containing requested orders - """ - if reference_price is None: - return [] - - bid_edge_bps, ask_edge_bps = self.get_bid_ask_edges() - - orders = [] - timestamp = int(time.time() * 1000) - - if need_bid: - # Calculate bid price - bid_multiplier = Decimal("1") - (bid_edge_bps / Decimal("10000")) - # Tick rounding is delegated to SDK place_orders(price_rounding="default") - bid_price = reference_price * bid_multiplier - bid_cloid = f"bid-{self.baseline_edge_bps}-{timestamp}" - - orders.append(Order( - cloid=bid_cloid, - order_type=OrderType.LIMIT, - side=OrderSide.BUY, - price=bid_price, - size=self.quantity, - post_only=False - )) - logger.debug( - f"New bid: cloid={bid_cloid} price={float(bid_price):.6f} " - f"size={float(self.quantity)} edge={float(bid_edge_bps):.2f}bps" - ) - - if need_ask: - # Calculate ask price - ask_multiplier = Decimal("1") + (ask_edge_bps / Decimal("10000")) - # Tick rounding is delegated to SDK place_orders(price_rounding="default") - ask_price = reference_price * ask_multiplier - ask_cloid = f"ask-{self.baseline_edge_bps}-{timestamp}" - - orders.append(Order( - cloid=ask_cloid, - order_type=OrderType.LIMIT, - side=OrderSide.SELL, - price=ask_price, - size=self.quantity, - post_only=False - )) - logger.debug( - f"New ask: cloid={ask_cloid} price={float(ask_price):.6f} " - f"size={float(self.quantity)} edge={float(ask_edge_bps):.2f}bps" - ) - - return orders - - def get_orders(self) -> list[Order]: - """ - Generate bid and ask orders based on the current market conditions and strategy. - Legacy method - generates both sides always. - - Returns: - list[Order]: List containing bid and ask orders - """ - reference_price = self.get_reference_price() - if reference_price is None: - return [] - - return self.generate_orders(reference_price, need_bid=True, need_ask=True) +__all__ = ["Quoter"] diff --git a/mm_bot/quoter/registry.py b/mm_bot/quoter/registry.py new file mode 100644 index 0000000..dd5f0f7 --- /dev/null +++ b/mm_bot/quoter/registry.py @@ -0,0 +1,20 @@ +from typing import Dict, Type + +from mm_bot.quoter.base import BaseQuoter + +QUOTER_REGISTRY: Dict[str, Type[BaseQuoter]] = {} + + +def register_quoter(name: str, cls: Type[BaseQuoter]) -> None: + """Register a quoter class under a name (e.g., 'skew', 'always_replace').""" + QUOTER_REGISTRY[name] = cls + + +def get_quoter_class(name: str) -> Type[BaseQuoter]: + """Look up a registered quoter class by name. Raises ValueError if not found.""" + if name not in QUOTER_REGISTRY: + available = list(QUOTER_REGISTRY.keys()) + raise ValueError( + f"Unknown quoter type '{name}'. Available: {available}" + ) + return QUOTER_REGISTRY[name] diff --git a/mm_bot/quoter/skew_quoter.py b/mm_bot/quoter/skew_quoter.py new file mode 100644 index 0000000..c7370ec --- /dev/null +++ b/mm_bot/quoter/skew_quoter.py @@ -0,0 +1,205 @@ +from decimal import Decimal +from typing import Optional + +from loguru import logger + +from mm_bot.kuru_imports import Order, OrderType, OrderSide +from mm_bot.quoter.base import BaseQuoter +from mm_bot.quoter.context import ExistingOrder, QuoterContext, QuoterDecision + + +class SkewQuoter(BaseQuoter): + """ + Position-skew quoter with PropMaintain cancel logic. + + This is the original quoter strategy. It: + - Widens/tightens edges based on position as a proportion of max_position + - Only cancels orders whose edge has drifted below a threshold (PropMaintain) + - Couples bid/ask: if one side is replaced, the other is force-replaced too + """ + + def __init__( + self, + baseline_edge_bps: float, + quantity: Decimal, + prop_skew_entry: float = 0.5, + prop_skew_exit: float = 0.5, + ): + # quoter_id embeds baseline_edge_bps for backward-compatible cloid format + # e.g. "10.0" produces cloids like "bid-10.0-1771500973306" + quoter_id = str(Decimal(str(baseline_edge_bps))) + super().__init__(quoter_id=quoter_id, quantity=quantity) + self.baseline_edge_bps = Decimal(str(baseline_edge_bps)) + self.prop_skew_entry = Decimal(str(prop_skew_entry)) + self.prop_skew_exit = Decimal(str(prop_skew_exit)) + + def _get_skewed_edges(self, ctx: QuoterContext) -> tuple[Decimal, Decimal]: + """Calculate bid/ask edges with position skew applied.""" + if ctx.max_position != 0: + prop_of_max = ctx.current_position / ctx.max_position + else: + prop_of_max = Decimal("0") + prop_of_max = max(Decimal("-1"), min(Decimal("1"), prop_of_max)) + + if prop_of_max > 0: + # Currently long: widen bids (slow to buy more), tighten asks (eager to sell) + bid_edge = self.baseline_edge_bps * (Decimal("1") + prop_of_max * self.prop_skew_entry) + ask_edge = self.baseline_edge_bps * (Decimal("1") - prop_of_max * self.prop_skew_exit) + else: + # Currently short: tighten bids (eager to buy), widen asks (slow to sell more) + bid_edge = self.baseline_edge_bps * (Decimal("1") - prop_of_max * self.prop_skew_exit) + ask_edge = self.baseline_edge_bps * (Decimal("1") + prop_of_max * self.prop_skew_entry) + + return bid_edge, ask_edge + + def _evaluate_existing_order( + self, + existing: Optional[ExistingOrder], + side: OrderSide, + cancel_threshold: Decimal, + reference_price: Decimal, + stop_side: bool, + ) -> tuple[bool, Optional[str]]: + """ + Evaluate a single existing order: should it be kept, cancelled, or is it missing? + + Returns: + (need_new_order, cancel_cloid_or_none) + """ + if existing is None: + return True, None # No order exists, need a new one + + side_label = "bid" if side == OrderSide.BUY else "ask" + + if existing.source == "preregistered": + logger.debug(f"Quoter {self.baseline_edge_bps}bps: {side_label.capitalize()} pending confirmation, holding") + return False, None + + if existing.source == "unknown": + logger.debug(f"Quoter {self.baseline_edge_bps}bps: {side_label.capitalize()} in unknown state, holding") + return False, None + + # We have a price -- do the edge check + if existing.price is None: + return False, None # Safety: shouldn't happen for on_chain/callback but be safe + + order_edge = self.calculate_order_edge(existing.price, side, reference_price) + source_tag = f" [{existing.source}]" if existing.source == "callback" else "" + + if stop_side: + logger.debug( + f"Quoter {float(self.baseline_edge_bps):.2f}bps: " + f"Cancelling {side_label} @ {float(existing.price):.6f} " + f"(position limit exceeded){source_tag}" + ) + return True, existing.cloid + + if order_edge >= cancel_threshold: + logger.debug( + f"Quoter {float(self.baseline_edge_bps):.2f}bps: " + f"Keeping {side_label} @ {float(existing.price):.6f} " + f"(edge={float(order_edge):.1f} >= cancel_threshold={float(cancel_threshold):.1f}){source_tag}" + ) + return False, None + + logger.debug( + f"Quoter {float(self.baseline_edge_bps):.2f}bps: " + f"Cancelling {side_label} @ {float(existing.price):.6f} " + f"(edge={float(order_edge):.1f} < cancel_threshold={float(cancel_threshold):.1f}){source_tag}" + ) + return True, existing.cloid + + def decide(self, ctx: QuoterContext) -> QuoterDecision: + cancels = [] + + bid_edge, ask_edge = self._get_skewed_edges(ctx) + maintain = Decimal(str(ctx.prop_maintain)) + bid_cancel_threshold = bid_edge * (Decimal("1") - maintain) + ask_cancel_threshold = ask_edge * (Decimal("1") - maintain) + + # --- Evaluate existing bid --- + need_bid, bid_cancel = self._evaluate_existing_order( + ctx.existing_bid, OrderSide.BUY, bid_cancel_threshold, + ctx.reference_price, ctx.stop_bids, + ) + if bid_cancel: + cancels.append(bid_cancel) + + # --- Evaluate existing ask --- + need_ask, ask_cancel = self._evaluate_existing_order( + ctx.existing_ask, OrderSide.SELL, ask_cancel_threshold, + ctx.reference_price, ctx.stop_asks, + ) + if ask_cancel: + cancels.append(ask_cancel) + + # --- Coupling: if one side replaced, force-replace the other --- + if need_bid and not need_ask: + if ctx.existing_ask and ctx.existing_ask.source == "preregistered": + logger.debug(f"Coupling: ask {ctx.existing_ask.cloid} still preregistered, skipping quoter this iteration") + return QuoterDecision() + need_ask = True + if ctx.existing_ask and ctx.existing_ask.cloid not in cancels: + cancels.append(ctx.existing_ask.cloid) + logger.debug(f"Coupling: cancelling ask {ctx.existing_ask.cloid} because bid was replaced") + elif need_ask and not need_bid: + if ctx.existing_bid and ctx.existing_bid.source == "preregistered": + logger.debug(f"Coupling: bid {ctx.existing_bid.cloid} still preregistered, skipping quoter this iteration") + return QuoterDecision() + need_bid = True + if ctx.existing_bid and ctx.existing_bid.cloid not in cancels: + cancels.append(ctx.existing_bid.cloid) + logger.debug(f"Coupling: cancelling bid {ctx.existing_bid.cloid} because ask was replaced") + + # --- Generate new orders --- + new_orders = [] + final_need_bid = need_bid and not ctx.stop_bids + final_need_ask = need_ask and not ctx.stop_asks + + if final_need_bid: + bid_price = self.price_from_edge(bid_edge, OrderSide.BUY, ctx.reference_price) + new_orders.append(Order( + cloid=self.make_cloid("bid"), + order_type=OrderType.LIMIT, + side=OrderSide.BUY, + price=bid_price, + size=self.quantity, + post_only=False, + )) + logger.debug( + f"New bid: cloid={new_orders[-1].cloid} price={float(bid_price):.6f} " + f"size={float(self.quantity)} edge={float(bid_edge):.2f}bps" + ) + + if final_need_ask: + ask_price = self.price_from_edge(ask_edge, OrderSide.SELL, ctx.reference_price) + new_orders.append(Order( + cloid=self.make_cloid("ask"), + order_type=OrderType.LIMIT, + side=OrderSide.SELL, + price=ask_price, + size=self.quantity, + post_only=False, + )) + logger.debug( + f"New ask: cloid={new_orders[-1].cloid} price={float(ask_price):.6f} " + f"size={float(self.quantity)} edge={float(ask_edge):.2f}bps" + ) + + if new_orders: + logger.debug( + f"Quoter {self.baseline_edge_bps}bps: Generating {len(new_orders)} new orders " + f"(bid={'yes' if final_need_bid else 'no'}, ask={'yes' if final_need_ask else 'no'})" + ) + + return QuoterDecision(cancels=cancels, new_orders=new_orders) + + @classmethod + def from_config(cls, config_section: dict) -> "SkewQuoter": + """Construct from a [[strategy.quoters]] config dict.""" + return cls( + baseline_edge_bps=float(config_section["baseline_edge_bps"]), + quantity=Decimal(str(config_section["quantity"])), + prop_skew_entry=float(config_section.get("prop_skew_entry", 0.5)), + prop_skew_exit=float(config_section.get("prop_skew_exit", 0.5)), + ) diff --git a/requirements.txt b/requirements.txt index f71caf3..6350cee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,6 @@ kuru-sdk-py>=0.1.10 # Configuration hot-reload tomli>=2.0.0; python_version < '3.11' + +# InfluxDB metrics (optional) +influxdb3-python diff --git a/skills/kuru-mm-bot/SKILL.md b/skills/kuru-mm-bot/SKILL.md index 44daab7..37f10ef 100644 --- a/skills/kuru-mm-bot/SKILL.md +++ b/skills/kuru-mm-bot/SKILL.md @@ -1,6 +1,6 @@ --- name: kuru-mm-bot -description: Specialized knowledge for coding on the Kuru market making bot (mm-example repository). Use when working on any task involving order tracking, PropMaintain strategy, position management, quoter logic, bot configuration, or the order lifecycle. Provides critical architectural context that prevents common mistakes — load this before making any changes to bot.py, quoter.py, or config.py. +description: Specialized knowledge for coding on the Kuru market making bot (mm-example repository). Use when working on any task involving order tracking, PropMaintain strategy, position management, quoter logic, bot configuration, or the order lifecycle. Provides critical architectural context that prevents common mistakes — load this before making any changes to bot.py, quoter files, or config.py. --- # Kuru Market Making Bot @@ -22,9 +22,13 @@ Logs → `tracking/bot_run_YYYYMMDD_HHMMSS.log`. Position persists across restar | File | Role | |------|------| | `mm_bot/main.py` | Entry point, logging setup, signal handlers | -| `mm_bot/bot/bot.py` | All strategy logic (~1270 lines) | -| `mm_bot/quoter/quoter.py` | Bid/ask price + cancel threshold calculation | -| `mm_bot/config/config.py` | `BotConfig` dataclass, loads `.env` | +| `mm_bot/bot/bot.py` | All strategy orchestration, order lifecycle | +| `mm_bot/quoter/base.py` | `BaseQuoter` ABC — implement `decide(ctx) -> QuoterDecision` | +| `mm_bot/quoter/context.py` | `QuoterContext` (frozen snapshot) and `QuoterDecision` | +| `mm_bot/quoter/skew_quoter.py` | Built-in `SkewQuoter` — position skew + PropMaintain | +| `mm_bot/quoter/registry.py` | `register_quoter()` / `get_quoter_class()` | +| `mm_bot/quoter/quoter.py` | Backward-compat shim (`SkewQuoter as Quoter`) | +| `mm_bot/config/config.py` | `BotConfig` dataclass, TOML + `.env` loading | | `mm_bot/kuru_imports.py` | **Single shim for all SDK imports** — always import from here | The root-level `kuru_imports.py` is unused dead code. Do not import from it. @@ -42,10 +46,10 @@ Do not bypass bundle-based initialization with partial ad-hoc config wiring. Kee **3. Cloid format is load-bearing:** ``` -{side}-{baseline_edge_bps}-{timestamp_ms} +{side}-{quoter_id}-{timestamp_ms} # e.g. bid-1.0-1771500973306 ask-15.0-1771500975944 ``` -Quoter-to-order matching uses prefix `bid-{bps}-` / `ask-{bps}-`. Changing the format requires updating the matching logic in `_generate_orders_with_prop_maintain()`. +For `SkewQuoter`, `quoter_id = str(Decimal(str(baseline_edge_bps)))`. Matching uses `quoter.cloid_prefix_bid` / `quoter.cloid_prefix_ask`. Changing format requires updating matching logic in `_resolve_existing_orders()`. ## Order Tracking: Callbacks, Not REST API @@ -53,16 +57,26 @@ The REST API lags ~2 seconds behind on-chain events. The bot tracks order state REST API is only used for: startup cleanup, orphan detection validation, and shutdown. -## PropMaintain Check Chain +## Order Generation Flow -For each quoter + side, `_generate_orders_with_prop_maintain()` checks the existing order in priority order: +Each iteration, `_generate_orders()` in `bot.py`: -1. Found in REST API result → edge check using API price -2. In `preregistered_orders` → just sent, awaiting confirmation → hold -3. In `active_orders` → confirmed via callback, REST API not yet indexed → full edge check using callback price (logs `[callback]`) -4. In `active_cloids` but not `active_orders` → unknown state → hold +1. Calls `_resolve_existing_orders(quoter, on_chain_by_cloid)` — scans tracking dicts, returns `ExistingOrder` objects with `source` set to `"on_chain"`, `"callback"`, `"preregistered"`, or `"unknown"` +2. Builds a frozen `QuoterContext` snapshot (price, position, existing orders, stop flags) +3. Calls `quoter.decide(ctx)` — the quoter returns `QuoterDecision(cancels, new_orders)` +4. Discards cancelled cloids from `active_cloids`, batches all orders into `place_orders()` -If one side of a quoter is replaced, the **coupling block** force-replaces the other side too (~line 1090 in `bot.py`). This ensures both bid and ask always share the same reference price. +The old `_generate_orders_with_prop_maintain` method has been replaced by this flow. All per-quoter cancel/maintain logic now lives in the quoter's `decide()` method. + +## PropMaintain Check Chain (SkewQuoter) + +`SkewQuoter.decide()` evaluates each side's existing order by `source`: + +1. `"preregistered"` → just sent, awaiting confirmation → hold +2. `"unknown"` → in `active_cloids` but not `active_orders` → hold +3. `"on_chain"` or `"callback"` → compute edge; keep if `edge >= baseline × (1 - prop_maintain)`, else cancel + +If one side of a quoter is replaced, the **coupling block** inside `decide()` force-replaces the other side too. This ensures both bid and ask always share the same reference price. ## Shutdown diff --git a/skills/kuru-mm-bot/references/architecture.md b/skills/kuru-mm-bot/references/architecture.md index 5268047..e271834 100644 --- a/skills/kuru-mm-bot/references/architecture.md +++ b/skills/kuru-mm-bot/references/architecture.md @@ -3,10 +3,11 @@ ## Table of Contents 1. [Bot Tracking Dicts](#bot-tracking-dicts) 2. [Order Lifecycle](#order-lifecycle) -3. [Quoter Skew Formula](#quoter-skew-formula) -4. [Environment Variables](#environment-variables) -5. [Orphan Detection](#orphan-detection) -6. [Common Pitfalls](#common-pitfalls) +3. [Pluggable Quoter System](#pluggable-quoter-system) +4. [SkewQuoter Formula](#skewquoter-formula) +5. [Environment Variables](#environment-variables) +6. [Orphan Detection](#orphan-detection) +7. [Common Pitfalls](#common-pitfalls) --- @@ -64,13 +65,78 @@ ORDER_CANCELLED callback **Immediate fill handling:** an order can be fully filled before ORDER_PLACED fires. `preregistered_orders` is populated before `place_orders()` and cleaned in the FULLY_FILLED handler to cover this path. -**Position total** = `start_position + current_position`. Both `Quoter` and reconciliation use the total, not just `current_position`. +**Position total** = `start_position + current_position`. Both quoters and reconciliation use the total, not just `current_position`. --- -## Quoter Skew Formula +## Pluggable Quoter System -`Quoter.get_bid_ask_edges()` adjusts edges based on `prop = position / max_position` (capped ±1): +### Overview + +Each quoter manages one bid/ask pair at one spread level. The bot resolves order state into a `QuoterContext` snapshot and passes it to each quoter's `decide()` method. The quoter returns a `QuoterDecision` (cloids to cancel + orders to place) without touching any bot internals. + +### QuoterContext fields + +| Field | Type | Description | +|-------|------|-------------| +| `reference_price` | `Decimal` | Current fair price from oracle | +| `current_position` | `Decimal` | Net position (positive = long) | +| `max_position` | `Decimal` | Position limit from config | +| `existing_bid` | `ExistingOrder?` | Bot's current bid for this quoter | +| `existing_ask` | `ExistingOrder?` | Bot's current ask for this quoter | +| `stop_bids` | `bool` | `True` if position ≥ max_position | +| `stop_asks` | `bool` | `True` if position ≤ -max_position | +| `prop_maintain` | `float` | Cancel threshold factor | +| `price_precision` | `Decimal` | Market price precision | + +`QuoterContext` is frozen — quoters cannot mutate it. + +### ExistingOrder.source values + +| Source | Meaning | +|--------|---------| +| `"on_chain"` | Confirmed in REST API result; price is reliable | +| `"callback"` | Confirmed via ORDER_PLACED callback; REST API hasn't indexed yet | +| `"preregistered"` | Just sent via `place_orders()`; awaiting confirmation | +| `"unknown"` | In `active_cloids` but not in `active_orders`; unexpected state | + +### Order resolution priority in `_resolve_existing_orders()` + +1. `on_chain_by_cloid` (REST API) → source `"on_chain"` +2. `preregistered_orders` (fallback if not in active_cloids) → source `"preregistered"` +3. `active_orders` (callback-confirmed, REST API lag) → source `"callback"` +4. `active_cloids` only → source `"unknown"` + +### Implementing a custom quoter + +```python +from mm_bot.quoter.base import BaseQuoter +from mm_bot.quoter.context import QuoterContext, QuoterDecision +from mm_bot.quoter.registry import register_quoter + +class MyQuoter(BaseQuoter): + def decide(self, ctx: QuoterContext) -> QuoterDecision: + # ... your logic ... + return QuoterDecision(cancels=[...], new_orders=[...]) + + @classmethod + def from_config(cls, config_section: dict) -> "MyQuoter": + return cls(...) + +register_quoter("my_quoter", MyQuoter) +``` + +Available helper methods on `BaseQuoter`: +- `make_cloid(side)` → `"bid-{quoter_id}-{timestamp_ms}"` +- `price_from_edge(edge_bps, side, ref_price)` → price +- `calculate_order_edge(order_price, side, ref_price)` → edge in bps +- `cloid_prefix_bid` / `cloid_prefix_ask` properties + +--- + +## SkewQuoter Formula + +`SkewQuoter._get_skewed_edges()` adjusts edges based on `prop = position / max_position` (capped ±1): ```python if prop > 0: # long → eager to sell, reluctant to buy more @@ -80,11 +146,10 @@ else: # short → eager to buy, reluctant to sell more bid_edge = baseline × (1 - prop × prop_skew_exit) # tighter bid ask_edge = baseline × (1 + prop × prop_skew_entry) # wider ask -cancel_threshold = edge × (1 - PROP_MAINTAIN) +cancel_threshold = edge × (1 - prop_maintain) ``` -`get_cancel_edges(prop_maintain)` returns `(bid_cancel_edge, ask_cancel_edge)`. -`calculate_order_edge(price, side, ref_price)` returns the current edge in bps of an existing order. +Keep if `order_edge >= cancel_threshold`, cancel otherwise. Coupling: if one side is cancelled+replaced, the other is force-replaced to stay in sync. --- @@ -111,7 +176,7 @@ cancel_threshold = edge × (1 - PROP_MAINTAIN) | `POSITION_UPDATE_THRESHOLD_BPS` | `500` | Drift alert threshold | | `KURU_GAS_BUFFER_MULTIPLIER` | SDK default (1.1×) | Read by `ConfigManager.load_transaction_config()` | -To add a new parameter: add to `BotConfig` → read in `load_config_from_env()` → pass to `Quoter.__init__()` → use in `get_bid_ask_edges()` or `get_cancel_edges()`. +To add a new parameter: add to `BotConfig` → read in `load_operational_config()` → include in `QuoterContext` if needed by quoters. --- @@ -133,8 +198,8 @@ To add a new parameter: add to `BotConfig` → read in `load_config_from_env()` **Immediate fills:** An order can be filled before ORDER_PLACED fires. Always populate `preregistered_orders` before `place_orders()`, and always clean it up in FULLY_FILLED. -**Coupling:** If you add logic that replaces one side of a quoter, check whether it should also trigger the coupling block (~line 1090 in `bot.py`) to replace the other side. +**Coupling in custom quoters:** If your quoter replaces one side but holds the other, consider whether they should be re-priced together. `SkewQuoter` force-replaces both sides on any replacement to keep them priced off the same reference. **SDK config wiring:** Always initialize client from full SDK bundle (`KuruClient.create(**sdk_configs)`), where `sdk_configs` comes from `ConfigManager.load_all_configs(...)`. -**Cloid prefix matching:** The quoter-to-order mapping relies on `{side}-{bps}-` prefixes. Don't add extra fields to the cloid format without updating matching logic. +**Cloid prefix matching:** Quoter-to-order mapping relies on `cloid_prefix_bid` / `cloid_prefix_ask` on `BaseQuoter`. `quoter_id` must be unique across all quoters in a bot instance. Don't use the same `quoter_id` for two different quoters.