Merge branch 'dev' into 'main'
Dev See merge request wesign/sign-predictor!14
This commit was merged in pull request #14.
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -3,6 +3,7 @@ data/
|
|||||||
.DS_Store
|
.DS_Store
|
||||||
|
|
||||||
cache/
|
cache/
|
||||||
|
cache_processed/
|
||||||
cache_wlasl/
|
cache_wlasl/
|
||||||
|
|
||||||
__pycache__/
|
__pycache__/
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
31
export.py
31
export.py
@@ -1,31 +0,0 @@
|
|||||||
import torch
|
|
||||||
import torchvision
|
|
||||||
import onnx
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
from src.model import SPOTER
|
|
||||||
from src.identifiers import LANDMARKS
|
|
||||||
|
|
||||||
model_name = 'Fingerspelling_AE'
|
|
||||||
|
|
||||||
# load PyTorch model from .pth file
|
|
||||||
model = SPOTER(num_classes=5, hidden_dim=len(LANDMARKS) *2)
|
|
||||||
state_dict = torch.load('models/' + model_name + '.pth')
|
|
||||||
model.load_state_dict(state_dict)
|
|
||||||
|
|
||||||
# set model to evaluation mode
|
|
||||||
model.eval()
|
|
||||||
|
|
||||||
# create dummy input tensor
|
|
||||||
batch_size = 1
|
|
||||||
num_of_frames = 1
|
|
||||||
input_shape = (108, num_of_frames)
|
|
||||||
dummy_input = torch.randn(batch_size, *input_shape)
|
|
||||||
|
|
||||||
# export model to ONNX format
|
|
||||||
output_file = 'models/' + model_name + '.onnx'
|
|
||||||
torch.onnx.export(model, dummy_input, output_file, input_names=['input'], output_names=['output'])
|
|
||||||
|
|
||||||
# load exported ONNX model for verification
|
|
||||||
onnx_model = onnx.load(output_file)
|
|
||||||
onnx.checker.check_model(onnx_model)
|
|
||||||
17
export_json.py
Normal file
17
export_json.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
from src.identifiers import HAND_LANDMARKS, POSE_LANDMARKS
|
||||||
|
|
||||||
|
|
||||||
|
def export_json(pose_landmarks, hand_landmarks, filename):
|
||||||
|
|
||||||
|
l = {
|
||||||
|
"pose_landmarks": list(pose_landmarks.values()),
|
||||||
|
"hand_landmarks": list(hand_landmarks.values())
|
||||||
|
}
|
||||||
|
|
||||||
|
# write l to filename
|
||||||
|
with open(filename, 'w') as f:
|
||||||
|
json.dump(l, f)
|
||||||
|
|
||||||
|
export_json(POSE_LANDMARKS, HAND_LANDMARKS, "landmarks.json")
|
||||||
1
landmarks.json
Normal file
1
landmarks.json
Normal file
@@ -0,0 +1 @@
|
|||||||
|
{"pose_landmarks": [0, 2, 5, 7, 8, 9, 11, 12, 13, 14, 15, 16], "hand_landmarks": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]}
|
||||||
BIN
models/model_A-L.onnx
Normal file
BIN
models/model_A-L.onnx
Normal file
Binary file not shown.
BIN
models/model_A-Z.onnx
Normal file
BIN
models/model_A-Z.onnx
Normal file
Binary file not shown.
BIN
models/model_A-Z.pth
Normal file
BIN
models/model_A-Z.pth
Normal file
Binary file not shown.
@@ -4,3 +4,4 @@ pandas==1.5.3
|
|||||||
mediapipe==0.9.1.0
|
mediapipe==0.9.1.0
|
||||||
tensorboard==2.12.0
|
tensorboard==2.12.0
|
||||||
mediapy==1.1.6
|
mediapy==1.1.6
|
||||||
|
scikit-learn==0.24.2
|
||||||
@@ -1,5 +1,39 @@
|
|||||||
|
import math
|
||||||
import random
|
import random
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
import math
|
||||||
|
import torch
|
||||||
|
|
||||||
|
def circle_intersection(x0, y0, r0, x1, y1, r1):
|
||||||
|
# circle 1: (x0, y0), radius r0
|
||||||
|
# circle 2: (x1, y1), radius r1
|
||||||
|
|
||||||
|
d=math.sqrt((x1-x0)**2 + (y1-y0)**2)
|
||||||
|
|
||||||
|
# non intersecting
|
||||||
|
if d > r0 + r1 :
|
||||||
|
return None
|
||||||
|
# One circle within other
|
||||||
|
if d < abs(r0-r1):
|
||||||
|
return None
|
||||||
|
# coincident circles
|
||||||
|
if d == 0 and r0 == r1:
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
a=(r0**2-r1**2+d**2)/(2*d)
|
||||||
|
h=math.sqrt(r0**2-a**2)
|
||||||
|
x2=x0+a*(x1-x0)/d
|
||||||
|
y2=y0+a*(y1-y0)/d
|
||||||
|
x3=x2+h*(y1-y0)/d
|
||||||
|
y3=y2-h*(x1-x0)/d
|
||||||
|
|
||||||
|
x4=x2-h*(y1-y0)/d
|
||||||
|
y4=y2+h*(x1-x0)/d
|
||||||
|
|
||||||
|
return (np.array([x3, y3]), np.array([x4, y4]))
|
||||||
|
|
||||||
|
|
||||||
class MirrorKeypoints:
|
class MirrorKeypoints:
|
||||||
def __call__(self, sample):
|
def __call__(self, sample):
|
||||||
@@ -9,3 +43,85 @@ class MirrorKeypoints:
|
|||||||
sample = 1 - sample
|
sample = 1 - sample
|
||||||
|
|
||||||
return sample
|
return sample
|
||||||
|
|
||||||
|
class Z_augmentation:
|
||||||
|
|
||||||
|
def __init__(self, hand_side="left"):
|
||||||
|
self.hand_side = hand_side
|
||||||
|
|
||||||
|
def new_wrist(self, sample, hand_side="left", new_wrist=None):
|
||||||
|
if hand_side == "left":
|
||||||
|
wrist = sample[30:32]
|
||||||
|
shoulder = sample[22:24]
|
||||||
|
elbow = sample[26:28]
|
||||||
|
else:
|
||||||
|
wrist = sample[32:34]
|
||||||
|
shoulder = sample[24:26]
|
||||||
|
elbow = sample[28:30]
|
||||||
|
|
||||||
|
# calculate the length of the shoulder to elbow using math package
|
||||||
|
shoulder_elbow_length = math.sqrt((shoulder[0] - elbow[0])**2 + (shoulder[1] - elbow[1])**2)
|
||||||
|
# calculate the length of the wrist to elbow using math package
|
||||||
|
wrist_elbow_length = math.sqrt((wrist[0] - elbow[0])**2 + (wrist[1] - elbow[1])**2)
|
||||||
|
|
||||||
|
if shoulder_elbow_length == 0 or wrist_elbow_length == 0:
|
||||||
|
return sample, None
|
||||||
|
|
||||||
|
first_time = True
|
||||||
|
new_loc = False
|
||||||
|
while not new_loc:
|
||||||
|
|
||||||
|
if new_wrist is None or not first_time:
|
||||||
|
# get random new wrist point that is not too far from the elbow
|
||||||
|
new_wrist = [random.uniform(elbow[0] - 0.3, elbow[0] + 0.3), random.uniform(elbow[1] - 0.3, elbow[1] + 0.3)]
|
||||||
|
|
||||||
|
# get intersection points of the circles
|
||||||
|
c = circle_intersection(shoulder[0], shoulder[1], shoulder_elbow_length, new_wrist[0], new_wrist[1], wrist_elbow_length)
|
||||||
|
if c is not None:
|
||||||
|
(i1, i2) = c
|
||||||
|
new_loc = True
|
||||||
|
first_time = False
|
||||||
|
|
||||||
|
# get the point that is below the hand
|
||||||
|
if i1[1] > i2[1]:
|
||||||
|
new_elbow = i1
|
||||||
|
else:
|
||||||
|
new_elbow = i2
|
||||||
|
# new_elbow to shape (2,1)
|
||||||
|
new_elbow = np.array(new_elbow)
|
||||||
|
new_wrist = np.array(new_wrist)
|
||||||
|
|
||||||
|
# replace the keypoints in the sample
|
||||||
|
if hand_side == "left":
|
||||||
|
sample[26:28] = new_elbow
|
||||||
|
sample[30:32] = new_wrist
|
||||||
|
else:
|
||||||
|
sample[28:30] = new_elbow
|
||||||
|
sample[32:34] = new_wrist
|
||||||
|
return sample, new_wrist
|
||||||
|
|
||||||
|
def __call__(self, samples):
|
||||||
|
# transform each sample in the batch
|
||||||
|
t_new = []
|
||||||
|
|
||||||
|
t = samples.numpy()
|
||||||
|
new_wrist = None
|
||||||
|
for t_i in t:
|
||||||
|
# if new_wrist is None:
|
||||||
|
# new_t, w = self.new_wrist(t_i.reshape(-1), self.hand_side)
|
||||||
|
# new_wrist = w
|
||||||
|
# else:
|
||||||
|
new_t, _ = self.new_wrist(t_i.reshape(-1), self.hand_side)
|
||||||
|
# reshape back to 2 dimensions
|
||||||
|
t_new.append(new_t.reshape(-1, 2))
|
||||||
|
return torch.tensor(np.array(t_new))
|
||||||
|
|
||||||
|
# augmentation to add little randow noise to the keypoints
|
||||||
|
class NoiseAugmentation:
|
||||||
|
def __init__(self, noise=0.05):
|
||||||
|
self.noise = noise
|
||||||
|
|
||||||
|
def __call__(self, sample):
|
||||||
|
# add noise to the keypoints
|
||||||
|
sample = sample + torch.randn(sample.shape) * self.noise
|
||||||
|
return sample
|
||||||
@@ -9,43 +9,43 @@ from src.keypoint_extractor import KeypointExtractor
|
|||||||
|
|
||||||
|
|
||||||
class FingerSpellingDataset(torch.utils.data.Dataset):
|
class FingerSpellingDataset(torch.utils.data.Dataset):
|
||||||
def __init__(self, data_folder: str, keypoint_extractor: KeypointExtractor, subset:str="train", keypoints_identifier: dict = None, transform=None):
|
def __init__(self, data_folder: str, bad_data_folder: str = "", subset:str="train", keypoints_identifier: dict = None, transform=None):
|
||||||
|
|
||||||
# list data from data folder
|
|
||||||
self.data_folder = data_folder
|
|
||||||
|
|
||||||
# list files in the datafolder ending with .mp4
|
# list files with path in the datafolder ending with .mp4
|
||||||
files = [f for f in os.listdir(self.data_folder) if f.endswith(".mp4")]
|
files = [data_folder + f for f in os.listdir(data_folder) if f.endswith(".mp4")]
|
||||||
|
|
||||||
labels = [f.split("!")[0] for f in files]
|
# append files from bad data folder
|
||||||
|
if bad_data_folder != "":
|
||||||
|
files += [bad_data_folder + f for f in os.listdir(bad_data_folder) if f.endswith(".mp4")]
|
||||||
|
|
||||||
|
labels = [f.split("/")[-1].split("!")[0] for f in files]
|
||||||
|
train_test = [f.split("/")[-1].split("!")[1] for f in files]
|
||||||
|
|
||||||
# count the number of each label
|
# count the number of each label
|
||||||
self.label_mapping, counts = np.unique(labels, return_counts=True)
|
self.label_mapping, counts = np.unique(labels, return_counts=True)
|
||||||
|
|
||||||
# save the label mapping to a file
|
|
||||||
with open(os.path.join(self.data_folder, "label_mapping.txt"), "w") as f:
|
|
||||||
for i, label in enumerate(self.label_mapping):
|
|
||||||
f.write(f"{label} {i}")
|
|
||||||
|
|
||||||
# map the labels to their integer
|
# map the labels to their integer
|
||||||
labels = [np.where(self.label_mapping == label)[0][0] for label in labels]
|
labels = [np.where(self.label_mapping == label)[0][0] for label in labels]
|
||||||
|
|
||||||
|
|
||||||
# TODO: make split for train and val and test when enough data is available
|
# TODO: make split for train and val and test when enough data is available
|
||||||
|
|
||||||
# split the data into train and val and test and make them balanced
|
|
||||||
x_train, x_test, y_train, y_test = train_test_split(files, labels, test_size=0.3, random_state=1, stratify=labels)
|
|
||||||
|
|
||||||
if subset == "train":
|
if subset == "train":
|
||||||
self.data = x_train
|
# mask for train data
|
||||||
self.labels = y_train
|
mask = np.array(train_test) == "train"
|
||||||
elif subset == "val":
|
elif subset == "test":
|
||||||
self.data = x_test
|
mask = np.array(train_test) == "test"
|
||||||
self.labels = y_test
|
|
||||||
|
# filter data and labels
|
||||||
|
self.data = np.array(files)[mask]
|
||||||
|
self.labels = np.array(labels)[mask]
|
||||||
|
|
||||||
# filter wlasl data by subset
|
# filter wlasl data by subset
|
||||||
self.transform = transform
|
self.transform = transform
|
||||||
self.subset = subset
|
self.subset = subset
|
||||||
self.keypoint_extractor = keypoint_extractor
|
self.keypoint_extractor = KeypointExtractor()
|
||||||
if keypoints_identifier:
|
if keypoints_identifier:
|
||||||
self.keypoints_to_keep = [f"{i}_{j}" for i in keypoints_identifier.values() for j in ["x", "y"]]
|
self.keypoints_to_keep = [f"{i}_{j}" for i in keypoints_identifier.values() for j in ["x", "y"]]
|
||||||
|
|
||||||
@@ -56,20 +56,36 @@ class FingerSpellingDataset(torch.utils.data.Dataset):
|
|||||||
# get i th element from ordered dict
|
# get i th element from ordered dict
|
||||||
video_name = self.data[index]
|
video_name = self.data[index]
|
||||||
|
|
||||||
# get the keypoints for the video
|
cache_name = video_name.split("/")[-1].split(".")[0] + ".npy"
|
||||||
keypoints_df = self.keypoint_extractor.extract_keypoints_from_video(video_name, normalize="minxmax")
|
|
||||||
|
|
||||||
# filter the keypoints by the identified subset
|
# check if cache_name file exists
|
||||||
if self.keypoints_to_keep:
|
if not os.path.isfile(os.path.join("cache_processed", cache_name)):
|
||||||
keypoints_df = keypoints_df[self.keypoints_to_keep]
|
|
||||||
|
|
||||||
current_row = np.empty(shape=(keypoints_df.shape[0], keypoints_df.shape[1] // 2, 2))
|
|
||||||
for i in range(0, keypoints_df.shape[1], 2):
|
|
||||||
current_row[:, i//2, 0] = keypoints_df.iloc[:,i]
|
|
||||||
current_row[:, i//2, 1] = keypoints_df.iloc[:,i+1]
|
|
||||||
|
|
||||||
|
# get the keypoints for the video (normalizations: minxmax, bohacek)
|
||||||
|
keypoints_df = self.keypoint_extractor.extract_keypoints_from_video(video_name, normalize="bohacek")
|
||||||
|
|
||||||
|
# filter the keypoints by the identified subset
|
||||||
|
if self.keypoints_to_keep:
|
||||||
|
keypoints_df = keypoints_df[self.keypoints_to_keep]
|
||||||
|
|
||||||
|
current_row = np.empty(shape=(keypoints_df.shape[0], keypoints_df.shape[1] // 2, 2))
|
||||||
|
for i in range(0, keypoints_df.shape[1], 2):
|
||||||
|
current_row[:, i // 2, 0] = keypoints_df.iloc[:, i]
|
||||||
|
current_row[:, i // 2, 1] = keypoints_df.iloc[:, i + 1]
|
||||||
|
|
||||||
|
# check if cache_processed folder exists
|
||||||
|
if not os.path.isdir("cache_processed"):
|
||||||
|
os.mkdir("cache_processed")
|
||||||
|
|
||||||
|
# save the processed data to a file
|
||||||
|
np.save(os.path.join("cache_processed", cache_name), current_row)
|
||||||
|
|
||||||
|
else:
|
||||||
|
current_row = np.load(os.path.join("cache_processed", cache_name))
|
||||||
|
|
||||||
|
# get the label
|
||||||
label = self.labels[index]
|
label = self.labels[index]
|
||||||
|
|
||||||
# data to tensor
|
# data to tensor
|
||||||
data = torch.from_numpy(current_row)
|
data = torch.from_numpy(current_row)
|
||||||
|
|
||||||
|
|||||||
44
src/export.py
Normal file
44
src/export.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import torch
|
||||||
|
import torchvision
|
||||||
|
import onnx
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from src.model import SPOTER
|
||||||
|
from src.identifiers import LANDMARKS
|
||||||
|
|
||||||
|
# set parameters of the model
|
||||||
|
model_name = 'model_A-Z'
|
||||||
|
num_classes = 26
|
||||||
|
|
||||||
|
# load PyTorch model from .pth file
|
||||||
|
model = SPOTER(num_classes=num_classes, hidden_dim=len(LANDMARKS) *2)
|
||||||
|
if torch.cuda.is_available():
|
||||||
|
state_dict = torch.load('models/' + model_name + '.pth')
|
||||||
|
else:
|
||||||
|
state_dict = torch.load('models/' + model_name + '.pth', map_location=torch.device('cpu'))
|
||||||
|
model.load_state_dict(state_dict)
|
||||||
|
|
||||||
|
# set model to evaluation mode
|
||||||
|
model.eval()
|
||||||
|
|
||||||
|
# create dummy input tensor
|
||||||
|
dummy_input = torch.randn(10, 108)
|
||||||
|
|
||||||
|
# export model to ONNX format
|
||||||
|
output_file = 'models/' + model_name + '.onnx'
|
||||||
|
torch.onnx.export(model, dummy_input, output_file, input_names=['input'], output_names=['output'])
|
||||||
|
|
||||||
|
torch.onnx.export(model, # model being run
|
||||||
|
dummy_input, # model input (or a tuple for multiple inputs)
|
||||||
|
'models/' + model_name + '.onnx', # where to save the model (can be a file or file-like object)
|
||||||
|
export_params=True, # store the trained parameter weights inside the model file
|
||||||
|
opset_version=9, # the ONNX version to export the model to
|
||||||
|
do_constant_folding=True, # whether to execute constant folding for optimization
|
||||||
|
input_names = ['X'], # the model's input names
|
||||||
|
output_names = ['Y'] # the model's output names
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# load exported ONNX model for verification
|
||||||
|
onnx_model = onnx.load(output_file)
|
||||||
|
onnx.checker.check_model(onnx_model)
|
||||||
@@ -80,3 +80,65 @@ LANDMARKS = {
|
|||||||
"right_pinky_dip": 73,
|
"right_pinky_dip": 73,
|
||||||
"right_pinky_tip": 74,
|
"right_pinky_tip": 74,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
POSE_LANDMARKS = {
|
||||||
|
# Pose Landmarks
|
||||||
|
"nose": 0,
|
||||||
|
# "left_eye_inner": 1,
|
||||||
|
"left_eye": 2,
|
||||||
|
# "left_eye_outer": 3,
|
||||||
|
# "right_eye_inner": 4,
|
||||||
|
"right_eye": 5,
|
||||||
|
# "right_eye_outer": 6,
|
||||||
|
"left_ear": 7,
|
||||||
|
"right_ear": 8,
|
||||||
|
"mouth_left": 9,
|
||||||
|
# "mouth_right": 10,
|
||||||
|
"left_shoulder": 11,
|
||||||
|
"right_shoulder": 12,
|
||||||
|
"left_elbow": 13,
|
||||||
|
"right_elbow": 14,
|
||||||
|
"left_wrist": 15,
|
||||||
|
"right_wrist": 16,
|
||||||
|
# "left_pinky": 17,
|
||||||
|
# "right_pinky": 18,
|
||||||
|
# "left_index": 19,
|
||||||
|
# "right_index": 20,
|
||||||
|
# "left_thumb": 21,
|
||||||
|
# "right_thumb": 22,
|
||||||
|
# "left_hip": 23,
|
||||||
|
# "right_hip": 24,
|
||||||
|
# "left_knee": 25,
|
||||||
|
# "right_knee": 26,
|
||||||
|
# "left_ankle": 27,
|
||||||
|
# "right_ankle": 28,
|
||||||
|
# "left_heel": 29,
|
||||||
|
# "right_heel": 30,
|
||||||
|
# "left_foot_index": 31,
|
||||||
|
# "right_foot_index": 32,
|
||||||
|
}
|
||||||
|
|
||||||
|
HAND_LANDMARKS = {
|
||||||
|
# Left Hand Landmarks
|
||||||
|
"wrist": 0,
|
||||||
|
"thumb_cmc": 1,
|
||||||
|
"thumb_mcp": 2,
|
||||||
|
"thumb_ip": 3,
|
||||||
|
"thumb_tip": 4,
|
||||||
|
"index_finger_mcp": 5,
|
||||||
|
"index_finger_pip": 6,
|
||||||
|
"index_finger_dip": 7,
|
||||||
|
"index_finger_tip": 8,
|
||||||
|
"middle_finger_mcp": 9,
|
||||||
|
"middle_finger_pip": 10,
|
||||||
|
"middle_finger_dip": 11,
|
||||||
|
"middle_finger_tip": 12,
|
||||||
|
"ring_finger_mcp": 13,
|
||||||
|
"ring_finger_pip": 14,
|
||||||
|
"ring_finger_dip": 15,
|
||||||
|
"ring_finger_tip": 16,
|
||||||
|
"pinky_mcp": 17,
|
||||||
|
"pinky_pip": 18,
|
||||||
|
"pinky_dip": 19,
|
||||||
|
"pinky_tip": 20,
|
||||||
|
}
|
||||||
@@ -10,10 +10,10 @@ import pandas as pd
|
|||||||
|
|
||||||
|
|
||||||
class KeypointExtractor:
|
class KeypointExtractor:
|
||||||
def __init__(self, video_folder: str, cache_folder: str = "cache"):
|
def __init__(self, cache_folder: str = "cache"):
|
||||||
self.mp_drawing = mp.solutions.drawing_utils
|
self.mp_drawing = mp.solutions.drawing_utils
|
||||||
self.mp_holistic = mp.solutions.holistic
|
self.mp_holistic = mp.solutions.holistic
|
||||||
self.video_folder = video_folder
|
# self.video_folder = video_folder
|
||||||
self.cache_folder = cache_folder
|
self.cache_folder = cache_folder
|
||||||
|
|
||||||
# we will store the keypoints of each frame as a row in the dataframe. The columns are the keypoints: Pose (33), Left Hand (21), Right Hand (21). Each keypoint has 3 values: x, y
|
# we will store the keypoints of each frame as a row in the dataframe. The columns are the keypoints: Pose (33), Left Hand (21), Right Hand (21). Each keypoint has 3 values: x, y
|
||||||
@@ -40,10 +40,12 @@ class KeypointExtractor:
|
|||||||
:rtype: pd.DataFrame
|
:rtype: pd.DataFrame
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
video_name = video.split("/")[-1].split(".")[0]
|
||||||
|
|
||||||
if not draw:
|
if not draw:
|
||||||
# check if video exists
|
# check if video exists
|
||||||
if not os.path.exists(self.video_folder + video):
|
if not os.path.exists(video):
|
||||||
logging.error("Video does not exist at path: " + self.video_folder + video)
|
logging.error("Video does not exist at path: " + video)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# check if cache exists
|
# check if cache exists
|
||||||
@@ -51,22 +53,22 @@ class KeypointExtractor:
|
|||||||
os.makedirs(self.cache_folder)
|
os.makedirs(self.cache_folder)
|
||||||
|
|
||||||
# check if cache file exists and return
|
# check if cache file exists and return
|
||||||
if os.path.exists(self.cache_folder + "/" + video + ".npy"):
|
if os.path.exists(self.cache_folder + "/" + video_name + ".npy"):
|
||||||
# create dataframe from cache
|
# create dataframe from cache
|
||||||
df = pd.DataFrame(np.load(self.cache_folder + "/" + video + ".npy", allow_pickle=True), columns=self.columns)
|
df = pd.DataFrame(np.load(self.cache_folder + "/" + video_name + ".npy", allow_pickle=True), columns=self.columns)
|
||||||
if normalize:
|
if normalize:
|
||||||
df = self.normalize_hands(df, norm_algorithm=normalize)
|
df = self.normalize_hands(df, norm_algorithm=normalize)
|
||||||
df = self.normalize_pose_bohacek(df)
|
df, _ = self.normalize_pose_bohacek(df)
|
||||||
return df
|
return df
|
||||||
|
|
||||||
# open video
|
# open video
|
||||||
cap = cv2.VideoCapture(self.video_folder + video)
|
cap = cv2.VideoCapture(video)
|
||||||
|
|
||||||
keypoints_df = pd.DataFrame(columns=self.columns)
|
keypoints_df = pd.DataFrame(columns=self.columns)
|
||||||
|
|
||||||
# extract frames from video so we extract 5 frames per second
|
# extract frames from video so we extract 5 frames per second
|
||||||
frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
|
frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
|
||||||
frame_skip = frame_rate // 10
|
frame_skip = (frame_rate // 10) -1
|
||||||
|
|
||||||
output_frames = []
|
output_frames = []
|
||||||
|
|
||||||
@@ -113,12 +115,12 @@ class KeypointExtractor:
|
|||||||
cap.release()
|
cap.release()
|
||||||
|
|
||||||
# save keypoints to cache
|
# save keypoints to cache
|
||||||
np.save(self.cache_folder + "/" + video + ".npy", keypoints_df.to_numpy())
|
np.save(self.cache_folder + "/" + video_name + ".npy", keypoints_df.to_numpy())
|
||||||
|
|
||||||
# normalize hands and pose keypoints
|
# normalize hands and pose keypoints
|
||||||
if normalize:
|
if normalize:
|
||||||
keypoints_df = self.normalize_hands(keypoints_df, norm_algorithm=normalize)
|
keypoints_df = self.normalize_hands(keypoints_df, norm_algorithm=normalize)
|
||||||
keypoints_df = self.normalize_pose_bohacek(keypoints_df)
|
keypoints_df, _ = self.normalize_pose_bohacek(keypoints_df)
|
||||||
|
|
||||||
if draw:
|
if draw:
|
||||||
return keypoints_df, output_frames
|
return keypoints_df, output_frames
|
||||||
@@ -179,28 +181,28 @@ class KeypointExtractor:
|
|||||||
|
|
||||||
if norm_algorithm == "minmax":
|
if norm_algorithm == "minmax":
|
||||||
# normalize left hand
|
# normalize left hand
|
||||||
dataframe = self.normalize_hand_minmax(dataframe, "left_hand")
|
dataframe, _= self.normalize_hand_minmax(dataframe, "left_hand")
|
||||||
# normalize right hand
|
# normalize right hand
|
||||||
dataframe = self.normalize_hand_minmax(dataframe, "right_hand")
|
dataframe, _= self.normalize_hand_minmax(dataframe, "right_hand")
|
||||||
elif norm_algorithm == "bohacek":
|
elif norm_algorithm == "bohacek":
|
||||||
# normalize left hand
|
# normalize left hand
|
||||||
dataframe = self.normalize_hand_bohacek(dataframe, "left_hand")
|
dataframe, _= self.normalize_hand_bohacek(dataframe, "left_hand")
|
||||||
# normalize right hand
|
# normalize right hand
|
||||||
dataframe = self.normalize_hand_bohacek(dataframe, "right_hand")
|
dataframe, _= self.normalize_hand_bohacek(dataframe, "right_hand")
|
||||||
else:
|
else:
|
||||||
return dataframe
|
return dataframe
|
||||||
|
|
||||||
return dataframe
|
return dataframe
|
||||||
|
|
||||||
def normalize_hand_minmax(self, dataframe: pd.DataFrame, hand: str) -> pd.DataFrame:
|
def normalize_hand_minmax(self, dataframe: pd.DataFrame, hand: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
|
||||||
"""normalize_hand_minmax this function normalizes the hand keypoints of a dataframe with respect to the minimum and maximum coordinates
|
"""normalize_hand_helper this function normalizes the hand keypoints of a dataframe with respect to the minimum and maximum coordinates
|
||||||
|
|
||||||
:param dataframe: the dataframe to normalize
|
:param dataframe: the dataframe to normalize
|
||||||
:type dataframe: pd.DataFrame
|
:type dataframe: pd.DataFrame
|
||||||
:param hand: the hand to normalize
|
:param hand: the hand to normalize
|
||||||
:type hand: str
|
:type hand: str
|
||||||
:return: the normalized dataframe
|
:return: the normalized dataframe and the bounding boxes dataframe
|
||||||
:rtype: pd.DataFrame
|
:rtype: Tuple[pd.DataFrame, pd.DataFrame]
|
||||||
"""
|
"""
|
||||||
# get all columns that belong to the hand (left hand column 66 - 107, right hand column 108 - 149)
|
# get all columns that belong to the hand (left hand column 66 - 107, right hand column 108 - 149)
|
||||||
hand_columns = np.array([i for i in range(66 + (42 if hand == "right_hand" else 0), 108 + (42 if hand == "right_hand" else 0))])
|
hand_columns = np.array([i for i in range(66 + (42 if hand == "right_hand" else 0), 108 + (42 if hand == "right_hand" else 0))])
|
||||||
@@ -226,24 +228,28 @@ class KeypointExtractor:
|
|||||||
bbox_dims = np.concatenate((np.tile(bbox_width, (1, 21, 1)), np.tile(bbox_height, (1, 21, 1))), axis=2)
|
bbox_dims = np.concatenate((np.tile(bbox_width, (1, 21, 1)), np.tile(bbox_height, (1, 21, 1))), axis=2)
|
||||||
|
|
||||||
if np.any(bbox_dims == 0):
|
if np.any(bbox_dims == 0):
|
||||||
return dataframe
|
return dataframe, None
|
||||||
# normalize the hand keypoints based on the bounding box around the hand
|
# normalize the hand keypoints based on the bounding box around the hand
|
||||||
norm_hand_coords = (hand_coords - center_coords) / bbox_dims
|
norm_hand_coords = (hand_coords - center_coords) / bbox_dims
|
||||||
|
|
||||||
# flatten the normalized hand keypoints array and replace the original hand keypoints with the normalized hand keypoints in the dataframe
|
# flatten the normalized hand keypoints array and replace the original hand keypoints with the normalized hand keypoints in the dataframe
|
||||||
dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 42)
|
dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 42)
|
||||||
|
|
||||||
return dataframe
|
# merge starting and ending points of the bounding boxes in a dataframe
|
||||||
|
bbox_array = np.hstack((min_x.reshape(-1, 1), min_y.reshape(-1, 1), max_x.reshape(-1, 1), max_y.reshape(-1, 1)))
|
||||||
|
bbox = pd.DataFrame(bbox_array, columns=['starting_x', 'starting_y', 'ending_x', 'ending_y'])
|
||||||
|
|
||||||
def normalize_hand_bohacek(self, dataframe: pd.DataFrame, hand: str) -> pd.DataFrame:
|
return dataframe, bbox
|
||||||
"""normalize_hand_bohacek this function normalizes the hand keypoints of a dataframe using the Bohacek-normalization algorithm
|
|
||||||
|
def normalize_hand_bohacek(self, dataframe: pd.DataFrame, hand: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
|
||||||
|
"""normalize_hand_helper this function normalizes the hand keypoints of a dataframe using the bohacek normalization algorithm
|
||||||
|
|
||||||
:param dataframe: the dataframe to normalize
|
:param dataframe: the dataframe to normalize
|
||||||
:type dataframe: pd.DataFrame
|
:type dataframe: pd.DataFrame
|
||||||
:param hand: the hand to normalize
|
:param hand: the hand to normalize
|
||||||
:type hand: str
|
:type hand: str
|
||||||
:return: the normalized dataframe
|
:return: the normalized dataframe and the bounding boxes dataframe
|
||||||
:rtype: pd.DataFrame
|
:rtype: Tuple[pd.DataFrame, pd.DataFrame]
|
||||||
"""
|
"""
|
||||||
# get all columns that belong to the hand (left hand column 66 - 107, right hand column 108 - 149)
|
# get all columns that belong to the hand (left hand column 66 - 107, right hand column 108 - 149)
|
||||||
hand_columns = np.array([i for i in range(66 + (42 if hand == "right_hand" else 0), 108 + (42 if hand == "right_hand" else 0))])
|
hand_columns = np.array([i for i in range(66 + (42 if hand == "right_hand" else 0), 108 + (42 if hand == "right_hand" else 0))])
|
||||||
@@ -287,22 +293,28 @@ class KeypointExtractor:
|
|||||||
bbox_dims = np.concatenate((np.tile(bbox_width, (1, 21, 1)), np.tile(bbox_height, (1, 21, 1))), axis=2)
|
bbox_dims = np.concatenate((np.tile(bbox_width, (1, 21, 1)), np.tile(bbox_height, (1, 21, 1))), axis=2)
|
||||||
|
|
||||||
if np.any(bbox_dims == 0):
|
if np.any(bbox_dims == 0):
|
||||||
return dataframe
|
return dataframe, None
|
||||||
# normalize the hand keypoints based on the bounding box around the hand
|
# normalize the hand keypoints based on the bounding box around the hand
|
||||||
norm_hand_coords = (hand_coords - center_coords) / bbox_dims
|
norm_hand_coords = (hand_coords - center_coords) / bbox_dims
|
||||||
|
|
||||||
# flatten the normalized hand keypoints array and replace the original hand keypoints with the normalized hand keypoints in the dataframe
|
# flatten the normalized hand keypoints array and replace the original hand keypoints with the normalized hand keypoints in the dataframe
|
||||||
dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 42)
|
dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 42)
|
||||||
|
|
||||||
return dataframe
|
# merge starting and ending points of the bounding boxes in a dataframe
|
||||||
|
bbox_array = np.hstack((starting_x.reshape(-1, 1), starting_y.reshape(-1, 1), ending_x.reshape(-1, 1), ending_y.reshape(-1, 1)))
|
||||||
|
bbox = pd.DataFrame(bbox_array, columns=['starting_x', 'starting_y', 'ending_x', 'ending_y'])
|
||||||
|
|
||||||
def normalize_pose_bohacek(self, dataframe: pd.DataFrame) -> pd.DataFrame:
|
return dataframe, bbox
|
||||||
|
|
||||||
|
def normalize_pose_bohacek(self, dataframe: pd.DataFrame, bbox_size: float = 4) -> Tuple[pd.DataFrame, pd.DataFrame]:
|
||||||
"""normalize_pose_bohacek this function normalizes the pose keypoints of a dataframe using the Bohacek-normalization algorithm
|
"""normalize_pose_bohacek this function normalizes the pose keypoints of a dataframe using the Bohacek-normalization algorithm
|
||||||
|
|
||||||
:param dataframe: the dataframe to normalize
|
:param dataframe: the dataframe to normalize
|
||||||
:type dataframe: pd.DataFrame
|
:type dataframe: pd.DataFrame
|
||||||
:return: the normalized dataframe
|
:param bbox_size: the width and height of the normalization bounding box expressed in head metrics, defaults to 4
|
||||||
:rtype: pd.DataFrame
|
:type bbox_size: float, optional
|
||||||
|
:return: the normalized dataframe and the bounding boxes dataframe
|
||||||
|
:rtype: Tuple[pd.DataFrame, pd.DataFrame]
|
||||||
"""
|
"""
|
||||||
# get the columns that belong to the pose
|
# get the columns that belong to the pose
|
||||||
pose_columns = np.array([i for i in range(66)])
|
pose_columns = np.array([i for i in range(66)])
|
||||||
@@ -311,28 +323,22 @@ class KeypointExtractor:
|
|||||||
pose_coords = dataframe.iloc[:, pose_columns].values.reshape(-1, 33, 2)
|
pose_coords = dataframe.iloc[:, pose_columns].values.reshape(-1, 33, 2)
|
||||||
|
|
||||||
# check in what frames shoulders are visible
|
# check in what frames shoulders are visible
|
||||||
left_shoulder_present_mask = pose_coords[:, 11, 0]!=0
|
left_shoulder_present_mask = pose_coords[:, 11, 0] != 0
|
||||||
right_shoulder_present_mask = pose_coords[:, 12, 0]!=0
|
right_shoulder_present_mask = pose_coords[:, 12, 0] != 0
|
||||||
shoulders_present_mask = np.logical_and(left_shoulder_present_mask,right_shoulder_present_mask)
|
shoulders_present_mask = np.logical_and(left_shoulder_present_mask, right_shoulder_present_mask)
|
||||||
|
|
||||||
# calculate shoulder distance
|
# calculate shoulder distance
|
||||||
left_shoulder, right_shoulder = pose_coords[shoulders_present_mask, 11,], pose_coords[shoulders_present_mask, 12,]
|
left_shoulder, right_shoulder = pose_coords[shoulders_present_mask, 11], pose_coords[shoulders_present_mask, 12]
|
||||||
shoulder_distance = ((left_shoulder[:, 0] - right_shoulder[:, 0])**2 + (left_shoulder[:, 1] - right_shoulder[:, 1])**2)**0.5
|
shoulder_distance = ((left_shoulder[:, 0] - right_shoulder[:, 0])**2 + (left_shoulder[:, 1] - right_shoulder[:, 1])**2)**0.5
|
||||||
head_metric = shoulder_distance
|
head_metric = shoulder_distance
|
||||||
|
|
||||||
# center of shoulders and left eye are necessary to construct bounding box
|
# center of shoulders and left eye are necessary to construct bounding box
|
||||||
center_shoulders = right_shoulder + (left_shoulder - right_shoulder)/2
|
center_shoulders = right_shoulder + (left_shoulder - right_shoulder) / 2
|
||||||
left_eye = pose_coords[shoulders_present_mask, 2]
|
left_eye = pose_coords[shoulders_present_mask, 2]
|
||||||
|
|
||||||
# set the starting and ending point of the normalization bounding box
|
# set the starting and ending point of the normalization bounding box
|
||||||
starting_x, starting_y = center_shoulders[:, 0] - 2*head_metric, left_eye[:, 1] - 0.5*head_metric
|
starting_x, starting_y = center_shoulders[:, 0] - (bbox_size / 2) * head_metric, left_eye[:, 1] - 0.5 * head_metric
|
||||||
ending_x, ending_y = center_shoulders[:, 0] + 2*head_metric, starting_y + 4*head_metric
|
ending_x, ending_y = center_shoulders[:, 0] + (bbox_size / 2) * head_metric, starting_y + (bbox_size - 0.5) * head_metric
|
||||||
|
|
||||||
# ensure that the starting and ending point of the bounding box are not out of the frame
|
|
||||||
#starting_x = np.clip(starting_x, 0, None)
|
|
||||||
#starting_y = np.clip(starting_y, 0 ,None)
|
|
||||||
#ending_x = np.clip(ending_x, 0, None)
|
|
||||||
#ending_y = np.clip(ending_y, 0 ,None)
|
|
||||||
|
|
||||||
# calculate the center of the bounding box and the bounding box dimensions
|
# calculate the center of the bounding box and the bounding box dimensions
|
||||||
bbox_center_x, bbox_center_y = (starting_x + ending_x) / 2, (starting_y + ending_y) / 2
|
bbox_center_x, bbox_center_y = (starting_x + ending_x) / 2, (starting_y + ending_y) / 2
|
||||||
@@ -342,15 +348,19 @@ class KeypointExtractor:
|
|||||||
bbox_center_x, bbox_center_y = bbox_center_x.reshape(-1, 1, 1), bbox_center_y.reshape(-1, 1, 1)
|
bbox_center_x, bbox_center_y = bbox_center_x.reshape(-1, 1, 1), bbox_center_y.reshape(-1, 1, 1)
|
||||||
center_coords = np.concatenate((np.tile(bbox_center_x, (1, 33, 1)), np.tile(bbox_center_y, (1, 33, 1))), axis=2)
|
center_coords = np.concatenate((np.tile(bbox_center_x, (1, 33, 1)), np.tile(bbox_center_y, (1, 33, 1))), axis=2)
|
||||||
|
|
||||||
bbox_width, bbox_height = bbox_width.reshape(-1, 1, 1), bbox_height.reshape(-1, 1 ,1)
|
bbox_width, bbox_height = bbox_width.reshape(-1, 1, 1), bbox_height.reshape(-1, 1, 1)
|
||||||
bbox_dims = np.concatenate((np.tile(bbox_width, (1, 33, 1)), np.tile(bbox_height, (1, 33, 1))), axis=2)
|
bbox_dims = np.concatenate((np.tile(bbox_width, (1, 33, 1)), np.tile(bbox_height, (1, 33, 1))), axis=2)
|
||||||
|
|
||||||
if np.any(bbox_dims == 0):
|
if np.any(bbox_dims == 0):
|
||||||
return dataframe
|
return dataframe, None
|
||||||
# normalize the pose keypoints based on the bounding box
|
# normalize the pose keypoints based on the bounding box
|
||||||
norm_pose_coords= (pose_coords - center_coords) / bbox_dims
|
norm_pose_coords = (pose_coords - center_coords) / bbox_dims
|
||||||
|
|
||||||
# flatten the normalized pose keypoints array and replace the original pose keypoints with the normalized pose keypoints in the dataframe
|
# flatten the normalized pose keypoints array and replace the original pose keypoints with the normalized pose keypoints in the dataframe
|
||||||
dataframe.iloc[shoulders_present_mask, pose_columns] = norm_pose_coords.reshape(-1, 66)
|
dataframe.iloc[shoulders_present_mask, pose_columns] = norm_pose_coords.reshape(-1, 66)
|
||||||
|
|
||||||
return dataframe
|
# merge starting and ending points of the bounding boxes in a dataframe
|
||||||
|
bbox_array = np.hstack((starting_x.reshape(-1, 1), starting_y.reshape(-1, 1), ending_x.reshape(-1, 1), ending_y.reshape(-1, 1)))
|
||||||
|
bbox = pd.DataFrame(bbox_array, columns=['starting_x', 'starting_y', 'ending_x', 'ending_y'])
|
||||||
|
|
||||||
|
return dataframe, bbox
|
||||||
|
|||||||
21
src/loss_function.py
Normal file
21
src/loss_function.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
# create custom loss function
|
||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
from src.datasets.finger_spelling_dataset import FingerSpellingDataset
|
||||||
|
|
||||||
|
from src.keypoint_extractor import KeypointExtractor
|
||||||
|
from torch.utils.data import DataLoader
|
||||||
|
from src.identifiers import LANDMARKS
|
||||||
|
|
||||||
|
class CustomLoss(nn.Module):
    """Penalty loss that discourages over-confident predictions of class 25 ('Z').

    Bug fixed: the original ``forward`` returned freshly constructed constant
    tensors (``torch.tensor(100.0, requires_grad=True)`` / ``torch.tensor(0.0,
    requires_grad=True)``).  Those are new graph leaves with no connection to
    ``pred``, so ``loss.backward()`` produced zero gradient for every model
    parameter — the comment even says "backward must be able to learn this",
    which the constants made impossible.  This version keeps the same
    thresholded contract (zero loss while p(Z) <= 0.4, a large penalty above)
    but expresses the penalty as a differentiable function of ``pred``.
    """

    def __init__(self):
        super(CustomLoss, self).__init__()
        # probability threshold above which the 'Z' class is penalized
        self.threshold = 0.4
        # scale chosen so the penalty magnitude is comparable to the old
        # constant 100 returned by the original implementation
        self.scale = 250.0

    def forward(self, pred, target):
        """Return a scalar penalty tensor; ``target`` is unused (kept for API parity).

        ``pred`` is assumed to be raw logits of shape (1, 1, num_classes) with
        num_classes > 25 — TODO confirm against the caller in train().
        """
        # probability assigned to class index 25 ('Z') at the first position
        z_prob = torch.nn.functional.softmax(pred, dim=2)[0][0][25]
        # hinge-style penalty: exactly 0 at or below the threshold, grows
        # linearly above it, and stays differentiable w.r.t. the model output
        return self.scale * torch.clamp(z_prob - self.threshold, min=0.0)
|
||||||
27
src/model.py
27
src/model.py
@@ -1,6 +1,7 @@
|
|||||||
### SPOTER model implementation from the paper "SPOTER: Sign Pose-based Transformer for Sign Language Recognition from Sequence of Skeletal Data"
|
### SPOTER model implementation from the paper "SPOTER: Sign Pose-based Transformer for Sign Language Recognition from Sequence of Skeletal Data"
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
|
import math
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import torch
|
import torch
|
||||||
@@ -38,6 +39,19 @@ class SPOTERTransformerDecoderLayer(nn.TransformerDecoderLayer):
|
|||||||
|
|
||||||
return tgt
|
return tgt
|
||||||
|
|
||||||
|
class PositionalEmbedding(nn.Module):
    """Sinusoidal positional encoding (Vaswani et al., "Attention Is All You Need").

    Precomputes a fixed sin/cos table for up to ``max_len`` positions and adds
    it to the input in ``forward``.  The table is stored as a non-trainable
    buffer of shape (max_len, 1, d_model), so inputs are expected in
    (sequence, batch, d_model) layout.
    """

    def __init__(self, d_model, max_len=60):
        super().__init__()
        positions = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # geometric frequency ladder shared by the sin (even) and cos (odd) columns
        frequencies = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(positions * frequencies)
        table[:, 1::2] = torch.cos(positions * frequencies)
        # reshape to (max_len, 1, d_model) so it broadcasts over the batch axis;
        # register_buffer keeps it out of the optimizer but moves it with .to()
        self.register_buffer('pe', table.unsqueeze(0).transpose(0, 1))

    def forward(self, x):
        # add the encoding for the first x.size(0) positions
        return x + self.pe[:x.size(0), :]
|
||||||
|
|
||||||
class SPOTER(nn.Module):
|
class SPOTER(nn.Module):
|
||||||
"""
|
"""
|
||||||
@@ -48,8 +62,9 @@ class SPOTER(nn.Module):
|
|||||||
def __init__(self, num_classes, hidden_dim=55):
|
def __init__(self, num_classes, hidden_dim=55):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
self.row_embed = nn.Parameter(torch.rand(50, hidden_dim))
|
|
||||||
self.pos = nn.Parameter(torch.cat([self.row_embed[0].unsqueeze(0).repeat(1, 1, 1)], dim=-1).flatten(0, 1).unsqueeze(0))
|
self.pos = PositionalEmbedding(hidden_dim)
|
||||||
|
|
||||||
self.class_query = nn.Parameter(torch.rand(1, hidden_dim))
|
self.class_query = nn.Parameter(torch.rand(1, hidden_dim))
|
||||||
self.transformer = nn.Transformer(hidden_dim, 9, 6, 6)
|
self.transformer = nn.Transformer(hidden_dim, 9, 6, 6)
|
||||||
self.linear_class = nn.Linear(hidden_dim, num_classes)
|
self.linear_class = nn.Linear(hidden_dim, num_classes)
|
||||||
@@ -61,7 +76,13 @@ class SPOTER(nn.Module):
|
|||||||
|
|
||||||
def forward(self, inputs):
|
def forward(self, inputs):
|
||||||
h = torch.unsqueeze(inputs.flatten(start_dim=1), 1).float()
|
h = torch.unsqueeze(inputs.flatten(start_dim=1), 1).float()
|
||||||
h = self.transformer(self.pos + h, self.class_query.unsqueeze(0)).transpose(0, 1)
|
# add positional encoding
|
||||||
|
h = self.pos(h)
|
||||||
|
|
||||||
|
# add class query
|
||||||
|
h = self.transformer(h, self.class_query.unsqueeze(0)).transpose(0, 1)
|
||||||
|
|
||||||
|
# get class prediction
|
||||||
res = self.linear_class(h)
|
res = self.linear_class(h)
|
||||||
|
|
||||||
return res
|
return res
|
||||||
64
src/normalizations.py
Normal file
64
src/normalizations.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_hand_bohaecek(keypoints):
    """Normalize flat ``[x0, y0, x1, y1, ...]`` hand keypoints into a square box.

    A bounding box around the hand is expanded by a 10% margin on its longer
    side, and the shorter side is additionally padded by half the difference
    so the box becomes square.  Every coordinate is then expressed relative to
    the box center and divided by the box size.

    Returns ``(normalized_keypoints, (x, y, w, h))`` where the tuple is the box
    in integer coordinates, or ``(keypoints, None)`` when the box is degenerate
    (zero width or height, e.g. an all-zero placeholder for a missing hand).
    """
    xs, ys = keypoints[::2], keypoints[1::2]
    min_x, max_x = np.min(xs), np.max(xs)
    min_y, max_y = np.min(ys), np.max(ys)

    width = max_x - min_x
    height = max_y - min_y

    # 10% margin on the longer dimension; the shorter one gets the same margin
    # plus half the difference, which makes the final box square
    if width > height:
        delta_x = 0.1 * width
        delta_y = delta_x + (width - height) / 2
    else:
        delta_y = 0.1 * height
        delta_x = delta_y + (height - width) / 2

    starting_x = min_x - delta_x
    starting_y = min_y - delta_y
    ending_x = max_x + delta_x
    ending_y = max_y + delta_y

    bbox_center_x = (starting_x + ending_x) / 2
    bbox_center_y = (starting_y + ending_y) / 2
    bbox_width = ending_x - starting_x
    bbox_height = ending_y - starting_y

    # degenerate box: nothing meaningful to normalize against
    if bbox_width == 0 or bbox_height == 0:
        return keypoints, None

    # x values sit at even indices, y values at odd indices
    normalized_keypoints = np.zeros(keypoints.shape)
    normalized_keypoints[::2] = (xs - bbox_center_x) / bbox_width
    normalized_keypoints[1::2] = (ys - bbox_center_y) / bbox_height

    return normalized_keypoints, (int(starting_x), int(starting_y), int(bbox_width), int(bbox_height))
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_pose(keypoints, bbox_size: float = 4.0):
    """Normalize flat ``[x0, y0, ...]`` pose keypoints to a shoulder-scaled box.

    The shoulder width is used as a body-scale metric: the box is ``bbox_size``
    shoulder-widths wide, horizontally centered between the shoulders, starts
    half a shoulder-width above the left eye and is ``bbox_size - 0.5``
    shoulder-widths tall.  Keypoint indices follow the MediaPipe pose layout
    flattened to x/y pairs (left shoulder at 22:24, right shoulder at 24:26,
    left eye at 4:6) — assumed, confirm against the extractor.

    Returns ``(normalized_keypoints, (x, y, w, h))`` with the box in integer
    coordinates, or ``(keypoints, None)`` for a degenerate (zero-size) box.
    """
    left_shoulder = keypoints[22:24]
    right_shoulder = keypoints[24:26]

    # shoulder width serves as the scale-invariant body metric
    shoulder_distance = np.linalg.norm(left_shoulder - right_shoulder)
    shoulder_center = (left_shoulder + right_shoulder) / 2

    left_eye = keypoints[4:6]

    starting_x = shoulder_center[0] - (bbox_size / 2) * shoulder_distance
    starting_y = left_eye[1] - 0.5 * shoulder_distance
    ending_x = shoulder_center[0] + (bbox_size / 2) * shoulder_distance
    ending_y = starting_y + (bbox_size - 0.5) * shoulder_distance

    bbox_center_x = (starting_x + ending_x) / 2
    bbox_center_y = (starting_y + ending_y) / 2
    bbox_width = ending_x - starting_x
    bbox_height = ending_y - starting_y

    # degenerate box (e.g. coincident shoulders): leave the input untouched
    if bbox_width == 0 or bbox_height == 0:
        return keypoints, None

    # x values sit at even indices, y values at odd indices
    normalized_keypoints = np.zeros(keypoints.shape)
    normalized_keypoints[::2] = (keypoints[::2] - bbox_center_x) / bbox_width
    normalized_keypoints[1::2] = (keypoints[1::2] - bbox_center_y) / bbox_height

    return normalized_keypoints, (int(starting_x), int(starting_y), int(bbox_width), int(bbox_height))
|
||||||
86
src/train.py
86
src/train.py
@@ -1,11 +1,6 @@
|
|||||||
import argparse
|
|
||||||
import logging
|
|
||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import matplotlib.pyplot as plt
|
|
||||||
import matplotlib.ticker as ticker
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import torch
|
import torch
|
||||||
import torch.nn as nn
|
import torch.nn as nn
|
||||||
@@ -13,15 +8,17 @@ import torch.optim as optim
|
|||||||
from torch.utils.data import DataLoader
|
from torch.utils.data import DataLoader
|
||||||
from torchvision import transforms
|
from torchvision import transforms
|
||||||
|
|
||||||
from src.augmentations import MirrorKeypoints
|
from src.augmentations import MirrorKeypoints, Z_augmentation, NoiseAugmentation
|
||||||
from src.datasets.finger_spelling_dataset import FingerSpellingDataset
|
from src.datasets.finger_spelling_dataset import FingerSpellingDataset
|
||||||
from src.datasets.wlasl_dataset import WLASLDataset
|
|
||||||
from src.identifiers import LANDMARKS
|
from src.identifiers import LANDMARKS
|
||||||
from src.keypoint_extractor import KeypointExtractor
|
|
||||||
from src.model import SPOTER
|
from src.model import SPOTER
|
||||||
|
from src.loss_function import CustomLoss
|
||||||
|
|
||||||
|
import torch
|
||||||
|
from torch.utils.tensorboard import SummaryWriter
|
||||||
|
|
||||||
def train():
|
def train():
|
||||||
|
writer = SummaryWriter()
|
||||||
random.seed(379)
|
random.seed(379)
|
||||||
np.random.seed(379)
|
np.random.seed(379)
|
||||||
os.environ['PYTHONHASHSEED'] = str(379)
|
os.environ['PYTHONHASHSEED'] = str(379)
|
||||||
@@ -32,48 +29,57 @@ def train():
|
|||||||
g = torch.Generator()
|
g = torch.Generator()
|
||||||
g.manual_seed(379)
|
g.manual_seed(379)
|
||||||
|
|
||||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
device = torch.device("cuda:0")
|
||||||
|
|
||||||
spoter_model = SPOTER(num_classes=12, hidden_dim=len(LANDMARKS) *2)
|
spoter_model = SPOTER(num_classes=26, hidden_dim=len(LANDMARKS) *2)
|
||||||
spoter_model.train(True)
|
spoter_model.train(True)
|
||||||
spoter_model.to(device)
|
spoter_model.to(device)
|
||||||
|
|
||||||
|
|
||||||
criterion = nn.CrossEntropyLoss()
|
criterion = nn.CrossEntropyLoss()
|
||||||
optimizer = optim.SGD(spoter_model.parameters(), lr=0.0001, momentum=0.9)
|
criterion_bad = CustomLoss()
|
||||||
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=5)
|
optimizer = optim.Adam(spoter_model.parameters(), lr=0.00001)
|
||||||
|
scheduler = None
|
||||||
|
|
||||||
# TODO: create paths for checkpoints
|
# check if checkpoints folder exists
|
||||||
|
if not os.path.exists("checkpoints"):
|
||||||
|
os.makedirs("checkpoints")
|
||||||
|
|
||||||
# TODO: transformations + augmentations
|
transform = transforms.Compose([MirrorKeypoints(), NoiseAugmentation(noise=0.1)])
|
||||||
|
|
||||||
k = KeypointExtractor("data/fingerspelling/data/")
|
train_set = FingerSpellingDataset("data/fingerspelling/data/", bad_data_folder="", keypoints_identifier=LANDMARKS, subset="train", transform=transform)
|
||||||
|
|
||||||
transform = transforms.Compose([MirrorKeypoints()])
|
|
||||||
|
|
||||||
train_set = FingerSpellingDataset("data/fingerspelling/data/", k, keypoints_identifier=LANDMARKS, subset="train", transform=transform)
|
|
||||||
train_loader = DataLoader(train_set, shuffle=True, generator=g)
|
train_loader = DataLoader(train_set, shuffle=True, generator=g)
|
||||||
|
|
||||||
val_set = FingerSpellingDataset("data/fingerspelling/data/", k, keypoints_identifier=LANDMARKS, subset="val")
|
val_set = FingerSpellingDataset("data/fingerspelling/data/", bad_data_folder="", keypoints_identifier=LANDMARKS, subset="test")
|
||||||
val_loader = DataLoader(val_set, shuffle=True, generator=g)
|
val_loader = DataLoader(val_set, shuffle=True, generator=g)
|
||||||
|
|
||||||
|
|
||||||
train_acc, val_acc = 0, 0
|
train_acc, val_acc = 0, 0
|
||||||
lr_progress = []
|
lr_progress = []
|
||||||
top_train_acc, top_val_acc = 0, 0
|
top_train_acc, top_val_acc = 0, 0
|
||||||
checkpoint_index = 0
|
checkpoint_index = 0
|
||||||
|
|
||||||
for epoch in range(100):
|
epochs_without_improvement = 0
|
||||||
|
best_val_acc = 0
|
||||||
|
|
||||||
|
for epoch in range(300):
|
||||||
|
|
||||||
running_loss = 0.0
|
running_loss = 0.0
|
||||||
pred_correct, pred_all = 0, 0
|
pred_correct, pred_all = 0, 0
|
||||||
|
|
||||||
# train
|
# train
|
||||||
for i, (inputs, labels) in enumerate(train_loader):
|
for i, (inputs, labels) in enumerate(train_loader):
|
||||||
|
# skip videos that are too short
|
||||||
|
if inputs.shape[1] < 20:
|
||||||
|
continue
|
||||||
|
|
||||||
inputs = inputs.squeeze(0).to(device)
|
inputs = inputs.squeeze(0).to(device)
|
||||||
labels = labels.to(device, dtype=torch.long)
|
labels = labels.to(device, dtype=torch.long)
|
||||||
|
|
||||||
optimizer.zero_grad()
|
optimizer.zero_grad()
|
||||||
outputs = spoter_model(inputs).expand(1, -1, -1)
|
outputs = spoter_model(inputs).expand(1, -1, -1)
|
||||||
loss = criterion(outputs[0], labels)
|
loss = criterion(outputs[0], labels)
|
||||||
|
|
||||||
loss.backward()
|
loss.backward()
|
||||||
optimizer.step()
|
optimizer.step()
|
||||||
running_loss += loss
|
running_loss += loss
|
||||||
@@ -82,11 +88,16 @@ def train():
|
|||||||
pred_correct += 1
|
pred_correct += 1
|
||||||
pred_all += 1
|
pred_all += 1
|
||||||
|
|
||||||
|
|
||||||
if scheduler:
|
if scheduler:
|
||||||
scheduler.step(running_loss.item() / len(train_loader))
|
scheduler.step(running_loss.item() / (len(train_loader)) )
|
||||||
|
|
||||||
|
writer.add_scalar("Loss/train", loss, epoch)
|
||||||
|
writer.add_scalar("Accuracy/train", (pred_correct / pred_all), epoch)
|
||||||
|
|
||||||
# validate and print val acc
|
# validate and print val acc
|
||||||
val_pred_correct, val_pred_all = 0, 0
|
val_pred_correct, val_pred_all = 0, 0
|
||||||
|
val_loss = 0.0
|
||||||
with torch.no_grad():
|
with torch.no_grad():
|
||||||
for i, (inputs, labels) in enumerate(val_loader):
|
for i, (inputs, labels) in enumerate(val_loader):
|
||||||
inputs = inputs.squeeze(0).to(device)
|
inputs = inputs.squeeze(0).to(device)
|
||||||
@@ -94,25 +105,44 @@ def train():
|
|||||||
|
|
||||||
outputs = spoter_model(inputs).expand(1, -1, -1)
|
outputs = spoter_model(inputs).expand(1, -1, -1)
|
||||||
|
|
||||||
|
# calculate loss
|
||||||
|
val_loss += criterion(outputs[0], labels)
|
||||||
|
|
||||||
if int(torch.argmax(torch.nn.functional.softmax(outputs, dim=2))) == int(labels[0]):
|
if int(torch.argmax(torch.nn.functional.softmax(outputs, dim=2))) == int(labels[0]):
|
||||||
val_pred_correct += 1
|
val_pred_correct += 1
|
||||||
val_pred_all += 1
|
val_pred_all += 1
|
||||||
|
|
||||||
val_acc = (val_pred_correct / val_pred_all)
|
val_acc = (val_pred_correct / val_pred_all)
|
||||||
|
|
||||||
|
writer.add_scalar("Loss/val", val_loss, epoch)
|
||||||
|
writer.add_scalar("Accuracy/val", val_acc, epoch)
|
||||||
|
|
||||||
|
|
||||||
print(f"Epoch: {epoch} | Train Acc: {(pred_correct / pred_all)} | Val Acc: {val_acc}")
|
print(f"Epoch: {epoch} | Train Acc: {(pred_correct / pred_all)} | Val Acc: {val_acc}")
|
||||||
|
|
||||||
|
# save checkpoint and update epochs_without_improvement
|
||||||
|
if val_acc > best_val_acc:
|
||||||
|
best_val_acc = val_acc
|
||||||
|
epochs_without_improvement = 0
|
||||||
|
if epoch > 55:
|
||||||
|
top_val_acc = val_acc
|
||||||
|
top_train_acc = train_acc
|
||||||
|
checkpoint_index = epoch
|
||||||
|
torch.save(spoter_model.state_dict(), f"checkpoints/spoter_{epoch}.pth")
|
||||||
|
else:
|
||||||
|
epochs_without_improvement += 1
|
||||||
|
|
||||||
# save checkpoint
|
# early stopping
|
||||||
if val_acc > top_val_acc and epoch > 55:
|
if epochs_without_improvement >= 40:
|
||||||
top_val_acc = val_acc
|
print("Early stopping due to no improvement in validation accuracy for 40 epochs.")
|
||||||
top_train_acc = train_acc
|
break
|
||||||
checkpoint_index = epoch
|
|
||||||
torch.save(spoter_model.state_dict(), f"checkpoints/spoter_{epoch}.pth")
|
|
||||||
|
|
||||||
lr_progress.append(optimizer.param_groups[0]['lr'])
|
lr_progress.append(optimizer.param_groups[0]['lr'])
|
||||||
|
|
||||||
print(f"Best val acc: {top_val_acc} | Best train acc: {top_train_acc} | Epoch: {checkpoint_index}")
|
print(f"Best val acc: {top_val_acc} | Best train acc: {top_train_acc} | Epoch: {checkpoint_index}")
|
||||||
|
writer.flush()
|
||||||
|
writer.close()
|
||||||
|
|
||||||
|
|
||||||
# Path: src/train.py
|
# Path: src/train.py
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
0
visualizations/__init__.py
Normal file
0
visualizations/__init__.py
Normal file
146
visualizations/analyze_model.ipynb
Normal file
146
visualizations/analyze_model.ipynb
Normal file
File diff suppressed because one or more lines are too long
1781
visualizations/visualize_data.ipynb
Normal file
1781
visualizations/visualize_data.ipynb
Normal file
File diff suppressed because one or more lines are too long
116
visualizations/webcam_view.py
Normal file
116
visualizations/webcam_view.py
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
"""Live webcam demo: MediaPipe Holistic keypoints + SPOTER fingerspelling classifier.

Captures frames from the default camera, extracts and normalizes pose/hand
keypoints, keeps a sliding window of the 8 most recent frames, and overlays
the top-3 predicted letters with confidence scores.  Press ESC to quit.
"""
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import torch

from src.identifiers import LANDMARKS
from src.keypoint_extractor import KeypointExtractor
from src.model import SPOTER
from src.normalizations import normalize_hand_bohaecek, normalize_pose

# Initialize MediaPipe Holistic model
holistic = mp.solutions.holistic.Holistic(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    model_complexity=2
)
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Initialize video capture object (default webcam)
cap = cv2.VideoCapture(0)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# sliding window of the most recent per-frame keypoint vectors
keypoints = []

spoter_model = SPOTER(num_classes=26, hidden_dim=len(LANDMARKS) * 2)
spoter_model.load_state_dict(torch.load('models/spoter_76.pth', map_location=torch.device('cpu')))

# flat x/y column indices of the landmarks the model was trained on
values = []
for i in LANDMARKS.values():
    values.append(i * 2)
    values.append(i * 2 + 1)
values = np.array(values)


def extract_keypoints(landmarks):
    # Flatten MediaPipe landmarks to [x0, y0, x1, y1, ...]; returns None when
    # the body part was not detected.  (Hoisted out of the capture loop — the
    # original redefined this function on every frame.)
    if landmarks:
        return np.array([i for landmark in landmarks.landmark for i in [landmark.x, landmark.y]])


while True:
    # Read frame from camera.  Fix: the original ignored the success flag and
    # would crash in cvtColor when the camera stops delivering frames.
    success, frame = cap.read()
    if not success:
        break

    # Convert the frame to RGB (MediaPipe expects RGB input)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Detect pose and hand landmarks in the frame
    results = holistic.process(frame)

    k1 = extract_keypoints(results.pose_landmarks)
    k2 = extract_keypoints(results.left_hand_landmarks)
    k3 = extract_keypoints(results.right_hand_landmarks)

    # need the pose plus at least one hand to produce a prediction
    if k1 is not None and (k2 is not None or k3 is not None):
        # missing hands become zero placeholders (21 landmarks * 2 coords)
        k2 = k2 if k2 is not None else np.zeros(42)
        k3 = k3 if k3 is not None else np.zeros(42)

        # scale normalized [0, 1] coordinates to pixel space
        k1 = k1 * np.array([frame_width, frame_height] * 33)
        k2 = k2 * np.array([frame_width, frame_height] * 21)
        k3 = k3 * np.array([frame_width, frame_height] * 21)

        k1, bbox_pose = normalize_pose(k1)
        k2, bbox_left = normalize_hand_bohaecek(k2)
        k3, bbox_right = normalize_hand_bohaecek(k3)

        # Draw normalization bounding boxes
        if bbox_pose is not None:
            frame = cv2.rectangle(frame, bbox_pose, (0, 255, 0), 2)
        if bbox_left is not None:
            frame = cv2.rectangle(frame, bbox_left, (0, 255, 0), 2)
        if bbox_right is not None:
            frame = cv2.rectangle(frame, bbox_right, (0, 255, 0), 2)

        # keep only the landmark columns the model was trained on
        k = np.concatenate((k1, k2, k3))
        filtered = k[values]

        # sliding window of the last 8 frames
        while len(keypoints) >= 8:
            keypoints.pop(0)
        keypoints.append(filtered)

        if len(keypoints) == 8:
            # keypoints to tensor
            keypoints_tensor = torch.tensor(keypoints).float()
            outputs = spoter_model(keypoints_tensor).expand(1, -1, -1)
            outputs = torch.nn.functional.softmax(outputs, dim=2)
            topk = torch.topk(outputs, k=3, dim=2)

            # show overlay at top right with confidence scores of top-3 predictions
            for i, (label, score) in enumerate(zip(topk.indices[0][0], topk.values[0][0])):
                # map class index 0-25 to its letter A-Z
                l = label.item()
                if l < 26:
                    l = chr(l + 65)

                cv2.putText(frame, f"{l} {score.item():.2f}", (frame.shape[1] - 200, 50 + i * 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)

    # frame back to BGR for display
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # Show the frame
    cv2.imshow('MediaPipe Hands', frame)

    # Wait for ESC key press to exit
    if cv2.waitKey(5) & 0xFF == 27:
        break

# Release the video capture object and destroy the windows
cap.release()
cv2.destroyAllWindows()
|
||||||
@@ -1,301 +0,0 @@
|
|||||||
{
|
|
||||||
"cells": [
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"from src.keypoint_extractor import KeypointExtractor\n",
|
|
||||||
"\n",
|
|
||||||
"# reload modules\n",
|
|
||||||
"%load_ext autoreload"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"video_name = '69547.mp4' "
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"# extract keypoints\n",
|
|
||||||
"keypoint_extractor = KeypointExtractor('data/videos/')"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"import numpy as np\n",
|
|
||||||
"from IPython.display import HTML\n",
|
|
||||||
"from base64 import b64encode\n",
|
|
||||||
"import mediapy as media\n",
|
|
||||||
"%matplotlib inline\n",
|
|
||||||
"\n",
|
|
||||||
"# Define the frames per second (fps) and duration of the video\n",
|
|
||||||
"fps = 25\n",
|
|
||||||
"duration = 10\n",
|
|
||||||
"\n",
|
|
||||||
"# Create a dummy video of random noise\n",
|
|
||||||
"_, video_frames = keypoint_extractor.extract_keypoints_from_video(video_name, normalize=\"minmax\", draw=True)\n",
|
|
||||||
"\n",
|
|
||||||
"# Convert the video to a numpy array\n",
|
|
||||||
"video = np.array(video_frames)\n",
|
|
||||||
"media.show_video(video, height=400, codec='gif', fps=4)\n"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"from src.model import SPOTER\n",
|
|
||||||
"from src.identifiers import LANDMARKS\n",
|
|
||||||
"import torch\n",
|
|
||||||
"\n",
|
|
||||||
"spoter_model = SPOTER(num_classes=5, hidden_dim=len(LANDMARKS) *2)\n",
|
|
||||||
"spoter_model.load_state_dict(torch.load('models/spoter_40.pth'))"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"# get average number of frames in test set\n",
|
|
||||||
"from src.keypoint_extractor import KeypointExtractor\n",
|
|
||||||
"from src.datasets.finger_spelling_dataset import FingerSpellingDataset\n",
|
|
||||||
"from src.identifiers import LANDMARKS\n",
|
|
||||||
"import numpy as np\n",
|
|
||||||
"\n",
|
|
||||||
"keypoints_extractor = KeypointExtractor(\"data/fingerspelling/data/\")\n",
|
|
||||||
"test_set = FingerSpellingDataset(\"data/fingerspelling/data/\", keypoints_extractor, keypoints_identifier=LANDMARKS, subset=\"val\")\n",
|
|
||||||
"\n",
|
|
||||||
"frames = []\n",
|
|
||||||
"labels = []\n",
|
|
||||||
"for sample, label in test_set:\n",
|
|
||||||
" frames.append(sample.shape[0])\n",
|
|
||||||
" labels.append(label)\n",
|
|
||||||
"\n",
|
|
||||||
"print(np.mean(frames))\n",
|
|
||||||
"# get label frequency in the labels list\n",
|
|
||||||
"from collections import Counter\n",
|
|
||||||
"\n",
|
|
||||||
"counter = Counter(labels)\n",
|
|
||||||
"print(counter)\n"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"# Hand keypoint visualization"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"import matplotlib.pyplot as plt\n",
|
|
||||||
"\n",
|
|
||||||
"def plot_hand_keypoints(dataframe, hand, frame):\n",
|
|
||||||
" hand_columns = np.array([i for i in range(66 + (42 if hand == \"right\" else 0), 108 + (42 if hand == \"right\" else 0))])\n",
|
|
||||||
" \n",
|
|
||||||
" # get the x, y coordinates of the hand keypoints\n",
|
|
||||||
" frame_df = dataframe.iloc[frame:frame+1, hand_columns]\n",
|
|
||||||
" hand_coords = frame_df.values.reshape(21, 2)\n",
|
|
||||||
" \n",
|
|
||||||
" x_coords = hand_coords[:, ::2] #Even indices\n",
|
|
||||||
" y_coords = -hand_coords[:, 1::2] #Uneven indices (negative because pixels start from the top left)\n",
|
|
||||||
" \n",
|
|
||||||
" #Plot the keypoints\n",
|
|
||||||
" plt.scatter(x_coords, y_coords)\n",
|
|
||||||
" return frame_df.style"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"#Set video, hand and frame to display\n",
|
|
||||||
"video_name = '69547.mp4'\n",
|
|
||||||
"hand = \"right\"\n",
|
|
||||||
"frame = 3\n",
|
|
||||||
"%reload_ext autoreload"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"from src.keypoint_extractor import KeypointExtractor\n",
|
|
||||||
"import numpy as np\n",
|
|
||||||
"\n",
|
|
||||||
"#Extract keypoints from requested video\n",
|
|
||||||
"keypoints_extractor = KeypointExtractor(\"data/videos/\")\n",
|
|
||||||
"\n",
|
|
||||||
"#Plot the hand keypoints\n",
|
|
||||||
"df = keypoints_extractor.extract_keypoints_from_video(video_name)\n",
|
|
||||||
"df.head()\n",
|
|
||||||
"plot_hand_keypoints(df, hand, frame)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"#Plot the NORMALIZED hand keypoints (using minxmax)\n",
|
|
||||||
"df = keypoints_extractor.extract_keypoints_from_video(video_name, normalize=\"minmax\")\n",
|
|
||||||
"plt.xlim(-0.5,0.5)\n",
|
|
||||||
"plt.ylim(-0.5,0.5)\n",
|
|
||||||
"plot_hand_keypoints(df, hand, frame)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"#Plot the NORMALIZED hand keypoints (using bohacek)\n",
|
|
||||||
"df = keypoints_extractor.extract_keypoints_from_video(video_name, normalize=\"bohacek\")\n",
|
|
||||||
"plt.xlim(-0.5,0.5)\n",
|
|
||||||
"plt.ylim(-0.5,0.5)\n",
|
|
||||||
"plot_hand_keypoints(df, hand, frame)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"# Pose keypoint visualization"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"import matplotlib.pyplot as plt\n",
|
|
||||||
"\n",
|
|
||||||
"def plot_pose_keypoints(dataframe, frame):\n",
|
|
||||||
" pose_columns = np.array([i for i in range(32)])\n",
|
|
||||||
" \n",
|
|
||||||
" # get the x, y coordinates of the relevant pose keypoints\n",
|
|
||||||
" frame_df = dataframe.iloc[frame:frame+1, pose_columns]\n",
|
|
||||||
" pose_coords = frame_df.values.reshape(16, 2)\n",
|
|
||||||
" \n",
|
|
||||||
" x_coords = pose_coords[:, ::2] #Even indices\n",
|
|
||||||
" y_coords = -pose_coords[:, 1::2] #Uneven indices (negative because pixels start from the top left)\n",
|
|
||||||
" \n",
|
|
||||||
" #Plot the keypoints\n",
|
|
||||||
" plt.scatter(x_coords, y_coords)\n",
|
|
||||||
" return frame_df.style"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"#Set video, hand and frame to display\n",
|
|
||||||
"video_name = '69547.mp4'\n",
|
|
||||||
"frame = 2\n",
|
|
||||||
"%reload_ext autoreload"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"from src.keypoint_extractor import KeypointExtractor\n",
|
|
||||||
"import numpy as np\n",
|
|
||||||
"\n",
|
|
||||||
"#Extract keypoints from requested video\n",
|
|
||||||
"keypoints_extractor = KeypointExtractor(\"data/videos/\")\n",
|
|
||||||
"\n",
|
|
||||||
"#Plot the hand keypoints\n",
|
|
||||||
"df = keypoints_extractor.extract_keypoints_from_video(video_name)\n",
|
|
||||||
"df.head()\n",
|
|
||||||
"plot_pose_keypoints(df, frame)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"norm_df = keypoints_extractor.extract_keypoints_from_video(video_name, normalize=\"bohacek\")\n",
|
|
||||||
"plt.xlim(-0.5,0.5)\n",
|
|
||||||
"plt.ylim(-0.5,0.5)\n",
|
|
||||||
"plot_pose_keypoints(norm_df, frame)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": []
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": []
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"metadata": {
|
|
||||||
"kernelspec": {
|
|
||||||
"display_name": "Python 3 (ipykernel)",
|
|
||||||
"language": "python",
|
|
||||||
"name": "python3"
|
|
||||||
},
|
|
||||||
"language_info": {
|
|
||||||
"codemirror_mode": {
|
|
||||||
"name": "ipython",
|
|
||||||
"version": 3
|
|
||||||
},
|
|
||||||
"file_extension": ".py",
|
|
||||||
"mimetype": "text/x-python",
|
|
||||||
"name": "python",
|
|
||||||
"nbconvert_exporter": "python",
|
|
||||||
"pygments_lexer": "ipython3",
|
|
||||||
"version": "3.9.13"
|
|
||||||
},
|
|
||||||
"vscode": {
|
|
||||||
"interpreter": {
|
|
||||||
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"nbformat": 4,
|
|
||||||
"nbformat_minor": 2
|
|
||||||
}
|
|
||||||
167
webcam_view.py
167
webcam_view.py
@@ -1,167 +0,0 @@
|
|||||||
"""Live fingerspelling recognition from a webcam feed.

Captures frames from the default camera, extracts pose and hand
landmarks with MediaPipe Holistic, normalizes the hand keypoints, and
feeds a sliding window of 8 frames to a SPOTER transformer to predict
the signed letter. The top-3 predictions are overlaid on the video
window. Press ESC to quit.
"""
import cv2
import mediapipe as mp
import numpy as np
import torch

from src.identifiers import LANDMARKS
from src.model import SPOTER

# MediaPipe Holistic model: pose + left/right hand landmarks per frame.
holistic = mp.solutions.holistic.Holistic(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    model_complexity=2
)
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Default webcam.
cap = cv2.VideoCapture(0)

# Sliding window of the most recent per-frame keypoint vectors.
keypoints = []

spoter_model = SPOTER(num_classes=12, hidden_dim=len(LANDMARKS) * 2)
spoter_model.load_state_dict(torch.load('models/spoter_57.pth'))

# Class index -> fingerspelled letter label.
m = {
    0: "A",
    1: "B",
    2: "C",
    3: "D",
    4: "E",
    5: "F",
    6: "G",
    7: "H",
    8: "I",
    9: "J",
    10: "K",
    11: "L",
}


# Hoisted out of the frame loop: defining these per-frame was pure
# loop-invariant overhead.
def extract_keypoints(landmarks):
    """Flatten a MediaPipe landmark list into [x0, y0, x1, y1, ...].

    Returns None when the landmarks were not detected in this frame.
    """
    if landmarks:
        return [i for landmark in landmarks.landmark for i in [landmark.x, landmark.y]]
    return None


def normalize_hand(frame, data, hand, algorithm="minmax"):
    """Normalize one hand's 21 keypoints in *data* and draw its bounding box.

    Parameters
    ----------
    frame : np.ndarray
        RGB image; provides pixel dimensions and receives the drawn box.
    data : np.ndarray, shape (1, 108)
        Full keypoint row (pose + hands, x/y interleaved).
    hand : str
        "left_hand" or "right_hand"; selects which 42 columns to touch.
    algorithm : str
        "minmax" (tight box) or "bohacek" (square box with 10% padding).

    Returns
    -------
    (data, frame) with the hand columns re-centered on the bounding box
    and divided by its size. On an unknown algorithm or a degenerate
    (zero-area) box the inputs are returned unchanged.
    """
    # Left-hand x/y pairs occupy columns 66..107.
    # NOTE(review): with 108 total columns the right-hand offset (108..149)
    # looks out of range for this layout — confirm against the extractor.
    offset = 42 if hand == "right_hand" else 0
    hand_columns = np.array([i for i in range(66 + offset, 108 + offset)])
    hand_data = np.array(data[0])[hand_columns]

    # Convert normalized [0, 1] coordinates to absolute pixels.
    hand_data = hand_data.reshape(21, 2)
    hand_data[:, 0] *= frame.shape[1]
    hand_data[:, 1] *= frame.shape[0]

    min_x, min_y = np.min(hand_data[:, 0]), np.min(hand_data[:, 1])
    max_x, max_y = np.max(hand_data[:, 0]), np.max(hand_data[:, 1])

    width, height = max_x - min_x, max_y - min_y

    if algorithm == "minmax":
        # Tight box around the detected keypoints.
        bbox_height, bbox_width = height, width
        center_x, center_y = (min_x + max_x) / 2, (min_y + max_y) / 2

        starting_x, starting_y = min_x, min_y
        ending_x, ending_y = max_x, max_y

    elif algorithm == "bohacek":
        # Square box padded by 10% of the longer side.
        if width > height:
            delta_x = 0.1 * width
            delta_y = delta_x + ((width - height) / 2)
        else:
            delta_y = 0.1 * height
            delta_x = delta_y + ((height - width) / 2)

        starting_x, starting_y = min_x - delta_x, min_y - delta_y
        ending_x, ending_y = max_x + delta_x, max_y + delta_y

        center_x, center_y = (starting_x + ending_x) / 2, (starting_y + ending_y) / 2
        bbox_height, bbox_width = ending_y - starting_y, ending_x - starting_x

    else:
        print("Not a valid normalization algorithm")
        return data, frame

    if bbox_height == 0 or bbox_width == 0:
        # Degenerate box (e.g. all keypoints collinear); skip normalization.
        return data, frame

    center_coords = np.tile(np.array([center_x, center_y]), (21, 1)).reshape(21, 2)
    bbox_dims = np.tile(np.array([bbox_width, bbox_height]), (21, 1)).reshape(21, 2)

    hand_data = (hand_data - center_coords) / bbox_dims

    # Add bounding box to frame for visual feedback.
    frame = cv2.rectangle(frame, (int(starting_x), int(starting_y)), (int(ending_x), int(ending_y)), (0, 255, 0), 2)

    data[:, hand_columns] = hand_data.reshape(-1, 42)
    return data, frame


while True:
    # Read a frame from the webcam.
    ret, frame = cap.read()
    if not ret:
        break

    # MediaPipe expects RGB.
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Detect pose and hand landmarks in the frame.
    results = holistic.process(frame)

    k1 = extract_keypoints(results.pose_landmarks)
    k2 = extract_keypoints(results.left_hand_landmarks)
    k3 = extract_keypoints(results.right_hand_landmarks)

    # Only run the recognizer when the pose and at least one hand are visible.
    if k1 and (k2 or k3):
        # A missing hand is zero-filled (21 landmarks x 2 coordinates).
        data = np.array([k1 + (k2 or [0] * 42) + (k3 or [0] * 42)])

        norm_alg = "minmax"

        data, frame = normalize_hand(frame, data, "left_hand", norm_alg)
        data, frame = normalize_hand(frame, data, "right_hand", norm_alg)

        # Keep only the x/y columns of the landmarks SPOTER was trained on.
        values = []
        for i in LANDMARKS.values():
            values.append(i * 2)
            values.append(i * 2 + 1)
        filtered = np.array(data[0])[np.array(values)]

        # Maintain a sliding window of the last 8 frames.
        while len(keypoints) >= 8:
            keypoints.pop(0)
        keypoints.append(filtered)

        if len(keypoints) == 8:
            keypoints_tensor = torch.tensor(keypoints).float()

            # Predict; expand adds a leading batch dimension for softmax/topk.
            outputs = spoter_model(keypoints_tensor).expand(1, -1, -1)
            outputs = torch.nn.functional.softmax(outputs, dim=2)

            topk = torch.topk(outputs, k=3, dim=2)

            # Overlay the top-3 letters and confidence scores at the top right.
            for i, (label, score) in enumerate(zip(topk.indices[0][0], topk.values[0][0])):
                cv2.putText(frame, f"{m[label.item()]} {score.item():.2f}", (frame.shape[1] - 200, 50 + i * 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)

    # Back to BGR for display.
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # Show the frame.
    cv2.imshow('MediaPipe Hands', frame)

    # ESC exits.
    if cv2.waitKey(5) & 0xFF == 27:
        break

# Release the camera, the MediaPipe graph, and the windows.
cap.release()
holistic.close()
cv2.destroyAllWindows()
|
|
||||||
Reference in New Issue
Block a user