224 lines
9.8 KiB
Python
224 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Rank baseball teams from a season_schedule.csv that has columns:
  date_local,time_local,home_slug,home_instance,home_id,home_name,
  away_slug,away_instance,away_id,away_name,home_runs,away_runs,
  winner_slug,winner_instance,winner_id,loser_slug,loser_instance,loser_id,
  location,status,game_id,source_urls

Output CSV columns (one row per team):
  Team, GP, W, L, T, WinPct, RS, RA, RunDiff, PythagoreanWinPct,
  MasseyRating, EloRating, StrengthOfSchedule, CompositeRating

Defaults:
  - Team identity uses *_name; switch to slugs with --team-id slugs
  - Pythagorean exponent = 1.83
  - Massey caps margins at 8 runs and subtracts estimated home-field runs
  - Elo: start 1500, K=24, home bonus H=30, margin factor ln(|m|+1) capped at 2.0
  - Elo averaged over 20 game orderings: one chronological pass plus
    19 random shuffles (reduces order dependence)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import argparse
|
|
import math
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
def parse_args():
    """Parse the command line and return the resulting argparse namespace.

    Options are declared in a table and registered in order, so the
    generated ``--help`` output lists them in the sequence shown below.
    """
    parser = argparse.ArgumentParser(description="Power ratings from season_schedule.csv")

    option_table = [
        # Input / output and game selection
        ("--in", {"dest": "inp", "required": True,
                  "help": "Input CSV (season_schedule.csv)"}),
        ("--out", {"dest": "out", "required": True,
                   "help": "Output ratings CSV"}),
        ("--team-id", {"choices": ["names", "slugs"], "default": "names",
                       "help": "Use team names or slugs as identifiers (default: names)"}),
        ("--final-status", {"default": None,
                            "help": "Only include games where status == this value (e.g., 'final'). If omitted, any row with scores is included."}),
        # Tunables
        ("--pyexp", {"type": float, "default": 1.83,
                     "help": "Pythagorean exponent"}),
        ("--massey-cap", {"type": float, "default": 8.0,
                          "help": "Cap for run margins in Massey"}),
        ("--no-massey-home-adj", {"action": "store_true",
                                  "help": "Disable subtracting estimated home-field runs in Massey"}),
        ("--elo-k", {"type": float, "default": 24.0,
                     "help": "Elo K-factor"}),
        ("--elo-home", {"type": float, "default": 30.0,
                        "help": "Elo home bonus (points)"}),
        ("--elo-mcap", {"type": float, "default": 2.0,
                        "help": "Cap for margin factor ln(|m|+1)"}),
        ("--elo-shuffles", {"type": int, "default": 20,
                            "help": "Random shuffles to average Elo"}),
        ("--elo-seed", {"type": int, "default": 42,
                        "help": "RNG seed for shuffles"}),
    ]
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)

    return parser.parse_args()
|
|
|
|
def load_games(a) -> pd.DataFrame:
    """Read the schedule CSV and return one normalised row per scored game.

    Parameters
    ----------
    a : namespace with ``inp`` (CSV path), ``team_id`` ("names" or "slugs")
        and ``final_status`` (status value to keep, or None for no filter).

    Returns
    -------
    DataFrame with columns Date, HomeTeam, AwayTeam, HomeRuns, AwayRuns,
    Margin (home minus away) and Result ("H", "A" or "T"), re-indexed 0..n-1.

    Raises
    ------
    ValueError if a team-identifier or run column is missing from the CSV.
    """
    df = pd.read_csv(a.inp)

    # Choose identifiers
    use_names = a.team_id == "names"
    home_id_col = "home_name" if use_names else "home_slug"
    away_id_col = "away_name" if use_names else "away_slug"
    for c in (home_id_col, away_id_col, "home_runs", "away_runs"):
        if c not in df.columns:
            raise ValueError(f"Missing required column: {c}")

    # Optional status filter (helps exclude postponed/canceled)
    if a.final_status is not None and "status" in df.columns:
        wanted = str(a.final_status).lower()
        df = df[df["status"].astype(str).str.lower() == wanted]

    # Keep only games with both teams identified and numeric scores
    df = df.copy()
    df["home_runs"] = pd.to_numeric(df["home_runs"], errors="coerce")
    df["away_runs"] = pd.to_numeric(df["away_runs"], errors="coerce")
    df = df.dropna(subset=[home_id_col, away_id_col, "home_runs", "away_runs"])

    # Game date. A missing column yields an all-NaT Series (not a scalar),
    # so the rest of the function never has to special-case it.
    if "date_local" in df.columns:
        date = pd.to_datetime(df["date_local"], errors="coerce")
    else:
        date = pd.Series(pd.NaT, index=df.index, dtype="datetime64[ns]")

    # Add the time of day where one is available. The time is applied as an
    # offset from midnight, so a row with a date but no (or an unparseable)
    # time keeps its date, and a row with no date stays NaT.
    dt = date
    if "time_local" in df.columns:
        clock = pd.to_datetime(df["time_local"], errors="coerce")
        since_midnight = (clock - clock.dt.normalize()).fillna(pd.Timedelta(0))
        dt = date + since_midnight

    df_out = pd.DataFrame({
        "Date": dt,
        "HomeTeam": df[home_id_col].astype(str),
        "AwayTeam": df[away_id_col].astype(str),
        "HomeRuns": df["home_runs"].astype(int),
        "AwayRuns": df["away_runs"].astype(int),
    })
    df_out["Margin"] = df_out["HomeRuns"] - df_out["AwayRuns"]
    df_out["Result"] = np.select(
        [df_out["Margin"] > 0, df_out["Margin"] < 0], ["H", "A"], default="T"
    )
    return df_out.reset_index(drop=True)
|
|
|
|
def aggregate_team_stats(df: pd.DataFrame) -> pd.DataFrame:
    """Build the per-team record table from the normalised game list.

    Reads HomeTeam, AwayTeam, HomeRuns and AwayRuns from ``df`` and returns
    one row per team (sorted by team identifier) with columns
    Team, W, L, T, RS, RA, GP, WinPct and RunDiff. A tie counts as half a
    win in WinPct.
    """
    counters = ["W", "L", "T", "RS", "RA"]
    names = sorted(set(df["HomeTeam"]) | set(df["AwayTeam"]))
    tally = {name: dict.fromkeys(counters, 0) for name in names}

    game_rows = zip(df["HomeTeam"], df["AwayTeam"], df["HomeRuns"], df["AwayRuns"])
    for home, away, home_runs, away_runs in game_rows:
        home_runs, away_runs = int(home_runs), int(away_runs)

        # Runs scored / allowed are credited to both sides of every game.
        tally[home]["RS"] += home_runs
        tally[home]["RA"] += away_runs
        tally[away]["RS"] += away_runs
        tally[away]["RA"] += home_runs

        if home_runs == away_runs:
            tally[home]["T"] += 1
            tally[away]["T"] += 1
            continue
        winner, loser = (home, away) if home_runs > away_runs else (away, home)
        tally[winner]["W"] += 1
        tally[loser]["L"] += 1

    table = pd.DataFrame(
        [[tally[name][key] for key in counters] for name in names],
        index=pd.Index(names, name="Team"),
        columns=counters,
    ).astype(int)

    table["GP"] = table["W"] + table["L"] + table["T"]
    # GP of 0 becomes NaN so the division yields NaN rather than inf.
    games_played = table["GP"].replace(0, np.nan)
    table["WinPct"] = (table["W"] + 0.5 * table["T"]) / games_played
    table["RunDiff"] = table["RS"] - table["RA"]
    return table.reset_index()
|
|
|
|
def pythagorean(rs: pd.Series, ra: pd.Series, exp: float) -> pd.Series:
    """Return the Pythagorean expectation RS^exp / (RS^exp + RA^exp).

    Negative run totals are clipped to zero first. Where the denominator
    is not positive (no runs scored or allowed) the result is 0.5.
    The returned Series carries the index of ``rs``.
    """
    scored_pow = np.power(rs.clip(lower=0), exp)
    allowed_pow = np.power(ra.clip(lower=0), exp)
    total = scored_pow + allowed_pow

    # 0/0 is expected for teams without runs; silence the warning and
    # substitute the neutral value below.
    with np.errstate(divide="ignore", invalid="ignore"):
        share = scored_pow / total
        pct = np.where(total > 0, share, 0.5)

    return pd.Series(pct, index=rs.index)
|
|
|
|
def estimate_home_field_runs(df: pd.DataFrame) -> float:
    """Return the mean of the Margin column, or 0.0 when ``df`` has no rows."""
    if not len(df):
        return 0.0
    return float(df["Margin"].mean())
|
|
|
|
def massey(df: pd.DataFrame, cap: float, subtract_home: bool) -> tuple[pd.Series, float]:
    """Solve a least-squares rating system from game margins.

    Each game contributes one equation ``r_home - r_away = margin``; one
    extra equation asks the ratings to sum to zero. Margins are clipped to
    ``[-cap, cap]`` when ``cap`` is positive, and the mean (unclipped)
    margin is subtracted from every game when ``subtract_home`` is true.

    Returns ``(ratings, home_runs_used)``: a Series indexed by sorted team
    identifier, and the home-field value that was subtracted (0.0 if none).
    """
    names = sorted(set(df["HomeTeam"]) | set(df["AwayTeam"]))
    column_of = {name: k for k, name in enumerate(names)}
    n_games, n_teams = len(df), len(names)

    margins = df["Margin"].astype(float).to_numpy()
    if cap and cap > 0:
        margins = np.clip(margins, -cap, cap)

    # Home-field estimate: average home margin, taken before clipping.
    home_edge = float(df["Margin"].mean()) if n_games else 0.0
    if subtract_home:
        margins = margins - home_edge

    # Design matrix: +1 for the home team, -1 for the away team, written in
    # that order so the away entry wins if both columns coincide.
    design = np.zeros((n_games + 1, n_teams), dtype=float)
    game_rows = np.arange(n_games)
    home_cols = np.fromiter((column_of[t] for t in df["HomeTeam"]), dtype=np.intp, count=n_games)
    away_cols = np.fromiter((column_of[t] for t in df["AwayTeam"]), dtype=np.intp, count=n_games)
    design[game_rows, home_cols] = 1.0
    design[game_rows, away_cols] = -1.0
    design[n_games, :] = 1.0  # sum-to-zero row

    target = np.append(margins, 0.0)
    solution = np.linalg.lstsq(design, target, rcond=None)[0]

    used = home_edge if subtract_home else 0.0
    return pd.Series(solution, index=names), used
|
|
|
|
def elo_expected(ra: float, rb: float) -> float:
    """Return the logistic (base 10, scale 400) expected score of ``ra`` against ``rb``."""
    rating_gap = ra - rb
    odds_against = 10.0 ** (-rating_gap / 400.0)
    return 1.0 / (1.0 + odds_against)
|
|
|
|
def elo_once(df: pd.DataFrame, K: float, H: float, mcap: float, init: dict[str,float]) -> dict[str,float]:
    """Run one Elo pass over the games in the row order of ``df``.

    Parameters
    ----------
    df   : games with HomeTeam, AwayTeam, HomeRuns, AwayRuns columns.
    K    : K-factor (base step size).
    H    : rating points added to the home team when computing expectation.
    mcap : upper bound on the margin multiplier, or None for no bound.
    init : starting rating per team; copied, never mutated.

    Returns
    -------
    dict mapping each team to its rating after the last game.
    """
    ratings = dict(init)
    rows = zip(df["HomeTeam"], df["AwayTeam"], df["HomeRuns"], df["AwayRuns"])
    for home, away, home_runs, away_runs in rows:
        margin = int(home_runs) - int(away_runs)
        expected_home = elo_expected(ratings[home] + H, ratings[away])

        if margin > 0:
            score_home = 1.0
        elif margin < 0:
            score_home = 0.0
        else:
            score_home = 0.5

        # Margin multiplier ln(|m| + 1). A tie has m == 0, which would give
        # ln(1) == 0 and silently discard the game even though a draw
        # between unequal teams is informative; weight it like a one-run
        # game instead.
        multiplier = math.log(max(abs(margin), 1) + 1.0)
        if mcap is not None:
            multiplier = min(multiplier, mcap)

        # Zero-sum update: whatever the home side gains, the away side loses.
        delta = K * multiplier * (score_home - expected_home)
        ratings[home] += delta
        ratings[away] -= delta
    return ratings
|
|
|
|
def elo(df: pd.DataFrame, K=24.0, H=30.0, mcap=2.0, shuffles=20, seed=42) -> pd.Series:
    """Average Elo ratings over several game orderings.

    The first pass processes games in chronological order; the remaining
    ``shuffles - 1`` passes use random permutations drawn from a generator
    seeded with ``seed``. Every team starts each pass at 1500.

    Returns a float Series of mean ratings indexed by sorted team identifier.
    """
    teams = sorted(set(df["HomeTeam"]).union(df["AwayTeam"]))
    base = {t: 1500.0 for t in teams}

    # Chronological baseline. The default quicksort is not stable, so ask
    # for mergesort: games with equal or missing (NaT) dates then keep
    # their input order, and the baseline is reproducible.
    df0 = df.sort_values("Date", kind="mergesort").reset_index(drop=True)
    first_pass = elo_once(df0, K, H, mcap, base)
    samples = {t: [first_pass[t]] for t in teams}

    rng = np.random.default_rng(seed)
    for _ in range(max(0, shuffles - 1)):
        order = np.arange(len(df0))
        rng.shuffle(order)
        shuffled = df0.iloc[order].reset_index(drop=True)
        pass_ratings = elo_once(shuffled, K, H, mcap, base)
        for t in teams:
            samples[t].append(pass_ratings[t])

    # Explicit dtype keeps the result float even when there are no teams.
    means = {t: float(np.mean(samples[t])) for t in teams}
    return pd.Series(means, dtype=float).sort_index()
|
|
|
|
def zscore(s: pd.Series) -> pd.Series:
    """Standardise ``s`` using its mean and population (ddof=0) std dev.

    When the standard deviation is zero or NaN, a Series of zeros with the
    same index is returned instead.
    """
    spread = s.std(ddof=0)
    if spread == 0 or np.isnan(spread):
        return pd.Series(0.0, index=s.index)
    centre = s.mean()
    return (s - centre) / spread
|
|
|
|
def _strength_of_schedule(games: pd.DataFrame, ratings: pd.Series) -> pd.Series:
    """Return each team's mean opponent rating (0.0 for a team with no games)."""
    opponents = {t: [] for t in ratings.index}
    for home, away in zip(games["HomeTeam"], games["AwayTeam"]):
        opponents[home].append(away)
        opponents[away].append(home)
    # An opponent faced several times is counted once per game.
    return pd.Series(
        {t: (float(ratings[faced].mean()) if faced else 0.0) for t, faced in opponents.items()},
        dtype=float,
    )


def main():
    """Load games, compute all ratings, write the ranking CSV and print a summary."""
    a = parse_args()
    games = load_games(a)

    # Aggregates
    team = aggregate_team_stats(games)
    team["PythagoreanWinPct"] = pythagorean(team["RS"], team["RA"], a.pyexp)

    # Ratings
    massey_r, h_runs = massey(games, cap=a.massey_cap, subtract_home=(not a.no_massey_home_adj))
    # Strength of schedule: avg opponent Massey rating faced
    sos_series = _strength_of_schedule(games, massey_r)
    elo_r = elo(games, K=a.elo_k, H=a.elo_home, mcap=a.elo_mcap, shuffles=a.elo_shuffles, seed=a.elo_seed)

    # Merge (assignment aligns each rating Series on the Team index)
    out = team.set_index("Team")
    out["MasseyRating"] = massey_r
    out["EloRating"] = elo_r
    out["StrengthOfSchedule"] = sos_series

    # Composite: weighted sum of z-scored Massey, Elo and Pythagorean
    Z_r, Z_e, Z_p = zscore(out["MasseyRating"]), zscore(out["EloRating"]), zscore(out["PythagoreanWinPct"])
    out["CompositeRating"] = 0.45*Z_r + 0.35*Z_e + 0.20*Z_p

    out = out.reset_index()
    out = out[[
        "Team", "GP", "W", "L", "T", "WinPct", "RS", "RA", "RunDiff",
        "PythagoreanWinPct", "MasseyRating", "EloRating", "StrengthOfSchedule", "CompositeRating",
    ]].sort_values("CompositeRating", ascending=False)

    # Round for readability
    for c in ["WinPct", "PythagoreanWinPct", "MasseyRating", "EloRating", "StrengthOfSchedule", "CompositeRating"]:
        out[c] = out[c].astype(float).round(5)

    out.to_csv(a.out, index=False)
    print(f"Done. Estimated home-field (runs) used in Massey: {h_runs:.3f}")
    print(f"Teams ranked: {len(out)} | Games processed: {len(games)}")
    print(f"Output -> {a.out}")


if __name__ == "__main__":
    main()