#!/usr/bin/env python3 """ Clenow Weekly Momentum Rotation — refinement artifacts (R2, R5, R6, R8, R10). Reads the run outputs (trades + equity curve per universe) and produces the standard refinement outputs used by the Edge Lab page: R2 — one annotated sample chart per universe (price + SMA100 + regression slope sign, entry/exit markers colored by exit reason). R5 — trade blotter (recent 25 with running portfolio equity); full set is the existing {tag}_trades.csv. R6 — standard metrics by lookback (trailing 12 / 24 / 36 months + full): win rate, avg win / avg loss in R, expectancy R, Sharpe, Calmar. 1R = the 3 x ATR(14) risk box on the entry day (the Edge Lab Stage-2 universal risk unit). R8 — sizing earns its own out-of-sample pivot. Clenow is a single daily equity stream, so the Bandy safe-f is the leverage multiplier on the daily returns (mc_daily). Fit it on the pre-pivot window, freeze it, apply it forward; report sizing-in-sample vs sizing-out-of-sample. R10 — exposure, holding period, exposure-adjusted return per window. Outputs land in ../data and ../charts. A JSON summary is printed for the page. Reproducibility: point SHARED_DIR / SRC_DIR at the run location via env vars (the data loader and SBF dataset are internal and not redistributed). """ import json import os import sys from datetime import date from pathlib import Path import numpy as np import polars as pl import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from matplotlib.ticker import FuncFormatter HERE = Path(__file__).resolve().parent LAB = HERE.parent # edge-lab/Clenow DATA = LAB / "data" CHARTS = LAB / "charts" DATA.mkdir(exist_ok=True) CHARTS.mkdir(exist_ok=True) # Data loader + run outputs are internal to the research platform and not # redistributed; point SHARED_DIR / SRC_DIR at the run location via env vars. 
SHARED_DIR = Path(os.environ.get("SHARED_DIR", str(HERE.parent.parent.parent / "shared")))
SRC_DIR = Path(os.environ.get("SRC_DIR", str(HERE.parent / "output")))
sys.path.insert(0, str(SHARED_DIR))
from quote_loader import load_eod  # noqa: E402
from edgerisknorm.monkey import bandy_safe_f  # noqa: E402

OOS_PIVOT = date(2021, 1, 1)  # R7 ruleset/universe disclosure pivot
ATR_N = 14
RISK_MULT = 3.0  # 1R = 3 x ATR(14)
TRADING_DAYS = 252.0

NDX = "#2563eb"
SPX = "#d97706"
GREY = "#6b7280"
RED = "#dc2626"
GREEN = "#059669"
AMBER = "#d97706"

plt.rcParams.update({
    "figure.dpi": 130,
    "font.size": 11,
    "axes.grid": True,
    "grid.color": "#e5e7eb",
    "grid.linewidth": 0.8,
    "axes.spines.top": False,
    "axes.spines.right": False,
    "axes.edgecolor": "#9ca3af",
    "font.family": "DejaVu Sans",
})

# run tag -> (display label, chart color, symbol used for the R2 sample chart)
UNIVERSES = {
    "QQQ_Nasdaq-100": ("Nasdaq-100", NDX, "AVGO"),
    "SPY_SandP_500": ("S&P 500", SPX, "NVDA"),
}

# exit_reason -> marker color on the R2 sample chart
EXIT_COLORS = {
    "rank_drop": GREY,
    "below_sma100": RED,
    "negative_slope": AMBER,
    "gap": "#7c3aed",
    "eod_close": "#0891b2",
    "no_data": "#9ca3af",
}


# ── helpers ─────────────────────────────────────────────────────────
def _adj(df: pl.DataFrame):
    """Split/dividend-adjusted OHLC arrays (ratio reconstruction from adj_close).

    The frame is sorted by ``date``. When an ``adj_close`` column is present,
    close/high/low are each scaled by ``adj_close / close``; rows where the
    ratio is undefined (non-positive close or non-finite adj_close) keep a
    factor of 1.0.

    Returns:
        ``(dates, high, low, close)`` with ``dates`` a list of ``date`` objects
        and the rest float numpy arrays.
    """
    df = df.sort("date")
    close = df["close"].to_numpy().astype(float)
    high = df["high"].to_numpy().astype(float)
    low = df["low"].to_numpy().astype(float)
    dates = [d.date() if hasattr(d, "date") else d for d in df["date"].to_list()]
    if "adj_close" in df.columns:
        a = df["adj_close"].to_numpy().astype(float)
        valid = (close > 0) & np.isfinite(a)
        # Divide only where the ratio is defined, so zero closes never trigger
        # a divide-by-zero warning; everything else keeps the neutral 1.0.
        f = np.divide(a, close, out=np.ones_like(close), where=valid)
        close, high, low = close * f, high * f, low * f
    return dates, high, low, close


def _atr14(df: pl.DataFrame) -> tuple[list, np.ndarray, np.ndarray]:
    """Wilder-smoothed ATR(``ATR_N``) on the adjusted series.

    Returns:
        ``(dates, atr, close)``. ``atr`` is NaN before index ``ATR_N - 1``; that
        index is seeded with the simple mean of the first ``ATR_N`` true
        ranges, and later values follow the recursive Wilder update.
    """
    dates, high, low, close = _adj(df)
    n = len(close)
    atr = np.full(n, np.nan)
    if n == 0:
        # Nothing to index into; return the (empty) arrays as-is.
        return dates, atr, close
    tr = np.full(n, np.nan)
    tr[0] = high[0] - low[0]  # no previous close on the first bar
    for i in range(1, n):
        tr[i] = max(high[i] - low[i],
                    abs(high[i] - close[i - 1]),
                    abs(low[i] - close[i - 1]))
    if n >= ATR_N:
        atr[ATR_N - 1] = np.mean(tr[:ATR_N])
        for i in range(ATR_N, n):
            atr[i] = (atr[i - 1] * (ATR_N - 1) + tr[i]) / ATR_N
    return dates, atr, close


def attach_R(trades: pl.DataFrame) -> pl.DataFrame:
    """Attach per-trade R, with 1R = 3 x ATR(14) on the entry day.

    For every trade, ``R = pnl_pct / (RISK_MULT * ATR / entry_price)`` using the
    ATR of the trade's symbol on its entry date. R is null when the symbol has
    no (or too little) price history, the entry date is missing from that
    history, the ATR is still NaN, or price / pnl are missing or non-positive.

    Returns:
        ``trades`` with an added Float64 column ``R``.
    """
    atr_at = {}
    for s in trades["symbol"].unique().to_list():
        df = load_eod(s)
        if df is None or len(df) < ATR_N + 2:
            continue
        dates, atr, _close = _atr14(df)
        atr_at[s] = ({d: i for i, d in enumerate(dates)}, atr)

    rs = []
    for row in trades.iter_rows(named=True):
        ed = row["entry_date"]
        ed = ed.date() if hasattr(ed, "date") else ed
        entry_price = row["entry_price"]
        pnl_pct = row["pnl_pct"]
        r = None
        lookup = atr_at.get(row["symbol"])
        if lookup is not None and entry_price is not None and pnl_pct is not None:
            idx, atr = lookup
            j = idx.get(ed)
            if j is not None and not np.isnan(atr[j]) and entry_price > 0:
                risk_frac = RISK_MULT * atr[j] / entry_price
                if risk_frac > 0:
                    r = float(pnl_pct / risk_frac)
        rs.append(r)
    # Explicit dtype: an all-None list would otherwise produce a Null-typed
    # column instead of a float column.
    return trades.with_columns(pl.Series("R", rs, dtype=pl.Float64))


def sharpe(daily_ret: np.ndarray) -> float:
    """Annualized Sharpe ratio (zero risk-free rate) of a daily return series.

    Non-finite returns are dropped. Returns NaN with fewer than two usable
    observations or zero (population) standard deviation.
    """
    daily_ret = daily_ret[np.isfinite(daily_ret)]
    if len(daily_ret) < 2 or daily_ret.std() == 0:
        return float("nan")
    return float(daily_ret.mean() / daily_ret.std() * np.sqrt(TRADING_DAYS))


def cagr_maxdd(eq: np.ndarray, n_days: int) -> tuple[float, float]:
    """Return ``(CAGR, max drawdown)`` for an equity segment.

    Args:
        eq: equity values in chronological order.
        n_days: number of trading days the segment represents; converted to
            years with ``TRADING_DAYS``.

    Returns:
        ``(cagr, maxdd)`` where ``maxdd`` is the most negative peak-to-trough
        fraction (<= 0). Both are NaN when the segment has fewer than two
        points or starts at a non-positive value.
    """
    if len(eq) < 2 or eq[0] <= 0:
        return float("nan"), float("nan")
    yrs = n_days / TRADING_DAYS
    cagr = (eq[-1] / eq[0]) ** (1 / yrs) - 1 if yrs > 0 else float("nan")
    peak = np.maximum.accumulate(eq)
    dd = float(np.min((eq - peak) / peak))
    return cagr, dd


# ── R6: metrics by lookback ─────────────────────────────────────────
def lookback_metrics(trades: pl.DataFrame, eq: pl.DataFrame) -> pl.DataFrame:
    """Standard metrics for trailing 12 / 24 / 36 month windows plus full history.

    Windows are measured back from the last equity date in calendar days
    (365 / 730 / 1095). Equity-based metrics (Sharpe, Calmar) use the equity
    rows on or after the cutoff; trade-based metrics use trades whose
    ``exit_date`` is on or after the cutoff. R values are winsorized to ±10R.

    Args:
        trades: trade table with ``exit_date``, ``pnl_pct`` and ``R`` columns.
        eq: equity curve with ``date`` and ``equity`` columns.

    Returns:
        One row per window with n_trades, win_rate, avg_win_R, avg_loss_R,
        expectancy_R, sharpe and calmar.

    Raises:
        ValueError: if the equity curve has no rows.
    """
    from datetime import timedelta

    nan = float("nan")
    eq = eq.sort("date")
    edates = [d.date() if hasattr(d, "date") else d for d in eq["date"].to_list()]
    if not edates:
        raise ValueError("lookback_metrics: equity curve is empty")
    equity = eq["equity"].to_numpy()
    last = edates[-1]
    windows = [("12m", 365), ("24m", 730), ("36m", 1095), ("full", None)]

    rows = []
    for label, days in windows:
        cut = edates[0] if days is None else last - timedelta(days=days)

        # equity slice
        mask = np.array([d >= cut for d in edates], dtype=bool)
        eseg = equity[mask]
        n_days = int(mask.sum())
        dret = np.diff(eseg) / eseg[:-1] if len(eseg) > 1 else np.array([])
        sh = sharpe(dret)
        cg, dd = cagr_maxdd(eseg, n_days)
        calmar = cg / abs(dd) if np.isfinite(dd) and dd != 0 else nan

        # trade slice (by exit_date)
        tt = trades.filter(pl.col("exit_date") >= cut)
        n = tt.height
        win_frac = (tt["pnl_pct"] > 0).mean() if n else None
        wr = float(win_frac) if win_frac is not None else nan
        rr = tt["R"].drop_nulls().to_numpy().astype(float)
        rr = np.clip(rr[np.isfinite(rr)], -10.0, 10.0)  # winsorize ±10R
        wins, losses = rr[rr > 0], rr[rr <= 0]
        win_r = float(wins.mean()) if len(wins) else nan
        loss_r = float(losses.mean()) if len(losses) else nan
        exp_r = float(rr.mean()) if len(rr) else nan

        rows.append(dict(window=label, n_trades=n, win_rate=round(wr, 4),
                         avg_win_R=round(win_r, 3), avg_loss_R=round(loss_r, 3),
                         expectancy_R=round(exp_r, 3), sharpe=round(sh, 3),
                         calmar=round(calmar, 3)))
    return pl.DataFrame(rows)


# ── R8: sizing OOS pivot (portfolio-aware Bandy safe-f) ─────────────
# Clenow is a top-20 concurrent book held for weeks, so the canonical fit is
# the portfolio-aware Bandy safe-f on the trade stream (per-slot = safe_f / 20),
# not an i.i.d. daily-stream leverage. We fit the fraction on the pre-pivot
# trades, then re-fit on the post-pivot trades, and compare: a fraction that
# is at least as large OOS as IS means the in-sample sizing stays within the
# drawdown target on unseen data.
_MAX_POSITIONS = 20  # concurrent slots in the book; per-slot fraction = safe_f / 20
# NOTE(review): presumably the first / last date covered by the backtest run —
# confirm against the run configuration before reusing on another sample.
_IS_START = date(2005, 1, 1)
_OOS_END = date(2026, 5, 15)


def _to_date(value):
    """Return ``value.date()`` for datetime-like objects, else ``value`` unchanged."""
    return value.date() if hasattr(value, "date") else value


def _json_safe(obj):
    """Recursively replace non-finite floats with ``None`` so the JSON is strict.

    ``json.dumps`` would otherwise emit bare ``NaN`` / ``Infinity`` tokens,
    which are not valid JSON and are rejected by strict parsers.
    """
    if isinstance(obj, dict):
        return {k: _json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_json_safe(v) for v in obj]
    if isinstance(obj, (float, np.floating)):
        return float(obj) if np.isfinite(obj) else None
    if isinstance(obj, np.integer):
        return int(obj)
    return obj


def _bandy(r: np.ndarray, years: float) -> dict:
    """Fit the Bandy safe-f on a stream of per-trade returns.

    Args:
        r: per-trade fractional returns; non-finite entries are dropped.
        years: span of the sample in years, used to derive trades per year.

    Returns:
        Dict with ``n``, ``safe_f``, ``per_slot`` (safe_f / 20), ``car25`` and
        ``maxdd_p5``. With fewer than 30 usable trades no fit is attempted and
        every field except ``n`` is NaN.
    """
    r = r[np.isfinite(r)]
    n = len(r)
    if n < 30:
        nan = float("nan")
        return dict(n=n, safe_f=nan, per_slot=nan, car25=nan, maxdd_p5=nan)
    tpy = max(1.0, n / years)
    b = bandy_safe_f(r, n_trials=2000, trades_per_trial=500,
                     trades_per_year=tpy, max_positions=_MAX_POSITIONS,
                     dd_constraint=-0.20, dd_pctile=5, seed=42)
    return dict(n=n, safe_f=round(float(b.safe_f), 3),
                per_slot=round(float(b.safe_f) / _MAX_POSITIONS, 4),
                car25=round(float(b.car25), 4),
                maxdd_p5=round(float(b.max_dd_at_constraint_pctile), 4))


def sizing_oos(trades: pl.DataFrame, *, start: date = _IS_START,
               end: date = _OOS_END) -> dict:
    """Fit the safe-f separately on pre-pivot and post-pivot trades.

    Trades are split on ``exit_date`` at ``OOS_PIVOT``.

    Args:
        trades: trade table with ``exit_date`` and ``pnl_pct`` columns.
        start: first date of the in-sample span (sets the in-sample years).
        end: last date of the out-of-sample span (sets the OOS years).

    Returns:
        ``{"sizing_IS": ..., "sizing_OOS": ...}``, each a ``_bandy`` result.
    """
    t_is = trades.filter(pl.col("exit_date") < OOS_PIVOT)
    t_oos = trades.filter(pl.col("exit_date") >= OOS_PIVOT)
    # Floor both spans at one year so trades-per-year never blows up.
    yrs_is = max(1.0, (OOS_PIVOT - start).days / 365.25)
    yrs_oos = max(1.0, (end - OOS_PIVOT).days / 365.25)
    return dict(
        sizing_IS=_bandy(t_is["pnl_pct"].to_numpy().astype(float), yrs_is),
        sizing_OOS=_bandy(t_oos["pnl_pct"].to_numpy().astype(float), yrs_oos),
    )


# ── R10: exposure / holding / exposure-adjusted return ──────────────
def exposure_block(trades: pl.DataFrame, eq: pl.DataFrame) -> dict:
    """Exposure, CAGR and exposure-adjusted return, plus holding-period stats.

    Exposure is the share of equity-curve days with ``positions > 0``. The
    exposure-adjusted return annualizes the window's growth over in-market
    days only. Holding periods are counted in trading days via the equity
    date index; trades whose entry or exit date is not on that index are
    skipped.

    Returns:
        Dict with ``full`` and ``oos`` window dicts (exposure, cagr,
        exp_adj_return) and ``avg_hold_td`` / ``median_hold_td`` (``None`` when
        no holding period could be measured).
    """
    nan = float("nan")
    eq = eq.sort("date")
    edates = [_to_date(d) for d in eq["date"].to_list()]
    equity = eq["equity"].to_numpy()
    positions = eq["positions"].to_numpy()

    # trading-day holding period via the equity date index
    didx = {d: i for i, d in enumerate(edates)}
    holds = []
    for row in trades.iter_rows(named=True):
        ia = didx.get(_to_date(row["entry_date"]))
        ib = didx.get(_to_date(row["exit_date"]))
        if ia is not None and ib is not None:
            holds.append(ib - ia)
    holds = np.array(holds)

    def win(lo):
        mask = np.array([d >= lo for d in edates], dtype=bool)
        eseg, pseg = equity[mask], positions[mask]
        in_mkt_days = int(np.sum(pseg > 0))
        n_days = int(mask.sum())
        exposure = in_mkt_days / n_days if n_days else nan
        growth = eseg[-1] / eseg[0] if len(eseg) > 1 and eseg[0] > 0 else nan
        in_mkt_yrs = in_mkt_days / TRADING_DAYS
        exp_adj = growth ** (1 / in_mkt_yrs) - 1 if in_mkt_yrs > 0 else nan
        cg, _ = cagr_maxdd(eseg, n_days)
        return dict(exposure=round(exposure, 4), cagr=round(cg, 4),
                    exp_adj_return=round(exp_adj, 4))

    return dict(
        full=win(edates[0]),
        oos=win(OOS_PIVOT),
        avg_hold_td=round(float(holds.mean()), 1) if len(holds) else None,
        median_hold_td=int(np.median(holds)) if len(holds) else None,
    )


# ── R5: blotter (recent 25 with running equity) ─────────────────────
def blotter(trades: pl.DataFrame, eq: pl.DataFrame, tag: str):
    """Write the 25 most recently exited trades to ``{tag}_blotter_recent.csv``.

    Each row carries the portfolio equity on the exit date (NaN when the exit
    date is not on the equity curve) and ``hold_td``, the holding period in
    trading days measured on the equity date index (``None`` when either date
    is not on that index).

    Returns:
        The list of row dicts that was written.
    """
    eq = eq.sort("date")
    edates = [_to_date(d) for d in eq["date"].to_list()]
    equity = eq["equity"].to_numpy()
    eq_at = dict(zip(edates, equity))
    didx = {d: i for i, d in enumerate(edates)}

    out = []
    for r in trades.sort("exit_date").tail(25).iter_rows(named=True):
        xd = _to_date(r["exit_date"])
        ed = _to_date(r["entry_date"])
        ia, ib = didx.get(ed), didx.get(xd)
        hold_td = ib - ia if ia is not None and ib is not None else None
        out.append(dict(
            symbol=r["symbol"].split("-")[0],
            entry_date=str(ed),
            exit_date=str(xd),
            hold_td=hold_td,
            entry_price=round(r["entry_price"], 2),
            exit_price=round(r["exit_price"], 2),
            profit_pct=round(r["pnl_pct"] * 100, 1),
            profit_usd=round(r["pnl"], 0),
            equity_at_exit=round(eq_at.get(xd, float("nan")), 0),
            exit_reason=r["exit_reason"]))
    pl.DataFrame(out).write_csv(DATA / f"{tag}_blotter_recent.csv")
    return out


# ── R2: annotated sample chart ──────────────────────────────────────
def sample_chart(tag: str, sym: str, color: str, label: str):
    """Draw the annotated sample chart for ``sym`` and save it under CHARTS.

    Plots the adjusted close and its 100-day SMA over a window of ±720
    calendar days around the median entry date of the symbol's trades, with
    entry markers and exit markers colored by exit reason.

    Returns:
        The PNG file name, or ``None`` when the symbol has no price data or
        no trades in this universe.
    """
    from datetime import timedelta

    df = load_eod(sym)
    if df is None:
        print(f" R2: no data for {sym}")
        return None
    dates, _high, _low, close = _adj(df)

    # SMA100 via cumulative sums; the first 99 bars stay NaN
    sma = np.full(len(close), np.nan)
    cs = np.cumsum(close)
    sma[99:] = (cs[99:] - np.concatenate([[0], cs[:-100]])) / 100
    didx = {d: i for i, d in enumerate(dates)}

    trades = pl.read_parquet(SRC_DIR / tag / "trades.parquet")
    tt = trades.filter(pl.col("symbol") == sym).sort("entry_date")

    # window: ±720 calendar days (~4 years) around the median entry date
    eds = [_to_date(r["entry_date"]) for r in tt.iter_rows(named=True)]
    if not eds:
        print(f" R2: no trades for {sym}")
        return None
    center = eds[len(eds) // 2]
    lo = center - timedelta(days=720)
    hi = center + timedelta(days=720)
    wmask = np.array([lo <= d <= hi for d in dates], dtype=bool)
    wd = [d for d, m in zip(dates, wmask) if m]
    wc, wsma = close[wmask], sma[wmask]

    fig, ax = plt.subplots(figsize=(10, 4.6))
    try:
        ax.plot(wd, wc, color=color, lw=1.4, label=f"{sym} close")
        ax.plot(wd, wsma, color=GREY, lw=1.1, ls="--", label="100-day SMA")

        seen = set()  # legend labels already used, so each appears once
        for r in tt.iter_rows(named=True):
            ed = _to_date(r["entry_date"])
            xd = _to_date(r["exit_date"])
            if not (lo <= ed <= hi):
                continue
            ci, cx = didx.get(ed), didx.get(xd)
            if ci is None:
                continue
            ax.scatter([ed], [close[ci]], marker="^", s=80, color=GREEN, zorder=5,
                       edgecolor="white", linewidth=0.6,
                       label="entry" if "entry" not in seen else None)
            seen.add("entry")
            if cx is not None and lo <= xd <= hi:
                col = EXIT_COLORS.get(r["exit_reason"], GREY)
                lab = f"exit: {r['exit_reason']}"
                ax.scatter([xd], [close[cx]], marker="v", s=70, color=col, zorder=5,
                           edgecolor="white", linewidth=0.6,
                           label=lab if lab not in seen else None)
                seen.add(lab)
                ax.plot([ed, xd], [close[ci], close[cx]], color=col, lw=0.8,
                        alpha=0.5, zorder=4)

        ax.set_title(f"{sym} ({label}) — momentum rotation entries and exits",
                     fontsize=12)
        ax.yaxis.set_major_formatter(FuncFormatter(lambda x, _: f"${x:,.0f}"))
        ax.legend(loc="upper left", fontsize=8, framealpha=0.9, ncol=2)
        fig.tight_layout()
        out = CHARTS / f"sample_{sym}.png"
        fig.savefig(out, bbox_inches="tight")
    finally:
        # Always release the figure, even if plotting or saving fails.
        plt.close(fig)
    print(f" R2: wrote {out}")
    return f"sample_{sym}.png"


# ── main ────────────────────────────────────────────────────────────
def main():
    """Build every refinement artifact per universe and emit the JSON summary.

    For each entry in UNIVERSES this reads the run's trades and equity curve,
    writes the lookback metrics CSV, the sizing JSON, the recent blotter CSV
    and the sample chart, then writes and prints the combined summary.
    Non-finite numbers are written as ``null`` so every JSON file is strict.
    """
    summary = {}
    for tag, (label, color, sym) in UNIVERSES.items():
        trades = pl.read_parquet(SRC_DIR / tag / "trades.parquet")
        eq = pl.read_parquet(SRC_DIR / tag / "equity_curve.parquet")
        trades = attach_R(trades)

        lb = lookback_metrics(trades, eq)
        lb.write_csv(DATA / f"{tag}_lookback_metrics.csv")

        sz = _json_safe(sizing_oos(trades))
        (DATA / f"{tag}_sizing_oos.json").write_text(
            json.dumps(sz, indent=2, allow_nan=False))

        ex = _json_safe(exposure_block(trades, eq))
        bl = blotter(trades, eq, tag)
        chart = sample_chart(tag, sym, color, label)

        summary[tag] = dict(label=label, lookback=_json_safe(lb.to_dicts()),
                            sizing=sz, exposure=ex, blotter_n=len(bl),
                            sample_chart=chart, sample_sym=sym)

        print(f"\n### {label} ({tag}) ###")
        print(lb)
        print("R8 sizing:", json.dumps(sz, indent=0, allow_nan=False))
        print("R10 exposure:", json.dumps(ex, allow_nan=False))

    payload = json.dumps(summary, indent=2, allow_nan=False)
    (DATA / "_refinements_summary.json").write_text(payload)
    print("\n=== JSON SUMMARY (for the page) ===")
    print(payload)


if __name__ == "__main__":
    main()