Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bbc0796504 | ||
| 66c9eccd10 | |||
| ed0385d1c5 | |||
|
|
cf6ddd1214 | ||
|
|
9f7197e4e9 | ||
|
|
e30661b96f | ||
| ba44762eba | |||
| b0335044af | |||
|
|
31d5283d9e | ||
|
|
8f46b2b498 | ||
|
|
c8611182c1 | ||
|
|
983a216f53 | ||
|
|
7653b9b35c | ||
|
|
246595780c | ||
|
|
baeafe8c49 | ||
|
|
febfed7e33 | ||
|
|
5735360eae | ||
|
|
8e5957f4ff | ||
|
|
01c50764b0 | ||
|
|
f95a0a5bbc |
@@ -7,7 +7,7 @@ steps:
|
||||
pull: if-not-exists
|
||||
image: sonarsource/sonar-scanner-cli
|
||||
commands:
|
||||
- sonar-scanner -Dsonar.host.url=$SONAR_HOST -Dsonar.login=$SONAR_TOKEN -Dsonar.projectKey=$SONAR_PROJECT_KEY
|
||||
- sonar-scanner -Dsonar.host.url=$SONAR_HOST -Dsonar.login=$SONAR_TOKEN -Dsonar.projectKey=$SONAR_PROJECT_KEY -Dsonar.qualitygate.wait=true
|
||||
environment:
|
||||
SONAR_HOST:
|
||||
from_secret: sonar_host
|
||||
@@ -19,4 +19,4 @@ steps:
|
||||
trigger:
|
||||
event:
|
||||
- push
|
||||
- pull_request
|
||||
# - pull_request
|
||||
|
||||
8
.gitignore
vendored
8
.gitignore
vendored
@@ -1,5 +1,11 @@
|
||||
.devcontainer/
|
||||
data/
|
||||
.DS_Store
|
||||
|
||||
cache/
|
||||
__pycache__/
|
||||
cache_wlasl/
|
||||
|
||||
__pycache__/
|
||||
|
||||
checkpoints/
|
||||
.ipynb_checkpoints
|
||||
39
.gitlab/merge_request_templates/Default.md
Normal file
39
.gitlab/merge_request_templates/Default.md
Normal file
@@ -0,0 +1,39 @@
|
||||
## Description
|
||||
|
||||
Please provide a brief summary of the changes in this merge request.
|
||||
|
||||
If possible, add a short screengrab or some screenshots of the changes.
|
||||
|
||||
## Testing Instructions
|
||||
|
||||
Please provide instructions on how the code reviewers can test your changes:
|
||||
|
||||
1. [Step 1]
|
||||
2. [Step 2]
|
||||
3. [Step 3]
|
||||
4. ...
|
||||
|
||||
Please include any specific information on test data, configurations, or other requirements that are necessary to properly test the changes.
|
||||
|
||||
Once you've tested the changes, please confirm that they work as expected and that there are no regressions or unexpected side effects. If any issues are discovered during testing, please include detailed steps to reproduce the issue in the merge request comments. Thank you!
|
||||
|
||||
## Related Issues
|
||||
|
||||
Please list any related issues or pull requests that are relevant to this merge request.
|
||||
E.g. WES-XXX-...
|
||||
|
||||
## Known bugs or issues
|
||||
|
||||
Please list any known bugs or issues related to the changes in this merge request.
|
||||
|
||||
## Checklist
|
||||
|
||||
- [ ] I have filled in this template.
|
||||
- [ ] I have tested my changes thoroughly.
|
||||
- [ ] I have updated the user documentation as necessary.
|
||||
- [ ] Code reviewed by 1 person.
|
||||
|
||||
## Additional Notes
|
||||
|
||||
Please add any additional notes or comments that may be helpful for reviewers to understand your changes.
|
||||
|
||||
0
__init__.py
Normal file
0
__init__.py
Normal file
120
analyze_model.ipynb
Normal file
120
analyze_model.ipynb
Normal file
File diff suppressed because one or more lines are too long
31
export.py
Normal file
31
export.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import torch
|
||||
import torchvision
|
||||
import onnx
|
||||
import numpy as np
|
||||
|
||||
from src.model import SPOTER
|
||||
from src.identifiers import LANDMARKS
|
||||
|
||||
# Export script: loads the trained SPOTER weights and writes an ONNX copy next to them.
model_name = 'Fingerspelling_AE'

# load PyTorch model from .pth file
model = SPOTER(num_classes=5, hidden_dim=len(LANDMARKS) *2)
# map_location="cpu" lets a checkpoint that was saved on a GPU load on a CPU-only machine.
# NOTE(review): torch.load unpickles the file - only load checkpoints from a trusted source.
state_dict = torch.load('models/' + model_name + '.pth', map_location="cpu")
model.load_state_dict(state_dict)

# set model to evaluation mode
model.eval()

# create dummy input tensor
batch_size = 1
num_of_frames = 1
# NOTE(review): 108 presumably equals len(LANDMARKS) * 2 - confirm before changing LANDMARKS
input_shape = (108, num_of_frames)
dummy_input = torch.randn(batch_size, *input_shape)

# export model to ONNX format
output_file = 'models/' + model_name + '.onnx'
torch.onnx.export(model, dummy_input, output_file, input_names=['input'], output_names=['output'])

# load exported ONNX model for verification
onnx_model = onnx.load(output_file)
onnx.checker.check_model(onnx_model)
|
||||
BIN
models/Fingerspelling_AE.onnx
Normal file
BIN
models/Fingerspelling_AE.onnx
Normal file
Binary file not shown.
BIN
models/Fingerspelling_AE.pth
Normal file
BIN
models/Fingerspelling_AE.pth
Normal file
Binary file not shown.
BIN
models/model_A-E.pth
Normal file
BIN
models/model_A-E.pth
Normal file
Binary file not shown.
BIN
models/model_A-L.pth
Normal file
BIN
models/model_A-L.pth
Normal file
Binary file not shown.
@@ -1,6 +1,6 @@
|
||||
torch
|
||||
torchvision
|
||||
pandas
|
||||
mediapipe
|
||||
joblib
|
||||
tensorboard
|
||||
torch==1.13.1
|
||||
torchvision==0.14.1
|
||||
pandas==1.5.3
|
||||
mediapipe==0.9.1.0
|
||||
tensorboard==2.12.0
|
||||
mediapy==1.1.6
|
||||
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
11
src/augmentations.py
Normal file
11
src/augmentations.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import random
|
||||
|
||||
|
||||
class MirrorKeypoints:
    """Random augmentation that replaces a sample with ``1 - sample``.

    One random number is drawn per call; the sample is returned untouched
    when the draw is above 0.5 and flipped otherwise.
    """

    def __call__(self, sample):
        """Return ``sample`` unchanged or flipped as ``1 - sample``.

        :param sample: the keypoints to augment (anything supporting ``1 - sample``)
        :return: the original or the flipped sample
        """
        # NOTE(review): the flip is applied to every element of the sample,
        # not only to one coordinate axis - confirm this is the intended mirroring.
        keep_original = random.random() > 0.5
        return sample if keep_original else 1 - sample
|
||||
0
src/datasets/__init__.py
Normal file
0
src/datasets/__init__.py
Normal file
79
src/datasets/finger_spelling_dataset.py
Normal file
79
src/datasets/finger_spelling_dataset.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import os
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from sklearn.model_selection import train_test_split
|
||||
|
||||
from src.identifiers import LANDMARKS
|
||||
from src.keypoint_extractor import KeypointExtractor
|
||||
|
||||
|
||||
class FingerSpellingDataset(torch.utils.data.Dataset):
    """Dataset of finger-spelling videos stored as ``<label>!<anything>.mp4`` files.

    The label of a video is the part of its file name before the first ``!``.
    The files are split 70/30 (stratified by label, fixed seed) into a
    ``"train"`` and a ``"val"`` subset.
    """

    def __init__(self, data_folder: str, keypoint_extractor: "KeypointExtractor", subset:str="train", keypoints_identifier: dict = None, transform=None):
        """Index the videos in ``data_folder`` and select one subset.

        :param data_folder: folder containing the ``.mp4`` files
        :param keypoint_extractor: object used to turn a video name into a keypoint dataframe
        :param subset: ``"train"`` or ``"val"``
        :param keypoints_identifier: optional mapping whose values name the keypoints to keep
        :param transform: optional callable applied to every sample tensor
        :raises ValueError: if ``subset`` is neither ``"train"`` nor ``"val"``
        """
        # fail early instead of leaving self.data / self.labels undefined
        if subset not in ("train", "val"):
            raise ValueError(f'subset must be "train" or "val", got {subset!r}')

        self.data_folder = data_folder

        # list files in the datafolder ending with .mp4; sorted because os.listdir
        # returns files in arbitrary order and the seeded split depends on the order
        files = sorted(f for f in os.listdir(self.data_folder) if f.endswith(".mp4"))

        labels = [f.split("!")[0] for f in files]

        # unique labels (sorted) and, for every file, the index of its label
        self.label_mapping, encoded_labels = np.unique(labels, return_inverse=True)

        # save the label mapping to a file, one "<label> <index>" pair per line
        with open(os.path.join(self.data_folder, "label_mapping.txt"), "w") as f:
            f.writelines(f"{label} {i}\n" for i, label in enumerate(self.label_mapping))

        # map the labels to their integer
        labels = list(encoded_labels)

        # TODO: make split for train and val and test when enough data is available

        # split the data into train and val and keep the label distribution balanced
        x_train, x_test, y_train, y_test = train_test_split(files, labels, test_size=0.3, random_state=1, stratify=labels)

        if subset == "train":
            self.data = x_train
            self.labels = y_train
        else:
            self.data = x_test
            self.labels = y_test

        self.transform = transform
        self.subset = subset
        self.keypoint_extractor = keypoint_extractor

        # always define the attribute so __getitem__ works without an identifier
        self.keypoints_to_keep = None
        if keypoints_identifier:
            self.keypoints_to_keep = [f"{i}_{j}" for i in keypoints_identifier.values() for j in ["x", "y"]]

    def __len__(self):
        """Return the number of videos in the selected subset."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(data, label)`` for the video at ``index``.

        ``data`` is a float64 tensor of shape ``(frames, keypoints, 2)`` built
        from the x/y column pairs of the extracted keypoint dataframe.

        :raises FileNotFoundError: if the extractor returns ``None`` for the video
        """
        video_name = self.data[index]

        # get the keypoints for the video; "minmax" is the name normalize_hands understands
        keypoints_df = self.keypoint_extractor.extract_keypoints_from_video(video_name, normalize="minmax")
        if keypoints_df is None:
            raise FileNotFoundError(f"No keypoints could be extracted for video: {video_name}")

        # filter the keypoints by the identified subset
        if self.keypoints_to_keep:
            keypoints_df = keypoints_df[self.keypoints_to_keep]

        # columns alternate x, y per keypoint -> (frames, keypoints, 2)
        n_frames, n_columns = keypoints_df.shape
        current_row = keypoints_df.to_numpy(dtype="float64", copy=True).reshape(n_frames, n_columns // 2, 2)

        label = self.labels[index]

        # data to tensor
        data = torch.from_numpy(current_row)

        if self.transform:
            data = self.transform(data)

        return data, label
|
||||
@@ -4,8 +4,8 @@ from collections import OrderedDict
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from identifiers import LANDMARKS
|
||||
from keypoint_extractor import KeypointExtractor
|
||||
from src.identifiers import LANDMARKS
|
||||
from src.keypoint_extractor import KeypointExtractor
|
||||
|
||||
|
||||
class WLASLDataset(torch.utils.data.Dataset):
|
||||
@@ -1,12 +1,14 @@
|
||||
import mediapipe as mp
|
||||
import cv2
|
||||
import time
|
||||
from typing import Dict, List, Tuple
|
||||
import numpy as np
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import cv2
|
||||
import mediapipe as mp
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class KeypointExtractor:
|
||||
def __init__(self, video_folder: str, cache_folder: str = "cache"):
|
||||
self.mp_drawing = mp.solutions.drawing_utils
|
||||
@@ -25,39 +27,65 @@ class KeypointExtractor:
|
||||
|
||||
def extract_keypoints_from_video(self,
|
||||
video: str,
|
||||
normalize: str = None,
|
||||
draw: bool = False,
|
||||
) -> pd.DataFrame:
|
||||
"""extract_keypoints_from_video this function extracts keypoints from a video and stores them in a dataframe
|
||||
|
||||
:param video: the video to extract keypoints from
|
||||
:type video: str
|
||||
:return: dataframe with keypoints
|
||||
:param normalize: the hand normalization algorithm to use, defaults to None
|
||||
:type normalize: str, optional
|
||||
:return: dataframe with keypoints in absolute pixels
|
||||
:rtype: pd.DataFrame
|
||||
"""
|
||||
# check if video exists
|
||||
if not os.path.exists(self.video_folder + video):
|
||||
logging.error("Video does not exist at path: " + self.video_folder + video)
|
||||
return None
|
||||
|
||||
# check if cache exists
|
||||
if not os.path.exists(self.cache_folder):
|
||||
os.makedirs(self.cache_folder)
|
||||
if not draw:
|
||||
# check if video exists
|
||||
if not os.path.exists(self.video_folder + video):
|
||||
logging.error("Video does not exist at path: " + self.video_folder + video)
|
||||
return None
|
||||
|
||||
# check if cache file exists and return
|
||||
if os.path.exists(self.cache_folder + "/" + video + ".npy"):
|
||||
# create dataframe from cache
|
||||
return pd.DataFrame(np.load(self.cache_folder + "/" + video + ".npy", allow_pickle=True), columns=self.columns)
|
||||
# check if cache exists
|
||||
if not os.path.exists(self.cache_folder):
|
||||
os.makedirs(self.cache_folder)
|
||||
|
||||
# check if cache file exists and return
|
||||
if os.path.exists(self.cache_folder + "/" + video + ".npy"):
|
||||
# create dataframe from cache
|
||||
df = pd.DataFrame(np.load(self.cache_folder + "/" + video + ".npy", allow_pickle=True), columns=self.columns)
|
||||
if normalize:
|
||||
df = self.normalize_hands(df, norm_algorithm=normalize)
|
||||
return df
|
||||
|
||||
# open video
|
||||
cap = cv2.VideoCapture(self.video_folder + video)
|
||||
|
||||
keypoints_df = pd.DataFrame(columns=self.columns)
|
||||
|
||||
# subsample the video: frame_rate // 10 frames are skipped before each processed frame (roughly 8 processed frames per second for 25-30 fps input, not 5)
|
||||
frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
|
||||
frame_skip = frame_rate // 10
|
||||
|
||||
output_frames = []
|
||||
|
||||
while cap.isOpened():
|
||||
|
||||
# skip frames
|
||||
for _ in range(frame_skip):
|
||||
success, image = cap.read()
|
||||
if not success:
|
||||
break
|
||||
|
||||
success, image = cap.read()
|
||||
if not success:
|
||||
break
|
||||
# extract keypoints of frame
|
||||
results = self.extract_keypoints_from_frame(image)
|
||||
if draw:
|
||||
results, draw_image = self.extract_keypoints_from_frame(image, draw=True)
|
||||
output_frames.append(draw_image)
|
||||
else:
|
||||
results = self.extract_keypoints_from_frame(image)
|
||||
|
||||
def extract_keypoints(landmarks):
|
||||
if landmarks:
|
||||
@@ -67,8 +95,18 @@ class KeypointExtractor:
|
||||
k1 = extract_keypoints(results.pose_landmarks)
|
||||
k2 = extract_keypoints(results.left_hand_landmarks)
|
||||
k3 = extract_keypoints(results.right_hand_landmarks)
|
||||
if k1 and k2 and k3:
|
||||
keypoints_df = pd.concat([keypoints_df, pd.DataFrame([k1+k2+k3], columns=self.columns)])
|
||||
if k1 and (k2 or k3):
|
||||
data = [k1 + (k2 or [0] * 42) + (k3 or [0] * 42)]
|
||||
new_df = pd.DataFrame(data, columns=self.columns)
|
||||
keypoints_df = pd.concat([keypoints_df, new_df], ignore_index=True)
|
||||
|
||||
# get frame width and height
|
||||
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
|
||||
# convert to pixels
|
||||
keypoints_df.iloc[:, ::2] *= frame_width
|
||||
keypoints_df.iloc[:, 1::2] *= frame_height
|
||||
|
||||
# close video
|
||||
cap.release()
|
||||
@@ -76,6 +114,12 @@ class KeypointExtractor:
|
||||
# save keypoints to cache
|
||||
np.save(self.cache_folder + "/" + video + ".npy", keypoints_df.to_numpy())
|
||||
|
||||
if normalize:
|
||||
keypoints_df = self.normalize_hands(keypoints_df, norm_algorithm=normalize)
|
||||
|
||||
if draw:
|
||||
return keypoints_df, output_frames
|
||||
|
||||
return keypoints_df
|
||||
|
||||
|
||||
@@ -95,11 +139,156 @@ class KeypointExtractor:
|
||||
if draw:
|
||||
# Draw the pose annotations on the image
|
||||
draw_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
||||
self.mp_drawing.draw_landmarks(draw_image, results.face_landmarks, self.mp_holistic.FACEMESH_CONTOURS)
|
||||
# self.mp_drawing.draw_landmarks(draw_image, results.face_landmarks, self.mp_holistic.FACEMESH_CONTOURS)
|
||||
self.mp_drawing.draw_landmarks(draw_image, results.left_hand_landmarks, self.mp_holistic.HAND_CONNECTIONS)
|
||||
self.mp_drawing.draw_landmarks(draw_image, results.right_hand_landmarks, self.mp_holistic.HAND_CONNECTIONS)
|
||||
|
||||
img_width, img_height = image.shape[1], image.shape[0]
|
||||
|
||||
# create bounding box around hands
|
||||
if results.left_hand_landmarks:
|
||||
x = [landmark.x for landmark in results.left_hand_landmarks.landmark]
|
||||
y = [landmark.y for landmark in results.left_hand_landmarks.landmark]
|
||||
draw_image = cv2.rectangle(draw_image, (int(min(x) * img_width), int(min(y) * img_height)), (int(max(x) * img_width), int(max(y) * img_height)), (0, 255, 0), 2)
|
||||
|
||||
if results.right_hand_landmarks:
|
||||
x = [landmark.x for landmark in results.right_hand_landmarks.landmark]
|
||||
y = [landmark.y for landmark in results.right_hand_landmarks.landmark]
|
||||
draw_image = cv2.rectangle(draw_image, (int(min(x) * img_width), int(min(y) * img_height)), (int(max(x) * img_width), int(max(y) * img_height)), (255, 0, 0), 2)
|
||||
|
||||
self.mp_drawing.draw_landmarks(draw_image, results.pose_landmarks, self.mp_holistic.POSE_CONNECTIONS)
|
||||
|
||||
return results, draw_image
|
||||
|
||||
return results
|
||||
return results
|
||||
|
||||
|
||||
def normalize_hands(self, dataframe: pd.DataFrame, norm_algorithm: str="minmax") -> pd.DataFrame:
    """normalize_hands this function normalizes the left and right hand keypoints of a dataframe

    :param dataframe: the dataframe to normalize
    :type dataframe: pd.DataFrame
    :param norm_algorithm: the normalization algorithm to use, pick from "minmax" and "bohacek"
    :type norm_algorithm: str
    :return: the normalized dataframe; the dataframe is returned unchanged when the algorithm is not recognised
    :rtype: pd.DataFrame
    """
    if norm_algorithm == "minmax":
        normalize_hand = self.normalize_hand_minmax
    elif norm_algorithm == "bohacek":
        normalize_hand = self.normalize_hand_bohacek
    else:
        # keep the lenient fallback, but make a misspelled algorithm name visible
        if norm_algorithm:
            logging.warning("Unknown hand normalization algorithm %r, keypoints are left unnormalized", norm_algorithm)
        return dataframe

    # normalize left hand first, then right hand
    for hand in ("left_hand", "right_hand"):
        dataframe = normalize_hand(dataframe, hand)

    return dataframe
|
||||
|
||||
def normalize_hand_minmax(self, dataframe: pd.DataFrame, hand: str) -> pd.DataFrame:
    """normalize_hand_minmax this function normalizes the hand keypoints of a dataframe with respect to the minimum and maximum coordinates

    Every frame (row) is normalized on its own: the hand keypoints are centred on the
    middle of their bounding box and divided by the bounding box width and height.
    Frames whose bounding box has zero width or height (e.g. an all-zero hand) are
    left unchanged instead of disabling the normalization of the whole dataframe.
    The dataframe is modified in place and returned.

    :param dataframe: the dataframe to normalize
    :type dataframe: pd.DataFrame
    :param hand: the hand to normalize, "right_hand" selects the right hand, anything else the left hand
    :type hand: str
    :return: the normalized dataframe
    :rtype: pd.DataFrame
    """
    # get all columns that belong to the hand (left hand column 66 - 107, right hand column 108 - 149)
    offset = 42 if hand == "right_hand" else 0
    hand_columns = np.arange(66 + offset, 108 + offset)

    # get the x, y coordinates of the hand keypoints as (frames, 21 keypoints, 2)
    hand_coords = dataframe.iloc[:, hand_columns].to_numpy(dtype="float64").reshape(len(dataframe), 21, 2)
    if hand_coords.size == 0:
        return dataframe

    # per-frame min and max of x and y, kept as (frames, 1, 2) so they broadcast over the keypoints
    min_xy = hand_coords.min(axis=1, keepdims=True)
    max_xy = hand_coords.max(axis=1, keepdims=True)

    # centre and dimensions of the bounding box around the hand keypoints
    center_coords = (min_xy + max_xy) / 2
    bbox_dims = max_xy - min_xy

    # only frames with a non-degenerate bounding box can be divided by its dimensions
    valid = np.all(bbox_dims != 0, axis=(1, 2))
    if not valid.any():
        return dataframe

    # normalize the hand keypoints based on the bounding box around the hand
    norm_hand_coords = hand_coords.copy()
    norm_hand_coords[valid] = (hand_coords[valid] - center_coords[valid]) / bbox_dims[valid]

    # flatten the normalized hand keypoints and write them back into the dataframe
    dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 42)

    return dataframe
|
||||
|
||||
def normalize_hand_bohacek(self, dataframe: pd.DataFrame, hand: str) -> pd.DataFrame:
    """normalize_hand_bohacek this function normalizes the hand keypoints of a dataframe using the bohacek normalization algorithm

    Every frame (row) is normalized on its own: the keypoint extent is padded by 10% of
    its larger side, the shorter side is padded further so the box becomes square, and the
    keypoints are centred on that box and divided by its dimensions.
    Frames whose box has zero width or height (e.g. an all-zero hand) are left unchanged
    instead of disabling the normalization of the whole dataframe.
    The dataframe is modified in place and returned.

    :param dataframe: the dataframe to normalize
    :type dataframe: pd.DataFrame
    :param hand: the hand to normalize, "right_hand" selects the right hand, anything else the left hand
    :type hand: str
    :return: the normalized dataframe
    :rtype: pd.DataFrame
    """
    # get all columns that belong to the hand (left hand column 66 - 107, right hand column 108 - 149)
    offset = 42 if hand == "right_hand" else 0
    hand_columns = np.arange(66 + offset, 108 + offset)

    # get the x, y coordinates of the hand keypoints as (frames, 21 keypoints, 2)
    hand_coords = dataframe.iloc[:, hand_columns].to_numpy(dtype="float64").reshape(len(dataframe), 21, 2)
    if hand_coords.size == 0:
        return dataframe

    # per-frame min and max of x and y, shape (frames, 2)
    min_xy = hand_coords.min(axis=1)
    max_xy = hand_coords.max(axis=1)

    # calculate the hand keypoint width and height (NOT the bounding box width and height!)
    width = max_xy[:, 0] - min_xy[:, 0]
    height = max_xy[:, 1] - min_xy[:, 1]

    # calculate the deltas: 10% padding on the larger side, the smaller side is padded up to a square
    wider = width > height
    delta_x = np.where(wider, 0.1 * width, 0.1 * height + (height - width) / 2)
    delta_y = np.where(wider, 0.1 * width + (width - height) / 2, 0.1 * height)
    delta = np.stack((delta_x, delta_y), axis=1)

    # set the starting and ending point of the normalization bounding box
    starting = min_xy - delta
    ending = max_xy + delta

    # calculate the center of the bounding box and the bounding box dimensions
    center_coords = (starting + ending) / 2
    bbox_dims = ending - starting

    # only frames with a non-degenerate bounding box can be divided by its dimensions
    valid = np.all(bbox_dims != 0, axis=1)
    if not valid.any():
        return dataframe

    # normalize the hand keypoints based on the bounding box around the hand
    norm_hand_coords = hand_coords.copy()
    norm_hand_coords[valid] = (hand_coords[valid] - center_coords[valid][:, None, :]) / bbox_dims[valid][:, None, :]

    # flatten the normalized hand keypoints and write them back into the dataframe
    dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 42)

    return dataframe
|
||||
67
src/train.py
67
src/train.py
@@ -13,10 +13,12 @@ import torch.optim as optim
|
||||
from torch.utils.data import DataLoader
|
||||
from torchvision import transforms
|
||||
|
||||
from dataset import WLASLDataset
|
||||
from identifiers import LANDMARKS
|
||||
from keypoint_extractor import KeypointExtractor
|
||||
from model import SPOTER
|
||||
from src.augmentations import MirrorKeypoints
|
||||
from src.datasets.finger_spelling_dataset import FingerSpellingDataset
|
||||
from src.datasets.wlasl_dataset import WLASLDataset
|
||||
from src.identifiers import LANDMARKS
|
||||
from src.keypoint_extractor import KeypointExtractor
|
||||
from src.model import SPOTER
|
||||
|
||||
|
||||
def train():
|
||||
@@ -32,30 +34,28 @@ def train():
|
||||
|
||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
|
||||
spoter_model = SPOTER(num_classes=100, hidden_dim=len(LANDMARKS) *2)
|
||||
spoter_model = SPOTER(num_classes=12, hidden_dim=len(LANDMARKS) *2)
|
||||
spoter_model.train(True)
|
||||
spoter_model.to(device)
|
||||
|
||||
criterion = nn.CrossEntropyLoss()
|
||||
optimizer = optim.SGD(spoter_model.parameters(), lr=0.001, momentum=0.9)
|
||||
optimizer = optim.SGD(spoter_model.parameters(), lr=0.0001, momentum=0.9)
|
||||
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=5)
|
||||
|
||||
# TODO: create paths for checkpoints
|
||||
|
||||
# TODO: transformations + augmentations
|
||||
|
||||
k = KeypointExtractor("data/videos/")
|
||||
k = KeypointExtractor("data/fingerspelling/data/")
|
||||
|
||||
train_set = WLASLDataset("data/nslt_100.json", "data/missing.txt", k, keypoints_identifier=LANDMARKS, subset="train")
|
||||
transform = transforms.Compose([MirrorKeypoints()])
|
||||
|
||||
train_set = FingerSpellingDataset("data/fingerspelling/data/", k, keypoints_identifier=LANDMARKS, subset="train", transform=transform)
|
||||
train_loader = DataLoader(train_set, shuffle=True, generator=g)
|
||||
|
||||
val_set = WLASLDataset("data/nslt_100.json", "data/missing.txt", k, keypoints_identifier=LANDMARKS, subset="val")
|
||||
val_set = FingerSpellingDataset("data/fingerspelling/data/", k, keypoints_identifier=LANDMARKS, subset="val")
|
||||
val_loader = DataLoader(val_set, shuffle=True, generator=g)
|
||||
|
||||
test_set = WLASLDataset("data/nslt_100.json", "data/missing.txt", k, keypoints_identifier=LANDMARKS, subset="test")
|
||||
test_loader = DataLoader(test_set, shuffle=True, generator=g)
|
||||
|
||||
|
||||
|
||||
train_acc, val_acc = 0, 0
|
||||
lr_progress = []
|
||||
top_train_acc, top_val_acc = 0, 0
|
||||
@@ -81,32 +81,39 @@ def train():
|
||||
if int(torch.argmax(torch.nn.functional.softmax(outputs, dim=2))) == int(labels[0]):
|
||||
pred_correct += 1
|
||||
pred_all += 1
|
||||
|
||||
if i % 100 == 0:
|
||||
print(f"Epoch: {epoch} | Batch: {i} | Loss: {running_loss.item()} | Train Acc: {(pred_correct / pred_all)}")
|
||||
|
||||
|
||||
if scheduler:
|
||||
scheduler.step(running_loss.item() / len(train_loader))
|
||||
|
||||
# validate
|
||||
# validate and print val acc
|
||||
val_pred_correct, val_pred_all = 0, 0
|
||||
with torch.no_grad():
|
||||
for i, (inputs, labels) in enumerate(val_loader):
|
||||
inputs = inputs.squeeze(0).to(device)
|
||||
labels = labels.to(device)
|
||||
labels = labels.to(device, dtype=torch.long)
|
||||
|
||||
outputs = spoter_model(inputs)
|
||||
_, predicted = torch.max(outputs.data, 1)
|
||||
val_acc = (predicted == labels).sum().item() / labels.size(0)
|
||||
outputs = spoter_model(inputs).expand(1, -1, -1)
|
||||
|
||||
if int(torch.argmax(torch.nn.functional.softmax(outputs, dim=2))) == int(labels[0]):
|
||||
val_pred_correct += 1
|
||||
val_pred_all += 1
|
||||
|
||||
val_acc = (val_pred_correct / val_pred_all)
|
||||
|
||||
print(f"Epoch: {epoch} | Train Acc: {(pred_correct / pred_all)} | Val Acc: {val_acc}")
|
||||
|
||||
|
||||
# save checkpoint
|
||||
# if val_acc > top_val_acc:
|
||||
# top_val_acc = val_acc
|
||||
# top_train_acc = train_acc
|
||||
# checkpoint_index = epoch
|
||||
# torch.save(spoter_model.state_dict(), f"checkpoints/spoter_{epoch}.pth")
|
||||
if val_acc > top_val_acc and epoch > 55:
|
||||
top_val_acc = val_acc
|
||||
top_train_acc = train_acc
|
||||
checkpoint_index = epoch
|
||||
torch.save(spoter_model.state_dict(), f"checkpoints/spoter_{epoch}.pth")
|
||||
|
||||
print(f"Epoch: {epoch} | Train Acc: {train_acc} | Val Acc: {val_acc}")
|
||||
lr_progress.append(optimizer.param_groups[0]['lr'])
|
||||
|
||||
print(f"Best val acc: {top_val_acc} | Best train acc: {top_train_acc} | Epoch: {checkpoint_index}")
|
||||
|
||||
train()
|
||||
# Path: src/train.py
|
||||
if __name__ == "__main__":
|
||||
train()
|
||||
232
visualize_data.ipynb
Normal file
232
visualize_data.ipynb
Normal file
@@ -0,0 +1,232 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from src.keypoint_extractor import KeypointExtractor\n",
|
||||
"\n",
|
||||
"# reload modules\n",
|
||||
"%load_ext autoreload"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"video_name = '69547.mp4' "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# extract keypoints\n",
|
||||
"keypoint_extractor = KeypointExtractor('data/videos/')"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import numpy as np\n",
|
||||
"from IPython.display import HTML\n",
|
||||
"from base64 import b64encode\n",
|
||||
"import mediapy as media\n",
|
||||
"%matplotlib inline\n",
|
||||
"\n",
|
||||
"# Define the frames per second (fps) and duration of the video\n",
|
||||
"fps = 25\n",
|
||||
"duration = 10\n",
|
||||
"\n",
|
||||
"# Create a dummy video of random noise\n",
|
||||
"_, video_frames = keypoint_extractor.extract_keypoints_from_video(video_name, normalize=\"minmax\", draw=True)\n",
|
||||
"\n",
|
||||
"# Convert the video to a numpy array\n",
|
||||
"video = np.array(video_frames)\n",
|
||||
"media.show_video(video, height=400, codec='gif', fps=4)\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from src.model import SPOTER\n",
|
||||
"from src.identifiers import LANDMARKS\n",
|
||||
"import torch\n",
|
||||
"\n",
|
||||
"spoter_model = SPOTER(num_classes=5, hidden_dim=len(LANDMARKS) *2)\n",
|
||||
"spoter_model.load_state_dict(torch.load('models/spoter_40.pth'))"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# get average number of frames in test set\n",
|
||||
"from src.keypoint_extractor import KeypointExtractor\n",
|
||||
"from src.datasets.finger_spelling_dataset import FingerSpellingDataset\n",
|
||||
"from src.identifiers import LANDMARKS\n",
|
||||
"import numpy as np\n",
|
||||
"\n",
|
||||
"keypoints_extractor = KeypointExtractor(\"data/fingerspelling/data/\")\n",
|
||||
"test_set = FingerSpellingDataset(\"data/fingerspelling/data/\", keypoints_extractor, keypoints_identifier=LANDMARKS, subset=\"val\")\n",
|
||||
"\n",
|
||||
"frames = []\n",
|
||||
"labels = []\n",
|
||||
"for sample, label in test_set:\n",
|
||||
" frames.append(sample.shape[0])\n",
|
||||
" labels.append(label)\n",
|
||||
"\n",
|
||||
"print(np.mean(frames))\n",
|
||||
"# get label frequency in the labels list\n",
|
||||
"from collections import Counter\n",
|
||||
"\n",
|
||||
"counter = Counter(labels)\n",
|
||||
"print(counter)\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Hand keypoint visualization"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import matplotlib.pyplot as plt\n",
|
||||
"\n",
|
||||
"def plot_hand_keypoints(dataframe, hand, frame):\n",
|
||||
" hand_columns = np.array([i for i in range(66 + (42 if hand == \"right\" else 0), 108 + (42 if hand == \"right\" else 0))])\n",
|
||||
" \n",
|
||||
" # get the x, y coordinates of the hand keypoints\n",
|
||||
" frame_df = dataframe.iloc[frame:frame+1, hand_columns]\n",
|
||||
" hand_coords = frame_df.values.reshape(21, 2)\n",
|
||||
" \n",
|
||||
" x_coords = hand_coords[:, ::2] #Even indices\n",
|
||||
" y_coords = hand_coords[:, 1::2] #Uneven indices\n",
|
||||
" \n",
|
||||
" #Plot the keypoints\n",
|
||||
" plt.scatter(x_coords, y_coords)\n",
|
||||
" return frame_df.style"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"#Set video, hand and frame to display\n",
|
||||
"video_name = '69547.mp4'\n",
|
||||
"hand = \"right\"\n",
|
||||
"frame = 3\n",
|
||||
"%reload_ext autoreload"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from src.keypoint_extractor import KeypointExtractor\n",
|
||||
"import numpy as np\n",
|
||||
"\n",
|
||||
"#Extract keypoints from requested video\n",
|
||||
"keypoints_extractor = KeypointExtractor(\"data/videos/\")\n",
|
||||
"\n",
|
||||
"#Plot the hand keypoints\n",
|
||||
"df = keypoints_extractor.extract_keypoints_from_video(video_name)\n",
|
||||
"df.head()\n",
|
||||
"plot_hand_keypoints(df, hand, frame)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"#Plot the NORMALIZED hand keypoints (using minxmax)\n",
|
||||
"df = keypoints_extractor.extract_keypoints_from_video(video_name, normalize=\"minmax\")\n",
|
||||
"plot_hand_keypoints(df, hand, frame)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"#Plot the NORMALIZED hand keypoints (using bohacek)\n",
|
||||
"df = keypoints_extractor.extract_keypoints_from_video(video_name, normalize=\"bohacek\")\n",
|
||||
"plot_hand_keypoints(df, hand, frame)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.16"
|
||||
},
|
||||
"vscode": {
|
||||
"interpreter": {
|
||||
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
|
||||
}
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
||||
167
webcam_view.py
Normal file
167
webcam_view.py
Normal file
@@ -0,0 +1,167 @@
|
||||
import cv2
|
||||
import mediapipe as mp
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from src.identifiers import LANDMARKS
|
||||
from src.model import SPOTER
|
||||
|
||||
# Initialize the MediaPipe Holistic model and the drawing helpers.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
holistic = mp_holistic.Holistic(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    model_complexity=2,
)

# Initialize video capture object (capture device index 0).
cap = cv2.VideoCapture(0)

# Rolling buffer of per-frame feature vectors; the main loop keeps the most
# recent frames here and feeds them to the classifier. Kept as a plain list
# because the loop trims it with ``pop(0)``.
keypoints = []

# Build the classifier and load its trained weights.
spoter_model = SPOTER(num_classes=12, hidden_dim=len(LANDMARKS) * 2)
# The model and its inputs are never moved off the CPU in this script, so map
# the checkpoint onto the CPU; otherwise a checkpoint saved from a GPU fails to
# load on a machine without CUDA.
spoter_model.load_state_dict(
    torch.load('models/spoter_57.pth', map_location='cpu')
)
# Inference only: switch off training-time behaviour such as dropout.
spoter_model.eval()

# Class index -> label shown in the overlay (12 classes, "A" through "L").
m = dict(enumerate("ABCDEFGHIJKL"))
|
||||
|
||||
def extract_keypoints(landmarks):
    """Flatten a MediaPipe landmark list into ``[x0, y0, x1, y1, ...]``.

    Returns ``None`` when ``landmarks`` is falsy (nothing was detected).
    """
    if not landmarks:
        return None
    return [coord for point in landmarks.landmark for coord in (point.x, point.y)]


def normalize_hand(frame, data, hand, algorithm="minmax"):
    """Normalize one hand's keypoints in ``data`` relative to its bounding box.

    Args:
        frame: Image the keypoints were detected in; its height/width are used
            to scale the keypoints to pixels and the bounding box is drawn on it.
        data: 2-D array with one row of flattened ``x, y`` features. The left
            hand occupies columns 66-107 and the right hand columns 108-149.
        hand: ``"right_hand"`` selects the right-hand columns; any other value
            selects the left-hand columns.
        algorithm: ``"minmax"`` uses the tight bounding box; ``"bohacek"`` uses
            a padded, square bounding box.

    Returns:
        ``(data, frame)``. Both are returned unchanged when the algorithm name
        is unknown or the bounding box is degenerate (zero width or height,
        e.g. an undetected hand that was filled with zeros).
    """
    offset = 42 if hand == "right_hand" else 0
    hand_columns = np.arange(66 + offset, 108 + offset)

    # Fancy indexing copies, so the in-place scaling below does not touch
    # ``data`` until the normalized values are written back at the end.
    hand_data = data[0, hand_columns].reshape(21, 2)

    # convert to absolute pixels
    hand_data[:, 0] *= frame.shape[1]
    hand_data[:, 1] *= frame.shape[0]

    min_x, min_y = np.min(hand_data[:, 0]), np.min(hand_data[:, 1])
    max_x, max_y = np.max(hand_data[:, 0]), np.max(hand_data[:, 1])
    width, height = max_x - min_x, max_y - min_y

    if algorithm == "minmax":
        starting_x, starting_y = min_x, min_y
        ending_x, ending_y = max_x, max_y
        bbox_height, bbox_width = height, width
        center_x, center_y = (min_x + max_x) / 2, (min_y + max_y) / 2
    elif algorithm == "bohacek":
        # Pad the longer side by 10% and pad the shorter side so that the
        # resulting box is square.
        if width > height:
            delta_x = 0.1 * width
            delta_y = delta_x + ((width - height) / 2)
        else:
            delta_y = 0.1 * height
            delta_x = delta_y + ((height - width) / 2)

        starting_x, starting_y = min_x - delta_x, min_y - delta_y
        ending_x, ending_y = max_x + delta_x, max_y + delta_y
        center_x, center_y = (starting_x + ending_x) / 2, (starting_y + ending_y) / 2
        bbox_height, bbox_width = ending_y - starting_y, ending_x - starting_x
    else:
        print("Not a valid normalization algorithm")
        return data, frame

    if bbox_height == 0 or bbox_width == 0:
        return data, frame

    # Broadcasting applies the same center / size to all 21 points.
    center = np.array([center_x, center_y])
    bbox_dims = np.array([bbox_width, bbox_height])
    hand_data = (hand_data - center) / bbox_dims

    # add bounding box to frame
    frame = cv2.rectangle(
        frame,
        (int(starting_x), int(starting_y)),
        (int(ending_x), int(ending_y)),
        (0, 255, 0),
        2,
    )

    data[:, hand_columns] = hand_data.reshape(-1, 42)
    return data, frame


# Flattened (x, y) column indices of the landmarks the model consumes.
# Computed once instead of on every frame.
landmark_columns = np.array(
    [col for i in LANDMARKS.values() for col in (i * 2, i * 2 + 1)],
    dtype=int,
)

norm_alg = "minmax"

try:
    while True:
        # Read a frame from the webcam
        ret, frame = cap.read()
        if not ret:
            break

        # Convert the frame to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Detect pose and hand landmarks in the frame
        results = holistic.process(frame)

        k1 = extract_keypoints(results.pose_landmarks)
        k2 = extract_keypoints(results.left_hand_landmarks)
        k3 = extract_keypoints(results.right_hand_landmarks)

        # Only build features and classify when the pose and at least one hand
        # were detected; ``data`` does not exist otherwise.
        if k1 and (k2 or k3):
            # A missing hand is represented by 42 zeros.
            data = np.array([k1 + (k2 or [0] * 42) + (k3 or [0] * 42)])

            data, frame = normalize_hand(frame, data, "left_hand", norm_alg)
            data, frame = normalize_hand(frame, data, "right_hand", norm_alg)

            filtered = data[0][landmark_columns]

            # Keep a sliding window of the 8 most recent feature vectors.
            while len(keypoints) >= 8:
                keypoints.pop(0)
            keypoints.append(filtered)

            if len(keypoints) == 8:
                # Stack to a single ndarray first: building a tensor from a
                # list of ndarrays is slow and triggers a PyTorch warning.
                keypoints_tensor = torch.from_numpy(np.stack(keypoints)).float()

                # predict (no gradients are needed for inference)
                with torch.no_grad():
                    outputs = spoter_model(keypoints_tensor).expand(1, -1, -1)

                # softmax
                outputs = torch.nn.functional.softmax(outputs, dim=2)

                # get topk predictions
                topk = torch.topk(outputs, k=3, dim=2)

                # show overlay on frame at top right with confidence scores of topk predictions
                for i, (label, score) in enumerate(zip(topk.indices[0][0], topk.values[0][0])):
                    cv2.putText(
                        frame,
                        f"{m[label.item()]} {score.item():.2f}",
                        (frame.shape[1] - 200, 50 + i * 50),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1,
                        (0, 255, 0),
                        2,
                    )

        mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)

        # Convert the frame back to BGR, the channel order OpenCV displays
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        # Show the frame
        cv2.imshow('MediaPipe Hands', frame)

        # Exit when ESC (key code 27) is pressed
        if cv2.waitKey(5) & 0xFF == 27:
            break
finally:
    # Release the video capture object and destroy the windows, even if the
    # loop above raised.
    cap.release()
    cv2.destroyAllWindows()
|
||||
Reference in New Issue
Block a user