"""Compute Elo ratings from competition result CSV files and build a static website."""

import csv
import math
import multiprocessing as mp
import os
from datetime import datetime

import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.colors import LinearSegmentedColormap

from competition_page import get_competition_page_html
from team_page import get_team_page_html
from tools import *
from world_ranking_list import get_world_ranking_html

# Constants for Elo calculation
K_FACTOR = 15
INITIAL_ELO = 1500  # Starting Elo for all teams
IMAGE_FORMAT = "webp"

# Per-competition-type settings: the meta-data CSV file to load and the theme
# colors that are exposed as attributes on every EloSystem instance.
_COMPETITION_PROFILES = {
    "electric": {
        "meta_file": "FSE.csv",
        "BORDER_COLOR": "#ffff1e",
        "BORDER_COLOR_DARK": "#f9f918",
        "LINK_COLOR": "#ffc400",
        "LINK_COLOR_HOVER": "#cc9d00",
    },
    "combustion": {
        "meta_file": "FSC.csv",
        "BORDER_COLOR": "#ff8c00",
        "BORDER_COLOR_DARK": "#f98700",
        "LINK_COLOR": "#ff4500",
        "LINK_COLOR_HOVER": "#cc3700",
    },
}


def expected_score(rating_a, rating_b):
    """Calculate the expected score for a player with rating_a against a player with rating_b"""
    return 1 / (1 + math.pow(10, (rating_b - rating_a) / 400))


class EloSystem:
    """Elo ratings for the teams of one competition type, plus website generation.

    Typical use: construct, call ``load_and_process_csv_files()`` and then
    ``generate_website()``.
    """

    def __init__(self, csv_directory, output_directory, competition_type):
        """Set up directories, theme colors and competition meta data.

        Args:
            csv_directory: Directory holding the meta-data CSV (``FSC.csv`` /
                ``FSE.csv``) and an ``events`` sub-directory with one result
                CSV per competition.
            output_directory: Directory the website is written to (created if
                missing).
            competition_type: ``"electric"`` or ``"combustion"``.

        Raises:
            ValueError: If ``competition_type`` is not a known type.
        """
        # Validate first so that nothing is created on disk for a bad type.
        if competition_type not in _COMPETITION_PROFILES:
            raise ValueError(
                f"Unknown competition type {competition_type!r}; "
                f"expected one of {sorted(_COMPETITION_PROFILES)}"
            )

        self.competition_type = competition_type
        self.events_directory = os.path.join(csv_directory, "events")
        self.competition_meta_data_directory = csv_directory
        self.output_directory = output_directory
        self.teams = {}  # Dictionary to store team Elo ratings
        self.match_history = []  # List to store all match results for history tracking
        self.competition_history = {}  # Dictionary to store competition results

        self.initialize_output_directory()
        self.competition_meta_data = self.load_competition_meta_data()

        profile = _COMPETITION_PROFILES[competition_type]
        self.BORDER_COLOR = profile["BORDER_COLOR"]
        self.BORDER_COLOR_DARK = profile["BORDER_COLOR_DARK"]
        self.LINK_COLOR = profile["LINK_COLOR"]
        self.LINK_COLOR_HOVER = profile["LINK_COLOR_HOVER"]

        # The snippet is looked up relative to the current working directory.
        with open("header-snippet.html", encoding="utf-8") as header_snippet:
            self.header_snippet = header_snippet.read()

        self.IMAGE_FORMAT = IMAGE_FORMAT

    def load_competition_meta_data(self):
        """Read the meta-data CSV for this competition type, sorted by ``id``.

        Returns:
            A pandas DataFrame with one row per competition.

        Raises:
            ValueError: If ``self.competition_type`` is not a known type.
        """
        profile = _COMPETITION_PROFILES.get(self.competition_type)
        if profile is None:
            raise ValueError(f"Unknown competition type {self.competition_type!r}")
        meta_data = pd.read_csv(
            os.path.join(self.competition_meta_data_directory, profile["meta_file"])
        )
        # sort_values returns a new frame; the result has to be kept.
        return meta_data.sort_values(by=['id'])

    def initialize_output_directory(self):
        """Create output directory if it doesn't exist"""
        os.makedirs(self.output_directory, exist_ok=True)
        for sub_directory in ('images', 'team', 'comp'):
            os.makedirs(os.path.join(self.output_directory, sub_directory), exist_ok=True)

    def _meta_value(self, competition_id, column):
        """Return the raw meta-data cell ``column`` of the competition with this id.

        Raises IndexError when no meta-data row has the given id.
        """
        meta = self.competition_meta_data
        return meta[meta["id"] == int(competition_id)][column].values[0]

    def get_competition_name(self, competition_id):
        """Return the stripped ``abbr`` column value of a competition."""
        return self._meta_value(competition_id, 'abbr').strip()

    def get_full_competition_name(self, competition_id):
        """Return the stripped ``name`` column value of a competition."""
        return self._meta_value(competition_id, 'name').strip()

    def get_competition_date(self, competition_id):
        """Return the competition's ``date`` column parsed as ``%Y-%m-%d``."""
        return datetime.strptime(self._meta_value(competition_id, 'date'), "%Y-%m-%d")

    def get_competition_type(self, competition_id):
        """Return the stripped ``kind`` column value of a competition."""
        return self._meta_value(competition_id, 'kind').strip()

    def load_and_process_csv_files(self):
        """Load all CSV files and process them to calculate Elo ratings"""
        # Competitions are processed in date order. A stable sort keeps the id
        # order of the meta data for competitions that share a date.
        ordered_ids = self.competition_meta_data.sort_values('date', kind='mergesort')['id'].tolist()
        csv_files = [os.path.join(self.events_directory, str(x) + ".csv") for x in ordered_ids]
        print(f"Found {len(csv_files)} CSV files to process")

        # Process each CSV file (representing a competition)
        for csv_file in csv_files:
            print(f"Processing {os.path.basename(csv_file)}...")
            competition_id = os.path.basename(csv_file).split(".")[0]

            try:
                teams_in_competition = self._read_competition_results(csv_file)
            except FileNotFoundError:
                print(f"file {csv_file} not found: skipping")
                continue

            # Sort teams by rank
            teams_in_competition.sort(key=lambda entry: int(entry[0]))

            # Store competition results
            self.competition_history[competition_id] = {
                'date': self.get_competition_date(competition_id),
                'results': teams_in_competition
            }

            # Extract just team names in order
            team_names = [entry[1] for entry in teams_in_competition]

            # Update competition count for each team
            for team_name in team_names:
                self.teams[team_name]['competitions'] += 1

            # Update Elo ratings based on this competition's results
            self.update_elo_for_competition(team_names, competition_id)

    def _read_competition_results(self, csv_file):
        """Read one result CSV and return ``[rank, team_name, None, None]`` rows.

        Only rows whose kind (column 2) matches this system's competition type
        are kept. Column 0 is the team name and column 3 the rank. Teams seen
        for the first time are registered in ``self.teams`` in file order; the
        two ``None`` slots are filled later with previous Elo and Elo change.

        Raises:
            FileNotFoundError: If ``csv_file`` does not exist.
        """
        entries = []
        with open(csv_file, 'r', newline='', encoding='utf-8') as file:
            reader = csv.reader(file)
            next(reader, None)  # Skip the header row
            for row in reader:
                if not row:  # Skip empty rows
                    continue
                rank = row[3].strip()
                team_name = row[0].strip()
                kind = row[2].strip()
                if kind.lower() != self.competition_type:
                    continue
                # Initialize the team if not seen before
                if team_name not in self.teams:
                    self.teams[team_name] = {
                        'elo': INITIAL_ELO,
                        'competitions': 0,
                        'history': [],
                        'delta_elo': 0
                    }
                entries.append([rank, team_name, None, None])
        return entries

    def _world_ranks(self):
        """Map every known team name to its 1-based rank by current Elo.

        Teams with equal Elo keep their insertion order in ``self.teams``.
        """
        ranking = sorted(self.teams.items(), key=lambda item: item[1]['elo'], reverse=True)
        return {name: position for position, (name, _) in enumerate(ranking, start=1)}

    def update_elo_for_competition(self, teams_in_order, competition_id):
        """Update Elo ratings for all teams in a competition

        Args:
            teams_in_order: Team names ordered from best to worst placement.
            competition_id: Key of the competition; its entry must already
                exist in ``self.competition_history``.
        """
        date = self.get_competition_date(competition_id)
        competition_name = self.get_competition_name(competition_id)
        old_ranks = self._world_ranks()

        # Every team "wins" against each team ranked below it. The deltas are
        # accumulated in 'delta_elo' so all pairings use pre-competition Elo.
        for i, team_a in enumerate(teams_in_order):
            for team_b in teams_in_order[i + 1:]:
                self.update_elo_pair(team_a, team_b, 1, 0, competition_id, date)

        # Store post-competition Elo ratings and changes
        results = self.competition_history[competition_id]['results']
        date_string = date.strftime("%Y-%m-%d")
        participants = len(teams_in_order)
        for placement, team in enumerate(teams_in_order, start=1):
            team_data = self.teams[team]
            previous_elo = team_data['elo']
            elo_change = team_data['delta_elo']
            new_elo = previous_elo + elo_change
            old_rank = old_ranks[team]

            # Store in team history; the new rank is only known once every
            # team has been updated, so it is filled in below.
            team_data['history'].append({
                'date': date_string,
                'competition': competition_name,
                'competition_id': competition_id,
                'placement': placement,
                'participants': participants,
                'previous_elo': previous_elo,
                'new_elo': new_elo,
                'elo_change': elo_change,
                'old_rank': old_rank,
                'new_rank': 0,
                'rank_change': -old_rank
            })

            # Store in competition history
            results[placement - 1][2] = previous_elo
            results[placement - 1][3] = elo_change

            team_data['elo'] = new_elo
            team_data['delta_elo'] = 0

        # The ranking is the same for all teams, so compute it only once.
        new_ranks = self._world_ranks()
        for team in teams_in_order:
            latest = self.teams[team]['history'][-1]
            latest['new_rank'] = new_ranks[team]
            latest['rank_change'] = new_ranks[team] - latest['old_rank']

    def update_elo_pair(self, team_a, team_b, score_a, score_b, competition_id, date):
        """Update Elo ratings for a pair of teams based on their match outcome

        The rating change is only accumulated in each team's ``delta_elo``;
        the ``elo`` values themselves are not modified here.
        """
        # Get current Elo ratings
        rating_a = self.teams[team_a]['elo']
        rating_b = self.teams[team_b]['elo']

        # Calculate expected scores
        expected_a = expected_score(rating_a, rating_b)
        expected_b = expected_score(rating_b, rating_a)

        # Compute preliminary Elo rating changes
        delta_rating_a = K_FACTOR * (score_a - expected_a)
        delta_rating_b = K_FACTOR * (score_b - expected_b)

        # Store match in history
        self.match_history.append({
            'date': date,
            'competition': competition_id,
            'team_a': team_a,
            'team_b': team_b,
            'old_elo_a': rating_a,
            'old_elo_b': rating_b,
            'delta_elo_a': delta_rating_a,
            'delta_elo_b': delta_rating_b,
            'score_a': score_a,
            'score_b': score_b
        })

        # Update team preliminary Elo ratings
        self.teams[team_a]['delta_elo'] = self.teams[team_a]['delta_elo'] + delta_rating_a
        self.teams[team_b]['delta_elo'] = self.teams[team_b]['delta_elo'] + delta_rating_b

    def generate_website(self):
        """Generate static HTML website with Elo ratings and visualizations"""
        # Use one worker process per CPU core
        num_processes = int(mp.cpu_count())
        print(f"staring computation on {num_processes} cores")

        self.create_world_ranking()

        # pool.map blocks until all pages are written; the context manager
        # then shuts the worker processes down instead of leaking them.
        with mp.Pool(processes=num_processes) as pool:
            pool.map(self.create_competition_page, self.competition_history)
            pool.map(self.create_team_page, self.teams)

        self.create_visualizations()
        print(f"Website generated in {self.output_directory}")

    def create_world_ranking(self):
        """Create the index.html page for the vehicle type"""
        # Sort teams by Elo rating
        sorted_teams = sorted(self.teams.items(), key=lambda x: x[1]['elo'], reverse=True)

        html_content = get_world_ranking_html(self, sorted_teams)

        # Write the HTML file
        with open(os.path.join(self.output_directory, 'index.html'), 'w', encoding='utf-8') as f:
            f.write(html_content)

    def create_team_page(self, team_name):
        """Create a dedicated page for a specific team"""
        print(f"Generating Team site for {team_name}")
        team_data = self.teams[team_name]

        # Generate Elo history chart for this team
        if team_data['history']:
            self.create_team_elo_history_chart(team_name)

        # Newest competition first
        team_competitions = sorted(team_data['history'], key=lambda x: x['date'], reverse=True)

        html_content = get_team_page_html(self, team_name, team_data, team_competitions)

        # Write the HTML file
        page_path = os.path.join(self.output_directory, 'team', f'{clean_filename(team_name)}.html')
        with open(page_path, 'w', encoding='utf-8') as f:
            f.write(html_content)

    def create_competition_page(self, competition_id):
        """Create a dedicated page for a specific competition"""
        print(f"Generating Competition site for {competition_id}")

        # Competition results as bar chart
        self.create_competition_result_chart(competition_id)

        html_content = get_competition_page_html(self, competition_id)

        # Write the HTML file
        page_path = os.path.join(self.output_directory, 'comp', f'{str(competition_id)}.html')
        with open(page_path, 'w', encoding='utf-8') as f:
            f.write(html_content)

    def create_visualizations(self):
        """Create various visualizations for the website"""
        # Top teams Elo chart
        self.create_top_teams_chart()

        # Elo distribution histogram
        self.create_elo_distribution_chart()

    def _theme_colormap(self):
        """Build the colormap running from LINK_COLOR to BORDER_COLOR."""
        colors = [hex_to_normalized_rgb(self.LINK_COLOR), hex_to_normalized_rgb(self.BORDER_COLOR)]
        return LinearSegmentedColormap.from_list('custom_yellow', colors)

    def _fill_bars_with_gradient(self, ax, bars, steps=50):
        """Paint each horizontal bar with a gradient along its width.

        The gradient is simulated by covering the bar with ``steps`` thin
        rectangles whose colors are sampled from the theme colormap.
        """
        cmap = self._theme_colormap()
        for bar in bars:
            x0, y0 = bar.get_xy()
            width = bar.get_width()
            height = bar.get_height()
            bar.set_facecolor('none')  # Remove default color

            slice_width = width / steps
            for i in range(steps):
                left = x0 + (i / steps) * width
                ax.add_patch(patches.Rectangle((left, y0), slice_width, height,
                                               linewidth=0, color=cmap(i / steps)))

    def create_top_teams_chart(self):
        """Create a bar chart of the top teams by Elo rating"""
        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_axes([0.3, 0.1, 0.6, 0.8])

        # Get top 15 teams by Elo
        top_teams = sorted(self.teams.items(), key=lambda x: x[1]['elo'], reverse=True)[:15]
        team_names = [name for name, _ in top_teams]
        elo_ratings = [data['elo'] for _, data in top_teams]

        # Create bar chart
        bars = ax.barh(team_names, elo_ratings, color='none')

        # Add Elo values at the end of each bar
        for bar, elo in zip(bars, elo_ratings):
            ax.text(bar.get_width() + 5, bar.get_y() + bar.get_height() / 2,
                    f"{int(elo)}", va='center')

        self._fill_bars_with_gradient(ax, bars)

        ax.set_xlabel('Elo Rating')
        ax.set_title('Top Teams by Elo Rating')

        # Save the chart
        fig.savefig(os.path.join(self.output_directory, 'images',
                                 'top_teams_elo.' + self.IMAGE_FORMAT), dpi=100)
        plt.close(fig)

    def create_elo_distribution_chart(self):
        """Create a histogram of Elo rating distribution"""
        fig, ax = plt.subplots(figsize=(10, 6))

        # Get all Elo ratings
        elo_ratings = [team_data['elo'] for team_data in self.teams.values()]

        cmap = self._theme_colormap()

        # The histogram itself is drawn fully transparent; it only supplies
        # the bin counts and edges for the hand-drawn gradient bars.
        counts, bin_edges, _ = ax.hist(elo_ratings, bins=20, alpha=0)
        bin_width = bin_edges[1] - bin_edges[0]

        # Draw bars with a vertical gradient made of stacked thin rectangles
        gradient_steps = 50
        for left, height in zip(bin_edges, counts):
            slice_height = height / gradient_steps
            for step in range(gradient_steps):
                slice_bottom = (step / gradient_steps) * height
                ax.add_patch(patches.Rectangle(
                    (left, slice_bottom), bin_width, slice_height,
                    linewidth=0,
                    facecolor=cmap(step / gradient_steps),
                    edgecolor=None
                ))

        # Add black edges to each bar
        for left, height in zip(bin_edges, counts):
            ax.add_patch(patches.Rectangle(
                (left, 0), bin_width, height,
                fill=False,
                edgecolor='black',
                linewidth=1
            ))

        # Set axis limits to make sure the entire histogram is visible
        ax.set_xlim(bin_edges[0], bin_edges[-1])
        ax.set_ylim(0, max(counts) * 1.1)

        ax.set_xlabel('Elo Rating')
        ax.set_ylabel('Number of Teams')
        ax.set_title('Distribution of Team Elo Ratings')
        ax.grid(axis='y', alpha=0.75)
        fig.tight_layout()

        # Save the chart
        fig.savefig(os.path.join(self.output_directory, 'images',
                                 'elo_distribution.' + self.IMAGE_FORMAT), dpi=100)
        plt.close(fig)

    def create_team_elo_history_chart(self, team_name):
        """Create a line chart showing a team's Elo and rank history over time"""
        team_history = self.teams[team_name]['history']

        # Sort history by date (in place, on the team's stored history)
        team_history.sort(key=lambda x: x['date'])

        dates = [entry['date'] for entry in team_history]
        elo_values = [entry['new_elo'] for entry in team_history]
        ranks = [entry['new_rank'] for entry in team_history]
        positions = range(len(dates))

        # First axis: world rank, inverted so that rank 1 is at the top
        fig, ax2 = plt.subplots(figsize=(12, 6))
        ax2.plot(positions, ranks, marker='none', linestyle='-', color='gray', label='Rank')
        ax2.set_ylabel('Rank', color='gray')
        ax2.tick_params(axis='y', labelcolor='gray')
        ax2.invert_yaxis()

        ax2.set_xticks(positions)
        ax2.set_xticklabels(dates, rotation=45, ha='right')  # ha='right' helps with alignment

        # Second y-axis sharing the same x-axis: Elo rating
        ax1 = ax2.twinx()
        ax1.plot(positions, elo_values, marker='o', linestyle='-', color='black', label='ELO Values')
        ax1.set_xlabel('Dates')
        ax1.set_ylabel('ELO Rating', color='black')
        ax1.tick_params(axis='y', labelcolor='black')

        # Add competition labels
        for i, entry in enumerate(team_history):
            ax1.annotate(entry['competition'] + f" (#{entry['placement']})",
                         (i, entry['new_elo']),
                         textcoords="offset points",
                         xytext=(0, 10),
                         ha='center',
                         fontsize=8,
                         rotation=45)

        plt.title(f'{team_name} - Elo Rating History')
        ax2.grid(True, axis='x', alpha=0.3)
        ax1.grid(True, alpha=0.3)
        plt.tight_layout()

        # Save the chart
        plt.savefig(os.path.join(self.output_directory, 'images',
                                 f'{clean_filename(team_name)}_history.' + self.IMAGE_FORMAT), dpi=100)
        plt.close(fig)

    def create_competition_result_chart(self, competition_id):
        """Create a bar chart of the Elo change of every team in a competition"""
        competition = self.competition_history[competition_id]

        # Reversed so that the best-placed team ends up at the top of the chart
        team_names = [result[1] for result in competition['results']][::-1]
        elo_change = [result[3] for result in competition['results']][::-1]

        # Figure height grows with the number of teams
        fig = plt.figure(figsize=(12, len(team_names) * 0.2175 + 2))
        ax = fig.add_axes([0.3, 0.1, 0.6, 0.8])

        # Create bar chart
        bars = ax.barh(team_names, elo_change, color='none')

        # Add Elo change values next to each bar
        for bar, change in zip(bars, elo_change):
            ax.text(bar.get_width() + 5, bar.get_y() + bar.get_height() / 2,
                    f"{int(change)}", va='center')

        self._fill_bars_with_gradient(ax, bars)

        ax.set_xlabel('Elo Gain')
        ax.set_title('Elo Gain per Team')

        # Save the chart
        fig.savefig(os.path.join(self.output_directory, 'images',
                                 f'{competition_id}_results.' + self.IMAGE_FORMAT), dpi=100)
        plt.close(fig)

    def optimize_k(self):
        """Process all competitions and print an error measure for K_FACTOR.

        For selected competitions held between 2017 and 2019, the mean of
        ``abs(elo_change / K_FACTOR)`` over all teams is computed; the RMSE of
        these per-competition means is printed.
        """
        self.load_and_process_csv_files()

        considered = ("DE", "EA", "AT", "NL", "AA", "ES", "UK", "IT", "CZ", "CH")
        error_history = []
        for comp, data in self.competition_history.items():
            if self.get_competition_name(comp) not in considered:
                continue
            if not 2016 < self.get_competition_date(comp).year < 2020:
                continue
            results = data['results']
            if not results:  # No team of this type took part; nothing to average
                continue
            error = sum(abs(result[3] / K_FACTOR) for result in results) / len(results)
            error_history.append(error)

        if not error_history:
            print("No matching competitions found; cannot compute a prediction RMSE")
            return

        # Calculate RMSE of the list of errors
        mean_squared_error = sum(error ** 2 for error in error_history) / len(error_history)
        rmse = math.sqrt(mean_squared_error)
        print(f"K of {K_FACTOR} results in a prediction RMSE of {rmse}")


def run_optimize_k():
    """Run the K-factor evaluation on the combustion data set."""
    elo_system = EloSystem('csv_files', 'public/elo_website_combustion', "combustion")
    elo_system.optimize_k()


def main():
    """Main function to run the Elo ranking system"""
    import argparse

    parser = argparse.ArgumentParser(description='Process competition results and generate Elo rankings website')
    parser.add_argument('--input', default='csv_files', help='Directory containing CSV result files')
    parser.add_argument('--output', default='public/elo_website', help='Output directory for the website')

    args = parser.parse_args()

    # Create and run one Elo system per competition type
    for competition_type in ("combustion", "electric"):
        elo_system = EloSystem(args.input, f"{args.output}_{competition_type}", competition_type)
        elo_system.load_and_process_csv_files()
        elo_system.generate_website()


if __name__ == "__main__":
    main()