#!/usr/bin/env python3 """ Rank baseball teams from a season_schedule.csv that has columns: date_local,time_local,home_slug,home_instance,home_id,home_name, away_slug,away_instance,away_id,away_name,home_runs,away_runs, winner_slug,winner_instance,winner_id,loser_slug,loser_instance,loser_id, location,status,game_id,source_urls Output CSV columns (one row per team): Team, GP, W, L, T, WinPct, RS, RA, RunDiff, PythagoreanWinPct, MasseyRating, EloRating, StrengthOfSchedule, CompositeRating Defaults: - Team identity uses *_name; switch to slugs with --team-id slugs - Pythagorean exponent = 1.83 - Massey caps margins at 8 runs and subtracts estimated home-field runs - Elo: start 1500, K=24, home bonus H=30, margin factor ln(|m|+1) capped at 2.0 - Elo averaged over 20 random shuffles (reduces order dependence) """ from __future__ import annotations import math import numpy as np import pandas as pd import typer def load_games( inp: str, team_id: str = "names", final_status: str | None = None, ) -> pd.DataFrame: df = pd.read_csv(inp) # Choose identifiers home_id_col = "home_name" if team_id == "names" else "home_slug" away_id_col = "away_name" if team_id == "names" else "away_slug" for c in [home_id_col, away_id_col, "home_runs", "away_runs"]: if c not in df.columns: raise ValueError(f"Missing required column: {c}") # Optional status filter (helps exclude postponed/canceled) if final_status is not None and "status" in df.columns: df = df[df["status"].astype(str).str.lower() == str(final_status).lower()] # Keep only games with numeric scores df = df.copy() df["home_runs"] = pd.to_numeric(df["home_runs"], errors="coerce") df["away_runs"] = pd.to_numeric(df["away_runs"], errors="coerce") df = df.dropna(subset=[home_id_col, away_id_col, "home_runs", "away_runs"]) # Parse datetime (robust to missing either field) date = pd.to_datetime(df.get("date_local", pd.NaT), errors="coerce") time = pd.to_datetime(df.get("time_local", pd.NaT), errors="coerce").dt.time # Combine when possible dt = date if "time_local" in df.columns: # build datetime only where both present dt = pd.to_datetime( date.dt.strftime("%Y-%m-%d").fillna("") + " " + pd.Series(time).astype(str).replace("NaT",""), errors="coerce" ) df_out = pd.DataFrame({ "Date": dt, "HomeTeam": df[home_id_col].astype(str), "AwayTeam": df[away_id_col].astype(str), "HomeRuns": df["home_runs"].astype(int), "AwayRuns": df["away_runs"].astype(int), }) df_out["Margin"] = df_out["HomeRuns"] - df_out["AwayRuns"] df_out["Result"] = np.where(df_out["HomeRuns"] > df_out["AwayRuns"], "H", np.where(df_out["HomeRuns"] < df_out["AwayRuns"], "A", "T")) return df_out.reset_index(drop=True) def aggregate_team_stats(df: pd.DataFrame) -> pd.DataFrame: teams = pd.Index(sorted(set(df["HomeTeam"]).union(df["AwayTeam"])), name="Team") stats = pd.DataFrame(index=teams, columns=["W","L","T","RS","RA"], data=0) for _, r in df.iterrows(): h, a = r["HomeTeam"], r["AwayTeam"] hr, ar = int(r["HomeRuns"]), int(r["AwayRuns"]) stats.at[h,"RS"] += hr; stats.at[h,"RA"] += ar stats.at[a,"RS"] += ar; stats.at[a,"RA"] += hr if hr > ar: stats.at[h,"W"] += 1; stats.at[a,"L"] += 1 elif hr < ar: stats.at[a,"W"] += 1; stats.at[h,"L"] += 1 else: stats.at[h,"T"] += 1; stats.at[a,"T"] += 1 stats = stats.astype(int) stats["GP"] = stats["W"] + stats["L"] + stats["T"] stats["WinPct"] = (stats["W"] + 0.5 * stats["T"]) / stats["GP"].replace(0, np.nan) stats["RunDiff"] = stats["RS"] - stats["RA"] return stats.reset_index() def pythagorean(rs: pd.Series, ra: pd.Series, exp: float) -> pd.Series: rs = rs.clip(lower=0); ra = ra.clip(lower=0) num = np.power(rs, exp); den = num + np.power(ra, exp) with np.errstate(divide="ignore", invalid="ignore"): p = np.where(den > 0, num / den, 0.5) return pd.Series(p, index=rs.index) def estimate_home_field_runs(df: pd.DataFrame) -> float: return float(df["Margin"].mean()) if len(df) else 0.0 def massey(df: pd.DataFrame, cap: float, subtract_home: bool) -> tuple[pd.Series, float]: teams = sorted(set(df["HomeTeam"]).union(df["AwayTeam"])) idx = {t:i for i,t in enumerate(teams)} y = df["Margin"].astype(float).to_numpy() if cap and cap > 0: y = np.clip(y, -cap, cap) h_est = estimate_home_field_runs(df) if subtract_home: y = y - h_est G, N = len(df), len(teams) A = np.zeros((G+1, N), dtype=float) for r_i, r in enumerate(df.itertuples(index=False)): A[r_i, idx[r.HomeTeam]] = 1.0 A[r_i, idx[r.AwayTeam]] = -1.0 A[G, :] = 1.0 y_ext = np.concatenate([y, [0.0]]) r_sol, *_ = np.linalg.lstsq(A, y_ext, rcond=None) return pd.Series(r_sol, index=teams), (h_est if subtract_home else 0.0) def elo_expected(ra: float, rb: float) -> float: return 1.0 / (1.0 + 10.0 ** (-(ra - rb) / 400.0)) def elo_once(df: pd.DataFrame, K: float, H: float, mcap: float, init: dict[str,float]) -> dict[str,float]: ratings = dict(init) for _, r in df.iterrows(): h, a = r["HomeTeam"], r["AwayTeam"] hr, ar = int(r["HomeRuns"]), int(r["AwayRuns"]) margin = hr - ar Eh = elo_expected(ratings[h] + H, ratings[a]) Sh, Sa = (1.0, 0.0) if hr > ar else ((0.0, 1.0) if hr < ar else (0.5, 0.5)) M = np.log(abs(margin) + 1.0) if mcap is not None: M = min(M, mcap) ratings[h] += K * M * (Sh - Eh) ratings[a] += K * M * ((1.0 - Sh) - (1.0 - Eh)) return ratings def elo(df: pd.DataFrame, K=24.0, H=30.0, mcap=2.0, shuffles=20, seed=42) -> pd.Series: teams = sorted(set(df["HomeTeam"]).union(df["AwayTeam"])) base = {t: 1500.0 for t in teams} # baseline in chronological order (Date may be NaT; sort is stable) df0 = df.sort_values(["Date"]).reset_index(drop=True) r_first = elo_once(df0, K, H, mcap, base) rng = np.random.default_rng(seed) vals = {t: [r_first[t]] for t in teams} for _ in range(max(0, shuffles-1)): idx = np.arange(len(df0)); rng.shuffle(idx) r = elo_once(df0.iloc[idx].reset_index(drop=True), K, H, mcap, base) for t in teams: vals[t].append(r[t]) return pd.Series({t: float(np.mean(vals[t])) for t in teams}).sort_index() def zscore(s: pd.Series) -> pd.Series: mu, sd = s.mean(), s.std(ddof=0) return pd.Series(0.0, index=s.index) if (sd == 0 or np.isnan(sd)) else (s - mu) / sd def main( inp: str = typer.Option(..., help="Input CSV (season_schedule.csv)"), out: str = typer.Option(..., help="Output ratings CSV"), team_id: str = typer.Option( "names", help="Use team names or slugs as identifiers (default: names)", show_default=True, case_sensitive=False, prompt=False, ), final_status: str | None = typer.Option(None, help="Only include games where status == this value (e.g., 'final'). If omitted, any row with scores is included."), pyexp: float = typer.Option(1.83, help="Pythagorean exponent"), massey_cap: float = typer.Option(8.0, help="Cap for run margins in Massey"), no_massey_home_adj: bool = typer.Option(False, help="Disable subtracting estimated home-field runs in Massey"), elo_k: float = typer.Option(24.0, help="Elo K-factor"), elo_home: float = typer.Option(30.0, help="Elo home bonus (points)"), elo_mcap: float = typer.Option(2.0, help="Cap for margin factor ln(|m|+1)"), elo_shuffles: int = typer.Option(20, help="Random shuffles to average Elo"), elo_seed: int = typer.Option(42, help="RNG seed for shuffles") ): team_id = team_id.lower() # Load games games = load_games(inp, team_id=team_id, final_status=final_status) # Aggregates team = aggregate_team_stats(games) team["PythagoreanWinPct"] = pythagorean(team["RS"], team["RA"], pyexp) # Ratings massey_r, h_runs = massey(games, cap=massey_cap, subtract_home=not no_massey_home_adj) # Strength of schedule opps = {t: [] for t in massey_r.index} for _, r in games.iterrows(): opps[r["HomeTeam"]].append(r["AwayTeam"]) opps[r["AwayTeam"]].append(r["HomeTeam"]) sos_series = pd.Series({t: (float(massey_r[opps[t]].mean()) if opps[t] else 0.0) for t in opps}) elo_r = elo(games, K=elo_k, H=elo_home, mcap=elo_mcap, shuffles=elo_shuffles, seed=elo_seed) # Merge out_df = team.set_index("Team") out_df["MasseyRating"] = massey_r out_df["EloRating"] = elo_r out_df["StrengthOfSchedule"] = sos_series # Composite Z_r, Z_e, Z_p = zscore(out_df["MasseyRating"]), zscore(out_df["EloRating"]), zscore(out_df["PythagoreanWinPct"]) out_df["CompositeRating"] = 0.45*Z_r + 0.35*Z_e + 0.20*Z_p out_df = out_df.reset_index() out_df = out_df[[ "Team","GP","W","L","T","WinPct","RS","RA","RunDiff", "PythagoreanWinPct","MasseyRating","EloRating","StrengthOfSchedule","CompositeRating" ]].sort_values("CompositeRating", ascending=False) # Round for readability for c in ["WinPct","PythagoreanWinPct","MasseyRating","EloRating","StrengthOfSchedule","CompositeRating"]: out_df[c] = out_df[c].astype(float).round(5) out_df.to_csv(out, index=False) print(f"Done. Estimated home-field (runs) used in Massey: {h_runs:.3f}") print(f"Teams ranked: {len(out_df)} | Games processed: {len(games)}") print(f"Output -> {out}") if __name__ == "__main__": typer.run(main)