"""Keypoint extraction, normalization and embedding-based prediction for video frames."""

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import torch

from predictions.k_nearest import KNearestNeighbours
from models import SPOTER_EMBEDDINGS

# Run the model on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Rows of the pose keypoint array that are kept, in the order the model input
# uses. Index 33 is the synthetic neck point appended by ``Predictor._append_neck``.
BODY_IDENTIFIERS = [
    0,
    33,
    5,
    2,
    8,
    7,
    12,
    11,
    14,
    13,
    16,
    15,
]

# Rows of each hand keypoint array that are kept, in the order the model input uses.
HAND_IDENTIFIERS = [
    0,
    8,
    7,
    6,
    5,
    12,
    11,
    10,
    9,
    16,
    15,
    14,
    13,
    20,
    19,
    18,
    17,
    4,
    3,
    2,
    1,
]

CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"


class Predictor:
    """Turn frames into normalized keypoints, embed them and classify the embedding.

    The pipeline is: MediaPipe Holistic landmarks -> pose/hand normalization ->
    ``SPOTER_EMBEDDINGS`` model -> ``self.predictor.predict``.
    """

    def __init__(self, embeddings_path, predictor_type=None):
        """Load the landmark detector, the reference embeddings and the model.

        Args:
            embeddings_path: Path of the CSV file with the training embeddings;
                read with ``pandas.read_csv``.
            predictor_type: Object exposing ``set_embeddings`` and ``predict``.
                When ``None``, ``KNearestNeighbours(1)`` is used.
        """
        # Initialize the MediaPipe Holistic model (pose + both hands).
        self.holistic = mp.solutions.holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=2,
        )
        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils

        # Row indices into the pose keypoint array used by the normalization.
        self.left_shoulder_index = 11
        self.right_shoulder_index = 12
        self.neck_index = 33
        self.nose_index = 0
        self.left_eye_index = 2

        # Load the training embeddings CSV.
        self.embeddings = pd.read_csv(embeddings_path)

        # NOTE(review): the checkpoint carries a non-tensor ``config_args``
        # object; PyTorch versions whose ``torch.load`` defaults to
        # ``weights_only=True`` may refuse to unpickle it - confirm against the
        # installed version.
        checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
        config_args = checkpoint["config_args"]
        self.model = SPOTER_EMBEDDINGS(
            features=config_args.vector_length,
            hidden_dim=config_args.hidden_dim,
            norm_emb=config_args.normalize_embeddings,
        ).to(device)
        self.model.load_state_dict(checkpoint["state_dict"])

        if predictor_type is None:
            self.predictor = KNearestNeighbours(1)
        else:
            self.predictor = predictor_type
        self.predictor.set_embeddings(self.embeddings)

    @staticmethod
    def _landmarks_to_array(landmarks):
        """Return the (x, y) pairs of a landmark list as an array, or ``None`` if falsy."""
        if not landmarks:
            return None
        return np.array([[float(lmk.x), float(lmk.y)] for lmk in landmarks.landmark])

    def _append_neck(self, keypoints):
        """Return *keypoints* with the midpoint of both shoulders appended as last row."""
        left_shoulder = keypoints[self.left_shoulder_index]
        right_shoulder = keypoints[self.right_shoulder_index]
        neck = [
            (float(left_shoulder[0]) + float(right_shoulder[0])) / 2,
            (float(left_shoulder[1]) + float(right_shoulder[1])) / 2,
        ]
        return np.append(keypoints, [neck], axis=0)

    def extract_keypoints(self, image_orig):
        """Detect and normalize the body and hand keypoints of one frame.

        Args:
            image_orig: Frame passed to ``cv2.cvtColor`` with ``COLOR_BGR2RGB``,
                i.e. expected in BGR channel order.

        Returns:
            An array with one ``(x, y)`` row per entry of ``BODY_IDENTIFIERS``,
            followed by the left-hand and then the right-hand rows selected by
            ``HAND_IDENTIFIERS``, all shifted by -0.5. ``None`` when no pose was
            detected or when neither hand was detected.
        """
        image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
        results = self.holistic.process(image)

        pose = self._landmarks_to_array(results.pose_landmarks)
        if pose is None:
            return None
        pose = self._append_neck(pose)

        # Normalize first (needs all rows), then keep only the wanted rows in
        # the wanted order.
        pose_norm = self.normalize_pose(pose)[BODY_IDENTIFIERS]

        left_hand = self._landmarks_to_array(results.left_hand_landmarks)
        right_hand = self._landmarks_to_array(results.right_hand_landmarks)
        if left_hand is None and right_hand is None:
            return None

        # A missing hand is represented by all-zero keypoints.
        if left_hand is not None:
            left_hand = self.normalize_hand(left_hand)
        else:
            left_hand = np.zeros((21, 2))
        if right_hand is not None:
            right_hand = self.normalize_hand(right_hand)
        else:
            right_hand = np.zeros((21, 2))

        left_hand = left_hand[HAND_IDENTIFIERS]
        right_hand = right_hand[HAND_IDENTIFIERS]

        # Combine pose and hands, then move the interval by -0.5.
        combined = np.concatenate((pose_norm, left_hand, right_hand), axis=0)
        combined -= 0.5
        return combined

    def normalize_pose(self, keypoints):
        """Normalize body keypoints in place relative to a head-sized bounding box.

        The box is centred horizontally on the neck and sized from the shoulder
        distance, falling back to the neck-nose distance when the shoulders are
        unusable.

        Args:
            keypoints: Indexable collection of mutable ``[x, y]`` rows (e.g. a
                NumPy array of shape ``(identifiers, 2)``) that includes the
                neck row at ``self.neck_index``.

        Returns:
            The same *keypoints* object. It is returned unchanged when neither
            reference distance is usable or when the bounding box has zero
            width or height.
        """
        left_shoulder = keypoints[self.left_shoulder_index]
        right_shoulder = keypoints[self.right_shoulder_index]
        neck = keypoints[self.neck_index]
        nose = keypoints[self.nose_index]

        shoulders_usable = (
            left_shoulder[0] != 0
            and right_shoulder[0] != 0
            and (left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1])
        )
        neck_nose_usable = (
            neck[0] != 0
            and nose[0] != 0
            and (neck[0] != nose[0] or neck[1] != nose[1])
        )

        # Prevent from even starting the analysis if the necessary elements are
        # not present.
        if not shoulders_usable and not neck_nose_usable:
            return keypoints

        if shoulders_usable:
            head_metric = (
                ((left_shoulder[0] - right_shoulder[0]) ** 2)
                + ((left_shoulder[1] - right_shoulder[1]) ** 2)
            ) ** 0.5
        else:
            head_metric = (((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5

        # Set the starting and ending point of the normalization bounding box.
        starting_point = [
            keypoints[self.neck_index][0] - 3 * head_metric,
            keypoints[self.left_eye_index][1] + head_metric,
        ]
        ending_point = [
            keypoints[self.neck_index][0] + 3 * head_metric,
            starting_point[1] - 6 * head_metric,
        ]

        # Clamp negative box coordinates to zero.
        for corner in (starting_point, ending_point):
            for axis in (0, 1):
                if corner[axis] < 0:
                    corner[axis] = 0

        box_width = ending_point[0] - starting_point[0]
        box_height = starting_point[1] - ending_point[1]
        # Clamping can collapse the box (e.g. both corners end up at 0); dividing
        # by a zero extent would fill the keypoints with inf/NaN, so leave them
        # untouched instead, as normalize_hand does.
        if box_width == 0 or box_height == 0:
            return keypoints

        # Normalize the keypoints.
        for point in keypoints:
            point[0] = (point[0] - starting_point[0]) / box_width
            point[1] = (point[1] - ending_point[1]) / box_height

        return keypoints

    def normalize_hand(self, keypoints):
        """Normalize hand keypoints in place relative to a padded square bounding box.

        Args:
            keypoints: Indexable collection of mutable ``[x, y]`` rows (e.g. a
                NumPy array of shape ``(identifiers, 2)``).

        Returns:
            The same *keypoints* object. It is returned unchanged when all x or
            all y values are zero, or when the bounding box has zero width or
            height.
        """
        # Zero coordinates are excluded from the bounding-box computation.
        x_values = [point[0] for point in keypoints if point[0] != 0]
        y_values = [point[1] for point in keypoints if point[1] != 0]

        if not x_values or not y_values:
            return keypoints

        min_x, max_x = min(x_values), max(x_values)
        min_y, max_y = min(y_values), max(y_values)
        width, height = max_x - min_x, max_y - min_y

        # Pad the longer side by 10 % and the shorter side so the box is square.
        if width > height:
            delta_x = 0.1 * width
            delta_y = delta_x + ((width - height) / 2)
        else:
            delta_y = 0.1 * height
            delta_x = delta_y + ((height - width) / 2)

        starting_point = (min_x - delta_x, min_y - delta_y)
        ending_point = (max_x + delta_x, max_y + delta_y)

        box_width = ending_point[0] - starting_point[0]
        box_height = ending_point[1] - starting_point[1]
        if box_width == 0 or box_height == 0:
            return keypoints

        # Normalize the keypoints.
        for point in keypoints:
            point[0] = (point[0] - starting_point[0]) / box_width
            point[1] = (point[1] - starting_point[1]) / box_height

        return keypoints

    def get_embedding(self, keypoints):
        """Run the model on *keypoints* and return the first output row as a list.

        Args:
            keypoints: Array-like model input; it is wrapped in a batch of size
                one before being passed to the model.
        """
        self.model.eval()
        with torch.no_grad():
            batch = torch.from_numpy(np.array([keypoints])).float().to(device)
            return self.model(batch).cpu().numpy().tolist()[0]

    def predict(self, embeddings):
        """Return ``self.predictor.predict(embeddings)``."""
        return self.predictor.predict(embeddings)

    def make_prediction(self, keypoints):
        """Embed *keypoints* with the model and return the predictor's result for it."""
        return self.predictor.predict(self.get_embedding(keypoints))

    def validation(self):
        """Evaluate the predictor on the validation set and print the accuracy.

        Reads ``validation_data.npy`` and ``validation_labels.npy`` from the
        current working directory.

        Returns:
            The fraction of predictions equal to their label, or ``None`` when
            there are no predictions.
        """
        # NOTE(review): allow_pickle=True executes pickled objects on load; only
        # use these files when they come from a trusted source.
        validation_data = np.load('validation_data.npy', allow_pickle=True)
        validation_labels = np.load('validation_labels.npy', allow_pickle=True)

        # Run the model on the validation data.
        self.model.eval()
        with torch.no_grad():
            inputs = torch.from_numpy(validation_data).float().to(device)
            validation_embeddings = self.model(inputs).cpu().numpy()

        predictions = self.predictor.predict(validation_embeddings)

        total = len(predictions)
        if total == 0:
            # Avoid dividing by zero on an empty validation set.
            print('Accuracy: n/a (no validation samples)')
            return None

        correct = sum(
            1
            for index, predicted in enumerate(predictions)
            if predicted == validation_labels[index]
        )
        accuracy = correct / total
        print('Accuracy: ' + str(accuracy))
        return accuracy