from clearml import Task
from tqdm import tqdm
from src.policies.simple_baseline import BaselinePolicy
import pandas as pd
import numpy as np
import torch
import plotly.express as px
from src.utils.imbalance_price_calculator import ImbalancePriceCalculator


class PolicyEvaluator:
    """Evaluate a threshold-based baseline policy on its test dates.

    For every test date and every penalty in ``self.penalties`` the evaluator
    asks the policy for charge/discharge thresholds on reconstructed imbalance
    prices, simulates those thresholds on the real prices, and collects the
    outcome in ``self.profits``. Results can be reported to a ClearML task.
    """

    # Column layout of the ``self.profits`` DataFrame built by
    # ``evaluate_test_set``; one row per (date, penalty) pair.
    _PROFIT_COLUMNS = [
        "Date",
        "Penalty",
        "Profit",
        "Charge Cycles",
        "Charge Threshold",
        "Discharge Threshold",
    ]

    def __init__(
        self,
        baseline_policy: BaselinePolicy,
        task: Task = None,
        imbalance_prices_path: str = "data/imbalance_prices.csv",
    ):
        """Set up the evaluator and load the real imbalance prices.

        Args:
            baseline_policy: Policy under evaluation. Its ``test_data`` must
                expose a datetime ``"DateTime"`` column; the unique dates of
                that column are the dates that get evaluated.
            task: Optional ClearML task used for reporting. Without it the
                plotting methods do nothing.
            imbalance_prices_path: Semicolon-separated CSV containing at least
                the ``"DateTime"`` and ``"Positive imbalance price"`` columns.
        """
        self.baseline_policy = baseline_policy
        self.ipc = ImbalancePriceCalculator(data_path="")

        self.dates = baseline_policy.test_data["DateTime"].dt.date.unique()
        self.dates = pd.to_datetime(self.dates)

        ### Load Imbalance Prices ###
        imbalance_prices = pd.read_csv(imbalance_prices_path, sep=";")
        imbalance_prices["DateTime"] = pd.to_datetime(
            imbalance_prices["DateTime"], utc=True
        )
        self.imbalance_prices = imbalance_prices.sort_values(by=["DateTime"])

        self.penalties = [0, 100, 300, 500, 800, 1000, 1500]
        # Stays a plain list until ``evaluate_test_set`` turns it into a
        # DataFrame.
        self.profits = []

        self.task = task

    def get_imbanlance_prices_for_date(self, date):
        """Return the positive imbalance prices recorded for ``date``.

        The method name is misspelled but kept as-is so existing callers keep
        working.

        Args:
            date: A ``datetime.date``. It is compared with the date part of
                the UTC-parsed ``"DateTime"`` column.

        Returns:
            NumPy array with the ``"Positive imbalance price"`` values of the
            matching rows, in ascending ``"DateTime"`` order.
        """
        imbalance_prices_day = self.imbalance_prices[
            self.imbalance_prices["DateTime"].dt.date == date
        ]
        return imbalance_prices_day["Positive imbalance price"].values

    def evaluate_for_date(
        self,
        date,
        idx_samples,
        test_loader,
        charge_thresholds=np.arange(-100, 250, 25),
        discharge_thresholds=np.arange(-100, 250, 25),
    ):
        """Evaluate the policy for one date and append rows to ``self.profits``.

        Args:
            date: Timestamp-like object exposing ``.date()``.
            idx_samples: Mapping from dataset index to an
                ``(initial, samples)`` pair of torch tensors.
            test_loader: Loader whose ``dataset`` implements
                ``get_idx_for_date``.
            charge_thresholds: Candidate charge thresholds handed to the
                policy's threshold search.
            discharge_thresholds: Candidate discharge thresholds handed to
                the policy's threshold search.

        One row per penalty in ``self.penalties`` is appended to
        ``self.profits``, which must still be a list at this point.
        """
        idx = test_loader.dataset.get_idx_for_date(date.date())
        print("Evaluated for idx: ", idx)

        (initial, samples) = idx_samples[idx]

        # Only the last value of ``initial`` is used (first row when 2-D).
        if len(initial.shape) == 2:
            initial = initial.cpu().numpy()[0][-1]
        else:
            initial = initial.cpu().numpy()[-1]
        samples = samples.cpu().numpy()

        # Prepend that last known value as an extra first column of every
        # sample row.
        initial = np.repeat(initial, samples.shape[0])
        combined = np.concatenate((initial.reshape(-1, 1), samples), axis=1)

        reconstructed_imbalance_prices = (
            self.ipc.get_imbalance_prices_2023_for_date_vectorized(date, combined)
        )
        # Use the GPU when there is one, otherwise fall back to the CPU so the
        # evaluation also runs on hosts without CUDA.
        device = "cuda" if torch.cuda.is_available() else "cpu"
        reconstructed_imbalance_prices = torch.tensor(
            reconstructed_imbalance_prices, device=device
        )

        real_imbalance_prices = self.get_imbanlance_prices_for_date(date.date())

        for penalty in self.penalties:
            found_charge_thresholds, found_discharge_thresholds = (
                self.baseline_policy.get_optimal_thresholds(
                    reconstructed_imbalance_prices,
                    charge_thresholds,
                    discharge_thresholds,
                    penalty,
                )
            )

            # Average the thresholds found for the individual samples into a
            # single pair for the day.
            predicted_charge_threshold = found_charge_thresholds.mean(axis=0)
            predicted_discharge_threshold = found_discharge_thresholds.mean(axis=0)

            ### Determine Profits and Charge Cycles ###
            simulated_profit, simulated_charge_cycles = self.baseline_policy.simulate(
                torch.tensor([[real_imbalance_prices]]),
                torch.tensor([predicted_charge_threshold]),
                torch.tensor([predicted_discharge_threshold]),
            )

            self.profits.append(
                [
                    date,
                    penalty,
                    simulated_profit[0][0].item(),
                    simulated_charge_cycles[0][0].item(),
                    predicted_charge_threshold.item(),
                    predicted_discharge_threshold.item(),
                ]
            )

    def evaluate_test_set(self, idx_samples, test_loader):
        """Evaluate every test date and store the results as a DataFrame.

        Any exception other than ``KeyboardInterrupt`` stops the loop, is
        printed, and the rows gathered so far are kept. On
        ``KeyboardInterrupt`` the exception propagates and ``self.profits``
        remains a list.

        Args:
            idx_samples: Forwarded to ``evaluate_for_date``.
            test_loader: Forwarded to ``evaluate_for_date``.
        """
        self.profits = []
        try:
            for date in tqdm(self.dates):
                self.evaluate_for_date(date, idx_samples, test_loader)
        except KeyboardInterrupt:
            print("Interrupted")
            # Bare raise keeps the original traceback.
            raise
        except Exception as e:
            # Best effort: report the failure and keep the partial results.
            print(e)

        self.profits = pd.DataFrame(self.profits, columns=self._PROFIT_COLUMNS)

        print("Profits calculated")
        print(self.profits.head())

    def _yearly_totals_by_penalty(self):
        """Aggregate ``self.profits`` per penalty, extrapolated to 365 days.

        Returns:
            DataFrame with the columns ``"Penalty"``, ``"Total Profit"`` and
            ``"Total Charge Cycles"``. Despite the labels, the last two hold
            the per-year values (sum / number of distinct dates * 365).
        """
        aggregated = self.profits.groupby("Penalty").agg(
            Total_Profit=("Profit", "sum"),
            Total_Charge_Cycles=("Charge Cycles", "sum"),
            Num_Days=("Date", "nunique"),
        )
        aggregated["Profit_Per_Year"] = (
            aggregated["Total_Profit"] / aggregated["Num_Days"] * 365
        )
        aggregated["Charge_Cycles_Per_Year"] = (
            aggregated["Total_Charge_Cycles"] / aggregated["Num_Days"] * 365
        )

        # Make 'Penalty' a column again and keep only the per-year values.
        final_df = aggregated.reset_index().drop(
            columns=["Total_Profit", "Total_Charge_Cycles", "Num_Days"]
        )

        # Rename columns to match expected output
        final_df.columns = ["Penalty", "Total Profit", "Total Charge Cycles"]
        return final_df

    def plot_profits_table(self):
        """Report the per-penalty results table to the ClearML task.

        The table combines the yearly aggregates with the "till 400 cycles"
        figures from ``get_profits_till_400``. Nothing is reported when there
        is no task or when no (non-empty) results DataFrame exists yet.
        """
        # Check if task or penalties are not set
        if (
            self.task is None
            or not hasattr(self, "penalties")
            or not hasattr(self, "profits")
        ):
            print("Task, penalties, or profits not defined.")
            return

        # ``self.profits`` is still a list until ``evaluate_test_set`` has
        # completed, so check the type before touching ``.empty``.
        if not isinstance(self.profits, pd.DataFrame) or self.profits.empty:
            print("Profits DataFrame is empty.")
            return

        final_df = self._yearly_totals_by_penalty()

        # Profits till 400
        profits_till_400 = self.get_profits_till_400()

        # Combine both tables on the penalty value.
        final_df = final_df.merge(profits_till_400, on="Penalty")

        # Log the final results table
        self.task.get_logger().report_table(
            "Policy Results", "Policy Results", iteration=0, table_plot=final_df
        )

    def plot_thresholds_per_day(self):
        """Report a line plot of the daily thresholds for penalty 0.

        Does nothing without a task or before ``evaluate_test_set`` has
        produced the results DataFrame.
        """
        if self.task is None:
            return

        # Boolean indexing below only works on the DataFrame form.
        if not isinstance(self.profits, pd.DataFrame):
            print("Profits DataFrame is empty.")
            return

        fig = px.line(
            self.profits[self.profits["Penalty"] == 0],
            x="Date",
            y=["Charge Threshold", "Discharge Threshold"],
            title="Charge and Discharge Thresholds per Day",
        )

        fig.update_layout(
            width=1000,
            height=600,
            title_x=0.5,
        )

        self.task.get_logger().report_plotly(
            "Thresholds per Day", "Thresholds per Day", iteration=0, figure=fig
        )

    def get_profits_as_scalars(self):
        """Return the per-penalty yearly profit and charge-cycle table.

        Must be called after ``evaluate_test_set``.

        Returns:
            DataFrame with the columns ``"Penalty"``, ``"Total Profit"`` and
            ``"Total Charge Cycles"``; the latter two contain values
            extrapolated to 365 days.
        """
        return self._yearly_totals_by_penalty()

    def get_profits_till_400(self):
        """Sum profits per penalty until a charge-cycle budget is used up.

        The budget is 400 cycles per year scaled to the number of distinct
        dates in ``self.profits``. Rows are visited in DataFrame order; a row
        is counted as long as the cycles accumulated *before* it are below
        the budget, so the row that crosses the budget is still included.
        Must be called after ``evaluate_test_set``.

        Returns:
            DataFrame with the columns ``"Penalty"``, ``"Profit_till_400"``
            and ``"Cycles_till_400"``.
        """
        results = self.profits

        # calculates profits until 400 charge cycles per year are reached
        number_of_days = len(results["Date"].unique())
        usable_charge_cycles = (400 / 365) * number_of_days

        # now sum the profit until the usable charge cycles are reached
        penalty_profits = {}
        penalty_charge_cycles = {}

        for penalty, profit, charge_cycles in zip(
            results["Penalty"], results["Profit"], results["Charge Cycles"]
        ):
            penalty_profits.setdefault(penalty, 0)
            penalty_charge_cycles.setdefault(penalty, 0)

            if penalty_charge_cycles[penalty] < usable_charge_cycles:
                penalty_profits[penalty] += profit
                penalty_charge_cycles[penalty] += charge_cycles

        df = pd.DataFrame(
            list(
                zip(
                    penalty_profits.keys(),
                    penalty_profits.values(),
                    penalty_charge_cycles.values(),
                )
            ),
            columns=["Penalty", "Profit_till_400", "Cycles_till_400"],
        )

        return df