217 lines
7.8 KiB
Python
217 lines
7.8 KiB
Python
import secrets
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from psycopg2._psycopg import connection
|
|
|
|
import db.groups as db
|
|
import settings.settings as settings
|
|
import settings.startup_settings as startup_settings
|
|
from api.models import Group, User
|
|
from api.utils import get_current_user
|
|
from db.internal import get_db_connection
|
|
from db.memberships import check_membership_exists
|
|
from settings.consts import JOIN_CODE_SYMBOLS
|
|
|
|
groups_router = APIRouter(prefix="/api/groups", tags=["groups"])
|
|
|
|
|
|
@groups_router.post("/group")
async def read_any_group(
    groupname: str,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Return the data of an arbitrary group; restricted to admin roles.

    Args:
        groupname: Name of the group to look up.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        The result of ``Group().fill(...)`` applied to the value returned
        by ``db.get_group``.

    Raises:
        HTTPException: 403 when the caller's role is not listed in
            ``settings.settings.admin_roles``; 404 when ``db.get_group``
            returns ``None``.
    """
    if current_user.role not in settings.settings.admin_roles:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )

    group_data = db.get_group(conn, groupname)
    if group_data is None:
        # The exception has to be raised, not returned: a returned
        # HTTPException is treated as an ordinary response value, so the
        # client would never receive the 404 status.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No such group",
        )
    return Group().fill(group_data)
|
|
|
|
@groups_router.post("/invite_code")
async def read_group_invite_code(
    groupname: str,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Return the invite code of a group to its members or to admins.

    Args:
        groupname: Name of the group whose invite code is requested.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        The value returned by ``db.get_group_invite_code``.

    Raises:
        HTTPException: 403 when ``check_membership_exists`` is falsy for
            the caller and the caller's role is not listed in
            ``settings.settings.admin_roles``; 404 when
            ``db.get_group_invite_code`` returns ``None``.
    """
    if (
        not check_membership_exists(conn, current_user.username, groupname)
        and current_user.role not in settings.settings.admin_roles
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )

    invite_code = db.get_group_invite_code(conn, groupname)
    if invite_code is None:
        # The exception has to be raised, not returned: a returned
        # HTTPException is treated as an ordinary response value, so the
        # client would never receive the 404 status.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No such group",
        )
    return invite_code
|
|
|
|
@groups_router.post("/last_feed")
async def read_group_last_feed_id(
    groupname: str,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Return the last feed id of a group to its members or to admins.

    Args:
        groupname: Name of the group being queried.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        The value returned by ``db.get_group_last_feed_id``.

    Raises:
        HTTPException: 403 when the caller is neither a member of the
            group nor holds a role in ``settings.settings.admin_roles``.
    """
    is_member = check_membership_exists(conn, current_user.username, groupname)
    # The role is only consulted when the membership lookup came back falsy.
    if is_member or current_user.role in settings.settings.admin_roles:
        return db.get_group_last_feed_id(conn, groupname)

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not allowed",
    )
|
|
|
|
|
|
@groups_router.post("/add")
async def add_group(
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)],
    groupname: str,
    allow_skips: bool = True,
    feed_interval_minutes: int = 1440,
):
    """Create a new group owned by the caller and return its invite code.

    Args:
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``; their
            username is passed to ``db.create_group``.
        groupname: Name for the new group.
        allow_skips: Forwarded unchanged to ``db.create_group``.
        feed_interval_minutes: Forwarded unchanged to ``db.create_group``.

    Returns:
        A dict with the ``db.create_group`` result under ``"result"`` and
        the generated code under ``"invite code"``.

    Raises:
        HTTPException: 403 when group creation is disabled in settings and
            the caller is not in an admin role; 409 when
            ``db.check_group_existence`` reports the name as taken.
    """
    creation_permitted = (
        settings.settings.allow_create_groups
        or current_user.role in settings.settings.admin_roles
    )
    if not creation_permitted:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )

    if db.check_group_existence(conn, groupname):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Group already exists",
        )

    # Keep drawing random codes until one is not reported by
    # db.check_invite_code as already in use.
    while True:
        invite_code = "".join(
            secrets.choice(JOIN_CODE_SYMBOLS)
            for _ in range(startup_settings.join_code_length)
        )
        if not db.check_invite_code(conn, invite_code):
            break

    creation_result = db.create_group(
        conn,
        groupname,
        invite_code,
        current_user.username,
        allow_skips,
        feed_interval_minutes,
    )
    return {"result": creation_result, "invite code": invite_code}
|
|
|
|
@groups_router.post("/delete")
async def delete_group(
    groupname: str,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Delete a group on behalf of an admin or the group's author.

    Args:
        groupname: Name of the group to delete.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        The value returned by ``db.delete_group``.

    Raises:
        HTTPException: 403 when the caller is not in an admin role and
            ``db.check_group_author`` is falsy for them.
    """
    is_admin = current_user.role in settings.settings.admin_roles
    # Authorship is only looked up for non-admin callers.
    if not is_admin and not db.check_group_author(conn, groupname, current_user.username):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )
    return db.delete_group(conn, groupname)
|
|
|
|
|
|
@groups_router.post("/update/groupname")
async def update_groupname(
    groupname: str,
    new_groupname: str,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Rename a group on behalf of an admin or the group's author.

    Args:
        groupname: Current name of the group.
        new_groupname: Name the group should be renamed to.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        The value returned by ``db.update_group_groupname``.

    Raises:
        HTTPException: 409 when ``db.check_group_existence`` reports
            ``new_groupname`` as taken (checked before authorization);
            403 when the caller is neither an admin nor the group author.
    """
    name_taken = db.check_group_existence(conn, new_groupname)
    if name_taken:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Groupname is already taken",
        )

    is_admin = current_user.role in settings.settings.admin_roles
    # Authorship is only looked up for non-admin callers.
    if not is_admin and not db.check_group_author(conn, groupname, current_user.username):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )
    return db.update_group_groupname(conn, groupname, new_groupname)
|
|
|
|
@groups_router.post("/update/author")
async def update_author(
    groupname: str,
    new_author: str,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Change a group's author on behalf of an admin or the current author.

    Args:
        groupname: Name of the group to modify.
        new_author: Value forwarded to ``db.update_group_author`` as the
            new author.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        The value returned by ``db.update_group_author``.

    Raises:
        HTTPException: 403 when the caller is not in an admin role and
            ``db.check_group_author`` is falsy for them.
    """
    is_admin = current_user.role in settings.settings.admin_roles
    # Authorship is only looked up for non-admin callers.
    if not is_admin and not db.check_group_author(conn, groupname, current_user.username):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )
    return db.update_group_author(conn, groupname, new_author)
|
|
|
|
# NOTE(review): registered as GET while the other update routes in this
# module use POST, yet it calls db.update_group_invite_code — confirm the
# HTTP method is intended.
@groups_router.get("/update/invite_code")
async def update_invite_code(
    groupname: str,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Replace a group's invite code with a freshly generated one.

    Args:
        groupname: Name of the group whose invite code is regenerated.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        A dict with the ``db.update_group_invite_code`` result under
        ``"result"`` and the new code under ``"invite code"``.

    Raises:
        HTTPException: 403 when the caller is not in an admin role and
            ``db.check_group_author`` is falsy for them.
    """
    # The candidate code is generated before the permission check, so the
    # db.check_invite_code lookups happen even for callers who end up
    # being rejected.
    while True:
        invite_code = "".join(
            secrets.choice(JOIN_CODE_SYMBOLS)
            for _ in range(startup_settings.join_code_length)
        )
        if not db.check_invite_code(conn, invite_code):
            break

    is_admin = current_user.role in settings.settings.admin_roles
    # Authorship is only looked up for non-admin callers.
    if not is_admin and not db.check_group_author(conn, groupname, current_user.username):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )

    update_result = db.update_group_invite_code(conn, groupname, invite_code)
    return {"result": update_result, "invite code": invite_code}
|
|
|
|
|
|
# NOTE(review): registered as GET while the other update routes in this
# module use POST, yet it calls db.update_group_allow_skips — confirm the
# HTTP method is intended.
@groups_router.get("/update/allow_skips")
async def update_allow_skips(
    groupname: str,
    allow_skips: bool,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Set a group's ``allow_skips`` flag for an admin or the group author.

    Args:
        groupname: Name of the group to modify.
        allow_skips: Value forwarded to ``db.update_group_allow_skips``.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        The value returned by ``db.update_group_allow_skips``.

    Raises:
        HTTPException: 403 when the caller is not in an admin role and
            ``db.check_group_author`` is falsy for them.
    """
    is_admin = current_user.role in settings.settings.admin_roles
    # Authorship is only looked up for non-admin callers.
    if not is_admin and not db.check_group_author(conn, groupname, current_user.username):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )
    return db.update_group_allow_skips(conn, groupname, allow_skips)
|
|
|
|
|
|
# NOTE(review): registered as GET while the other update routes in this
# module use POST, yet it calls db.update_group_feed_interval — confirm
# the HTTP method is intended.
@groups_router.get("/update/feed_interval")
async def update_feed_interval(
    groupname: str,
    feed_interval: int,
    conn: Annotated[connection, Depends(get_db_connection)],
    current_user: Annotated[User, Depends(get_current_user)]
):
    """Set a group's feed interval for an admin or the group author.

    Args:
        groupname: Name of the group to modify.
        feed_interval: Value forwarded to ``db.update_group_feed_interval``.
        conn: Database connection injected through ``get_db_connection``.
        current_user: Caller injected through ``get_current_user``.

    Returns:
        The value returned by ``db.update_group_feed_interval``.

    Raises:
        HTTPException: 403 when the caller is not in an admin role and
            ``db.check_group_author`` is falsy for them.
    """
    is_admin = current_user.role in settings.settings.admin_roles
    # Authorship is only looked up for non-admin callers.
    if not is_admin and not db.check_group_author(conn, groupname, current_user.username):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed",
        )
    return db.update_group_feed_interval(conn, groupname, feed_interval)
|