diff --git a/.gitignore b/.gitignore index 9ceeec4..dc7e373 100644 --- a/.gitignore +++ b/.gitignore @@ -52,7 +52,9 @@ Thumbs.db # Project specific *.log +*.csv .cache/ +logs/ # Development/Testing files (not for commit) .dev/ @@ -65,4 +67,4 @@ Thumbs.db .swarm/ memory/ .mcp.json -claude-flow \ No newline at end of file +claude-flow diff --git a/dr_manhattan/base/exchange_client.py b/dr_manhattan/base/exchange_client.py index 22a3178..f4ddd34 100644 --- a/dr_manhattan/base/exchange_client.py +++ b/dr_manhattan/base/exchange_client.py @@ -940,6 +940,9 @@ def calculate_delta(positions: Dict[str, float]) -> DeltaInfo: """ Calculate delta (position imbalance) from positions. + For binary markets: delta = first_outcome - second_outcome (signed). + Positive delta means long first outcome, negative means long second outcome. + Args: positions: Dict mapping outcome name to position size @@ -957,10 +960,15 @@ def calculate_delta(positions: Dict[str, float]) -> DeltaInfo: position_values = list(positions.values()) max_pos = max(position_values) min_pos = min(position_values) - delta = max_pos - min_pos + + # Signed delta: first outcome - second outcome + if len(position_values) >= 2: + delta = position_values[0] - position_values[1] + else: + delta = position_values[0] max_outcome = None - if delta > 0: + if delta != 0: max_outcome = max(positions, key=positions.get) return DeltaInfo( diff --git a/dr_manhattan/base/strategy.py b/dr_manhattan/base/strategy.py index e50591b..bd878cd 100644 --- a/dr_manhattan/base/strategy.py +++ b/dr_manhattan/base/strategy.py @@ -57,6 +57,8 @@ def __init__( max_delta: float = 20.0, check_interval: float = 5.0, track_fills: bool = True, + enable_csv_logging: bool = False, + log_dir: str = "logs", ): """ Initialize strategy. 
@@ -69,6 +71,8 @@ def __init__( max_delta: Maximum position imbalance before reducing exposure check_interval: Seconds between strategy ticks track_fills: Enable order fill tracking + enable_csv_logging: Enable CSV logging of strategy execution + log_dir: Directory to store CSV log files """ self.exchange = exchange self.client = ExchangeClient(exchange, track_fills=track_fills) @@ -77,6 +81,8 @@ def __init__( self.order_size = order_size self.max_delta = max_delta self.check_interval = check_interval + self.enable_csv_logging = enable_csv_logging + self.log_dir = log_dir # Market data (populated by setup()) self.market: Optional[Market] = None @@ -85,6 +91,7 @@ def __init__( # Runtime state self.is_running = False + self.csv_logger = None # Cached state (updated each tick) self._positions: Dict[str, float] = {} @@ -130,6 +137,19 @@ def setup(self) -> bool: # Load initial positions self._positions = self.client.fetch_positions_dict_for_market(self.market) + # Initialize CSV logger if enabled + if self.enable_csv_logging: + from ..utils.csv_logger import CSVLogger + + strategy_name = self.__class__.__name__ + self.csv_logger = CSVLogger( + strategy_name=strategy_name, + market_id=self.market_id, + outcomes=outcomes, + log_dir=self.log_dir, + ) + logger.info(f"CSV logging enabled: {self.csv_logger.get_filepath()}") + self._log_trader_profile() self._log_market_info() return True @@ -650,6 +670,17 @@ def run(self, duration_minutes: Optional[int] = None): break self.on_tick() + + # Log to CSV if enabled + if self.csv_logger: + self.refresh_state() + self.csv_logger.log_snapshot( + nav=self._nav, + positions=self._positions, + orders=self._open_orders, + delta=self.delta, + ) + time.sleep(self.check_interval) except KeyboardInterrupt: diff --git a/dr_manhattan/utils/csv_logger.py b/dr_manhattan/utils/csv_logger.py new file mode 100644 index 0000000..a225d79 --- /dev/null +++ b/dr_manhattan/utils/csv_logger.py @@ -0,0 +1,120 @@ +"""CSV logger for strategy execution 
tracking.""" + +import csv +from datetime import datetime +from pathlib import Path +from typing import Dict, List + +from ..models.nav import NAV +from ..models.order import Order + + +class CSVLogger: + """ + Logs strategy execution data to CSV file. + + Records NAV, positions, delta, and orders at each tick. + File format: logs/{strategy_name}_{market_id}_{timestamp}.csv + """ + + def __init__( + self, + strategy_name: str, + market_id: str, + outcomes: List[str], + log_dir: str = "logs", + ): + """ + Initialize CSV logger. + + Args: + strategy_name: Name of the strategy + market_id: Market ID being traded + outcomes: List of outcome names + log_dir: Directory to store log files + """ + self.strategy_name = strategy_name + self.market_id = market_id + self.outcomes = outcomes + self.log_dir = Path(log_dir) + + # Create logs directory if it doesn't exist + self.log_dir.mkdir(parents=True, exist_ok=True) + + # Generate filename with timestamp + start_time = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{strategy_name}_{market_id[:8]}_{start_time}.csv" + self.filepath = self.log_dir / filename + + # Initialize CSV file with headers + self._write_header() + + def _write_header(self): + """Write CSV header row.""" + headers = [ + "timestamp", + "nav", + "cash", + "positions_value", + "delta", + "num_open_orders", + ] + + # Add dynamic outcome columns + for outcome in self.outcomes: + # Sanitize outcome name for column header + safe_name = outcome.replace(" ", "_").replace(",", "").lower()[:20] + headers.append(f"{safe_name}_qty") + headers.append(f"{safe_name}_value") + + with open(self.filepath, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(headers) + + def log_snapshot( + self, + nav: NAV, + positions: Dict[str, float], + orders: List[Order], + delta: float, + ): + """ + Log current state snapshot. 
+ + Args: + nav: NAV object with breakdown + positions: Dict mapping outcome to position size + orders: List of open orders + delta: Current delta value + """ + row = [ + datetime.now().isoformat(), + round(nav.nav, 2), + round(nav.cash, 2), + round(nav.positions_value, 2), + round(delta, 2), + len(orders), + ] + + # Add position data for each outcome + for outcome in self.outcomes: + position_size = positions.get(outcome, 0.0) + + # Find position value from NAV breakdown + position_value = 0.0 + for pos_breakdown in nav.positions: + if pos_breakdown.outcome == outcome: + position_value = pos_breakdown.value + break + + row.append(round(position_size, 2)) + row.append(round(position_value, 2)) + + # Write row to CSV + with open(self.filepath, "a", newline="") as f: + writer = csv.writer(f) + writer.writerow(row) + + def get_filepath(self) -> str: + """Get the full path to the CSV file.""" + return str(self.filepath.absolute()) diff --git a/dr_manhattan/web/__init__.py b/dr_manhattan/web/__init__.py new file mode 100644 index 0000000..fe5aea3 --- /dev/null +++ b/dr_manhattan/web/__init__.py @@ -0,0 +1 @@ +"""Web dashboard for strategy monitoring.""" diff --git a/dr_manhattan/web/api.py b/dr_manhattan/web/api.py new file mode 100644 index 0000000..f5ac527 --- /dev/null +++ b/dr_manhattan/web/api.py @@ -0,0 +1,162 @@ +"""FastAPI server for strategy dashboard.""" + +import csv +import re +from datetime import datetime +from pathlib import Path +from typing import Dict, List + +from fastapi import FastAPI, HTTPException +from fastapi.responses import FileResponse + +app = FastAPI(title="Dr. 
Manhattan Strategy Dashboard") + +LOGS_DIR = Path("logs") + + +@app.get("/") +async def root(): + """Serve the dashboard HTML.""" + dashboard_path = Path(__file__).parent / "dashboard.html" + if not dashboard_path.exists(): + raise HTTPException(status_code=404, detail="Dashboard not found") + return FileResponse(dashboard_path) + + +@app.get("/api/strategies") +async def list_strategies() -> List[Dict[str, str]]: + """ + List all available strategy log files. + + Returns: + List of dicts with id, name, market_id, and start_time + """ + if not LOGS_DIR.exists(): + return [] + + strategies = [] + for csv_file in LOGS_DIR.glob("*.csv"): + # Parse filename: {strategy_name}_{market_id}_{timestamp}.csv + parts = csv_file.stem.split("_") + if len(parts) >= 3: + strategy_name = parts[0] + market_id = parts[1] + timestamp_str = "_".join(parts[2:]) + + # Parse timestamp + try: + start_time = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S") + start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + start_time_str = timestamp_str + + strategies.append( + { + "id": csv_file.stem, + "name": strategy_name, + "market_id": market_id, + "start_time": start_time_str, + "filename": csv_file.name, + } + ) + + # Sort by start_time descending (newest first) + strategies.sort(key=lambda x: x["start_time"], reverse=True) + return strategies + + +def _validate_strategy_id(strategy_id: str) -> bool: + """Validate strategy_id to prevent path traversal attacks.""" + return bool(re.match(r"^[a-zA-Z0-9_-]+$", strategy_id)) + + +@app.get("/api/strategy/{strategy_id}/data") +async def get_strategy_data(strategy_id: str) -> Dict: + """ + Get time series data for a strategy. 
+ + Args: + strategy_id: Strategy ID (filename without .csv) + + Returns: + Dict with timestamps and data arrays + """ + if not _validate_strategy_id(strategy_id): + raise HTTPException(status_code=400, detail="Invalid strategy ID") + + csv_file = LOGS_DIR / f"{strategy_id}.csv" + if not csv_file.exists(): + raise HTTPException(status_code=404, detail="Strategy not found") + + # Read CSV data + timestamps = [] + nav_data = [] + cash_data = [] + positions_value_data = [] + delta_data = [] + num_orders_data = [] + positions = {} # outcome -> [values] + + with open(csv_file, "r") as f: + reader = csv.DictReader(f) + headers = reader.fieldnames + + # Find outcome columns + outcome_columns = [col for col in headers if col.endswith("_qty")] + + for row in reader: + timestamps.append(row["timestamp"]) + nav_data.append(float(row["nav"])) + cash_data.append(float(row["cash"])) + positions_value_data.append(float(row["positions_value"])) + delta_data.append(float(row["delta"])) + num_orders_data.append(int(row["num_open_orders"])) + + # Parse position data + for col in outcome_columns: + outcome_name = col.replace("_qty", "") + if outcome_name not in positions: + positions[outcome_name] = [] + positions[outcome_name].append(float(row[col])) + + # Calculate statistics + initial_nav = nav_data[0] if nav_data else 0 + current_nav = nav_data[-1] if nav_data else 0 + pnl = current_nav - initial_nav + pnl_pct = (pnl / initial_nav * 100) if initial_nav > 0 else 0 + max_nav = max(nav_data) if nav_data else 0 + min_nav = min(nav_data) if nav_data else 0 + avg_delta = sum(delta_data) / len(delta_data) if delta_data else 0 + + return { + "timestamps": timestamps, + "nav": nav_data, + "cash": cash_data, + "positions_value": positions_value_data, + "delta": delta_data, + "num_orders": num_orders_data, + "positions": positions, + "stats": { + "initial_nav": round(initial_nav, 2), + "current_nav": round(current_nav, 2), + "pnl": round(pnl, 2), + "pnl_pct": round(pnl_pct, 2), + "max_nav": 
round(max_nav, 2), + "min_nav": round(min_nav, 2), + "avg_delta": round(avg_delta, 2), + "total_ticks": len(timestamps), + }, + } + + +def main(): + """Run the dashboard server.""" + import uvicorn + + print("Starting Dr. Manhattan Strategy Dashboard...") + print("Open http://localhost:8000 in your browser") + uvicorn.run(app, host="127.0.0.1", port=8000) + + +if __name__ == "__main__": + main() diff --git a/dr_manhattan/web/dashboard.html b/dr_manhattan/web/dashboard.html new file mode 100644 index 0000000..73a4ff0 --- /dev/null +++ b/dr_manhattan/web/dashboard.html @@ -0,0 +1,304 @@ + + +
+ + +Real-time monitoring of strategy execution
+Initial NAV
+ +Current NAV
+ +PnL
+-
+PnL %
+-
+Max NAV
+ +Min NAV
+ +Avg Delta
+-
+Loading data...
+Error loading data. Please try again.
+