import plotly.graph_objects as go import numpy as np import pytz import pandas as pd import os incremental_bids = "data/incremental_bids.csv" decremental_bids = "data/decremental_bids.csv" class ImbalancePriceCalculator: def __init__(self, method: int = 1, data_path: str = "../../") -> None: self.method = method self.data_path = data_path # check if pickle of bid_ladder exists: data/bid_ladder.pkl if not os.path.isfile(self.data_path + "data/bid_ladder.pkl"): print("Bid ladder pickle not found, loading bids and generating bid ladder...") self.load_bids() self.bid_ladder.to_pickle(self.data_path + "data/bid_ladder.pkl") else: print("Bid ladder pickle found, loading bid ladder...") self.bid_ladder = pd.read_pickle(self.data_path + "data/bid_ladder.pkl") def load_bids(self): df = pd.read_csv(self.data_path + incremental_bids, sep=";") df["Datetime"] = pd.to_datetime(df["Datetime"], utc=True) # sort by Datetime, Activation Order, Bid Price df = df.sort_values(by=["Datetime", "Activation Order", "Bid Price"]) # next we need to calculate the cummulative bids for every datetime incremental_cum_df = df.groupby(["Datetime"]).apply(self.generate_bid_ladder) df = pd.read_csv(self.data_path + decremental_bids, sep=";") df["Datetime"] = pd.to_datetime(df["Datetime"], utc=True) decremental_cum_df = df.groupby(["Datetime"]).apply(self.generate_bid_ladder, incremental=False) # merge the two dataframes on the Datetime self.bid_ladder = pd.merge(incremental_cum_df, decremental_cum_df, on="Datetime", how="inner", suffixes=("_inc", "_dec")) self.bid_ladder = self.bid_ladder.sort_values(by=["Datetime"]) def generate_bid_ladder(self, bids, incremental: bool = True): if self.method == 1: bids_df = bids.sort_values(by=["Bid Price"], ascending=[incremental]) elif self.method == 2: bids_df = bids.sort_values(by=["Activation Order", "Bid Price"], ascending=[True, incremental]) bids_df['cumulative_volume'] = bids_df['Bid Volume'].cumsum() if incremental: bids_df['max_price'] = bids_df['Bid Price'].cummax() else: bids_df['max_price'] = bids_df['Bid Price'].cummin() bid_ladder = list(zip(bids_df['cumulative_volume'], bids_df['max_price'])) bid_ladder_df = pd.DataFrame([[bid_ladder]], columns=['bid_ladder']) return bid_ladder_df def check_datetime(self, datetime): return datetime in self.bid_ladder.index def get_imbalance_price_vectorized(self, datetimes, volumes): # Convert inputs to numpy arrays datetimes = np.array(datetimes) # Fetch bid ladders for each datetime and determine if it's inc or dec based on original volume sign bid_ladders = np.where(np.array(volumes) < 0, self.bid_ladder.loc[datetimes, "bid_ladder_dec"].values, self.bid_ladder.loc[datetimes, "bid_ladder_inc"].values) volumes = np.abs(np.array(volumes)) # Make all volumes positive # Vectorized operation to get prices from bid ladders prices = [] for i, bid_ladder in enumerate(bid_ladders): volume = volumes[i] for bid in bid_ladder: if bid[0] > volume: prices.append(bid[1]) break else: prices.append(bid_ladder[-1][1]) # Append last price if no break occurred return np.array(prices) def plot(self, datetime): row = self.bid_ladder.loc[self.bid_ladder.index == datetime] dec_bids = row["bid_ladder_dec"].values[0] inc_bids = row["bid_ladder_inc"].values[0] # Prepare data for plot x_inc_interpolated = [vol for i in range(len(inc_bids) - 1) for vol in [inc_bids[i][0], inc_bids[i+1][0]]] y_inc_interpolated = [price for cum_vol, price in inc_bids for _ in (0, 1)] x_dec_interpolated = [-vol for i in range(len(dec_bids) - 1) for vol in [dec_bids[i][0], dec_bids[i+1][0]]] y_dec_interpolated = [price for cum_vol, price in dec_bids for _ in (0, 1)] # Create and show the plot make sure hovering works in between the steps fig = go.Figure() fig.add_trace(go.Scatter(x=x_inc_interpolated, y=y_inc_interpolated, mode='lines+markers', name="inc")) fig.add_trace(go.Scatter(x=x_dec_interpolated, y=y_dec_interpolated, mode='lines+markers', name="dec")) fig.update_layout( title='Bid ladder', xaxis_title='Volume [MWh]', yaxis_title='Price [EUR/MWh]', hovermode='x unified' ) fig.show() def get_imbalance_prices_2023_for_date_vectorized(self, date, NRV_predictions_matrix): # Initialize an empty list for the 2D results all_imbalance_prices = [] # Iterate over each row in the NRV_predictions matrix for NRV_predictions in NRV_predictions_matrix: # Create a range of datetimes for the entire day hours = pd.TimedeltaIndex(range(len(NRV_predictions) - 1), 'H') datetimes = date + hours datetimes = datetimes.tz_localize(pytz.utc) # Apply the vectorized imbalance price function to each row imbalance_prices = self.get_imbalance_price_2023(datetimes, NRV_predictions[:-1], NRV_predictions[1:]) # Extract the imbalance prices and add to the 2D list all_imbalance_prices.append(imbalance_prices[1]) return all_imbalance_prices def get_imbalance_price_2023(self, datetimes, NRV_PREVS, NRVS): # create numpy arrays datetimes = np.array(datetimes) NRV_PREVS = np.array(NRV_PREVS) NRVS = np.array(NRVS) MIPS = self.get_imbalance_price_vectorized(datetimes, np.abs(NRVS)) MDPS = self.get_imbalance_price_vectorized(datetimes, -np.abs(NRVS)) return calculate_imbalance_price(-NRV_PREVS, -NRVS, MIPS, MDPS) def calculate_imbalance_price(SI_PREV, SI, MIP, MDP): # Convert parameters to numpy arrays for vectorized operations SI_PREV = np.array(SI_PREV) SI = np.array(SI) MIP = np.array(MIP) MDP = np.array(MDP) # parameters a, b, c, d a = 0 # EUR/MWh b = 200 # EUR/MWh c = 450 # MW d = 65 # MW # x = average abs(SI) and abs(SI_PREV) x = (np.abs(SI_PREV) + np.abs(SI)) / 2 # Calculate cp cp = np.zeros_like(SI) cp[(SI <= 0) & ((MIP > 200) & (MIP <= 400))] = (400 - MIP[(SI <= 0) & ((MIP > 200) & (MIP <= 400))]) / 200 cp[(SI <= 0) & ~((MIP > 200) & (MIP <= 400))] = 1 cp[(SI > 0) & (MDP >= 0)] = 1 cp[(SI > 0) & (MDP >= -200) & (MDP < 0)] = (MDP[(SI > 0) & (MDP >= -200) & (MDP < 0)] + 200) / 200 # Calculate alpha alpha = np.zeros_like(SI) alpha[np.abs(SI) > 150] = (a + b / (1 + np.exp((c - x[np.abs(SI) > 150]) / d))) * cp[np.abs(SI) > 150] # Calculate imbalance prices pos_imbalance_price = MIP + alpha neg_imbalance_price = MDP - alpha imbalance_price = np.zeros_like(SI) imbalance_price[SI < 0] = pos_imbalance_price[SI < 0] imbalance_price[SI > 0] = neg_imbalance_price[SI > 0] imbalance_price[SI == 0] = (pos_imbalance_price[SI == 0] + neg_imbalance_price[SI == 0]) / 2 return alpha, imbalance_price