#!/usr/bin/env python3
"""
HV-RSI: Short-Term Mean Reversion System — Portfolio Simulation

Hypothesis: Stocks making new 10-day lows for 2+ consecutive days, entered via
3% limit order, mean-revert within one week.

Universe: S&P 500 historical constituents (IndexCon SPY membership).

See spec.md for full rules.

Usage:
    python prototype.py [--index SPY_SandP_500|IWM_Russell_2000]
                        [--start 2005-01-01] [--end 2026-03-16]
                        [--cost-bps 0] [--realistic-fill] [--dry-run]
"""

import argparse
import sys
from datetime import date, timedelta
from pathlib import Path

import numpy as np
import polars as pl

sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "shared"))
from quote_loader import load_eod, load_indexcon, members_on_date

# ── Parameters ──────────────────────────────────────────────────────
PARAMS = dict(
    ndx_lookback=10,
    ndx_threshold=0,
    ndx_consecutive=2,
    limit_discount=0.03,
    sma_filter=150,
    max_hold_days=5,
    max_positions=20,
    position_pct=0.10,
    min_price=2.0,
    min_avg_vol=100_000,
)

INITIAL_CAPITAL = 100_000.0

# Fraction below the limit the low must trade for a "realistic" limit fill.
QUEUE_BUF = 0.001


# ── Indicator computation ───────────────────────────────────────────
def compute_ndx10(close: np.ndarray, high: np.ndarray, low: np.ndarray, n: int = 10) -> np.ndarray:
    """NDX of Close over prior N days: position within the PRIOR N-day high/low range.

    Uses high/low from the previous N days (not including today).
    Returns <0 when close is below prior N-day low (new low),
    >100 when close is above prior N-day high (new high).

    The first ``n`` entries are NaN (no complete prior window).  A flat prior
    window (high == low) yields 50.0.

    Raises:
        ValueError: if ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be >= 1, got {n}")

    close = np.asarray(close, dtype=float)
    high = np.asarray(high, dtype=float)
    low = np.asarray(low, dtype=float)

    result = np.full(len(close), np.nan)
    n_out = len(close) - n
    if n_out <= 0:
        return result

    # Window j covers bars [j, j + n) and is the *prior* window of bar j + n,
    # so the first n_out windows line up with result[n:].
    windows = np.lib.stride_tricks.sliding_window_view
    hh = windows(high, n)[:n_out].max(axis=1)
    ll = windows(low, n)[:n_out].min(axis=1)

    # Flat windows divide by zero here; they are overwritten with 50.0 below.
    with np.errstate(divide="ignore", invalid="ignore"):
        scaled = 100.0 * (close[n:] - ll) / (hh - ll)
    result[n:] = np.where(hh == ll, 50.0, scaled)
    return result


def _rolling_mean(values: np.ndarray, n: int) -> np.ndarray:
    """Trailing n-bar mean via cumulative sums; the first n-1 entries are NaN.

    Note: a NaN in ``values`` propagates to every later output because the
    cumulative sum never recovers from it.
    """
    if n < 1:
        raise ValueError(f"n must be >= 1, got {n}")

    values = np.asarray(values, dtype=float)
    result = np.full(len(values), np.nan)
    if len(values) < n:
        return result

    cumsum = np.cumsum(values)
    # Sum of bars (i-n, i] = cumsum[i] - cumsum[i-n], with cumsum[-1] := 0.
    result[n - 1:] = (cumsum[n - 1:] - np.concatenate([[0.0], cumsum[:-n]])) / n
    return result


def compute_sma(close: np.ndarray, n: int) -> np.ndarray:
    """Simple moving average of ``close`` over ``n`` bars (NaN until n bars exist)."""
    return _rolling_mean(close, n)


def compute_avg_volume(volume: np.ndarray, n: int = 25) -> np.ndarray:
    """N-day average volume (NaN until n bars exist)."""
    return _rolling_mean(volume, n)


# ── Data preparation ────────────────────────────────────────────────
def prepare_symbol_data(df: pl.DataFrame) -> dict:
    """Compute indicators for a single symbol. Returns dict of numpy arrays.

    OHLC are split/dividend-adjusted from adj_close before any indicator,
    price, or P&L is computed (ratio reconstruction: factor = adj_close/close
    applied to O/H/L/C). Without this a split shows up as a large gap in the
    raw close and corrupts the entry/exit prices for any trade spanning it.
    Per the platform Quote Data Manifesto (adj_close only, ratio-based).

    Returned keys: dates, open, high, low, close, volume, ndx10, sma150, avg_vol.
    Volume is left unadjusted.
    """
    df = df.sort("date")
    close = df["close"].to_numpy().astype(float)
    high = df["high"].to_numpy().astype(float)
    low = df["low"].to_numpy().astype(float)
    opens = df["open"].to_numpy().astype(float)
    volume = df["volume"].to_numpy()
    dates = df["date"].to_list()

    if "adj_close" in df.columns:
        adj = df["adj_close"].to_numpy().astype(float)
        # The ratio is evaluated on every bar (including close == 0) and only
        # then masked, so silence the divide warnings for the masked-out bars.
        with np.errstate(divide="ignore", invalid="ignore"):
            ratio = adj / close
        factor = np.where((close > 0) & np.isfinite(adj), ratio, 1.0)
        close = close * factor
        high = high * factor
        low = low * factor
        opens = opens * factor

    return dict(
        dates=dates,
        open=opens,
        high=high,
        low=low,
        close=close,
        volume=volume,
        ndx10=compute_ndx10(close, high, low, PARAMS["ndx_lookback"]),
        sma150=compute_sma(close, PARAMS["sma_filter"]),
        avg_vol=compute_avg_volume(volume, 25),
    )


# ── Simulation ──────────────────────────────────────────────────────
class Position:
    """One open long position held by the simulated portfolio."""

    __slots__ = ("symbol", "entry_date", "entry_price", "shares", "cost", "hold_days", "prev_high")

    def __init__(self, symbol, entry_date, entry_price, shares, cost):
        self.symbol = symbol
        self.entry_date = entry_date
        self.entry_price = entry_price
        self.shares = shares
        self.cost = cost  # total cash outlay, including buy-side cost
        self.hold_days = 0  # incremented once per simulated day, entry day included
        self.prev_high = 0.0  # yesterday's high — set on first daily update


def _as_date(value):
    """Return ``value.date()`` for datetime-like objects, else ``value`` unchanged."""
    return value.date() if hasattr(value, "date") else value


def _limit_fill_price(day_open, day_low, limit_price, realistic_fill):
    """Return the fill price of a buy limit order for one bar, or None if unfilled."""
    if not realistic_fill:
        # Original optimistic rule: any touch of the limit fills at the limit.
        return limit_price if day_low <= limit_price else None
    if day_open <= limit_price:
        return day_open  # gapped through -> marketable at the open
    if day_low <= limit_price * (1.0 - QUEUE_BUF):
        return limit_price  # traded far enough through to clear the queue
    return None  # mere touch -> assume the queue misses


def _exit_trade(pos, exit_date, exit_price, cost_frac, reason):
    """Return (net proceeds, trade record) for closing ``pos`` at ``exit_price``."""
    proceeds = pos.shares * exit_price * (1.0 - cost_frac)  # net of sell-side cost
    pnl = proceeds - pos.cost
    record = dict(
        symbol=pos.symbol,
        entry_date=pos.entry_date,
        entry_price=pos.entry_price,
        exit_date=exit_date,
        exit_price=exit_price,
        shares=pos.shares,
        pnl=pnl,
        pnl_pct=pnl / pos.cost,
        hold_days=pos.hold_days,
        exit_reason=reason,
    )
    return proceeds, record


def run_simulation(
    index_name: str,
    start_date: date,
    end_date: date,
    dry_run: bool = False,
    cost_bps: float = 0.0,
    realistic_fill: bool = False,
) -> dict:
    """Run the HV-RSI portfolio simulation.

    cost_bps : per-side transaction cost (commission + spread + impact) charged
        on the notional of every entry and exit. 0 = the original frictionless run.
    realistic_fill : when True, a limit order fills only if the bar gaps through
        the limit at the open (fill at the open) OR the low trades at least
        QUEUE_BUF below the limit (fill at the limit) — a mere touch at the limit
        is assumed to miss the queue. When False, the original optimistic rule
        (fill at limit whenever low <= limit).

    Returns:
        With ``dry_run`` a dict with ``loaded`` / ``failed`` symbol counts.
        Otherwise a dict with ``trades``, ``equity_curve`` (rows of
        date, equity, open positions, cash), ``final_equity``,
        ``initial_capital``, ``start_date``, ``end_date``, ``index_name`` and
        ``params``.  ``final_equity`` is measured AFTER the end-of-run
        liquidation, so it includes that liquidation's sell-side cost and
        matches the trade list.

    Raises:
        ValueError: if the membership calendar has no date inside
            [start_date, end_date].
    """
    print(f"Loading IndexCon membership: {index_name}")
    membership = load_indexcon(index_name)
    all_dates = sorted(membership["date"].to_list())

    # Filter to sim range
    sim_dates = [d for d in map(_as_date, all_dates) if start_date <= d <= end_date]
    if not sim_dates:
        raise ValueError(
            f"No membership dates for {index_name} between {start_date} and {end_date}"
        )
    print(f" Simulation period: {sim_dates[0]} to {sim_dates[-1]} ({len(sim_dates)} trading days)")

    # Collect all symbols that were ever members in the sim period.
    # Sampled every 20 days to keep this cheap; a symbol whose whole membership
    # falls between two samples is never loaded and therefore never traded.
    all_members = set()
    for d in sim_dates[::20]:
        all_members.update(members_on_date(membership, d))
    print(f" Universe: {len(all_members)} unique symbols across sim period")

    # Load quote data
    print("Loading quotes...")
    quotes = {}
    loaded = 0
    failed = 0
    min_rows = PARAMS["sma_filter"] + 10
    for i, sym in enumerate(sorted(all_members)):
        df = load_eod(sym)
        if df is not None and len(df) >= min_rows:
            data = prepare_symbol_data(df)
            # Build date->index lookup
            data["date_idx"] = {_as_date(d): j for j, d in enumerate(data["dates"])}
            quotes[sym] = data
            loaded += 1
        else:
            failed += 1
        if (i + 1) % 100 == 0:
            print(f" {i+1}/{len(all_members)} checked ({loaded} loaded, {failed} skipped)")
    print(f" Loaded {loaded} symbols, {failed} unavailable/too short")

    if dry_run:
        return {"loaded": loaded, "failed": failed}

    # Portfolio simulation
    c = cost_bps / 10000.0  # per-side cost fraction
    max_positions = PARAMS["max_positions"]
    max_hold_days = PARAMS["max_hold_days"]
    threshold = PARAMS["ndx_threshold"]
    streak = max(1, int(PARAMS["ndx_consecutive"]))  # bars that must all be below threshold

    equity = INITIAL_CAPITAL
    cash = INITIAL_CAPITAL
    positions: list[Position] = []
    trades: list[dict] = []
    equity_curve: list[tuple] = []

    # Pending limit orders: list of (symbol, limit_price, ndx10_score)
    pending_orders: list[tuple] = []

    print("Running simulation...")
    for day_i, today in enumerate(sim_dates):
        # ── 1. Check fills on pending limit orders ──
        for sym, limit_price, _ndx_score in pending_orders:
            data = quotes.get(sym)
            if data is None:
                continue
            idx = data["date_idx"].get(today)
            if idx is None:
                continue

            fill_price = _limit_fill_price(
                data["open"][idx], data["low"][idx], limit_price, realistic_fill
            )
            if fill_price is None or len(positions) >= max_positions:
                continue
            if fill_price <= 0:
                continue  # bad quote; a non-positive price cannot be sized

            # Target notional, capped so that notional + buy-side cost never
            # exceeds available cash (identical to min(target, cash) when c == 0).
            alloc = min(equity * PARAMS["position_pct"], cash / (1.0 + c))
            if alloc < fill_price * 1.1:  # need enough for at least ~1 share
                continue
            shares = int(alloc / fill_price)
            if shares < 1:
                continue

            outlay = shares * fill_price * (1.0 + c)  # cost basis incl. buy-side cost
            cash -= outlay
            pos = Position(sym, today, fill_price, shares, outlay)
            pos.prev_high = data["high"][idx]
            positions.append(pos)

        # Orders are good for one day only.
        pending_orders = []

        # ── 2. Update existing positions and check exits ──
        to_close = []
        for pos in positions:
            data = quotes.get(pos.symbol)
            if data is None:
                continue
            pos.hold_days += 1  # counts calendar sim days, even without a quote
            idx = data["date_idx"].get(today)
            if idx is None:
                continue  # no bar today: cannot evaluate or execute an exit

            today_close = data["close"][idx]

            # Exit condition 1: close > yesterday's high (never on the entry day)
            exit_target = pos.hold_days > 1 and today_close > pos.prev_high
            # Exit condition 2: held for max_hold_days
            exit_time = pos.hold_days >= max_hold_days

            if exit_target or exit_time:
                proceeds, record = _exit_trade(
                    pos, today, today_close, c, "target" if exit_target else "time"
                )
                cash += proceeds
                trades.append(record)
                to_close.append(pos)

            pos.prev_high = data["high"][idx]

        for pos in to_close:
            positions.remove(pos)

        # ── 3. Scan for new signals ──
        held = {p.symbol for p in positions}  # skip symbols already held
        candidates = []
        for sym in members_on_date(membership, today):
            if sym in held:
                continue
            data = quotes.get(sym)
            if data is None:
                continue
            idx = data["date_idx"].get(today)
            if idx is None or idx < 1 or idx < streak - 1:
                continue

            close_val = data["close"][idx]
            ndx_val = data["ndx10"][idx]
            ndx_prev = data["ndx10"][idx - 1]
            sma_val = data["sma150"][idx]
            avg_vol_val = data["avg_vol"][idx]
            recent_ndx = data["ndx10"][idx - streak + 1: idx + 1]

            if (
                np.isnan(ndx_val)
                or np.isnan(ndx_prev)
                or np.isnan(sma_val)
                or np.isnan(avg_vol_val)
                or np.isnan(recent_ndx).any()
            ):
                continue

            # Filters
            if close_val < PARAMS["min_price"]:
                continue
            if avg_vol_val < PARAMS["min_avg_vol"]:
                continue
            if close_val <= sma_val:
                continue

            # NDX10 signal: below threshold for ndx_consecutive days in a row
            if (recent_ndx < threshold).all():
                limit_price = close_val * (1 - PARAMS["limit_discount"])
                candidates.append((sym, limit_price, ndx_val))

        # Rank by NDX10 score (lower = higher priority), limit to available slots
        candidates.sort(key=lambda x: x[2])
        slots_available = max(0, max_positions - len(positions))
        pending_orders = candidates[:slots_available]

        # ── 4. Mark-to-market equity ──
        position_value = 0.0
        for pos in positions:
            data = quotes.get(pos.symbol)
            if data is None:
                continue
            idx = data["date_idx"].get(today)
            if idx is not None:
                position_value += pos.shares * data["close"][idx]
            else:
                position_value += pos.cost  # no quote, use cost basis

        equity = cash + position_value
        equity_curve.append((today, equity, len(positions), cash))

        if (day_i + 1) % 500 == 0:
            print(f" Day {day_i+1}/{len(sim_dates)}: equity=${equity:,.0f}, "
                  f"positions={len(positions)}, trades={len(trades)}")

    # Close any remaining positions at last available price
    last_day = sim_dates[-1]
    unclosed_value = 0.0
    for pos in positions:
        data = quotes.get(pos.symbol)
        if data is None:
            continue
        idx = data["date_idx"].get(last_day)
        if idx is None:
            # No bar on the final day: the position stays open and is carried
            # at cost basis, exactly as the daily mark-to-market does.
            unclosed_value += pos.cost
            continue
        proceeds, record = _exit_trade(pos, last_day, data["close"][idx], c, "eod_close")
        cash += proceeds
        trades.append(record)

    return dict(
        trades=trades,
        equity_curve=equity_curve,
        final_equity=cash + unclosed_value,
        initial_capital=INITIAL_CAPITAL,
        start_date=sim_dates[0],
        end_date=sim_dates[-1],
        index_name=index_name,
        params=PARAMS.copy(),
    )


# ── Reporting ───────────────────────────────────────────────────────
def compute_metrics(result: dict) -> dict:
    """Compute SP-002 standard metrics from simulation results.

    With no trades the result is ``{"n_trades": 0, "years": ...}``; ``years``
    is always present so the report header can be printed either way.
    """
    years = (result["end_date"] - result["start_date"]).days / 365.25

    trades = result["trades"]
    n = len(trades)
    if n == 0:
        return {"n_trades": 0, "years": years}

    pnls = [t["pnl"] for t in trades]
    pnl_pcts = [t["pnl_pct"] for t in trades]
    winners = [p for p in pnls if p > 0]
    losers = [p for p in pnls if p <= 0]

    gross_profit = sum(winners)
    gross_loss = abs(sum(losers))
    if gross_loss > 0:
        profit_factor = gross_profit / gross_loss
    else:
        # No losing dollars at all: unbounded if anything was earned, else 0.
        profit_factor = float("inf") if gross_profit > 0 else 0.0

    # Equity curve analysis
    eq = result["equity_curve"]
    equities = np.array([e[1] for e in eq])
    peak = np.maximum.accumulate(equities)
    max_dd = float(np.min((equities - peak) / peak))

    # CAGR
    if years > 0:
        cagr = (result["final_equity"] / result["initial_capital"]) ** (1 / years) - 1
    else:
        cagr = 0

    # Exposure: average open-position count as a fraction of the slot limit
    avg_exposure = np.mean([e[2] for e in eq]) / PARAMS["max_positions"]

    exit_reasons = {}
    for t in trades:
        reason = t["exit_reason"]
        exit_reasons[reason] = exit_reasons.get(reason, 0) + 1

    return dict(
        n_trades=n,
        win_rate=len(winners) / n,
        avg_return=np.mean(pnl_pcts),
        avg_win=np.mean([p for p in pnl_pcts if p > 0]) if winners else 0,
        avg_loss=np.mean([p for p in pnl_pcts if p <= 0]) if losers else 0,
        profit_factor=profit_factor,
        max_drawdown=max_dd,
        cagr=cagr,
        return_dd=cagr / abs(max_dd) if max_dd != 0 else float("inf"),
        total_pnl=sum(pnls),
        avg_hold_days=np.mean([t["hold_days"] for t in trades]),
        avg_exposure_pct=avg_exposure,
        exit_reasons=exit_reasons,
        final_equity=result["final_equity"],
        years=years,
    )


def print_report(result: dict):
    """Print SP-002 formatted results and return the metrics dict."""
    m = compute_metrics(result)

    print("\n" + "=" * 60)
    print(f"HV-RSI Results — {result['index_name']}")
    print(f"Period: {result['start_date']} to {result['end_date']} ({m['years']:.1f} years)")
    print("=" * 60)

    if m["n_trades"] == 0:
        print("No trades generated.")
        return m

    print(f" N trades: {m['n_trades']}")
    print(f" Win rate: {m['win_rate']:.1%}")
    print(f" Avg return: {m['avg_return']:.2%}")
    print(f" Avg winner: {m['avg_win']:.2%}")
    print(f" Avg loser: {m['avg_loss']:.2%}")
    print(f" Profit factor: {m['profit_factor']:.2f}")
    print(f" Max drawdown: {m['max_drawdown']:.2%}")
    print(f" CAGR: {m['cagr']:.2%}")
    print(f" Return/DD: {m['return_dd']:.2f}")
    print(f" Total PnL: ${m['total_pnl']:,.0f}")
    print(f" Final equity: ${m['final_equity']:,.0f}")
    print(f" Avg hold days: {m['avg_hold_days']:.1f}")
    print(f" Avg exposure: {m['avg_exposure_pct']:.1%}")
    print(f" Exit reasons: {m['exit_reasons']}")
    print()
    return m


def save_trades(result: dict, output_dir: Path):
    """Save trade list and equity curve to parquet.

    Writes ``trades.parquet`` and ``equity_curve.parquet`` under
    ``output_dir`` (created if missing); an empty list writes no file.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    if result["trades"]:
        trades_df = pl.DataFrame(result["trades"])
        trades_df.write_parquet(output_dir / "trades.parquet")
        print(f" Saved {len(result['trades'])} trades to {output_dir / 'trades.parquet'}")

    if result["equity_curve"]:
        eq_df = pl.DataFrame(
            result["equity_curve"],
            schema=["date", "equity", "positions", "cash"],
            orient="row",
        )
        eq_df.write_parquet(output_dir / "equity_curve.parquet")
        print(f" Saved equity curve to {output_dir / 'equity_curve.parquet'}")


# ── Main ────────────────────────────────────────────────────────────
def main():
    """Parse the command line, run the simulation, print and save the results."""
    parser = argparse.ArgumentParser(description="HV-RSI system POC")
    parser.add_argument("--index", default="SPY_SandP_500", help="IndexCon membership file name")
    parser.add_argument("--start", default="2005-01-01", help="Simulation start date (YYYY-MM-DD)")
    parser.add_argument("--end", default="2026-03-16", help="Simulation end date (YYYY-MM-DD)")
    parser.add_argument("--dry-run", action="store_true", help="Just load data, don't simulate")
    parser.add_argument("--cost-bps", type=float, default=0.0,
                        help="Per-side transaction cost in basis points (default 0)")
    parser.add_argument("--realistic-fill", action="store_true",
                        help="Require a gap or trade-through for limit fills")
    args = parser.parse_args()

    start = date.fromisoformat(args.start)
    end = date.fromisoformat(args.end)

    result = run_simulation(
        args.index,
        start,
        end,
        dry_run=args.dry_run,
        cost_bps=args.cost_bps,
        realistic_fill=args.realistic_fill,
    )

    if args.dry_run:
        print(f"Dry run complete: {result}")
        return

    print_report(result)

    output_dir = Path(__file__).parent / "output" / args.index
    save_trades(result, output_dir)


if __name__ == "__main__":
    main()