starting scraper development
This commit is contained in:
@ -5,16 +5,32 @@ from psycopg2._psycopg import connection
|
||||
|
||||
import db.accounts as db
|
||||
import settings.settings as settings
|
||||
from api.utils import encrypt_str, get_current_user
|
||||
from api.models import User, Account
|
||||
from api.models import Account, User
|
||||
from api.utils import encode_str, get_current_user
|
||||
from db.groups import check_group_author
|
||||
from db.internal import get_db_connection
|
||||
|
||||
accounts_router = APIRouter(prefix="/api/accounts", tags=["anon"])
|
||||
accounts_router = APIRouter(prefix="/api/accounts", tags=["accounts"])
|
||||
|
||||
|
||||
@accounts_router.post("/account")
|
||||
async def get_account(
|
||||
account_id: int,
|
||||
@accounts_router.post("/group")
|
||||
async def read_accounts_by_group(
|
||||
groupname: str,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
if not check_group_author(conn, groupname, current_user.username) and current_user.role not in settings.settings.admin_roles:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Not allowed",
|
||||
)
|
||||
return db.get_accounts_by_group(conn, groupname)
|
||||
|
||||
|
||||
@accounts_router.post("/group/platform")
|
||||
async def read_accounts_by_group_platform(
|
||||
groupname: str,
|
||||
platform: str,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
@ -23,7 +39,7 @@ async def get_account(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Not allowed",
|
||||
)
|
||||
account_data = db.get_account(conn, account_id)
|
||||
account_data = db.get_accounts_by_group_platform(conn, groupname, platform)
|
||||
if account_data is None:
|
||||
return HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
@ -36,6 +52,7 @@ async def get_account(
|
||||
|
||||
@accounts_router.post("/add")
|
||||
async def add_account(
|
||||
groupname: str,
|
||||
platform: str,
|
||||
login: str,
|
||||
password: str,
|
||||
@ -43,28 +60,43 @@ async def add_account(
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
if db.check_account_existence(conn, platform, login, password):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail="Account already exists",
|
||||
)
|
||||
hashed_password = encrypt_str(password)
|
||||
return db.create_account(conn, login, hashed_password, metadata)
|
||||
|
||||
# TODO: add author to check editing rights?
|
||||
@accounts_router.post("/update")
|
||||
async def update_account(
|
||||
account_id: int,
|
||||
platform: str,
|
||||
login: str,
|
||||
password: str,
|
||||
metadata: dict,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
if current_user.role not in settings.settings.admin_roles:
|
||||
if not check_group_author(conn, groupname, current_user.username) and current_user.role not in settings.settings.admin_roles:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Not allowed",
|
||||
)
|
||||
return db.update_account(conn, account_id, platform, login, password, metadata)
|
||||
if db.check_account_existence(conn, groupname, platform):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail="Account already exists",
|
||||
)
|
||||
hashed_password = encode_str(password)
|
||||
return db.create_account(conn, platform, login, hashed_password, metadata)
|
||||
|
||||
|
||||
@accounts_router.post("/update")
|
||||
async def update_account(
|
||||
groupname: str,
|
||||
platform: str,
|
||||
author: str,
|
||||
login: str,
|
||||
password: str,
|
||||
metadata: dict,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
account_data = db.get_accounts_by_group_platform(conn, groupname, platform)
|
||||
if account_data is None:
|
||||
return HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="No such account",
|
||||
)
|
||||
account = Account()
|
||||
account.fill(account_data)
|
||||
|
||||
if current_user.username != account.author and current_user.role not in settings.settings.admin_roles:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Not allowed",
|
||||
)
|
||||
return db.update_account(conn, groupname, author, platform, login, password, metadata)
|
||||
|
||||
@ -5,12 +5,14 @@ from psycopg2._psycopg import connection
|
||||
|
||||
import db.feeds as db
|
||||
import settings.settings as settings
|
||||
from api.models import Feed, Group, User
|
||||
from api.models import Feed, User
|
||||
from api.utils import get_current_user
|
||||
from db.accounts import get_accounts_by_group
|
||||
from db.feeds import get_groupname_by_feed_id
|
||||
from db.groups import get_group
|
||||
from db.groups import check_group_author
|
||||
from db.internal import get_db_connection
|
||||
from db.memberships import check_membership_exists
|
||||
from scraper.utils import generate_feed
|
||||
|
||||
feeds_router = APIRouter(prefix="/api/feeds", tags=["feeds"])
|
||||
|
||||
@ -46,29 +48,28 @@ async def read_feed(
|
||||
return feed
|
||||
|
||||
|
||||
# maybe to delete
|
||||
@feeds_router.post("/add")
|
||||
async def add_feed(
|
||||
# TODO: most logic + exception
|
||||
@feeds_router.post("/new")
|
||||
async def new_feed(
|
||||
groupname: str,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
group = Group()
|
||||
group_data = get_group(conn, groupname)
|
||||
if group_data is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="No such feed or feed is not linked to group",
|
||||
)
|
||||
group.fill(group_data)
|
||||
|
||||
if group.author != current_user.username and current_user.role not in settings.settings.admin_roles:
|
||||
if not check_group_author(conn, groupname, current_user.username) and current_user.role not in settings.settings.admin_roles:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Not allowed",
|
||||
)
|
||||
|
||||
return db.create_feed(conn, groupname, []) # TODO: image list
|
||||
accounts = get_accounts_by_group(conn, groupname)
|
||||
feed = generate_feed(conn, accounts)
|
||||
if not isinstance(feed, Exception):
|
||||
return db.create_feed(conn, groupname, feed)
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_418_IM_A_TEAPOT,
|
||||
detail="Failed to generate feed",
|
||||
)
|
||||
|
||||
|
||||
# maybe to delete
|
||||
@ -85,27 +86,3 @@ async def delete_feed(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Not allowed",
|
||||
)
|
||||
|
||||
|
||||
@feeds_router.post("/reset")
|
||||
async def reset_feed(
|
||||
groupname: str,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
group = Group()
|
||||
group_data = get_group(conn, groupname)
|
||||
if group_data is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="No such group",
|
||||
)
|
||||
group.fill(group_data)
|
||||
|
||||
if group.author != current_user.username and current_user.role not in settings.settings.admin_roles:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Not allowed",
|
||||
)
|
||||
|
||||
return db.create_feed(conn, groupname, []) # TODO: image list
|
||||
|
||||
@ -8,7 +8,7 @@ import db.groups as db
|
||||
import settings.settings as settings
|
||||
import settings.startup_settings as startup_settings
|
||||
from api.models import Group, User
|
||||
from api.utils import get_current_user, get_group_by_name
|
||||
from api.utils import get_current_user
|
||||
from db.internal import get_db_connection
|
||||
from db.memberships import check_membership_exists
|
||||
from settings.consts import JOIN_CODE_SYMBOLS
|
||||
@ -104,10 +104,9 @@ async def delete_group(
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
group = get_group_by_name(conn, groupname)
|
||||
if current_user.role in settings.settings.admin_roles:
|
||||
return db.delete_group(conn, groupname)
|
||||
if current_user.username == group.author:
|
||||
if db.check_group_author(conn, groupname, current_user.username):
|
||||
return db.delete_group(conn, groupname)
|
||||
else:
|
||||
raise HTTPException(
|
||||
@ -128,11 +127,10 @@ async def update_groupname(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail="Groupname is already taken",
|
||||
)
|
||||
group = get_group_by_name(conn, groupname)
|
||||
|
||||
if current_user.role in settings.settings.admin_roles:
|
||||
return db.update_group_groupname(conn, groupname, new_groupname)
|
||||
if current_user.username == group.author:
|
||||
if db.check_group_author(conn, groupname, current_user.username):
|
||||
return db.update_group_groupname(conn, groupname, new_groupname)
|
||||
else:
|
||||
raise HTTPException(
|
||||
@ -147,10 +145,9 @@ async def update_author(
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
group = get_group_by_name(conn, groupname)
|
||||
if current_user.role in settings.settings.admin_roles:
|
||||
return db.update_group_author(conn, groupname, new_author)
|
||||
if current_user.username == group.author:
|
||||
if db.check_group_author(conn, groupname, current_user.username):
|
||||
return db.update_group_author(conn, groupname, new_author)
|
||||
else:
|
||||
raise HTTPException(
|
||||
@ -164,8 +161,6 @@ async def update_invite_code(
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
group = get_group_by_name(conn, groupname)
|
||||
|
||||
invite_code = "".join(secrets.choice(JOIN_CODE_SYMBOLS) for _ in range(startup_settings.join_code_length))
|
||||
while db.check_invite_code(conn, invite_code):
|
||||
invite_code = "".join(secrets.choice(JOIN_CODE_SYMBOLS) for _ in range(startup_settings.join_code_length))
|
||||
@ -175,7 +170,7 @@ async def update_invite_code(
|
||||
"result": db.update_group_invite_code(conn, groupname, invite_code),
|
||||
"invite code": invite_code
|
||||
}
|
||||
if current_user.username == group.author:
|
||||
if db.check_group_author(conn, groupname, current_user.username):
|
||||
return {
|
||||
"result": db.update_group_invite_code(conn, groupname, invite_code),
|
||||
"invite code": invite_code
|
||||
@ -194,10 +189,9 @@ async def update_allow_skips(
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
group = get_group_by_name(conn, groupname)
|
||||
if current_user.role in settings.settings.admin_roles:
|
||||
return db.update_group_allow_skips(conn, groupname, allow_skips)
|
||||
if current_user.username == group.author:
|
||||
if db.check_group_author(conn, groupname, current_user.username):
|
||||
return db.update_group_allow_skips(conn, groupname, allow_skips)
|
||||
else:
|
||||
raise HTTPException(
|
||||
@ -213,10 +207,9 @@ async def update_feed_interval(
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
group = get_group_by_name(conn, groupname)
|
||||
if current_user.role in settings.settings.admin_roles:
|
||||
return db.update_group_feed_interval(conn, groupname, feed_interval)
|
||||
if current_user.username == group.author:
|
||||
if db.check_group_author(conn, groupname, current_user.username):
|
||||
return db.update_group_feed_interval(conn, groupname, feed_interval)
|
||||
else:
|
||||
raise HTTPException(
|
||||
|
||||
@ -6,8 +6,12 @@ from psycopg2._psycopg import connection
|
||||
import db.memberships as db
|
||||
import settings.settings as settings
|
||||
from api.models import User
|
||||
from api.utils import get_current_user, get_group_by_name
|
||||
from db.groups import check_group_existence, get_groupname_by_invite_code
|
||||
from api.utils import get_current_user
|
||||
from db.groups import (
|
||||
check_group_author,
|
||||
check_group_existence,
|
||||
get_groupname_by_invite_code,
|
||||
)
|
||||
from db.internal import get_db_connection
|
||||
from db.users import check_user_existence
|
||||
|
||||
@ -107,8 +111,7 @@ async def delete_membership(
|
||||
if current_user.role in settings.settings.admin_roles:
|
||||
return db.delete_membership(conn, username, groupname)
|
||||
|
||||
group = get_group_by_name(conn, groupname)
|
||||
if current_user.username == group.author:
|
||||
if check_group_author(conn, groupname, current_user.username):
|
||||
return db.delete_membership(conn, username, groupname)
|
||||
else:
|
||||
raise HTTPException(
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel
|
||||
from api.utils import decrypt_str
|
||||
|
||||
from api.utils import decode_str
|
||||
|
||||
|
||||
class Token(BaseModel):
|
||||
@ -102,12 +103,14 @@ class Feed(BaseModel):
|
||||
class Account(BaseModel):
|
||||
def fill(self, params):
|
||||
self.id = params["id"]
|
||||
self.author = params["author"]
|
||||
self.platform = params["platform"]
|
||||
self.login = params["login"]
|
||||
self.password = decrypt_str(params["password"])
|
||||
self.password = decode_str(params["password"])
|
||||
self.metadata = params["metadata"]
|
||||
self.created_at = params["created_at"]
|
||||
id: int = -1
|
||||
author: str = ""
|
||||
platform: str = ""
|
||||
login: str = ""
|
||||
password: str = ""
|
||||
|
||||
@ -12,32 +12,14 @@ from db.internal import get_db_connection
|
||||
pictures_router = APIRouter(prefix="/api/pictures", tags=["pictures"])
|
||||
|
||||
|
||||
# maybe to delete
|
||||
@pictures_router.post("/picture/url")
|
||||
async def read_picture_by_url(
|
||||
groupname: str,
|
||||
@pictures_router.post("/picture")
|
||||
async def read_picture(
|
||||
id: int,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
picture = Picture()
|
||||
picture_data = db.get_picture_by_url(conn, groupname)
|
||||
if picture_data is None:
|
||||
return HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="No such picture",
|
||||
)
|
||||
picture.fill(picture_data)
|
||||
return picture
|
||||
|
||||
|
||||
@pictures_router.post("/picture/id")
|
||||
async def read_picture_by_id(
|
||||
groupname: str,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
picture = Picture()
|
||||
picture_data = db.get_picture_by_id(conn, groupname)
|
||||
picture_data = db.get_picture(conn, id)
|
||||
if picture_data is None:
|
||||
return HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
@ -67,29 +49,14 @@ async def add_picture(
|
||||
}
|
||||
|
||||
|
||||
# maybe to delete
|
||||
@pictures_router.post("/delete/url")
|
||||
async def delete_picture_by_url(
|
||||
picture_url: str,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
if current_user.role in settings.settings.admin_roles:
|
||||
return db.delete_picture_by_url(conn, picture_url)
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Not allowed",
|
||||
)
|
||||
|
||||
@pictures_router.post("/delete/id")
|
||||
async def delete_picture_by_id(
|
||||
@pictures_router.post("/delete")
|
||||
async def delete_picture(
|
||||
picture_id: int,
|
||||
conn: Annotated[connection, Depends(get_db_connection)],
|
||||
current_user: Annotated[User, Depends(get_current_user)]
|
||||
):
|
||||
if current_user.role in settings.settings.admin_roles:
|
||||
return db.delete_picture_by_id(conn, picture_id)
|
||||
return db.delete_picture(conn, picture_id)
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
|
||||
@ -3,8 +3,6 @@ from typing import Annotated
|
||||
|
||||
import bcrypt
|
||||
import jwt
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from jwt.exceptions import InvalidTokenError
|
||||
@ -19,17 +17,25 @@ from db.internal import get_db_connection
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
|
||||
|
||||
def encrypt_str(string):
|
||||
key = bytes.fromhex(startup_settings.secret_key)
|
||||
cipher = AES.new(key, AES.MODE_ECB)
|
||||
encrypted_string = cipher.encrypt(pad(string.encode(), AES.block_size))
|
||||
return encrypted_string
|
||||
def encode_str(string) -> str:
|
||||
key = startup_settings.secret_key
|
||||
encoded_chars = []
|
||||
for i in range(len(string)):
|
||||
key_c = key[i % len(key)]
|
||||
encoded_c = chr(ord(string[i]) + ord(key_c) % 256)
|
||||
encoded_chars.append(encoded_c)
|
||||
encoded_string = ''.join(encoded_chars)
|
||||
return encoded_string
|
||||
|
||||
def decrypt_str(encrypted_string):
|
||||
key = bytes.fromhex(startup_settings.secret_key)
|
||||
cipher = AES.new(key, AES.MODE_ECB)
|
||||
string = unpad(cipher.decrypt(encrypted_string), AES.block_size)
|
||||
return string.decode()
|
||||
def decode_str(encoded_string) -> str:
|
||||
key = startup_settings.secret_key
|
||||
encoded_chars = []
|
||||
for i in range(len(encoded_string)):
|
||||
key_c = key[i % len(key)]
|
||||
encoded_c = chr((ord(encoded_string[i]) - ord(key_c) + 256) % 256)
|
||||
encoded_chars.append(encoded_c)
|
||||
decoded_string = ''.join(encoded_chars)
|
||||
return decoded_string
|
||||
|
||||
|
||||
def verify_password(plain_password: str, hashed_password: str):
|
||||
|
||||
@ -3,13 +3,15 @@ import json
|
||||
import psycopg2.extras
|
||||
from psycopg2._psycopg import connection
|
||||
|
||||
from api.models import Account
|
||||
|
||||
# account create and delete
|
||||
|
||||
def create_account(
|
||||
conn: connection,
|
||||
platform: str,
|
||||
login: str,
|
||||
password: int,
|
||||
password: str,
|
||||
metadata: dict
|
||||
):
|
||||
with conn.cursor() as cur:
|
||||
@ -44,28 +46,10 @@ def delete_account(
|
||||
|
||||
# account checks
|
||||
|
||||
def check_account_existence_by_id(
|
||||
conn: connection,
|
||||
account_id: str
|
||||
):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select exists(
|
||||
select 1
|
||||
from picrinth.accounts
|
||||
where account_id = %s
|
||||
);
|
||||
""",
|
||||
(account_id),
|
||||
)
|
||||
return cur.fetchone()[0] # type: ignore
|
||||
|
||||
def check_account_existence(
|
||||
conn: connection,
|
||||
platform: str,
|
||||
login: str,
|
||||
password: str
|
||||
groupname: str,
|
||||
platform: str
|
||||
):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
@ -73,10 +57,10 @@ def check_account_existence(
|
||||
select exists(
|
||||
select 1
|
||||
from picrinth.accounts
|
||||
where platform = %s, login = %s, password = %s
|
||||
where groupname = %s and platform = %s
|
||||
);
|
||||
""",
|
||||
(platform, login, password),
|
||||
(groupname, platform),
|
||||
)
|
||||
return cur.fetchone()[0] # type: ignore
|
||||
|
||||
@ -85,10 +69,11 @@ def check_account_existence(
|
||||
|
||||
def update_account(
|
||||
conn: connection,
|
||||
account_id: str,
|
||||
groupname: str,
|
||||
author: str,
|
||||
platform: str,
|
||||
login: str,
|
||||
password: int,
|
||||
password: str,
|
||||
metadata: dict
|
||||
):
|
||||
with conn.cursor() as cur:
|
||||
@ -96,12 +81,13 @@ def update_account(
|
||||
"""
|
||||
update picrinth.accounts
|
||||
SET platform = %s,
|
||||
author = %s,
|
||||
login = %s,
|
||||
password = %s,
|
||||
metadata = %s
|
||||
where account_id = %s
|
||||
where groupname = %s
|
||||
""",
|
||||
(platform, login, password, json.dumps(metadata), account_id),
|
||||
(platform, author, login, password, json.dumps(metadata), groupname),
|
||||
)
|
||||
conn.commit()
|
||||
return cur.rowcount > 0
|
||||
@ -109,19 +95,37 @@ def update_account(
|
||||
|
||||
# account receiving
|
||||
|
||||
def get_account(
|
||||
# TODO: fix list comprehension
|
||||
def get_accounts_by_group(
|
||||
conn: connection,
|
||||
account_id: str
|
||||
groupname: str
|
||||
) -> list[Account]:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select *
|
||||
from picrinth.accounts
|
||||
where groupname = %s
|
||||
""",
|
||||
(groupname,),
|
||||
)
|
||||
return [Account().fill(account_data) for (account_data,) in cur.fetchall()]
|
||||
|
||||
|
||||
def get_accounts_by_group_platform(
|
||||
conn: connection,
|
||||
groupname: str,
|
||||
platform: str
|
||||
):
|
||||
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select username,
|
||||
account_id, platform, login,
|
||||
password, metadata, created_at
|
||||
select groupname, author,
|
||||
platform, login, password,
|
||||
metadata, created_at
|
||||
from picrinth.accounts
|
||||
where account_id = %s
|
||||
where groupname = %s and platform = %s
|
||||
""",
|
||||
(account_id),
|
||||
(groupname, platform),
|
||||
)
|
||||
return cur.fetchone()
|
||||
|
||||
@ -79,6 +79,25 @@ def check_invite_code(
|
||||
return cur.fetchone()[0] # type: ignore
|
||||
|
||||
|
||||
def check_group_author(
|
||||
conn: connection,
|
||||
groupname: str,
|
||||
author: str
|
||||
):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select exists(
|
||||
select 1
|
||||
from picrinth.groups
|
||||
where groupname = %s and author = %s
|
||||
);
|
||||
""",
|
||||
(groupname, author),
|
||||
)
|
||||
return cur.fetchone()[0] # type: ignore
|
||||
|
||||
|
||||
# group updates
|
||||
|
||||
def update_group_groupname(
|
||||
|
||||
@ -29,22 +29,7 @@ def create_picture(
|
||||
return result[0]
|
||||
|
||||
|
||||
def delete_picture_by_url(
|
||||
conn: connection,
|
||||
url: str
|
||||
):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
delete from picrinth.pictures
|
||||
where url = %s
|
||||
""",
|
||||
(url,),
|
||||
)
|
||||
conn.commit()
|
||||
return cur.rowcount > 0
|
||||
|
||||
def delete_picture_by_id(
|
||||
def delete_picture(
|
||||
conn: connection,
|
||||
id: int
|
||||
):
|
||||
@ -62,26 +47,9 @@ def delete_picture_by_id(
|
||||
|
||||
# picture receiving
|
||||
|
||||
def get_picture_by_url(
|
||||
def get_picture(
|
||||
conn: connection,
|
||||
url: str
|
||||
):
|
||||
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select id, source,
|
||||
external_id, url,
|
||||
metadata, created_at
|
||||
from picrinth.pictures
|
||||
where url = %s
|
||||
""",
|
||||
(url,),
|
||||
)
|
||||
return cur.fetchone()
|
||||
|
||||
def get_picture_by_id(
|
||||
conn: connection,
|
||||
id: str
|
||||
id: int
|
||||
):
|
||||
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
|
||||
cur.execute(
|
||||
|
||||
@ -11,7 +11,7 @@ from settings.consts import SUPPORTED_PLATFORMS
|
||||
def generate_feed(
|
||||
conn: connection,
|
||||
accounts: list[Account]
|
||||
):
|
||||
) -> list | Exception:
|
||||
for account in accounts:
|
||||
if account.platform not in SUPPORTED_PLATFORMS:
|
||||
raise Exception
|
||||
@ -50,10 +50,10 @@ def gelbooru():
|
||||
|
||||
def get_picture(
|
||||
conn: connection,
|
||||
picture_id: str
|
||||
picture_id: int
|
||||
) -> int:
|
||||
picture = Picture()
|
||||
picture_data = db.get_picture_by_id(conn, picture_id)
|
||||
picture_data = db.get_picture(conn, picture_id)
|
||||
if picture_id is None:
|
||||
return -1
|
||||
picture.fill(picture_data)
|
||||
|
||||
Reference in New Issue
Block a user