Incepted/main.py

448 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import os
from flask import Flask, render_template, request, url_for
from flask_login import login_user, current_user, LoginManager, logout_user, login_required
from flask_wtf import CSRFProtect
from flask_restful import abort
from werkzeug.datastructures import CombinedMultiDict
from werkzeug.utils import redirect
from itsdangerous import URLSafeTimedSerializer, SignatureExpired
from sqlalchemy import or_
from functions import check_password, mail, init_db_default, get_projects_data, get_user_data, save_project_logo
from forms.edit_profile import EditProfileForm
from forms.login import LoginForm
from forms.find_project import FindProjectForm
from forms.register import RegisterForm
from forms.project import ProjectForm
from forms.recovery import RecoveryForm, NewPasswordForm
from forms.conf_delete_project import DeleteProjectForm
from data.users import User
from data.quests import Quests
from data.files import Files
from data.projects import Projects
from data.staff_projects import StaffProjects
from waitress import serve
from data import db_session
app = Flask(__name__)
key = 'test_secret_key'
app.config['SECRET_KEY'] = key
csrf = CSRFProtect(app)
s = URLSafeTimedSerializer(key)
login_manager = LoginManager()
login_manager.init_app(app)
@app.route('/')
def base():
if not current_user.is_authenticated:
return render_template('main.html', title='Главная')
else:
return redirect('/projects')
@app.route('/project/<int:id_project>/edit', methods=['GET', 'POST'])
def edit_project(id_project):
if current_user.is_authenticated:
data_session = db_session.create_session()
current_project = data_session.query(Projects).filter(Projects.id == id_project).first()
if current_project:
staff = data_session.query(StaffProjects).filter(StaffProjects.project == current_project.id).all()
if current_user.id == current_project.creator or current_user.id in list(map(lambda x: x.user, staff)):
list_users = list(
map(lambda x: get_user_data(x), data_session.query(User).filter(User.id != current_user.id).all()))
staff = list(map(lambda x: get_user_data(x), data_session.query(User).filter(
User.id.in_(list(map(lambda x: x.user, staff)))).all())) if staff else []
form = ProjectForm()
if form.save.data:
new_staff = []
for i in list_users:
if request.form.getlist(f"choose_{i['login']}") and i['id'] != current_user.id:
new_staff.append(i)
if i not in staff:
new_staffer = StaffProjects(
user=i['id'],
project=current_project.id,
role='user',
permission=3
)
data_session.add(new_staffer)
data_session.commit()
if sorted(new_staff, key=lambda x: x['id']) != sorted(staff, key=lambda x: x['id']):
for i in staff:
if i not in new_staff:
data_session.delete(data_session.query(StaffProjects).filter(
StaffProjects.user == i['id'], StaffProjects.project == current_project.id).first())
data_session.commit()
if form.logo.data:
current_project.photo = save_project_logo(form.logo.data)
data_session.commit()
current_project.name = form.name.data
current_project.description = form.description.data
data_session.commit()
return redirect(f'/project/{current_project.id}/edit')
if form.del_photo.data:
os.remove(current_project.photo)
current_project.photo = 'static/images/none_project.png'
data_session.commit()
return redirect(f'/project/{current_project.id}/edit')
form.name.data = current_project.name
form.description.data = current_project.description
return render_template('edit_project.html', title='Изменение проекта', form=form, list_users=list_users,
staff=staff, project=current_project)
else:
abort(403)
else:
abort(404)
else:
return redirect('/login')
@app.route('/project/<int:id_project>')
def project(id_project):
if current_user.is_authenticated:
data_session = db_session.create_session()
current_project = data_session.query(Projects).filter(Projects.id == id_project).first()
if current_project:
staff = data_session.query(StaffProjects).filter(StaffProjects.project == current_project.id).all()
if current_user.id == current_project.creator or current_user.id in list(map(lambda x: x.user, staff)):
return render_template('project.html', project=current_project, title=current_project.name)
else:
abort(403)
else:
abort(404)
else:
return redirect('/login')
@app.route('/recovery/confirmation/<token>', methods=['GET', 'POST'])
def conf_recovery(token):
try:
user_email = s.loads(token, max_age=86400)
data_session = db_session.create_session()
user = data_session.query(User).filter(User.email == user_email).first()
if user:
form = NewPasswordForm()
if form.validate_on_submit():
if form.password.data != form.repeat_password.data:
return render_template('recovery.html', title='Восстановление', form=form, recovery=0,
message='Пароли не совпадают')
status_password = check_password(form.password.data)
if status_password != 'OK':
return render_template('recovery.html', title='Восстановление', form=form, recovery=0,
message=str(status_password))
user.set_password(form.password.data)
data_session.commit()
mail(f'Для аккаунта {user.login}, успешно был обновлен пароль', user.email,
'Изменение пароля')
return redirect('/login?message=Пароль обновлен')
return render_template('recovery.html', title='Восстановление', form=form, recovery=0, message='')
else:
return redirect('/login?message=Пользователь не найден&danger=True')
except SignatureExpired:
return redirect('/login?message=Срок действия ссылки истек&danger=True')
@app.route('/recovery', methods=['GET', 'POST'])
def recovery():
if not current_user.is_authenticated:
form = RecoveryForm()
if form.validate_on_submit():
token = s.dumps(form.email.data)
link_conf = url_for('conf_recovery', token=token, _external=True)
mail(f'Для сбросы пароля пройдите по ссылке: {link_conf}', form.email.data,
'Восстановление доступа')
return redirect('/login?message=Мы выслали ссылку для сброса вам на почту')
return render_template('recovery.html', title='Восстановление пароля', form=form, recovery=True, message='')
else:
return redirect('/')
@app.route('/project/<int:id_project>/delete', methods=['GET', 'POST'])
def delete_project(id_project):
if current_user.is_authenticated:
data_session = db_session.create_session()
project_del = data_session.query(Projects).filter(Projects.id == id_project).first()
if project_del:
if project_del.creator == current_user.id:
form = DeleteProjectForm()
if form.validate_on_submit():
if form.conf.data != f'delete/{project_del.name}':
return render_template('delete_project.html', title='Удаление проекта', form=form,
project=project_del,
message='Вы не правильно ввели фразу')
staff = data_session.query(StaffProjects).filter(StaffProjects.project == id_project).all()
for i in staff:
data_session.delete(i)
if 'none_project' not in project_del.photo:
os.remove(project_del.photo)
data_session.delete(project_del)
data_session.commit()
return redirect('/projects')
return render_template('delete_project.html', title='Удаление проекта', form=form, project=project_del,
message='')
else:
abort(403)
else:
abort(404)
else:
return redirect('/login')
@app.route('/user/<string:_login>', methods=['GET', 'POST'])
def user_view(_login):
if current_user.is_authenticated:
data_session = db_session.create_session()
user = data_session.query(User).filter(User.login == _login).first()
if user:
current_projects = data_session.query(Projects).filter(or_(Projects.creator == user.id, Projects.id.in_(
list(map(lambda x: x[0], data_session.query(
StaffProjects.project).filter(
StaffProjects.user == user.id).all()))))).all()
resp = list(map(lambda x: get_projects_data(x), current_projects))
return render_template('user_view.html', title=user.name + ' ' + user.surname, user=user,
list_projects=resp)
else:
abort(404)
else:
return redirect('/login')
@app.route('/projects/new', methods=['GET', 'POST'])
def new_project():
if current_user.is_authenticated:
form = ProjectForm()
data_session = db_session.create_session()
list_users = list(
map(lambda x: get_user_data(x), data_session.query(User).filter(User.id != current_user.id).all()))
if form.validate_on_submit():
currnet_project = Projects(
name=form.name.data,
description=form.description.data,
date_create=datetime.datetime.now(),
creator=current_user.id
)
currnet_project.photo = save_project_logo(
form.logo.data) if form.logo.data else 'static/images/none_project.png'
data_session.add(currnet_project)
data_session.flush()
data_session.refresh(currnet_project)
for i in list_users:
if request.form.getlist(f"choose_{i['login']}") and i['id'] != current_user.id:
new_staffer = StaffProjects(
user=i['id'],
project=currnet_project.id,
role='user',
permission=3
)
data_session.add(new_staffer)
data_session.commit()
return redirect('/projects')
return render_template('new_project.html', title='Новый проект', form=form, list_users=list_users)
else:
return redirect('/login')
@app.route('/projects', methods=['GET', 'POST'])
def projects():
if current_user.is_authenticated:
find = False
form = FindProjectForm()
data_session = db_session.create_session()
resp = []
current_projects = \
data_session.query(Projects).filter(or_(Projects.creator == current_user.id,
Projects.id.in_(
list(map(lambda x: x[0],
data_session.query(
StaffProjects.project).filter(
StaffProjects.user
== current_user.id).all()))))).all()
if form.validate_on_submit():
new_resp = []
for i in range(len(current_projects)):
if str(form.project.data).lower().strip() in str(current_projects[i].name).lower().strip():
new_resp.append(current_projects[i])
current_projects = new_resp
find = True
resp = list(map(lambda x: get_projects_data(x), current_projects))
return render_template('projects.html', title='Проекты', list_projects=resp, form=form, find=find)
else:
return redirect('/login')
@app.route('/profile', methods=['GET', 'POST'])
def profile():
if current_user.is_authenticated:
form = EditProfileForm(
CombinedMultiDict((request.files, request.form)),
email=current_user.email,
name=current_user.name,
surname=current_user.surname,
about=current_user.about,
birthday=current_user.birthday
)
if form.del_photo.data:
data_session = db_session.create_session()
user = data_session.query(User).filter(User.id == current_user.id).first()
if not user:
return render_template('profile.html', title='Профиль', form=form,
message='Ошибка, пользователь ненайден')
os.remove(current_user.photo)
user.photo = 'static/images/none_logo.png'
data_session.commit()
if form.validate_on_submit():
data_session = db_session.create_session()
user = data_session.query(User).filter(User.id == current_user.id).first()
if not user:
return render_template('profile.html', title='Профиль', form=form,
message='Ошибка, пользователь ненайден')
if form.email.data != current_user.email:
pass
if form.photo.data:
with open(f'static/app_files/user_logo/{current_user.login}.png', 'wb') as file:
form.photo.data.save(file)
user.photo = f'static/app_files/user_logo/{current_user.login}.png'
user.name = form.name.data
user.surname = form.surname.data
user.about = form.about.data
user.birthday = form.birthday.data
data_session.commit()
return redirect('/profile')
return render_template('profile.html', title='Профиль', form=form, message='')
else:
return redirect('/login')
@login_manager.user_loader
def load_user(user_id):
db_sess = db_session.create_session()
return db_sess.query(User).get(user_id)
@app.route('/login', methods=['GET', 'POST'])
def login():
if not current_user.is_authenticated:
message = request.args.get('message') if request.args.get('message') else ''
danger = request.args.get('danger') if request.args.get('danger') else False
form = LoginForm()
if form.validate_on_submit():
data_session = db_session.create_session()
user = data_session.query(User).filter(User.email == form.login.data).first()
if not user:
user = data_session.query(User).filter(User.login == form.login.data).first()
if user and user.check_password(form.password.data):
if user.activated:
login_user(user, remember=form.remember_me.data)
return redirect('/projects')
else:
return render_template('login.html',
message="Ваша почта не подтверждена",
danger=True,
form=form)
return render_template('login.html',
message="Неправильный логин или пароль",
danger=True,
form=form)
return render_template('login.html', title='Авторизация', form=form, message=message,
danger=danger)
else:
return redirect('/projects')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect("/")
@app.route('/register', methods=['GET', 'POST'])
def register():
if not current_user.is_authenticated:
form = RegisterForm()
if form.validate_on_submit():
data_session = db_session.create_session()
if data_session.query(User).filter(User.login == form.login.data).first():
return render_template('register.html', form=form, message="Такой пользователь уже есть",
title='Регистрация')
if data_session.query(User).filter(User.email == form.email.data).first():
return render_template('register.html', form=form, message="Такая почта уже есть", title='Регистрация')
status_password = check_password(form.password.data)
if status_password != 'OK':
return render_template('register.html', form=form, message=status_password, title='Регистрация')
user = User(
email=form.email.data,
name=form.name.data,
login=form.login.data,
activity=datetime.datetime.now(),
data_reg=datetime.date.today(),
photo='static/images/none_logo.png',
role=1
)
user.set_password(form.password.data)
data_session.add(user)
data_session.commit()
token = s.dumps(form.email.data)
link_conf = url_for('confirmation', token=token, _external=True)
mail(f'Для завершения регистрации пройдите по ссылке: {link_conf}', form.email.data,
'Подтверждение регистрации')
return redirect('/login?message=Мы выслали ссылку для подтверждения почты')
return render_template('register.html', form=form, message='', title='Регистрация')
else:
return redirect('/projects')
@app.route('/confirmation/<token>')
def confirmation(token):
try:
user_email = s.loads(token, max_age=86400)
data_session = db_session.create_session()
user = data_session.query(User).filter(User.email == user_email).first()
if user:
user.activated = True
data_session.commit()
return redirect('/login?message=Почта успешно подтверждена')
else:
return redirect('/login?message=Пользователь не найден&danger=True')
except SignatureExpired:
data_session = db_session.create_session()
users = data_session.query(User).filter(
User.activated == 0 and User.activated < datetime.datetime.now() - datetime.timedelta(days=1)).all()
if users:
list(map(lambda x: data_session.delete(x), users))
data_session.commit()
return redirect('/login?message=Срок действия ссылки истек, данные удалены&danger=True')
@app.errorhandler(500)
def internal_server_error(error):
return render_template('page_error.html', title='Ошибка сервера', error='500', message='Технические шоколадки')
@app.errorhandler(404)
def page_not_found(error):
return render_template('page_error.html', title='Страница не найдена', error='404', message='Страница не найдена')
@app.errorhandler(403)
def access_error(error):
return render_template('page_error.html', title='Ошибка доступа', error='403', message='Доступ сюда запрещен')
def main():
db_path = 'db/incepted.db'
db = os.path.exists(db_path)
db_session.global_init(db_path)
if not db:
init_db_default()
serve(app, host='0.0.0.0', port=5000)
if __name__ == '__main__':
main()