2025-04-18 08:14:37 -05:00
parent 4105cc2373
commit 2503141c34
25 changed files with 1127 additions and 278 deletions

.vscode/launch.json vendored Normal file

@@ -0,0 +1,51 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: convert_to_sportspress",
"type": "python",
"request": "launch",
"module": "src",
"args": [
"convert",
"sportspress",
"data/2025-hounds.csv",
"data/output/out.csv",
],
"console": "integratedTerminal",
"justMyCode": true
},
{
"name": "generate calendar",
"type": "python",
"request": "launch",
"cwd": "${workspaceFolder}",
"module": "src",
"args": [
"generate",
"calendar",
"./data/2025-hounds.csv"
],
"console": "integratedTerminal",
"justMyCode": true
},
{
"name": "generate calendar config",
"type": "python",
"request": "launch",
"cwd": "${workspaceFolder}",
"module": "src",
"args": [
"generate",
"calendar-config",
"./data/2024-hounds.csv",
"./data/output/"
],
"console": "integratedTerminal",
"justMyCode": true
}
]
}

.vscode/settings.json vendored Normal file

@@ -0,0 +1,25 @@
{
"python.testing.unittestArgs": [
"-v",
"-s",
".",
"-p",
"test_*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true,
"python.analysis.enablePytestSupport": false,
"launch": {
"configurations": [{
"name": "Python: Debug Tests",
"type": "python",
"request": "launch",
"program": "${file}",
"purpose": ["debug-test"],
"console": "integratedTerminal",
"justMyCode": false
}],
"compounds": []
}
}


@@ -1,2 +0,0 @@
from .convert_to_sportspress import app
app()


@@ -1,238 +0,0 @@
import csv
import re
from typing import List, Dict
from dateutil import parser
from pathlib import Path
from rich.console import Console
from rich.panel import Panel
from rich.table import Table, Column
from rich.columns import Columns
import typer
from .utils import normalize_header_key, validate_csv_header, read_csv, is_visitor_home_order_reversed, process_data, aggregate_teams, write_sportspress_csv
app = typer.Typer()
@app.command()
def standings(file_path: Path = typer.Argument(..., help="Path to the CSV file")):
# Validate CSV header
header = next(csv.reader(open(file_path, "r")))
normalized_header = [normalize_header_key(key) for key in header]
if not validate_csv_header(header):
typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
return
# Read CSV data
data = read_csv(file_path)
visitor_home_order_reversed = is_visitor_home_order_reversed(normalized_header)
processed_data = process_data(data, visitor_home_order_reversed)
aggregate_team_data = aggregate_teams(processed_data)
# Display aggregated data as a table
console = Console()
table = Table(title="Aggregated Team Data")
table.add_column("Team", style="bold")
table.add_column("Wins", style="bold")
table.add_column("Losses", style="bold")
table.add_column("Ties", style="bold")
table.add_column("Runs For", style="bold")
table.add_column("Runs Against", style="bold")
for team_stats in aggregate_team_data:
table.add_row(
team_stats["team"],
str(team_stats["win"]),
str(team_stats["loss"]),
str(team_stats["tie"]),
str(team_stats["runs_for"]),
str(team_stats["runs_against"]),
)
console.print(table)
# Write processed CSV data back to a new file
# output_file_path = file_path.with_suffix(".processed.csv")
# write_csv(output_file_path, data)
# typer.echo(f"Processed data written to: {output_file_path}")
@app.command()
def sportspress_csv(file_path: Path = typer.Argument(..., help="Path to the CSV file"), file_output_path: Path = typer.Argument(..., help="Path to the output CSV file"), only_with_outcome: bool = typer.Option(default=True, is_flag=True, help="")):
# Validate CSV header
header = next(csv.reader(open(file_path, "r")))
normalized_header = [normalize_header_key(key) for key in header]
if not validate_csv_header(header):
typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
return
# Read CSV data
data = read_csv(file_path)
visitor_home_order_reversed = is_visitor_home_order_reversed(normalized_header)
processed_data = process_data(data, visitor_home_order_reversed)
write_sportspress_csv(processed_data, file_output_path, only_with_outcome)
typer.echo(f"Output to {file_output_path}")
def list_key_values(data: List[Dict], key):
if key.lower() == "team":
normalized_key = "team"
else:
normalized_key = normalize_header_key(key)
if normalized_key != "team" or "team" in data[0].keys():
output = {row.get(normalized_key) for row in data}
else:
output = {row.get('home') for row in data}
output = output | {row.get('visitor') for row in data}
return output
def replace_key_values(data: List[Dict], key, match:str, replace:str, is_regex:bool =False):
if not is_regex:
regex = re.compile(fr"^{match}$")
else:
regex = re.compile(fr"{match}")
for row in data:
row[key] = regex.sub(replace, row[key])
return data
def add_key_values(data: List[Dict], key, value:str):
for row in data:
row[key] = value
return data
clean_app = typer.Typer()
@clean_app.command("list")
def print_values_for_key(file_path: Path = typer.Argument(..., help="Path to the CSV file"), key: str = typer.Argument(..., help="")):
# Read CSV data
data = read_csv(file_path)
processed_data = list_key_values(data, key)
console = Console()
table = Table(show_header=False)
table.add_column("Values")
for value in sorted(processed_data):
table.add_row(value)
console.print(table)
@clean_app.command("replace")
def replace_values_for_key(
file_path: Path = typer.Argument(..., help="Path to the CSV file"),
key: str = typer.Argument(..., help=""),
match: str = typer.Argument(..., help=""),
replace: str = typer.Argument(..., help=""),
in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."),
match_is_regex: bool = typer.Option(False, "--regex", "-p", help="Match is a regex pattern.")
):
if in_place and output_file:
typer.echo("Error: Only one of --in-place or --output-file should be provided, not both.")
raise typer.Abort()
if key.lower() == "team":
normalized_key = "team"
else:
normalized_key = normalize_header_key(key)
console = Console()
# Read CSV data
data = read_csv(file_path)
before_table = Table(Column(), show_header=False, title="Before")
for value in sorted(list_key_values(data, key)):
before_table.add_row(value)
after_table = Table( Column(), show_header=False, title="After")
if normalized_key != "team" or "team" in data[0].keys():
data = replace_key_values(data, normalized_key, match, replace, match_is_regex)
else:
data=replace_key_values(data, "home", match, replace, match_is_regex)
data=replace_key_values(data, "visitor", match, replace, match_is_regex)
for value in sorted(list_key_values(data, key)):
after_table.add_row(value)
panel = Panel(
Columns([before_table, after_table]),
title="Replace"
)
console.print(panel)
if in_place and typer.confirm("Perform Replacement in-place?"):
with file_path.open('w') as f:
fieldnames = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
elif output_file:
if output_file.is_dir():
output_file = output_file.joinpath(file_path.name)
if typer.confirm(f"Write to {output_file}?"):
with output_file.open('w') as f:
fieldnames = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
@clean_app.command("add-key")
def add_values_for_key(
file_path: Path = typer.Argument(..., help="Path to the CSV file"),
key: str = typer.Argument(..., help=""),
value: str = typer.Argument("", help=""),
in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."),
):
if in_place and output_file:
typer.echo("Error: Only one of --in-place or --output-file should be provided, not both.")
raise typer.Abort()
# Validate CSV header
header = next(csv.reader(open(file_path, "r")))
normalized_header = [normalize_header_key(key) for key in header]
if key.lower() == "team":
normalized_key = "team"
else:
normalized_key = normalize_header_key(key)
if not validate_csv_header(header):
typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
return
console = Console()
# Read CSV data
data = read_csv(file_path)
data = add_key_values(data, key, value)
if in_place and typer.confirm("Perform Replacement in-place?"):
with file_path.open('w') as f:
fieldnames = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
elif output_file:
if output_file.is_dir():
output_file = output_file.joinpath(file_path.name)
if typer.confirm(f"Write to {output_file}?"):
with output_file.open('w') as f:
fieldnames = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
app.add_typer(clean_app, name="clean")
if __name__ == "__main__":
app()

normalization.toml Normal file

@@ -0,0 +1,57 @@
# normalization.toml
[win]
potential_keys = ["w", "wins"]
[loss]
potential_keys = ["l", "losses"]
[tie]
potential_keys = ["t", "ties"]
[points]
potential_keys = ["pts.", "pts", "pt"]
[runs_for]
potential_keys = ["rf", "rs"]
[runs_against]
potential_keys = ["ra"]
[division]
potential_keys = ["div"]
[date]
potential_keys = ["Date", "EventDate"]
[time]
potential_keys = ["Time", "EventTime"]
[visitor]
potential_keys = ["Away"]
[field]
potential_keys = ["Field", "Location", "Venue"]
[[field.values]]
original = ["Winnemac"]
normalized = "Winnemac Park"
[[field.values]]
original = ["Taft HS"]
normalized = "Taft High School"
[[field.values]]
original = ["Southwest"]
normalized = "Southwest Park"
[results]
potential_keys = ["Final Score", "Score", "Result", "Outcome"]
# No value normalization specified for 'results' in this example
[[team.values]]
original = ["Hounds", "Chicago Hounds", "Winnemac Hounds", "Hound"]
normalized = "Hounds"
[[team.values]]
original = ["Chicago Red Sox"]
normalized = "Red Sox"
[[team.values]]
original = ["NorthSide White Sox"]
normalized = "North Side White Sox"


@@ -1,2 +1,4 @@
typer[all]==0.9.0
python-dateutil==2.8.2
toml==0.10.2
pillow

src/__main__.py Normal file

@@ -0,0 +1,15 @@
from .apps.convert import app as convert_app
from .apps.clean import app as clean_app
from .apps.read import app as read_app
from .apps.generate import app as generate_app
import typer
app = typer.Typer()
app.add_typer(convert_app, name="convert")
app.add_typer(clean_app, name="clean")
app.add_typer(read_app, name="read")
app.add_typer(generate_app, name="generate")
if __name__ == "__main__":
app()
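With the sub-apps wired together here, the whole tool runs as `python -m src ...`, which is exactly what the launch.json entries above invoke. A rough sketch of driving the composed app programmatically with Typer's test runner; the CSV paths are the placeholders from the debug configurations and may not exist locally:
# Sketch only, not part of the commit.
from typer.testing import CliRunner
from src.__main__ import app

runner = CliRunner()
result = runner.invoke(app, ["convert", "sportspress", "data/2025-hounds.csv", "data/output/out.csv"])
print(result.exit_code)
print(result.output)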

src/apps/__init__.py Normal file (empty)


@@ -0,0 +1 @@
from .clean import app

src/apps/clean/clean.py Normal file

@@ -0,0 +1,109 @@
import typer
from rich.table import Table, Column
from rich.console import Console
from rich.columns import Columns
from rich.panel import Panel
from pathlib import Path
import csv
from ...utils.common import list_key_values, read_and_normalize_csv
from ...utils.normalize import normalize_header_key, replace_key_values, DEFAULT_NORMALIZATION_PATH
from typing import Annotated, List
import re
app = typer.Typer()
@app.command("replace")
def replace_values_for_key(
input_file: Annotated[List[typer.FileText], typer.Argument(..., help="Path(s) to the CSV file")],
output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."),
key: str = typer.Argument(..., help="Column to operate on."),
match: str = typer.Argument(..., help="Value (or regex with --regex) to match."),
replace: str = typer.Argument(..., help="Replacement value."),
in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
match_is_regex: bool = typer.Option(False, "--regex", "-r", help="Match is a regex pattern.")
):
# normalized_key = normalize_header_key(key)
normalized_key = key
console = Console()
# Read CSV data
for f in input_file:
data = read_and_normalize_csv(f)
before_table = Table(Column(), show_header=False, title="Before")
for value in sorted(list_key_values(data, key)):
before_table.add_row(value)
after_table = Table( Column(), show_header=False, title="After")
if normalized_key != "team" or "team" in data[0].keys():
data = replace_key_values(data, normalized_key, match, replace, match_is_regex)
else:
data=replace_key_values(data, "home", match, replace, match_is_regex)
data=replace_key_values(data, "visitor", match, replace, match_is_regex)
for value in sorted(list_key_values(data, key)):
after_table.add_row(value)
panel = Panel(
Columns([before_table, after_table]),
title="Replace"
)
console.print(panel)
if in_place and typer.confirm("Perform Replacement in-place?"):
# typer.FileText opens the input read-only, so reopen the same path for writing
with open(f.name, 'w', newline='') as out_f:
fieldnames = data[0].keys()
writer = csv.DictWriter(out_f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
elif output_file:
if output_file.is_dir():
output_file = output_file.joinpath(f.name)
if typer.confirm(f"Write to {output_file}?"):
with output_file.open('w') as f:
fieldnames = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
@app.command("add-key")
def add_values_for_key(
file_path: Path = typer.Argument(..., help="Path to the CSV file"),
key: str = typer.Argument(..., help=""),
value: str = typer.Argument("", help=""),
in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."),
):
if in_place and output_file:
typer.echo("Error: Only one of --in-place or --output-file should be provided, not both.")
raise typer.Abort()
console = Console()
# Read CSV data
data = read_and_normalize_csv(file_path)
for row in data:  # inline replacement for the old add_key_values helper
row[key] = value
if in_place and typer.confirm("Perform Replacement in-place?"):
with file_path.open('w') as f:
fieldnames = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
elif output_file:
if output_file.is_dir():
output_file = output_file.joinpath(file_path.name)
if typer.confirm(f"Write to {output_file}?"):
with output_file.open('w') as f:
fieldnames = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)


@@ -0,0 +1 @@
from .convert import app


@@ -0,0 +1,44 @@
import typer
from typing import Annotated
from pathlib import Path
from ...utils.sportspress import validate_keys
from ...utils.normalize import normalize_header_key, load_config
from ...utils.common import read_and_normalize_csv, is_visitor_home_order_reversed, import_gamebygame
from ...utils.sportspress import write_sportspress_csv
import csv
app = typer.Typer()
@app.command(name="sportspress")
def sportspress_csv(
input_file: Annotated[typer.FileText, typer.Argument(..., help="Path to the CSV file")],
file_output_path: Annotated[typer.FileTextWrite, typer.Argument(..., help="Path to the output CSV file")],
only_with_outcome: bool = typer.Option(default=False, is_flag=True, help="")
):
# Read CSV data
data = import_gamebygame(input_file)
try:
write_sportspress_csv(data, file_output_path, only_with_outcome)
except KeyError as e:
typer.echo(f"Error: {e}")
typer.echo(f"Output to {file_output_path.name}")
@app.command(name="teamsnap")
def teamsnap_csv(
input_file: Annotated[typer.FileText, typer.Argument(..., help="Path to the CSV file")],
file_output_path: Annotated[typer.FileTextWrite, typer.Argument(..., help="Path to the output CSV file")],
only_with_outcome: bool = typer.Option(default=False, is_flag=True, help="")
):
# Read CSV data
data = import_gamebygame(input_file)
try:
write_sportspress_csv(data, file_output_path, only_with_outcome)
except KeyError as e:
typer.echo(f"Error: {e}")
typer.echo(f"Output to {file_output_path.name}")


@@ -0,0 +1 @@
from .calendar import app


@@ -0,0 +1,54 @@
import typer
from rich.console import Console
from typing import Annotated, List
from pathlib import Path
from ...utils.sportspress import validate_keys
from ...utils.normalize import normalize_header_key, load_config
from ...utils.common import read_and_normalize_csv, is_visitor_home_order_reversed, import_gamebygame, parse_datetime, personalize_data_for_team
from ...utils.sportspress import write_sportspress_csv
from .calendar_utils import generate_calendar
from collections import defaultdict
import toml
app = typer.Typer()
@app.command(name="calendar")
def generate_calendar_app(
input_file: Annotated[List[typer.FileText], typer.Argument(..., help="Path(s) to the CSV file")],
):
# Read CSV data
data = read_and_normalize_csv(input_file)
data = personalize_data_for_team(data, "Hounds")
# data = parse_datetime(data)
generate_calendar(data)
pass
@app.command(name="calendar-config")
def generate_calendar_configs(
input_file: Annotated[List[typer.FileText], typer.Argument(..., help="Path(s) to the CSV file")],
output_file: Annotated[Path, typer.Argument(..., help="Path(s) to the output config file")]
):
data = read_and_normalize_csv(input_file)
teams = {row.get('visitor') for row in data}
teams.update({row.get('home') for row in data})
fields = {row.get('field') for row in data}
config = defaultdict(dict)
config['fields']['default'] = {
'bg_color': (0, 0, 0, 256)
}
config['teams']['default'] = {
'logo': ''
}
for field in fields:
config['fields'][field] = config['fields']['default']
for team in teams:
config['teams'][team] = config['teams']['default']
if output_file.is_dir():
output_file = output_file.joinpath('calendar_config.toml')
with output_file.open('w') as f:
toml.dump(config, f)
pass


@@ -0,0 +1,200 @@
from calendar import Calendar
from PIL import Image, ImageDraw, ImageFont
from typing import Tuple
from pathlib import Path
calendar_cell_size = (400, 500)
calendar_cell_width, calendar_cell_height = calendar_cell_size
def textsize(text, font):
im = Image.new(mode="P", size=(0, 0))
draw = ImageDraw.Draw(im)
_, _, width, height = draw.textbbox((0, 0), text=text, font=font)
return width, height
def corner_image():
raise NotImplementedError  # unused stub; Image.new() needs a mode and size
def text_rectangle(text:str, font: str, font_size: int, foreground_color: Tuple[int, int, int, int]=(0,0,0,255), background_color: Tuple[int, int, int, int]=(0,0,0,0), height: int=400, width: int=500) -> Image:
font_obj = ImageFont.truetype(font,font_size)
img = Image.new('RGBA', (int(width),int(height)), background_color)
draw = ImageDraw.Draw(img)
text = str(text)
text_width, text_height = textsize(text, font=font_obj)
x = (width - text_width) / 2
y = (height - text_height) / 2
text_position = (x,y)
draw.text(text_position, text, font=font_obj, fill=foreground_color)
return img
def calendar_cell(
height: int=calendar_cell_height, width: int=calendar_cell_width,
background_color: Tuple[int, int, int, int]=(0,0,0,0),
left_top_corner = None,
right_top_corner = None,
top_center = None,
right_bottom_corner = None,
bottom_center = None,
left_bottom_corner = None,
center = None
):
# Create a blank rectangle image
cell_img = Image.new('RGBA', (width, height), background_color)
# Left top corner
if left_top_corner:
paste_position = (0, 0)
cell_img.paste(left_top_corner, paste_position, left_top_corner)
# Right top corner
if right_top_corner:
paste_position = (width - right_top_corner.width, 0)
cell_img.paste(right_top_corner, paste_position, right_top_corner)
if top_center:
raise NotImplementedError
if right_bottom_corner:
paste_position = (width - right_bottom_corner.width, height - right_bottom_corner.height)
cell_img.paste(right_bottom_corner, paste_position, right_bottom_corner)
if bottom_center:
paste_position = ((width - bottom_center.width)//2, (height - bottom_center.height))
cell_img.paste(bottom_center, paste_position, bottom_center)
if left_bottom_corner:
raise NotImplementedError
if center:
paste_position = ((width - center.width)//2, (height - center.height)//2)
cell_img.paste(center, paste_position, center)
return cell_img
def generate_calendar(data):
result_calendar = Calendar()
result_calendar.setfirstweekday(6)
baseball_bat = Image.open(f"data/logos/baseball_bat_2.png")
baseball_bat = baseball_bat.resize((90, 90))
for year, month in {(row['datetime'].year, row['datetime'].month) for row in data}:
month_days=list(result_calendar.monthdayscalendar(year, month))
month_image = Image.new('RGBA', (calendar_cell_width*7, calendar_cell_height*len(month_days)), (0, 0, 0, 0))
first_thursday=(month, [w[4] for w in month_days if w[4] != 0][0])
colors = {
# 'proviso-west': (139, 69, 19, 256),
'winnemac': (37, 89, 164, 256),
# 'walther': (251, 231, 77, 256),
# 'taft': (64, 119, 0, 256),
'southwest': (230, 136, 60, 256),
# 'maywood': (107, 5, 4, 256),
# 'ozinga': (170, 143, 102, 256),
# 'simeon':(167,192,226),
'Skokie':(72,159,88),
# 'comed':(206,45,137),
'default': (0, 0, 0, 256),
'Loyola': (206,45,137),
'Piotrowski': (251, 231, 77, 256),
'Baseball Alley': (167,192,226),
}
for week_num, week in enumerate(month_days):
for day_num, date in enumerate(week):
date_text_image = text_rectangle(date,
"data/fonts/refrigerator-deluxe-bold.otf",
100,
foreground_color='white',
height=calendar_cell_height*.25,
width=calendar_cell_width*.25)
if filtered_data := [row for row in data if row['datetime'].month == month and row['datetime'].day == date]:
# Gen square that has one game
if len (filtered_data) == 1:
game = filtered_data[0]
opponent_logo_path = Path(f"data/logos/{game['opponent'].lower()}.png")
if opponent_logo_path.exists():
opponent_logo = Image.open(f"data/logos/{game['opponent'].lower()}.png")
else:
opponent_logo = text_rectangle(text=game['opponent'][0].upper(),width=500, height=500, font_size=400, font="data/fonts/college.ttf")
is_home_game = game['homevisitor'] == "home"
if game.get('wood','').lower() == 'yes':
right_bottom_corner = baseball_bat
else:
right_bottom_corner = None
img = calendar_cell(
height=calendar_cell_height,
width=calendar_cell_width,
background_color=colors.get(game['field'], colors['default']),
left_top_corner = text_rectangle('H' if is_home_game else "A",
"data/fonts/refrigerator-deluxe-bold.otf",
80,
foreground_color='black' if is_home_game else 'white',
background_color='white' if is_home_game else 'black',
height=calendar_cell_height*.2,
width=calendar_cell_width*.2),
right_top_corner = date_text_image,
center = opponent_logo.resize((int(opponent_logo.width*.5), int(opponent_logo.height*.5))),
bottom_center = text_rectangle(f"{game['time']:%-I:%M}",
"data/fonts/refrigerator-deluxe-bold.otf",
120,
foreground_color='white',
height=calendar_cell_height*.25,
width=calendar_cell_width),
right_bottom_corner=right_bottom_corner
)
# img.show()
elif len(filtered_data) == 2:
game1, game2 = filtered_data[:2]
# resolve a logo for the doubleheader opponent (game1), mirroring the single-game branch
opponent_logo_path = Path(f"data/logos/{game1['opponent'].lower()}.png")
if opponent_logo_path.exists():
opponent_logo = Image.open(opponent_logo_path)
else:
opponent_logo = text_rectangle(text=game1['opponent'][0].upper(), width=500, height=500, font_size=400, font="data/fonts/college.ttf")
img = calendar_cell(
height=calendar_cell_height,
width=calendar_cell_width,
background_color='red',
left_top_corner = text_rectangle('DH',
"data/fonts/refrigerator-deluxe-bold.otf",
80,
foreground_color='black',
background_color='white',
height=calendar_cell_height*.2,
width=calendar_cell_width*.2),
right_top_corner = date_text_image,
center = opponent_logo.resize((int(opponent_logo.width*.5), int(opponent_logo.height*.5))),
bottom_center = text_rectangle(f"{game1['time']:%-I:%M} & {game2['time']:%-I:%M}",
"data/fonts/refrigerator-deluxe-bold.otf",
80,
foreground_color='white',
height=calendar_cell_height*.2,
width=calendar_cell_width),
)
pass
else:
img=calendar_cell(
height=calendar_cell_height,
width=calendar_cell_width,
background_color=(204,204,204,int(256*.85)),
right_top_corner = text_rectangle(date,
"data/fonts/refrigerator-deluxe-bold.otf",
100,
foreground_color='black',
height=calendar_cell_height*.25,
width=calendar_cell_width*.25)
)
pass
if date: month_image.paste(img, (img.size[0]*day_num, img.size[1]*week_num), img)
month_image.save(f'data/output/{year}-{month}.png')
# if (month, date) in games_lookup.keys() and not (month, date) == first_thursday:
# background_image = game_square([g for g in games if g['dtstart'].month==month and g['dtstart'].day==date])
# elif (month, date) in games_lookup.keys() and (month, date) == first_thursday:
# background_image = game_square(
# [g for g in games if g['dtstart'].month == month and g['dtstart'].day == date], special='open-mic')
# elif (month, date) == first_thursday:
# background_image = openmic_square(date)
# else:
# background_image = blank_square(date)
# if date: month_image.paste(background_image, (background_image.size[0]*day_num, background_image.size[1]*week_num), background_image)
# month_image.thumbnail((1000,1000))
# month_image.save(f'output/{year}-{month}.png')
# month_image.show()
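As a rough illustration of how these helpers compose, a single cell can be rendered from text_rectangle images pasted into calendar_cell; the font and output paths below mirror the ones hard-coded in generate_calendar and are assumptions about the repository layout:
# Minimal sketch, not part of the commit; renders one stand-alone calendar cell.
from src.apps.generate.calendar_utils import calendar_cell, text_rectangle

font = "data/fonts/refrigerator-deluxe-bold.otf"  # assumed to exist, as in generate_calendar
cell = calendar_cell(
    background_color=(37, 89, 164, 255),
    left_top_corner=text_rectangle("H", font, 80, foreground_color="black", background_color="white", height=100, width=80),
    right_top_corner=text_rectangle("14", font, 100, foreground_color="white", height=125, width=100),
    bottom_center=text_rectangle("11:00", font, 120, foreground_color="white", height=125, width=400),
)
cell.save("data/output/sample_cell.png")  # assumes data/output/ exists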


@@ -0,0 +1 @@
from .read import app

src/apps/read/read.py Normal file

@@ -0,0 +1,116 @@
import typer
from rich.table import Table, Column
from rich.console import Console
from rich.columns import Columns
from pathlib import Path
import csv
from ...utils.common import list_key_values, read_and_normalize_csv, import_gamebygame, aggregate_teams, aggregate_teams_by_season
from ...utils.normalize import normalize_header_key
from typing import Annotated, List
app = typer.Typer()
@app.command("list-values")
def print_values_for_key(
input_file: Annotated[List[typer.FileText], typer.Argument(..., help="Path(s) to the CSV file")],
key: str = typer.Argument(..., help="The key to retrieve to generate list.")
):
# Read CSV data
data = []
for f in input_file:
data.extend(read_and_normalize_csv(f))
values = list_key_values(data, key)
console = Console()
table = Table(show_header=False, title=f'Values for "{key.title()}" ({len(values)})')
table.add_column("Values")
# breakpoint()
for value in sorted(values):
table.add_row(value)
console.print(table)
@app.command("print")
def print_table(
input_file: Annotated[List[typer.FileText], typer.Argument(..., help="Path(s) to the CSV file")]
):
# Read CSV data
data = []
for f in input_file:
data.extend(read_and_normalize_csv(f))
console = Console()
table = Table()
keys = data[0].keys()
for key in keys:
table.add_column(key)
# breakpoint()
for row in data:
table.add_row(*[row[key] for key in keys])
console.print(table)
@app.command()
def standings(
input_file: Annotated[List[typer.FileText], typer.Argument(..., help="Path(s) to the CSV file")],
):
# Read CSV data
data=[]
for f in input_file:
data.extend(import_gamebygame(f))
aggregate_team_data = aggregate_teams(data)
# Display aggregated data as a table
console = Console()
table = Table(title="Aggregated Team Data")
table.add_column("Team", style="bold")
table.add_column("GP", style="bold")
table.add_column("Wins", style="bold")
table.add_column("Losses", style="bold")
table.add_column("Ties", style="bold")
table.add_column("Runs For", style="bold")
table.add_column("Runs Against", style="bold")
for team_stats in aggregate_team_data:
table.add_row(
team_stats["team"],
str(team_stats["gp"]),
str(team_stats["win"]),
str(team_stats["loss"]),
str(team_stats["tie"]),
str(team_stats["runs_for"]),
str(team_stats["runs_against"]),
)
console.print(table)
@app.command()
def seasons(
input_file: Annotated[List[typer.FileText], typer.Argument(..., help="Path(s) to the CSV file")],
):
# Read CSV data
data=[]
for f in input_file:
data.extend(read_and_normalize_csv(f))
aggregate_team_data = aggregate_teams_by_season(data)
# Display aggregated data as a table
console = Console()
table = Table(title="Aggregated Team Data")
table.add_column("Team", style="bold")
for team_stats in aggregate_team_data:
table.add_row(
team_stats["team"],
str(", ".join(sorted(team_stats["seasons"])))
)
console.print(table)


@@ -0,0 +1,81 @@
import csv
import re
from typing import List, Dict
from dateutil import parser
from pathlib import Path
from rich.console import Console
from rich.panel import Panel
from rich.table import Table, Column
from rich.columns import Columns
import typer
# from .utils.common import normalize_header_key, read_csv, is_visitor_home_order_reversed, process_data, aggregate_teams, write_sportspress_csv
# validate_csv_header
app = typer.Typer()
@app.command()
def standings(file_path: Path = typer.Argument(..., help="Path to the CSV file")):
# Validate CSV header
header = next(csv.reader(open(file_path, "r")))
normalized_header = [normalize_header_key(key) for key in header]
if not validate_csv_header(header):
typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
return
# Read CSV data
data = read_csv(file_path)
visitor_home_order_reversed = is_visitor_home_order_reversed(normalized_header)
processed_data = process_data(data, visitor_home_order_reversed)
aggregate_team_data = aggregate_teams(processed_data)
# Display aggregated data as a table
console = Console()
table = Table(title="Aggregated Team Data")
table.add_column("Team", style="bold")
table.add_column("Wins", style="bold")
table.add_column("Losses", style="bold")
table.add_column("Ties", style="bold")
table.add_column("Runs For", style="bold")
table.add_column("Runs Against", style="bold")
for team_stats in aggregate_team_data:
table.add_row(
team_stats["team"],
str(team_stats["win"]),
str(team_stats["loss"]),
str(team_stats["tie"]),
str(team_stats["runs_for"]),
str(team_stats["runs_against"]),
)
console.print(table)
# Write processed CSV data back to a new file
# output_file_path = file_path.with_suffix(".processed.csv")
# write_csv(output_file_path, data)
# typer.echo(f"Processed data written to: {output_file_path}")
# @app.command()
def replace_key_values(data: List[Dict], key, match:str, replace:str, is_regex:bool =False):
if not is_regex:
regex = re.compile(fr"^{match}$")
else:
regex = re.compile(fr"{match}")
for row in data:
row[key] = regex.sub(replace, row[key])
return data
def add_key_values(data: List[Dict], key, value:str):
for row in data:
row[key] = value
return data
if __name__ == "__main__":
app()

src/utils/__init__.py Normal file (empty)


@@ -1,40 +1,71 @@
import csv
import re
from typing import List, Dict
from typing import List, Dict, Union, TextIO
from io import TextIOBase
from dateutil import parser
from pathlib import Path
from rich.console import Console
from rich.table import Table
from .normalize import normalize_header_key, load_config, normalize_value, normalize_keyvalue, normalize_row
import datetime
def normalize_header_key(key: str) -> str:
key_mapping = {
"away": "visitor",
"results": "results",
"final score": "results",
"venue": "field",
"location":"field",
"result": "results",
"w":"win",
"l":"loss",
"t":"tie",
"div":"division",
"rf":"runs_for",
"runs":"runs_against"
}
return key_mapping.get(key.lower().strip(), key.lower().strip())
def list_key_values(data: List[Dict], key):
if key.lower() == "team":
key = "team"
else:
key = key
def validate_csv_header(header: List[str]) -> bool:
required_keys = ["date", "time", "field", "visitor", "home", "results"]
normalized_header = [normalize_header_key(key) for key in header]
return all(key in normalized_header for key in required_keys)
if key != "team" or "team" in data[0].keys():
output = {row.get(key) for row in data}
else:
output = {row.get('team') for row in data}
output = output | {row.get('home') for row in data}
output = output | {row.get('visitor') for row in data}
output.discard(None)
return output
def read_csv(file_path: Path) -> List[dict]:
data = []
with open(file_path, "r", newline="") as csvfile:
reader = csv.DictReader(csvfile)
def read_and_normalize_csv(input_file: Union[List[TextIO], List[Path], TextIO, Path]) -> List[dict]:
"""
Reads CSV file(s) from the provided input file path(s) or file object(s),
and returns a list of dictionaries with normalized keys and values
where each dictionary represents a row in the CSV.
Parameters:
input_file (Union[List[TextIO], List[Path], TextIO, Path]):
Either a single file path (as a string or Path object) or a list of file paths,
or a single file object (opened in text mode) or a list of file objects.
If a list is provided, each element should be either a file path or a file object.
Returns:
List[dict]:
A list of dictionaries where each dictionary represents a row in the CSV.
Keys in the dictionaries correspond to column names, and values correspond to cell values.
"""
normalization_config = load_config()
result_data = []
if isinstance(input_file, list):
file_list = input_file
else:
file_list = [input_file]
for f in file_list:
if isinstance(f, Path):
f = f.open()
reader = csv.DictReader(f)
for row in reader:
normalized_row = {normalize_header_key(key): value.strip() for key, value in row.items()}
data.append(normalized_row)
normalized_row = normalize_row(row, normalization_config)
result_data.append(normalized_row)
return result_data
def personalize_data_for_team(data:List[dict], target_team:str):
for row in data:
if row.get('home') == target_team:
row['homevisitor'] = 'home'
row['opponent'] = row.get('visitor')
elif row.get('visitor') == target_team:
row['homevisitor'] = 'visitor'
row['opponent'] = row.get('home')
return data
def write_csv(file_path: Path, data: List[dict]) -> None:
@@ -106,14 +137,32 @@ def is_visitor_home_order_reversed(header: List[str]) -> bool:
header (List[str]): The list of header keys.
Returns:
bool: True if the 'home' key comes before the 'visitor' key, indicating reversed order.
Returns False if nothing needs to be done to the data.
"""
return header.index('visitor') > header.index('home')
def process_data(data: List[Dict], visitor_home_order_reversed = False) -> List[Dict]:
if 'visitor' in header and 'home' in header:
return header.index('visitor') > header.index('home')
else:
raise KeyError("Header must contain both 'visitor' and 'home' keys.")
def parse_datetime(data: List[Dict]):
for row in data:
parsed_score = parse_score(row["results"], visitor_home_order_reversed)
if isinstance(row.get('date'), datetime.date) and isinstance(row.get('time'), datetime.time):
row['datetime'] = datetime.datetime.combine(row['date'], row['time'])
continue
try:
row['datetime'] = parser.parse(f"{row.get('date')} {row.get('time')}")
except parser.ParserError as e:
raise e
return data
def import_gamebygame(data: Union[List[Dict], TextIO, Path]) -> List[Dict]:
if isinstance(data, TextIOBase) or isinstance(data, Path) :
data = read_and_normalize_csv(data)
header = data[0].keys()
visitor_home_order_reversed = is_visitor_home_order_reversed(list(header))
for row in data:
parsed_score = parse_score(row.get("results",''), visitor_home_order_reversed)
row.update(parsed_score)
try:
row['datetime'] = parser.parse(f"{row['date']} {row['time']}")
@@ -169,6 +218,27 @@ def aggregate_teams(data: List[Dict[str, str]]) -> List[Dict[str, int]]:
return sorted_aggregated_data
def aggregate_teams_by_season(data: List[Dict[str, str]]) -> List[Dict[str, int]]:
team_stats = {}
for row in data:
for team_key in ["home", "visitor", "team"]:
# team = row.get(team_key)
if (team := row.get(team_key)) and (season := row.get('season')):
team_stats.setdefault(team, {"seasons": set()})
# breakpoint()
team_stats[team]['seasons'].update({season})
# Convert team_stats dictionary to a list of dictionaries
aggregated_data = [{"team": team, **stats} for team, stats in team_stats.items()]
# Sort the list by team name
sorted_aggregated_data = sorted(aggregated_data, key=lambda x: x["team"])
return sorted_aggregated_data
def write_sportspress_csv(data: List[Dict], file_path: Path, only_with_outcome:bool = False):
"""
Writes sports event data to a CSV file in a specific format.

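For orientation, a small sketch of how the refactored helpers in src/utils/common.py are meant to chain together: read_and_normalize_csv feeds import_gamebygame, which feeds aggregate_teams. The CSV path is a placeholder and the file is assumed to carry home/visitor columns:
# Hypothetical usage, not part of the commit.
from pathlib import Path
from src.utils.common import read_and_normalize_csv, import_gamebygame, aggregate_teams

rows = read_and_normalize_csv(Path("data/2025-hounds.csv"))  # normalized dicts, one per scheduled game
games = import_gamebygame(rows)                              # adds parsed scores and a datetime field
for team in aggregate_teams(games):
    print(team["team"], team["win"], team["loss"], team["tie"])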
src/utils/csv.py Normal file (empty)

src/utils/normalize.py Normal file

@@ -0,0 +1,76 @@
import toml
from typing import List, Dict
DEFAULT_NORMALIZATION_PATH="normalization.toml"
import re
from dateutil import parser
import datetime
def load_config(normalization_config_path=DEFAULT_NORMALIZATION_PATH):
with open(normalization_config_path, "r") as config_file:
config = toml.load(config_file)
return config
def normalize_header_key(original_key: str, normalization_config) -> str:
original_key = original_key.strip().lower()
for key, mappings in normalization_config.items():
if original_key.lower() in [potential_key.lower() for potential_key in mappings.get('potential_keys',[])]:
normalized_key = key
return normalized_key
return original_key
def normalize_value(value, key, normalization_config):
value = value.strip()
for mapping in normalization_config.get(key, {}).get('values', []):
if value in mapping["original"]:
value = mapping["normalized"]
match key.lower():
case "date":
if value:
value = parser.parse(value).date()
else:
pass
case "home":
value = value.title()
case "away":
value = value.title()
case "time":
if value:
value = parser.parse(value).time()
else:
pass
case _:
# Handle other cases
pass
return value
def normalize_keyvalue(key: str,value:str, normalization_config):
key, value = key.strip(), value.strip()
normalized_k = normalize_header_key(key, normalization_config)
normalized_v = normalize_value(value, key, normalization_config)
return normalized_k, normalized_v
def normalize_row(row:dict, normalization_config):
result_row = {}
for key, value in row.items():
normalized_key, normalized_value = normalize_keyvalue(key, value, normalization_config)
result_row[normalized_key] = normalized_value
if (isinstance(result_row.get('date'), datetime.date)) and (isinstance(result_row.get('time'), datetime.time)):
result_row['datetime'] = datetime.datetime.combine(result_row.get('date'), result_row.get('time'))
elif (isinstance(result_row.get('date'), datetime.date)) and not (result_row.get('time')):
result_row['datetime'] = result_row.get('date')
if '' in result_row.keys() and not result_row.get(''):
del result_row['']
return result_row
def replace_key_values(data: List[Dict], key, match:str, replace:str, is_regex:bool =False):
if not is_regex:
regex = re.compile(fr"^{match}$")
else:
regex = re.compile(fr"{match}")
for row in data:
row[key] = regex.sub(replace, row[key])
return data
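A quick sketch of what normalize_row is expected to produce for one raw schedule row, using the key mappings from normalization.toml; the column spellings and values are illustrative:
# Illustration only; assumes normalization.toml is present in the working directory.
from src.utils.normalize import load_config, normalize_row

config = load_config()
raw = {"Date": "5/4/2025", "Time": "11:00 AM", "Away": "Red Sox", "Home": "Hounds", "Venue": "Winnemac"}
row = normalize_row(raw, config)
print(row["visitor"], row["field"])  # header keys mapped via potential_keys
print(row["datetime"])               # parsed date and time combined into one datetime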

src/utils/sportspress.py Normal file

@@ -0,0 +1,105 @@
from typing import List, Dict
from pathlib import Path
import csv
REQUIRED_KEYS=["date", "time", "field", "visitor", "home"]
def validate_keys(header: List[str]) -> bool:
required_keys = REQUIRED_KEYS
return all(key in header for key in required_keys)
def write_sportspress_csv(data: List[Dict], file: Path, only_with_outcome:bool = False):
"""
Writes sports event data to a CSV file in a specific format.
Parameters:
- data (List[Dict]): List of dictionaries where each dictionary represents a sports event.
- file: The open, writable file (or file-like object) the CSV rows are written to.
- only_with_outcome (bool, optional): If True, only events with outcomes will be included in the CSV. Default is False.
Returns:
None
Example:
>>> data = [...] # List of dictionaries representing sports events
>>> file_path = Path("output.csv")
>>> write_sportspress_csv(data, file_path)
"""
header = data[0].keys()
if not validate_keys(header):
raise KeyError(f"Missing Keys. Requires: {REQUIRED_KEYS}, provided: {list(header)}")
writer = csv.writer(file)
fieldnames = [
"Format", #Competitive or Friendly
# "Competition",
"Season",
# "Date Format",
"Date",
"Time",
"Venue",
"Team",
"Results",
"Outcome",
# "Players",
# "Performance",
]
# Write the header
writer.writerow(fieldnames)
# Write the data
for row in data:
if only_with_outcome and not row.get('has_result'):
continue
writer.writerow(
[
row["datetime"].strftime("%Y/%m/%d"),
row["datetime"].strftime("%H:%M"),
row.get("field", ""),
row["home"],
"|".join([str(row.get(k,"")) for k in [
"home_runs_for_inning_1",
"home_runs_for_inning_2",
"home_runs_for_inning_3",
"home_runs_for_inning_4",
"home_runs_for_inning_5",
"home_runs_for_inning_6",
"home_runs_for_inning_7",
"home_runs_for_inning_8",
"home_runs_for_inning_9",
"home_runs_for_inning_10",
"home_runs_for",
"home_errors",
"home_hits"
]]),
row.get("home_outcome")
]
)
writer.writerow(
[
"",
"",
"",
row["visitor"],
"|".join([str(row.get(k,"")) for k in [
# "visitor_runs_for_inning_1",
# "visitor_runs_for_inning_2",
# "visitor_runs_for_inning_3",
# "visitor_runs_for_inning_4",
# "visitor_runs_for_inning_5",
# "visitor_runs_for_inning_6",
# "visitor_runs_for_inning_7",
# "visitor_runs_for_inning_8",
# "visitor_runs_for_inning_9",
# "visitor_runs_for_inning_10",
"visitor_runs_for",
"visitor_errors",
"visitor_hits"
]]),
row.get("visitor_outcome")
]
)
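To make the output shape concrete: each game becomes two CSV rows, home first and visitor second, with innings and totals pipe-joined into the Results column. A hedged sketch of exercising write_sportspress_csv against an in-memory buffer; the keys mirror what the function reads, and the outcome labels are assumptions:
# Illustration only, not part of the commit.
import datetime
import io
from src.utils.sportspress import write_sportspress_csv

game = {
    "date": "2025-05-04", "time": "11:00", "field": "Winnemac Park",
    "home": "Hounds", "visitor": "Red Sox",
    "datetime": datetime.datetime(2025, 5, 4, 11, 0),
    "home_runs_for": 6, "visitor_runs_for": 3,
    "home_outcome": "W", "visitor_outcome": "L",  # assumed outcome labels
}
buf = io.StringIO()
write_sportspress_csv([game], buf)
print(buf.getvalue())  # header row, then a home row and a visitor row for the game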


@@ -1,7 +1,9 @@
import unittest
from pathlib import Path
# from convert_to_sportspress
from convert_to_sportspress.utils import validate_csv_header, normalize_header_key, read_csv, parse_score, is_visitor_home_order_reversed, process_data, aggregate_teams
from src.utils.common import validate_csv_header, normalize_header_key, read_and_normalize_csv, parse_score, is_visitor_home_order_reversed, import_gamebygame, aggregate_teams
from src.utils.normalize import normalize_value, normalize_header_key, load_config
import toml
class TestConvertToSportsPress(unittest.TestCase):
def setUp(self):
@@ -23,11 +25,11 @@ class TestConvertToSportsPress(unittest.TestCase):
def test_read_csv(self):
# Assuming that the CSV file has a valid header
with self.subTest("Read CSV data"):
data = read_csv(self.test_csv_path_2009)
data = read_and_normalize_csv(self.test_csv_path_2009)
self.assertIsInstance(data, list)
self.assertTrue(all(isinstance(row, dict) for row in data))
with self.subTest("Normalized keys"):
normalized_data = read_csv(self.test_csv_path_2009)
normalized_data = read_and_normalize_csv(self.test_csv_path_2009)
self.assertTrue(all("visitor" in row.keys() and "results" in row.keys() for row in normalized_data))
def test_parse_score_visitor_first(self):
@@ -115,8 +117,8 @@ class TestConvertToSportsPress(unittest.TestCase):
def test_process_data(self):
# Assuming that the CSV file has a valid header and read_csv is good
data = read_csv(self.test_csv_path_2009)
processed_data = process_data(data)
data = read_and_normalize_csv(self.test_csv_path_2009)
processed_data = import_gamebygame(data)
aggregate_team_data = aggregate_teams(processed_data)
expected_result = [
{"team": "Marlins", "gp": 28, "win": 23, "loss": 5, "tie": 0, "pts": 46, "runs_for": 249, "runs_against": 117},
@@ -139,7 +141,85 @@ class TestConvertToSportsPress(unittest.TestCase):
with self.subTest(f'Results of "{team}"'):
self.assertDictContainsSubset(aggregate_team_data_dict, expected_dict)
class TestNormalization(unittest.TestCase):
def test_normalize_key(self):
header_key_normalization = {
"date": {"potential_keys": ["Date", "EventDate"]},
"time": {"potential_keys": ["Time", "EventTime"]},
"visitor": {"potential_keys": ["Away"]},
"field":
{
"potential_keys": ["Field", "Location", "Venue"],
"values": [{"original": ["Winnemac"], "normalized": "Winnemac Park"}],
}
}
# Test cases for normalize_key function
self.assertEqual(normalize_header_key("Date", header_key_normalization), "date")
self.assertEqual(normalize_header_key("Time", header_key_normalization), "time")
self.assertEqual(normalize_header_key("Venue", header_key_normalization), "field")
self.assertEqual(normalize_header_key("Away", header_key_normalization), "visitor")
def test_load_config_file(self):
expected = {
"win": {"potential_keys": ["w", "wins"]},
"loss": {"potential_keys": ["l", "losses"]},
"tie": {"potential_keys": ["t", "ties"]},
"points": {"potential_keys": ["pts.", "pts", "pt"]},
"runs_for": {"potential_keys": ["rf", "rs"]},
"runs_against": {"potential_keys": ["ra"]},
"division": {"potential_keys": ["div"]},
"date": {"potential_keys": ["Date", "EventDate"]},
"time": {"potential_keys": ["Time", "EventTime"]},
"visitor": {"potential_keys": ["Away"]},
"field": {
"potential_keys": ["Field", "Location", "Venue"],
"values": [{"original": ["Winnemac"], "normalized": "Winnemac Park"}],
},
"results": {"potential_keys": ["Final Score", "Score", "Result", "Outcome"]},
"team": {
"values": [
{
"original": ["Hounds", "Chicago Hounds", "Winnemac Hounds", "Hound"],
"normalized": "Hounds",
},
{"original": ["Chicago Red Sox"], "normalized": "Red Sox"},
]
},
}
config_path = "normalization.toml"
config = load_config(config_path)
self.assertEqual(config, expected)
def test_normalize_value(self):
# Test cases for normalize_value function
team_normalization = {
"team": {
"values": [
{
"original": ["Hounds", "Chicago Hounds", "Winnemac Hounds", "Hound"],
"normalized": "Hounds",
},
{"original": ["Chicago Red Sox"], "normalized": "Red Sox"},
]
}
}
# Test case with normalization
self.assertEqual(normalize_value("Chicago Hounds", 'team', team_normalization), "Hounds")
# Test case without title case normalization
# self.assertEqual(normalize_value("red sox", team_normalization, 'team'), "Red Sox")
# Add more test cases for other values
if __name__ == "__main__":
unittest.main()