15 Commits

18 changed files with 1225 additions and 442 deletions

View File

@@ -1,21 +0,0 @@
---
# Drone CI pipeline that runs a SonarQube scan of the repository on every push.
kind: pipeline
name: sonarcube
type: docker

steps:
  - name: code-analysis
    pull: if-not-exists
    image: sonarsource/sonar-scanner-cli
    commands:
      # Folded scalar: the lines below are joined with single spaces into one
      # command line. Host, token and project key come from the secrets below.
      - >-
        sonar-scanner
        -Dsonar.host.url=$SONAR_HOST
        -Dsonar.login=$SONAR_TOKEN
        -Dsonar.projectKey=$SONAR_PROJECT_KEY
        -Dsonar.qualitygate.wait=true
    environment:
      SONAR_HOST:
        from_secret: sonar_host
      SONAR_TOKEN:
        from_secret: sonar_token
      SONAR_PROJECT_KEY:
        from_secret: sonar_project_key

trigger:
  event:
    - push

View File

@@ -17,4 +17,5 @@ requests==2.28.1
onnx==1.12.0 onnx==1.12.0
onnx-tf==1.10.0 onnx-tf==1.10.0
onnxruntime==1.12.1 onnxruntime==1.12.1
coremltools==6.3.0 tensorflow
tensorflow-probability

View File

@@ -89,7 +89,7 @@ with torch.no_grad():
df = pd.read_csv(args.dataset) df = pd.read_csv(args.dataset)
df["embeddings"] = embeddings df["embeddings"] = embeddings
df = df[['embeddings', 'label_name', 'labels']] df = df[['embeddings', 'label_name', 'labels']]
df['embeddings'] = df['embeddings'].apply(lambda x: x.tolist()[0]) df['embeddings2'] = df['embeddings'].apply(lambda x: x.tolist()[0])
if args.format == 'json': if args.format == 'json':
df.to_json(args.output, orient='records') df.to_json(args.output, orient='records')

View File

@@ -1,15 +1,13 @@
# to run this script, you need torch 1.13.1 and torchvision 0.14.1
import numpy as np import numpy as np
import onnx import onnx
import torch import torch
import torchvision import torchvision
import os
from models.spoter_embedding_model import SPOTER_EMBEDDINGS from models.spoter_embedding_model import SPOTER_EMBEDDINGS
# set parameters of the model # set parameters of the model
model_name = 'fingerspelling_embedding_model' model_name = 'embedding_model'
output=32
# load PyTorch model from .pth file # load PyTorch model from .pth file
@@ -17,7 +15,7 @@ device = torch.device("cpu")
# if torch.cuda.is_available(): # if torch.cuda.is_available():
# device = torch.device("cuda") # device = torch.device("cuda")
CHECKPOINT_PATH = "checkpoints/fingerspelling_checkpoint.pth" CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device) checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
model = SPOTER_EMBEDDINGS( model = SPOTER_EMBEDDINGS(
@@ -29,39 +27,45 @@ model.load_state_dict(checkpoint["state_dict"])
# set model to evaluation mode # set model to evaluation mode
model.eval() model.eval()
dummy_input = torch.randn(1, 10, 54, 2) model_export = "onnx"
if model_export == "coreml":
dummy_input = torch.randn(1, 10, 54, 2)
# set device for dummy input
dummy_input = dummy_input.to(device)
traced_model = torch.jit.trace(model, dummy_input)
# check if models folder exists out = traced_model(dummy_input)
if not os.path.exists('out-models'): import coremltools as ct
os.makedirs('out-models')
for model_export in ["onnx", "coreml"]: # Convert to Core ML
if model_export == "coreml": coreml_model = ct.convert(
# set device for dummy input traced_model,
dummy_input = dummy_input.to(device) inputs=[ct.TensorType(name="input", shape=dummy_input.shape)],
traced_model = torch.jit.trace(model, dummy_input) )
out = traced_model(dummy_input) # Save Core ML model
import coremltools as ct coreml_model.save("out-models/" + model_name + ".mlmodel")
else:
# create dummy input tensor
dummy_input = torch.randn(1, 10, 54, 2)
# set device for dummy input
dummy_input = dummy_input.to(device)
# Convert to Core ML # export model to ONNX format
coreml_model = ct.convert( output_file = 'models/' + model_name + '.onnx'
traced_model, torch.onnx.export(model, dummy_input, output_file, input_names=['input'], output_names=['output'])
inputs=[ct.TensorType(name="input", shape=dummy_input.shape)],
)
# Save Core ML model torch.onnx.export(model, # model being run
coreml_model.save("out-models/" + model_name + ".mlmodel") dummy_input, # model input (or a tuple for multiple inputs)
else: 'out-models/' + model_name + '.onnx', # where to save the model (can be a file or file-like object)
# set device for dummy input export_params=True, # store the trained parameter weights inside the model file
dummy_input = dummy_input.to(device) opset_version=9, # the ONNX version to export the model to
do_constant_folding=True, # whether to execute constant folding for optimization
input_names = ['X'], # the model's input names
output_names = ['Y'] # the model's output names
)
torch.onnx.export(model, # model being run
dummy_input, # model input (or a tuple for multiple inputs) # load exported ONNX model for verification
'out-models/' + model_name + '.onnx', # where to save the model (can be a file or file-like object) onnx_model = onnx.load(output_file)
export_params=True, # store the trained parameter weights inside the model file onnx.checker.check_model(onnx_model)
opset_version=9, # the ONNX version to export the model to
do_constant_folding=True, # whether to execute constant folding for optimization
input_names = ['X'], # the model's input names
output_names = ['Y'] # the model's output names
)

View File

@@ -88,10 +88,9 @@ def train_epoch_embedding_online(model, epoch_iters, train_loader, val_loader, c
if enable_batch_sorting: if enable_batch_sorting:
if labels_size < train_loader.batch_size: if labels_size < train_loader.batch_size:
trim_count = labels_size % mini_batch trim_count = labels_size % mini_batch
if trim_count > 0: inputs = inputs[:-trim_count]
inputs = inputs[:-trim_count] labels = labels[:-trim_count]
labels = labels[:-trim_count] masks = masks[:-trim_count]
masks = masks[:-trim_count]
embeddings = None embeddings = None
with torch.no_grad(): with torch.no_grad():
for j in range(batch_loop_count): for j in range(batch_loop_count):

File diff suppressed because one or more lines are too long

93
predictions/k_nearest.py Normal file
View File

@@ -0,0 +1,93 @@
import numpy as np
from collections import Counter
# TODO: scale the distance relative to the intra-class distances?
# TODO: find a more efficient way to handle k=1
def minkowski_distance_p(x, y, p=2):
    """Return the p-th power of the Minkowski distance between ``x`` and ``y``.

    The reduction runs over the last axis. For ``p == np.inf`` the maximum
    absolute difference is returned and for ``p == 1`` the plain sum of
    absolute differences; otherwise the sum of ``|y - x| ** p`` (no root).
    """
    first = np.asarray(x)
    second = np.asarray(y)

    # Promote both inputs to a common dtype that is at least float64, without
    # blindly casting (complex input must stay complex).
    target_dtype = np.promote_types(np.promote_types(first.dtype, second.dtype),
                                    'float64')
    gap = np.abs(second.astype(target_dtype) - first.astype(target_dtype))

    if p == np.inf:
        return np.amax(gap, axis=-1)
    if p == 1:
        return np.sum(gap, axis=-1)
    return np.sum(gap ** p, axis=-1)
def minkowski_distance(x, y, p=2):
    """Return the Minkowski distance of order ``p`` between ``x`` and ``y``.

    Delegates to ``minkowski_distance_p`` and takes the p-th root of its
    result, except for ``p == 1`` and ``p == np.inf`` where no root is needed.
    """
    first = np.asarray(x)
    second = np.asarray(y)
    powered = minkowski_distance_p(first, second, p)
    needs_root = not (p == np.inf or p == 1)
    return powered ** (1. / p) if needs_root else powered
class KNearestNeighbours:
    """k-nearest-neighbour classifier over a table of reference embeddings.

    The reference table is a pandas DataFrame with an ``embeddings2`` column
    (each embedding serialised as a string such as ``"[0.1, 0.2]"``) and a
    ``label_name`` column holding the class name of each row.
    """

    def __init__(self, k=5):
        """Create a classifier that votes among the ``k`` closest references."""
        self.k = k
        self.embeddings = None
        self.embeddings_list = None

    def set_embeddings(self, embeddings):
        """Store the reference table and parse its embeddings into float lists.

        Only the ``embeddings2`` column is read here; the DataFrame itself is
        kept untouched in ``self.embeddings`` for the label lookup in
        ``predict``.
        """
        self.embeddings = embeddings
        # Each cell looks like "[v0, v1, ...]": strip the brackets, split on
        # ", " and convert every item to float.
        self.embeddings_list = [
            [float(item) for item in text[1:-1].split(", ")]
            for text in embeddings["embeddings2"]
        ]

    def distance_matrix(self, keypoints, p=2, threshold=1000000):
        """Return the (m, n) Minkowski distances between queries and references.

        ``keypoints`` may be a single embedding (1-D) or a batch of embeddings
        (2-D); a single embedding is treated as a batch of one. ``threshold``
        bounds the size of the intermediate array: above it the matrix is
        filled row by row (or column by column) instead of by broadcasting.

        Raises:
            ValueError: if no reference embeddings were set, if the input is
                not 1-D/2-D, or if the embedding dimensions do not match.
        """
        if self.embeddings_list is None:
            raise ValueError("set_embeddings() must be called before computing distances")

        # Promote a single 1-D embedding to shape (1, k) so the unpacking
        # below does not fail on it.
        x = np.atleast_2d(np.asarray(keypoints))
        if x.ndim != 2:
            raise ValueError(f"expected a 1-D or 2-D array of embeddings, got {x.ndim} dimensions")
        m, k = x.shape
        y = np.asarray(self.embeddings_list)
        n, kk = y.shape

        if k != kk:
            raise ValueError(f"x contains {k}-dimensional vectors but y contains "
                             f"{kk}-dimensional vectors")

        if m * n * k <= threshold:
            return minkowski_distance(x[:, np.newaxis, :], y[np.newaxis, :, :], p)

        result = np.empty((m, n), dtype=float)  # FIXME: figure out the best dtype
        if m < n:
            for i in range(m):
                result[i, :] = minkowski_distance(x[i], y, p)
        else:
            for j in range(n):
                result[:, j] = minkowski_distance(x, y[j], p)
        return result

    def predict(self, key_points_embeddings):
        """Classify the first query embedding.

        Returns:
            tuple: ``(label_name, score)`` where ``label_name`` is the most
            common label among the ``k`` nearest references and ``score`` is
            the mean distance to those neighbours that carry that label.
        """
        dist_matrix = self.distance_matrix(key_points_embeddings, p=2, threshold=1000000)
        # Only the first query row is classified.
        distances = dist_matrix[0]

        # indices of the k closest reference embeddings
        nearest = np.argsort(distances)[:self.k]
        labels = self.embeddings["label_name"].iloc[nearest].tolist()
        winner = Counter(labels).most_common(1)[0][0]

        # keep only the neighbours that voted for the winning label
        matching = [index for index, label in zip(nearest, labels) if label == winner]
        score = np.mean(distances[matching])

        return winner, score

86
predictions/plotting.py Normal file
View File

@@ -0,0 +1,86 @@
import json
from matplotlib import pyplot as plt
def load_results():
    """Read the stored KNN test results from their JSON file and return them."""
    results_path = "predictions/test_results/knn.json"
    with open(results_path, 'r') as handle:
        return json.load(handle)
def plot_all():
    """Print the average detection time, then show the overall and per-label plots."""
    data = load_results()
    mean_time = get_general_elapsed_time(data)
    print(f"average elapsed time to detect a sign: {mean_time}")

    plot_general_accuracy(data)
    # one figure per label found in the results
    for name in data:
        plot_accuracy_per_label(data, name)
def general_accuracy(results):
    """Average the per-label accuracy curves position by position.

    For each prediction index the accuracies of all labels that reach that
    index are averaged. The curve is cut at the first index supported by
    fewer than 5 labels.
    """
    per_label = get_label_accuracy(results)

    totals = []
    counts = []
    for curve in per_label.values():
        for position, value in enumerate(curve):
            if position >= len(totals):
                totals.append(0)
                counts.append(0)
            totals[position] += value
            counts[position] += 1

    averaged = []
    for total, count in zip(totals, counts):
        # stop as soon as too few labels contribute to this position
        if count < 5:
            break
        averaged.append(total / count)
    return averaged
def plot_general_accuracy(results):
    """Plot the overall accuracy curve computed by ``general_accuracy``."""
    accuracy = general_accuracy(results)
    plt.plot(accuracy)
    # pyplot's title is a function; assigning to it would replace the function
    # and leave the figure without a title.
    plt.title("General accuracy")
    plt.ylabel('accuracy')
    plt.xlabel('buffer')
    plt.show()
def plot_accuracy_per_label(results, label):
    """Plot the accuracy curve of a single ``label``.

    Raises:
        KeyError: if ``label`` is not present in the results.
    """
    accuracy = get_label_accuracy(results)
    plt.plot(accuracy[label], label=label)
    # call pyplot's title function (the original assigned to a misspelled
    # attribute, so no title was ever drawn)
    plt.title(f"Accuracy per label {label}")
    plt.ylabel('accuracy')
    plt.xlabel('prediction')
    plt.legend()
    plt.show()
def get_label_accuracy(results):
    """Compute, per label, the fraction of correct predictions at each index.

    ``results`` maps a label to a list of entries, each with a
    ``"predictions"`` list whose items carry a ``"correct"`` flag. The curve
    of a label is cut at the first index observed fewer than 2 times.
    """
    hits = {}
    seen = {}
    for name, videos in results.items():
        label_hits = hits.setdefault(name, [])
        label_seen = seen.setdefault(name, [])
        for video in videos:
            for position, outcome in enumerate(video["predictions"]):
                if position >= len(label_hits):
                    label_hits.append(0)
                    label_seen.append(0)
                if outcome["correct"]:
                    label_hits[position] += 1
                label_seen[position] += 1

    ratios = {}
    for name, label_hits in hits.items():
        curve = []
        for correct, total in zip(label_hits, seen[name]):
            # too few observations at this position: stop the curve here
            if total < 2:
                break
            curve.append(correct / total)
        ratios[name] = curve
    return ratios
def get_general_elapsed_time(results):
    """Return the mean of the per-label average elapsed times."""
    per_label = get_label_elapsed_time(results)
    total = 0
    for label in results:
        total += per_label[label]
    return total / len(results)
def get_label_elapsed_time(results):
    """Map every label to the mean ``"elapsed_time"`` of its result entries."""
    averages = {}
    for label, runs in results.items():
        durations = [run["elapsed_time"] for run in runs]
        averages[label] = sum(durations) / len(runs)
    return averages
# Script entry point: render every plot from the stored results.
if __name__ == "__main__":
    plot_all()

267
predictions/predictor.py Normal file
View File

@@ -0,0 +1,267 @@
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import torch
from predictions.k_nearest import KNearestNeighbours
device = torch.device("cpu")
if torch.cuda.is_available():
device = torch.device("cuda")
from models import SPOTER_EMBEDDINGS
BODY_IDENTIFIERS = [
0,
33,
5,
2,
8,
7,
12,
11,
14,
13,
16,
15,
]
HAND_IDENTIFIERS = [
0,
8,
7,
6,
5,
12,
11,
10,
9,
16,
15,
14,
13,
20,
19,
18,
17,
4,
3,
2,
1,
]
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
class Predictor:
    """Turns video frames into keypoints, embeddings and finally sign predictions.

    Keypoints come from MediaPipe Holistic, embeddings from a
    ``SPOTER_EMBEDDINGS`` model restored from ``CHECKPOINT_PATH``, and the
    final label from a pluggable predictor object exposing
    ``set_embeddings(df)`` and ``predict(embedding)``.
    """

    def __init__(self, embeddings_path, predictor_type):
        """Load the reference embeddings, the embedding model and the predictor.

        Args:
            embeddings_path: path of the CSV file with the reference embeddings.
            predictor_type: predictor instance to use; ``None`` selects a
                ``KNearestNeighbours`` with ``k=1``.
        """
        # MediaPipe Holistic model (pose and both hands)
        self.holistic = mp.solutions.holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=2
        )
        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils

        # indices into the pose keypoint array; the neck is not produced by
        # MediaPipe but appended by extract_keypoints at index 33
        self.left_shoulder_index = 11
        self.right_shoulder_index = 12
        self.neck_index = 33
        self.nose_index = 0
        self.left_eye_index = 2

        # load training embedding csv
        self.embeddings = pd.read_csv(embeddings_path)

        checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
        self.model = SPOTER_EMBEDDINGS(
            features=checkpoint["config_args"].vector_length,
            hidden_dim=checkpoint["config_args"].hidden_dim,
            norm_emb=checkpoint["config_args"].normalize_embeddings,
        ).to(device)
        self.model.load_state_dict(checkpoint["state_dict"])

        if predictor_type is None:
            self.predictor = KNearestNeighbours(1)
        else:
            self.predictor = predictor_type
        self.predictor.set_embeddings(self.embeddings)

    def extract_keypoints(self, image_orig):
        """Extract the normalized keypoints of one BGR frame.

        Returns:
            A ``(54, 2)`` array (12 body rows followed by 21 rows per hand,
            left hand first) shifted by -0.5, or ``None`` when no pose or
            neither hand was detected. A single missing hand is zero-filled
            before the shift.
        """
        image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
        results = self.holistic.process(image)

        def landmarks_to_array(lmks):
            # (x, y) of every landmark, or None when nothing was detected
            if lmks:
                return np.array([[float(lmk.x), float(lmk.y)] for lmk in lmks.landmark])
            return None

        def append_neck(keypoints):
            # the neck is the midpoint of both shoulders, appended as the last row
            if keypoints is None:
                return None
            left_shoulder = keypoints[self.left_shoulder_index]
            right_shoulder = keypoints[self.right_shoulder_index]
            neck = [(float(left_shoulder[0]) + float(right_shoulder[0])) / 2,
                    (float(left_shoulder[1]) + float(right_shoulder[1])) / 2]
            return np.append(keypoints, [neck], axis=0)

        pose = append_neck(landmarks_to_array(results.pose_landmarks))
        if pose is None:
            return None
        pose_norm = self.normalize_pose(pose)
        # keep only the keypoints listed in BODY_IDENTIFIERS, in that order
        pose_norm = pose_norm[BODY_IDENTIFIERS]

        left_hand = landmarks_to_array(results.left_hand_landmarks)
        right_hand = landmarks_to_array(results.right_hand_landmarks)
        if left_hand is None and right_hand is None:
            return None

        # normalize detected hands, zero-fill a missing one
        if left_hand is not None:
            left_hand = self.normalize_hand(left_hand)
        else:
            left_hand = np.zeros((21, 2))
        if right_hand is not None:
            right_hand = self.normalize_hand(right_hand)
        else:
            right_hand = np.zeros((21, 2))

        left_hand = left_hand[HAND_IDENTIFIERS]
        right_hand = right_hand[HAND_IDENTIFIERS]

        # combine pose and hands
        pose_norm = np.append(pose_norm, left_hand, axis=0)
        pose_norm = np.append(pose_norm, right_hand, axis=0)

        # shift every coordinate by -0.5
        # NOTE(review): presumably to centre the [0, 1] range around zero as in training - confirm
        pose_norm -= 0.5

        return pose_norm

    def normalize_pose(self, keypoints):
        """Normalize body keypoints (array of ``(identifiers, 2)``) in place.

        The bounding box is derived from the shoulder distance, falling back
        to the neck-nose distance. The input is returned unchanged when
        neither measure is usable or when the box has zero width or height.
        """
        left_shoulder = keypoints[self.left_shoulder_index]
        right_shoulder = keypoints[self.right_shoulder_index]

        neck = keypoints[self.neck_index]
        nose = keypoints[self.nose_index]

        # Prevent from even starting the analysis if some necessary elements are not present
        if (left_shoulder[0] == 0 or right_shoulder[0] == 0
            or (left_shoulder[0] == right_shoulder[0] and left_shoulder[1] == right_shoulder[1])) and (
                neck[0] == 0 or nose[0] == 0 or (neck[0] == nose[0] and neck[1] == nose[1])):
            return keypoints

        if left_shoulder[0] != 0 and right_shoulder[0] != 0 and (
                left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1]):
            shoulder_distance = ((((left_shoulder[0] - right_shoulder[0]) ** 2) + (
                    (left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5)
            head_metric = shoulder_distance
        else:
            neck_nose_distance = ((((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5)
            head_metric = neck_nose_distance

        # Set the starting and ending point of the normalization bounding box
        starting_point = [keypoints[self.neck_index][0] - 3 * head_metric,
                          keypoints[self.left_eye_index][1] + head_metric]
        ending_point = [keypoints[self.neck_index][0] + 3 * head_metric, starting_point[1] - 6 * head_metric]

        # clamp the box to non-negative coordinates
        if starting_point[0] < 0:
            starting_point[0] = 0
        if starting_point[1] < 0:
            starting_point[1] = 0
        if ending_point[0] < 0:
            ending_point[0] = 0
        if ending_point[1] < 0:
            ending_point[1] = 0

        # same guard as normalize_hand: a degenerate box would divide by zero
        if ending_point[0] - starting_point[0] == 0 or starting_point[1] - ending_point[1] == 0:
            return keypoints

        # Normalize the keypoints
        for i in range(len(keypoints)):
            keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
            keypoints[i][1] = (keypoints[i][1] - ending_point[1]) / (starting_point[1] - ending_point[1])

        return keypoints

    def normalize_hand(self, keypoints):
        """Normalize hand keypoints in place to a padded, square bounding box.

        Zero coordinates are ignored when computing the box. The input is
        returned unchanged when no non-zero coordinate exists or the box has
        zero width or height.
        """
        x_values = [keypoints[i][0] for i in range(len(keypoints)) if keypoints[i][0] != 0]
        y_values = [keypoints[i][1] for i in range(len(keypoints)) if keypoints[i][1] != 0]

        if not x_values or not y_values:
            return keypoints

        width, height = max(x_values) - min(x_values), max(y_values) - min(y_values)
        # pad the longer side by 10% and the shorter side so the box becomes square
        if width > height:
            delta_x = 0.1 * width
            delta_y = delta_x + ((width - height) / 2)
        else:
            delta_y = 0.1 * height
            delta_x = delta_y + ((height - width) / 2)

        starting_point = (min(x_values) - delta_x, min(y_values) - delta_y)
        ending_point = (max(x_values) + delta_x, max(y_values) + delta_y)

        if ending_point[0] - starting_point[0] == 0 or ending_point[1] - starting_point[1] == 0:
            return keypoints

        # normalize keypoints
        for i in range(len(keypoints)):
            keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
            keypoints[i][1] = (keypoints[i][1] - starting_point[1]) / (ending_point[1] - starting_point[1])

        return keypoints

    def get_embedding(self, keypoints):
        """Run the embedding model on one keypoint sequence.

        ``keypoints`` is wrapped into a batch of one; the first element of
        the model output is returned as nested Python lists.
        """
        self.model.eval()
        with torch.no_grad():
            keypoints = torch.from_numpy(np.array([keypoints])).float().to(device)
            new_embeddings = self.model(keypoints).cpu().numpy().tolist()[0]
        return new_embeddings

    def predict(self, embeddings):
        """Delegate to the configured predictor; returns ``(label, score)``."""
        return self.predictor.predict(embeddings)

    def make_prediction(self, keypoints):
        """Embed one keypoint sequence and classify it; returns ``(label, score)``."""
        return self.predict(self.get_embedding(keypoints))

    def validation(self):
        """Print and return the accuracy on the stored validation set.

        Reads ``validation_data.npy`` and ``validation_labels.npy`` from the
        working directory, embeds all samples in one batch and classifies each
        embedding separately, because ``predict`` returns a single
        ``(label, score)`` pair rather than one label per sample.

        Returns:
            float: fraction of samples whose predicted label equals the stored
            label (0.0 when the validation set is empty).
        """
        # SECURITY: allow_pickle=True executes pickled objects while loading;
        # only use these files when they come from a trusted source.
        validation_data = np.load('validation_data.npy', allow_pickle=True)
        validation_labels = np.load('validation_labels.npy', allow_pickle=True)

        # run model on validation data
        self.model.eval()
        with torch.no_grad():
            validation_embeddings = self.model(torch.from_numpy(validation_data).float().to(device)).cpu().numpy()

        # one prediction per sample; atleast_2d gives every predictor a
        # (1, dim)-shaped query regardless of the model's output rank
        predictions = [
            self.predictor.predict(np.atleast_2d(embedding))[0]
            for embedding in validation_embeddings
        ]

        # NOTE(review): assumes the stored labels use the same naming as the
        # label returned by the predictor - confirm
        correct = 0
        for i in range(len(predictions)):
            if predictions[i] == validation_labels[i]:
                correct += 1
        accuracy = correct / len(predictions) if predictions else 0.0
        print('Accuracy: ' + str(accuracy))
        return accuracy

34
predictions/svm_model.py Normal file
View File

@@ -0,0 +1,34 @@
from sklearn import svm
class SVM:
    """Support-vector classifier over a table of reference embeddings.

    The reference table is a pandas DataFrame with the columns
    ``embeddings2`` (embedding serialised as a string such as
    ``"[0.1, 0.2]"``), ``labels`` (numeric class) and ``label_name``.
    """

    def __init__(self, type="ovo"):
        """Create an untrained classifier.

        Args:
            type: value passed to ``svm.SVC`` as ``decision_function_shape``.
        """
        self.label_name_to_label = None
        self.clf = None
        self.embeddings_list = None
        self.labels = None
        self.type = type

    def set_embeddings(self, embeddings):
        """Parse the reference table and train the classifier on it.

        The caller's DataFrame is left untouched; all work happens on a copy.
        """
        df = embeddings.copy()
        # convert embedding from string to list of floats
        df["embeddings"] = df["embeddings2"].apply(lambda x: [float(i) for i in x[1:-1].split(", ")])
        df = df.drop(columns=['embeddings2'])

        self.embeddings_list = df["embeddings"].tolist()
        self.labels = df["labels"].tolist()
        # lookup table (label_name, label) with one row per distinct pair
        self.label_name_to_label = (
            df[["label_name", "labels"]]
            .rename(columns={"labels": "label"})
            .drop_duplicates()
        )
        self.train()

    def train(self):
        """Fit a probability-enabled ``svm.SVC`` on the stored embeddings."""
        self.clf = svm.SVC(decision_function_shape=self.type, probability=True)
        self.clf.fit(self.embeddings_list, self.labels)

    def predict(self, key_points_embeddings):
        """Classify a single query embedding.

        Returns:
            tuple: ``(label_name, score)`` where ``score`` is the log
            probability the classifier assigns to the predicted class.
        """
        label = self.clf.predict(key_points_embeddings).item()
        log_proba = self.clf.predict_log_proba(key_points_embeddings)

        # The probability columns follow the order of clf.classes_, which is
        # not the label value itself unless labels are exactly 0..n-1.
        class_index = list(self.clf.classes_).index(label)

        # TODO fix dictionary
        names = self.label_name_to_label
        label_name = names.loc[names["label"] == label]["label_name"].iloc[0]
        return label_name, log_proba[0][class_index]

File diff suppressed because one or more lines are too long

137
predictions/validation.py Normal file
View File

@@ -0,0 +1,137 @@
import json
import os
import time
import cv2
import numpy as np
from matplotlib import pyplot as plt
from predictions.k_nearest import KNearestNeighbours
from predictions.predictor import Predictor
from predictions.svm_model import SVM
buffer_size = 15
def predict_video(predictor, path_video):
    """Collect the keypoints of a video, sub-sampled towards 15 frames per second.

    Every frame is passed to ``predictor.extract_keypoints``; a result is kept
    only when keypoints were found and the frame index is a multiple of the
    skip rate.

    Returns:
        list: the kept keypoint arrays in frame order (empty when the video
        cannot be read).
    """
    desired_fps = 15
    buffer = []

    # open mp4 video
    cap = cv2.VideoCapture(path_video)
    try:
        ret, img = cap.read()  # read one frame from the 'capture' object; img is (H, W, C)

        original_fps = int(cap.get(cv2.CAP_PROP_FPS))
        print("Original FPS: ", original_fps)

        # frame skipping rate based on the desired frame rate, never below 1
        frame_skip = max(original_fps // desired_fps, 1)
        print("Frame skip: ", frame_skip)

        frame_number = 0
        while img is not None:
            pose = predictor.extract_keypoints(img)
            if pose is not None and frame_number % frame_skip == 0:
                buffer.append(pose)
            frame_number += 1
            ret, img = cap.read()
    finally:
        # always free the capture handle, even when keypoint extraction fails
        cap.release()

    print(len(buffer))
    return buffer
def get_embeddings(predictor, buffer, name):
    """Embed every sliding window of ``buffer_size`` frames and store the result.

    Windows are ``buffer[end - buffer_size:end]`` for every ``end`` up to and
    including ``len(buffer)``, so the last frames are covered and a buffer of
    exactly ``buffer_size`` frames yields one embedding.

    The embeddings are written as JSON to
    ``predictions/test_embeddings/<name>.csv`` (the directory is created when
    missing) and also returned.
    """
    # NOTE: loading previously stored embeddings from this file was disabled
    # in the original code; embeddings are always recomputed.
    embeddings = [
        predictor.get_embedding(buffer[end - buffer_size:end])
        for end in range(buffer_size, len(buffer) + 1)
    ]

    os.makedirs("predictions/test_embeddings", exist_ok=True)
    with open("predictions/test_embeddings/" + name + ".csv", 'w') as f:
        json.dump(embeddings, f)
    return embeddings
def compare_embeddings(predictor, embeddings, label_video, ):
    """Classify every embedding and record whether it matches ``label_video``.

    Returns a list with one dict per embedding holding the predicted label,
    its score, the expected label and a ``correct`` flag.
    """
    def describe(embedding):
        predicted, confidence = predictor.predict(embedding)
        return {
            "label": predicted,
            "score": confidence,
            "label_video": label_video,
            "correct": predicted == label_video,
        }

    return [describe(embedding) for embedding in embeddings]
def predict_video_files(predictor, path_video, label_video):
    """Run the full pipeline on one video: keypoints, embeddings, comparison."""
    frames = predict_video(predictor, path_video)
    # file name without directory and without anything after the first dot
    video_name = path_video.split("/")[-1].split(".")[0]
    video_embeddings = get_embeddings(predictor, frames, video_name)
    return compare_embeddings(predictor, video_embeddings, label_video)
def get_test_data(data_folder):
    """List the test videos of ``data_folder`` together with their labels.

    Only ``.mp4`` files are considered. File names are split on ``!``: the
    first part is the label, and the file is selected when the second part
    equals ``"test"``. Names without a ``!`` are skipped.

    Returns:
        tuple: ``(test_files, test_labels)`` - a numpy array of file paths
        (sorted by file name) and the list of matching labels.
    """
    test_files = []
    test_labels = []
    for file_name in sorted(os.listdir(data_folder)):
        if not file_name.endswith(".mp4"):
            continue
        parts = file_name.split("!")
        # skip names that do not follow the "<label>!<split>!..." pattern
        if len(parts) < 2 or parts[1] != "test":
            continue
        # os.path.join works with or without a trailing separator on data_folder
        test_files.append(os.path.join(data_folder, file_name))
        test_labels.append(parts[0])
    return np.array(test_files), test_labels
def test_data(predictor, data_folder):
    """Evaluate every test video of ``data_folder`` and group the outcome by label.

    Returns a dict mapping each label to a list of entries holding the
    per-window predictions, the average time per prediction and the video path.
    """
    grouped = {}
    videos, labels = get_test_data(data_folder)
    for path_video, label_video in zip(videos, labels):
        print(path_video, label_video)

        started = time.time()
        prediction = predict_video_files(predictor, path_video, label_video)
        elapsed_time = time.time() - started

        # spread the elapsed time over the predictions made so it represents
        # an average execution time per prediction
        if len(prediction) > 0:
            elapsed_time /= len(prediction)

        entry = {"predictions": prediction, "elapsed_time": elapsed_time, "video": path_video}
        grouped.setdefault(label_video, []).append(entry)

    print("DONE")
    return grouped
def plot_general_accuracy(results):
    """Plot the fraction of correct predictions per prediction index.

    Args:
        results: either the dict produced by ``test_data`` (label -> list of
            entries with a ``"predictions"`` list) or, as before, an iterable
            whose items hold a list of predictions at position 0. Every
            prediction is a dict with a ``"correct"`` flag.
    """
    if isinstance(results, dict):
        runs = [video["predictions"] for videos in results.values() for video in videos]
    else:
        runs = [result[0] for result in results]

    correct = []
    amount = []
    for run in runs:
        for index, value in enumerate(run):
            if len(correct) <= index:
                correct.append(0)
                amount.append(0)
            correct[index] += 1 if value["correct"] else 0
            amount[index] += 1

    # turn the raw counts into an accuracy; amount is at least 1 per index
    accuracy = [hits / total for hits, total in zip(correct, amount)]

    # plot the general accuracy
    plt.plot(accuracy)
    plt.show()
# Script entry point: evaluate the chosen predictor on the fingerspelling test
# videos and store the results as JSON.
if __name__ == "__main__":
    type_predictor = "knn"

    # pick the classifier; anything unknown falls back to 1-nearest-neighbour
    if type_predictor == "svm":
        predictor_type = SVM()
    elif type_predictor == "knn":
        k = 1
        predictor_type = KNearestNeighbours(k)
    else:
        predictor_type = KNearestNeighbours(1)

    # embeddings_path = 'embeddings/basic-signs/embeddings.csv'
    embeddings_path = 'embeddings/fingerspelling/embeddings.csv'
    predictor = Predictor(embeddings_path, predictor_type)

    data_folder = '/home/tibe/Projects/design_project/sign-predictor/data/fingerspelling/data/'
    results = test_data(predictor, data_folder)

    # write results to a results json file named after the predictor type
    results_file = "predictions/test_results/" + type_predictor + ".json"
    with open(results_file, 'w') as f:
        json.dump(results, f)

    print(results)
    # plot_general_accuracy(results)

View File

@@ -35,11 +35,7 @@ class LandmarksResults:
): ):
self.results = results self.results = results
self.num_landmarks_pose = num_landmarks_pose self.num_landmarks_pose = num_landmarks_pose
self.num_landmarks_hand = num_landmarks_hand self.num_landmarks_hand = num_landmarks_hand
@property
def empty(self):
return self.results.pose_landmarks is None or (self.results.left_hand_landmarks is None and self.results.right_hand_landmarks is None)
@property @property
def pose_landmarks(self): def pose_landmarks(self):
@@ -71,10 +67,6 @@ def get_landmarks(image_orig, holistic, debug=False):
# Convert the BGR image to RGB before processing. # Convert the BGR image to RGB before processing.
image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB) image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
results = LandmarksResults(holistic.process(image)) results = LandmarksResults(holistic.process(image))
if results.empty:
return None
if debug: if debug:
lmks_pose = [] lmks_pose = []
for lmk in results.pose_landmarks: for lmk in results.pose_landmarks:
@@ -102,7 +94,6 @@ def get_landmarks(image_orig, holistic, debug=False):
len(lmks_right_hand) == 2 * LEN_LANDMARKS_HAND len(lmks_right_hand) == 2 * LEN_LANDMARKS_HAND
), f"{len(lmks_right_hand)} != {2 * LEN_LANDMARKS_HAND}" ), f"{len(lmks_right_hand)} != {2 * LEN_LANDMARKS_HAND}"
landmarks = [] landmarks = []
for lmk in chain( for lmk in chain(
results.pose_landmarks, results.pose_landmarks,
results.left_hand_landmarks, results.left_hand_landmarks,
@@ -137,11 +128,6 @@ def extract(args):
videos_folder = args.videos_folder videos_folder = args.videos_folder
os.makedirs(landmarks_output, exist_ok=True) os.makedirs(landmarks_output, exist_ok=True)
for fn_video in tqdm(sorted(glob.glob(op.join(videos_folder, "*mp4")))): for fn_video in tqdm(sorted(glob.glob(op.join(videos_folder, "*mp4")))):
# check if landmarks already exist
if op.exists(op.join(landmarks_output, op.basename(fn_video).split(".")[0] + ".npy")):
continue
cap = cv2.VideoCapture(fn_video) cap = cv2.VideoCapture(fn_video)
ret, image_orig = cap.read() ret, image_orig = cap.read()
height, width = image_orig.shape[:2] height, width = image_orig.shape[:2]
@@ -149,7 +135,7 @@ def extract(args):
# make sure fps is 20 by determining the number of frames to be skipped # make sure fps is 20 by determining the number of frames to be skipped
frame_rate = int(cap.get(cv2.CAP_PROP_FPS)) frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
frame_skip = (frame_rate // 10) - 1 frame_skip = (frame_rate // 20) - 1
with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT))) as pbar: with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT))) as pbar:
@@ -168,8 +154,7 @@ def extract(args):
for _ in range(frame_skip): for _ in range(frame_skip):
ret, image_orig = cap.read() ret, image_orig = cap.read()
pbar.update(1) pbar.update(1)
if landmarks: landmarks_video.append(landmarks)
landmarks_video.append(landmarks)
pbar.update(1) pbar.update(1)
landmarks_video = np.vstack(landmarks_video) landmarks_video = np.vstack(landmarks_video)
np.save( np.save(

View File

@@ -16,9 +16,6 @@ with open("data/sign_to_prediction_index_map.json", "r") as f:
# filter df to make sure each sign has at least 4 samples # filter df to make sure each sign has at least 4 samples
df = df[df["sign"].map(df["sign"].value_counts()) > 4] df = df[df["sign"].map(df["sign"].value_counts()) > 4]
# print number of unique signs
print("Number of unique signs: ", len(df["sign"].unique()))
# use the path column to split the dataset # use the path column to split the dataset
paths = df["path"].unique() paths = df["path"].unique()

View File

@@ -12,5 +12,4 @@ clearml==1.10.3
torch==2.0.0 torch==2.0.0
torchvision==0.15.1 torchvision==0.15.1
tqdm==4.54.1 tqdm==4.54.1
optuna==3.1.1 optuna==3.1.1
onnx==1.14.0

View File

@@ -246,9 +246,6 @@ def train(args, tracker: Tracker):
val_accs.append(val_acc) val_accs.append(val_acc)
tracker.log_scalar_metric("acc", "val", epoch, val_acc) tracker.log_scalar_metric("acc", "val", epoch, val_acc)
create_embedding_scatter_plots(tracker, slrt_model, train_loader, val_loader, device, id_to_label, epoch,
top_model_name)
logger.info(f"Epoch time: {datetime.now() - start_time}") logger.info(f"Epoch time: {datetime.now() - start_time}")
logger.info("[" + str(epoch) + "] TRAIN loss: " + str(train_loss) + " acc: " + str(train_accs[-1])) logger.info("[" + str(epoch) + "] TRAIN loss: " + str(train_loss) + " acc: " + str(train_accs[-1]))
logger.info("[" + str(epoch) + "] VALIDATION acc: " + str(val_accs[-1])) logger.info("[" + str(epoch) + "] VALIDATION acc: " + str(val_accs[-1]))

View File

@@ -1,24 +1,23 @@
#!/bin/sh #!/bin/sh
python3 -m train \ python3 -m train \
--save_checkpoints_every 10 \ --save_checkpoints_every 1 \
--experiment_name "Finetune Fingerspelling Signs" \ --experiment_name "Finetune Basic Signs" \
--epochs 1000 \ --epochs 100 \
--optimizer "ADAM" \ --optimizer "ADAM" \
--lr 0.00001 \ --lr 0.00001 \
--batch_size 8 \ --batch_size 16 \
--dataset_name "FingerSpelling" \ --dataset_name "BasicSigns" \
--training_set_path "train.csv" \ --training_set_path "train.csv" \
--validation_set_path "val.csv" \ --validation_set_path "val.csv" \
--vector_length 32 \ --vector_length 32 \
--epoch_iters -1 \ --epoch_iters -1 \
--scheduler_factor 0 \ --scheduler_factor 0.05 \
--hard_triplet_mining "in_batch" \ --hard_triplet_mining "None" \
--filter_easy_triplets \ --filter_easy_triplets \
--start_mining_hard 50 \ --triplet_loss_margin 2 \
--triplet_loss_margin 4 \
--dropout 0.2 \ --dropout 0.2 \
--tracker=clearml \ --tracker=clearml \
--dataset_loader=clearml \ --dataset_loader=clearml \
--dataset_project="SpoterEmbedding" \ --dataset_project="SpoterEmbedding" \
--finetune \ --finetune \
--checkpoint_path "checkpoints/checkpoint_embed_3835.pth" --checkpoint_path "checkpoints/checkpoint_embed_3006.pth"

359
webcam.py
View File

@@ -1,339 +1,54 @@
from collections import Counter
import cv2 import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import torch
device = torch.device("cpu") from predictions.k_nearest import KNearestNeighbours
if torch.cuda.is_available(): from predictions.predictor import Predictor
device = torch.device("cuda") from predictions.svm_model import SVM
from models import SPOTER_EMBEDDINGS
# Initialize MediaPipe Hands model if __name__ == '__main__':
holistic = mp.solutions.holistic.Holistic( buffer = []
min_detection_confidence=0.5, # open webcam stream
min_tracking_confidence=0.5, cap = cv2.VideoCapture(0)
model_complexity=2
)
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
BODY_IDENTIFIERS = [ type_predictor = "svm"
0, if type_predictor == "knn":
33, k = 10
5, predictor_type = KNearestNeighbours(k)
2, elif type_predictor == "svm":
8, predictor_type = SVM()
7,
12,
11,
14,
13,
16,
15,
]
HAND_IDENTIFIERS = [
0,
8,
7,
6,
5,
12,
11,
10,
9,
16,
15,
14,
13,
20,
19,
18,
17,
4,
3,
2,
1,
]
def extract_keypoints(image_orig):
image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
results = holistic.process(image)
def extract_keypoints(lmks):
if lmks:
a = np.array([[float(lmk.x), float(lmk.y)] for lmk in lmks.landmark])
return a
return None
def calculate_neck(keypoints):
left_shoulder = keypoints[11]
right_shoulder = keypoints[12]
neck = [(float(left_shoulder[0]) + float(right_shoulder[0])) / 2, (float(left_shoulder[1]) + float(right_shoulder[1])) / 2]
# add neck to keypoints
keypoints = np.append(keypoints, [neck], axis=0)
return keypoints
pose = extract_keypoints(results.pose_landmarks)
pose = calculate_neck(pose)
pose_norm = normalize_pose(pose)
# filter out keypoints that are not in BODY_IDENTIFIERS and make sure they are in the correct order
pose_norm = pose_norm[BODY_IDENTIFIERS]
left_hand = extract_keypoints(results.left_hand_landmarks)
right_hand = extract_keypoints(results.right_hand_landmarks)
if left_hand is None and right_hand is None:
return None
# normalize hands
if left_hand is not None:
left_hand = normalize_hand(left_hand)
else: else:
left_hand = np.zeros((21, 2)) predictor_type = KNearestNeighbours(1)
if right_hand is not None:
right_hand = normalize_hand(right_hand)
else:
right_hand = np.zeros((21, 2))
left_hand = left_hand[HAND_IDENTIFIERS]
right_hand = right_hand[HAND_IDENTIFIERS]
# combine pose and hands
pose_norm = np.append(pose_norm, left_hand, axis=0)
pose_norm = np.append(pose_norm, right_hand, axis=0)
# move interval
pose_norm -= 0.5
return pose_norm
buffer = []
left_shoulder_index = 11 # embeddings_path = 'embeddings/basic-signs/embeddings.csv'
right_shoulder_index = 12 embeddings_path = 'embeddings/fingerspelling/embeddings.csv'
neck_index = 33
nose_index = 0
left_eye_index = 2
# Normalize a single body; keypoints is a numpy array of (identifiers, 2)
def normalize_pose(keypoints):
    """Rescale body keypoints into a box sized from the shoulders or head.

    The scale ("head metric") is the shoulder-to-shoulder distance when both
    shoulder x values are non-zero and the shoulders do not coincide;
    otherwise it is the neck-to-nose distance. When neither pair is usable
    the input is returned unchanged.

    The box spans three head metrics to either side of the neck in x, and in
    y from one head metric past the left eye to six head metrics back from
    there; negative corner coordinates are clamped to 0.

    The rows of ``keypoints`` are overwritten in place and the same object is
    returned.
    """
    left_shoulder = keypoints[left_shoulder_index]
    right_shoulder = keypoints[right_shoulder_index]
    neck = keypoints[neck_index]
    nose = keypoints[nose_index]

    shoulders_usable = (
        left_shoulder[0] != 0
        and right_shoulder[0] != 0
        and (left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1])
    )
    neck_nose_usable = (
        neck[0] != 0
        and nose[0] != 0
        and (neck[0] != nose[0] or neck[1] != nose[1])
    )

    # Prevent from even starting the analysis if some necessary elements are not present
    if not shoulders_usable and not neck_nose_usable:
        return keypoints

    if shoulders_usable:
        head_metric = ((((left_shoulder[0] - right_shoulder[0]) ** 2) + ((left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5)
    else:
        head_metric = ((((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5)

    # Set the starting and ending point of the normalization bounding box.
    # end_y is derived from start_y before any clamping is applied.
    start_x = keypoints[neck_index][0] - 3 * head_metric
    start_y = keypoints[left_eye_index][1] + head_metric
    end_x = keypoints[neck_index][0] + 3 * head_metric
    end_y = start_y - 6 * head_metric

    if start_x < 0:
        start_x = 0
    if start_y < 0:
        start_y = 0
    if end_x < 0:
        end_x = 0
    if end_y < 0:
        end_y = 0

    # Normalize the keypoints: x is measured from the start corner, y from
    # the end corner.
    for point in keypoints:
        point[0] = (point[0] - start_x) / (end_x - start_x)
        point[1] = (point[1] - end_y) / (start_y - end_y)

    return keypoints
label, score = predictor.make_prediction(buffer)
def normalize_hand(keypoints):
    """Rescale one hand's keypoints into a padded square bounding box.

    Zero x values and zero y values are left out when the bounds are
    computed. The longer side of the bounds is padded by 10% on each end and
    the shorter side is padded further so that both spans are equal. Every
    row is then mapped relative to that box.

    The rows are overwritten in place and the same object is returned. The
    input comes back unchanged when there are no non-zero x or y values, or
    when the box has zero width or height.
    """
    xs = [point[0] for point in keypoints if point[0] != 0]
    ys = [point[1] for point in keypoints if point[1] != 0]
    if not (xs and ys):
        return keypoints

    x_min, x_max = min(xs), max(xs)
    y_min, y_max = min(ys), max(ys)
    width = x_max - x_min
    height = y_max - y_min

    if width > height:
        delta_x = 0.1 * width
        delta_y = delta_x + ((width - height) / 2)
    else:
        delta_y = 0.1 * height
        delta_x = delta_y + ((height - width) / 2)

    start_x, start_y = x_min - delta_x, y_min - delta_y
    end_x, end_y = x_max + delta_x, y_max + delta_y
    span_x = end_x - start_x
    span_y = end_y - start_y
    if span_x == 0 or span_y == 0:
        return keypoints

    # normalize keypoints
    for point in keypoints:
        point[0] = (point[0] - start_x) / span_x
        point[1] = (point[1] - start_y) / span_y

    return keypoints
# Read the training embeddings CSV into a DataFrame; the code below takes the
# reference vectors and their label names from it.
df = pd.read_csv(filepath_or_buffer='embeddings/basic-signs/embeddings.csv')
def minkowski_distance_p(x, y, p=2):
    """Return the p-th power of the Minkowski distance between x and y.

    Both inputs are converted to arrays and cast to the smallest dtype that
    holds x, y and float64 (so complex input is not forced to float64). The
    reduction runs over the last axis.

    For ``p == np.inf`` the result is the largest absolute difference and for
    ``p == 1`` the sum of absolute differences; otherwise it is the sum of
    absolute differences raised to ``p``, with no root taken.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # Promote both operands to one common dtype that is at least float64.
    dtype = np.promote_types(np.promote_types(x.dtype, y.dtype), 'float64')
    diff = np.abs(y.astype(dtype) - x.astype(dtype))

    if p == np.inf:
        return np.amax(diff, axis=-1)
    if p == 1:
        return np.sum(diff, axis=-1)
    return np.sum(diff ** p, axis=-1)
def minkowski_distance(x, y, p=2):
    """Return the Minkowski distance of order ``p`` between x and y.

    Delegates to ``minkowski_distance_p`` and takes the p-th root of its
    result, except for ``p == 1`` and ``p == np.inf``, where that result is
    already the distance.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    powered = minkowski_distance_p(x, y, p)
    if p == 1 or p == np.inf:
        return powered
    return powered ** (1. / p)
def distance_matrix(keypoints, embeddings, p=2, threshold=1000000):
    """Compute all pairwise Minkowski distances between two sets of vectors.

    Args:
        keypoints: 2-D array-like of query vectors, one per row.
        embeddings: 2-D array-like of reference vectors, one per row.
        p: order of the Minkowski norm.
        threshold: when rows(keypoints) * rows(embeddings) * dimension is at
            most this value, everything is computed in one broadcast call;
            otherwise the result is filled row by row or column by column,
            looping over whichever input has fewer rows.

    Returns:
        Array of shape (rows(keypoints), rows(embeddings)).

    Raises:
        ValueError: if the two inputs have different vector dimensions.
    """
    queries = np.array(keypoints)
    num_queries, query_dim = queries.shape
    references = np.asarray(embeddings)
    num_refs, ref_dim = references.shape

    if query_dim != ref_dim:
        raise ValueError(f"x contains {query_dim}-dimensional vectors but y contains "
                         f"{ref_dim}-dimensional vectors")

    if num_queries * num_refs * query_dim <= threshold:
        print("Using minkowski_distance")
        return minkowski_distance(queries[:, np.newaxis, :], references[np.newaxis, :, :], p)

    result = np.empty((num_queries, num_refs), dtype=float)  # FIXME: figure out the best dtype
    if num_queries < num_refs:
        for row, query in enumerate(queries):
            result[row, :] = minkowski_distance(query, references, p)
    else:
        for col, reference in enumerate(references):
            result[:, col] = minkowski_distance(queries, reference, p)
    return result
# Restore the embedding model from its checkpoint. `device` is expected to be
# defined earlier in this file.
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)

# The model hyper-parameters are read from the arguments object stored in the
# checkpoint.
model = SPOTER_EMBEDDINGS(
    features=checkpoint["config_args"].vector_length,
    hidden_dim=checkpoint["config_args"].hidden_dim,
    norm_emb=checkpoint["config_args"].normalize_embeddings,
)
model = model.to(device)
model.load_state_dict(checkpoint["state_dict"])

# Reference embeddings as a list of float lists. The "embeddings2" column
# stores each vector as bracketed text, so the brackets are stripped and the
# remainder is split on ", " before converting each piece to float.
embeddings = (
    df.drop(columns=['labels', 'label_name', 'embeddings'])["embeddings2"]
    .apply(lambda text: [float(value) for value in text[1:-1].split(", ")])
    .tolist()
)
def make_prediction(keypoints, k=5):
    """Classify a window of keypoint frames by nearest reference embeddings.

    The window is embedded with the module-level ``model``, compared against
    every entry of the module-level ``embeddings`` list, and labelled by
    majority vote among the ``k`` closest entries.

    Args:
        keypoints: sequence of per-frame keypoint arrays (the capture loop
            passes its 15-frame buffer).
        k: number of nearest reference embeddings that take part in the
            vote. Defaults to 5, the value that used to be hard-coded.

    Returns:
        ``(label, score)``: the most common ``label_name`` among the k
        nearest references (ties go to the label seen first, i.e. the
        closest one), and the mean distance to the neighbours carrying that
        label.

    Raises:
        ValueError: if ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be at least 1")

    # run model on frame
    model.eval()
    with torch.no_grad():
        batch = torch.from_numpy(np.array([keypoints])).float().to(device)
        # assumes the first element of the model output is a 2-D
        # (rows, dim) structure, as distance_matrix unpacks a 2-D shape --
        # TODO confirm against the model
        new_embeddings = model(batch).cpu().numpy().tolist()[0]

    # calculate distance matrix
    dist_matrix = distance_matrix(new_embeddings, embeddings, p=2, threshold=1000000)

    # get the k closest matches for the first row and their labels
    nearest = np.argsort(dist_matrix)[0][:k]
    labels = df["label_name"].iloc[nearest].tolist()

    # select the class that is most common among them
    c = Counter(labels).most_common(1)[0][0]

    # keep only the neighbours carrying that label; the labels fetched above
    # are reused instead of querying the DataFrame again
    matching = [idx for idx, name in zip(nearest, labels) if name == c]

    # the score is the average distance to those neighbours
    score = np.mean(dist_matrix[0][matching])
    return c, score
# open webcam stream
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        # read frame; stop when no frame is delivered instead of passing
        # None on to the keypoint extractor and to imshow
        ret, frame = cap.read()
        if not ret:
            break

        pose = extract_keypoints(frame)
        if pose is not None:
            # keep a sliding window of the 15 most recent poses
            buffer.append(pose)
            if len(buffer) > 15:
                buffer.pop(0)

            if len(buffer) == 15:
                label, score = make_prediction(buffer)

                # draw label; str() keeps putText working for non-string labels
                cv2.putText(frame, str(label), (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

        # Show the frame
        cv2.imshow('MediaPipe Hands', frame)

        # Wait for key press to exit (ESC). This runs on every iteration,
        # including frames without a pose, so the window keeps refreshing
        # and ESC is always honoured.
        if cv2.waitKey(5) & 0xFF == 27:
            break
finally:
    # free the camera and close the preview window on any exit path
    cap.release()
    cv2.destroyAllWindows()
# Alternative input: run the prediction on a recorded video file (E.mp4) instead of the webcam
# cap = cv2.VideoCapture('E.mp4')
# while cap.isOpened():
# # read frame
# ret, frame = cap.read()
# if frame is None:
# break
# pose = extract_keypoints(frame)
# buffer.append(pose)
# label, score = make_prediction(buffer)
# print(label, score)