from clearml import Task from tqdm import tqdm from src.policies.simple_baseline import BaselinePolicy import pandas as pd import numpy as np import torch import plotly.express as px from functools import lru_cache from src.utils.imbalance_price_calculator import ImbalancePriceCalculator class PolicyEvaluator: def __init__(self, baseline_policy: BaselinePolicy, task: Task = None): self.baseline_policy = baseline_policy self.ipc = ImbalancePriceCalculator(data_path="") self.dates = baseline_policy.test_data["DateTime"].dt.date.unique() self.dates = pd.to_datetime(self.dates) ### Load Imbalance Prices ### imbalance_prices = pd.read_csv("data/imbalance_prices.csv", sep=";") imbalance_prices["DateTime"] = pd.to_datetime( imbalance_prices["DateTime"], utc=True ) self.imbalance_prices = imbalance_prices.sort_values(by=["DateTime"]) self.penalties = [0, 1000, 1500] self.profits = [] self.task = task self.cache = {} @lru_cache(maxsize=None) def get_imbanlance_prices_for_date(self, date): imbalance_prices_day = self.imbalance_prices[ self.imbalance_prices["DateTime"].dt.date == date ] return imbalance_prices_day["Positive imbalance price"].values def evaluate_for_date( self, date, idx_samples, test_loader, charge_thresholds=np.arange(-1500, 1500, 50), discharge_thresholds=np.arange(-1500, 1500, 50), penalty: int = 0, state_of_charge: float = 0.0, ): if date in self.cache: (reconstructed_imbalance_prices, real_imbalance_prices) = self.cache[date] else: idx = test_loader.dataset.get_idx_for_date(date.date()) if idx not in idx_samples: print("No samples for idx: ", idx, date) (initial, samples) = idx_samples[idx] if len(initial.shape) == 2: initial = initial.cpu().numpy()[0][-1] else: initial = initial.cpu().numpy()[-1] samples = samples.cpu().numpy() initial = np.repeat(initial, samples.shape[0]) combined = np.concatenate((initial.reshape(-1, 1), samples), axis=1) reconstructed_imbalance_prices = ( self.ipc.get_imbalance_prices_2023_for_date_vectorized(date, combined) ) reconstructed_imbalance_prices = torch.tensor( reconstructed_imbalance_prices, device="cuda" ) real_imbalance_prices = self.get_imbanlance_prices_for_date(date.date()) self.cache[date] = (reconstructed_imbalance_prices, real_imbalance_prices) return self.profit_for_penalty( reconstructed_imbalance_prices, real_imbalance_prices, penalty, charge_thresholds, discharge_thresholds, state_of_charge=state_of_charge, ) def optimize_penalty_for_target_charge_cycles( self, idx_samples, test_loader, initial_penalty, target_charge_cycles, initial_learning_rate=2, max_iterations=10, tolerance=10, learning_rate_decay=0.9, # Factor to reduce the learning rate after each iteration ): self.cache = {} penalty = initial_penalty learning_rate = initial_learning_rate previous_gradient = None # Track the previous gradient to adjust learning rate based on progress for iteration in range(max_iterations): # Calculate profit and charge cycles for the current penalty simulated_profit, simulated_charge_cycles = ( self.evaluate_test_set_for_penalty(idx_samples, test_loader, penalty) ) print( f"Iteration {iteration}: Penalty: {penalty}, Charge Cycles: {simulated_charge_cycles}, Profit: {simulated_profit}, Learning Rate: {learning_rate}" ) # Calculate the gradient (difference) between the simulated and target charge cycles gradient = simulated_charge_cycles - target_charge_cycles # Optionally, adjust learning rate based on the change of gradient direction to avoid oscillation if previous_gradient is not None and gradient * previous_gradient < 0: learning_rate *= learning_rate_decay # Update the penalty parameter in the direction of the gradient penalty += ( learning_rate * gradient ) # Note: Using -= to move penalty in the opposite direction of gradient if necessary # Update the previous gradient previous_gradient = gradient # Check if the charge cycles are close enough to the target if abs(gradient) < tolerance: print(f"Optimal penalty found after {iteration+1} iterations") break else: print( f"Reached max iterations ({max_iterations}) without converging to the target charge cycles" ) # Re-calculate profit and charge cycles for the final penalty to return accurate results profit, charge_cycles = self.evaluate_test_set_for_penalty( idx_samples, test_loader, penalty ) return penalty, profit, charge_cycles def profit_for_penalty( self, reconstructed_imbalance_prices, real_imbalance_prices, penalty: int, charge_thresholds, discharge_thresholds, state_of_charge=0.0, ): """_summary_ Args: date (_type_): date to evaluate reconstructed_imbalance_prices (_type_): predicted imbalance price real_imbalance_prices (_type_): real imbalance price penalty (int): penalty parameter to take into account charge_thresholds (_type_): list of charge thresholds discharge_thresholds (_type_): list of discharge thresholds Returns: _type_: returns the simulated profit, charge cycles, the found charge threshold and discharge threshold """ found_charge_thresholds, found_discharge_thresholds = ( self.baseline_policy.get_optimal_thresholds( reconstructed_imbalance_prices, charge_thresholds, discharge_thresholds, penalty, battery_state_of_charge=state_of_charge, ) ) predicted_charge_threshold = found_charge_thresholds.mean(axis=0) predicted_discharge_threshold = found_discharge_thresholds.mean(axis=0) ### Determine Profits and Charge Cycles ### simulated_profit, simulated_charge_cycles, new_state_of_charge = ( self.baseline_policy.simulate( torch.tensor([[real_imbalance_prices]]), torch.tensor([predicted_charge_threshold]), torch.tensor([predicted_discharge_threshold]), battery_state_of_charge=torch.tensor([state_of_charge]), ) ) return ( simulated_profit[0][0].item(), simulated_charge_cycles[0][0].item(), predicted_charge_threshold.item(), predicted_discharge_threshold.item(), new_state_of_charge.squeeze(0).item(), ) def evaluate_test_set(self, idx_samples, test_loader): self.profits = [] self.cache = {} for date in tqdm(self.dates): try: for penalty in self.penalties: self.profits.append( [ date, penalty, *self.evaluate_for_date( date, idx_samples, test_loader, penalty=penalty ), ] ) except KeyboardInterrupt: print("Interrupted") raise KeyboardInterrupt except Exception as e: print(e) pass self.profits = pd.DataFrame( self.profits, columns=[ "Date", "Penalty", "Profit", "Charge Cycles", "Charge Threshold", "Discharge Threshold", ], ) def evaluate_test_set_for_penalty(self, idx_samples, test_loader, penalty): total_profit = 0 total_charge_cycles = 0 state_of_charge = 0.0 for date in tqdm(self.dates): try: profit, charge_cycles, _, _, new_state_of_charge = ( self.evaluate_for_date( date, idx_samples, test_loader, penalty=penalty, state_of_charge=state_of_charge, ) ) state_of_charge = new_state_of_charge total_profit += profit total_charge_cycles += charge_cycles except KeyboardInterrupt: print("Interrupted") raise KeyboardInterrupt except Exception as e: print(e) pass return total_profit, total_charge_cycles def plot_profits_table(self): # Check if task or penalties are not set if ( self.task is None or not hasattr(self, "penalties") or not hasattr(self, "profits") ): print("Task, penalties, or profits not defined.") return if self.profits.empty: print("Profits DataFrame is empty.") return # Aggregate profits and charge cycles by penalty, calculating totals and per-year values aggregated = self.profits.groupby("Penalty").agg( Total_Profit=("Profit", "sum"), Total_Charge_Cycles=("Charge Cycles", "sum"), Num_Days=("Date", "nunique"), ) aggregated["Profit_Per_Year"] = ( aggregated["Total_Profit"] / aggregated["Num_Days"] * 365 ) aggregated["Charge_Cycles_Per_Year"] = ( aggregated["Total_Charge_Cycles"] / aggregated["Num_Days"] * 365 ) # Reset index to make 'Penalty' a column again and drop unnecessary columns final_df = aggregated.reset_index().drop( columns=["Total_Profit", "Total_Charge_Cycles", "Num_Days"] ) # Rename columns to match expected output final_df.columns = [ "Penalty", "Total Profit (per year)", "Total Charge Cycles (per year)", ] # Profits till 400 profits_till_400 = self.get_profits_till_400() # aggregate the final_df and profits_till_400 with columns: Penalty, total profit, total charge cycles, profit till 400, total charge cycles final_df = final_df.merge(profits_till_400, on="Penalty") # Log the final results table self.task.get_logger().report_table( "Test Set Results", "Profits per Penalty", iteration=0, table_plot=final_df ) def plot_thresholds_per_day(self): if self.task is None: return fig = px.line( self.profits[self.profits["Penalty"] == 0], x="Date", y=["Charge Threshold", "Discharge Threshold"], title="Charge and Discharge Thresholds per Day", ) fig.update_layout( width=1000, height=600, title_x=0.5, ) self.task.get_logger().report_plotly( "Thresholds per Day", "Thresholds per Day", iteration=0, figure=fig ) def get_profits_as_scalars(self): aggregated = self.profits.groupby("Penalty").agg( Total_Profit=("Profit", "sum"), Total_Charge_Cycles=("Charge Cycles", "sum"), Num_Days=("Date", "nunique"), ) aggregated["Profit_Per_Year"] = ( aggregated["Total_Profit"] / aggregated["Num_Days"] * 365 ) aggregated["Charge_Cycles_Per_Year"] = ( aggregated["Total_Charge_Cycles"] / aggregated["Num_Days"] * 365 ) # Reset index to make 'Penalty' a column again and drop unnecessary columns final_df = aggregated.reset_index().drop( columns=["Total_Profit", "Total_Charge_Cycles", "Num_Days"] ) # Rename columns to match expected output final_df.columns = ["Penalty", "Total Profit", "Total Charge Cycles"] return final_df def get_profits_till_400(self, profits: pd.DataFrame = None): if profits is None: profits = self.profits # calculates profits until 400 charge cycles per year are reached number_of_days = len(profits["Date"].unique()) usable_charge_cycles = (400 / 365) * number_of_days # now sum the profit until the usable charge cycles are reached penalty_profits = {} penalty_charge_cycles = {} for index, row in profits.iterrows(): penalty = row["Penalty"] profit = row["Profit"] charge_cycles = row["Charge Cycles"] if penalty not in penalty_profits: penalty_profits[penalty] = 0 penalty_charge_cycles[penalty] = 0 if penalty_charge_cycles[penalty] < usable_charge_cycles: penalty_profits[penalty] += profit penalty_charge_cycles[penalty] += charge_cycles # transform profits to per year: penalty_profits / penalty_charge_cycles * 400 cycles per year transformed_profits_per_year = {} for penalty in penalty_profits: transformed_profits_per_year[penalty] = ( penalty_profits[penalty] / penalty_charge_cycles[penalty] * 400 ) df = pd.DataFrame( list( zip( penalty_profits.keys(), penalty_profits.values(), penalty_charge_cycles.values(), transformed_profits_per_year.values(), ) ), columns=[ "Penalty", "Profit_till_400", f"Cycles_till_400 (max {usable_charge_cycles})", "Profit_per_year_till_400", ], ) return df