Basics sign predictor

This commit is contained in:
RobbeDeWaele
2023-04-08 12:52:20 +02:00
parent 7793122eac
commit 3716067f84
5 changed files with 389 additions and 1408 deletions

View File

@@ -0,0 +1,95 @@
import os
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from src.identifiers import LANDMARKS
from src.keypoint_extractor import KeypointExtractor
class BasicsDataset(torch.utils.data.Dataset):
    """Dataset of "basics" sign-language videos.

    File names are expected to encode both label and split as
    ``<label>!<split>....mp4`` (e.g. ``hello!train_01.mp4``). Keypoints are
    extracted per video on first access and cached to ``cache_processed/`` as
    ``.npy`` files so later epochs skip the expensive extraction step.
    """

    def __init__(self, data_folder: str, bad_data_folder: str = "", subset: str = "train", keypoints_identifier: dict = None, transform=None):
        """Index the videos in ``data_folder`` (and optionally ``bad_data_folder``).

        :param data_folder: folder containing ``.mp4`` files (must end with ``/``,
            since paths are built by string concatenation)
        :param bad_data_folder: optional second folder of ``.mp4`` files to append
        :param subset: which split to keep, ``"train"`` or ``"test"``
        :param keypoints_identifier: mapping whose values name the landmarks to
            keep; ``None`` keeps all extracted keypoints
        :param transform: optional callable applied to each sample tensor
        :raises ValueError: if ``subset`` is neither ``"train"`` nor ``"test"``
        """
        # list files with path in the data folder ending with .mp4
        files = [data_folder + f for f in os.listdir(data_folder) if f.endswith(".mp4")]
        # append files from the bad-data folder, if one was provided
        if bad_data_folder != "":
            files += [bad_data_folder + f for f in os.listdir(bad_data_folder) if f.endswith(".mp4")]
        # file name format "<label>!<split>...": label before '!', split after
        labels = [f.split("/")[-1].split("!")[0] for f in files]
        train_test = [f.split("/")[-1].split("!")[1] for f in files]
        # sorted unique label names; position in this array is the class id
        self.label_mapping, counts = np.unique(labels, return_counts=True)
        # map the string labels to their integer class id
        labels = [np.where(self.label_mapping == label)[0][0] for label in labels]
        # TODO: make split for train and val and test when enough data is available
        if subset == "train":
            mask = np.array(train_test) == "train"
        elif subset == "test":
            mask = np.array(train_test) == "test"
        else:
            # BUG FIX: previously an unknown subset left `mask` undefined and
            # crashed with a confusing NameError two lines below
            raise ValueError(f"Unknown subset {subset!r}; expected 'train' or 'test'")
        # keep only the files/labels belonging to the requested split
        self.data = np.array(files)[mask]
        self.labels = np.array(labels)[mask]
        self.transform = transform
        self.subset = subset
        self.keypoint_extractor = KeypointExtractor()
        # BUG FIX: always define the attribute; previously __getitem__ raised
        # AttributeError whenever keypoints_identifier was not supplied
        self.keypoints_to_keep = None
        if keypoints_identifier:
            # expand each landmark name into its x and y column names
            self.keypoints_to_keep = [f"{i}_{j}" for i in keypoints_identifier.values() for j in ["x", "y"]]

    def __len__(self):
        """Return the number of videos in this split."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(keypoints, label)`` for the ``index``-th video.

        Keypoints are shaped ``(frames, n_keypoints, 2)`` (x/y pairs) and
        returned as a torch tensor; results are cached on disk per video.
        """
        video_name = self.data[index]
        # cache file is keyed by the video's base name
        cache_name = video_name.split("/")[-1].split(".")[0] + ".npy"
        if not os.path.isfile(os.path.join("cache_processed", cache_name)):
            # get the keypoints for the video (normalizations: minmax, bohacek)
            keypoints_df = self.keypoint_extractor.extract_keypoints_from_video(video_name, normalize="bohacek")
            # filter the keypoints down to the identified subset, if any
            if self.keypoints_to_keep:
                keypoints_df = keypoints_df[self.keypoints_to_keep]
            # reshape flat (…_x, …_y) columns into (frames, keypoints, 2)
            current_row = np.empty(shape=(keypoints_df.shape[0], keypoints_df.shape[1] // 2, 2))
            for i in range(0, keypoints_df.shape[1], 2):
                current_row[:, i // 2, 0] = keypoints_df.iloc[:, i]
                current_row[:, i // 2, 1] = keypoints_df.iloc[:, i + 1]
            # make sure the cache folder exists before writing
            if not os.path.isdir("cache_processed"):
                os.mkdir("cache_processed")
            np.save(os.path.join("cache_processed", cache_name), current_row)
        else:
            current_row = np.load(os.path.join("cache_processed", cache_name))
        label = self.labels[index]
        data = torch.from_numpy(current_row)
        if self.transform:
            data = self.transform(data)
        return data, label

152
src/train_basics.py Normal file
View File

@@ -0,0 +1,152 @@
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from src.augmentations import MirrorKeypoints, Z_augmentation, NoiseAugmentation
from src.datasets.basics_dataset import BasicsDataset
from src.identifiers import LANDMARKS
from src.model import SPOTER
from src.loss_function import CustomLoss
import torch
from torch.utils.tensorboard import SummaryWriter
def train():
    """Train the SPOTER basics-sign classifier with early stopping.

    Logs per-epoch train/val loss and accuracy to TensorBoard, checkpoints
    the model whenever validation accuracy improves (after a 20-epoch
    warm-up), and stops after 40 epochs without improvement.
    """
    writer = SummaryWriter()

    # pin every RNG so runs are reproducible
    random.seed(379)
    np.random.seed(379)
    os.environ['PYTHONHASHSEED'] = str(379)
    torch.manual_seed(379)
    torch.cuda.manual_seed(379)
    torch.cuda.manual_seed_all(379)
    torch.backends.cudnn.deterministic = True
    g = torch.Generator()
    g.manual_seed(379)

    device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

    spoter_model = SPOTER(num_classes=15, hidden_dim=len(LANDMARKS) * 2)
    spoter_model.train(True)
    spoter_model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(spoter_model.parameters(), lr=0.00001)
    scheduler = None

    # make sure the checkpoint folder exists before the first save
    if not os.path.exists("checkpoints"):
        os.makedirs("checkpoints")

    transform = transforms.Compose([NoiseAugmentation(noise=0.1)])
    train_set = BasicsDataset("data/basics/data/", bad_data_folder="", keypoints_identifier=LANDMARKS, subset="train", transform=transform)
    train_loader = DataLoader(train_set, shuffle=True, generator=g)
    val_set = BasicsDataset("data/basics/data/", bad_data_folder="", keypoints_identifier=LANDMARKS, subset="test")
    val_loader = DataLoader(val_set, shuffle=True, generator=g)

    train_acc, val_acc = 0, 0
    lr_progress = []
    top_train_acc, top_val_acc = 0, 0
    checkpoint_index = 0
    epochs_without_improvement = 0
    best_val_acc = 0

    for epoch in range(300):
        running_loss = 0.0
        pred_correct, pred_all = 0, 0

        # ---- train ----
        for inputs, labels in train_loader:
            # skip videos that are too short for the model
            if inputs.shape[1] < 20:
                continue
            inputs = inputs.squeeze(0).to(device)
            labels = labels.to(device, dtype=torch.long)
            optimizer.zero_grad()
            outputs = spoter_model(inputs).expand(1, -1, -1)
            loss = criterion(outputs[0], labels)
            loss.backward()
            optimizer.step()
            # BUG FIX: accumulate the Python float rather than the tensor so
            # per-batch loss tensors (and their device memory) are not kept alive
            running_loss += loss.item()
            if int(torch.argmax(torch.nn.functional.softmax(outputs, dim=2))) == int(labels[0]):
                pred_correct += 1
            pred_all += 1

        if scheduler:
            scheduler.step(running_loss / len(train_loader))

        # BUG FIX: guard against every video being skipped (ZeroDivisionError)
        # and actually record train_acc — it previously stayed 0 forever, so
        # the "Best train acc" report was always wrong
        train_acc = pred_correct / pred_all if pred_all else 0.0
        # BUG FIX: log the epoch-average loss, not just the last batch's loss
        writer.add_scalar("Loss/train", running_loss / max(len(train_loader), 1), epoch)
        writer.add_scalar("Accuracy/train", train_acc, epoch)

        # ---- validate ----
        val_pred_correct, val_pred_all = 0, 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.squeeze(0).to(device)
                labels = labels.to(device, dtype=torch.long)
                outputs = spoter_model(inputs).expand(1, -1, -1)
                val_loss += criterion(outputs[0], labels).item()
                if int(torch.argmax(torch.nn.functional.softmax(outputs, dim=2))) == int(labels[0]):
                    val_pred_correct += 1
                val_pred_all += 1
        val_acc = val_pred_correct / val_pred_all if val_pred_all else 0.0
        writer.add_scalar("Loss/val", val_loss, epoch)
        writer.add_scalar("Accuracy/val", val_acc, epoch)
        print(f"Epoch: {epoch} | Train Acc: {train_acc} | Val Acc: {val_acc}")

        # checkpoint on a new best validation accuracy (after warm-up)
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            epochs_without_improvement = 0
            if epoch > 20:
                top_val_acc = val_acc
                top_train_acc = train_acc
                checkpoint_index = epoch
                torch.save(spoter_model.state_dict(), f"checkpoints/spoter_{epoch}.pth")
        else:
            epochs_without_improvement += 1
        # early stopping
        if epochs_without_improvement >= 40:
            print("Early stopping due to no improvement in validation accuracy for 40 epochs.")
            break
        lr_progress.append(optimizer.param_groups[0]['lr'])

    print(f"Best val acc: {top_val_acc} | Best train acc: {top_train_acc} | Epoch: {checkpoint_index}")
    writer.flush()
    writer.close()


if __name__ == "__main__":
    train()

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -26,8 +26,8 @@ frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
keypoints = []
spoter_model = SPOTER(num_classes=26, hidden_dim=len(LANDMARKS) * 2)
spoter_model.load_state_dict(torch.load('models/spoter_76.pth', map_location=torch.device('cpu')))
spoter_model = SPOTER(num_classes=19, hidden_dim=len(LANDMARKS) * 2)
spoter_model.load_state_dict(torch.load('checkpoints/spoter_80.pth', map_location=torch.device('cpu')))
# get values of the landmarks as a list of integers
values = []