#!/usr/bin/env python3
"""
HV-RSI Short-Term Mean Reversion — refinement artifacts (R2, R5, R6, R8, R10).

Reads the run outputs (trades + equity curve) and produces the standard Edge
Lab refinement outputs:

  R2  — annotated sample chart (price + 150-day SMA, entry/exit markers
        colored by exit reason) for one recognizable symbol.
  R5  — trade blotter (recent 25 with running portfolio equity); the full set
        is the trades artifact.
  R6  — standard metrics by lookback (trailing 12 / 24 / 36 months + full):
        win rate, avg win / avg loss in R, expectancy R, Sharpe, Calmar.
        1R = the 3 x ATR(14) risk box on the entry day (Stage-2 universal
        unit).
  R8  — sizing earns its own out-of-sample pivot. HV-RSI is a concurrent
        20-slot book, so the canonical fit is the portfolio-aware Bandy safe-f
        on the trade stream (per-slot = safe_f / 20). Fit it on the pre-pivot
        trades, re-fit on the post-pivot trades, compare.
  R10 — exposure, holding period, exposure-adjusted return per window.

Outputs land in ../data and ../charts; a JSON summary is printed.

Reproducibility: point SHARED_DIR / SRC_DIR at the run location via env vars
(the data loader and SBF dataset are internal and not redistributed).
"""
import datetime as _dt
import json
import os
import sys
from datetime import date
from pathlib import Path

import numpy as np
import polars as pl
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter

HERE = Path(__file__).resolve().parent
LAB = HERE.parent
DATA = LAB / "data"
CHARTS = LAB / "charts"
DATA.mkdir(exist_ok=True)
CHARTS.mkdir(exist_ok=True)

# Data loader + run outputs are internal to the research platform and not
# redistributed; point SHARED_DIR / SRC_DIR at the run location via env vars.
SHARED_DIR = Path(os.environ.get("SHARED_DIR", str(HERE.parent.parent.parent / "shared")))
SRC_DIR = Path(os.environ.get("SRC_DIR", str(HERE.parent / "output")))
sys.path.insert(0, str(SHARED_DIR))
from quote_loader import load_eod  # noqa: E402
from edgerisknorm.monkey import bandy_safe_f  # noqa: E402

TAG = "SPY_SandP_500"
SAMPLE_SYM = "KLAC"
SIZE_PIVOT = date(2018, 1, 1)  # sizing IS/OOS split (matches the robustness windows)
SIZE_IS_START = date(2005, 1, 1)  # start of the span used to annualize the IS trade rate
SIZE_OOS_END = date(2026, 5, 15)  # end of the span used to annualize the OOS trade rate
ATR_N = 14
RISK_MULT = 3.0
TRADING_DAYS = 252.0
MAX_POS = 20
SMA_N = 150  # moving-average window drawn on the sample chart

VIOLET = "#7c3aed"
GREY = "#6b7280"
RED = "#dc2626"
GREEN = "#059669"

plt.rcParams.update({
    "figure.dpi": 130,
    "font.size": 11,
    "axes.grid": True,
    "grid.color": "#e5e7eb",
    "grid.linewidth": 0.8,
    "axes.spines.top": False,
    "axes.spines.right": False,
    "axes.edgecolor": "#9ca3af",
    "font.family": "DejaVu Sans",
})

EXIT_COLORS = {"target": GREEN, "time": "#d97706", "eod_close": "#0891b2"}


def _as_date(value):
    """Return ``value.date()`` when *value* has a ``date`` method, else *value* unchanged."""
    return value.date() if hasattr(value, "date") else value


def _adj_arrays(df: pl.DataFrame):
    """Return ``(dates, high, low, close)`` sorted by date, adjusted when possible.

    When the frame has an ``adj_close`` column, high / low / close are all
    scaled by ``adj_close / close``. Rows where ``close`` is not positive or
    ``adj_close`` is not finite keep a factor of 1.0.
    """
    df = df.sort("date")
    close = df["close"].to_numpy().astype(float)
    high = df["high"].to_numpy().astype(float)
    low = df["low"].to_numpy().astype(float)
    dates = [_as_date(d) for d in df["date"].to_list()]
    if "adj_close" in df.columns:
        adj = df["adj_close"].to_numpy().astype(float)
        usable = (close > 0) & np.isfinite(adj)
        # Divide only where the ratio is meaningful so zero / NaN closes do not
        # trigger divide warnings; every other row keeps the neutral factor.
        factor = np.ones_like(close)
        np.divide(adj, close, out=factor, where=usable)
        close, high, low = close * factor, high * factor, low * factor
    return dates, high, low, close


def _atr14(df: pl.DataFrame):
    """Return ``(dates, atr)`` where ``atr`` is the ATR over ``ATR_N`` bars.

    The first ``ATR_N - 1`` entries are NaN. The seed value is the simple mean
    of the first ``ATR_N`` true ranges; later values follow
    ``(prev * (ATR_N - 1) + tr) / ATR_N``.
    """
    dates, high, low, close = _adj_arrays(df)
    n_bars = len(close)
    if n_bars == 0:
        # Nothing to index into; avoid the IndexError on tr[0].
        return dates, np.array([], dtype=float)
    tr = np.full(n_bars, np.nan)
    tr[0] = high[0] - low[0]  # no prior close on the first bar
    for i in range(1, n_bars):
        tr[i] = max(high[i] - low[i],
                    abs(high[i] - close[i - 1]),
                    abs(low[i] - close[i - 1]))
    atr = np.full(n_bars, np.nan)
    if n_bars >= ATR_N:
        atr[ATR_N - 1] = np.mean(tr[:ATR_N])
        for i in range(ATR_N, n_bars):
            atr[i] = (atr[i - 1] * (ATR_N - 1) + tr[i]) / ATR_N
    return dates, atr


def attach_R(trades: pl.DataFrame) -> pl.DataFrame:
    """Add an ``R`` column: ``pnl_pct`` divided by the entry-day risk fraction.

    The risk fraction is ``RISK_MULT * ATR / entry_price`` on the trade's entry
    date. ``R`` is null when the symbol has no usable price history, the entry
    date is missing from it, the ATR is NaN there, or the risk fraction is not
    positive.
    """
    atr_at = {}
    for sym in trades["symbol"].unique().to_list():
        df = load_eod(sym)
        if df is None or len(df) < ATR_N + 2:
            continue
        dates, atr = _atr14(df)
        atr_at[sym] = ({d: i for i, d in enumerate(dates)}, atr)

    r_values = []
    for row in trades.iter_rows(named=True):
        sym = row["symbol"]
        entry_day = _as_date(row["entry_date"])
        r_mult = None
        if sym in atr_at:
            idx, atr = atr_at[sym]
            j = idx.get(entry_day)
            if j is not None and not np.isnan(atr[j]) and row["entry_price"] > 0:
                risk_frac = RISK_MULT * atr[j] / row["entry_price"]
                if risk_frac > 0:
                    r_mult = float(row["pnl_pct"] / risk_frac)
        r_values.append(r_mult)
    # Pin the dtype: with no resolvable trade the list is all None and polars
    # would otherwise infer a Null column instead of a float one.
    return trades.with_columns(pl.Series("R", r_values, dtype=pl.Float64))


def sharpe(d):
    """Annualized Sharpe of per-period returns *d* (NaN if < 2 finite points or zero std)."""
    d = d[np.isfinite(d)]
    if len(d) > 1 and d.std():
        return float(d.mean() / d.std() * np.sqrt(TRADING_DAYS))
    return float("nan")


def cagr_maxdd(eq, n_days):
    """Return ``(cagr, max_drawdown)`` for equity array *eq* spanning *n_days* rows.

    ``n_days`` is converted to years with ``TRADING_DAYS``. Both values are NaN
    when *eq* has fewer than two points or a non-positive first value. The
    drawdown is the minimum of ``(eq - running_peak) / running_peak``.
    """
    if len(eq) < 2 or eq[0] <= 0:
        return float("nan"), float("nan")
    yrs = n_days / TRADING_DAYS
    cg = float((eq[-1] / eq[0]) ** (1 / yrs) - 1) if yrs > 0 else float("nan")
    peak = np.maximum.accumulate(eq)
    return cg, float(np.min((eq - peak) / peak))


def lookback_metrics(trades, eq):
    """Build the R6 table: one row per window (12m / 24m / 36m / full).

    Windows are cut in calendar days back from the last equity date. Equity
    statistics use the equity rows on or after the cut; trade statistics use
    trades whose ``exit_date`` is on or after it. R values are clipped to
    ±10R before averaging.
    """
    eq = eq.sort("date")
    edates = [_as_date(d) for d in eq["date"].to_list()]
    equity = eq["equity"].to_numpy()
    last = edates[-1]
    rows = []
    for label, days in [("12m", 365), ("24m", 730), ("36m", 1095), ("full", None)]:
        cut = edates[0] if days is None else last - _dt.timedelta(days=days)
        mask = np.array([d >= cut for d in edates])
        eseg = equity[mask]
        n_days = int(mask.sum())
        dret = np.diff(eseg) / eseg[:-1] if len(eseg) > 1 else np.array([])
        cg, dd = cagr_maxdd(eseg, n_days)
        calmar = cg / abs(dd) if not np.isnan(dd) and dd != 0 else float("nan")

        tt = trades.filter(pl.col("exit_date") >= cut)
        n = tt.height
        wr = float((tt["pnl_pct"] > 0).mean()) if n else float("nan")
        rr = np.clip(tt["R"].drop_nulls().to_numpy(), -10.0, 10.0)  # winsorize ±10R
        wins = rr[rr > 0]
        losses = rr[rr <= 0]
        rows.append(dict(
            window=label,
            n_trades=n,
            win_rate=round(wr, 4),
            avg_win_R=round(float(wins.mean()), 3) if len(wins) else None,
            avg_loss_R=round(float(losses.mean()), 3) if len(losses) else None,
            expectancy_R=round(float(rr.mean()), 3) if len(rr) else None,
            sharpe=round(sharpe(dret), 3),
            calmar=round(calmar, 3)))
    return pl.DataFrame(rows)


def _bandy(r, years):
    """Fit ``bandy_safe_f`` on trade returns *r* observed over *years* years.

    Returns only ``{"n": n}`` when fewer than 30 finite returns are available.
    Otherwise returns the fitted safe-f, the per-slot fraction
    (``safe_f / MAX_POS``) and the ``car25`` / ``max_dd_at_constraint_pctile``
    attributes of the fit result.
    NOTE(review): the meaning of those result attributes is defined by the
    internal ``edgerisknorm`` package — confirm there.
    """
    r = r[np.isfinite(r)]
    n = len(r)
    if n < 30:
        return dict(n=n)
    tpy = max(1.0, n / years)
    b = bandy_safe_f(r, n_trials=2000, trades_per_trial=500,
                     trades_per_year=tpy, max_positions=MAX_POS,
                     dd_constraint=-0.20, dd_pctile=5, seed=42)
    return dict(n=n,
                safe_f=round(float(b.safe_f), 3),
                per_slot=round(float(b.safe_f) / MAX_POS, 4),
                car25=round(float(b.car25), 4),
                maxdd_p5=round(float(b.max_dd_at_constraint_pctile), 4))


def sizing_oos(trades, is_start=SIZE_IS_START, oos_end=SIZE_OOS_END):
    """R8: fit sizing separately on trades exiting before / on-or-after ``SIZE_PIVOT``.

    Args:
        trades: trade frame with ``exit_date`` and ``pnl_pct`` columns.
        is_start: start date of the in-sample span, used only to convert the
            in-sample trade count into trades per year.
        oos_end: end date of the out-of-sample span, used the same way.

    Returns:
        dict with ``pivot``, ``sizing_IS`` and ``sizing_OOS`` entries.
    """
    t_is = trades.filter(pl.col("exit_date") < SIZE_PIVOT)
    t_oos = trades.filter(pl.col("exit_date") >= SIZE_PIVOT)
    is_years = max(1.0, (SIZE_PIVOT - is_start).days / 365.25)
    oos_years = max(1.0, (oos_end - SIZE_PIVOT).days / 365.25)
    return dict(
        pivot=str(SIZE_PIVOT),
        sizing_IS=_bandy(t_is["pnl_pct"].to_numpy().astype(float), is_years),
        sizing_OOS=_bandy(t_oos["pnl_pct"].to_numpy().astype(float), oos_years))


def exposure_block(trades, eq):
    """R10: exposure, CAGR and exposure-adjusted return for full / IS / OOS windows.

    Exposure is the share of equity rows with ``positions > 0``. The
    exposure-adjusted return annualizes the window's growth over only those
    in-market rows. Average and median ``hold_days`` are reported as well.
    """
    eq = eq.sort("date")
    edates = [_as_date(d) for d in eq["date"].to_list()]
    equity = eq["equity"].to_numpy()
    positions = eq["positions"].to_numpy()
    holds = trades["hold_days"].to_numpy()

    def win(lo, hi=None):
        # Half-open window [lo, hi); hi=None means "through the last row".
        mask = np.array([(d >= lo and (hi is None or d < hi)) for d in edates])
        eseg = equity[mask]
        pseg = positions[mask]
        in_days = int(np.sum(pseg > 0))
        n = int(mask.sum())
        exposure = in_days / n if n else float("nan")
        growth = eseg[-1] / eseg[0] if len(eseg) > 1 and eseg[0] > 0 else float("nan")
        yrs = in_days / TRADING_DAYS
        exp_adj = growth ** (1 / yrs) - 1 if yrs > 0 else float("nan")
        cg, _ = cagr_maxdd(eseg, n)
        return dict(exposure=round(exposure, 4),
                    cagr=round(cg, 4),
                    exp_adj_return=round(exp_adj, 4))

    return dict(full=win(edates[0]),
                IS=win(edates[0], SIZE_PIVOT),
                OOS=win(SIZE_PIVOT),
                avg_hold_td=round(float(holds.mean()), 1),
                median_hold_td=int(np.median(holds)))


def blotter(trades, eq):
    """R5: write the 25 most recently exited trades to CSV and return them as dicts.

    Each record carries the portfolio equity on the trade's exit date (NaN when
    that date is absent from the equity curve). The CSV is written to
    ``DATA / f"{TAG}_blotter_recent.csv"``.
    """
    eq = eq.sort("date")
    eq_at = {_as_date(d): e
             for d, e in zip(eq["date"].to_list(), eq["equity"].to_numpy())}
    out = []
    for r in trades.sort("exit_date").tail(25).iter_rows(named=True):
        xd = _as_date(r["exit_date"])
        ed = _as_date(r["entry_date"])
        out.append(dict(
            symbol=r["symbol"].split("-")[0],
            entry_date=str(ed),
            exit_date=str(xd),
            hold_td=int(r["hold_days"]),
            entry_price=round(r["entry_price"], 2),
            exit_price=round(r["exit_price"], 2),
            profit_pct=round(r["pnl_pct"] * 100, 1),
            profit_usd=round(r["pnl"], 0),
            equity_at_exit=round(eq_at.get(xd, float("nan")), 0),
            exit_reason=r["exit_reason"]))
    pl.DataFrame(out).write_csv(DATA / f"{TAG}_blotter_recent.csv")
    return out


def sample_chart(sym):
    """R2: plot adjusted close + SMA with entry/exit markers for *sym*.

    The chart covers ±420 calendar days around the symbol's median-position
    trade entry. Returns the PNG file name, or None when the symbol has no
    price data or no trades.
    """
    df = load_eod(sym)
    if df is None:
        return None
    dates, high, low, close = _adj_arrays(df)

    # Rolling mean via cumulative sums; the first SMA_N - 1 points stay NaN.
    sma = np.full(len(close), np.nan)
    cs = np.cumsum(close)
    sma[SMA_N - 1:] = (cs[SMA_N - 1:] - np.concatenate([[0], cs[:-SMA_N]])) / SMA_N
    didx = {d: i for i, d in enumerate(dates)}

    tt = pl.read_parquet(SRC_DIR / TAG / "trades.parquet").filter(
        pl.col("symbol") == sym).sort("entry_date")
    eds = [_as_date(r["entry_date"]) for r in tt.iter_rows(named=True)]
    if not eds:
        return None
    center = eds[len(eds) // 2]
    lo, hi = center - _dt.timedelta(days=420), center + _dt.timedelta(days=420)
    wmask = np.array([lo <= d <= hi for d in dates])
    wd = [d for d, m in zip(dates, wmask) if m]

    fig, ax = plt.subplots(figsize=(10, 4.6))
    try:
        ax.plot(wd, close[wmask], color=VIOLET, lw=1.4, label=f"{sym} close (adj)")
        ax.plot(wd, sma[wmask], color=GREY, lw=1.1, ls="--", label="150-day SMA")
        seen = set()  # legend labels already emitted, so each appears once
        for r in tt.iter_rows(named=True):
            ed = _as_date(r["entry_date"])
            xd = _as_date(r["exit_date"])
            if not (lo <= ed <= hi):
                continue
            # Only draw markers on dates that exist in the price series.
            if didx.get(ed) is not None:
                ax.scatter([ed], [r["entry_price"]], marker="^", s=80, color=GREEN,
                           zorder=5, edgecolor="white", linewidth=0.6,
                           label="entry (3% limit fill)" if "e" not in seen else None)
                seen.add("e")
            if didx.get(xd) is not None and lo <= xd <= hi:
                col = EXIT_COLORS.get(r["exit_reason"], GREY)
                lab = f"exit: {r['exit_reason']}"
                ax.scatter([xd], [r["exit_price"]], marker="v", s=70, color=col,
                           zorder=5, edgecolor="white", linewidth=0.6,
                           label=lab if lab not in seen else None)
                seen.add(lab)
        ax.set_title(f"{sym} (S&P 500) — dip-buy entries and one-week exits", fontsize=12)
        ax.yaxis.set_major_formatter(FuncFormatter(lambda x, _: f"${x:,.0f}"))
        ax.legend(loc="upper left", fontsize=8, framealpha=0.9, ncol=2)
        fig.tight_layout()
        out = CHARTS / f"sample_{sym}.png"
        fig.savefig(out, bbox_inches="tight")
    finally:
        # Release the figure even if plotting or saving fails.
        plt.close(fig)
    print(f" R2: wrote {out}")
    return f"sample_{sym}.png"


def main():
    """Load the run outputs, build every refinement artifact, print the JSON summary."""
    trades = pl.read_parquet(SRC_DIR / TAG / "trades.parquet")
    eq = pl.read_parquet(SRC_DIR / TAG / "equity_curve.parquet")
    trades = attach_R(trades)

    lb = lookback_metrics(trades, eq)
    lb.write_csv(DATA / f"{TAG}_lookback_metrics.csv")
    sz = sizing_oos(trades)
    (DATA / f"{TAG}_sizing_oos.json").write_text(json.dumps(sz, indent=2))
    ex = exposure_block(trades, eq)
    bl = blotter(trades, eq)
    chart = sample_chart(SAMPLE_SYM)

    summary = dict(label="S&P 500", lookback=lb.to_dicts(), sizing=sz,
                   exposure=ex, blotter_n=len(bl), sample_chart=chart)
    (DATA / "_refinements_summary.json").write_text(json.dumps(summary, indent=2))
    print(lb)
    print("R8 sizing:", json.dumps(sz))
    print("R10 exposure:", json.dumps(ex))
    print("\n=== JSON SUMMARY ===")
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()