Compare commits
15 Commits
main
...
multiple_p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ff50ae7a2 | ||
|
|
d9c24df5f4 | ||
|
|
672f86c317 | ||
| 0941814d0b | |||
| 078fb4e38d | |||
|
|
9f5309e878 | ||
| 3e9e2196e9 | |||
| 151eefa1de | |||
| 2f7063b70d | |||
| 2e66cccf50 | |||
| 1ab0526f72 | |||
| a57bf235da | |||
| 1f24df1b8f | |||
| 49ced1983d | |||
| 7c973f1b88 |
21
.drone.yml
21
.drone.yml
@@ -1,21 +0,0 @@
|
||||
kind: pipeline
|
||||
name: sonarcube
|
||||
type: docker
|
||||
|
||||
steps:
|
||||
- name: code-analysis
|
||||
pull: if-not-exists
|
||||
image: sonarsource/sonar-scanner-cli
|
||||
commands:
|
||||
- sonar-scanner -Dsonar.host.url=$SONAR_HOST -Dsonar.login=$SONAR_TOKEN -Dsonar.projectKey=$SONAR_PROJECT_KEY -Dsonar.qualitygate.wait=true
|
||||
environment:
|
||||
SONAR_HOST:
|
||||
from_secret: sonar_host
|
||||
SONAR_TOKEN:
|
||||
from_secret: sonar_token
|
||||
SONAR_PROJECT_KEY:
|
||||
from_secret: sonar_project_key
|
||||
|
||||
trigger:
|
||||
event:
|
||||
- push
|
||||
@@ -17,4 +17,5 @@ requests==2.28.1
|
||||
onnx==1.12.0
|
||||
onnx-tf==1.10.0
|
||||
onnxruntime==1.12.1
|
||||
coremltools==6.3.0
|
||||
tensorflow
|
||||
tensorflow-probability
|
||||
|
||||
@@ -89,7 +89,7 @@ with torch.no_grad():
|
||||
df = pd.read_csv(args.dataset)
|
||||
df["embeddings"] = embeddings
|
||||
df = df[['embeddings', 'label_name', 'labels']]
|
||||
df['embeddings'] = df['embeddings'].apply(lambda x: x.tolist()[0])
|
||||
df['embeddings2'] = df['embeddings'].apply(lambda x: x.tolist()[0])
|
||||
|
||||
if args.format == 'json':
|
||||
df.to_json(args.output, orient='records')
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
# to run this script, you need torch 1.13.1 and torchvision 0.14.1
|
||||
|
||||
import numpy as np
|
||||
import onnx
|
||||
import torch
|
||||
import torchvision
|
||||
import os
|
||||
|
||||
from models.spoter_embedding_model import SPOTER_EMBEDDINGS
|
||||
|
||||
# set parameters of the model
|
||||
model_name = 'fingerspelling_embedding_model'
|
||||
model_name = 'embedding_model'
|
||||
output=32
|
||||
|
||||
# load PyTorch model from .pth file
|
||||
|
||||
@@ -17,7 +15,7 @@ device = torch.device("cpu")
|
||||
# if torch.cuda.is_available():
|
||||
# device = torch.device("cuda")
|
||||
|
||||
CHECKPOINT_PATH = "checkpoints/fingerspelling_checkpoint.pth"
|
||||
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
|
||||
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
|
||||
|
||||
model = SPOTER_EMBEDDINGS(
|
||||
@@ -29,39 +27,45 @@ model.load_state_dict(checkpoint["state_dict"])
|
||||
# set model to evaluation mode
|
||||
model.eval()
|
||||
|
||||
dummy_input = torch.randn(1, 10, 54, 2)
|
||||
model_export = "onnx"
|
||||
if model_export == "coreml":
|
||||
dummy_input = torch.randn(1, 10, 54, 2)
|
||||
# set device for dummy input
|
||||
dummy_input = dummy_input.to(device)
|
||||
traced_model = torch.jit.trace(model, dummy_input)
|
||||
|
||||
# check if models folder exists
|
||||
if not os.path.exists('out-models'):
|
||||
os.makedirs('out-models')
|
||||
out = traced_model(dummy_input)
|
||||
import coremltools as ct
|
||||
|
||||
for model_export in ["onnx", "coreml"]:
|
||||
if model_export == "coreml":
|
||||
# set device for dummy input
|
||||
dummy_input = dummy_input.to(device)
|
||||
traced_model = torch.jit.trace(model, dummy_input)
|
||||
# Convert to Core ML
|
||||
coreml_model = ct.convert(
|
||||
traced_model,
|
||||
inputs=[ct.TensorType(name="input", shape=dummy_input.shape)],
|
||||
)
|
||||
|
||||
out = traced_model(dummy_input)
|
||||
import coremltools as ct
|
||||
# Save Core ML model
|
||||
coreml_model.save("out-models/" + model_name + ".mlmodel")
|
||||
else:
|
||||
# create dummy input tensor
|
||||
dummy_input = torch.randn(1, 10, 54, 2)
|
||||
# set device for dummy input
|
||||
dummy_input = dummy_input.to(device)
|
||||
|
||||
# Convert to Core ML
|
||||
coreml_model = ct.convert(
|
||||
traced_model,
|
||||
inputs=[ct.TensorType(name="input", shape=dummy_input.shape)],
|
||||
)
|
||||
# export model to ONNX format
|
||||
output_file = 'models/' + model_name + '.onnx'
|
||||
torch.onnx.export(model, dummy_input, output_file, input_names=['input'], output_names=['output'])
|
||||
|
||||
# Save Core ML model
|
||||
coreml_model.save("out-models/" + model_name + ".mlmodel")
|
||||
else:
|
||||
# set device for dummy input
|
||||
dummy_input = dummy_input.to(device)
|
||||
torch.onnx.export(model, # model being run
|
||||
dummy_input, # model input (or a tuple for multiple inputs)
|
||||
'out-models/' + model_name + '.onnx', # where to save the model (can be a file or file-like object)
|
||||
export_params=True, # store the trained parameter weights inside the model file
|
||||
opset_version=9, # the ONNX version to export the model to
|
||||
do_constant_folding=True, # whether to execute constant folding for optimization
|
||||
input_names = ['X'], # the model's input names
|
||||
output_names = ['Y'] # the model's output names
|
||||
)
|
||||
|
||||
torch.onnx.export(model, # model being run
|
||||
dummy_input, # model input (or a tuple for multiple inputs)
|
||||
'out-models/' + model_name + '.onnx', # where to save the model (can be a file or file-like object)
|
||||
export_params=True, # store the trained parameter weights inside the model file
|
||||
opset_version=9, # the ONNX version to export the model to
|
||||
do_constant_folding=True, # whether to execute constant folding for optimization
|
||||
input_names = ['X'], # the model's input names
|
||||
output_names = ['Y'] # the model's output names
|
||||
)
|
||||
|
||||
# load exported ONNX model for verification
|
||||
onnx_model = onnx.load(output_file)
|
||||
onnx.checker.check_model(onnx_model)
|
||||
@@ -88,10 +88,9 @@ def train_epoch_embedding_online(model, epoch_iters, train_loader, val_loader, c
|
||||
if enable_batch_sorting:
|
||||
if labels_size < train_loader.batch_size:
|
||||
trim_count = labels_size % mini_batch
|
||||
if trim_count > 0:
|
||||
inputs = inputs[:-trim_count]
|
||||
labels = labels[:-trim_count]
|
||||
masks = masks[:-trim_count]
|
||||
inputs = inputs[:-trim_count]
|
||||
labels = labels[:-trim_count]
|
||||
masks = masks[:-trim_count]
|
||||
embeddings = None
|
||||
with torch.no_grad():
|
||||
for j in range(batch_loop_count):
|
||||
|
||||
File diff suppressed because one or more lines are too long
93
predictions/k_nearest.py
Normal file
93
predictions/k_nearest.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import numpy as np
|
||||
from collections import Counter
|
||||
|
||||
# TODO: scale the distance relative to the intra-class distances?
|
||||
# TODO: find a more efficient way to handle k=1
|
||||
|
||||
|
||||
def minkowski_distance_p(x, y, p=2):
    """Return the p-th power of the L**p distance between ``x`` and ``y``.

    The final root is deliberately not taken, so for ``p=2`` this is the
    squared Euclidean distance. ``x`` and ``y`` are broadcast against each
    other and the reduction runs over the last axis.

    Args:
        x: Array-like of coordinates.
        y: Array-like of coordinates, broadcastable against ``x``.
        p: Order of the norm. ``1`` gives the sum of absolute differences and
            ``np.inf`` the largest absolute difference; neither is raised to
            a power.

    Returns:
        Array (or scalar for 1-D inputs) with the last axis reduced.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # Promote to the smallest dtype that holds both inputs and float64, rather
    # than casting straight to float64, so complex input is not truncated.
    # (The original comment referenced "#10262", presumably an upstream issue.)
    common_datatype = np.promote_types(np.promote_types(x.dtype, y.dtype),
                                       'float64')
    x = x.astype(common_datatype)
    y = y.astype(common_datatype)

    difference = np.abs(y - x)
    if p == np.inf:
        return np.amax(difference, axis=-1)
    if p == 1:
        return np.sum(difference, axis=-1)
    return np.sum(difference ** p, axis=-1)
|
||||
|
||||
|
||||
def minkowski_distance(x, y, p=2):
    """Return the L**p (Minkowski) distance between ``x`` and ``y``.

    Args:
        x: Array-like of coordinates.
        y: Array-like of coordinates, broadcastable against ``x``.
        p: Order of the norm (``1``, ``np.inf`` or any other power).

    Returns:
        The distance reduced over the last axis.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    powered = minkowski_distance_p(x, y, p)
    # For p == 1 and p == inf the helper already returns the distance itself;
    # every other order still needs the p-th root.
    if p == np.inf or p == 1:
        return powered
    return powered ** (1. / p)
|
||||
|
||||
|
||||
class KNearestNeighbours:
    """k-nearest-neighbour classifier over a table of reference embeddings."""

    def __init__(self, k=5):
        """Create a classifier that votes among the ``k`` closest references."""
        self.k = k
        # Reference DataFrame as passed to set_embeddings; predict() reads its
        # "label_name" column.
        self.embeddings = None
        # Reference vectors parsed to lists of floats, in DataFrame row order.
        self.embeddings_list = None

    def set_embeddings(self, embeddings):
        """Store the reference table and parse its embedding vectors.

        Args:
            embeddings: DataFrame with a ``label_name`` column and an
                ``embeddings2`` column whose cells are strings such as
                ``"[0.1, 0.2]"``. The DataFrame itself is not modified.
        """
        self.embeddings = embeddings
        # Strip the surrounding brackets and parse each comma-separated value;
        # float() tolerates the whitespace left around the numbers.
        self.embeddings_list = [
            [float(value) for value in text[1:-1].split(",")]
            for text in embeddings["embeddings2"]
        ]

    def distance_matrix(self, keypoints, p=2, threshold=1000000):
        """Return the (m, n) matrix of L**p distances to every reference.

        Args:
            keypoints: ``m`` query vectors, shape (m, d). A single 1-D vector
                of length d is accepted and treated as one query.
            p: Order of the Minkowski norm.
            threshold: Largest ``m * n * d`` for which the whole matrix is
                computed in one broadcast; above it rows or columns are
                filled one at a time to bound temporary memory.

        Raises:
            ValueError: If no embeddings are set or dimensions disagree.
        """
        if self.embeddings_list is None:
            raise ValueError("set_embeddings must be called before computing distances")

        x = np.array(keypoints)
        if x.ndim == 1:
            x = x[np.newaxis, :]
        m, k = x.shape
        y = np.asarray(self.embeddings_list)
        n, kk = y.shape

        if k != kk:
            raise ValueError(f"x contains {k}-dimensional vectors but y contains "
                             f"{kk}-dimensional vectors")

        if m * n * k <= threshold:
            return minkowski_distance(x[:, np.newaxis, :], y[np.newaxis, :, :], p)

        result = np.empty((m, n), dtype=float)  # FIXME: figure out the best dtype
        # Loop over the shorter axis so fewer Python-level iterations run.
        if m < n:
            for i in range(m):
                result[i, :] = minkowski_distance(x[i], y, p)
        else:
            for j in range(n):
                result[:, j] = minkowski_distance(x, y[j], p)
        return result

    def predict(self, key_points_embeddings):
        """Classify the first query vector.

        Only row 0 of the distance matrix is used, so any further query
        vectors passed in are ignored.

        Returns:
            ``(label_name, score)`` where ``label_name`` is the most common
            label among the ``k`` nearest references and ``score`` is the mean
            distance to those neighbours that carry that label.
        """
        dist_matrix = self.distance_matrix(key_points_embeddings, p=2, threshold=1000000)
        distances = dist_matrix[0]

        # Sort only the row that is used instead of the whole matrix.
        nearest = np.argsort(distances)[:self.k]
        neighbour_labels = self.embeddings["label_name"].iloc[nearest].tolist()
        winner = Counter(neighbour_labels).most_common(1)[0][0]

        # Average the distance over the neighbours that voted for the winner.
        supporters = [index for index, name in zip(nearest, neighbour_labels) if name == winner]
        score = np.mean(distances[supporters])
        return winner, score
|
||||
86
predictions/plotting.py
Normal file
86
predictions/plotting.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import json
|
||||
|
||||
from matplotlib import pyplot as plt
|
||||
|
||||
|
||||
def load_results():
    """Load the stored k-NN test results from ``predictions/test_results/knn.json``.

    Returns:
        The decoded JSON document.
    """
    with open("predictions/test_results/knn.json", 'r') as f:
        return json.load(f)
|
||||
|
||||
def plot_all():
    """Print the mean detection time, then show the overall and per-label accuracy plots."""
    results = load_results()
    print(f"average elapsed time to detect a sign: {get_general_elapsed_time(results)}")
    plot_general_accuracy(results)
    for label in results:
        plot_accuracy_per_label(results, label)
|
||||
|
||||
|
||||
def general_accuracy(results):
    """Average the per-label accuracy curves position by position.

    Args:
        results: Mapping accepted by ``get_label_accuracy``.

    Returns:
        List where entry ``i`` is the mean, over all labels that have a value
        at position ``i``, of that label's accuracy. The list stops at the
        first position backed by fewer than 5 labels.
    """
    label_accuracy = get_label_accuracy(results)

    totals = []
    counts = []
    for curve in label_accuracy.values():
        for index, value in enumerate(curve):
            if index >= len(totals):
                totals.append(0)
                counts.append(0)
            totals[index] += value
            counts[index] += 1

    response = []
    for total, count in zip(totals, counts):
        # Too few labels reach this position for the mean to be meaningful.
        if count < 5:
            break
        response.append(total / count)
    return response
|
||||
def plot_general_accuracy(results):
    """Plot the accuracy averaged over all labels and show the figure."""
    accuracy = general_accuracy(results)
    plt.plot(accuracy)
    # title() must be called: assigning to plt.title replaces the pyplot
    # function with a string, draws no title and breaks later title() calls.
    plt.title("General accuracy")
    plt.ylabel('accuracy')
    plt.xlabel('buffer')
    plt.show()
|
||||
|
||||
|
||||
def plot_accuracy_per_label(results, label):
    """Plot the accuracy curve of a single label and show the figure.

    Args:
        results: Mapping accepted by ``get_label_accuracy``.
        label: Key of ``results`` whose curve is drawn.
    """
    accuracy = get_label_accuracy(results)
    plt.plot(accuracy[label], label=label)
    # The original assigned to the misspelled attribute "plt.titel", so the
    # title was never drawn.
    plt.title(f"Accuracy per label {label}")
    plt.ylabel('accuracy')
    plt.xlabel('prediction')
    plt.legend()
    plt.show()
|
||||
|
||||
def get_label_accuracy(results):
    """Compute, per label, the fraction of correct predictions at each position.

    Args:
        results: Mapping ``label -> list of runs``; each run is a dict whose
            ``"predictions"`` entry is a list of dicts with a ``"correct"`` flag.

    Returns:
        Mapping ``label -> list of floats``. Entry ``i`` is the share of runs
        whose ``i``-th prediction was correct. Each list stops at the first
        position reached by fewer than 2 runs.
    """
    response = {}
    for label, runs in results.items():
        hits = []
        seen = []
        for run in runs:
            for index, value in enumerate(run["predictions"]):
                if index >= len(hits):
                    hits.append(0)
                    seen.append(0)
                if value["correct"]:
                    hits[index] += 1
                seen[index] += 1

        ratios = []
        for hit_count, run_count in zip(hits, seen):
            # A single run is not enough to report an accuracy for this position.
            if run_count < 2:
                break
            ratios.append(hit_count / run_count)
        response[label] = ratios
    return response
|
||||
|
||||
def get_general_elapsed_time(results):
    """Return the mean of the per-label mean elapsed times.

    Every label weighs the same, regardless of how many runs it has.

    Raises:
        ZeroDivisionError: If ``results`` is empty.
    """
    label_time = get_label_elapsed_time(results)
    return sum(label_time[label] for label in results) / len(results)
|
||||
|
||||
def get_label_elapsed_time(results):
    """Return the mean ``"elapsed_time"`` of the runs recorded for each label.

    Args:
        results: Mapping ``label -> list of runs``; each run is a dict with an
            ``"elapsed_time"`` entry.

    Raises:
        ZeroDivisionError: If a label has no runs.
    """
    return {
        label: sum(run["elapsed_time"] for run in runs) / len(runs)
        for label, runs in results.items()
    }
|
||||
|
||||
|
||||
# Running the module as a script renders every plot from the stored results.
if __name__ == '__main__':
    plot_all()
|
||||
267
predictions/predictor.py
Normal file
267
predictions/predictor.py
Normal file
@@ -0,0 +1,267 @@
|
||||
import cv2
|
||||
import mediapipe as mp
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import torch
|
||||
|
||||
from predictions.k_nearest import KNearestNeighbours
|
||||
|
||||
# Run the embedding model on the GPU when CUDA is available, else on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
from models import SPOTER_EMBEDDINGS
|
||||
|
||||
# Rows of the pose keypoint array that are kept, in the order they are stacked.
# Index 33 is the neck point that Predictor appends after the detected pose
# landmarks (presumably MediaPipe returns 33 pose landmarks, 0-32 -- confirm).
BODY_IDENTIFIERS = [0, 33, 5, 2, 8, 7, 12, 11, 14, 13, 16, 15]

# Rows of each 21-point hand keypoint array, in the order they are stacked.
# NOTE(review): the grouping looks like wrist, then four points per finger
# listed tip-first -- confirm against the training data layout.
HAND_IDENTIFIERS = [
    0,
    8, 7, 6, 5,
    12, 11, 10, 9,
    16, 15, 14, 13,
    20, 19, 18, 17,
    4, 3, 2, 1,
]

# Checkpoint holding the embedding model's weights and configuration.
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
|
||||
|
||||
|
||||
class Predictor:
    """Predict a sign label from video frames.

    The stages implemented here are: MediaPipe Holistic landmark detection,
    keypoint normalisation, embedding with the SPOTER_EMBEDDINGS model and
    classification of that embedding by ``self.predictor``.
    """

    def __init__(self, embeddings_path, predictor_type):
        """Load the landmark detector, the embedding model and the classifier.

        Args:
            embeddings_path: CSV file with the reference embeddings; it is read
                with ``pandas.read_csv`` and handed to the classifier.
            predictor_type: Classifier instance exposing ``set_embeddings`` and
                ``predict``. ``None`` selects ``KNearestNeighbours(1)``.
        """
        # MediaPipe Holistic supplies the pose and both hand landmark sets.
        self.holistic = mp.solutions.holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=2
        )

        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils

        # Row indices into the pose keypoint array. The neck (33) is not
        # detected; it is the point appended by _append_neck.
        self.left_shoulder_index = 11
        self.right_shoulder_index = 12
        self.neck_index = 33
        self.nose_index = 0
        self.left_eye_index = 2

        # Reference embeddings the classifier compares against.
        self.embeddings = pd.read_csv(embeddings_path)

        checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
        config = checkpoint["config_args"]
        self.model = SPOTER_EMBEDDINGS(
            features=config.vector_length,
            hidden_dim=config.hidden_dim,
            norm_emb=config.normalize_embeddings,
        ).to(device)
        self.model.load_state_dict(checkpoint["state_dict"])

        if predictor_type is None:
            self.predictor = KNearestNeighbours(1)
        else:
            self.predictor = predictor_type
        self.predictor.set_embeddings(self.embeddings)

    @staticmethod
    def _landmarks_to_array(landmarks):
        """Return the (x, y) pairs of a landmark list as an array, or None if absent."""
        if not landmarks:
            return None
        return np.array([[float(lmk.x), float(lmk.y)] for lmk in landmarks.landmark])

    def _append_neck(self, keypoints):
        """Return ``keypoints`` with the midpoint of both shoulders appended as last row."""
        left_shoulder = keypoints[self.left_shoulder_index]
        right_shoulder = keypoints[self.right_shoulder_index]
        neck = [(float(left_shoulder[0]) + float(right_shoulder[0])) / 2,
                (float(left_shoulder[1]) + float(right_shoulder[1])) / 2]
        return np.append(keypoints, [neck], axis=0)

    def extract_keypoints(self, image_orig):
        """Detect and normalise the keypoints of one BGR frame.

        Returns:
            Array of shape (54, 2): the 12 selected body points followed by
            21 left-hand and 21 right-hand points, all shifted by -0.5. A
            hand that was not detected is filled with zeros before the shift.
            ``None`` when no pose, or neither hand, was detected.
        """
        image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
        results = self.holistic.process(image)

        pose = self._landmarks_to_array(results.pose_landmarks)
        if pose is None:
            return None
        left_hand = self._landmarks_to_array(results.left_hand_landmarks)
        right_hand = self._landmarks_to_array(results.right_hand_landmarks)
        if left_hand is None and right_hand is None:
            return None

        pose_norm = self.normalize_pose(self._append_neck(pose))
        # Keep only the points in BODY_IDENTIFIERS, in that order.
        pose_norm = pose_norm[BODY_IDENTIFIERS]

        hands = []
        for hand in (left_hand, right_hand):
            if hand is None:
                hand = np.zeros((21, 2))
            else:
                hand = self.normalize_hand(hand)
            hands.append(hand[HAND_IDENTIFIERS])

        # Stack body, left hand and right hand into one array.
        pose_norm = np.append(pose_norm, hands[0], axis=0)
        pose_norm = np.append(pose_norm, hands[1], axis=0)

        # Shift the value range by half a unit.
        pose_norm -= 0.5
        return pose_norm

    def normalize_pose(self, keypoints):
        """Rescale body keypoints relative to a box derived from the shoulders.

        The box is six "head metrics" wide and tall, where the head metric is
        the shoulder distance or, if the shoulders are unusable, the
        neck-nose distance. ``keypoints`` is modified in place and returned.
        It is returned unchanged when neither metric can be computed or when
        the box, after clamping to non-negative coordinates, has zero width
        or height.

        Args:
            keypoints: Array of (x, y) rows that includes the neck row.
        """
        left_shoulder = keypoints[self.left_shoulder_index]
        right_shoulder = keypoints[self.right_shoulder_index]
        neck = keypoints[self.neck_index]
        nose = keypoints[self.nose_index]

        # A pair is usable when both points are present (x != 0) and distinct.
        shoulders_usable = (
            left_shoulder[0] != 0 and right_shoulder[0] != 0
            and (left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1])
        )
        head_usable = (
            neck[0] != 0 and nose[0] != 0
            and (neck[0] != nose[0] or neck[1] != nose[1])
        )
        if not shoulders_usable and not head_usable:
            return keypoints

        if shoulders_usable:
            head_metric = ((((left_shoulder[0] - right_shoulder[0]) ** 2) + (
                (left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5)
        else:
            head_metric = ((((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5)

        # Bounding box centred horizontally on the neck. The end row is
        # derived from the unclamped start row, so it is computed first.
        start_x = neck[0] - 3 * head_metric
        start_y = keypoints[self.left_eye_index][1] + head_metric
        end_x = neck[0] + 3 * head_metric
        end_y = start_y - 6 * head_metric

        # Clamp the box to non-negative coordinates.
        if start_x < 0:
            start_x = 0
        if start_y < 0:
            start_y = 0
        if end_x < 0:
            end_x = 0
        if end_y < 0:
            end_y = 0

        width = end_x - start_x
        height = start_y - end_y
        # Clamping can collapse the box; skip normalisation rather than
        # divide by zero, as normalize_hand does in the same situation.
        if width == 0 or height == 0:
            return keypoints

        for point in keypoints:
            point[0] = (point[0] - start_x) / width
            point[1] = (point[1] - end_y) / height
        return keypoints

    def normalize_hand(self, keypoints):
        """Rescale hand keypoints to a square, padded box around the hand.

        ``keypoints`` is modified in place and returned. It is returned
        unchanged when it has no non-zero coordinates or the box is empty.
        """
        x_values = [point[0] for point in keypoints if point[0] != 0]
        y_values = [point[1] for point in keypoints if point[1] != 0]
        if not x_values or not y_values:
            return keypoints

        min_x, max_x = min(x_values), max(x_values)
        min_y, max_y = min(y_values), max(y_values)
        width, height = max_x - min_x, max_y - min_y

        # Pad the longer side by 10% and the shorter side enough to make the
        # box square.
        if width > height:
            delta_x = 0.1 * width
            delta_y = delta_x + ((width - height) / 2)
        else:
            delta_y = 0.1 * height
            delta_x = delta_y + ((height - width) / 2)

        start_x, start_y = min_x - delta_x, min_y - delta_y
        span_x = (max_x + delta_x) - start_x
        span_y = (max_y + delta_y) - start_y
        if span_x == 0 or span_y == 0:
            return keypoints

        for point in keypoints:
            point[0] = (point[0] - start_x) / span_x
            point[1] = (point[1] - start_y) / span_y
        return keypoints

    def get_embedding(self, keypoints):
        """Embed one window of keypoint frames.

        Args:
            keypoints: Sequence of per-frame keypoint arrays; it is wrapped
                into a batch of one before being passed to the model.

        Returns:
            The first batch entry of the model output as nested lists.
        """
        self.model.eval()
        with torch.no_grad():
            batch = torch.from_numpy(np.array([keypoints])).float().to(device)
            return self.model(batch).cpu().numpy().tolist()[0]

    def predict(self, embeddings):
        """Classify an embedding with the configured predictor."""
        return self.predictor.predict(embeddings)

    def make_prediction(self, keypoints):
        """Embed a window of keypoint frames and classify the result."""
        return self.predict(self.get_embedding(keypoints))

    def validation(self):
        """Print the accuracy on ``validation_data.npy`` / ``validation_labels.npy``.

        Each sample is classified separately, because the predictor returns a
        single ``(label, score)`` pair per call.
        """
        # NOTE(review): allow_pickle=True runs pickled code embedded in the
        # files; only load validation files from a trusted source.
        validation_data = np.load('validation_data.npy', allow_pickle=True)
        validation_labels = np.load('validation_labels.npy', allow_pickle=True)

        self.model.eval()
        with torch.no_grad():
            validation_embeddings = self.model(
                torch.from_numpy(validation_data).float().to(device)).cpu().numpy()

        correct = 0
        for embedding, expected in zip(validation_embeddings, validation_labels):
            # The predictors expect a 2-D (1, dim) query.
            label, _ = self.predictor.predict(np.asarray(embedding).reshape(1, -1))
            # assumes validation_labels holds label names like the ones the
            # predictor returns -- TODO confirm
            if label == expected:
                correct += 1

        total = len(validation_embeddings)
        accuracy = correct / total if total else 0.0
        print('Accuracy: ' + str(accuracy))
|
||||
|
||||
|
||||
34
predictions/svm_model.py
Normal file
34
predictions/svm_model.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from sklearn import svm
|
||||
|
||||
class SVM:
    """Support-vector classifier over a table of reference embeddings."""

    def __init__(self, type="ovo"):
        """Create an untrained classifier.

        Args:
            type: Passed to ``svm.SVC`` as ``decision_function_shape``.
        """
        # DataFrame with the unique ("label_name", "label") pairs.
        self.label_name_to_label = None
        self.clf = None
        self.embeddings_list = None
        self.labels = None
        self.type = type

    def set_embeddings(self, embeddings):
        """Parse the reference table and train the classifier on it.

        Args:
            embeddings: DataFrame with ``labels`` and ``label_name`` columns
                and an ``embeddings2`` column whose cells are strings such as
                ``"[0.1, 0.2]"``. The DataFrame itself is not modified.
        """
        # Strip the surrounding brackets and parse each comma-separated value.
        self.embeddings_list = [
            [float(value) for value in text[1:-1].split(", ")]
            for text in embeddings["embeddings2"]
        ]
        self.labels = embeddings["labels"].tolist()
        # rename() returns a new frame, so the caller's columns stay untouched
        # and pandas has no slice-assignment to warn about.
        self.label_name_to_label = (
            embeddings[["label_name", "labels"]]
            .rename(columns={"labels": "label"})
            .drop_duplicates()
        )

        self.train()

    def train(self):
        """Fit a probability-enabled SVC on the stored embeddings and labels."""
        self.clf = svm.SVC(decision_function_shape=self.type, probability=True)
        self.clf.fit(self.embeddings_list, self.labels)

    def predict(self, key_points_embeddings):
        """Classify a single embedding.

        Args:
            key_points_embeddings: 2-D array-like holding exactly one sample.

        Returns:
            ``(label_name, score)`` where ``score`` is the log-probability the
            classifier assigns to the predicted label.
        """
        label = self.clf.predict(key_points_embeddings).item()
        log_proba = self.clf.predict_log_proba(key_points_embeddings)

        # Probability columns are ordered like clf.classes_, so the label has
        # to be translated to its position; indexing by the raw label value is
        # only right when the labels happen to be exactly 0..n-1.
        column = list(self.clf.classes_).index(label)

        names = self.label_name_to_label
        label_name = names.loc[names["label"] == label, "label_name"].iloc[0]
        return label_name, log_proba[0][column]
|
||||
1
predictions/test_results/knn.json
Normal file
1
predictions/test_results/knn.json
Normal file
File diff suppressed because one or more lines are too long
137
predictions/validation.py
Normal file
137
predictions/validation.py
Normal file
@@ -0,0 +1,137 @@
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from matplotlib import pyplot as plt
|
||||
|
||||
from predictions.k_nearest import KNearestNeighbours
|
||||
from predictions.predictor import Predictor
|
||||
from predictions.svm_model import SVM
|
||||
|
||||
buffer_size = 15
|
||||
|
||||
|
||||
def predict_video(predictor, path_video):
    """Extract the keypoints of a video, downsampled to roughly 15 frames per second.

    Args:
        predictor: Object whose ``extract_keypoints(frame)`` returns the
            keypoints of a frame or ``None``.
        path_video: Path of the video file.

    Returns:
        List with the keypoints of every kept frame that yielded a detection.
    """
    cap = cv2.VideoCapture(path_video)
    buffer = []
    try:
        _, img = cap.read()  # img is (H, W, C), or None once no frame is left
        desired_fps = 15
        original_fps = int(cap.get(cv2.CAP_PROP_FPS))
        print("Original FPS: ", original_fps)
        # Keep every frame_skip-th frame; never less than every frame.
        frame_skip = original_fps // desired_fps
        if frame_skip == 0:
            frame_skip = 1
        print("Frame skip: ", frame_skip)

        frame_number = 0
        while img is not None:
            # Every frame goes through the detector, including those that are
            # dropped afterwards. NOTE(review): presumably this keeps the
            # detector's tracking continuous -- confirm before skipping the
            # call for dropped frames.
            pose = predictor.extract_keypoints(img)
            if pose is not None and frame_number % frame_skip == 0:
                buffer.append(pose)
            frame_number += 1
            _, img = cap.read()
    finally:
        # Free the decoder handle even when keypoint extraction fails.
        cap.release()

    print(len(buffer))
    return buffer
|
||||
|
||||
|
||||
def get_embeddings(predictor, buffer, name, window_size=None):
    """Embed every sliding window of the keypoint buffer and store the result.

    Args:
        predictor: Object whose ``get_embedding(frames)`` embeds a window.
        buffer: List of per-frame keypoints.
        name: Base name of the output file.
        window_size: Frames per window; defaults to the module-level
            ``buffer_size``.

    Returns:
        List with one embedding per window, oldest window first. It is also
        written as JSON to ``predictions/test_embeddings/<name>.csv`` (the
        file name says ".csv", but the content is JSON).
    """
    if window_size is None:
        window_size = buffer_size

    # The end index is exclusive in the slice, so it has to run up to and
    # including len(buffer); otherwise the last frame is never embedded and a
    # buffer of exactly window_size frames yields nothing.
    embeddings = [
        predictor.get_embedding(buffer[end - window_size:end])
        for end in range(window_size, len(buffer) + 1)
    ]

    # NOTE: reloading previously stored embeddings instead of recomputing them
    # was sketched here in commented-out code, but is not implemented.
    os.makedirs("predictions/test_embeddings", exist_ok=True)
    with open("predictions/test_embeddings/" + name + ".csv", 'w') as f:
        json.dump(embeddings, f)
    return embeddings
|
||||
|
||||
|
||||
def compare_embeddings(predictor, embeddings, label_video):
    """Classify each embedding and record whether it matches the video's label.

    Args:
        predictor: Object whose ``predict(embedding)`` returns ``(label, score)``.
        embeddings: Iterable of embeddings to classify.
        label_video: Expected label of the video the embeddings came from.

    Returns:
        One dict per embedding with the keys ``label``, ``score``,
        ``label_video`` and ``correct``.
    """
    results = []
    for embedding in embeddings:
        label, score = predictor.predict(embedding)
        results.append({
            "label": label,
            "score": score,
            "label_video": label_video,
            "correct": label == label_video,
        })
    return results
|
||||
|
||||
|
||||
def predict_video_files(predictor, path_video, label_video):
    """Run keypoint extraction, embedding and classification for one video.

    Returns:
        The per-window result dicts produced by ``compare_embeddings``.
    """
    buffer = predict_video(predictor, path_video)
    # File name up to its first dot, without the directory part.
    video_name = path_video.split("/")[-1].split(".")[0]
    embeddings = get_embeddings(predictor, buffer, video_name)
    return compare_embeddings(predictor, embeddings, label_video)
|
||||
|
||||
|
||||
def get_test_data(data_folder):
    """List the test videos in a folder together with their labels.

    File names are expected to look like ``<label>!<split>!<rest>.mp4``; only
    files whose ``<split>`` part is exactly ``test`` are returned. Other
    files, including ``.mp4`` files without a ``!``, are ignored.

    Args:
        data_folder: Directory to scan, with or without a trailing separator.

    Returns:
        ``(test_files, test_labels)``: a NumPy array of file paths and the
        list of matching labels, both sorted by file name.
    """
    test_files = []
    test_labels = []
    # Sort so the evaluation order does not depend on the file system.
    for file_name in sorted(os.listdir(data_folder)):
        if not file_name.endswith(".mp4"):
            continue
        parts = file_name.split("!")
        if len(parts) < 2 or parts[1] != "test":
            continue
        test_files.append(os.path.join(data_folder, file_name))
        test_labels.append(parts[0])

    return np.array(test_files, dtype=str), test_labels
|
||||
|
||||
|
||||
# NOTE: the "test_" prefix makes pytest collect this function as a test if the
# module is ever imported from a test suite; the name is kept for callers.
def test_data(predictor, data_folder):
    """Evaluate the predictor on every test video in ``data_folder``.

    Returns:
        Mapping ``label -> list of runs``; each run holds the per-window
        ``predictions``, the ``elapsed_time`` per prediction and the ``video`` path.
    """
    results = {}
    for path_video, label_video in zip(*get_test_data(data_folder)):
        print(path_video, label_video)
        start_time = time.time()
        prediction = predict_video_files(predictor, path_video, label_video)
        elapsed_time = time.time() - start_time

        # Divide by the number of predictions so the value is an average
        # execution time; with no predictions the total time is kept.
        if len(prediction) > 0:
            elapsed_time /= len(prediction)

        results.setdefault(label_video, []).append(
            {"predictions": prediction, "elapsed_time": elapsed_time, "video": path_video})

    print("DONE")
    return results
|
||||
|
||||
|
||||
def plot_general_accuracy(results):
    """Plot the share of correct predictions per window position over all videos.

    Args:
        results: Mapping ``label -> list of runs`` as returned by
            ``test_data``. For backward compatibility, a sequence whose items
            hold their prediction list at index 0 is accepted as well.
    """
    if isinstance(results, dict):
        prediction_lists = [run["predictions"] for runs in results.values() for run in runs]
    else:
        prediction_lists = [result[0] for result in results]

    correct = []
    amount = []
    for predictions in prediction_lists:
        for index, value in enumerate(predictions):
            if len(correct) <= index:
                correct.append(0)
                amount.append(0)
            if value["correct"]:
                correct[index] += 1
            amount[index] += 1

    # Plot the ratio, not the raw number of hits: fewer videos reach the later
    # positions, so raw counts would fall even at constant accuracy.
    accuracy = [hits / total for hits, total in zip(correct, amount)]
    plt.plot(accuracy)
    plt.show()
|
||||
|
||||
|
||||
# Script entry point: evaluate one predictor type on the test videos and
# store the results as JSON.
if __name__ == "__main__":
    type_predictor = "knn"
    if type_predictor == "svm":
        predictor_type = SVM()
    else:
        # "knn" and any unrecognised value both use 1-nearest-neighbour.
        predictor_type = KNearestNeighbours(1)

    # embeddings_path = 'embeddings/basic-signs/embeddings.csv'
    embeddings_path = 'embeddings/fingerspelling/embeddings.csv'

    predictor = Predictor(embeddings_path, predictor_type)

    # NOTE(review): absolute path on one developer's machine; needs to be
    # adjusted (or made configurable) to run anywhere else.
    data_folder = '/home/tibe/Projects/design_project/sign-predictor/data/fingerspelling/data/'
    results = test_data(predictor, data_folder)

    # Write the results to a JSON file named after the predictor type.
    with open("predictions/test_results/" + type_predictor + ".json", 'w') as f:
        json.dump(results, f)
    print(results)
    # plot_general_accuracy(results)
|
||||
@@ -35,11 +35,7 @@ class LandmarksResults:
|
||||
):
|
||||
self.results = results
|
||||
self.num_landmarks_pose = num_landmarks_pose
|
||||
self.num_landmarks_hand = num_landmarks_hand
|
||||
|
||||
@property
|
||||
def empty(self):
|
||||
return self.results.pose_landmarks is None or (self.results.left_hand_landmarks is None and self.results.right_hand_landmarks is None)
|
||||
self.num_landmarks_hand = num_landmarks_hand
|
||||
|
||||
@property
|
||||
def pose_landmarks(self):
|
||||
@@ -71,10 +67,6 @@ def get_landmarks(image_orig, holistic, debug=False):
|
||||
# Convert the BGR image to RGB before processing.
|
||||
image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
|
||||
results = LandmarksResults(holistic.process(image))
|
||||
|
||||
if results.empty:
|
||||
return None
|
||||
|
||||
if debug:
|
||||
lmks_pose = []
|
||||
for lmk in results.pose_landmarks:
|
||||
@@ -102,7 +94,6 @@ def get_landmarks(image_orig, holistic, debug=False):
|
||||
len(lmks_right_hand) == 2 * LEN_LANDMARKS_HAND
|
||||
), f"{len(lmks_right_hand)} != {2 * LEN_LANDMARKS_HAND}"
|
||||
landmarks = []
|
||||
|
||||
for lmk in chain(
|
||||
results.pose_landmarks,
|
||||
results.left_hand_landmarks,
|
||||
@@ -137,11 +128,6 @@ def extract(args):
|
||||
videos_folder = args.videos_folder
|
||||
os.makedirs(landmarks_output, exist_ok=True)
|
||||
for fn_video in tqdm(sorted(glob.glob(op.join(videos_folder, "*mp4")))):
|
||||
|
||||
# check if landmarks already exist
|
||||
if op.exists(op.join(landmarks_output, op.basename(fn_video).split(".")[0] + ".npy")):
|
||||
continue
|
||||
|
||||
cap = cv2.VideoCapture(fn_video)
|
||||
ret, image_orig = cap.read()
|
||||
height, width = image_orig.shape[:2]
|
||||
@@ -149,7 +135,7 @@ def extract(args):
|
||||
|
||||
# make sure fps is 20 by determining the number of frames to be skipped
|
||||
frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
|
||||
frame_skip = (frame_rate // 10) - 1
|
||||
frame_skip = (frame_rate // 20) - 1
|
||||
|
||||
|
||||
with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT))) as pbar:
|
||||
@@ -168,8 +154,7 @@ def extract(args):
|
||||
for _ in range(frame_skip):
|
||||
ret, image_orig = cap.read()
|
||||
pbar.update(1)
|
||||
if landmarks:
|
||||
landmarks_video.append(landmarks)
|
||||
landmarks_video.append(landmarks)
|
||||
pbar.update(1)
|
||||
landmarks_video = np.vstack(landmarks_video)
|
||||
np.save(
|
||||
|
||||
@@ -16,9 +16,6 @@ with open("data/sign_to_prediction_index_map.json", "r") as f:
|
||||
# filter df to make sure each sign has at least 4 samples
|
||||
df = df[df["sign"].map(df["sign"].value_counts()) > 4]
|
||||
|
||||
# print number of unique signs
|
||||
print("Number of unique signs: ", len(df["sign"].unique()))
|
||||
|
||||
# use the path column to split the dataset
|
||||
paths = df["path"].unique()
|
||||
|
||||
|
||||
@@ -12,5 +12,4 @@ clearml==1.10.3
|
||||
torch==2.0.0
|
||||
torchvision==0.15.1
|
||||
tqdm==4.54.1
|
||||
optuna==3.1.1
|
||||
onnx==1.14.0
|
||||
optuna==3.1.1
|
||||
3
train.py
3
train.py
@@ -246,9 +246,6 @@ def train(args, tracker: Tracker):
|
||||
val_accs.append(val_acc)
|
||||
tracker.log_scalar_metric("acc", "val", epoch, val_acc)
|
||||
|
||||
create_embedding_scatter_plots(tracker, slrt_model, train_loader, val_loader, device, id_to_label, epoch,
|
||||
top_model_name)
|
||||
|
||||
logger.info(f"Epoch time: {datetime.now() - start_time}")
|
||||
logger.info("[" + str(epoch) + "] TRAIN loss: " + str(train_loss) + " acc: " + str(train_accs[-1]))
|
||||
logger.info("[" + str(epoch) + "] VALIDATION acc: " + str(val_accs[-1]))
|
||||
|
||||
19
train.sh
19
train.sh
@@ -1,24 +1,23 @@
|
||||
#!/bin/sh
|
||||
python3 -m train \
|
||||
--save_checkpoints_every 10 \
|
||||
--experiment_name "Finetune Fingerspelling Signs" \
|
||||
--epochs 1000 \
|
||||
--save_checkpoints_every 1 \
|
||||
--experiment_name "Finetune Basic Signs" \
|
||||
--epochs 100 \
|
||||
--optimizer "ADAM" \
|
||||
--lr 0.00001 \
|
||||
--batch_size 8 \
|
||||
--dataset_name "FingerSpelling" \
|
||||
--batch_size 16 \
|
||||
--dataset_name "BasicSigns" \
|
||||
--training_set_path "train.csv" \
|
||||
--validation_set_path "val.csv" \
|
||||
--vector_length 32 \
|
||||
--epoch_iters -1 \
|
||||
--scheduler_factor 0 \
|
||||
--hard_triplet_mining "in_batch" \
|
||||
--scheduler_factor 0.05 \
|
||||
--hard_triplet_mining "None" \
|
||||
--filter_easy_triplets \
|
||||
--start_mining_hard 50 \
|
||||
--triplet_loss_margin 4 \
|
||||
--triplet_loss_margin 2 \
|
||||
--dropout 0.2 \
|
||||
--tracker=clearml \
|
||||
--dataset_loader=clearml \
|
||||
--dataset_project="SpoterEmbedding" \
|
||||
--finetune \
|
||||
--checkpoint_path "checkpoints/checkpoint_embed_3835.pth"
|
||||
--checkpoint_path "checkpoints/checkpoint_embed_3006.pth"
|
||||
359
webcam.py
359
webcam.py
@@ -1,339 +1,54 @@
|
||||
|
||||
from collections import Counter
|
||||
|
||||
import cv2
|
||||
import mediapipe as mp
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import torch
|
||||
|
||||
device = torch.device("cpu")
|
||||
if torch.cuda.is_available():
|
||||
device = torch.device("cuda")
|
||||
from models import SPOTER_EMBEDDINGS
|
||||
from predictions.k_nearest import KNearestNeighbours
|
||||
from predictions.predictor import Predictor
|
||||
from predictions.svm_model import SVM
|
||||
|
||||
# Initialize MediaPipe Holistic model (pose + both hands)
|
||||
holistic = mp.solutions.holistic.Holistic(
|
||||
min_detection_confidence=0.5,
|
||||
min_tracking_confidence=0.5,
|
||||
model_complexity=2
|
||||
)
|
||||
mp_holistic = mp.solutions.holistic
|
||||
mp_drawing = mp.solutions.drawing_utils
|
||||
if __name__ == '__main__':
|
||||
buffer = []
|
||||
# open webcam stream
|
||||
cap = cv2.VideoCapture(0)
|
||||
|
||||
BODY_IDENTIFIERS = [
|
||||
0,
|
||||
33,
|
||||
5,
|
||||
2,
|
||||
8,
|
||||
7,
|
||||
12,
|
||||
11,
|
||||
14,
|
||||
13,
|
||||
16,
|
||||
15,
|
||||
]
|
||||
|
||||
HAND_IDENTIFIERS = [
|
||||
0,
|
||||
8,
|
||||
7,
|
||||
6,
|
||||
5,
|
||||
12,
|
||||
11,
|
||||
10,
|
||||
9,
|
||||
16,
|
||||
15,
|
||||
14,
|
||||
13,
|
||||
20,
|
||||
19,
|
||||
18,
|
||||
17,
|
||||
4,
|
||||
3,
|
||||
2,
|
||||
1,
|
||||
]
|
||||
|
||||
def extract_keypoints(image_orig):
|
||||
image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
|
||||
results = holistic.process(image)
|
||||
|
||||
def extract_keypoints(lmks):
    """Convert a landmark container into an array of ``[x, y]`` rows.

    Args:
        lmks: Object exposing a ``landmark`` iterable whose items have ``x``
            and ``y`` attributes, or a falsy value when nothing was detected.

    Returns:
        A NumPy array with one ``[x, y]`` row per landmark, or ``None`` when
        ``lmks`` is falsy.
    """
    # Guard clause: nothing detected means there is nothing to convert.
    if not lmks:
        return None

    rows = [(float(point.x), float(point.y)) for point in lmks.landmark]
    return np.array(rows)
|
||||
|
||||
def calculate_neck(keypoints):
    """Append a synthetic neck point to the pose keypoints.

    The neck is the midpoint of the rows at index 11 and 12 (named left and
    right shoulder in this code).

    Args:
        keypoints: Indexable collection of ``[x, y]`` rows with at least 13 rows.

    Returns:
        A new array equal to ``keypoints`` with the neck row appended last.
    """
    shoulder_l, shoulder_r = keypoints[11], keypoints[12]

    # Average each coordinate separately; float() keeps the arithmetic in
    # plain Python floats regardless of the input element type.
    midpoint = [
        (float(shoulder_l[axis]) + float(shoulder_r[axis])) / 2
        for axis in (0, 1)
    ]

    # np.append returns a copy, so the caller's array is left untouched.
    return np.append(keypoints, [midpoint], axis=0)
|
||||
|
||||
pose = extract_keypoints(results.pose_landmarks)
|
||||
pose = calculate_neck(pose)
|
||||
pose_norm = normalize_pose(pose)
|
||||
# filter out keypoints that are not in BODY_IDENTIFIERS and make sure they are in the correct order
|
||||
pose_norm = pose_norm[BODY_IDENTIFIERS]
|
||||
|
||||
left_hand = extract_keypoints(results.left_hand_landmarks)
|
||||
right_hand = extract_keypoints(results.right_hand_landmarks)
|
||||
|
||||
if left_hand is None and right_hand is None:
|
||||
return None
|
||||
|
||||
# normalize hands
|
||||
if left_hand is not None:
|
||||
left_hand = normalize_hand(left_hand)
|
||||
type_predictor = "svm"
|
||||
if type_predictor == "knn":
|
||||
k = 10
|
||||
predictor_type = KNearestNeighbours(k)
|
||||
elif type_predictor == "svm":
|
||||
predictor_type = SVM()
|
||||
else:
|
||||
left_hand = np.zeros((21, 2))
|
||||
if right_hand is not None:
|
||||
right_hand = normalize_hand(right_hand)
|
||||
else:
|
||||
right_hand = np.zeros((21, 2))
|
||||
|
||||
left_hand = left_hand[HAND_IDENTIFIERS]
|
||||
|
||||
right_hand = right_hand[HAND_IDENTIFIERS]
|
||||
|
||||
# combine pose and hands
|
||||
pose_norm = np.append(pose_norm, left_hand, axis=0)
|
||||
pose_norm = np.append(pose_norm, right_hand, axis=0)
|
||||
|
||||
# move interval
|
||||
pose_norm -= 0.5
|
||||
|
||||
return pose_norm
|
||||
predictor_type = KNearestNeighbours(1)
|
||||
|
||||
|
||||
buffer = []
|
||||
|
||||
left_shoulder_index = 11
|
||||
right_shoulder_index = 12
|
||||
neck_index = 33
|
||||
nose_index = 0
|
||||
left_eye_index = 2
|
||||
# embeddings_path = 'embeddings/basic-signs/embeddings.csv'
|
||||
embeddings_path = 'embeddings/fingerspelling/embeddings.csv'
|
||||
|
||||
# if we have the keypoints, normalize single body, keypoints is numpy array of (identifiers, 2)
|
||||
def normalize_pose(keypoints):
|
||||
left_shoulder = keypoints[left_shoulder_index]
|
||||
right_shoulder = keypoints[right_shoulder_index]
|
||||
predictor = Predictor(embeddings_path, predictor_type)
|
||||
|
||||
neck = keypoints[neck_index]
|
||||
nose = keypoints[nose_index]
|
||||
index = 0
|
||||
|
||||
# Prevent from even starting the analysis if some necessary elements are not present
|
||||
if (left_shoulder[0] == 0 or right_shoulder[0] == 0
|
||||
or (left_shoulder[0] == right_shoulder[0] and left_shoulder[1] == right_shoulder[1])) and (
|
||||
neck[0] == 0 or nose[0] == 0 or (neck[0] == nose[0] and neck[1] == nose[1])):
|
||||
return keypoints
|
||||
|
||||
if left_shoulder[0] != 0 and right_shoulder[0] != 0 and (left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1]):
|
||||
shoulder_distance = ((((left_shoulder[0] - right_shoulder[0]) ** 2) + ((left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5)
|
||||
head_metric = shoulder_distance
|
||||
else:
|
||||
neck_nose_distance = ((((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5)
|
||||
head_metric = neck_nose_distance
|
||||
while cap.isOpened():
|
||||
# Wait for key press to exit
|
||||
if cv2.waitKey(5) & 0xFF == 27:
|
||||
break
|
||||
|
||||
# Set the starting and ending point of the normalization bounding box
|
||||
starting_point = [keypoints[neck_index][0] - 3 * head_metric, keypoints[left_eye_index][1] + head_metric]
|
||||
ending_point = [keypoints[neck_index][0] + 3 * head_metric, starting_point[1] - 6 * head_metric]
|
||||
ret, frame = cap.read()
|
||||
pose = predictor.extract_keypoints(frame)
|
||||
|
||||
if starting_point[0] < 0:
|
||||
starting_point[0] = 0
|
||||
if starting_point[1] < 0:
|
||||
starting_point[1] = 0
|
||||
if ending_point[0] < 0:
|
||||
ending_point[0] = 0
|
||||
if ending_point[1] < 0:
|
||||
ending_point[1] = 0
|
||||
if pose is None:
|
||||
cv2.imshow('MediaPipe Hands', frame)
|
||||
continue
|
||||
|
||||
# Normalize the keypoints
|
||||
for i in range(len(keypoints)):
|
||||
keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
|
||||
keypoints[i][1] = (keypoints[i][1] - ending_point[1]) / (starting_point[1] - ending_point[1])
|
||||
buffer.append(pose)
|
||||
if len(buffer) > 15:
|
||||
buffer.pop(0)
|
||||
|
||||
return keypoints
|
||||
if len(buffer) == 15:
|
||||
label, score = predictor.make_prediction(buffer)
|
||||
|
||||
def normalize_hand(keypoints):
|
||||
x_values = [keypoints[i][0] for i in range(len(keypoints)) if keypoints[i][0] != 0]
|
||||
y_values = [keypoints[i][1] for i in range(len(keypoints)) if keypoints[i][1] != 0]
|
||||
# draw label
|
||||
cv2.putText(frame, str(label), (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
|
||||
cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
|
||||
|
||||
if not x_values or not y_values:
|
||||
return keypoints
|
||||
|
||||
width, height = max(x_values) - min(x_values), max(y_values) - min(y_values)
|
||||
if width > height:
|
||||
delta_x = 0.1 * width
|
||||
delta_y = delta_x + ((width - height) / 2)
|
||||
else:
|
||||
delta_y = 0.1 * height
|
||||
delta_x = delta_y + ((height - width) / 2)
|
||||
|
||||
starting_point = (min(x_values) - delta_x, min(y_values) - delta_y)
|
||||
ending_point = (max(x_values) + delta_x, max(y_values) + delta_y)
|
||||
|
||||
if ending_point[0] - starting_point[0] == 0 or ending_point[1] - starting_point[1] == 0:
|
||||
return keypoints
|
||||
|
||||
# normalize keypoints
|
||||
for i in range(len(keypoints)):
|
||||
keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
|
||||
keypoints[i][1] = (keypoints[i][1] - starting_point[1]) / (ending_point[1] - starting_point[1])
|
||||
|
||||
return keypoints
|
||||
|
||||
|
||||
# load training embedding csv
|
||||
df = pd.read_csv('embeddings/basic-signs/embeddings.csv')
|
||||
|
||||
def minkowski_distance_p(x, y, p=2):
    """Return the p-th power of the Minkowski (L**p) distance between x and y.

    The reduction runs over the last axis, so the inputs may be broadcastable
    stacks of vectors. For ``p == 1`` and ``p == np.inf`` the value returned is
    the distance itself (sum and maximum of absolute differences respectively).

    Args:
        x: Array-like of vectors.
        y: Array-like of vectors, broadcastable against ``x``.
        p: Order of the norm.

    Returns:
        Array of the summed (or maximal) absolute differences along the last axis.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # Promote both operands to a common dtype that is at least float64;
    # complex inputs stay complex instead of being truncated to float.
    dtype = np.promote_types(np.promote_types(x.dtype, y.dtype), 'float64')
    gap = np.abs(y.astype(dtype) - x.astype(dtype))

    if p == np.inf:
        return np.amax(gap, axis=-1)
    if p == 1:
        return np.sum(gap, axis=-1)
    return np.sum(gap**p, axis=-1)
|
||||
|
||||
def minkowski_distance(x, y, p=2):
    """Return the Minkowski (L**p) distance between x and y along the last axis.

    Args:
        x: Array-like of vectors.
        y: Array-like of vectors, broadcastable against ``x``.
        p: Order of the norm.

    Returns:
        Array of distances.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    powered = minkowski_distance_p(x, y, p)

    # For p == 1 and p == inf the helper already returns the distance itself;
    # every other order still needs the p-th root.
    if p != np.inf and p != 1:
        return powered**(1./p)
    return powered
|
||||
|
||||
|
||||
def distance_matrix(keypoints, embeddings, p=2, threshold=1000000):
    """Compute all pairwise Minkowski distances between two sets of vectors.

    Args:
        keypoints: Query vectors, array-like of shape ``(m, k)``. A single
            1-D vector of length ``k`` is accepted and treated as one row.
        embeddings: Reference vectors, array-like of shape ``(n, k)``.
        p: Order of the Minkowski norm.
        threshold: When ``m * n * k`` exceeds this value the result is filled
            row by row (or column by column) instead of through one large
            broadcast temporary.

    Returns:
        Array of shape ``(m, n)`` where entry ``[i, j]`` is the distance
        between ``keypoints[i]`` and ``embeddings[j]``.

    Raises:
        ValueError: If the two inputs have different vector lengths.
    """
    # atleast_2d lets callers pass one bare vector without wrapping it.
    x = np.atleast_2d(np.asarray(keypoints))
    m, k = x.shape
    y = np.asarray(embeddings)
    n, kk = y.shape

    if k != kk:
        raise ValueError(f"x contains {k}-dimensional vectors but y contains "
                         f"{kk}-dimensional vectors")

    if m * n * k <= threshold:
        # Small enough: one broadcast (m, 1, k) vs (1, n, k) evaluation.
        return minkowski_distance(x[:, np.newaxis, :], y[np.newaxis, :, :], p)

    # Large problem: loop over the shorter dimension to bound the size of
    # the intermediate arrays.
    result = np.empty((m, n), dtype=float)
    if m < n:
        for i in range(m):
            result[i, :] = minkowski_distance(x[i], y, p)
    else:
        for j in range(n):
            result[:, j] = minkowski_distance(x, y[j], p)
    return result
|
||||
|
||||
|
||||
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
|
||||
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
|
||||
|
||||
model = SPOTER_EMBEDDINGS(
|
||||
features=checkpoint["config_args"].vector_length,
|
||||
hidden_dim=checkpoint["config_args"].hidden_dim,
|
||||
norm_emb=checkpoint["config_args"].normalize_embeddings,
|
||||
).to(device)
|
||||
|
||||
model.load_state_dict(checkpoint["state_dict"])
|
||||
embeddings = df.drop(columns=['labels', 'label_name', 'embeddings'])
|
||||
|
||||
# convert embedding from string to list of floats
|
||||
embeddings["embeddings"] = embeddings["embeddings2"].apply(lambda x: [float(i) for i in x[1:-1].split(", ")])
|
||||
# drop embeddings2
|
||||
embeddings = embeddings.drop(columns=['embeddings2'])
|
||||
# to list
|
||||
embeddings = embeddings["embeddings"].tolist()
|
||||
|
||||
def make_prediction(keypoints, k=5):
    """Classify a buffer of keypoint frames by nearest-neighbour vote.

    The buffer is embedded with the module-level ``model`` and compared
    against the reference ``embeddings``. The most common ``label_name``
    among the ``k`` closest references wins.

    Args:
        keypoints: Sequence of per-frame keypoint arrays (the webcam loop
            passes its 15-frame buffer).
        k: Number of nearest reference embeddings that vote. Defaults to 5.

    Returns:
        Tuple ``(label, score)`` where ``score`` is the mean distance to those
        of the ``k`` neighbours that carry the winning label.
    """
    # run model on frame
    model.eval()
    with torch.no_grad():
        batch = torch.from_numpy(np.array([keypoints])).float().to(device)
        new_embeddings = model(batch).cpu().numpy().tolist()[0]

    # calculate distance matrix
    dist_matrix = distance_matrix(new_embeddings, embeddings, p=2, threshold=1000000)

    # Only the first query row is used, so sort just that row.
    row = dist_matrix[0]
    nearest = np.argsort(row)[:k]

    # Majority vote over the labels of the nearest references.
    labels = df["label_name"].iloc[nearest].tolist()
    winner = Counter(labels).most_common()[0][0]

    # Keep only the neighbours that voted for the winning label and use
    # their average distance as the score.
    matching = [idx for idx, name in zip(nearest, labels) if name == winner]
    score = np.mean(row[matching])

    return winner, score
|
||||
|
||||
# open webcam stream
|
||||
cap = cv2.VideoCapture(0)
|
||||
|
||||
while cap.isOpened():
|
||||
# read frame
|
||||
ret, frame = cap.read()
|
||||
pose = extract_keypoints(frame)
|
||||
|
||||
if pose is None:
|
||||
cv2.imshow('MediaPipe Hands', frame)
|
||||
continue
|
||||
|
||||
buffer.append(pose)
|
||||
if len(buffer) > 15:
|
||||
buffer.pop(0)
|
||||
|
||||
if len(buffer) == 15:
|
||||
label, score = make_prediction(buffer)
|
||||
|
||||
# draw label
|
||||
cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
|
||||
cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
|
||||
|
||||
# Show the frame
|
||||
cv2.imshow('MediaPipe Hands', frame)
|
||||
|
||||
# Wait for key press to exit
|
||||
if cv2.waitKey(5) & 0xFF == 27:
|
||||
break
|
||||
|
||||
# open video A.mp4
|
||||
# cap = cv2.VideoCapture('E.mp4')
|
||||
# while cap.isOpened():
|
||||
# # read frame
|
||||
# ret, frame = cap.read()
|
||||
# if frame is None:
|
||||
# break
|
||||
# pose = extract_keypoints(frame)
|
||||
|
||||
# buffer.append(pose)
|
||||
|
||||
# label, score = make_prediction(buffer)
|
||||
# print(label, score)
|
||||
# Show the frame
|
||||
cv2.imshow('MediaPipe Hands', frame)
|
||||
Reference in New Issue
Block a user