Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7793122eac | ||
|
|
e13f365d81 | ||
|
|
883ea5d631 | ||
|
|
0b62603920 | ||
|
|
bbc0796504 | ||
| 66c9eccd10 | |||
| ed0385d1c5 | |||
|
|
cf6ddd1214 | ||
|
|
9f7197e4e9 | ||
|
|
e30661b96f | ||
| ba44762eba | |||
| b0335044af | |||
|
|
31d5283d9e | ||
|
|
8f46b2b498 | ||
|
|
c8611182c1 | ||
|
|
983a216f53 | ||
|
|
7653b9b35c | ||
|
|
246595780c | ||
|
|
baeafe8c49 | ||
|
|
febfed7e33 | ||
|
|
5735360eae | ||
|
|
8e5957f4ff | ||
|
|
01c50764b0 | ||
|
|
f95a0a5bbc |
@@ -7,7 +7,7 @@ steps:
|
||||
pull: if-not-exists
|
||||
image: sonarsource/sonar-scanner-cli
|
||||
commands:
|
||||
- sonar-scanner -Dsonar.host.url=$SONAR_HOST -Dsonar.login=$SONAR_TOKEN -Dsonar.projectKey=$SONAR_PROJECT_KEY
|
||||
- sonar-scanner -Dsonar.host.url=$SONAR_HOST -Dsonar.login=$SONAR_TOKEN -Dsonar.projectKey=$SONAR_PROJECT_KEY -Dsonar.qualitygate.wait=true
|
||||
environment:
|
||||
SONAR_HOST:
|
||||
from_secret: sonar_host
|
||||
@@ -19,4 +19,4 @@ steps:
|
||||
trigger:
|
||||
event:
|
||||
- push
|
||||
- pull_request
|
||||
# - pull_request
|
||||
|
||||
9
.gitignore
vendored
9
.gitignore
vendored
@@ -1,5 +1,12 @@
|
||||
.devcontainer/
|
||||
data/
|
||||
.DS_Store
|
||||
|
||||
cache/
|
||||
__pycache__/
|
||||
cache_processed/
|
||||
cache_wlasl/
|
||||
|
||||
__pycache__/
|
||||
|
||||
checkpoints/
|
||||
.ipynb_checkpoints
|
||||
39
.gitlab/merge_request_templates/Default.md
Normal file
39
.gitlab/merge_request_templates/Default.md
Normal file
@@ -0,0 +1,39 @@
|
||||
## Description
|
||||
|
||||
Please provide a brief summary of the changes in this merge request.
|
||||
|
||||
If possible, add a short screengrab or some screenshots of the changes.
|
||||
|
||||
## Testing Instructions
|
||||
|
||||
Please provide instructions on how the code reviewers can test your changes:
|
||||
|
||||
1. [Step 1]
|
||||
2. [Step 2]
|
||||
3. [Step 3]
|
||||
4. ...
|
||||
|
||||
Please include any specific information on test data, configurations, or other requirements that are necessary to properly test the changes.
|
||||
|
||||
Once you've tested the changes, please confirm that they work as expected and that there are no regressions or unexpected side effects. If any issues are discovered during testing, please include detailed steps to reproduce the issue in the merge request comments. Thank you!
|
||||
|
||||
## Related Issues
|
||||
|
||||
Please list any related issues or pull requests that are relevant to this merge request.
|
||||
E.g. WES-XXX-...
|
||||
|
||||
## Known bugs or issues
|
||||
|
||||
Please list any known bugs or issues related to the changes in this merge request.
|
||||
|
||||
## Checklist
|
||||
|
||||
- [ ] I have filled in this template.
|
||||
- [ ] I have tested my changes thoroughly.
|
||||
- [ ] I have updated the user documentation as necessary.
|
||||
- [ ] Code reviewed by 1 person.
|
||||
|
||||
## Additional Notes
|
||||
|
||||
Please add any additional notes or comments that may be helpful for reviewers to understand your changes.
|
||||
|
||||
0
__init__.py
Normal file
0
__init__.py
Normal file
17
export_json.py
Normal file
17
export_json.py
Normal file
@@ -0,0 +1,17 @@
|
||||
import json
|
||||
|
||||
from src.identifiers import HAND_LANDMARKS, POSE_LANDMARKS
|
||||
|
||||
|
||||
def export_json(pose_landmarks, hand_landmarks, filename):
    """Write the landmark indices of two landmark mappings to a JSON file.

    The file contains one object with the keys ``"pose_landmarks"`` and
    ``"hand_landmarks"``; each holds the values of the corresponding mapping
    as a list, in the mapping's iteration order.

    :param pose_landmarks: mapping whose values are the pose landmark indices
    :param hand_landmarks: mapping whose values are the hand landmark indices
    :param filename: path of the JSON file to create (or overwrite)
    """
    payload = {
        "pose_landmarks": list(pose_landmarks.values()),
        "hand_landmarks": list(hand_landmarks.values()),
    }

    # an explicit encoding keeps the output independent of the platform locale
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(payload, f)
|
||||
|
||||
export_json(POSE_LANDMARKS, HAND_LANDMARKS, "landmarks.json")
|
||||
1
landmarks.json
Normal file
1
landmarks.json
Normal file
@@ -0,0 +1 @@
|
||||
{"pose_landmarks": [0, 2, 5, 7, 8, 9, 11, 12, 13, 14, 15, 16], "hand_landmarks": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]}
|
||||
BIN
models/Fingerspelling_AE.onnx
Normal file
BIN
models/Fingerspelling_AE.onnx
Normal file
Binary file not shown.
BIN
models/Fingerspelling_AE.pth
Normal file
BIN
models/Fingerspelling_AE.pth
Normal file
Binary file not shown.
BIN
models/model_A-E.pth
Normal file
BIN
models/model_A-E.pth
Normal file
Binary file not shown.
BIN
models/model_A-L.onnx
Normal file
BIN
models/model_A-L.onnx
Normal file
Binary file not shown.
BIN
models/model_A-L.pth
Normal file
BIN
models/model_A-L.pth
Normal file
Binary file not shown.
BIN
models/model_A-Z.onnx
Normal file
BIN
models/model_A-Z.onnx
Normal file
Binary file not shown.
BIN
models/model_A-Z.pth
Normal file
BIN
models/model_A-Z.pth
Normal file
Binary file not shown.
@@ -1,6 +1,7 @@
|
||||
torch
|
||||
torchvision
|
||||
pandas
|
||||
mediapipe
|
||||
joblib
|
||||
tensorboard
|
||||
torch==1.13.1
|
||||
torchvision==0.14.1
|
||||
pandas==1.5.3
|
||||
mediapipe==0.9.1.0
|
||||
tensorboard==2.12.0
|
||||
mediapy==1.1.6
|
||||
scikit-learn==0.24.2
|
||||
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
127
src/augmentations.py
Normal file
127
src/augmentations.py
Normal file
@@ -0,0 +1,127 @@
|
||||
import math
|
||||
import random
|
||||
|
||||
import numpy as np
|
||||
|
||||
import math
|
||||
import torch
|
||||
|
||||
def circle_intersection(x0, y0, r0, x1, y1, r1):
    """Return the two intersection points of two circles, or None.

    :param x0: x coordinate of the centre of circle 1
    :param y0: y coordinate of the centre of circle 1
    :param r0: radius of circle 1
    :param x1: x coordinate of the centre of circle 2
    :param y1: y coordinate of the centre of circle 2
    :param r1: radius of circle 2
    :return: a tuple of two ``np.array([x, y])`` points, or None when the
        circles are too far apart, one lies inside the other, or they are
        concentric. For tangent circles both points coincide.
    """
    dx, dy = x1 - x0, y1 - y0
    d = math.hypot(dx, dy)

    # non intersecting
    if d > r0 + r1:
        return None
    # one circle within the other
    if d < abs(r0 - r1):
        return None
    # concentric circles: after the check above this can only be the
    # coincident case (r0 == r1); returning here also avoids dividing by d
    if d == 0:
        return None

    # a: distance from centre 1 to the chord through both intersections
    a = (r0**2 - r1**2 + d**2) / (2 * d)
    # h: half the chord length. For (near-)tangent circles rounding can push
    # r0**2 - a**2 marginally below zero, which would make math.sqrt raise.
    h = math.sqrt(max(r0**2 - a**2, 0.0))

    # foot of the chord on the line between the centres
    x2 = x0 + a * dx / d
    y2 = y0 + a * dy / d

    x3 = x2 + h * dy / d
    y3 = y2 - h * dx / d

    x4 = x2 - h * dy / d
    y4 = y2 + h * dx / d

    return (np.array([x3, y3]), np.array([x4, y4]))
|
||||
|
||||
|
||||
class MirrorKeypoints:
    """Augmentation that, with probability one half, replaces the sample by ``1 - sample``."""

    def __call__(self, sample):
        # one random draw per call decides whether the sample is flipped
        flip = random.random() <= 0.5
        # NOTE(review): ``1 - sample`` is applied to every element, so both the
        # x and the y coordinates change — confirm this is the intended mirror.
        return 1 - sample if flip else sample
|
||||
|
||||
class Z_augmentation:
    """Augmentation that moves the wrist of one arm and recomputes the elbow.

    For every frame a new wrist position is drawn near the current elbow. The
    elbow is then placed on the intersection of two circles (around the
    shoulder and around the new wrist) so that the shoulder-elbow and
    elbow-wrist lengths of the frame are kept.
    """

    # upper bound on the number of wrist candidates tried per frame; without a
    # bound the search never ends when no candidate can produce intersecting circles
    MAX_ATTEMPTS = 1000

    def __init__(self, hand_side="left"):
        """
        :param hand_side: ``"left"`` to augment the left arm, anything else for the right arm
        """
        self.hand_side = hand_side

    def new_wrist(self, sample, hand_side="left", new_wrist=None):
        """Move the wrist of one flattened frame and update the elbow to match.

        :param sample: flat array of alternating x, y values; modified in place
        :param hand_side: ``"left"`` or any other value for the right arm
        :param new_wrist: optional ``[x, y]`` wrist position to try first;
            random positions are used when it is None or does not work
        :return: ``(sample, wrist)`` where ``wrist`` is the wrist position that
            was applied, or ``(sample, None)`` with ``sample`` untouched when an
            arm segment has zero length or no valid position was found
        """
        # positions of the arm joints inside the flattened frame
        if hand_side == "left":
            shoulder_idx, elbow_idx, wrist_idx = slice(22, 24), slice(26, 28), slice(30, 32)
        else:
            shoulder_idx, elbow_idx, wrist_idx = slice(24, 26), slice(28, 30), slice(32, 34)

        shoulder = sample[shoulder_idx]
        elbow = sample[elbow_idx]
        wrist = sample[wrist_idx]

        # segment lengths that the augmented arm has to keep
        shoulder_elbow_length = math.hypot(shoulder[0] - elbow[0], shoulder[1] - elbow[1])
        wrist_elbow_length = math.hypot(wrist[0] - elbow[0], wrist[1] - elbow[1])

        if shoulder_elbow_length == 0 or wrist_elbow_length == 0:
            return sample, None

        candidate = new_wrist
        for _ in range(self.MAX_ATTEMPTS):
            if candidate is None:
                # random wrist point that is not too far from the elbow
                candidate = [
                    random.uniform(elbow[0] - 0.3, elbow[0] + 0.3),
                    random.uniform(elbow[1] - 0.3, elbow[1] + 0.3),
                ]

            # possible elbow positions: on the circle around the shoulder and
            # on the circle around the candidate wrist
            intersections = circle_intersection(
                shoulder[0], shoulder[1], shoulder_elbow_length,
                candidate[0], candidate[1], wrist_elbow_length,
            )
            if intersections is not None:
                break
            # this candidate does not work; draw a fresh one on the next attempt
            candidate = None
        else:
            # no usable wrist position found: leave the frame as it is
            return sample, None

        i1, i2 = intersections
        # keep the intersection with the larger y value
        new_elbow = np.array(i1 if i1[1] > i2[1] else i2)
        new_wrist = np.array(candidate)

        # replace the keypoints in the sample
        sample[elbow_idx] = new_elbow
        sample[wrist_idx] = new_wrist
        return sample, new_wrist

    def __call__(self, samples):
        """Augment every frame of ``samples`` independently.

        :param samples: tensor of frames; each frame is flattened before it is augmented
        :return: new tensor holding the augmented frames, each reshaped to ``(-1, 2)``
        """
        # .numpy() shares memory with the tensor; work on a copy so that the
        # caller's tensor is not modified as a side effect
        frames = samples.numpy().copy()

        augmented = []
        for frame in frames:
            new_frame, _ = self.new_wrist(frame.reshape(-1), self.hand_side)
            # reshape back to 2 dimensions
            augmented.append(new_frame.reshape(-1, 2))
        return torch.tensor(np.array(augmented))
|
||||
|
||||
# augmentation to add a little random noise to the keypoints
|
||||
class NoiseAugmentation:
    """Augmentation that adds scaled standard-normal noise to a sample."""

    def __init__(self, noise=0.05):
        """
        :param noise: factor the standard-normal noise is multiplied with
        """
        self.noise = noise

    def __call__(self, sample):
        # one independent noise value per element of the sample
        jitter = torch.randn(sample.shape) * self.noise
        return sample + jitter
|
||||
0
src/datasets/__init__.py
Normal file
0
src/datasets/__init__.py
Normal file
95
src/datasets/finger_spelling_dataset.py
Normal file
95
src/datasets/finger_spelling_dataset.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import os
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from sklearn.model_selection import train_test_split
|
||||
|
||||
from src.identifiers import LANDMARKS
|
||||
from src.keypoint_extractor import KeypointExtractor
|
||||
|
||||
|
||||
class FingerSpellingDataset(torch.utils.data.Dataset):
    """Dataset of finger spelling videos that yields keypoint arrays and labels.

    Video file names are split on ``"!"``: the first field is used as the
    label and the second field (``"train"`` or ``"test"``) selects the subset.
    Processed keypoints are cached as ``.npy`` files in ``cache_processed``.
    """

    def __init__(self, data_folder: str, bad_data_folder: str = "", subset:str="train", keypoints_identifier: dict = None, transform=None):
        """
        :param data_folder: folder containing the ``.mp4`` videos
        :param bad_data_folder: optional second folder with ``.mp4`` videos, skipped when empty
        :param subset: ``"train"`` or ``"test"``
        :param keypoints_identifier: optional mapping whose values name the
            keypoints to keep; the columns ``"<value>_x"`` and ``"<value>_y"`` are selected
        :param transform: optional callable applied to the data tensor of every item
        :raises ValueError: if ``subset`` is neither ``"train"`` nor ``"test"``
        """
        # fail early with a clear message instead of hitting an undefined mask
        if subset not in ("train", "test"):
            raise ValueError(f"subset must be 'train' or 'test', got {subset!r}")

        # list files with path in the data folder ending with .mp4
        files = self._list_videos(data_folder)

        # append files from bad data folder
        if bad_data_folder != "":
            files += self._list_videos(bad_data_folder)

        name_fields = [os.path.basename(f).split("!") for f in files]
        labels = [fields[0] for fields in name_fields]
        train_test = [fields[1] for fields in name_fields]

        # sorted unique labels, plus every label mapped to its index in that array
        self.label_mapping, labels = np.unique(labels, return_inverse=True)

        # TODO: make split for train and val and test when enough data is available
        mask = np.array(train_test) == subset

        # filter data and labels
        self.data = np.array(files)[mask]
        self.labels = np.array(labels)[mask]

        self.transform = transform
        self.subset = subset
        self.keypoint_extractor = KeypointExtractor()

        # always defined, so __getitem__ can test it even without an identifier
        self.keypoints_to_keep = None
        if keypoints_identifier:
            self.keypoints_to_keep = [f"{i}_{j}" for i in keypoints_identifier.values() for j in ["x", "y"]]

    @staticmethod
    def _list_videos(folder):
        """Return the paths of all ``.mp4`` files directly inside ``folder``."""
        # os.path.join works whether or not ``folder`` ends with a separator
        return [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(".mp4")]

    def __len__(self):
        """Return the number of videos in the selected subset."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(data, label)`` for the video at ``index``.

        ``data`` is a tensor built from an array of shape
        ``(rows, columns // 2, 2)`` of the keypoint dataframe, passed through
        ``transform`` when one is set.
        """
        video_name = self.data[index]

        # NOTE(review): the cache key is only the file name up to its first
        # dot, so the cached array does not depend on the selected keypoints.
        cache_name = os.path.basename(video_name).split(".")[0] + ".npy"
        cache_path = os.path.join("cache_processed", cache_name)

        if not os.path.isfile(cache_path):
            # get the keypoints for the video (normalizations: minmax, bohacek)
            keypoints_df = self.keypoint_extractor.extract_keypoints_from_video(video_name, normalize="bohacek")

            # filter the keypoints by the identified subset
            if self.keypoints_to_keep:
                keypoints_df = keypoints_df[self.keypoints_to_keep]

            # consecutive column pairs (x, y) become the last axis
            n_rows, n_cols = keypoints_df.shape
            current_row = keypoints_df.to_numpy(dtype=np.float64).reshape(n_rows, n_cols // 2, 2)

            # exist_ok avoids failing when the folder appears in the meantime
            os.makedirs("cache_processed", exist_ok=True)

            # save the processed data to a file
            np.save(cache_path, current_row)
        else:
            current_row = np.load(cache_path)

        # get the label
        label = self.labels[index]
        # data to tensor
        data = torch.from_numpy(current_row)

        if self.transform:
            data = self.transform(data)

        return data, label
|
||||
@@ -4,8 +4,8 @@ from collections import OrderedDict
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from identifiers import LANDMARKS
|
||||
from keypoint_extractor import KeypointExtractor
|
||||
from src.identifiers import LANDMARKS
|
||||
from src.keypoint_extractor import KeypointExtractor
|
||||
|
||||
|
||||
class WLASLDataset(torch.utils.data.Dataset):
|
||||
44
src/export.py
Normal file
44
src/export.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import torch
|
||||
import torchvision
|
||||
import onnx
|
||||
import numpy as np
|
||||
|
||||
from src.model import SPOTER
|
||||
from src.identifiers import LANDMARKS
|
||||
|
||||
# set parameters of the model
model_name = 'model_A-Z'
num_classes = 26

# input weights and output ONNX file
weights_file = 'models/' + model_name + '.pth'
output_file = 'models/' + model_name + '.onnx'

# load PyTorch model from .pth file
model = SPOTER(num_classes=num_classes, hidden_dim=len(LANDMARKS) * 2)
# the model is built on the CPU, so the weights are mapped to the CPU whether or not CUDA is available
state_dict = torch.load(weights_file, map_location=torch.device('cpu'))
model.load_state_dict(state_dict)

# set model to evaluation mode
model.eval()

# create dummy input tensor
# NOTE(review): 108 presumably equals len(LANDMARKS) * 2 (the hidden_dim above) — confirm
dummy_input = torch.randn(10, 108)

# export model to ONNX format (one export only: an earlier export to the same
# file was overwritten by this one)
torch.onnx.export(model,                      # model being run
                  dummy_input,                # model input (or a tuple for multiple inputs)
                  output_file,                # where to save the model (can be a file or file-like object)
                  export_params=True,         # store the trained parameter weights inside the model file
                  opset_version=9,            # the ONNX version to export the model to
                  do_constant_folding=True,   # whether to execute constant folding for optimization
                  input_names=['X'],          # the model's input names
                  output_names=['Y']          # the model's output names
                  )

# load exported ONNX model for verification
onnx_model = onnx.load(output_file)
onnx.checker.check_model(onnx_model)
|
||||
@@ -80,3 +80,65 @@ LANDMARKS = {
|
||||
"right_pinky_dip": 73,
|
||||
"right_pinky_tip": 74,
|
||||
}
|
||||
|
||||
# Pose landmark name -> pose landmark index for the subset that is used.
# Commented-out entries are pose landmarks that are currently excluded.
POSE_LANDMARKS = dict(
    nose=0,
    # left_eye_inner=1,
    left_eye=2,
    # left_eye_outer=3,
    # right_eye_inner=4,
    right_eye=5,
    # right_eye_outer=6,
    left_ear=7,
    right_ear=8,
    mouth_left=9,
    # mouth_right=10,
    left_shoulder=11,
    right_shoulder=12,
    left_elbow=13,
    right_elbow=14,
    left_wrist=15,
    right_wrist=16,
    # left_pinky=17,
    # right_pinky=18,
    # left_index=19,
    # right_index=20,
    # left_thumb=21,
    # right_thumb=22,
    # left_hip=23,
    # right_hip=24,
    # left_knee=25,
    # right_knee=26,
    # left_ankle=27,
    # right_ankle=28,
    # left_heel=29,
    # right_heel=30,
    # left_foot_index=31,
    # right_foot_index=32,
)
|
||||
|
||||
# Hand landmark name -> index within a single hand (0-20).
# The keys carry no left/right prefix.
HAND_LANDMARKS = {
    name: index
    for index, name in enumerate((
        "wrist",
        "thumb_cmc",
        "thumb_mcp",
        "thumb_ip",
        "thumb_tip",
        "index_finger_mcp",
        "index_finger_pip",
        "index_finger_dip",
        "index_finger_tip",
        "middle_finger_mcp",
        "middle_finger_pip",
        "middle_finger_dip",
        "middle_finger_tip",
        "ring_finger_mcp",
        "ring_finger_pip",
        "ring_finger_dip",
        "ring_finger_tip",
        "pinky_mcp",
        "pinky_pip",
        "pinky_dip",
        "pinky_tip",
    ))
}
|
||||
@@ -1,17 +1,19 @@
|
||||
import mediapipe as mp
|
||||
import cv2
|
||||
import time
|
||||
from typing import Dict, List, Tuple
|
||||
import numpy as np
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import cv2
|
||||
import mediapipe as mp
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class KeypointExtractor:
|
||||
def __init__(self, video_folder: str, cache_folder: str = "cache"):
|
||||
def __init__(self, cache_folder: str = "cache"):
|
||||
self.mp_drawing = mp.solutions.drawing_utils
|
||||
self.mp_holistic = mp.solutions.holistic
|
||||
self.video_folder = video_folder
|
||||
# self.video_folder = video_folder
|
||||
self.cache_folder = cache_folder
|
||||
|
||||
# we will store the keypoints of each frame as a row in the dataframe. The columns are the keypoints: Pose (33), Left Hand (21), Right Hand (21). Each keypoint has 2 values: x, y
|
||||
@@ -25,39 +27,68 @@ class KeypointExtractor:
|
||||
|
||||
def extract_keypoints_from_video(self,
|
||||
video: str,
|
||||
normalize: str = None,
|
||||
draw: bool = False,
|
||||
) -> pd.DataFrame:
|
||||
"""extract_keypoints_from_video this function extracts keypoints from a video and stores them in a dataframe
|
||||
|
||||
:param video: the video to extract keypoints from
|
||||
:type video: str
|
||||
:return: dataframe with keypoints
|
||||
:param normalize: the hand normalization algorithm to use, defaults to None
|
||||
:type normalize: str, optional
|
||||
:return: dataframe with keypoints in absolute pixels
|
||||
:rtype: pd.DataFrame
|
||||
"""
|
||||
# check if video exists
|
||||
if not os.path.exists(self.video_folder + video):
|
||||
logging.error("Video does not exist at path: " + self.video_folder + video)
|
||||
return None
|
||||
|
||||
# check if cache exists
|
||||
if not os.path.exists(self.cache_folder):
|
||||
os.makedirs(self.cache_folder)
|
||||
video_name = video.split("/")[-1].split(".")[0]
|
||||
|
||||
# check if cache file exists and return
|
||||
if os.path.exists(self.cache_folder + "/" + video + ".npy"):
|
||||
# create dataframe from cache
|
||||
return pd.DataFrame(np.load(self.cache_folder + "/" + video + ".npy", allow_pickle=True), columns=self.columns)
|
||||
if not draw:
|
||||
# check if video exists
|
||||
if not os.path.exists(video):
|
||||
logging.error("Video does not exist at path: " + video)
|
||||
return None
|
||||
|
||||
# check if cache exists
|
||||
if not os.path.exists(self.cache_folder):
|
||||
os.makedirs(self.cache_folder)
|
||||
|
||||
# check if cache file exists and return
|
||||
if os.path.exists(self.cache_folder + "/" + video_name + ".npy"):
|
||||
# create dataframe from cache
|
||||
df = pd.DataFrame(np.load(self.cache_folder + "/" + video_name + ".npy", allow_pickle=True), columns=self.columns)
|
||||
if normalize:
|
||||
df = self.normalize_hands(df, norm_algorithm=normalize)
|
||||
df, _ = self.normalize_pose_bohacek(df)
|
||||
return df
|
||||
|
||||
# open video
|
||||
cap = cv2.VideoCapture(self.video_folder + video)
|
||||
cap = cv2.VideoCapture(video)
|
||||
|
||||
keypoints_df = pd.DataFrame(columns=self.columns)
|
||||
|
||||
# skip frames so that roughly 10 frames per second of video are processed
|
||||
frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
|
||||
frame_skip = (frame_rate // 10) -1
|
||||
|
||||
output_frames = []
|
||||
|
||||
while cap.isOpened():
|
||||
|
||||
# skip frames
|
||||
for _ in range(frame_skip):
|
||||
success, image = cap.read()
|
||||
if not success:
|
||||
break
|
||||
|
||||
success, image = cap.read()
|
||||
if not success:
|
||||
break
|
||||
# extract keypoints of frame
|
||||
results = self.extract_keypoints_from_frame(image)
|
||||
if draw:
|
||||
results, draw_image = self.extract_keypoints_from_frame(image, draw=True)
|
||||
output_frames.append(draw_image)
|
||||
else:
|
||||
results = self.extract_keypoints_from_frame(image)
|
||||
|
||||
def extract_keypoints(landmarks):
|
||||
if landmarks:
|
||||
@@ -67,15 +98,33 @@ class KeypointExtractor:
|
||||
k1 = extract_keypoints(results.pose_landmarks)
|
||||
k2 = extract_keypoints(results.left_hand_landmarks)
|
||||
k3 = extract_keypoints(results.right_hand_landmarks)
|
||||
if k1 and k2 and k3:
|
||||
keypoints_df = pd.concat([keypoints_df, pd.DataFrame([k1+k2+k3], columns=self.columns)])
|
||||
if k1 and (k2 or k3):
|
||||
data = [k1 + (k2 or [0] * 42) + (k3 or [0] * 42)]
|
||||
new_df = pd.DataFrame(data, columns=self.columns)
|
||||
keypoints_df = pd.concat([keypoints_df, new_df], ignore_index=True)
|
||||
|
||||
# get frame width and height
|
||||
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
|
||||
# convert to pixels
|
||||
keypoints_df.iloc[:, ::2] *= frame_width
|
||||
keypoints_df.iloc[:, 1::2] *= frame_height
|
||||
|
||||
# close video
|
||||
cap.release()
|
||||
|
||||
# save keypoints to cache
|
||||
np.save(self.cache_folder + "/" + video + ".npy", keypoints_df.to_numpy())
|
||||
np.save(self.cache_folder + "/" + video_name + ".npy", keypoints_df.to_numpy())
|
||||
|
||||
# normalize hands and pose keypoints
|
||||
if normalize:
|
||||
keypoints_df = self.normalize_hands(keypoints_df, norm_algorithm=normalize)
|
||||
keypoints_df, _ = self.normalize_pose_bohacek(keypoints_df)
|
||||
|
||||
if draw:
|
||||
return keypoints_df, output_frames
|
||||
|
||||
return keypoints_df
|
||||
|
||||
|
||||
@@ -95,11 +144,223 @@ class KeypointExtractor:
|
||||
if draw:
|
||||
# Draw the pose annotations on the image
|
||||
draw_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
||||
self.mp_drawing.draw_landmarks(draw_image, results.face_landmarks, self.mp_holistic.FACEMESH_CONTOURS)
|
||||
# self.mp_drawing.draw_landmarks(draw_image, results.face_landmarks, self.mp_holistic.FACEMESH_CONTOURS)
|
||||
self.mp_drawing.draw_landmarks(draw_image, results.left_hand_landmarks, self.mp_holistic.HAND_CONNECTIONS)
|
||||
self.mp_drawing.draw_landmarks(draw_image, results.right_hand_landmarks, self.mp_holistic.HAND_CONNECTIONS)
|
||||
|
||||
img_width, img_height = image.shape[1], image.shape[0]
|
||||
|
||||
# create bounding box around hands
|
||||
if results.left_hand_landmarks:
|
||||
x = [landmark.x for landmark in results.left_hand_landmarks.landmark]
|
||||
y = [landmark.y for landmark in results.left_hand_landmarks.landmark]
|
||||
draw_image = cv2.rectangle(draw_image, (int(min(x) * img_width), int(min(y) * img_height)), (int(max(x) * img_width), int(max(y) * img_height)), (0, 255, 0), 2)
|
||||
|
||||
if results.right_hand_landmarks:
|
||||
x = [landmark.x for landmark in results.right_hand_landmarks.landmark]
|
||||
y = [landmark.y for landmark in results.right_hand_landmarks.landmark]
|
||||
draw_image = cv2.rectangle(draw_image, (int(min(x) * img_width), int(min(y) * img_height)), (int(max(x) * img_width), int(max(y) * img_height)), (255, 0, 0), 2)
|
||||
|
||||
self.mp_drawing.draw_landmarks(draw_image, results.pose_landmarks, self.mp_holistic.POSE_CONNECTIONS)
|
||||
|
||||
return results, draw_image
|
||||
|
||||
return results
|
||||
return results
|
||||
|
||||
|
||||
def normalize_hands(self, dataframe: pd.DataFrame, norm_algorithm: str="minmax") -> pd.DataFrame:
|
||||
"""normalize_hands this function normalizes the hand keypoints of a dataframe
|
||||
|
||||
:param dataframe: the dataframe to normalize
|
||||
:type dataframe: pd.DataFrame
|
||||
:param norm_algorithm: the normalization algorithm to use, pick from "minmax" and "bohacek"
|
||||
:type norm_algorithm: str
|
||||
:return: the normalized dataframe
|
||||
:rtype: pd.DataFrame
|
||||
"""
|
||||
|
||||
if norm_algorithm == "minmax":
|
||||
# normalize left hand
|
||||
dataframe, _= self.normalize_hand_minmax(dataframe, "left_hand")
|
||||
# normalize right hand
|
||||
dataframe, _= self.normalize_hand_minmax(dataframe, "right_hand")
|
||||
elif norm_algorithm == "bohacek":
|
||||
# normalize left hand
|
||||
dataframe, _= self.normalize_hand_bohacek(dataframe, "left_hand")
|
||||
# normalize right hand
|
||||
dataframe, _= self.normalize_hand_bohacek(dataframe, "right_hand")
|
||||
else:
|
||||
return dataframe
|
||||
|
||||
return dataframe
|
||||
|
||||
def normalize_hand_minmax(self, dataframe: pd.DataFrame, hand: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """normalize_hand_minmax centers and scales the keypoints of one hand with the per-row min/max bounding box

    :param dataframe: the dataframe to normalize (the hand columns are overwritten in place)
    :type dataframe: pd.DataFrame
    :param hand: the hand to normalize, "right_hand" selects the right hand, any other value the left hand
    :type hand: str
    :return: the normalized dataframe and the bounding boxes dataframe, or the unchanged dataframe and None when any row has a bounding box of zero width or height
    :rtype: Tuple[pd.DataFrame, pd.DataFrame]
    """
    # left hand occupies columns 66 - 107, right hand columns 108 - 149
    first_column = 108 if hand == "right_hand" else 66
    hand_columns = np.arange(first_column, first_column + 42)

    # one (21, 2) block of x, y coordinates per row
    hand_coords = dataframe.iloc[:, hand_columns].values.reshape(-1, 21, 2)
    xs, ys = hand_coords[:, :, 0], hand_coords[:, :, 1]

    # corners of the bounding box around the hand keypoints
    min_x, min_y = np.min(xs, axis=1), np.min(ys, axis=1)
    max_x, max_y = np.max(xs, axis=1), np.max(ys, axis=1)

    # per-row center and size of the bounding box, as (rows, 2) arrays
    centers = np.stack(((min_x + max_x) / 2, (min_y + max_y) / 2), axis=1)
    extents = np.stack((max_x - min_x, max_y - min_y), axis=1)

    # NOTE(review): a single row with a zero-sized box skips the
    # normalization of every row — confirm this is intended
    if np.any(extents == 0):
        return dataframe, None

    # broadcasting applies each row's center and size to its 21 keypoints
    norm_hand_coords = (hand_coords - centers[:, np.newaxis, :]) / extents[:, np.newaxis, :]

    # write the flattened normalized keypoints back into the hand columns
    dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 42)

    # starting and ending points of the bounding boxes, one row per input row
    bbox = pd.DataFrame(
        np.column_stack((min_x, min_y, max_x, max_y)),
        columns=['starting_x', 'starting_y', 'ending_x', 'ending_y'],
    )

    return dataframe, bbox
|
||||
|
||||
def normalize_hand_bohacek(self, dataframe: pd.DataFrame, hand: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """normalize_hand_bohacek centers and scales the keypoints of one hand with a padded bounding box (bohacek normalization)

    :param dataframe: the dataframe to normalize (the hand columns are overwritten in place)
    :type dataframe: pd.DataFrame
    :param hand: the hand to normalize, "right_hand" selects the right hand, any other value the left hand
    :type hand: str
    :return: the normalized dataframe and the bounding boxes dataframe, or the unchanged dataframe and None when any row has a bounding box of zero width or height
    :rtype: Tuple[pd.DataFrame, pd.DataFrame]
    """
    # left hand occupies columns 66 - 107, right hand columns 108 - 149
    first_column = 108 if hand == "right_hand" else 66
    hand_columns = np.arange(first_column, first_column + 42)

    # one (21, 2) block of x, y coordinates per row
    hand_coords = dataframe.iloc[:, hand_columns].values.reshape(-1, 21, 2)
    xs, ys = hand_coords[:, :, 0], hand_coords[:, :, 1]

    min_x, min_y = np.min(xs, axis=1), np.min(ys, axis=1)
    max_x, max_y = np.max(xs, axis=1), np.max(ys, axis=1)

    # extent of the hand keypoints (NOT the normalization bounding box)
    width, height = max_x - min_x, max_y - min_y

    # the longer side gets 10% padding; the shorter side gets that padding
    # plus half the difference between the two sides
    wider = width > height
    delta_x = np.where(wider, 0.1 * width, 0.1 * height + ((height - width) / 2)).astype('float64')
    delta_y = np.where(wider, 0.1 * width + ((width - height) / 2), 0.1 * height).astype('float64')

    # starting and ending point of the normalization bounding box
    starting_x, starting_y = min_x - delta_x, min_y - delta_y
    ending_x, ending_y = max_x + delta_x, max_y + delta_y

    # per-row center and size of the bounding box, as (rows, 2) arrays
    centers = np.stack(((starting_x + ending_x) / 2, (starting_y + ending_y) / 2), axis=1)
    extents = np.stack((ending_x - starting_x, ending_y - starting_y), axis=1)

    # NOTE(review): a single row with a zero-sized box skips the
    # normalization of every row — confirm this is intended
    if np.any(extents == 0):
        return dataframe, None

    # broadcasting applies each row's center and size to its 21 keypoints
    norm_hand_coords = (hand_coords - centers[:, np.newaxis, :]) / extents[:, np.newaxis, :]

    # write the flattened normalized keypoints back into the hand columns
    dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 42)

    # starting and ending points of the bounding boxes, one row per input row
    bbox = pd.DataFrame(
        np.column_stack((starting_x, starting_y, ending_x, ending_y)),
        columns=['starting_x', 'starting_y', 'ending_x', 'ending_y'],
    )

    return dataframe, bbox
|
||||
|
||||
def normalize_pose_bohacek(self, dataframe: pd.DataFrame, bbox_size: float = 4) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """normalize_pose_bohacek this function normalizes the pose keypoints of a dataframe using the Bohacek-normalization algorithm

    Only frames in which both shoulders are visible (non-zero x coordinate) are normalized;
    the pose keypoints of all other frames are left untouched. The dataframe is modified in place.

    :param dataframe: the dataframe to normalize, its first 66 columns are read as 33 (x, y) pose keypoints
    :type dataframe: pd.DataFrame
    :param bbox_size: the width and height of the normalization bounding box expressed in head metrics, defaults to 4
    :type bbox_size: float, optional
    :return: the normalized dataframe and the bounding boxes dataframe (one row per normalized frame);
        the bounding boxes are None when any bounding box has a zero width or height
    :rtype: Tuple[pd.DataFrame, pd.DataFrame]
    """
    bbox_columns = ['starting_x', 'starting_y', 'ending_x', 'ending_y']

    # get the columns that belong to the pose
    pose_columns = np.arange(66)

    # get the x, y coordinates of the pose keypoints
    pose_coords = dataframe.iloc[:, pose_columns].values.reshape(-1, 33, 2)

    # check in what frames shoulders are visible
    left_shoulder_present_mask = pose_coords[:, 11, 0] != 0
    right_shoulder_present_mask = pose_coords[:, 12, 0] != 0
    shoulders_present_mask = np.logical_and(left_shoulder_present_mask, right_shoulder_present_mask)

    # positional row indices of the frames that can be normalized
    visible_rows = np.flatnonzero(shoulders_present_mask)
    if visible_rows.size == 0:
        # nothing to normalize: return the untouched dataframe and an empty bounding box table
        return dataframe, pd.DataFrame(columns=bbox_columns)

    # every following computation works on the visible frames only, so that the
    # bounding boxes and the coordinates they normalize always have matching shapes
    visible_coords = pose_coords[visible_rows]

    # calculate shoulder distance
    left_shoulder, right_shoulder = visible_coords[:, 11], visible_coords[:, 12]
    shoulder_distance = ((left_shoulder[:, 0] - right_shoulder[:, 0])**2 + (left_shoulder[:, 1] - right_shoulder[:, 1])**2)**0.5
    head_metric = shoulder_distance

    # center of shoulders and left eye are necessary to construct bounding box
    center_shoulders = right_shoulder + (left_shoulder - right_shoulder) / 2
    left_eye = visible_coords[:, 2]

    # set the starting and ending point of the normalization bounding box
    starting_x, starting_y = center_shoulders[:, 0] - (bbox_size / 2) * head_metric, left_eye[:, 1] - 0.5 * head_metric
    ending_x, ending_y = center_shoulders[:, 0] + (bbox_size / 2) * head_metric, starting_y + (bbox_size - 0.5) * head_metric

    # calculate the center of the bounding box and the bounding box dimensions
    bbox_center_x, bbox_center_y = (starting_x + ending_x) / 2, (starting_y + ending_y) / 2
    bbox_width, bbox_height = ending_x - starting_x, ending_y - starting_y

    # a degenerate bounding box would cause a division by zero
    if np.any(bbox_width == 0) or np.any(bbox_height == 0):
        return dataframe, None

    # shape (frames, 1, 2) so the values broadcast over the 33 keypoints of each frame
    center_coords = np.stack((bbox_center_x, bbox_center_y), axis=1)[:, np.newaxis, :]
    bbox_dims = np.stack((bbox_width, bbox_height), axis=1)[:, np.newaxis, :]

    # normalize the pose keypoints based on the bounding box
    norm_pose_coords = (visible_coords - center_coords) / bbox_dims

    # flatten the normalized pose keypoints array and replace the original pose keypoints with the normalized pose keypoints in the dataframe
    dataframe.iloc[visible_rows, pose_columns] = norm_pose_coords.reshape(-1, 66)

    # merge starting and ending points of the bounding boxes in a dataframe
    bbox_array = np.column_stack((starting_x, starting_y, ending_x, ending_y))
    bbox = pd.DataFrame(bbox_array, columns=bbox_columns)

    return dataframe, bbox
|
||||
|
||||
21
src/loss_function.py
Normal file
21
src/loss_function.py
Normal file
@@ -0,0 +1,21 @@
|
||||
# create custom loss function
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
from src.datasets.finger_spelling_dataset import FingerSpellingDataset
|
||||
|
||||
from src.keypoint_extractor import KeypointExtractor
|
||||
from torch.utils.data import DataLoader
|
||||
from src.identifiers import LANDMARKS
|
||||
|
||||
class CustomLoss(nn.Module):
    """Penalty loss that discourages over-confident predictions for one class.

    The softmax probability of ``class_index`` is read from the first element
    of ``pred`` (``pred[0][0]``). While that probability stays at or below
    ``threshold`` the loss is 0; above it the loss grows linearly with the
    excess probability, scaled by ``penalty``.

    The loss is computed from ``pred`` itself, so it stays attached to the
    autograd graph and ``backward()`` propagates gradients to the model.
    (Returning a freshly created constant tensor would yield no gradient.)

    :param class_index: index of the penalised class, defaults to 25
    :param threshold: probability above which the penalty applies, defaults to 0.4
    :param penalty: scale factor of the penalty, defaults to 100.0
    """

    def __init__(self, class_index=25, threshold=0.4, penalty=100.0):
        super().__init__()
        self.class_index = class_index
        self.threshold = threshold
        self.penalty = penalty

    def forward(self, pred, target):
        """Return the scalar penalty for ``pred``.

        :param pred: raw scores with the classes along dimension 2
        :param target: unused, accepted so the module can be called like other criteria
        :return: zero-dimensional tensor, differentiable with respect to ``pred``
        """
        probability = torch.nn.functional.softmax(pred, dim=2)[0][0][self.class_index]

        # hinge: zero up to the threshold, linear (and therefore learnable) above it
        return self.penalty * torch.nn.functional.relu(probability - self.threshold)
|
||||
27
src/model.py
27
src/model.py
@@ -1,6 +1,7 @@
|
||||
### SPOTER model implementation from the paper "SPOTER: Sign Pose-based Transformer for Sign Language Recognition from Sequence of Skeletal Data"
|
||||
|
||||
import copy
|
||||
import math
|
||||
from typing import Optional
|
||||
|
||||
import torch
|
||||
@@ -38,7 +39,20 @@ class SPOTERTransformerDecoderLayer(nn.TransformerDecoderLayer):
|
||||
|
||||
return tgt
|
||||
|
||||
class PositionalEmbedding(nn.Module):
    """Fixed sinusoidal positional encoding added to a sequence-first input.

    The encoding table is stored as the non-trainable buffer ``pe`` with shape
    ``(max_len, 1, d_model)``.

    :param d_model: size of the feature dimension (even or odd)
    :param max_len: number of positions the table is built for, defaults to 60
    """

    def __init__(self, d_model, max_len=60):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # for an odd d_model there is one cosine column fewer than sine columns,
        # so the surplus frequency is dropped to keep the shapes aligned
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])

        # insert a singleton batch dimension: (max_len, d_model) -> (max_len, 1, d_model)
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Add the encoding of the first ``x.size(0)`` positions to ``x``.

        :param x: tensor whose first dimension is the sequence position and last dimension is ``d_model``
        :return: ``x`` plus the positional encoding, same shape as ``x``
        """
        return x + self.pe[:x.size(0), :]
|
||||
|
||||
class SPOTER(nn.Module):
|
||||
"""
|
||||
Implementation of the SPOTER (Sign POse-based TransformER) architecture for sign language recognition from sequence
|
||||
@@ -48,8 +62,9 @@ class SPOTER(nn.Module):
|
||||
def __init__(self, num_classes, hidden_dim=55):
|
||||
super().__init__()
|
||||
|
||||
self.row_embed = nn.Parameter(torch.rand(50, hidden_dim))
|
||||
self.pos = nn.Parameter(torch.cat([self.row_embed[0].unsqueeze(0).repeat(1, 1, 1)], dim=-1).flatten(0, 1).unsqueeze(0))
|
||||
|
||||
self.pos = PositionalEmbedding(hidden_dim)
|
||||
|
||||
self.class_query = nn.Parameter(torch.rand(1, hidden_dim))
|
||||
self.transformer = nn.Transformer(hidden_dim, 9, 6, 6)
|
||||
self.linear_class = nn.Linear(hidden_dim, num_classes)
|
||||
@@ -61,7 +76,13 @@ class SPOTER(nn.Module):
|
||||
|
||||
def forward(self, inputs):
|
||||
h = torch.unsqueeze(inputs.flatten(start_dim=1), 1).float()
|
||||
h = self.transformer(self.pos + h, self.class_query.unsqueeze(0)).transpose(0, 1)
|
||||
# add positional encoding
|
||||
h = self.pos(h)
|
||||
|
||||
# add class query
|
||||
h = self.transformer(h, self.class_query.unsqueeze(0)).transpose(0, 1)
|
||||
|
||||
# get class prediction
|
||||
res = self.linear_class(h)
|
||||
|
||||
return res
|
||||
64
src/normalizations.py
Normal file
64
src/normalizations.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
def normalize_hand_bohaecek(keypoints):
    """Normalize the keypoints of one hand relative to a square bounding box.

    ``keypoints`` is a flat array of interleaved coordinates: even indices are
    x values, odd indices are y values. The tight box around the hand is padded
    by 10% of its longer side, and the shorter side is padded further so the
    resulting box is square.

    :param keypoints: flat numpy array ``[x0, y0, x1, y1, ...]``
    :return: ``(normalized_keypoints, (x, y, width, height))`` with the box
        truncated to integers, or ``(keypoints, None)`` when the box has a
        zero width or height
    """
    xs, ys = keypoints[::2], keypoints[1::2]
    min_x, max_x = np.min(xs), np.max(xs)
    min_y, max_y = np.min(ys), np.max(ys)

    width = max_x - min_x
    height = max_y - min_y

    # pad the longer side by 10%, then grow the shorter side to the same size
    if width > height:
        delta_x = 0.1 * width
        delta_y = delta_x + ((width - height) / 2)
    else:
        delta_y = 0.1 * height
        delta_x = delta_y + ((height - width) / 2)

    starting_x, ending_x = min_x - delta_x, max_x + delta_x
    starting_y, ending_y = min_y - delta_y, max_y + delta_y

    bbox_center_x = (starting_x + ending_x) / 2
    bbox_center_y = (starting_y + ending_y) / 2
    bbox_width = ending_x - starting_x
    bbox_height = ending_y - starting_y

    # a degenerate box cannot be used as a divisor
    if bbox_width == 0 or bbox_height == 0:
        return keypoints, None

    # x values (even indices) are centred and scaled by the box width,
    # y values (odd indices) by the box height
    normalized_keypoints = np.zeros(keypoints.shape)
    normalized_keypoints[::2] = (xs - bbox_center_x) / bbox_width
    normalized_keypoints[1::2] = (ys - bbox_center_y) / bbox_height

    bbox = (int(starting_x), int(starting_y), int(bbox_width), int(bbox_height))
    return normalized_keypoints, bbox
|
||||
|
||||
|
||||
def normalize_pose(keypoints, bbox_size: float = 4.0):
    """Normalize pose keypoints relative to a shoulder-based bounding box.

    ``keypoints`` is a flat array of interleaved coordinates: even indices are
    x values, odd indices are y values. The shoulder distance is the unit of
    measure. The box is ``bbox_size`` units wide and centred horizontally on
    the shoulders. It starts half a unit above the value at index 5 (the
    left-eye y coordinate) and extends ``bbox_size - 0.5`` units downwards.

    :param keypoints: flat numpy array ``[x0, y0, x1, y1, ...]``
    :param bbox_size: box size expressed in shoulder distances, defaults to 4.0
    :return: ``(normalized_keypoints, (x, y, width, height))`` with the box
        truncated to integers, or ``(keypoints, None)`` when the box has a
        zero width or height
    """
    left_shoulder, right_shoulder = keypoints[22:24], keypoints[24:26]
    left_eye = keypoints[4:6]

    # the shoulder distance acts as the unit of measure for the box
    unit = np.linalg.norm(left_shoulder - right_shoulder)
    shoulder_mid = (left_shoulder + right_shoulder) / 2

    half_width = (bbox_size / 2) * unit
    starting_x = shoulder_mid[0] - half_width
    ending_x = shoulder_mid[0] + half_width
    starting_y = left_eye[1] - 0.5 * unit
    ending_y = starting_y + (bbox_size - 0.5) * unit

    bbox_center_x = (starting_x + ending_x) / 2
    bbox_center_y = (starting_y + ending_y) / 2
    bbox_width = ending_x - starting_x
    bbox_height = ending_y - starting_y

    # a degenerate box (e.g. coinciding shoulders) cannot be used as a divisor
    if bbox_width == 0 or bbox_height == 0:
        return keypoints, None

    # x values (even indices) are centred and scaled by the box width,
    # y values (odd indices) by the box height
    normalized_keypoints = np.zeros(keypoints.shape)
    normalized_keypoints[::2] = (keypoints[::2] - bbox_center_x) / bbox_width
    normalized_keypoints[1::2] = (keypoints[1::2] - bbox_center_y) / bbox_height

    bbox = (int(starting_x), int(starting_y), int(bbox_width), int(bbox_height))
    return normalized_keypoints, bbox
|
||||
119
src/train.py
119
src/train.py
@@ -1,11 +1,6 @@
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
from pathlib import Path
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.ticker as ticker
|
||||
import numpy as np
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
@@ -13,13 +8,17 @@ import torch.optim as optim
|
||||
from torch.utils.data import DataLoader
|
||||
from torchvision import transforms
|
||||
|
||||
from dataset import WLASLDataset
|
||||
from identifiers import LANDMARKS
|
||||
from keypoint_extractor import KeypointExtractor
|
||||
from model import SPOTER
|
||||
from src.augmentations import MirrorKeypoints, Z_augmentation, NoiseAugmentation
|
||||
from src.datasets.finger_spelling_dataset import FingerSpellingDataset
|
||||
from src.identifiers import LANDMARKS
|
||||
from src.model import SPOTER
|
||||
from src.loss_function import CustomLoss
|
||||
|
||||
import torch
|
||||
from torch.utils.tensorboard import SummaryWriter
|
||||
|
||||
def train():
|
||||
writer = SummaryWriter()
|
||||
random.seed(379)
|
||||
np.random.seed(379)
|
||||
os.environ['PYTHONHASHSEED'] = str(379)
|
||||
@@ -30,50 +29,57 @@ def train():
|
||||
g = torch.Generator()
|
||||
g.manual_seed(379)
|
||||
|
||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
device = torch.device("cuda:0")
|
||||
|
||||
spoter_model = SPOTER(num_classes=100, hidden_dim=len(LANDMARKS) *2)
|
||||
spoter_model = SPOTER(num_classes=26, hidden_dim=len(LANDMARKS) *2)
|
||||
spoter_model.train(True)
|
||||
spoter_model.to(device)
|
||||
|
||||
|
||||
criterion = nn.CrossEntropyLoss()
|
||||
optimizer = optim.SGD(spoter_model.parameters(), lr=0.001, momentum=0.9)
|
||||
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=5)
|
||||
criterion_bad = CustomLoss()
|
||||
optimizer = optim.Adam(spoter_model.parameters(), lr=0.00001)
|
||||
scheduler = None
|
||||
|
||||
# check if checkpoints folder exists
|
||||
if not os.path.exists("checkpoints"):
|
||||
os.makedirs("checkpoints")
|
||||
|
||||
# TODO: create paths for checkpoints
|
||||
transform = transforms.Compose([MirrorKeypoints(), NoiseAugmentation(noise=0.1)])
|
||||
|
||||
# TODO: transformations + augmentations
|
||||
|
||||
k = KeypointExtractor("data/videos/")
|
||||
|
||||
train_set = WLASLDataset("data/nslt_100.json", "data/missing.txt", k, keypoints_identifier=LANDMARKS, subset="train")
|
||||
train_set = FingerSpellingDataset("data/fingerspelling/data/", bad_data_folder="", keypoints_identifier=LANDMARKS, subset="train", transform=transform)
|
||||
train_loader = DataLoader(train_set, shuffle=True, generator=g)
|
||||
|
||||
val_set = WLASLDataset("data/nslt_100.json", "data/missing.txt", k, keypoints_identifier=LANDMARKS, subset="val")
|
||||
val_set = FingerSpellingDataset("data/fingerspelling/data/", bad_data_folder="", keypoints_identifier=LANDMARKS, subset="test")
|
||||
val_loader = DataLoader(val_set, shuffle=True, generator=g)
|
||||
|
||||
test_set = WLASLDataset("data/nslt_100.json", "data/missing.txt", k, keypoints_identifier=LANDMARKS, subset="test")
|
||||
test_loader = DataLoader(test_set, shuffle=True, generator=g)
|
||||
|
||||
|
||||
train_acc, val_acc = 0, 0
|
||||
lr_progress = []
|
||||
top_train_acc, top_val_acc = 0, 0
|
||||
checkpoint_index = 0
|
||||
|
||||
for epoch in range(100):
|
||||
epochs_without_improvement = 0
|
||||
best_val_acc = 0
|
||||
|
||||
for epoch in range(300):
|
||||
|
||||
running_loss = 0.0
|
||||
pred_correct, pred_all = 0, 0
|
||||
|
||||
# train
|
||||
for i, (inputs, labels) in enumerate(train_loader):
|
||||
# skip videos that are too short
|
||||
if inputs.shape[1] < 20:
|
||||
continue
|
||||
|
||||
inputs = inputs.squeeze(0).to(device)
|
||||
labels = labels.to(device, dtype=torch.long)
|
||||
|
||||
optimizer.zero_grad()
|
||||
outputs = spoter_model(inputs).expand(1, -1, -1)
|
||||
loss = criterion(outputs[0], labels)
|
||||
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
running_loss += loss
|
||||
@@ -82,31 +88,62 @@ def train():
|
||||
pred_correct += 1
|
||||
pred_all += 1
|
||||
|
||||
if i % 100 == 0:
|
||||
print(f"Epoch: {epoch} | Batch: {i} | Loss: {running_loss.item()} | Train Acc: {(pred_correct / pred_all)}")
|
||||
|
||||
|
||||
if scheduler:
|
||||
scheduler.step(running_loss.item() / len(train_loader))
|
||||
scheduler.step(running_loss.item() / (len(train_loader)) )
|
||||
|
||||
writer.add_scalar("Loss/train", loss, epoch)
|
||||
writer.add_scalar("Accuracy/train", (pred_correct / pred_all), epoch)
|
||||
|
||||
# validate
|
||||
# validate and print val acc
|
||||
val_pred_correct, val_pred_all = 0, 0
|
||||
val_loss = 0.0
|
||||
with torch.no_grad():
|
||||
for i, (inputs, labels) in enumerate(val_loader):
|
||||
inputs = inputs.squeeze(0).to(device)
|
||||
labels = labels.to(device)
|
||||
labels = labels.to(device, dtype=torch.long)
|
||||
|
||||
outputs = spoter_model(inputs)
|
||||
_, predicted = torch.max(outputs.data, 1)
|
||||
val_acc = (predicted == labels).sum().item() / labels.size(0)
|
||||
outputs = spoter_model(inputs).expand(1, -1, -1)
|
||||
|
||||
# calculate loss
|
||||
val_loss += criterion(outputs[0], labels)
|
||||
|
||||
# save checkpoint
|
||||
# if val_acc > top_val_acc:
|
||||
# top_val_acc = val_acc
|
||||
# top_train_acc = train_acc
|
||||
# checkpoint_index = epoch
|
||||
# torch.save(spoter_model.state_dict(), f"checkpoints/spoter_{epoch}.pth")
|
||||
if int(torch.argmax(torch.nn.functional.softmax(outputs, dim=2))) == int(labels[0]):
|
||||
val_pred_correct += 1
|
||||
val_pred_all += 1
|
||||
|
||||
val_acc = (val_pred_correct / val_pred_all)
|
||||
|
||||
writer.add_scalar("Loss/val", val_loss, epoch)
|
||||
writer.add_scalar("Accuracy/val", val_acc, epoch)
|
||||
|
||||
|
||||
print(f"Epoch: {epoch} | Train Acc: {(pred_correct / pred_all)} | Val Acc: {val_acc}")
|
||||
|
||||
# save checkpoint and update epochs_without_improvement
|
||||
if val_acc > best_val_acc:
|
||||
best_val_acc = val_acc
|
||||
epochs_without_improvement = 0
|
||||
if epoch > 55:
|
||||
top_val_acc = val_acc
|
||||
top_train_acc = train_acc
|
||||
checkpoint_index = epoch
|
||||
torch.save(spoter_model.state_dict(), f"checkpoints/spoter_{epoch}.pth")
|
||||
else:
|
||||
epochs_without_improvement += 1
|
||||
|
||||
# early stopping
|
||||
if epochs_without_improvement >= 40:
|
||||
print("Early stopping due to no improvement in validation accuracy for 40 epochs.")
|
||||
break
|
||||
|
||||
print(f"Epoch: {epoch} | Train Acc: {train_acc} | Val Acc: {val_acc}")
|
||||
lr_progress.append(optimizer.param_groups[0]['lr'])
|
||||
|
||||
print(f"Best val acc: {top_val_acc} | Best train acc: {top_train_acc} | Epoch: {checkpoint_index}")
|
||||
writer.flush()
|
||||
writer.close()
|
||||
|
||||
train()
|
||||
|
||||
# Path: src/train.py
|
||||
if __name__ == "__main__":
|
||||
train()
|
||||
|
||||
0
visualizations/__init__.py
Normal file
0
visualizations/__init__.py
Normal file
146
visualizations/analyze_model.ipynb
Normal file
146
visualizations/analyze_model.ipynb
Normal file
File diff suppressed because one or more lines are too long
1781
visualizations/visualize_data.ipynb
Normal file
1781
visualizations/visualize_data.ipynb
Normal file
File diff suppressed because one or more lines are too long
116
visualizations/webcam_view.py
Normal file
116
visualizations/webcam_view.py
Normal file
@@ -0,0 +1,116 @@
|
||||
import cv2
|
||||
import mediapipe as mp
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import torch
|
||||
|
||||
from src.identifiers import LANDMARKS
|
||||
from src.keypoint_extractor import KeypointExtractor
|
||||
from src.model import SPOTER
|
||||
from src.normalizations import normalize_hand_bohaecek, normalize_pose
|
||||
|
||||
# Initialize the MediaPipe Holistic model (pose and both hands)
holistic = mp.solutions.holistic.Holistic(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    model_complexity=2
)
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Initialize video capture object
cap = cv2.VideoCapture(0)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# number of most recent frames fed to the model for one prediction
WINDOW_SIZE = 8

# sliding window with the filtered keypoints of the most recent frames
keypoints = []

spoter_model = SPOTER(num_classes=26, hidden_dim=len(LANDMARKS) * 2)
spoter_model.load_state_dict(torch.load('models/spoter_76.pth', map_location=torch.device('cpu')))
# inference only: disable dropout so the predictions are deterministic
spoter_model.eval()

# positions of the x and y value of every selected landmark in the flat keypoint vector
values = np.array([index for i in LANDMARKS.values() for index in (i * 2, i * 2 + 1)])


def extract_keypoints(landmarks):
    """Flatten a MediaPipe landmark list into ``[x0, y0, x1, y1, ...]``.

    :param landmarks: a MediaPipe landmark list, or a falsy value when nothing was detected
    :return: flat numpy array of the x, y values, or None when ``landmarks`` is falsy
    """
    if landmarks:
        return np.array([i for landmark in landmarks.landmark for i in [landmark.x, landmark.y]])
    return None


try:
    while True:
        # Read frame from camera; stop when no frame could be grabbed
        success, frame = cap.read()
        if not success:
            break

        # Convert the frame to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Detect pose and hand landmarks in the frame
        results = holistic.process(frame)

        k1 = extract_keypoints(results.pose_landmarks)
        k2 = extract_keypoints(results.left_hand_landmarks)
        k3 = extract_keypoints(results.right_hand_landmarks)

        # a prediction needs the pose and at least one hand
        if k1 is not None and (k2 is not None or k3 is not None):
            # a missing hand is represented by all-zero keypoints
            k2 = k2 if k2 is not None else np.zeros(42)
            k3 = k3 if k3 is not None else np.zeros(42)

            # scale the landmark values from image fractions to pixel coordinates
            k1 = k1 * np.array([frame_width, frame_height] * 33)
            k2 = k2 * np.array([frame_width, frame_height] * 21)
            k3 = k3 * np.array([frame_width, frame_height] * 21)

            k1, bbox_pose = normalize_pose(k1)
            k2, bbox_left = normalize_hand_bohaecek(k2)
            k3, bbox_right = normalize_hand_bohaecek(k3)

            # Draw normalization bounding boxes
            for bbox in (bbox_pose, bbox_left, bbox_right):
                if bbox is not None:
                    frame = cv2.rectangle(frame, bbox, (0, 255, 0), 2)

            k = np.concatenate((k1, k2, k3))
            filtered = k[values]

            # keep only the most recent WINDOW_SIZE frames
            keypoints.append(filtered)
            del keypoints[:-WINDOW_SIZE]

            if len(keypoints) == WINDOW_SIZE:
                # stack the window into one array first; building a tensor from a
                # list of arrays is slow
                keypoints_tensor = torch.from_numpy(np.stack(keypoints)).float()

                # no gradients are needed for inference
                with torch.no_grad():
                    outputs = spoter_model(keypoints_tensor).expand(1, -1, -1)
                    outputs = torch.nn.functional.softmax(outputs, dim=2)
                topk = torch.topk(outputs, k=3, dim=2)

                # show overlay on frame at top right with confidence scores of topk predictions
                for i, (label, score) in enumerate(zip(topk.indices[0][0], topk.values[0][0])):
                    # get the label (A-Z), index to char
                    letter = label.item()
                    if letter < 26:
                        letter = chr(letter + 65)

                    cv2.putText(frame, f"{letter} {score.item():.2f}", (frame.shape[1] - 200, 50 + i * 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)

        # convert the frame back to BGR, the channel order OpenCV displays
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        # Show the frame
        cv2.imshow('MediaPipe Hands', frame)

        # Wait for key press (ESC) to exit
        if cv2.waitKey(5) & 0xFF == 27:
            break
finally:
    # Release the camera, the MediaPipe graph and the windows, also when an error occurred
    cap.release()
    holistic.close()
    cv2.destroyAllWindows()
|
||||
Reference in New Issue
Block a user