"""Live webcam demo: classify a sliding window of frames by embedding distance.

Each frame is run through MediaPipe Holistic, the pose and hand landmarks are
normalised and stacked, and the last ``BUFFER_SIZE`` frames are embedded with a
``SPOTER_EMBEDDINGS`` model.  The label of the closest training embedding
(Euclidean distance) is drawn on the frame.

Provenance (reformatted payload of a whitespace-collapsed git patch):
    From ed0e0f198baf920f2b3b1577cd0a4b8f3c3bd354 Mon Sep 17 00:00:00 2001
    From: Victor Mylle
    Date: Fri, 14 Apr 2023 11:10:25 +0200
    Subject: [PATCH] Added webcam view for embedding
    webcam.py | new file, index 0000000..059e7fe
"""

from collections import deque

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import torch

from models import SPOTER_EMBEDDINGS

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the MediaPipe Holistic model (pose + both hands in one pass).
holistic = mp.solutions.holistic.Holistic(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    model_complexity=2,
)
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Rows selected from the pose array (after the neck row has been appended),
# in the order they are fed to the model.
BODY_IDENTIFIERS = [
    0,
    33,
    5,
    2,
    8,
    7,
    12,
    11,
    14,
    13,
    16,
    15,
]

# Rows selected from each 21-landmark hand array, in model input order.
HAND_IDENTIFIERS = [
    0,
    8,
    7,
    6,
    5,
    12,
    11,
    10,
    9,
    16,
    15,
    14,
    13,
    20,
    19,
    18,
    17,
    4,
    3,
    2,
    1,
]

# Row indices into the pose array used by the normalisation code.
left_shoulder_index = 11
right_shoulder_index = 12
neck_index = 33  # the synthetic neck row appended by _append_neck
nose_index = 0
left_eye_index = 2

# Number of consecutive frames handed to the model for one prediction.
BUFFER_SIZE = 15

# Sliding window of per-frame keypoint arrays; the oldest frame is dropped
# automatically once BUFFER_SIZE frames are held.
buffer = deque(maxlen=BUFFER_SIZE)


def _landmarks_to_array(lmks):
    """Return an (n, 2) float array of landmark x/y values, or None if absent."""
    if not lmks:
        return None
    return np.array([[float(lmk.x), float(lmk.y)] for lmk in lmks.landmark])


def _append_neck(keypoints):
    """Return a copy of *keypoints* with the shoulder midpoint appended as last row."""
    left_shoulder = keypoints[left_shoulder_index]
    right_shoulder = keypoints[right_shoulder_index]
    neck = [
        (float(left_shoulder[0]) + float(right_shoulder[0])) / 2,
        (float(left_shoulder[1]) + float(right_shoulder[1])) / 2,
    ]
    return np.append(keypoints, [neck], axis=0)


def extract_keypoints(image_orig):
    """Extract normalised pose and hand keypoints from one BGR frame.

    Args:
        image_orig: BGR image as delivered by ``cv2.VideoCapture.read``.

    Returns:
        A ``(54, 2)`` array (12 body rows, then 21 left-hand rows, then 21
        right-hand rows) with 0.5 subtracted from every value, or ``None``
        when no pose is detected or when neither hand is detected.
    """
    image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
    results = holistic.process(image)

    pose = _landmarks_to_array(results.pose_landmarks)
    if pose is None:
        # Without a pose there are no shoulders to derive the neck from;
        # skip the frame instead of failing on None.
        return None

    pose = _append_neck(pose)
    pose_norm = normalize_pose(pose)
    # Keep only the rows listed in BODY_IDENTIFIERS, in that order.
    pose_norm = pose_norm[BODY_IDENTIFIERS]

    left_hand = _landmarks_to_array(results.left_hand_landmarks)
    right_hand = _landmarks_to_array(results.right_hand_landmarks)

    if left_hand is None and right_hand is None:
        return None

    # Normalise the detected hands; a missing hand is represented by zeros.
    if left_hand is not None:
        left_hand = normalize_hand(left_hand)
    else:
        left_hand = np.zeros((21, 2))
    if right_hand is not None:
        right_hand = normalize_hand(right_hand)
    else:
        right_hand = np.zeros((21, 2))

    left_hand = left_hand[HAND_IDENTIFIERS]
    right_hand = right_hand[HAND_IDENTIFIERS]

    # Combine pose and hands into a single array.
    pose_norm = np.append(pose_norm, left_hand, axis=0)
    pose_norm = np.append(pose_norm, right_hand, axis=0)

    # Shift the value interval by -0.5 (presumably to centre [0, 1] on zero
    # as the model expects -- TODO confirm against training preprocessing).
    pose_norm -= 0.5

    return pose_norm


def normalize_pose(keypoints):
    """Normalise body keypoints in place relative to a head-sized bounding box.

    The box is centred horizontally on the neck, six "head metrics" wide and
    six tall, where the head metric is the shoulder distance or, when the
    shoulders are unusable, the neck-nose distance.

    Args:
        keypoints: array of shape ``(identifiers, 2)`` that includes the neck
            row at ``neck_index``.  It is modified in place.

    Returns:
        The same *keypoints* object.  It is returned unchanged when neither
        the shoulders nor the neck/nose pair are usable, or when the bounding
        box collapses to zero width or height.
    """
    left_shoulder = keypoints[left_shoulder_index]
    right_shoulder = keypoints[right_shoulder_index]
    neck = keypoints[neck_index]
    nose = keypoints[nose_index]

    shoulders_usable = (
        left_shoulder[0] != 0
        and right_shoulder[0] != 0
        and (left_shoulder[0] != right_shoulder[0]
             or left_shoulder[1] != right_shoulder[1])
    )
    neck_nose_usable = (
        neck[0] != 0
        and nose[0] != 0
        and (neck[0] != nose[0] or neck[1] != nose[1])
    )

    # Do not start the analysis if the necessary elements are not present.
    if not shoulders_usable and not neck_nose_usable:
        return keypoints

    if shoulders_usable:
        head_metric = (
            ((left_shoulder[0] - right_shoulder[0]) ** 2)
            + ((left_shoulder[1] - right_shoulder[1]) ** 2)
        ) ** 0.5
    else:
        head_metric = (
            ((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)
        ) ** 0.5

    # Starting and ending point of the normalisation bounding box.  The end
    # y-coordinate is derived from the *unclipped* start y-coordinate.
    neck_x = keypoints[neck_index][0]
    start_x = neck_x - 3 * head_metric
    start_y = keypoints[left_eye_index][1] + head_metric
    end_x = neck_x + 3 * head_metric
    end_y = start_y - 6 * head_metric

    # Clip the box to non-negative coordinates.
    start_x = max(start_x, 0)
    start_y = max(start_y, 0)
    end_x = max(end_x, 0)
    end_y = max(end_y, 0)

    width = end_x - start_x
    height = start_y - end_y
    if width == 0 or height == 0:
        # Degenerate box (e.g. both edges clipped to 0): dividing would fill
        # the array with inf/NaN, so leave the keypoints untouched instead.
        return keypoints

    # Normalise the keypoints.
    for point in keypoints:
        point[0] = (point[0] - start_x) / width
        point[1] = (point[1] - end_y) / height

    return keypoints


def normalize_hand(keypoints):
    """Normalise hand keypoints in place to a padded square bounding box.

    Zero coordinates are ignored when computing the box.  The longer side is
    padded by 10% on each end and the shorter side is padded further so that
    the box is square.

    Args:
        keypoints: array of shape ``(identifiers, 2)``; modified in place.

    Returns:
        The same *keypoints* object, unchanged if it has no non-zero x or y
        values or if the bounding box has zero width or height.
    """
    x_values = [point[0] for point in keypoints if point[0] != 0]
    y_values = [point[1] for point in keypoints if point[1] != 0]

    if not x_values or not y_values:
        return keypoints

    min_x, max_x = min(x_values), max(x_values)
    min_y, max_y = min(y_values), max(y_values)
    width, height = max_x - min_x, max_y - min_y

    if width > height:
        delta_x = 0.1 * width
        delta_y = delta_x + ((width - height) / 2)
    else:
        delta_y = 0.1 * height
        delta_x = delta_y + ((height - width) / 2)

    starting_point = (min_x - delta_x, min_y - delta_y)
    ending_point = (max_x + delta_x, max_y + delta_y)

    box_width = ending_point[0] - starting_point[0]
    box_height = ending_point[1] - starting_point[1]
    if box_width == 0 or box_height == 0:
        return keypoints

    # Normalise the keypoints.
    for point in keypoints:
        point[0] = (point[0] - starting_point[0]) / box_width
        point[1] = (point[1] - starting_point[1]) / box_height

    return keypoints


# Load the training embeddings CSV.
df = pd.read_csv('data/fingerspelling/embeddings.csv')


def _parse_embedding(text):
    """Parse a string of the form ``"[a, b, c]"`` into a list of floats."""
    return [float(value) for value in text[1:-1].split(", ")]


# Training embeddings parsed once from the string column "embeddings2"; row i
# corresponds to df.iloc[i].
TRAIN_EMBEDDINGS = np.asarray(
    df["embeddings2"].apply(_parse_embedding).tolist(), dtype=float
)


# NOTE(review): the three distance helpers below look adapted from
# scipy.spatial (the "#10262" reference is presumably a SciPy issue) -- confirm.
def minkowski_distance_p(x, y, p=2):
    """Return the p-th power of the Minkowski p-distance along the last axis.

    For ``p == np.inf`` the maximum absolute difference is returned and for
    ``p == 1`` the sum of absolute differences; in both cases no power is
    applied.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # Find smallest common datatype with float64 (return type of this
    # function) - addresses #10262.
    # Don't just cast to float64 for complex input case.
    common_datatype = np.promote_types(np.promote_types(x.dtype, y.dtype),
                                       'float64')

    # Make sure x and y are NumPy arrays of correct datatype.
    x = x.astype(common_datatype)
    y = y.astype(common_datatype)

    if p == np.inf:
        return np.amax(np.abs(y - x), axis=-1)
    elif p == 1:
        return np.sum(np.abs(y - x), axis=-1)
    else:
        return np.sum(np.abs(y - x) ** p, axis=-1)


def minkowski_distance(x, y, p=2):
    """Return the Minkowski p-distance between *x* and *y* along the last axis."""
    x = np.asarray(x)
    y = np.asarray(y)
    if p == np.inf or p == 1:
        return minkowski_distance_p(x, y, p)
    else:
        return minkowski_distance_p(x, y, p) ** (1. / p)


def distance_matrix(keypoints, embeddings, p=2, threshold=1000000):
    """Return the ``(m, n)`` matrix of Minkowski distances between two vector sets.

    Args:
        keypoints: ``(m, k)`` array-like of query vectors.
        embeddings: ``(n, k)`` array-like of reference vectors.
        p: order of the Minkowski norm.
        threshold: if ``m * n * k`` exceeds this, the matrix is filled with a
            Python loop over the shorter axis instead of one broadcast.

    Raises:
        ValueError: if the two inputs have different vector dimensions.
    """
    x = np.array(keypoints)
    m, k = x.shape
    y = np.asarray(embeddings)
    n, kk = y.shape

    if k != kk:
        raise ValueError(f"x contains {k}-dimensional vectors but y contains "
                         f"{kk}-dimensional vectors")

    if m * n * k <= threshold:
        return minkowski_distance(x[:, np.newaxis, :], y[np.newaxis, :, :], p)
    else:
        result = np.empty((m, n), dtype=float)  # FIXME: figure out the best dtype
        if m < n:
            for i in range(m):
                result[i, :] = minkowski_distance(x[i], y, p)
        else:
            for j in range(n):
                result[:, j] = minkowski_distance(x, y[j], p)
        return result


CHECKPOINT_PATH = "out_checkpoints/checkpoint_embed_1105.pth"
# torch.load unpickles the checkpoint, so only load files from a trusted
# source.  NOTE(review): the checkpoint holds a non-tensor "config_args"
# object; PyTorch versions that default to weights_only=True may need
# weights_only=False here -- confirm against the installed version.
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)

model = SPOTER_EMBEDDINGS(
    features=checkpoint["config_args"].vector_length,
    hidden_dim=checkpoint["config_args"].hidden_dim,
    norm_emb=checkpoint["config_args"].normalize_embeddings,
).to(device)

model.load_state_dict(checkpoint["state_dict"])


def make_prediction(keypoints):
    """Embed a window of frames and return the closest training label.

    Args:
        keypoints: sequence of per-frame keypoint arrays as produced by
            ``extract_keypoints``.

    Returns:
        Tuple ``(label_name, distance)`` for the training embedding with the
        smallest Euclidean distance to the window's embedding.  No distance
        threshold is applied.

    Side effects:
        Overwrites ``inputs.txt`` with the model input of this call (debug
        dump kept from the original implementation).
    """
    # Run the model on the window.
    model.eval()
    with torch.no_grad():
        keypoints = torch.from_numpy(np.array([keypoints])).float().to(device)
        frames = keypoints[0].cpu().numpy()
        with open('inputs.txt', 'w', encoding='utf-8') as f:
            f.writelines(str(frame) + ' ' for frame in frames)
        new_embeddings = model(keypoints).cpu().numpy().tolist()[0]

    # Distance from the new embedding to every training embedding.
    dist_matrix = distance_matrix(new_embeddings, TRAIN_EMBEDDINGS, p=2,
                                  threshold=1000000)

    # Find the closest match.
    closest_match = np.argmin(dist_matrix[0])

    return df.iloc[closest_match]["label_name"], dist_matrix[0][closest_match]


# Open the webcam stream.
cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # Camera unplugged or stream ended: frame is not usable.
            break

        pose = extract_keypoints(frame)

        if pose is not None:
            buffer.append(pose)

            if len(buffer) == BUFFER_SIZE:
                label, score = make_prediction(list(buffer))

                # Draw the label and its distance on the frame.
                cv2.putText(frame, str(label), (10, 30), cv2.FONT_HERSHEY_SIMPLEX,
                            1, (0, 0, 255), 2, cv2.LINE_AA)
                cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX,
                            1, (0, 0, 255), 2, cv2.LINE_AA)

        # Show the frame.  waitKey must run for every frame, including frames
        # without keypoints, so the window repaints and ESC is always handled.
        cv2.imshow('MediaPipe Hands', frame)

        # Exit on ESC.
        if (cv2.waitKey(5) & 0xFF) == 27:
            break
finally:
    # Release the camera, the windows and the MediaPipe graph even on error.
    cap.release()
    cv2.destroyAllWindows()
    holistic.close()

# Alternative kept for reference: run on a video file instead of the webcam.
# open video A.mp4
# cap = cv2.VideoCapture('Z.mp4')
# while cap.isOpened():
#     # read frame
#     ret, frame = cap.read()
#     if frame is None:
#         break
#     pose = extract_keypoints(frame)
#
#     buffer.append(pose)
#
# make_prediction(buffer)