Compare commits
2 Commits: ef11cdbac3 ... 8cea48457f

| Author | SHA1 | Date |
|---|---|---|
|  | 8cea48457f |  |
|  | 03f87c205b |  |
@@ -41,24 +41,31 @@ logging.basicConfig(
 # ----------------- constants -----------------
 UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) SE-Schedule/1.3 Safari/537.36"
-HEADERS = {"User-Agent": UA}
-PRINT_BASE = "https://www.csyba.com/schedule/print/team_instance/{iid}"
-GAME_BASE = "https://www.csyba.com/game/show/{gid}"
+HEADERS = {"User-Agent": UA}  # HTTP headers with custom User-Agent for requests
+PRINT_BASE = "https://www.csyba.com/schedule/print/team_instance/{iid}"  # base URL for team-instance printable schedule
+GAME_BASE = "https://www.csyba.com/game/show/{gid}"  # base URL for game detail page
 
+# Regular expressions for parsing scores, game links, and time strings
 SCORE_RE = re.compile(r"\b(\d+)\s*[–-]\s*(\d+)\b")
 GAME_LINK_RE = re.compile(r"/game/show/(\d+)")
 TIME_RE = re.compile(r"\b(\d{1,2}:\d{2})\s*([ap]\.?m\.?|AM|PM)?\b", re.I)
 
 # ----------------- helpers -----------------
 def clean(x: str) -> str:
+    """Normalize whitespace and strip input string."""
     return re.sub(r"\s+", " ", (x or "")).strip()
 
 def slugify(s: str) -> str:
+    """Convert string to lowercase slug with words separated by hyphens."""
     s = s.lower()
     s = re.sub(r"[^a-z0-9]+", "-", s).strip("-")
     return s
 
 def norm_name(s: str) -> str:
+    """
+    Normalize team names by lowercasing, removing common words like 'the', 'club',
+    and stripping punctuation, to help with loose matching.
+    """
     s = s.lower()
     s = re.sub(r"[^a-z0-9 ]+", " ", s)
     s = re.sub(r"\b(the|club|team|ll|little league|baseball|softball|youth|athletic|athletics|rec|rec\.)\b", " ", s)
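As an aside (not part of the diff), a minimal sketch of what `norm_name` is after: filler words are stripped so loosely different spellings of the same team compare equal. The final whitespace-collapse line is an assumption, since the tail of this hunk is elided.

```python
import re

def norm_name(s: str) -> str:
    s = s.lower()
    s = re.sub(r"[^a-z0-9 ]+", " ", s)
    s = re.sub(r"\b(the|club|team|ll|little league|baseball|softball|youth|athletic|athletics|rec|rec\.)\b", " ", s)
    return re.sub(r"\s+", " ", s).strip()  # assumed tail: collapse whitespace

assert norm_name("The Riverside Baseball Club") == norm_name("Riverside")
```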
@@ -67,6 +74,7 @@ def norm_name(s: str) -> str:
 
 @dataclass(frozen=True)
 class TeamRec:
+    """Data class representing a team record with identifying information."""
     name: str
     slug: str
     team_id: str
@@ -74,7 +82,10 @@ class TeamRec:
     subseason_id: str
 
 def load_teams(teams_path: str):
-    """Load mapping tables from teams.json you provided."""
+    """
+    Load team mapping data from JSON file.
+    Returns dictionaries keyed by instance_id, slug, and normalized names for lookups.
+    """
     with open(teams_path, "r", encoding="utf-8") as f:
         arr = json.load(f)
     by_instance: Dict[str, TeamRec] = {}
@@ -94,7 +105,11 @@ def load_teams(teams_path: str):
     return by_instance, by_slug, by_norm
 
 def best_match_team(opponent_text: str, by_slug, by_norm) -> Optional[TeamRec]:
-    """Match opponent using slug first, then normalized name, then loose containment."""
+    """
+    Attempt to match the opponent team name to a known team record.
+    Tries slug first, then normalized name exact match,
+    then loose containment matching on normalized names.
+    """
     s = slugify(opponent_text)
     if s in by_slug:
         return by_slug[s]
@@ -108,8 +123,8 @@ def best_match_team(opponent_text: str, by_slug, by_norm) -> Optional[TeamRec]:
 
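For illustration only, a self-contained sketch of the three-tier fallback the new docstring describes. `by_slug` and `by_norm` stand in for the lookup dicts returned by `load_teams`, and the inline normalizers are simplified stand-ins for `slugify`/`norm_name`.

```python
by_slug = {"red-sox": "REC1"}
by_norm = {"red sox": "REC1"}

def best_match(text: str):
    s = text.lower().replace(" ", "-")   # stand-in for slugify
    if s in by_slug:                     # tier 1: exact slug match
        return by_slug[s]
    n = text.lower()                     # stand-in for norm_name
    if n in by_norm:                     # tier 2: normalized exact match
        return by_norm[n]
    for k, v in by_norm.items():         # tier 3: loose containment
        if k in n or n in k:
            return v
    return None

print(best_match("Red Sox"))      # REC1 (tier 1)
print(best_match("The Red Sox"))  # REC1 (tier 3)
```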
 def runs_from_team_pov(result_flag: str, s_a: str, s_b: str):
     """
-    Team-instance pages are TEAM-FIRST. s_a is THIS team's runs, s_b is opponent runs.
-    We don't reorder; we only validate with W/L/T if needed.
+    Parse runs scored by team and opponent, assuming team-first order.
+    Validate results with result_flag (W/L/T).
     """
     if not (s_a.isdigit() and s_b.isdigit()):
         return None, None
@@ -122,6 +137,10 @@ def runs_from_team_pov(result_flag: str, s_a: str, s_b: str):
 
 # ----------------- HTTP utils -----------------
 def get_soup(url: str, session: Optional[requests.Session] = None, timeout: int = 30) -> Optional[BeautifulSoup]:
+    """
+    Fetch a URL and return a BeautifulSoup parsed document.
+    Uses a shared requests.Session if provided.
+    """
     try:
         sess = session or requests.Session()
         r = sess.get(url, headers=HEADERS, timeout=timeout)
@@ -133,7 +152,10 @@ def get_soup(url: str, session: Optional[requests.Session] = None, timeout: int
 
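The body of `get_soup` is partly elided by the hunk boundary; a plausible completion under stated assumptions (the status check and parser choice are guesses, not the commit's code):

```python
import requests
from bs4 import BeautifulSoup
from typing import Optional

HEADERS = {"User-Agent": "Mozilla/5.0 ... SE-Schedule/1.3"}  # trimmed from the diff's UA

def get_soup(url: str, session: Optional[requests.Session] = None, timeout: int = 30) -> Optional[BeautifulSoup]:
    try:
        sess = session or requests.Session()
        r = sess.get(url, headers=HEADERS, timeout=timeout)
        r.raise_for_status()                       # assumed: elided lines likely check status
        return BeautifulSoup(r.text, "html.parser")
    except requests.RequestException:
        return None
```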
 # ----------------- scraping -----------------
 def parse_printable(instance_id: str, subseason_id: str, session: requests.Session) -> List[dict]:
-    """Parse one team-instance printable schedule page into perspective rows."""
+    """
+    Download and parse the team-instance printable schedule page,
+    extracting a list of game dictionaries from the perspective of that team.
+    """
     url = PRINT_BASE.format(iid=instance_id) + "?" + urlencode({
         "schedule_type": "index",
         "subseason": subseason_id,
@@ -148,25 +170,27 @@ def parse_printable(instance_id: str, subseason_id: str, session: requests.Sessi
         return []
 
     games = []
+    # Skip header row; iterate over game rows
     for row_idx, tr in enumerate(table.select("tr")[1:], start=1):
         tds = tr.select("td")
         if len(tds) < 5:
             continue
 
-        # Cells: Date | Result | Opponent | Location | Status
+        # Extract text from each relevant cell:
+        # Date | Result | Opponent | Location | Status
         date_txt = clean(tds[0].get_text(" "))
         result_txt = clean(tds[1].get_text(" "))
         opp_txt = clean(tds[2].get_text(" "))
         loc_txt = clean(tds[3].get_text(" "))
         status_txt = clean(tds[4].get_text(" "))
 
-        # Date → ISO
+        # Parse date into ISO format (YYYY-MM-DD) if possible
         try:
             date_iso = dtp.parse(date_txt, fuzzy=True).date().isoformat()
         except Exception:
-            date_iso = date_txt
+            date_iso = date_txt  # leave raw if parsing fails
 
-        # Pull a game_id if present (from any link in the row)
+        # Find game ID from any game/show links in the row, if present
         game_id = ""
         for a in tr.select("a[href]"):
             m = GAME_LINK_RE.search(a.get("href", ""))
@@ -174,19 +198,19 @@ def parse_printable(instance_id: str, subseason_id: str, session: requests.Sessi
                 game_id = m.group(1)
                 break
 
-        # Extract W/L/T (Result cell)
+        # Extract W/L/T indicator from Result cell
         m_res = re.search(r"\b(W|L|T)\b", result_txt, re.I)
         result_flag = m_res.group(1).upper() if m_res else ""
 
-        # Extract score from Result cell; if missing, also try Opponent cell
+        # Extract numeric scores from Result or Opponent cell
         m_score = SCORE_RE.search(result_txt) or SCORE_RE.search(opp_txt)
         s_a, s_b = (m_score.group(1), m_score.group(2)) if m_score else ("", "")
 
-        # Opponent + home/away flag
+        # Determine if game is away based on '@' prefix in opponent cell
         is_away = opp_txt.startswith("@")
         opponent_name = opp_txt.lstrip("@").strip()
 
-        # Compute team/opp runs (TEAM-FIRST orientation)
+        # Convert scores to integers with team-first orientation
         team_runs, opp_runs = runs_from_team_pov(result_flag, s_a, s_b)
 
         logging.debug(
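A quick standalone illustration of the Result-cell parsing above, using the same regexes on a sample cell value:

```python
import re

SCORE_RE = re.compile(r"\b(\d+)\s*[–-]\s*(\d+)\b")
result_txt = "W 7 – 4"

m_res = re.search(r"\b(W|L|T)\b", result_txt, re.I)
result_flag = m_res.group(1).upper() if m_res else ""
m_score = SCORE_RE.search(result_txt)
s_a, s_b = (m_score.group(1), m_score.group(2)) if m_score else ("", "")
print(result_flag, s_a, s_b)  # W 7 4
```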
@@ -214,10 +238,9 @@ def parse_printable(instance_id: str, subseason_id: str, session: requests.Sessi
 
 def fetch_game_time(game_id: str, session: requests.Session) -> Optional[str]:
     """
-    Fetch the game's local start time from the /game/show/<id> page.
-    Looks inside the tab with id 'tab_boxscores_content' but also
-    falls back to scanning the page for common time patterns.
-    Returns a zero-padded 24h 'HH:MM' string or None if unavailable.
+    Fetch the start time of a game from its detail page.
+    Looks inside the boxscores tab or scans text for time patterns.
+    Returns a 24-hour formatted 'HH:MM' string or None if not found.
     """
     if not game_id:
         return None
@@ -226,13 +249,13 @@ def fetch_game_time(game_id: str, session: requests.Session) -> Optional[str]:
     if not soup:
         return None
 
-    # Prefer the boxscores tab content
+    # Prefer boxscores tab content to search for time string
     box = soup.select_one("#tab_boxscores_content") or soup.select_one("#tab_boxscore_content")
     text = ""
     if box:
         text = " ".join(box.stripped_strings)
     else:
-        # Fall back to page-wide text (but avoid pulling too much)
+        # Fall back to main page text with length limit to prevent excessive text processing
         main = soup.select_one("div.page") or soup
         text = " ".join((main.get_text(" ", strip=True) or "")[:4000].split())
 
@@ -244,21 +267,19 @@ def fetch_game_time(game_id: str, session: requests.Session) -> Optional[str]:
     hhmm = m.group(1)
     ampm = (m.group(2) or "").lower().replace(".", "")
     try:
-        # Normalize to 24h HH:MM
+        # Normalize time to 24h format
         from datetime import datetime
         if ampm:
             dt = datetime.strptime(f"{hhmm} {ampm.upper()}", "%I:%M %p")
         else:
             # already 24h-ish
             dt = datetime.strptime(hhmm, "%H:%M")
         return dt.strftime("%H:%M")
     except Exception:
-        # Be forgiving (e.g., "6:00pm" without space)
+        # Try forgiving parse if combined time/ampm without space
         try:
             from datetime import datetime
-            hhmm2 = hhmm
             if ampm:
-                dt = datetime.strptime(f"{hhmm2}{ampm}", "%I:%M%p")
+                dt = datetime.strptime(f"{hhmm}{ampm}", "%I:%M%p")
                 return dt.strftime("%H:%M")
         except Exception:
             logging.debug(f"TIME: could not normalize '{hhmm} {ampm}' for game {game_id}")
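For reference, a standalone check of the 12-hour to 24-hour normalization path used above (extracted into a hypothetical helper for clarity):

```python
from datetime import datetime

def to_24h(hhmm: str, ampm: str) -> str:
    if ampm:
        return datetime.strptime(f"{hhmm} {ampm.upper()}", "%I:%M %p").strftime("%H:%M")
    return datetime.strptime(hhmm, "%H:%M").strftime("%H:%M")

print(to_24h("6:05", "pm"))  # 18:05
print(to_24h("18:05", ""))   # 18:05
```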
@@ -272,25 +293,34 @@ def main(
     fetch_time: bool = typer.Option(False, help="Fetch game time from /game/show/<id>"),
     sleep: float = typer.Option(0.35, help="Delay between requests (seconds)")
 ):
+    """
+    Main function to scrape schedules for all teams, merge them,
+    deduplicate entries (primary by game_id), and output a consolidated CSV.
+    Optionally fetches start times per game.
+    """
+    # Load teams data and indexes
     by_instance, by_slug, by_norm = load_teams(teams)
     instance_ids = sorted(by_instance.keys())
 
+    # Requests session with custom headers
     session = requests.Session()
     session.headers.update(HEADERS)
 
-    # Scrape all teams
+    # Scrape all team instance printable schedules
     raw: List[dict] = []
     for i, iid in enumerate(instance_ids, 1):
         logging.info(f"[{i}/{len(instance_ids)}] Fetching schedule for instance {iid}")
         raw.extend(parse_printable(iid, subseason, session=session))
         time.sleep(sleep)  # be polite
 
+    # Helper lookups for team records
     def rec_from_instance(iid: str) -> Optional[TeamRec]:
         return by_instance.get(iid)
 
     def match_opponent(text: str) -> Optional[TeamRec]:
         return best_match_team(text, by_slug, by_norm)
 
+    # Deduplicate buckets keyed by game_id or fallback composite keys
     buckets: Dict[str, dict] = {}
    fallback_rows = 0
 
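As a sketch of the dedupe keying (assumptions marked): games with a `game_id` get a key derived from it, while the rest fall back to the composite `fb:` key visible in the next hunk. The `g:` prefix, the field names, and the precomputed `runs_sig` are illustrative guesses, not the commit's code.

```python
def bucket_key(row: dict) -> str:
    if row.get("game_id"):
        return f"g:{row['game_id']}"            # "g:" prefix is an assumption
    pair = row["pair"]                          # (slug_a, slug_b), assumed ordered upstream
    runs_sig = row.get("runs_sig", "")          # assumed precomputed runs signature
    return f"fb:{row['date']}|{pair[0]}@{pair[1]}|{runs_sig}"

print(bucket_key({"game_id": "", "date": "2024-05-04",
                  "pair": ("cubs", "red-sox"), "runs_sig": "7-4"}))
# fb:2024-05-04|cubs@red-sox|7-4
```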
@@ -313,6 +343,7 @@ def main(
             key = f"fb:{row['date']}|{pair[0]}@{pair[1]}|{runs_sig}"
             fallback_rows += 1
 
+        # Store perspective of one team's view of the game
         perspective = {
             "team": team_rec,
             "opp": opp_rec,  # may be None
@@ -336,42 +367,54 @@ def main(
     logging.info(f"Used fallback dedupe for {fallback_rows} rows without game_id.")
 
     out_rows = []
-    time_cache: Dict[str, Optional[str]] = {}
+    time_cache: Dict[str, Optional[str]] = {}  # cache game times to avoid re-fetching
 
+    # Merge perspectives and produce consolidated rows
     for key, bucket in buckets.items():
         p = bucket["persp"]
         date = p[0]["date"]
         game_id = bucket.get("game_id", "")
 
+        # Try to identify home and away perspectives
         p_home = next((x for x in p if x["is_away"] is False), None)
         p_away = next((x for x in p if x["is_away"] is True), None)
 
+        # Home is the team that is not away; otherwise fall back to the other perspective's opponent
         home_team = (p_home["team"] if p_home else (p_away["opp"] if p_away else None))
         away_team = (p_away["team"] if p_away else (p_home["opp"] if p_home else None))
 
         def pack_team(rec: Optional[TeamRec], fallback_slug: str):
+            """Pack team record to tuple, or fall back to slug-based default values."""
             if rec:
                 return rec.slug, rec.instance_id, rec.team_id, rec.name
             return fallback_slug, "", "", fallback_slug.replace("-", " ").title()
 
+        # Attempt to get runs from home perspective
         home_runs = away_runs = None
         if p_home and isinstance(p_home["team_runs"], int) and isinstance(p_home["opp_runs"], int):
             home_runs = p_home["team_runs"]
             away_runs = p_home["opp_runs"]
+        # Otherwise try away perspective with reversed runs
         elif p_away and isinstance(p_away["team_runs"], int) and isinstance(p_away["opp_runs"], int):
             away_runs = p_away["team_runs"]
             home_runs = p_away["opp_runs"]
 
+        # If runs still missing, guess from first perspective, adjusting for is_away
         if (home_runs is None or away_runs is None) and p:
             one = p[0]
             if isinstance(one["team_runs"], int) and isinstance(one["opp_runs"], int):
                 if one["is_away"]:
-                    away_runs = one["team_runs"]; home_runs = one["opp_runs"]
-                    away_team = one["team"]; home_team = one["opp"] if one["opp"] else home_team
+                    away_runs = one["team_runs"]
+                    home_runs = one["opp_runs"]
+                    away_team = one["team"]
+                    home_team = one["opp"] if one["opp"] else home_team
                 else:
-                    home_runs = one["team_runs"]; away_runs = one["opp_runs"]
-                    home_team = one["team"]; away_team = one["opp"] if one["opp"] else away_team
+                    home_runs = one["team_runs"]
+                    away_runs = one["opp_runs"]
+                    home_team = one["team"]
+                    away_team = one["opp"] if one["opp"] else away_team
 
+        # Fallback guesses for home and away slugs if team data missing
         guess_home_fallback = (p_home["team"].slug if p_home and p_home["team"] else
                                p_away["opp"].slug if p_away and p_away["opp"] else
                                p[0]["pair"][0])
@@ -382,6 +425,7 @@ def main(
         home_slug, home_inst, home_id, home_name = pack_team(home_team, guess_home_fallback)
         away_slug, away_inst, away_id, away_name = pack_team(away_team, guess_away_fallback)
 
+        # Determine winner and loser slugs based on runs
         winner_slug = winner_inst = winner_id = loser_slug = loser_inst = loser_id = ""
         if isinstance(home_runs, int) and isinstance(away_runs, int):
             if home_runs > away_runs:
@@ -391,10 +435,12 @@ def main(
                 winner_slug, winner_inst, winner_id = away_slug, away_inst, away_id
                 loser_slug, loser_inst, loser_id = home_slug, home_inst, home_id
 
+        # Consolidate location and status from home or away perspectives
         loc = (p_home["location"] if p_home else "") or (p_away["location"] if p_away else "")
         status = (p_home["status"] if p_home else "") or (p_away["status"] if p_away else "")
         source_urls = sorted({x["source_url"] for x in p})
 
+        # Optionally fetch game start time
         time_local = ""
         if fetch_time and game_id:
             if game_id in time_cache:
@@ -403,6 +449,7 @@ def main(
                 logging.debug(f"TIME: fetching game {game_id}")
                 tval = fetch_game_time(game_id, session=session)
                 time_cache[game_id] = tval
+                # If no time found, wait longer before next request to be polite
                 if tval is None:
                     time.sleep(min(sleep * 2, 1.0))
                 if tval:
@@ -413,6 +460,7 @@ def main(
             f"winner={winner_slug or 'TIE'} id={game_id} time={time_local or 'NA'}"
         )
 
+        # Append consolidated game record for CSV output
         out_rows.append({
             "date_local": date,
             "time_local": time_local,
@@ -431,6 +479,7 @@ def main(
         logging.warning("No games produced.")
         return
 
+    # Define CSV output columns
     fieldnames = [
         "date_local","time_local",
         "home_slug","home_instance","home_id","home_name",
@@ -440,6 +489,7 @@ def main(
         "loser_slug","loser_instance","loser_id",
         "location","status","game_id","source_urls",
     ]
+    # Write consolidated game data to CSV
     with open(out, "w", newline="", encoding="utf-8") as f:
         w = csv.DictWriter(f, fieldnames=fieldnames)
         w.writeheader()
 
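A self-contained illustration of the `csv.DictWriter` pattern used above (toy field names and rows, not the commit's full column set):

```python
import csv

fieldnames = ["date_local", "home_name", "away_name"]
rows = [{"date_local": "2024-05-04", "home_name": "Red Sox", "away_name": "Cubs"}]

with open("season_schedule.csv", "w", newline="", encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=fieldnames)
    w.writeheader()   # header row from fieldnames
    w.writerows(rows) # one dict per game row
```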
@@ -29,8 +29,20 @@ def load_games(
     team_id: str = "names",
     final_status: str | None = None,
 ) -> pd.DataFrame:
+    """
+    Load input CSV (season_schedule.csv) into a cleaned DataFrame with consistent columns.
+
+    Parameters:
+    - inp: CSV path to read
+    - team_id: 'names' or 'slugs' to identify teams
+    - final_status: if given, keep only rows whose status matches this (e.g. 'final')
+
+    Returns:
+        DataFrame with columns Date, HomeTeam, AwayTeam, HomeRuns, AwayRuns, Margin, Result
+    """
     df = pd.read_csv(inp)
-    # Choose identifiers
+    # Determine team ID columns based on input param
     home_id_col = "home_name" if team_id == "names" else "home_slug"
     away_id_col = "away_name" if team_id == "names" else "away_slug"
     for c in [home_id_col, away_id_col, "home_runs", "away_runs"]:
@@ -38,27 +50,31 @@ def load_games(
             raise ValueError(f"Missing required column: {c}")
 
-    # Optional status filter (helps exclude postponed/canceled)
+    # Filter for final_status if provided to exclude e.g. postponed games
     if final_status is not None and "status" in df.columns:
         df = df[df["status"].astype(str).str.lower() == str(final_status).lower()]
 
-    # Keep only games with numeric scores
+    # Convert run columns to numeric, drop rows with missing runs or teams
     df = df.copy()
     df["home_runs"] = pd.to_numeric(df["home_runs"], errors="coerce")
     df["away_runs"] = pd.to_numeric(df["away_runs"], errors="coerce")
     df = df.dropna(subset=[home_id_col, away_id_col, "home_runs", "away_runs"])
 
-    # Parse datetime (robust to missing either field)
+    # Parse datetime by combining date_local and time_local if possible
     date = pd.to_datetime(df.get("date_local", pd.NaT), errors="coerce")
     time = pd.to_datetime(df.get("time_local", pd.NaT), errors="coerce").dt.time
-    # Combine when possible
     dt = date
     if "time_local" in df.columns:
-        # build datetime only where both present
+        # Build datetime where both date and time present
         dt = pd.to_datetime(
             date.dt.strftime("%Y-%m-%d").fillna("") + " " +
             pd.Series(time).astype(str).replace("NaT",""),
             errors="coerce"
         )
+    # Construct cleaned DataFrame with fixed column names
     df_out = pd.DataFrame({
         "Date": dt,
         "HomeTeam": df[home_id_col].astype(str),
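A toy check (not from the commit) of the date-plus-time combination step: rows with both fields get a full timestamp, rows lacking a time parse as date-only midnight.

```python
import pandas as pd

df = pd.DataFrame({"date_local": ["2024-05-04", "2024-05-05"],
                   "time_local": ["18:05", None]})
date = pd.to_datetime(df["date_local"], errors="coerce")
dt = pd.to_datetime(date.dt.strftime("%Y-%m-%d") + " " + df["time_local"].fillna(""),
                    errors="coerce")
print(dt)  # 2024-05-04 18:05:00, then 2024-05-05 00:00:00
```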
@@ -66,19 +82,35 @@ def load_games(
         "HomeRuns": df["home_runs"].astype(int),
         "AwayRuns": df["away_runs"].astype(int),
     })
+    # Margin is difference in runs (home - away)
     df_out["Margin"] = df_out["HomeRuns"] - df_out["AwayRuns"]
+    # Result: 'H' if home win, 'A' if away win, 'T' for tie
     df_out["Result"] = np.where(df_out["HomeRuns"] > df_out["AwayRuns"], "H",
                                 np.where(df_out["HomeRuns"] < df_out["AwayRuns"], "A", "T"))
     return df_out.reset_index(drop=True)
 
 def aggregate_team_stats(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Aggregate game-level data into team-level season stats: wins, losses, ties, runs scored,
+    runs allowed, games played, win percentage, and run differential.
+
+    Parameters:
+    - df: DataFrame with game results
+
+    Returns:
+        DataFrame indexed by Team with aggregated stats
+    """
+    # Collect all team names from home and away columns
     teams = pd.Index(sorted(set(df["HomeTeam"]).union(df["AwayTeam"])), name="Team")
+    # Initialize stats DataFrame with W/L/T/RS/RA all zero
     stats = pd.DataFrame(index=teams, columns=["W","L","T","RS","RA"], data=0)
     for _, r in df.iterrows():
         h, a = r["HomeTeam"], r["AwayTeam"]
         hr, ar = int(r["HomeRuns"]), int(r["AwayRuns"])
+        # Update runs scored and allowed for both teams
         stats.at[h,"RS"] += hr; stats.at[h,"RA"] += ar
         stats.at[a,"RS"] += ar; stats.at[a,"RA"] += hr
+        # Update win/loss/tie counts
         if hr > ar:
             stats.at[h,"W"] += 1; stats.at[a,"L"] += 1
         elif hr < ar:
@@ -86,22 +118,57 @@ def aggregate_team_stats(df: pd.DataFrame) -> pd.DataFrame:
         else:
             stats.at[h,"T"] += 1; stats.at[a,"T"] += 1
     stats = stats.astype(int)
+    # Games played
     stats["GP"] = stats["W"] + stats["L"] + stats["T"]
+    # Win percentage with ties counting as half a win
     stats["WinPct"] = (stats["W"] + 0.5 * stats["T"]) / stats["GP"].replace(0, np.nan)
+    # Run differential (runs scored - runs allowed)
     stats["RunDiff"] = stats["RS"] - stats["RA"]
     return stats.reset_index()
 
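A worked check of the WinPct convention above (ties count as half a win):

```python
W, L, T = 7, 4, 1
GP = W + L + T
win_pct = (W + 0.5 * T) / GP
print(round(win_pct, 3))  # 0.625 — i.e., 7.5 "wins" over 12 games
```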
 def pythagorean(rs: pd.Series, ra: pd.Series, exp: float) -> pd.Series:
+    """
+    Compute Pythagorean expectation for winning percentage:
+    RS^exp / (RS^exp + RA^exp), handling zero or missing runs.
+
+    Parameters:
+    - rs: runs scored
+    - ra: runs allowed
+    - exp: exponent (typically ~1.83 for baseball)
+
+    Returns:
+        Series of expected win percentages
+    """
     rs = rs.clip(lower=0); ra = ra.clip(lower=0)
     num = np.power(rs, exp); den = num + np.power(ra, exp)
     with np.errstate(divide="ignore", invalid="ignore"):
-        p = np.where(den > 0, num / den, 0.5)
+        p = np.where(den > 0, num / den, 0.5)  # handle zero denominator as 0.5 (neutral)
     return pd.Series(p, index=rs.index)
 
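A quick numeric check of the formula with the usual baseball exponent:

```python
rs, ra, exp = 100.0, 80.0, 1.83
p = rs**exp / (rs**exp + ra**exp)
print(round(p, 3))  # 0.601 — outscoring opponents 100-80 projects to ~60% wins
```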
 def estimate_home_field_runs(df: pd.DataFrame) -> float:
+    """
+    Estimate home-field advantage in runs as the average margin (home_runs - away_runs).
+    Useful for adjusting rating systems to neutralize the advantage.
+
+    Returns:
+        Float average home-field runs advantage.
+    """
     return float(df["Margin"].mean()) if len(df) else 0.0
 
 def massey(df: pd.DataFrame, cap: float, subtract_home: bool) -> tuple[pd.Series, float]:
+    """
+    Calculate Massey ratings (simple linear system) for teams using margins of victory.
+    Optionally caps margins and subtracts estimated home-field runs.
+
+    Parameters:
+    - df: games DataFrame with HomeTeam, AwayTeam, Margin columns
+    - cap: maximum absolute margin value to use (run cap)
+    - subtract_home: whether to subtract the estimated home-field runs advantage
+
+    Returns:
+        Tuple of (ratings Series indexed by team, estimated home-run advantage float)
+    """
     teams = sorted(set(df["HomeTeam"]).union(df["AwayTeam"]))
     idx = {t:i for i,t in enumerate(teams)}
     y = df["Margin"].astype(float).to_numpy()
@@ -111,48 +178,107 @@ def massey(df: pd.DataFrame, cap: float, subtract_home: bool) -> tuple[pd.Series
     if subtract_home:
         y = y - h_est
     G, N = len(df), len(teams)
+    # Construct design matrix A with +1 for home, -1 for away per game, plus normalization row
     A = np.zeros((G+1, N), dtype=float)
     for r_i, r in enumerate(df.itertuples(index=False)):
         A[r_i, idx[r.HomeTeam]] = 1.0
         A[r_i, idx[r.AwayTeam]] = -1.0
+    # Normalize ratings to sum to zero for uniqueness
     A[G, :] = 1.0
     y_ext = np.concatenate([y, [0.0]])
+    # Solve least squares for ratings vector
     r_sol, *_ = np.linalg.lstsq(A, y_ext, rcond=None)
     return pd.Series(r_sol, index=teams), (h_est if subtract_home else 0.0)
 
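A toy Massey solve (not the commit's code) showing the same design-matrix construction on three teams with slightly inconsistent margins; least squares finds the compromise ratings:

```python
import numpy as np

teams = ["A", "B", "C"]
games = [("A", "B", 3.0), ("B", "C", 1.0), ("A", "C", 2.0)]  # (home, away, margin)
idx = {t: i for i, t in enumerate(teams)}

A = np.zeros((len(games) + 1, len(teams)))
y = np.zeros(len(games) + 1)
for g, (h, a, m) in enumerate(games):
    A[g, idx[h]], A[g, idx[a]], y[g] = 1.0, -1.0, m
A[-1, :] = 1.0  # normalization row: ratings sum to zero

r, *_ = np.linalg.lstsq(A, y, rcond=None)
print(dict(zip(teams, r.round(2))))  # {'A': 1.67, 'B': -0.67, 'C': -1.0}
```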
 def elo_expected(ra: float, rb: float) -> float:
+    """
+    Compute Elo expected probability (expected score) for player A.
+
+    Parameters:
+    - ra: rating of player A
+    - rb: rating of player B
+
+    Returns:
+        Probability player A wins
+    """
     return 1.0 / (1.0 + 10.0 ** (-(ra - rb) / 400.0))
 
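Spot check: a 100-point Elo edge gives roughly a 64% expected score.

```python
print(round(1.0 / (1.0 + 10.0 ** (-100 / 400.0)), 3))  # 0.64
```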
 def elo_once(df: pd.DataFrame, K: float, H: float, mcap: float, init: dict[str,float]) -> dict[str,float]:
+    """
+    Perform one pass of Elo rating updates across the games in chronological order.
+
+    Parameters:
+    - df: DataFrame with games (must have HomeTeam, AwayTeam, HomeRuns, AwayRuns)
+    - K: Elo K-factor (adjustment multiplier)
+    - H: home-field bonus in points
+    - mcap: cap for the margin-of-victory factor ln(|margin| + 1)
+    - init: dict of initial ratings by team
+
+    Returns:
+        Updated dict of Elo ratings after processing games.
+    """
     ratings = dict(init)
     for _, r in df.iterrows():
         h, a = r["HomeTeam"], r["AwayTeam"]
         hr, ar = int(r["HomeRuns"]), int(r["AwayRuns"])
         margin = hr - ar
+        # Calculate expected win probability for home team (with home advantage added)
         Eh = elo_expected(ratings[h] + H, ratings[a])
+        # Actual game result scores (1 for win, 0 for loss, 0.5 tie)
         Sh, Sa = (1.0, 0.0) if hr > ar else ((0.0, 1.0) if hr < ar else (0.5, 0.5))
+        # Margin factor based on logarithm of absolute margin plus one
         M = np.log(abs(margin) + 1.0)
         if mcap is not None:
             M = min(M, mcap)
+        # Elo rating update, scaled by margin factor and difference between actual and expected score
         ratings[h] += K * M * (Sh - Eh)
         ratings[a] += K * M * ((1.0 - Sh) - (1.0 - Eh))
     return ratings
 
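A worked one-game update under the scheme above, with values chosen for illustration (K=24, H=30, mcap=2.0 mirror the defaults shown in `elo` below; equal 1500 ratings, home wins by 3):

```python
import math

ra_h, ra_a, K, H = 1500.0, 1500.0, 24.0, 30.0
Eh = 1.0 / (1.0 + 10.0 ** (-((ra_h + H) - ra_a) / 400.0))  # ≈ 0.543 with the +30 bonus
M = min(math.log(3 + 1), 2.0)                              # ln(4) ≈ 1.386, under the cap
delta = K * M * (1.0 - Eh)                                 # ≈ 15.2 points gained by home
print(round(Eh, 3), round(M, 3), round(delta, 1))
```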
 def elo(df: pd.DataFrame, K=24.0, H=30.0, mcap=2.0, shuffles=20, seed=42) -> pd.Series:
+    """
+    Compute Elo ratings averaged over multiple random shuffle orders of games
+    to reduce the order dependency of sequential Elo updates.
+
+    Parameters:
+    - df: games DataFrame sorted by Date
+    - K: Elo K-factor
+    - H: home-field advantage bonus
+    - mcap: margin factor cap
+    - shuffles: number of random game orders to compute Elo over
+    - seed: RNG seed for reproducibility
+
+    Returns:
+        Series of Elo ratings indexed by team
+    """
     teams = sorted(set(df["HomeTeam"]).union(df["AwayTeam"]))
-    base = {t: 1500.0 for t in teams}
+    base = {t: 1500.0 for t in teams}  # initial Elo ratings
     df0 = df.sort_values(["Date"]).reset_index(drop=True)
+    # Elo with original date order (baseline)
     r_first = elo_once(df0, K, H, mcap, base)
+    # Initialize RNG
     rng = np.random.default_rng(seed)
     vals = {t: [r_first[t]] for t in teams}
+    # Compute Elo over randomized orderings for averaging
     for _ in range(max(0, shuffles-1)):
         idx = np.arange(len(df0)); rng.shuffle(idx)
         r = elo_once(df0.iloc[idx].reset_index(drop=True), K, H, mcap, base)
         for t in teams:
             vals[t].append(r[t])
+    # Average ratings across runs for each team
     return pd.Series({t: float(np.mean(vals[t])) for t in teams}).sort_index()
 
 def zscore(s: pd.Series) -> pd.Series:
+    """
+    Calculate z-score (standard score) for a pandas Series.
+
+    Parameters:
+    - s: input Series
+
+    Returns:
+        Series normalized to mean=0 and std=1; zeros if std=0.
+    """
     mu, sd = s.mean(), s.std(ddof=0)
     return pd.Series(0.0, index=s.index) if (sd == 0 or np.isnan(sd)) else (s - mu) / sd
 
@@ -176,51 +302,71 @@ def main(
     elo_shuffles: int = typer.Option(20, help="Random shuffles to average Elo"),
     elo_seed: int = typer.Option(42, help="RNG seed for shuffles")
 ):
+    """
+    Main entry point:
+
+    Loads input games, computes aggregate stats, Pythagorean expectation,
+    Massey ratings, Elo ratings (averaged over shuffles), Strength of Schedule,
+    and an overall CompositeRating combining these metrics.
+
+    Outputs a CSV file with rankings and stats.
+    """
     team_id = team_id.lower()
-    # Load games
+    # Load cleaned games DataFrame
     games = load_games(inp, team_id=team_id, final_status=final_status)
 
-    # Aggregates
+    # Compute aggregated team-level statistics from games
     team = aggregate_team_stats(games)
+    # Calculate Pythagorean expected winning percentage
     team["PythagoreanWinPct"] = pythagorean(team["RS"], team["RA"], pyexp)
 
-    # Ratings
+    # Calculate Massey ratings and get estimated home-field runs
     massey_r, h_runs = massey(games, cap=massey_cap, subtract_home=not no_massey_home_adj)
 
-    # Strength of schedule
+    # Calculate Strength of Schedule as average Massey rating of opponents
     opps = {t: [] for t in massey_r.index}
     for _, r in games.iterrows():
         opps[r["HomeTeam"]].append(r["AwayTeam"])
         opps[r["AwayTeam"]].append(r["HomeTeam"])
     sos_series = pd.Series({t: (float(massey_r[opps[t]].mean()) if opps[t] else 0.0) for t in opps})
 
+    # Compute Elo ratings with multiple shuffles for stability
     elo_r = elo(games, K=elo_k, H=elo_home, mcap=elo_mcap, shuffles=elo_shuffles, seed=elo_seed)
 
-    # Merge
+    # Merge all metrics into a single DataFrame
     out_df = team.set_index("Team")
     out_df["MasseyRating"] = massey_r
     out_df["EloRating"] = elo_r
     out_df["StrengthOfSchedule"] = sos_series
 
-    # Composite
+    # Composite rating: weighted Z-score combination of Massey, Elo, and Pythagorean
     Z_r, Z_e, Z_p = zscore(out_df["MasseyRating"]), zscore(out_df["EloRating"]), zscore(out_df["PythagoreanWinPct"])
     out_df["CompositeRating"] = 0.45*Z_r + 0.35*Z_e + 0.20*Z_p
 
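A tiny illustration (not from the commit) of the weighted z-score composite on made-up ratings for three teams:

```python
import pandas as pd

def z(s: pd.Series) -> pd.Series:
    return (s - s.mean()) / s.std(ddof=0)

massey = pd.Series({"A": 1.2, "B": -0.3, "C": -0.9})
elo    = pd.Series({"A": 1540.0, "B": 1495.0, "C": 1465.0})
pyth   = pd.Series({"A": 0.62, "B": 0.49, "C": 0.41})

composite = 0.45 * z(massey) + 0.35 * z(elo) + 0.20 * z(pyth)
print(composite.sort_values(ascending=False))  # A first, C last
```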
     out_df = out_df.reset_index()
+    # Select columns and sort teams by CompositeRating descending
     out_df = out_df[[
         "Team","GP","W","L","T","WinPct","RS","RA","RunDiff",
         "PythagoreanWinPct","MasseyRating","EloRating","StrengthOfSchedule","CompositeRating"
     ]].sort_values("CompositeRating", ascending=False)
 
-    # Round for readability
+    # Round numeric columns for neatness
     for c in ["WinPct","PythagoreanWinPct","MasseyRating","EloRating","StrengthOfSchedule","CompositeRating"]:
         out_df[c] = out_df[c].astype(float).round(5)
 
+    # Write to output CSV
     out_df.to_csv(out, index=False)
+    # Output summary info
     print(f"Done. Estimated home-field (runs) used in Massey: {h_runs:.3f}")
     print(f"Teams ranked: {len(out_df)} | Games processed: {len(games)}")
     print(f"Output -> {out}")
 
 if __name__ == "__main__":
     typer.run(main)