from collections import Counter
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import torch

device = torch.device("cpu")
if torch.cuda.is_available():
    device = torch.device("cuda")

from models import SPOTER_EMBEDDINGS

# Initialize the MediaPipe Holistic model (pose + both hands).
holistic = mp.solutions.holistic.Holistic(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    model_complexity=2,
)
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Pose landmark indices kept as model input, in the order the model expects.
# Index 33 is the synthetic "neck" point appended by extract_keypoints.
BODY_IDENTIFIERS = [0, 33, 5, 2, 8, 7, 12, 11, 14, 13, 16, 15]

# Hand landmark indices kept as model input, in the order the model expects.
HAND_IDENTIFIERS = [
    0, 8, 7, 6, 5, 12, 11, 10, 9, 16, 15, 14, 13, 20, 19, 18, 17, 4, 3, 2, 1,
]


def extract_keypoints(image_orig):
    """Run MediaPipe Holistic on a BGR frame and return normalized 2-D keypoints.

    Returns an (N, 2) float array of body + left-hand + right-hand keypoints,
    shifted to the interval [-0.5, 0.5], or None when no pose was detected or
    both hands are missing from the frame.
    """
    image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
    results = holistic.process(image)

    # Renamed from `extract_keypoints` — the original nested helper shadowed
    # this enclosing function of the same name.
    def _landmarks_to_xy(lmks):
        # Convert a MediaPipe landmark list to an (N, 2) array of (x, y).
        if lmks:
            return np.array([[float(lmk.x), float(lmk.y)] for lmk in lmks.landmark])
        return None

    def _append_neck(keypoints):
        # The neck is not a MediaPipe landmark; synthesize it as the shoulder
        # midpoint and append it (index 33, referenced by BODY_IDENTIFIERS).
        left_shoulder = keypoints[11]
        right_shoulder = keypoints[12]
        neck = [
            (float(left_shoulder[0]) + float(right_shoulder[0])) / 2,
            (float(left_shoulder[1]) + float(right_shoulder[1])) / 2,
        ]
        return np.append(keypoints, [neck], axis=0)

    pose = _landmarks_to_xy(results.pose_landmarks)
    if pose is None:
        # BUG FIX: the original passed None on to _append_neck and crashed
        # on frames where no pose was detected.
        return None
    pose = _append_neck(pose)
    pose_norm = normalize_pose(pose)
    # Keep only the model's body keypoints, in the expected order.
    pose_norm = pose_norm[BODY_IDENTIFIERS]

    left_hand = _landmarks_to_xy(results.left_hand_landmarks)
    right_hand = _landmarks_to_xy(results.right_hand_landmarks)
    if left_hand is None and right_hand is None:
        return None

    # Normalize each detected hand; a missing hand becomes all zeros.
    left_hand = normalize_hand(left_hand) if left_hand is not None else np.zeros((21, 2))
    right_hand = normalize_hand(right_hand) if right_hand is not None else np.zeros((21, 2))
    left_hand = left_hand[HAND_IDENTIFIERS]
    right_hand = right_hand[HAND_IDENTIFIERS]

    # Combine pose and hands, then shift from [0, 1] to [-0.5, 0.5].
    pose_norm = np.append(pose_norm, left_hand, axis=0)
    pose_norm = np.append(pose_norm, right_hand, axis=0)
    pose_norm -= 0.5
    return pose_norm


# Sliding window of the most recent per-frame keypoints fed to the model.
buffer = []

left_shoulder_index = 11
right_shoulder_index = 12
neck_index = 33  # synthetic point appended by extract_keypoints
nose_index = 0
left_eye_index = 2


# if we have the keypoints, normalize single body, keypoints is numpy array of (identifiers, 2)
def normalize_pose(keypoints):
    """Normalize body keypoints into a head-metric-sized bounding box.

    The box is centred horizontally on the neck and sized from the shoulder
    distance (falling back to the neck-nose distance when the shoulders are
    missing or degenerate). Returns the keypoints unchanged when neither
    reference pair is usable.
    """
    left_shoulder = keypoints[left_shoulder_index]
    right_shoulder = keypoints[right_shoulder_index]
    neck = keypoints[neck_index]
    nose = keypoints[nose_index]

    # Prevent from even starting the analysis if some necessary elements are not present.
    if (left_shoulder[0] == 0 or right_shoulder[0] == 0 or
            (left_shoulder[0] == right_shoulder[0] and left_shoulder[1] == right_shoulder[1])) and (
            neck[0] == 0 or nose[0] == 0 or (neck[0] == nose[0] and neck[1] == nose[1])):
        return keypoints

    if left_shoulder[0] != 0 and right_shoulder[0] != 0 and (
            left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1]):
        # Euclidean shoulder distance.
        head_metric = (((left_shoulder[0] - right_shoulder[0]) ** 2) +
                       ((left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5
    else:
        # Fallback: Euclidean neck-nose distance.
        head_metric = (((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5

    # Set the starting and ending point of the normalization bounding box.
    starting_point = [keypoints[neck_index][0] - 3 * head_metric,
                      keypoints[left_eye_index][1] + head_metric]
    ending_point = [keypoints[neck_index][0] + 3 * head_metric,
                    starting_point[1] - 6 * head_metric]

    # Clamp the box corners to non-negative image coordinates.
    if starting_point[0] < 0:
        starting_point[0] = 0
    if starting_point[1] < 0:
        starting_point[1] = 0
    if ending_point[0] < 0:
        ending_point[0] = 0
    if ending_point[1] < 0:
        ending_point[1] = 0

    # Map every keypoint into the box's coordinate frame (in place).
    for i in range(len(keypoints)):
        keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
        keypoints[i][1] = (keypoints[i][1] - ending_point[1]) / (starting_point[1] - ending_point[1])

    return keypoints
def normalize_hand(keypoints):
    """Normalize hand keypoints (shape (21, 2)) into a padded square box.

    The box is the hand's extent padded by 10% of its larger side; the
    smaller side is padded further so the box becomes square. Returns the
    keypoints unchanged when they are degenerate (all zeros or zero area).
    """
    x_values = [keypoints[i][0] for i in range(len(keypoints)) if keypoints[i][0] != 0]
    y_values = [keypoints[i][1] for i in range(len(keypoints)) if keypoints[i][1] != 0]
    if not x_values or not y_values:
        return keypoints

    width, height = max(x_values) - min(x_values), max(y_values) - min(y_values)
    if width > height:
        delta_x = 0.1 * width
        delta_y = delta_x + ((width - height) / 2)
    else:
        delta_y = 0.1 * height
        delta_x = delta_y + ((height - width) / 2)

    starting_point = (min(x_values) - delta_x, min(y_values) - delta_y)
    ending_point = (max(x_values) + delta_x, max(y_values) + delta_y)
    # Guard against a zero-area box before dividing.
    if ending_point[0] - starting_point[0] == 0 or ending_point[1] - starting_point[1] == 0:
        return keypoints

    # Map every keypoint into the box's coordinate frame (in place).
    for i in range(len(keypoints)):
        keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
        keypoints[i][1] = (keypoints[i][1] - starting_point[1]) / (ending_point[1] - starting_point[1])

    return keypoints


# Load the reference embeddings computed from the training set.
df = pd.read_csv('embeddings/basic-signs/embeddings.csv')


def minkowski_distance_p(x, y, p=2):
    """Compute the p-th power of the Minkowski distance along the last axis.

    For p in {1, inf} the true distance is returned; for other p the 1/p
    root is deliberately omitted (see minkowski_distance, which applies it).
    """
    x = np.asarray(x)
    y = np.asarray(y)
    # Find smallest common datatype with float64 (return type of this
    # function) - addresses #10262.
    # Don't just cast to float64 for complex input case.
    common_datatype = np.promote_types(np.promote_types(x.dtype, y.dtype), 'float64')
    # Make sure x and y are NumPy arrays of correct datatype.
    x = x.astype(common_datatype)
    y = y.astype(common_datatype)

    if p == np.inf:
        return np.amax(np.abs(y - x), axis=-1)
    elif p == 1:
        return np.sum(np.abs(y - x), axis=-1)
    else:
        return np.sum(np.abs(y - x) ** p, axis=-1)


def minkowski_distance(x, y, p=2):
    """Compute the Minkowski distance between x and y along the last axis."""
    x = np.asarray(x)
    y = np.asarray(y)
    if p == np.inf or p == 1:
        return minkowski_distance_p(x, y, p)
    else:
        return minkowski_distance_p(x, y, p) ** (1. / p)


def distance_matrix(keypoints, embeddings, p=2, threshold=1000000):
    """Return the (m, n) Minkowski distance matrix between two vector sets.

    keypoints is (m, k) and embeddings is (n, k). When the broadcast
    computation would exceed `threshold` elements, fall back to a row- or
    column-wise loop to bound peak memory.

    Raises ValueError when the vector dimensions of the two sets differ.
    """
    x = np.array(keypoints)
    m, k = x.shape
    y = np.asarray(embeddings)
    n, kk = y.shape
    if k != kk:
        raise ValueError(f"x contains {k}-dimensional vectors but y contains "
                         f"{kk}-dimensional vectors")

    if m * n * k <= threshold:
        print("Using minkowski_distance")
        return minkowski_distance(x[:, np.newaxis, :], y[np.newaxis, :, :], p)
    else:
        result = np.empty((m, n), dtype=float)  # FIXME: figure out the best dtype
        # Loop over the smaller dimension to minimize Python-level iterations.
        if m < n:
            for i in range(m):
                result[i, :] = minkowski_distance(x[i], y, p)
        else:
            for j in range(n):
                result[:, j] = minkowski_distance(x, y[j], p)
        return result


# Load the trained SPOTER embedding model onto the selected device.
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
model = SPOTER_EMBEDDINGS(
    features=checkpoint["config_args"].vector_length,
    hidden_dim=checkpoint["config_args"].hidden_dim,
    norm_emb=checkpoint["config_args"].normalize_embeddings,
).to(device)
model.load_state_dict(checkpoint["state_dict"])

# Keep only the embedding column; parse each embedding from its string
# representation ("[0.1, 0.2, ...]") into a list of floats.
embeddings = df.drop(columns=['labels', 'label_name', 'embeddings'])
embeddings["embeddings"] = embeddings["embeddings2"].apply(
    lambda x: [float(i) for i in x[1:-1].split(", ")])
embeddings = embeddings.drop(columns=['embeddings2'])
embeddings = embeddings["embeddings"].tolist()


def make_prediction(keypoints):
    """Embed a buffered keypoint sequence and classify it by nearest neighbours.

    Returns (label, score): the most common label among the 5 closest
    training embeddings, scored by the mean distance of the matches that
    carry that label.
    """
    model.eval()
    with torch.no_grad():
        keypoints = torch.from_numpy(np.array([keypoints])).float().to(device)
        new_embeddings = model(keypoints).cpu().numpy().tolist()[0]

    # BUG FIX: distance_matrix requires 2-D inputs (`m, k = x.shape`); the
    # original passed the bare 1-D embedding, which raised a ValueError.
    dist_matrix = distance_matrix([new_embeddings], embeddings, p=2, threshold=1000000)

    # Get the 5 closest matches, pick the most common label among them, and
    # use the average distance of that label's matches as the score.
    indeces = np.argsort(dist_matrix)[0][:5]
    labels = df["label_name"].iloc[indeces].tolist()
    c = Counter(labels).most_common()[0][0]
    # Filter indeces to only include the most common label.
    indeces = [i for i in indeces if df["label_name"].iloc[i] == c]
    score = np.mean(dist_matrix[0][indeces])
    return c, score
# Open the webcam stream and classify a sliding window of 15 frames.
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        # BUG FIX: the original never checked `ret`, so a failed read passed
        # a None frame into extract_keypoints and crashed.
        break

    pose = extract_keypoints(frame)
    if pose is not None:
        buffer.append(pose)
        # Keep at most the 15 most recent frames in the window.
        if len(buffer) > 15:
            buffer.pop(0)
        if len(buffer) == 15:
            label, score = make_prediction(buffer)
            # Draw the predicted label and its score on the frame.
            cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1,
                        (0, 0, 255), 2, cv2.LINE_AA)
            cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1,
                        (0, 0, 255), 2, cv2.LINE_AA)

    # Show the frame and exit on ESC.
    # BUG FIX: the original `continue`d past waitKey when no pose was
    # detected, so the window could not refresh or receive the exit key.
    cv2.imshow('MediaPipe Hands', frame)
    if cv2.waitKey(5) & 0xFF == 27:
        break

# Release the capture device and close the display window.
cap.release()
cv2.destroyAllWindows()

# Alternative: run on a video file instead of the webcam.
# cap = cv2.VideoCapture('E.mp4')
# while cap.isOpened():
#     # read frame
#     ret, frame = cap.read()
#     if frame is None:
#         break
#     pose = extract_keypoints(frame)
#     buffer.append(pose)
#     label, score = make_prediction(buffer)
#     print(label, score)