skill-forge/API/app/infrastructure/notifications_service.py

86 lines
3.6 KiB
Python

from typing import Optional
from sqlalchemy.orm import Session
from app.application.notifications_repository import NotificationsRepository
from app.domain.entities.notification_entity import NotificationEntity
class NotificationsService:
def __init__(self, db: Session):
self.notifications_repository = NotificationsRepository(db)
def get_all(self) -> list[NotificationEntity]:
notifications = self.notifications_repository.get_all()
return [
NotificationEntity(
id=notification.id,
text=notification.text,
datetime_notification=notification.datetime_notification,
is_read=notification.is_read,
user_id=notification.user_id,
type_id=notification.type_id,
course_id=notification.course_id,
) for notification in notifications
]
def get_by_id(self, notification_id: int) -> Optional[NotificationEntity]:
notification = self.notifications_repository.get_by_id(notification_id)
if notification:
return NotificationEntity(
id=notification.id,
text=notification.text,
datetime_notification=notification.datetime_notification,
is_read=notification.is_read,
user_id=notification.user_id,
type_id=notification.type_id,
course_id=notification.course_id,
)
return None
def create(self, notification: NotificationEntity) -> NotificationEntity:
notification_model = Notification(
text=notification.text,
datetime_notification=notification.datetime_notification,
is_read=notification.is_read,
user_id=notification.user_id,
type_id=notification.type_id,
course_id=notification.course_id,
)
created_notification = self.notifications_repository.create(notification_model)
return NotificationEntity(
id=created_notification.id,
text=created_notification.text,
datetime_notification=created_notification.datetime_notification,
is_read=created_notification.is_read,
user_id=created_notification.user_id,
type_id=created_notification.type_id,
course_id=created_notification.course_id,
)
def update(self, notification_id: int, notification: NotificationEntity) -> Optional[NotificationEntity]:
notification_model = self.notifications_repository.get_by_id(notification_id)
if notification_model:
notification_model.text = notification.text
notification_model.datetime_notification = notification.datetime_notification
notification_model.is_read = notification.is_read
notification_model.user_id = notification.user_id
notification_model.type_id = notification.type_id
notification_model.course_id = notification.course_id
updated_notification = self.notifications_repository.update(notification_model)
return NotificationEntity(
id=updated_notification.id,
text=updated_notification.text,
datetime_notification=updated_notification.datetime_notification,
is_read=updated_notification.is_read,
user_id=updated_notification.user_id,
type_id=updated_notification.type_id,
course_id=updated_notification.course_id,
)
return None
def delete(self, notification_id: int) -> bool:
return self.notifications_repository.delete(notification_id) is not None