Initial Commit
This commit is contained in:
102
src/trainers/quantile_trainer.py
Normal file
102
src/trainers/quantile_trainer.py
Normal file
@@ -0,0 +1,102 @@
|
||||
import torch
|
||||
from utils.autoregressive import predict_auto_regressive_quantile
|
||||
from scipy.interpolate import interp1d
|
||||
from trainers.trainer import Trainer
|
||||
from trainers.autoregressive_trainer import AutoRegressiveTrainer
|
||||
from data.preprocessing import DataProcessor
|
||||
from utils.clearml import ClearMLHelper
|
||||
from losses import PinballLoss
|
||||
from plotly.subplots import make_subplots
|
||||
import plotly.graph_objects as go
|
||||
import numpy as np
|
||||
|
||||
|
||||
class QuantileTrainer(AutoRegressiveTrainer):
|
||||
def __init__(self, model: torch.nn.Module, optimizer: torch.optim.Optimizer, data_processor: DataProcessor, quantiles: list, device: torch.device, clearml_helper: ClearMLHelper = None, debug: bool = True):
|
||||
quantiles_tensor = torch.tensor(quantiles)
|
||||
quantiles_tensor = quantiles_tensor.to(device)
|
||||
self.quantiles = quantiles
|
||||
|
||||
criterion = PinballLoss(quantiles=quantiles_tensor)
|
||||
super().__init__(model=model, optimizer=optimizer, criterion=criterion, data_processor=data_processor, device=device, clearml_helper=clearml_helper, debug=debug)
|
||||
|
||||
def predict_auto_regressive(self, initial_sequence: torch.Tensor, sequence_length: int = 96):
|
||||
initial_sequence = initial_sequence.to(self.device)
|
||||
|
||||
return predict_auto_regressive_quantile(self.model, self.sample_from_dist, initial_sequence, self.quantiles, sequence_length)
|
||||
|
||||
def log_final_metrics(self, task, dataloader, train: bool = True):
|
||||
metrics = { metric.__class__.__name__: 0.0 for metric in self.metrics_to_track }
|
||||
transformed_metrics = { metric.__class__.__name__: 0.0 for metric in self.metrics_to_track }
|
||||
|
||||
with torch.no_grad():
|
||||
for inputs, targets in dataloader:
|
||||
inputs, targets = inputs.to(self.device), targets
|
||||
outputs = self.model(inputs)
|
||||
|
||||
samples = []
|
||||
for output in outputs:
|
||||
samples.append(self.sample_from_dist(self.quantiles.cpu().numpy(), output.cpu().numpy()))
|
||||
|
||||
samples = torch.tensor(samples).to(self.device).reshape(-1, 1)
|
||||
|
||||
inversed_samples = torch.tensor(self.data_processor.inverse_transform(samples))
|
||||
inversed_targets = torch.tensor(self.data_processor.inverse_transform(targets.reshape(-1, 1)))
|
||||
|
||||
for metric in self.metrics_to_track:
|
||||
if metric.__class__ != PinballLoss:
|
||||
transformed_metrics[metric.__class__.__name__] += metric(samples, targets.view(-1, 1).to(self.device))
|
||||
metrics[metric.__class__.__name__] += metric(inversed_samples, inversed_targets)
|
||||
else:
|
||||
transformed_metrics[metric.__class__.__name__] += metric(outputs, targets.view(-1, 1).to(self.device))
|
||||
|
||||
for metric in self.metrics_to_track:
|
||||
metrics[metric.__class__.__name__] /= len(dataloader)
|
||||
transformed_metrics[metric.__class__.__name__] /= len(dataloader)
|
||||
|
||||
for metric_name, metric_value in metrics.items():
|
||||
if PinballLoss.__name__ in metric_name:
|
||||
continue
|
||||
name = f'train_{metric_name}' if train else f'test_{metric_name}'
|
||||
task.get_logger().report_single_value(name=name, value=metric_value)
|
||||
|
||||
for metric_name, metric_value in transformed_metrics.items():
|
||||
name = f'train_transformed_{metric_name}' if train else f'test_transformed_{metric_name}'
|
||||
task.get_logger().report_single_value(name=name, value=metric_value)
|
||||
|
||||
def get_plot(self, current_day, next_day, predictions, show_legend: bool = True):
|
||||
fig = go.Figure()
|
||||
|
||||
# Convert to numpy for plotting
|
||||
current_day_np = current_day.view(-1).cpu().numpy()
|
||||
next_day_np = next_day.view(-1).cpu().numpy()
|
||||
predictions_np = predictions.cpu().numpy()
|
||||
|
||||
# Add traces for current and next day
|
||||
fig.add_trace(go.Scatter(x=np.arange(96), y=current_day_np, name="Current Day"))
|
||||
fig.add_trace(go.Scatter(x=96 + np.arange(96), y=next_day_np, name="Next Day"))
|
||||
|
||||
for i, q in enumerate(self.quantiles):
|
||||
fig.add_trace(go.Scatter(x=96 + np.arange(96), y=predictions_np[:, i],
|
||||
name=f"Prediction (Q={q})", line=dict(dash='dash')))
|
||||
|
||||
# Update the layout
|
||||
fig.update_layout(title="Predictions and Quantiles of the Linear Model", showlegend=show_legend)
|
||||
|
||||
return fig
|
||||
|
||||
@staticmethod
|
||||
def sample_from_dist(quantiles, output_values):
|
||||
# Interpolate the inverse CDF
|
||||
inverse_cdf = interp1d(quantiles, output_values, kind='quadratic', bounds_error=False, fill_value="extrapolate")
|
||||
|
||||
# generate one random uniform number
|
||||
uniform_random_numbers = np.random.uniform(0, 1, 1000)
|
||||
|
||||
# Apply the inverse CDF to the uniform random numbers
|
||||
samples = inverse_cdf(uniform_random_numbers)
|
||||
|
||||
# Return the mean of the samples
|
||||
return np.mean(samples)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user