import json
import os
import os.path as op

import pandas as pd
from tqdm.auto import tqdm

from normalization.blazepose_mapping import map_blazepose_df

# MediaPipe landmark key ("{type}_{landmark_index}" as found in the parquet
# files) -> SPOTER column base name. Landmarks absent from this mapping
# (face mesh, lower body, ...) are dropped.
LANDMARK_MAPPING = {
    'pose_0': 'nose',
    'pose_1': 'leftEye',
    'pose_4': 'rightEye',
    'pose_7': 'leftEar',
    'pose_8': 'rightEar',
    'pose_11': 'leftShoulder',
    'pose_12': 'rightShoulder',
    'pose_13': 'leftElbow',
    'pose_14': 'rightElbow',
    'pose_15': 'leftWrist',
    'pose_16': 'rightWrist',
    'left_hand_0': 'wrist_left',
    'left_hand_1': 'thumbCMC_left',
    'left_hand_2': 'thumbMP_left',
    'left_hand_3': 'thumbIP_left',
    'left_hand_4': 'thumbTip_left',
    'left_hand_5': 'indexMCP_left',
    'left_hand_6': 'indexPIP_left',
    'left_hand_7': 'indexDIP_left',
    'left_hand_8': 'indexTip_left',
    'left_hand_9': 'middleMCP_left',
    'left_hand_10': 'middlePIP_left',
    'left_hand_11': 'middleDIP_left',
    'left_hand_12': 'middleTip_left',
    'left_hand_13': 'ringMCP_left',
    'left_hand_14': 'ringPIP_left',
    'left_hand_15': 'ringDIP_left',
    'left_hand_16': 'ringTip_left',
    'left_hand_17': 'littleMCP_left',
    'left_hand_18': 'littlePIP_left',
    'left_hand_19': 'littleDIP_left',
    'left_hand_20': 'littleTip_left',
    'right_hand_0': 'wrist_right',
    'right_hand_1': 'thumbCMC_right',
    'right_hand_2': 'thumbMP_right',
    'right_hand_3': 'thumbIP_right',
    'right_hand_4': 'thumbTip_right',
    'right_hand_5': 'indexMCP_right',
    'right_hand_6': 'indexPIP_right',
    'right_hand_7': 'indexDIP_right',
    'right_hand_8': 'indexTip_right',
    'right_hand_9': 'middleMCP_right',
    'right_hand_10': 'middlePIP_right',
    'right_hand_11': 'middleDIP_right',
    'right_hand_12': 'middleTip_right',
    'right_hand_13': 'ringMCP_right',
    'right_hand_14': 'ringPIP_right',
    'right_hand_15': 'ringDIP_right',
    'right_hand_16': 'ringTip_right',
    'right_hand_17': 'littleMCP_right',
    'right_hand_18': 'littlePIP_right',
    'right_hand_19': 'littleDIP_right',
    'right_hand_20': 'littleTip_right',
}


def _output_columns(mapping):
    """Return the flat '<name>_X', '<name>_Y' column list, in mapping order."""
    columns = []
    for name in mapping.values():
        columns.append(f'{name}_X')
        columns.append(f'{name}_Y')
    return columns


def _sequence_to_row(landmark_data, mapping):
    """Collapse one video's per-frame landmarks into a single row dict.

    Each '<name>_X' / '<name>_Y' key maps to the list of that coordinate's
    values over all frames, in ascending frame order. NaN coordinates are
    zeroed first, matching the original preprocessing.
    """
    landmark_data = landmark_data.fillna(0)
    row = {}
    for frame_id in sorted(landmark_data['frame'].unique()):
        frame_data = landmark_data.loc[landmark_data['frame'] == frame_id]
        # Renamed loop variable: the original reused `row` here, shadowing
        # the caller's train.csv row.
        for _, lm in frame_data.iterrows():
            name = mapping.get(f"{lm['type']}_{lm['landmark_index']}")
            if name is not None:
                row.setdefault(f'{name}_X', []).append(lm['x'])
                row.setdefault(f'{name}_Y', []).append(lm['y'])
    return row


def create(train_landmark_files, train_csv, dataset_folder, test_size,
           max_videos=10000):
    """Build a SPOTER-style csv dataset from raw landmark parquet files.

    One output row per video: metadata columns (path, participant_id,
    sequence_id, sign, labels) followed by one list-valued cell per landmark
    coordinate holding the per-frame values.

    Parameters
    ----------
    train_landmark_files : str
        Directory containing '<participant_id>/<sequence_id>.parquet' files.
    train_csv : str
        Path to a csv with 'path', 'participant_id', 'sequence_id', 'sign'.
    dataset_folder : str
        Output directory; 'spoter.csv' is written there.
    test_size : float
        NOTE(review): accepted for interface compatibility but currently
        unused — no train/test split is performed in this function.
    max_videos : int, optional
        Upper bound on the number of videos processed (default 10000, the
        previously hard-coded limit).
    """
    os.makedirs(dataset_folder, exist_ok=True)

    # Label lookup: sign name -> integer class index.
    with open('data/sign_to_prediction_index_map.json', 'r') as f:
        sign_to_prediction_index_map = json.load(f)

    train_df = pd.read_csv(train_csv)
    columns = _output_columns(LANDMARK_MAPPING)

    video_rows = []
    subset = train_df.head(max_videos)
    # total=len(subset): the original always reported 10000 even when the
    # csv had fewer rows.
    for _, video in tqdm(subset.iterrows(), total=len(subset)):
        parquet_file = os.path.join(
            train_landmark_files,
            str(video['participant_id']),
            f"{video['sequence_id']}.parquet",
        )
        if not os.path.exists(parquet_file):
            print(f"{parquet_file} not found. Skipping.")
            continue

        landmark_data = pd.read_parquet(parquet_file)
        row = _sequence_to_row(landmark_data, LANDMARK_MAPPING)

        # Exactly one row per video. The original accumulated `new_row`
        # across frames but concatenated a DataFrame per frame, producing
        # cumulative duplicate rows that broke the axis=1 metadata join
        # below (which assumes a single index-0 row).
        coords = pd.DataFrame({k: [v] for k, v in row.items()})
        # Guarantee every expected column exists; landmarks never observed
        # become 0 (same effect as the original fillna(0)).
        coords = coords.reindex(columns=columns).fillna(0)

        meta = pd.DataFrame(
            {
                'path': video['path'],
                'participant_id': video['participant_id'],
                'sequence_id': video['sequence_id'],
                'sign': video['sign'],
                'labels': sign_to_prediction_index_map[video['sign']],
            },
            index=[0],
        )
        video_rows.append(pd.concat([meta, coords], axis=1))

    # Single concat at the end instead of concatenating inside the loop
    # (the original per-iteration concat is quadratic).
    video_data = pd.concat(video_rows, axis=0, ignore_index=True)
    video_data = map_blazepose_df(video_data, rename=False)
    video_data.to_csv(os.path.join(dataset_folder, 'spoter.csv'), index=False)


if __name__ == '__main__':
    create(
        train_landmark_files='data/train_landmark_files',
        train_csv='data/train.csv',
        dataset_folder='data/processed',
        test_size=0.25,
    )