Files
spoterembedding/webcam.py
2023-04-14 14:35:53 +02:00

338 lines
9.9 KiB
Python

from collections import Counter
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import torch
device = torch.device("cpu")
if torch.cuda.is_available():
device = torch.device("cuda")
from models import SPOTER_EMBEDDINGS
# Initialize MediaPipe Hands model
holistic = mp.solutions.holistic.Holistic(
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
model_complexity=2
)
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
BODY_IDENTIFIERS = [
0,
33,
5,
2,
8,
7,
12,
11,
14,
13,
16,
15,
]
HAND_IDENTIFIERS = [
0,
8,
7,
6,
5,
12,
11,
10,
9,
16,
15,
14,
13,
20,
19,
18,
17,
4,
3,
2,
1,
]
def extract_keypoints(image_orig):
image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
results = holistic.process(image)
def extract_keypoints(lmks):
if lmks:
a = np.array([[float(lmk.x), float(lmk.y)] for lmk in lmks.landmark])
return a
return None
def calculate_neck(keypoints):
left_shoulder = keypoints[11]
right_shoulder = keypoints[12]
neck = [(float(left_shoulder[0]) + float(right_shoulder[0])) / 2, (float(left_shoulder[1]) + float(right_shoulder[1])) / 2]
# add neck to keypoints
keypoints = np.append(keypoints, [neck], axis=0)
return keypoints
pose = extract_keypoints(results.pose_landmarks)
pose = calculate_neck(pose)
pose_norm = normalize_pose(pose)
# filter out keypoints that are not in BODY_IDENTIFIERS and make sure they are in the correct order
pose_norm = pose_norm[BODY_IDENTIFIERS]
left_hand = extract_keypoints(results.left_hand_landmarks)
right_hand = extract_keypoints(results.right_hand_landmarks)
if left_hand is None and right_hand is None:
return None
# normalize hands
if left_hand is not None:
left_hand = normalize_hand(left_hand)
else:
left_hand = np.zeros((21, 2))
if right_hand is not None:
right_hand = normalize_hand(right_hand)
else:
right_hand = np.zeros((21, 2))
left_hand = left_hand[HAND_IDENTIFIERS]
right_hand = right_hand[HAND_IDENTIFIERS]
# combine pose and hands
pose_norm = np.append(pose_norm, left_hand, axis=0)
pose_norm = np.append(pose_norm, right_hand, axis=0)
# move interval
pose_norm -= 0.5
return pose_norm
buffer = []
left_shoulder_index = 11
right_shoulder_index = 12
neck_index = 33
nose_index = 0
left_eye_index = 2
# if we have the keypoints, normalize single body, keypoints is numpy array of (identifiers, 2)
def normalize_pose(keypoints):
left_shoulder = keypoints[left_shoulder_index]
right_shoulder = keypoints[right_shoulder_index]
neck = keypoints[neck_index]
nose = keypoints[nose_index]
# Prevent from even starting the analysis if some necessary elements are not present
if (left_shoulder[0] == 0 or right_shoulder[0] == 0
or (left_shoulder[0] == right_shoulder[0] and left_shoulder[1] == right_shoulder[1])) and (
neck[0] == 0 or nose[0] == 0 or (neck[0] == nose[0] and neck[1] == nose[1])):
return keypoints
if left_shoulder[0] != 0 and right_shoulder[0] != 0 and (left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1]):
shoulder_distance = ((((left_shoulder[0] - right_shoulder[0]) ** 2) + ((left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5)
head_metric = shoulder_distance
else:
neck_nose_distance = ((((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5)
head_metric = neck_nose_distance
# Set the starting and ending point of the normalization bounding box
starting_point = [keypoints[neck_index][0] - 3 * head_metric, keypoints[left_eye_index][1] + head_metric]
ending_point = [keypoints[neck_index][0] + 3 * head_metric, starting_point[1] - 6 * head_metric]
if starting_point[0] < 0:
starting_point[0] = 0
if starting_point[1] < 0:
starting_point[1] = 0
if ending_point[0] < 0:
ending_point[0] = 0
if ending_point[1] < 0:
ending_point[1] = 0
# Normalize the keypoints
for i in range(len(keypoints)):
keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
keypoints[i][1] = (keypoints[i][1] - ending_point[1]) / (starting_point[1] - ending_point[1])
return keypoints
def normalize_hand(keypoints):
x_values = [keypoints[i][0] for i in range(len(keypoints)) if keypoints[i][0] != 0]
y_values = [keypoints[i][1] for i in range(len(keypoints)) if keypoints[i][1] != 0]
if not x_values or not y_values:
return keypoints
width, height = max(x_values) - min(x_values), max(y_values) - min(y_values)
if width > height:
delta_x = 0.1 * width
delta_y = delta_x + ((width - height) / 2)
else:
delta_y = 0.1 * height
delta_x = delta_y + ((height - width) / 2)
starting_point = (min(x_values) - delta_x, min(y_values) - delta_y)
ending_point = (max(x_values) + delta_x, max(y_values) + delta_y)
if ending_point[0] - starting_point[0] == 0 or ending_point[1] - starting_point[1] == 0:
return keypoints
# normalize keypoints
for i in range(len(keypoints)):
keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
keypoints[i][1] = (keypoints[i][1] - starting_point[1]) / (ending_point[1] - starting_point[1])
return keypoints
# load training embedding csv
df = pd.read_csv('embeddings/basic-signs/embeddings.csv')
def minkowski_distance_p(x, y, p=2):
x = np.asarray(x)
y = np.asarray(y)
# Find smallest common datatype with float64 (return type of this
# function) - addresses #10262.
# Don't just cast to float64 for complex input case.
common_datatype = np.promote_types(np.promote_types(x.dtype, y.dtype),
'float64')
# Make sure x and y are NumPy arrays of correct datatype.
x = x.astype(common_datatype)
y = y.astype(common_datatype)
if p == np.inf:
return np.amax(np.abs(y-x), axis=-1)
elif p == 1:
return np.sum(np.abs(y-x), axis=-1)
else:
return np.sum(np.abs(y-x)**p, axis=-1)
def minkowski_distance(x, y, p=2):
x = np.asarray(x)
y = np.asarray(y)
if p == np.inf or p == 1:
return minkowski_distance_p(x, y, p)
else:
return minkowski_distance_p(x, y, p)**(1./p)
def distance_matrix(keypoints, embeddings, p=2, threshold=1000000):
x = np.array(keypoints)
m, k = x.shape
y = np.asarray(embeddings)
n, kk = y.shape
if k != kk:
raise ValueError(f"x contains {k}-dimensional vectors but y contains "
f"{kk}-dimensional vectors")
if m*n*k <= threshold:
return minkowski_distance(x[:,np.newaxis,:],y[np.newaxis,:,:],p)
else:
result = np.empty((m,n),dtype=float) # FIXME: figure out the best dtype
if m < n:
for i in range(m):
result[i,:] = minkowski_distance(x[i],y,p)
else:
for j in range(n):
result[:,j] = minkowski_distance(x,y[j],p)
return result
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
model = SPOTER_EMBEDDINGS(
features=checkpoint["config_args"].vector_length,
hidden_dim=checkpoint["config_args"].hidden_dim,
norm_emb=checkpoint["config_args"].normalize_embeddings,
).to(device)
model.load_state_dict(checkpoint["state_dict"])
embeddings = df.drop(columns=['labels', 'label_name', 'embeddings'])
# convert embedding from string to list of floats
embeddings["embeddings"] = embeddings["embeddings2"].apply(lambda x: [float(i) for i in x[1:-1].split(", ")])
# drop embeddings2
embeddings = embeddings.drop(columns=['embeddings2'])
# to list
embeddings = embeddings["embeddings"].tolist()
def make_prediction(keypoints):
# run model on frame
model.eval()
with torch.no_grad():
keypoints = torch.from_numpy(np.array([keypoints])).float().to(device)
new_embeddings = model(keypoints).cpu().numpy().tolist()[0]
# calculate distance matrix
dist_matrix = distance_matrix(new_embeddings, embeddings, p=2, threshold=1000000)
# get the 5 closest matches and select the class that is most common and use the average distance as the score
# get the 5 closest matches
indeces = np.argsort(dist_matrix)[0][:5]
# get the labels
labels = df["label_name"].iloc[indeces].tolist()
c = Counter(labels).most_common()[0][0]
# filter indeces to only include the most common label
indeces = [i for i in indeces if df["label_name"].iloc[i] == c]
# get the average distance
score = np.mean(dist_matrix[0][indeces])
return c, score
# open webcam stream
cap = cv2.VideoCapture(0)
while cap.isOpened():
# read frame
ret, frame = cap.read()
pose = extract_keypoints(frame)
if pose is None:
cv2.imshow('MediaPipe Hands', frame)
continue
buffer.append(pose)
if len(buffer) > 15:
buffer.pop(0)
if len(buffer) == 15:
label, score = make_prediction(buffer)
# draw label
cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
# Show the frame
cv2.imshow('MediaPipe Hands', frame)
# Wait for key press to exit
if cv2.waitKey(5) & 0xFF == 27:
break
# open video A.mp4
# cap = cv2.VideoCapture('E.mp4')
# while cap.isOpened():
# # read frame
# ret, frame = cap.read()
# if frame is None:
# break
# pose = extract_keypoints(frame)
# buffer.append(pose)
# label, score = make_prediction(buffer)
# print(label, score)