Files
Signlanguage_Datacollector/backend/src/routers/signs.py
2023-03-09 10:15:19 +00:00

139 lines
4.5 KiB
Python

import asyncio
import datetime
import os
import tempfile
import zipfile

from fastapi import APIRouter, BackgroundTasks, Depends, status
from fastapi.responses import FileResponse
from fastapi_jwt_auth import AuthJWT
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

import src.settings as settings
from src.database.database import get_session
from src.exceptions.login_exception import LoginException
from src.models.auth import Login, User
from src.models.sign import Sign, SignOut, SignOutSimple
from src.models.signvideo import SignVideo
from src.models.token import TokenExtended
router = APIRouter(prefix="/signs")
# Request body accepted by POST /signs/: the URL of the sign to register.
class SignUrl(BaseModel):
    # Forwarded to the Sign constructor by add_sign.
    url: str
@router.get("/random", status_code=status.HTTP_200_OK, response_model=SignOutSimple)
async def get_random_sign(session: AsyncSession = Depends(get_session)):
    # Unauthenticated endpoint: return whatever Sign.get_random selects.
    # Per the original author's note the selection is meant to favour signs
    # that have little data so far; that logic lives in Sign.get_random.
    return await Sign.get_random(session=session)
@router.post("/", status_code=status.HTTP_201_CREATED, response_model=SignOut)
async def add_sign(url: SignUrl, Authorize: AuthJWT = Depends(), session: AsyncSession = Depends(get_session)):
    # Create a sign from the submitted URL. Requires a valid JWT whose
    # subject resolves to a stored user; raises LoginException otherwise.
    Authorize.jwt_required()
    current_user = await User.get_by_id(id=Authorize.get_jwt_subject(), session=session)
    if not current_user:
        raise LoginException("User not found")

    new_sign = Sign(url.url)

    # Reject duplicates: any stored sign with the same URL blocks the insert.
    duplicates = await Sign.get_all_where(Sign.url == new_sign.url, session=session)
    if len(duplicates) > 0:
        raise LoginException("Sign already exists")

    await new_sign.save(session=session)
    return new_sign
@router.get("/", status_code=status.HTTP_200_OK, response_model=list[SignOut])
async def get_signs(Authorize: AuthJWT = Depends(), session: AsyncSession = Depends(get_session)):
    # List every sign together with its sign videos. Requires a valid JWT
    # whose subject resolves to a stored user; raises LoginException otherwise.
    Authorize.jwt_required()
    current_user = await User.get_by_id(id=Authorize.get_jwt_subject(), session=session)
    if not current_user:
        raise LoginException("User not found")

    # sign_videos is requested up front via select_in_load so the response
    # model can read it from the returned objects.
    return await Sign.get_all(select_in_load=[Sign.sign_videos], session=session)
@router.get("/{id}", status_code=status.HTTP_200_OK, response_model=SignOut)
async def get_sign(id: int, Authorize: AuthJWT = Depends(), session: AsyncSession = Depends(get_session)):
    # Return a single sign (with its sign videos) by primary key.
    #
    # Requires a valid JWT whose subject resolves to a stored user.
    # Raises LoginException when the user or the sign cannot be found.
    Authorize.jwt_required()
    user_id = Authorize.get_jwt_subject()
    user = await User.get_by_id(id=user_id, session=session)
    if not user:
        raise LoginException("User not found")

    sign = await Sign.get_by_id(id=id, select_in_load=[Sign.sign_videos], session=session)
    # Without this guard a missing sign would be returned as None and fail
    # validation against response_model=SignOut, surfacing as a server error.
    # Mirrors the not-found handling used by delete_sign.
    if not sign:
        raise LoginException("Sign not found")
    return sign
@router.delete("/{id}", status_code=status.HTTP_200_OK)
async def delete_sign(id: int, Authorize: AuthJWT = Depends(), session: AsyncSession = Depends(get_session)):
    # Delete a sign, its sign-video records and the video files on disk.
    #
    # Requires a valid JWT whose subject resolves to a stored user.
    # Raises LoginException when the user or the sign cannot be found.
    Authorize.jwt_required()
    user_id = Authorize.get_jwt_subject()
    user = await User.get_by_id(id=user_id, session=session)
    if not user:
        raise LoginException("User not found")

    sign = await Sign.get_by_id(id=id, select_in_load=[Sign.sign_videos], session=session)
    if not sign:
        raise LoginException("Sign not found")

    # Remove each video file first, then its database record.
    for sign_video in sign.sign_videos:
        video_file = f"{settings.DATA_PATH}/{sign_video.path}"
        try:
            os.remove(video_file)
        except FileNotFoundError:
            # The file is already gone; still drop the record so a missing
            # file cannot make the sign permanently undeletable.
            pass
        await sign_video.delete(session=session)

    await sign.delete(session=session)
    return {"message": "Sign deleted"}
@router.get("/download/all", status_code=status.HTTP_200_OK)
async def download_all(background_tasks: BackgroundTasks, Authorize: AuthJWT = Depends(), session: AsyncSession = Depends(get_session)):
    # Bundle every approved sign video into one zip archive and serve it
    # as "signs.zip".
    #
    # Requires a valid JWT whose subject resolves to a stored user; raises
    # LoginException otherwise. The archive is a temporary file that is
    # removed by a background task once the response has been sent.
    Authorize.jwt_required()
    user_id = Authorize.get_jwt_subject()
    user = await User.get_by_id(id=user_id, session=session)
    if not user:
        raise LoginException("User not found")

    # "== True" is intentional: it builds a SQL expression, not a Python bool.
    approved_videos = await SignVideo.get_all_where(SignVideo.approved == True, session=session)  # noqa: E712
    paths = [video.path for video in approved_videos]

    # mkstemp gives a unique, securely created file in the platform temp
    # directory instead of a predictable timestamp-based name under /tmp.
    fd, zip_path = tempfile.mkstemp(suffix=".zip")
    os.close(fd)

    # NOTE(review): archive creation is synchronous and runs on the event
    # loop; consider offloading it if the data set grows large.
    try:
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED, False) as zip_file:
            for path in paths:
                # Entries are stored flat, under the file's base name only.
                zip_file.write(f"{settings.DATA_PATH}/{path}", os.path.basename(path))
    except Exception:
        # The cleanup task below is never registered on failure, so remove
        # the partial archive here before propagating the error.
        os.remove(zip_path)
        raise

    background_tasks.add_task(delete_zip_file, zip_path)
    return FileResponse(zip_path, media_type="application/zip", filename="signs.zip")
async def delete_zip_file(zip_path):
    """Remove the temporary zip archive at *zip_path*.

    Scheduled by ``download_all`` as a background task. A file that is
    already missing is ignored, so the cleanup is idempotent and does not
    raise after the response has been delivered.
    """
    try:
        os.remove(zip_path)
    except FileNotFoundError:
        # Nothing left to clean up.
        pass