prb_1/API/main.py
2025-01-28 19:12:54 +05:00

366 lines
12 KiB
Python

import datetime
import pprint
import random
import feedparser
import jwt
from sqlalchemy import desc
from data.connect import init_db, connect, User, Document, DocumentCategory, Comment, Event
from flask import Flask, Response, request, jsonify
from flask_cors import CORS
app = Flask(__name__)
app.config['SECRET_KEY'] = 'jyeraghueykgaeyugheaughkawefy'
CORS(app)
@app.route('/api/v1/SignIn', methods=['POST'])
def login():
try:
data = request.json
with connect() as session:
user = session.query(User).filter(User.email == data['name'], User.password == data['password']).first()
if user is None:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1304
}, 403
token = jwt.encode({
'sub': user.email,
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=180)
}, app.config['SECRET_KEY'], algorithm='HS256')
return jsonify({'token': token})
except Exception:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильно сформированный запрос',
'errorCode': 1400
}, 400
@app.route('/api/v1/Documents', methods=['GET'])
def get_documents():
try:
token = request.headers.get('Authorization')
if not token:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1304
}, 403
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
except jwt.ExpiredSignatureError:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1401
}, 401
except jwt.InvalidTokenError:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1401
}, 403
resp = []
with connect() as session:
documents = session.query(Document).all()
for document in documents:
# category = session.query(DocumentCategory).filter(DocumentCategory.id == document.category_id).first()
# comments = session.query(Comment).filter(Comment.document_id == document.id).all()
category = document.category
comments = document.comments
resp.append({
'id': document.id,
'title': document.title,
'date_created': document.date_created,
'date_updated': document.date_updated,
'category': document.category.title,
'has_comments': True if len(document.comments) else False
})
return resp
except Exception:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильно сформированный запрос',
'errorCode': 1400
}, 400
@app.route('/api/v1/Document/<int:documentId>/Comments', methods=['GET'])
def get_comments(documentId):
try:
token = request.headers.get('Authorization')
if not token:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1304
}, 403
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
except jwt.ExpiredSignatureError:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1401
}, 401
except jwt.InvalidTokenError:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1401
}, 403
resp = []
with connect() as session:
comments = session.query(Comment).filter(Comment.document_id == documentId).all()
if not comments:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Документ не найден',
'errorCode': 1404
}, 404
for comment in comments:
resp.append({
'id': comment.id,
'document_id': comment.document_id,
'text': comment.text,
'date_created': comment.date_created,
'date_updated': comment.date_updated,
'author': {
'name': f'{comment.user.last_name} {comment.user.first_name}',
'position': comment.user.post.title,
}
})
return resp
except Exception:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильно сформированный запрос',
'errorCode': 1400
}, 400
@app.route('/api/v1/Document/<int:documentId>/Comments', methods=['POST'])
def create_comment(documentId):
try:
token = request.headers.get('Authorization')
if not token:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1304
}, 403
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
except jwt.ExpiredSignatureError:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1401
}, 401
except jwt.InvalidTokenError:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильные авторизационные даныне',
'errorCode': 1401
}, 403
resp = []
comment_data = request.json
if not comment_data:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильно сформированный запрос',
'errorCode': 1400
}, 400
with connect() as session:
document = session.query(Document).filter(Document.id == documentId).first()
if document is None:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Документ не найден',
'errorCode': 1404
}, 404
user = session.query(User).filter(User.id == comment_data['user_id']).first()
if user is None:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Пользователь не найден',
'errorCode': 1404
}, 404
comment = Comment(
text=comment_data['text'],
document_id=document.id,
date_created=datetime.datetime.now(),
date_updated=datetime.datetime.now(),
user_id=user.id,
)
session.add(comment)
session.commit()
return Response(status=200)
except Exception:
return {
'timestamp': int(datetime.datetime.now().timestamp()),
'message': 'Неправильно сформированный запрос',
'errorCode': 1400
}, 400
@app.route('/protected', methods=['GET'])
def protected():
token = request.headers.get('Authorization')
if not token:
return jsonify({'message': 'Token is missing!'}), 403
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
return jsonify({'message': f'Welcome {data["sub"]}!'})
except jwt.ExpiredSignatureError:
return jsonify({'message': 'Token has expired!'}), 401
except jwt.InvalidTokenError:
return jsonify({'message': 'Invalid token!'}), 401
@app.route('/employees')
def get_employee_list():
resp = []
months = {
1: 'января',
2: 'февраля',
3: 'марта',
4: 'апреля',
5: 'мая',
6: 'июня',
7: 'июля',
8: 'августа',
9: 'сентября',
10: 'октября',
11: 'ноября',
12: 'декабря'
}
with connect() as session:
employees = session.query(User).all()
for employee in employees:
resp.append(
{
'id': employee.id,
'first_name': employee.first_name,
'last_name': employee.last_name,
'patronymic': employee.patronymic,
'email': employee.email,
'phone': employee.work_phone,
'post': employee.post.title,
'birthday': f'{str(employee.birthday.day)} {months[employee.birthday.month]}'
}
)
return resp
@app.route("/rss")
def get_rss_feed():
feed = feedparser.parse('https://naukatv.ru/rss')
news_items = []
for entry in feed.entries:
news_items.append({
"title": entry.title,
"link": entry.link,
"description": entry.get("description", "Без описания"),
"pubDate": entry.get("published", "Нет даты"),
"image": entry.links[1].href,
})
return news_items
@app.route('/events')
def get_events():
resp = []
with connect() as session:
events = session.query(Event).order_by(desc(Event.datetime_event)).all()
users = session.query(User).all()
for event in events:
user = random.choice(users)
resp.append({
'title': event.title,
'date': str(event.datetime_event.date()),
'author': f'{user.last_name} {user.first_name[0]}. {user.patronymic[0]}.',
'description': event.title,
})
return resp
@app.route('/users_birthdays')
def get_users_birthday():
resp = []
with connect() as session:
employees = session.query(User).all()
for employee in employees:
resp.append(
{
'id': employee.id,
'first_name': employee.first_name,
'last_name': employee.last_name,
'patronymic': employee.patronymic,
'email': employee.email,
'phone': employee.work_phone,
'post': employee.post.title,
'birthday': str(employee.birthday)
}
)
return resp
def main():
init_db()
app.run('0.0.0.0')
if __name__ == '__main__':
main()