Files
spoterembedding/preprocessing/create_fingerspelling_dataset.py
2023-05-21 20:30:12 +00:00

174 lines
6.8 KiB
Python

import os
import os.path as op
import json
import shutil
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
from utils import get_logger
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from normalization.blazepose_mapping import map_blazepose_df
BASE_DATA_FOLDER = 'data/'
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_hands = mp.solutions.hands
mp_holistic = mp.solutions.holistic
pose_landmarks = mp_holistic.PoseLandmark
hand_landmarks = mp_holistic.HandLandmark
def get_landmarks_names():
'''
Returns landmark names for mediapipe holistic model
'''
pose_lmks = ','.join([f'{lmk.name.lower()}_x,{lmk.name.lower()}_y' for lmk in pose_landmarks])
left_hand_lmks = ','.join([f'left_hand_{lmk.name.lower()}_x,left_hand_{lmk.name.lower()}_y'
for lmk in hand_landmarks])
right_hand_lmks = ','.join([f'right_hand_{lmk.name.lower()}_x,right_hand_{lmk.name.lower()}_y'
for lmk in hand_landmarks])
lmks_names = f'{pose_lmks},{left_hand_lmks},{right_hand_lmks}'
return lmks_names
def convert_to_str(arr, precision=6):
if isinstance(arr, np.ndarray):
values = []
for val in arr:
if val == 0:
values.append('0')
else:
values.append(f'{val:.{precision}f}')
return f"[{','.join(values)}]"
else:
return str(arr)
def parse_create_args(parser):
parser.add_argument('--landmarks-dataset', '-lmks', required=True,
help='Path to folder with landmarks npy files. \
You need to run `extract_mediapipe_landmarks.py` script first')
parser.add_argument('--dataset-folder', '-df', default='data/wlasl',
help='Path to folder where original `WLASL_v0.3.json` and `id_to_label.json` are stored. \
Note that final CSV files will be saved in this folder too.')
parser.add_argument('--videos-folder', '-videos', default=None,
help='Path to folder with videos. If None, then no information of videos (fps, length, \
width and height) will be stored in final csv file')
parser.add_argument('--num-classes', '-nc', default=100, type=int, help='Number of classes to use in WLASL dataset')
parser.add_argument('--create-new-split', action='store_true')
parser.add_argument('--test-size', '-ts', default=0.25, type=float,
help='Test split percentage size. Only required if --create-new-split is set')
# python3 preprocessing.py --landmarks-dataset=data/landmarks -videos data/wlasl/videos
def create(args):
logger = get_logger(__name__)
landmarks_dataset = args.landmarks_dataset
videos_folder = args.videos_folder
dataset_folder = args.dataset_folder
num_classes = args.num_classes
test_size = args.test_size
os.makedirs(dataset_folder, exist_ok=True)
# shutil.copy(os.path.join(BASE_DATA_FOLDER, 'wlasl/id_to_label.json'), dataset_folder)
# shutil.copy(os.path.join(BASE_DATA_FOLDER, 'wlasl/WLASL_v0.3.json'), dataset_folder)
# get files in landmarks_dataset folder
landmarks_files = os.listdir(landmarks_dataset)
video_data = []
for i, file in enumerate(tqdm(landmarks_files)):
# split by !
label = file.split('!')[0]
subset = file.split('!')[1].split('.')[0]
# remove npy and set mp4
video_id = file.replace('.npy', "")
video_dict = {'video_id': video_id,
'label_name': label,
'split': subset}
if videos_folder is not None:
cap = cv2.VideoCapture(op.join(videos_folder, f'{video_id}.mp4'))
if not cap.isOpened():
logger.warning(f'Video {video_id}.mp4 not found')
continue
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
fps = cap.get(cv2.CAP_PROP_FPS)
length = cap.get(cv2.CAP_PROP_FRAME_COUNT) / float(cap.get(cv2.CAP_PROP_FPS))
video_info = {'video_width': width,
'video_height': height,
'fps': fps,
'length': length}
video_dict.update(video_info)
video_data.append(video_dict)
df_video = pd.DataFrame(video_data)
video_ids = df_video['video_id'].unique()
lmks_data = []
lmks_names = get_landmarks_names().split(',')
# get labels from df_video
labels = df_video['label_name'].unique()
# map labels to ids
label_to_id = {label: i for i, label in enumerate(labels)}
# add label_id column to df_video
df_video['labels'] = df_video['label_name'].map(label_to_id)
# export to json file as id to label
id_to_label = {i: label for label, i in label_to_id.items()}
with open(op.join(dataset_folder, 'id_to_label.json'), 'w') as f:
json.dump(id_to_label, f, indent=4)
for video_id in video_ids:
lmk_fn = op.join(landmarks_dataset, f'{video_id}.npy')
if not op.exists(lmk_fn):
logger.warning(f'{lmk_fn} file not found. Skipping')
continue
lmk = np.load(lmk_fn).T
lmks_dict = {'video_id': video_id}
for lmk_, name in zip(lmk, lmks_names):
lmks_dict[name] = lmk_
lmks_data.append(lmks_dict)
df_lmks = pd.DataFrame(lmks_data)
df = pd.merge(df_video, df_lmks)
aux_columns = ['split', 'video_id', 'labels', 'label_name']
if videos_folder is not None:
aux_columns += ['video_width', 'video_height', 'fps', 'length']
df_aux = df[aux_columns]
df = map_blazepose_df(df)
df = pd.concat([df, df_aux], axis=1)
if args.create_new_split:
df_train, df_test = train_test_split(df, test_size=test_size, stratify=df['labels'], random_state=42)
print(f'Num classes: {num_classes}')
print(df_train['labels'].value_counts())
print(df_test['labels'].value_counts())
assert set(df_train['labels'].unique()) == set(df_test['labels'].unique(
)), 'The labels for train and test dataframe are different. We recommend to download the dataset again, or to use \
the --create-new-split flag'
for split, df_split in zip(['train', 'val'],
[df_train, df_test]):
fn_out = op.join(dataset_folder, f'{split}.csv')
(df_split.reset_index(drop=True)
.applymap(convert_to_str)
.to_csv(fn_out, index=False))
else:
fn_out = op.join(dataset_folder, 'train.csv')
(df.reset_index(drop=True)
.applymap(convert_to_str)
.to_csv(fn_out, index=False))