fixed auth, added more endpoints and config saving

This commit is contained in:
2025-07-28 18:52:53 +03:00
parent 08d2ebb1b7
commit 2ef27a9137
14 changed files with 409 additions and 115 deletions

40
src/api/anon.py Normal file
View File

@ -0,0 +1,40 @@
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from psycopg2._psycopg import connection
import db.users as db
import settings.settings as settings
from api.utils import get_password_hash
from db.internal import get_db_connection
anon_router = APIRouter(prefix="/api/anon", tags=["anon"])
@anon_router.post("/add/admin")
async def add_admin(
username: str,
password: str,
conn: Annotated[connection, Depends(get_db_connection)]
):
if not settings.settings.allow_create_admins:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
hashed_password = get_password_hash(password)
return db.create_user(conn, username, hashed_password, "admin")
@anon_router.post("/add/user")
async def add_user(
username: str,
password: str,
conn: Annotated[connection, Depends(get_db_connection)]
):
if not settings.settings.allow_create_users:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
hashed_password = get_password_hash(password)
return db.create_user(conn, username, hashed_password, "user")

View File

@ -8,7 +8,7 @@ from psycopg2._psycopg import connection
from api.models import Token
from api.utils import authenticate_user, create_access_token
from db.internal import get_db_connection
from settings import settings
from settings import startup_settings
auth_router = APIRouter(prefix="/api", tags=["auth"])
@ -18,16 +18,16 @@ async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
conn: Annotated[connection, Depends(get_db_connection)]
) -> Token:
user = authenticate_user(conn, form_data.username, form_data.password) # change db
if not user:
password_correct = authenticate_user(conn, form_data.username, form_data.password)
if not password_correct:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expire_time = timedelta(minutes=settings.access_token_expiration_time)
access_token_expire_time = timedelta(minutes=startup_settings.access_token_expiration_time)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expire_time
data={"sub": form_data.username}, expires_delta=access_token_expire_time
)
return Token(access_token=access_token, token_type="bearer")

64
src/api/general.py Normal file
View File

@ -0,0 +1,64 @@
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
import settings.settings as settings
from api.models import User
from api.utils import get_current_user
from settings.settings import load_settings, reset_settings, save_settings
general_router = APIRouter(prefix="/api", tags=["status"])
@general_router.get('/ping')
async def ping():
return {'ok'}
@general_router.get('/settings/get')
async def get_settings():
return settings.settings
@general_router.post('/settings/update')
async def update_settings(data: dict, current_user: Annotated[User, Depends(get_current_user)]):
if current_user.role not in settings.settings.admin_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
for key, value in data.items():
setattr(settings.settings, key, value)
return settings.settings
@general_router.get('/settings/reset')
async def reset_settings_api(current_user: Annotated[User, Depends(get_current_user)]):
if current_user.role not in settings.settings.admin_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
reset_settings()
return settings.settings
@general_router.get('/settings/load_from_file')
async def load_settings_api(current_user: Annotated[User, Depends(get_current_user)]):
if current_user.role not in settings.settings.admin_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
load_settings()
return settings.settings
@general_router.get('/settings/save_to_file')
async def save_settings_api(current_user: Annotated[User, Depends(get_current_user)]):
if current_user.role not in settings.settings.admin_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
save_settings()
return settings.settings

View File

@ -16,12 +16,14 @@ class User(BaseModel):
def fill(self, params):
self.username = params['username']
self.password = params['password']
self.role = params['role']
self.disabled = params['disabled']
self.groups_ids = params['groups_ids']
self.last_seen_at = params['last_seen_at']
self.created_at = params['created_at']
username: str = ''
password: str = ''
role: str = 'user'
disabled: bool = False
groups_ids: list[str] | None = None
last_seen_at: datetime | None = None

View File

@ -1,8 +0,0 @@
from fastapi import APIRouter
status_router = APIRouter(prefix="/api", tags=["status"])
@status_router.get('/ping')
async def ping():
return {'ok'}

View File

@ -1,21 +0,0 @@
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from api.models import User
from api.utils import get_current_user
test_router = APIRouter(prefix="/api", tags=["test"])
@test_router.get('/test-private')
async def test_private_func(token: Annotated[User, Depends(get_current_user)]):
return {'private nya'}
@test_router.post('/test')
async def test_func(text: str):
print(text)
if text == 'thighs':
raise HTTPException(status_code=status.HTTP_402_PAYMENT_REQUIRED)
return {'nya'}

View File

@ -1,11 +1,12 @@
from typing import Annotated
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException, status
from psycopg2._psycopg import connection
import db.users as db
import settings.settings as settings
from api.models import User
from api.utils import get_current_user
from api.utils import get_current_user, get_password_hash
from db.internal import get_db_connection
users_router = APIRouter(prefix="/api/users", tags=["users"])
@ -15,19 +16,126 @@ users_router = APIRouter(prefix="/api/users", tags=["users"])
async def read_users_me(current_user: Annotated[User, Depends(get_current_user)]):
return current_user
@users_router.post("/user")
async def read_users_any(username: str, conn: Annotated[connection, Depends(get_db_connection)]):
async def read_users_any(
username: str,
conn: Annotated[connection, Depends(get_db_connection)],
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.role not in settings.settings.admin_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
user = User()
user.fill(db.get_user(conn, username))
user_data = db.get_user(conn, username)
if user_data is None:
return HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No such user",
)
user.fill(user_data)
return user
@users_router.post("/add")
async def add_user(username: str, password: str, conn: Annotated[connection, Depends(get_db_connection)]):
return db.create_user(conn, username, password)
@users_router.post("/add/admin")
async def add_admin(
username: str,
password: str,
conn: Annotated[connection, Depends(get_db_connection)],
current_user: Annotated[User, Depends(get_current_user)]
):
if not settings.settings.allow_create_admins_by_admins:
if current_user.role not in settings.settings.admin_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
hashed_password = get_password_hash(password)
return db.create_user(conn, username, hashed_password, "admin")
@users_router.post("/add/user")
async def add_user(
username: str,
password: str,
conn: Annotated[connection, Depends(get_db_connection)],
current_user: Annotated[User, Depends(get_current_user)]
):
if not settings.settings.allow_create_users or current_user.role not in settings.settings.admin_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
hashed_password = get_password_hash(password)
return db.create_user(conn, username, hashed_password, "user")
@users_router.post("/delete")
async def delete_user(username: str, conn: Annotated[connection, Depends(get_db_connection)]):
return db.delete_user(conn, username)
async def delete_user(
username: str,
conn: Annotated[connection, Depends(get_db_connection)],
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.username == username or current_user.role in settings.settings.admin_roles:
return db.delete_user(conn, username)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
@users_router.post("/update/disabled")
async def update_disabled(
username: str,
disabled: bool,
conn: Annotated[connection, Depends(get_db_connection)],
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.role in settings.settings.admin_roles:
return db.update_user_disabled(conn, username, disabled)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
@users_router.post("/update/username")
async def update_username(
username: str,
password: str,
conn: Annotated[connection, Depends(get_db_connection)],
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.username == username or current_user.role in settings.settings.admin_roles:
return db.update_user_username(conn, username, password)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
@users_router.post("/update/password")
async def update_password(
username: str,
password: str,
conn: Annotated[connection, Depends(get_db_connection)],
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.username == username or current_user.role in settings.settings.admin_roles:
hashed_password = get_password_hash(password)
return db.update_user_password(conn, username, hashed_password)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not allowed",
)
@users_router.get("/update/last_seen")
async def update_last_seen(
conn: Annotated[connection, Depends(get_db_connection)],
current_user: Annotated[User, Depends(get_current_user)]
):
return db.update_user_last_seen(conn, current_user.username)

View File

@ -1,47 +1,50 @@
from datetime import datetime, timedelta, timezone
from typing import Annotated
import bcrypt
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
# from passlib.context import CryptContext
from psycopg2._psycopg import connection
import db.users
import settings.settings as settings
import settings.startup_settings as startup_settings
from api.models import TokenData, User
from db.internal import get_db_connection
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(plain_password: str, hashed_password: str):
return bcrypt.checkpw(plain_password.encode("utf-8"), hashed_password.encode("utf-8"))
def get_password_hash(password: str):
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def decode_token(token):
return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
return jwt.decode(token, startup_settings.secret_key, algorithms=[startup_settings.algorithm])
def encode_token(payload):
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
return jwt.encode(payload, startup_settings.secret_key, algorithm=startup_settings.algorithm)
def authenticate_user(
conn: connection,
username: str,
password: str
user_password: str
):
user = User()
userdata = db.users.get_user(conn, username)
if not userdata:
db_user_password = db.users.get_user_password(conn, username)
if not user_password:
return False
if not verify_password(password, user.password):
if not verify_password(user_password, db_user_password):
return False
user.fill(userdata)
return user
return True
def create_access_token(
data: dict,
@ -66,7 +69,6 @@ async def get_current_user(
try:
payload = decode_token(token)
print(payload)
username = payload.get("sub")
if username is None:
raise credentials_exception
@ -75,14 +77,16 @@ async def get_current_user(
raise credentials_exception
user = User()
user.fill(db.users.get_user(conn, username=token_data.username))
if user is None:
user_data = db.users.get_user(conn, token_data.username)
if user_data is None:
raise credentials_exception
user.fill(user_data)
if user.disabled:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
detail="User is disabled"
)
return user