сделал авторизацию, регистрацию и проверку токена

This commit is contained in:
Андрей Дувакин 2025-02-09 17:24:14 +05:00
parent 9f96d15567
commit 17b6446051
9 changed files with 219 additions and 13 deletions

View File

@ -0,0 +1,20 @@
from typing import List
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.application.answer_files_repository import AnswerFilesRepository
from app.database.dependencies import get_db
from app.domain.entities.answer_files_entitity import AnswerFileEntity
from app.infrastructure.dependencies import get_current_user
router = APIRouter()
@router.get("/answer_files/", response_model=List[AnswerFileEntity])
def get_answer_files(
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
answer_files_service = AnswerFilesRepository(db)
return answer_files_service.get_all()

View File

@ -0,0 +1,27 @@
from fastapi import APIRouter, HTTPException
from fastapi.params import Depends
from sqlalchemy.orm import Session
from app.database.dependencies import get_db
from app.domain.entities.auth_entity import AuthEntity
from app.infrastructure.auth_service import AuthService
router = APIRouter()
@router.get("/login/", response_model=dict)
def login(
auth_data: AuthEntity,
db: Session = Depends(get_db)
):
auth_service = AuthService(db)
token = auth_service.authenticate(auth_data.login, auth_data.password)
if token is None:
raise HTTPException(
status_code=401,
detail="Incorrect username or password"
)
return token

View File

@ -0,0 +1,20 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.database.dependencies import get_db
from app.domain.entities.register_entity import RegisterEntity
from app.domain.entities.users_entity import UserEntity
from app.infrastructure.users_service import UsersService
router = APIRouter()
@router.post("/register/", response_model=UserEntity)
def register(
register_data: RegisterEntity,
db: Session = Depends(get_db)
):
users_service = UsersService(db)
user = users_service.create(register_data)
return user

View File

@ -0,0 +1,9 @@
from pydantic import BaseModel
class AuthEntity(BaseModel):
login: str
password: str
class Config:
from_attributes = True

View File

@ -0,0 +1,20 @@
import datetime
from typing import Optional
from pydantic import BaseModel
class RegisterEntity(BaseModel):
first_name: str
last_name: str
patronymic: Optional[str] = None
gender: str
birthday: datetime.date
login: str
password: str
email: str
role_id: int
class Config:
from_attributes = True

View File

@ -0,0 +1,34 @@
import datetime
from typing import Optional
import jwt
from sqlalchemy.orm import Session
from app.application.users_repository import UsersRepository
from app.settings import get_auth_data
class AuthService:
def __init__(self, db: Session):
self.users_repository = UsersRepository(db)
def authenticate(self, login: str, password: str) -> Optional[dict]:
user = self.users_repository.get_by_login(login)
if user and user.check_password(password):
access_token = self.create_access_token({"user_id": user.id})
return {
"access_token": access_token,
"user_id": user.id
}
return None
@staticmethod
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=30)
to_encode.update({"exp": expire})
auth_data = get_auth_data()
encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])
return encode_jwt

View File

@ -0,0 +1,36 @@
import jwt
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.application.users_repository import UsersRepository
from app.database.dependencies import get_db
from app.settings import get_auth_data
security = HTTPBearer()
def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security),
db: Session = Depends(get_db)
):
token = credentials.credentials
auth_data = get_auth_data()
try:
payload = jwt.decode(token, auth_data["secret_key"], algorithms=[auth_data["algorithm"]])
user_id = payload.get("user_id")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token")
user = UsersRepository(db).get_by_id(user_id)
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")

View File

@ -1,15 +1,20 @@
import datetime
from typing import Optional
from fastapi import HTTPException
from sqlalchemy.orm import Session
from app.application.roles_repository import RolesRepository
from app.application.users_repository import UsersRepository
from app.domain.entities.register_entity import RegisterEntity
from app.domain.entities.users_entity import UserEntity
from app.domain.models.users import User
from app.domain.models.users import User, UserGenderEnum
class UsersService:
def __init__(self, db: Session):
self.users_repository = UsersRepository(db)
self.roles_repository = RolesRepository(db)
def get_all(self) -> list[UserEntity]:
users = self.users_repository.get_all()
@ -64,12 +69,33 @@ class UsersService:
return None
def create(self, user: UserEntity) -> UserEntity:
def create(self, user: RegisterEntity) -> UserEntity:
if not user.gender in ['мужской', 'женский', 'не указан']:
raise HTTPException(
status_code=400,
detail="Gender must be 'мужской', 'женский' or 'не указан'"
)
gender = UserGenderEnum(user.gender)
if user.birthday > datetime.date.today():
raise HTTPException(
status_code=400,
detail="Birthday must be in the past"
)
role = self.roles_repository.get_by_id(user.role_id)
if not role:
raise HTTPException(
status_code=400,
detail="Role not found"
)
user_model = User(
first_name=user.first_name,
last_name=user.last_name,
patronymic=user.patronymic,
gender=user.gender,
gender=gender,
birthday=user.birthday,
registration_date=user.registration_date,
login=user.login,

View File

@ -1,19 +1,33 @@
import logging
from app.database.database import init_db
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from app.controllers.answer_files_entity import router as answer_files_router
from app.controllers.auth_router import router as auth_router
from app.controllers.register_router import router as register_router
from app.settings import settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=['GET', 'POST', 'PUT', 'DELETE'],
allow_headers=['*'],
)
init_db()
def start_app():
api_app = FastAPI()
api_app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=['GET', 'POST', 'PUT', 'DELETE'],
allow_headers=['*'],
)
api_app.include_router(auth_router, prefix=settings.APP_PREFIX, tags=['auth'])
api_app.include_router(register_router, prefix=settings.APP_PREFIX, tags=['register'])
api_app.include_router(answer_files_router, prefix=settings.APP_PREFIX, tags=['answer_files'])
return api_app
app = start_app()