#!/usr/bin/env python3
"""
Clenow QQQ — Weekly Momentum Rotation System — Portfolio Simulation

Hypothesis: Weekly rotation into strongest Nasdaq-100 components, ranked by
smoothed momentum (linear regression slope × R²), sized by volatility (ATR14),
with market regime filter (QQQ > 200 SMA).

See spec.md for full rules.

Usage:
    python prototype.py [--index QQQ_Nasdaq-100] [--regime-etf QQQ]
                        [--start 2005-01-01] [--end 2026-05-15]
"""

import argparse
import sys
from datetime import date, timedelta
from pathlib import Path

import numpy as np
import polars as pl

sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "shared"))
from quote_loader import load_eod, load_indexcon, members_on_date

# Deep-history ETF EOD (eod_tr) for the regime gate — QQQ back to 1999,
# SPY back to 1993. The platform's active quotes_eod only carries recent
# history for some ETFs (dev QQQ starts 2012), which is too short for the
# 200-day SMA regime gate over a 2005-start run.
_ETF_EOD_DIR = Path(__file__).resolve().parent.parent.parent / "shared" / "etf_eod"


def load_regime_etf(symbol: str) -> pl.DataFrame | None:
    """Load deep-history EOD for the regime-gate ETF.

    Prefers the eod_tr parquet cache under ``_ETF_EOD_DIR`` (sorted by date);
    falls back to the shared quote loader when no cache file exists.
    """
    path = _ETF_EOD_DIR / f"symbol={symbol}.parquet"
    if path.exists():
        return pl.read_parquet(path).sort("date")
    return load_eod(symbol)


# ── Parameters ──────────────────────────────────────────────────────

PARAMS = dict(
    regression_window=90,
    sma_stock_filter=100,
    sma_market_filter=200,
    gap_threshold=0.15,
    gap_lookback=100,
    top_n=20,
    vol_sizing_factor=0.0015,
    atr_period=14,
    resize_threshold_pct=0.20,
    resize_threshold_port=0.02,
    resize_frequency=2,  # resize every N weeks
    rebalance_day=1,  # 0=Mon, 1=Tue
)

INITIAL_CAPITAL = 100_000.0


# ── Indicator computation ───────────────────────────────────────────

def compute_log_regression(log_close: np.ndarray, n: int) -> tuple[np.ndarray, np.ndarray]:
    """Compute rolling linear regression slope and R² on log-close.

    For every index ``i >= n - 1`` an OLS line is fitted to the trailing
    ``n`` values against ``x = 0..n-1``. Windows containing NaN are skipped
    (both outputs stay NaN there).

    Returns (annualized_slope, r_squared) arrays, where the annualized slope
    is ``exp(slope * 252) - 1``.
    """
    slope = np.full(len(log_close), np.nan)
    r2 = np.full(len(log_close), np.nan)
    x = np.arange(n)
    x_mean = x.mean()
    x_dev = x - x_mean  # loop-invariant
    ss_x = np.sum(x_dev ** 2)

    for i in range(n - 1, len(log_close)):
        y = log_close[i - n + 1:i + 1]
        if np.any(np.isnan(y)):
            continue
        y_dev = y - y.mean()
        ss_xy = np.sum(x_dev * y_dev)
        ss_y = np.sum(y_dev ** 2)
        slope[i] = ss_xy / ss_x
        # A perfectly flat window has no variance to explain: R² := 0.
        r2[i] = (ss_xy ** 2) / (ss_x * ss_y) if ss_y > 0 else 0.0

    # Annualize: exp(slope * 252) - 1
    annualized = np.where(~np.isnan(slope), np.exp(slope * 252) - 1, np.nan)
    return annualized, r2


def compute_sma(close: np.ndarray, n: int) -> np.ndarray:
    """Simple moving average over ``n`` bars (NaN for the first ``n - 1``)."""
    result = np.full(len(close), np.nan)
    cumsum = np.cumsum(close)
    # Window sum = cumsum[i] - cumsum[i - n]; a leading 0 covers the first window.
    result[n - 1:] = (cumsum[n - 1:] - np.concatenate([[0], cumsum[:-n]])) / n
    return result


def compute_atr(high: np.ndarray, low: np.ndarray, close: np.ndarray, n: int = 14) -> np.ndarray:
    """Average True Range with Wilder smoothing.

    The first value (index ``n - 1``) is the plain mean of the first ``n``
    true ranges; later values use ``(prev * (n - 1) + tr) / n``. Entries
    before index ``n - 1`` are NaN. Empty input returns an empty array.
    """
    count = len(close)
    atr = np.full(count, np.nan)
    if count == 0:
        return atr

    tr = np.full(count, np.nan)
    tr[0] = high[0] - low[0]  # no previous close on the first bar
    for i in range(1, count):
        tr[i] = max(
            high[i] - low[i],
            abs(high[i] - close[i - 1]),
            abs(low[i] - close[i - 1]),
        )

    if count >= n:
        atr[n - 1] = np.mean(tr[:n])
        for i in range(n, count):
            atr[i] = (atr[i - 1] * (n - 1) + tr[i]) / n
    return atr


def compute_overnight_gaps(open_prices: np.ndarray, close: np.ndarray) -> np.ndarray:
    """Overnight gap as abs(open/prev_close - 1).

    The first bar, and any bar whose previous close is not strictly
    positive, gets a gap of 0.0.
    """
    gaps = np.zeros(len(close))
    prev_close = close[:-1]
    valid = prev_close > 0
    gaps[1:][valid] = np.abs(open_prices[1:][valid] / prev_close[valid] - 1)
    return gaps


def has_large_gap(gaps: np.ndarray, idx: int, lookback: int, threshold: float) -> bool:
    """Check if any gap in the ``lookback`` bars ending at ``idx`` exceeds threshold."""
    start = max(0, idx - lookback + 1)
    return bool(np.any(gaps[start:idx + 1] > threshold))


# ── Data preparation ────────────────────────────────────────────────

def prepare_symbol_data(df: pl.DataFrame) -> dict:
    """Compute all indicators for a single symbol.

    OHLC are split/dividend-adjusted from adj_close before any indicator,
    price, or P&L is computed (ratio reconstruction: factor = adj_close/close
    applied to O/H/L/C). Without this a stock split shows up as a large
    overnight gap in the raw close, which the gap filter mis-reads as a real
    move and exits at a fabricated loss. Per the platform Quote Data Manifesto
    (adj_close only, ratio-based reconstruction).

    Returns a dict of per-bar numpy arrays (prices, indicators) plus
    ``dates`` and a ``date_idx`` mapping from calendar date to bar index.
    """
    df = df.sort("date")
    close = df["close"].to_numpy().astype(float)
    high = df["high"].to_numpy().astype(float)
    low = df["low"].to_numpy().astype(float)
    opens = df["open"].to_numpy().astype(float)
    volume = df["volume"].to_numpy()
    dates = df["date"].to_list()

    if "adj_close" in df.columns:
        adj = df["adj_close"].to_numpy().astype(float)
        # np.where evaluates adj / close for every bar; silence the warnings
        # from bars that the mask replaces with a neutral factor of 1.0.
        with np.errstate(divide="ignore", invalid="ignore"):
            factor = np.where((close > 0) & np.isfinite(adj), adj / close, 1.0)
        close = close * factor
        high = high * factor
        low = low * factor
        opens = opens * factor

    log_close = np.log(close)
    ann_slope, r2 = compute_log_regression(log_close, PARAMS["regression_window"])
    ranking = ann_slope * r2  # slope × R²
    sma100 = compute_sma(close, PARAMS["sma_stock_filter"])
    atr14 = compute_atr(high, low, close, PARAMS["atr_period"])
    gaps = compute_overnight_gaps(opens, close)

    # Build date->index lookup, normalising datetime-like values to dates.
    date_idx = {}
    for j, d in enumerate(dates):
        dd = d.date() if hasattr(d, "date") else d
        if hasattr(dd, "date"):
            dd = dd.date()
        date_idx[dd] = j

    return dict(
        dates=dates,
        open=opens,
        high=high,
        low=low,
        close=close,
        volume=volume,
        log_close=log_close,
        ann_slope=ann_slope,
        r2=r2,
        ranking=ranking,
        sma100=sma100,
        atr14=atr14,
        gaps=gaps,
        date_idx=date_idx,
    )


# ── Simulation ──────────────────────────────────────────────────────

class Position:
    """One open holding: share count plus the running cost basis."""

    __slots__ = ("symbol", "entry_date", "entry_price", "shares", "cost", "target_shares")

    def __init__(self, symbol, entry_date, entry_price, shares, cost):
        self.symbol = symbol
        self.entry_date = entry_date
        self.entry_price = entry_price
        self.shares = shares
        self.cost = cost  # total cost basis of the shares currently held
        self.target_shares = shares


def prepare_run(
    index_name: str,
    start_date: date,
    end_date: date,
    regime_etf: str = "QQQ",
    verbose: bool = True,
) -> dict:
    """Load membership, regime ETF, and all member quotes ONCE.

    Returns a `prep` dict that simulate_portfolio() consumes. Separating
    load from sim lets the monkey baseline reuse the same loaded data across
    K seeds (and guarantees it runs the identical sim code path).

    Raises:
        RuntimeError: if the regime ETF has no quote data, or if it has no
            trading days inside ``[start_date, end_date]``.
    """
    if verbose:
        print(f"Loading IndexCon membership: {index_name}")
    membership = load_indexcon(index_name)

    # Regime-filter ETF (deep history)
    if verbose:
        print(f"Loading regime ETF: {regime_etf}")
    qqq_df = load_regime_etf(regime_etf)
    if qqq_df is None:
        raise RuntimeError(f"{regime_etf} quote data required for regime filter")
    qqq_data = prepare_symbol_data(qqq_df)
    qqq_sma200 = compute_sma(qqq_data["close"], PARAMS["sma_market_filter"])

    # Build trading calendar from regime-ETF dates
    all_trading_dates = sorted(qqq_data["date_idx"].keys())
    sim_dates = [d for d in all_trading_dates if start_date <= d <= end_date]
    if not sim_dates:
        raise RuntimeError(
            f"No {regime_etf} trading days between {start_date} and {end_date}"
        )
    if verbose:
        print(f" Simulation: {sim_dates[0]} to {sim_dates[-1]} ({len(sim_dates)} trading days)")

    # Precompute membership per rebalance day once (fixed across seeds) — a
    # big speedup for the K-seed monkey, identical result vs the live call.
    rebalance_dates = [d for d in sim_dates if d.weekday() == PARAMS["rebalance_day"]]
    members_by_date = {d: members_on_date(membership, d) for d in rebalance_dates}

    # Collect universe from every rebalance day. Candidates are only ever
    # looked up on rebalance days, so this is exactly the set of symbols the
    # simulation can trade; sampling the calendar more coarsely would drop
    # names that were members only between samples.
    all_members = set()
    for members in members_by_date.values():
        all_members.update(members or ())
    if verbose:
        print(f" Universe: {len(all_members)} unique symbols")

    # Load quote data
    if verbose:
        print("Loading quotes...")
    min_history = max(PARAMS["regression_window"], PARAMS["sma_stock_filter"]) + 10
    quotes = {}
    loaded = 0
    for i, sym in enumerate(sorted(all_members)):
        if sym == regime_etf:
            continue  # regime ETF, not a tradable member here
        df = load_eod(sym)
        if df is not None and len(df) >= min_history:
            quotes[sym] = prepare_symbol_data(df)
            loaded += 1
        if verbose and (i + 1) % 50 == 0:
            print(f" {i+1}/{len(all_members)} checked ({loaded} loaded)")
    if verbose:
        print(f" Loaded {loaded} symbols")

    return dict(
        membership=membership,
        quotes=quotes,
        qqq_data=qqq_data,
        qqq_sma200=qqq_sma200,
        sim_dates=sim_dates,
        regime_etf=regime_etf,
        index_name=index_name,
        members_by_date=members_by_date,
    )


def _eligible_candidates(members, quotes: dict, today: date) -> list[tuple]:
    """Return (symbol, ranking, atr, close) for members passing all entry filters."""
    candidates = []
    for sym in members:
        data = quotes.get(sym)
        if data is None:
            continue
        idx = data["date_idx"].get(today)
        if idx is None:
            continue

        close_val = data["close"][idx]
        ranking_val = data["ranking"][idx]
        slope_val = data["ann_slope"][idx]
        sma100_val = data["sma100"][idx]
        atr_val = data["atr14"][idx]

        if any(np.isnan(v) for v in (ranking_val, slope_val, sma100_val, atr_val)):
            continue
        if atr_val <= 0:
            continue

        # Filters
        if close_val <= sma100_val:
            continue
        if slope_val <= 0:
            continue
        if has_large_gap(data["gaps"], idx, PARAMS["gap_lookback"], PARAMS["gap_threshold"]):
            continue

        candidates.append((sym, ranking_val, atr_val, close_val))
    return candidates


def _exit_reason(sym: str, data: dict, idx: int, top_symbols: set) -> str | None:
    """Return the first exit rule a held symbol trips on bar ``idx``, else None."""
    close_val = data["close"][idx]
    slope_val = data["ann_slope"][idx]
    sma100_val = data["sma100"][idx]

    # Exits only look back over the last 5 bars for a gap.
    if has_large_gap(data["gaps"], idx, 5, PARAMS["gap_threshold"]):
        return "gap"
    if not np.isnan(sma100_val) and close_val <= sma100_val:
        return "below_sma100"
    if not np.isnan(slope_val) and slope_val <= 0:
        return "negative_slope"
    if sym not in top_symbols:
        return "rank_drop"
    return None


def simulate_portfolio(
    prep: dict,
    rank_mode: str = "clenow",
    rng=None,
    gate: bool = True,
    weight_mode: str = "atr",
    verbose: bool = True,
) -> dict:
    """Run the weekly rotation on pre-loaded data.

    rank_mode — "clenow" sorts eligible candidates by slope×R² (the system);
    "random" shuffles the SAME eligible candidate set with `rng` and takes
    the first top_n. Every other rule (regime gate, >SMA100 / slope>0 / gap
    filters, ATR inverse-dollar sizing, resize cadence, exit logic) is
    identical — so the only thing that varies is the selection step. This is
    the in-harness monkey that isolates the ranking.

    gate — False disables the regime filter (always allowed to enter).
    weight_mode — "equal" sizes each name at 1/top_n of equity; anything else
    uses ATR inverse-dollar sizing.

    Returns a dict with ``trades``, ``equity_curve`` (date, equity,
    n_positions, cash), ``final_equity`` and run metadata.

    Raises:
        ValueError: if ``rank_mode == "random"`` and no ``rng`` is supplied.
    """
    if rank_mode == "random" and rng is None:
        raise ValueError('rank_mode="random" requires a seeded rng')

    membership = prep["membership"]
    quotes = prep["quotes"]
    qqq_data = prep["qqq_data"]
    qqq_sma200 = prep["qqq_sma200"]
    sim_dates = prep["sim_dates"]
    regime_etf = prep["regime_etf"]
    index_name = prep["index_name"]
    members_by_date = prep["members_by_date"]

    # Portfolio simulation
    equity = INITIAL_CAPITAL
    cash = INITIAL_CAPITAL
    positions: dict[str, Position] = {}  # symbol -> Position
    trades: list[dict] = []
    equity_curve: list[tuple] = []
    week_count = 0

    if verbose:
        print("Running simulation...")

    for day_i, today in enumerate(sim_dates):
        # ── Is this a rebalance day? (Tuesday) ──
        is_rebalance = today.weekday() == PARAMS["rebalance_day"]

        if is_rebalance:
            week_count += 1

            # QQQ regime check
            qqq_idx = qqq_data["date_idx"].get(today)
            if qqq_idx is None:
                continue
            qqq_close = qqq_data["close"][qqq_idx]
            qqq_sma = qqq_sma200[qqq_idx]
            # gate=False → always invested (un-gated baseline)
            market_ok = (not gate) or (not np.isnan(qqq_sma) and qqq_close > qqq_sma)

            # Get current members (precomputed per rebalance day)
            members_today = members_by_date.get(today) or members_on_date(membership, today)

            # Rank all candidates
            candidates = _eligible_candidates(members_today, quotes, today)

            # Selection step — the ONLY difference between the system and the
            # monkey. Clenow: sort eligible candidates by slope×R². Monkey:
            # shuffle the same eligible set with the seeded rng.
            if rank_mode == "random":
                rng.shuffle(candidates)
            else:
                candidates.sort(key=lambda x: x[1], reverse=True)
            top_ranked = candidates[:PARAMS["top_n"]]
            top_symbols = set(c[0] for c in top_ranked)

            # ── Exits ──
            # Iterate over a snapshot because positions are deleted as we go.
            for sym, pos in list(positions.items()):
                data = quotes.get(sym)
                if data is None:
                    reason = "no_data"
                    exit_price = pos.entry_price  # no quote: exit flat at entry
                else:
                    idx = data["date_idx"].get(today)
                    if idx is None:
                        continue  # no bar today: hold and re-check next week
                    reason = _exit_reason(sym, data, idx, top_symbols)
                    if reason is None:
                        continue
                    exit_price = data["close"][idx]

                proceeds = pos.shares * exit_price
                cash += proceeds
                pnl = proceeds - pos.cost
                trades.append(dict(
                    symbol=sym,
                    entry_date=pos.entry_date,
                    entry_price=pos.entry_price,
                    exit_date=today,
                    exit_price=exit_price,
                    shares=pos.shares,
                    pnl=pnl,
                    pnl_pct=pnl / pos.cost if pos.cost > 0 else 0,
                    exit_reason=reason,
                ))
                del positions[sym]

            # ── Entries & Resizing ──
            if market_ok:
                is_resize_week = (week_count % PARAMS["resize_frequency"]) == 0

                for sym, rank_val, atr_val, close_val in top_ranked:
                    # Target shares. atr: ATR inverse-dollar (the spec's vol
                    # sizing, equity × 0.0015 / ATR14). equal: equal-dollar
                    # 1/top_n of equity per name — the ablation that removes
                    # vol-sizing concentration while holding selection fixed.
                    # `equity` here is the previous day's mark-to-market.
                    if weight_mode == "equal":
                        target_shares = int((equity / PARAMS["top_n"]) / close_val)
                    else:
                        target_shares = int(equity * PARAMS["vol_sizing_factor"] / atr_val)
                    if target_shares < 1:
                        continue

                    pos = positions.get(sym)
                    if pos is None:
                        # New position
                        cost = target_shares * close_val
                        if cost <= cash and len(positions) < PARAMS["top_n"]:
                            cash -= cost
                            positions[sym] = Position(sym, today, close_val, target_shares, cost)
                        continue

                    # Resize check (only on resize weeks)
                    if not is_resize_week:
                        continue
                    diff = target_shares - pos.shares
                    change_pct = abs(diff) / pos.shares if pos.shares > 0 else 1
                    change_port = abs(diff) * close_val / equity
                    if not (change_pct > PARAMS["resize_threshold_pct"]
                            or change_port > PARAMS["resize_threshold_port"]):
                        continue

                    if diff > 0:
                        cost = diff * close_val
                        if cost <= cash:  # skip the scale-in if unaffordable
                            cash -= cost
                            pos.cost += cost
                            pos.shares = target_shares
                    elif diff < 0:
                        cash += -diff * close_val
                        # Release basis at average cost so earlier scale-ins
                        # at other prices are accounted for; equals
                        # shares × entry_price when there were none.
                        pos.cost *= target_shares / pos.shares
                        pos.shares = target_shares

        # ── Mark-to-market ──
        position_value = 0.0
        for sym, pos in positions.items():
            data = quotes.get(sym)
            idx = data["date_idx"].get(today) if data is not None else None
            if idx is not None:
                position_value += pos.shares * data["close"][idx]
            else:
                position_value += pos.cost  # no bar today: carry at cost basis

        equity = cash + position_value
        equity_curve.append((today, equity, len(positions), cash))

        if verbose and (day_i + 1) % 500 == 0:
            print(f" Day {day_i+1}/{len(sim_dates)}: equity=${equity:,.0f}, "
                  f"positions={len(positions)}, trades={len(trades)}")

    # Close remaining positions at the final bar. Positions without a quote
    # on the last day are left out of the trade log; final_equity already
    # reflects the last mark-to-market either way.
    last_day = sim_dates[-1]
    for sym, pos in positions.items():
        if sym not in quotes:
            continue
        idx = quotes[sym]["date_idx"].get(last_day)
        if idx is None:
            continue
        exit_price = quotes[sym]["close"][idx]
        proceeds = pos.shares * exit_price
        cash += proceeds
        pnl = proceeds - pos.cost
        trades.append(dict(
            symbol=sym,
            entry_date=pos.entry_date,
            entry_price=pos.entry_price,
            exit_date=last_day,
            exit_price=exit_price,
            shares=pos.shares,
            pnl=pnl,
            pnl_pct=pnl / pos.cost if pos.cost > 0 else 0,
            exit_reason="eod_close",
        ))

    return dict(
        trades=trades,
        equity_curve=equity_curve,
        final_equity=equity,
        initial_capital=INITIAL_CAPITAL,
        start_date=sim_dates[0],
        end_date=sim_dates[-1],
        index_name=index_name,
        regime_etf=regime_etf,
        params=PARAMS.copy(),
    )


def run_simulation(
    index_name: str,
    start_date: date,
    end_date: date,
    regime_etf: str = "QQQ",
) -> dict:
    """Load + run the Clenow weekly rotation (system ranking).

    Thin wrapper preserving the original entry point.
    """
    prep = prepare_run(index_name, start_date, end_date, regime_etf)
    return simulate_portfolio(prep, rank_mode="clenow")


# ── Reporting ───────────────────────────────────────────────────────

def compute_metrics(result: dict) -> dict:
    """Summarise a simulation result into headline statistics.

    Always returns ``n_trades`` and ``years``; when there are no trades those
    are the only keys. Profit factor is ``inf`` when there is no gross loss.
    """
    trades = result["trades"]
    eq = result["equity_curve"]
    years = (result["end_date"] - result["start_date"]).days / 365.25
    n = len(trades)

    if n == 0:
        return {"n_trades": 0, "years": years}

    pnls = [t["pnl"] for t in trades]
    pnl_pcts = [t["pnl_pct"] for t in trades]
    winners = [p for p in pnls if p > 0]
    losers = [p for p in pnls if p <= 0]
    win_pcts = [p for p in pnl_pcts if p > 0]
    loss_pcts = [p for p in pnl_pcts if p <= 0]

    win_rate = len(winners) / n
    avg_return = np.mean(pnl_pcts)
    gross_profit = sum(winners)
    gross_loss = abs(sum(losers))
    profit_factor = gross_profit / gross_loss if gross_loss > 0 else float("inf")

    equities = np.array([e[1] for e in eq])
    peak = np.maximum.accumulate(equities)
    drawdown = (equities - peak) / peak
    max_dd = float(np.min(drawdown))

    cagr = (result["final_equity"] / result["initial_capital"]) ** (1 / years) - 1 if years > 0 else 0

    exposures = [e[2] for e in eq]
    avg_positions = np.mean(exposures)

    hold_days = [
        (t["exit_date"] - t["entry_date"]).days
        for t in trades
        if isinstance(t["entry_date"], date) and isinstance(t["exit_date"], date)
    ]

    exit_reasons = {}
    for t in trades:
        r = t["exit_reason"]
        exit_reasons[r] = exit_reasons.get(r, 0) + 1

    return dict(
        n_trades=n,
        win_rate=win_rate,
        avg_return=avg_return,
        avg_win=np.mean(win_pcts) if win_pcts else 0,
        avg_loss=np.mean(loss_pcts) if loss_pcts else 0,
        profit_factor=profit_factor,
        max_drawdown=max_dd,
        cagr=cagr,
        return_dd=cagr / abs(max_dd) if max_dd != 0 else float("inf"),
        total_pnl=sum(pnls),
        avg_hold_days=np.mean(hold_days) if hold_days else 0,
        avg_positions=avg_positions,
        exit_reasons=exit_reasons,
        final_equity=result["final_equity"],
        years=years,
    )


def print_report(result: dict):
    """Print the metrics summary to stdout and return the metrics dict."""
    m = compute_metrics(result)
    print("\n" + "=" * 60)
    print(f"Clenow QQQ Results — {result['index_name']}")
    print(f"Period: {result['start_date']} to {result['end_date']} ({m['years']:.1f} years)")
    print("=" * 60)

    if m["n_trades"] == 0:
        print("No trades generated.")
        return m

    print(f" N trades: {m['n_trades']}")
    print(f" Win rate: {m['win_rate']:.1%}")
    print(f" Avg return: {m['avg_return']:.2%}")
    print(f" Avg winner: {m['avg_win']:.2%}")
    print(f" Avg loser: {m['avg_loss']:.2%}")
    print(f" Profit factor: {m['profit_factor']:.2f}")
    print(f" Max drawdown: {m['max_drawdown']:.2%}")
    print(f" CAGR: {m['cagr']:.2%}")
    print(f" Return/DD: {m['return_dd']:.2f}")
    print(f" Total PnL: ${m['total_pnl']:,.0f}")
    print(f" Final equity: ${m['final_equity']:,.0f}")
    print(f" Avg hold (days): {m['avg_hold_days']:.0f}")
    print(f" Avg positions: {m['avg_positions']:.1f}")
    print(f" Exit reasons: {m['exit_reasons']}")
    print()
    return m


def save_trades(result: dict, output_dir: Path):
    """Write trades and the equity curve as parquet files under ``output_dir``."""
    output_dir.mkdir(parents=True, exist_ok=True)
    if result["trades"]:
        trades_df = pl.DataFrame(result["trades"])
        trades_df.write_parquet(output_dir / "trades.parquet")
        print(f" Saved {len(result['trades'])} trades to {output_dir / 'trades.parquet'}")

    if result["equity_curve"]:
        eq_df = pl.DataFrame(
            result["equity_curve"],
            schema=["date", "equity", "positions", "cash"],
            orient="row",
        )
        eq_df.write_parquet(output_dir / "equity_curve.parquet")
        print(f" Saved equity curve to {output_dir / 'equity_curve.parquet'}")


# ── Main ────────────────────────────────────────────────────────────

def main():
    """CLI entry point: run the system ranking and save report + outputs."""
    parser = argparse.ArgumentParser(description="Clenow QQQ momentum rotation POC")
    parser.add_argument("--index", default="QQQ_Nasdaq-100",
                        help="IndexCon membership file name")
    parser.add_argument("--regime-etf", default="QQQ",
                        help="ETF whose 200d SMA gates new entries (QQQ for "
                             "Nasdaq-100, SPY for S&P 500)")
    parser.add_argument("--start", default="2005-01-01", help="Simulation start date")
    parser.add_argument("--end", default="2026-05-15", help="Simulation end date")
    args = parser.parse_args()

    start = date.fromisoformat(args.start)
    end = date.fromisoformat(args.end)

    result = run_simulation(args.index, start, end, regime_etf=args.regime_etf)
    print_report(result)

    output_dir = Path(__file__).parent / "output" / args.index
    save_trades(result, output_dir)


if __name__ == "__main__":
    main()