239 lines
8.4 KiB
Python
239 lines
8.4 KiB
Python
import csv
|
|
import re
|
|
from typing import List, Dict
|
|
from dateutil import parser
|
|
from pathlib import Path
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.table import Table, Column
|
|
from rich.columns import Columns
|
|
import typer
|
|
from .utils import normalize_header_key, validate_csv_header, read_csv, is_visitor_home_order_reversed, process_data, aggregate_teams, write_sportspress_csv
|
|
|
|
app = typer.Typer()
|
|
|
|
|
|
@app.command()
|
|
def standings(file_path: Path = typer.Argument(..., help="Path to the CSV file")):
|
|
# Validate CSV header
|
|
header = next(csv.reader(open(file_path, "r")))
|
|
normalized_header = [normalize_header_key(key) for key in header]
|
|
if not validate_csv_header(header):
|
|
typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
|
|
return
|
|
|
|
# Read CSV data
|
|
data = read_csv(file_path)
|
|
visitor_home_order_reversed = is_visitor_home_order_reversed(normalized_header)
|
|
processed_data = process_data(data, visitor_home_order_reversed)
|
|
aggregate_team_data = aggregate_teams(processed_data)
|
|
|
|
# Display aggregated data as a table
|
|
console = Console()
|
|
table = Table(title="Aggregated Team Data")
|
|
table.add_column("Team", style="bold")
|
|
table.add_column("Wins", style="bold")
|
|
table.add_column("Losses", style="bold")
|
|
table.add_column("Ties", style="bold")
|
|
table.add_column("Runs For", style="bold")
|
|
table.add_column("Runs Against", style="bold")
|
|
|
|
for team_stats in aggregate_team_data:
|
|
table.add_row(
|
|
team_stats["team"],
|
|
str(team_stats["win"]),
|
|
str(team_stats["loss"]),
|
|
str(team_stats["tie"]),
|
|
str(team_stats["runs_for"]),
|
|
str(team_stats["runs_against"]),
|
|
)
|
|
|
|
console.print(table)
|
|
|
|
# Write processed CSV data back to a new file
|
|
# output_file_path = file_path.with_suffix(".processed.csv")
|
|
# write_csv(output_file_path, data)
|
|
# typer.echo(f"Processed data written to: {output_file_path}")
|
|
|
|
@app.command()
|
|
def sportspress_csv(file_path: Path = typer.Argument(..., help="Path to the CSV file"), file_output_path: Path = typer.Argument(..., help="Path to the output CSV file"), only_with_outcome: bool = typer.Option(default=True, is_flag=True, help="")):
|
|
# Validate CSV header
|
|
header = next(csv.reader(open(file_path, "r")))
|
|
normalized_header = [normalize_header_key(key) for key in header]
|
|
if not validate_csv_header(header):
|
|
typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
|
|
return
|
|
|
|
# Read CSV data
|
|
data = read_csv(file_path)
|
|
visitor_home_order_reversed = is_visitor_home_order_reversed(normalized_header)
|
|
processed_data = process_data(data, visitor_home_order_reversed)
|
|
|
|
write_sportspress_csv(processed_data, file_output_path, only_with_outcome)
|
|
typer.echo(f"Output to {file_output_path}")
|
|
|
|
def list_key_values(data: List[Dict], key):
|
|
if key.lower() == "team":
|
|
normalized_key = "team"
|
|
else:
|
|
normalized_key = normalize_header_key(key)
|
|
|
|
if normalized_key != "team" or "team" in data[0].keys():
|
|
output = {row.get(normalized_key) for row in data}
|
|
else:
|
|
output = {row.get('home') for row in data}
|
|
output = output | {row.get('visitor') for row in data}
|
|
return output
|
|
|
|
def replace_key_values(data: List[Dict], key, match:str, replace:str, is_regex:bool =False):
|
|
if not is_regex:
|
|
regex = re.compile(fr"^{match}$")
|
|
else:
|
|
regex = re.compile(fr"{match}")
|
|
|
|
for row in data:
|
|
row[key] = regex.sub(replace, row[key])
|
|
|
|
return data
|
|
|
|
def add_key_values(data: List[Dict], key, value:str):
|
|
for row in data:
|
|
row[key] = value
|
|
|
|
return data
|
|
|
|
clean_app = typer.Typer()
|
|
@clean_app.command("list")
|
|
def print_values_for_key(file_path: Path = typer.Argument(..., help="Path to the CSV file"), key: str = typer.Argument(..., help="")):
|
|
# Read CSV data
|
|
data = read_csv(file_path)
|
|
processed_data = list_key_values(data, key)
|
|
|
|
console = Console()
|
|
table = Table(show_header=False)
|
|
table.add_column("Values")
|
|
|
|
for value in sorted(processed_data):
|
|
table.add_row(value)
|
|
|
|
console.print(table)
|
|
|
|
@clean_app.command("replace")
|
|
def replace_values_for_key(
|
|
file_path: Path = typer.Argument(..., help="Path to the CSV file"),
|
|
key: str = typer.Argument(..., help=""),
|
|
match: str = typer.Argument(..., help=""),
|
|
replace: str = typer.Argument(..., help=""),
|
|
in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
|
|
output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."),
|
|
match_is_regex: bool = typer.Option(False, "--regex", "-p", help="Match is a regex pattern.")
|
|
):
|
|
|
|
if in_place and output_file:
|
|
typer.echo("Error: Only one of --in-place or --output-file should be provided, not both.")
|
|
raise typer.Abort()
|
|
|
|
if key.lower() == "team":
|
|
normalized_key = "team"
|
|
else:
|
|
normalized_key = normalize_header_key(key)
|
|
|
|
console = Console()
|
|
|
|
# Read CSV data
|
|
data = read_csv(file_path)
|
|
|
|
before_table = Table(Column(), show_header=False, title="Before")
|
|
for value in sorted(list_key_values(data, key)):
|
|
before_table.add_row(value)
|
|
|
|
|
|
after_table = Table( Column(), show_header=False, title="After")
|
|
|
|
if normalized_key != "team" or "team" in data[0].keys():
|
|
data = replace_key_values(data, normalized_key, match, replace, match_is_regex)
|
|
else:
|
|
data=replace_key_values(data, "home", match, replace, match_is_regex)
|
|
data=replace_key_values(data, "visitor", match, replace, match_is_regex)
|
|
|
|
for value in sorted(list_key_values(data, key)):
|
|
after_table.add_row(value)
|
|
|
|
panel = Panel(
|
|
Columns([before_table, after_table]),
|
|
title="Replace"
|
|
)
|
|
|
|
console.print(panel)
|
|
|
|
if in_place and typer.confirm("Perform Replacement in-place?"):
|
|
with file_path.open('w') as f:
|
|
fieldnames = data[0].keys()
|
|
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
writer.writerows(data)
|
|
|
|
elif output_file:
|
|
if output_file.is_dir():
|
|
output_file = output_file.joinpath(file_path.name)
|
|
if typer.confirm(f"Write to {output_file}?"):
|
|
with output_file.open('w') as f:
|
|
fieldnames = data[0].keys()
|
|
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
writer.writerows(data)
|
|
|
|
@clean_app.command("add-key")
|
|
def add_values_for_key(
|
|
file_path: Path = typer.Argument(..., help="Path to the CSV file"),
|
|
key: str = typer.Argument(..., help=""),
|
|
value: str = typer.Argument("", help=""),
|
|
in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
|
|
output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."),
|
|
):
|
|
|
|
if in_place and output_file:
|
|
typer.echo("Error: Only one of --in-place or --output-file should be provided, not both.")
|
|
raise typer.Abort()
|
|
|
|
# Validate CSV header
|
|
header = next(csv.reader(open(file_path, "r")))
|
|
normalized_header = [normalize_header_key(key) for key in header]
|
|
if key.lower() == "team":
|
|
normalized_key = "team"
|
|
else:
|
|
normalized_key = normalize_header_key(key)
|
|
if not validate_csv_header(header):
|
|
typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
|
|
return
|
|
|
|
console = Console()
|
|
|
|
# Read CSV data
|
|
data = read_csv(file_path)
|
|
|
|
data = add_key_values(data, key, value)
|
|
|
|
if in_place and typer.confirm("Perform Replacement in-place?"):
|
|
with file_path.open('w') as f:
|
|
fieldnames = data[0].keys()
|
|
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
writer.writerows(data)
|
|
|
|
elif output_file:
|
|
if output_file.is_dir():
|
|
output_file = output_file.joinpath(file_path.name)
|
|
if typer.confirm(f"Write to {output_file}?"):
|
|
with output_file.open('w') as f:
|
|
fieldnames = data[0].keys()
|
|
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
writer.writerows(data)
|
|
|
|
|
|
app.add_typer(clean_app, name="clean")
|
|
|
|
if __name__ == "__main__":
|
|
app()
|