268 lines
9.4 KiB
Python
268 lines
9.4 KiB
Python
import datetime
|
|
import os
|
|
import shutil
|
|
import smtplib
|
|
from json import loads
|
|
from email.message import EmailMessage
|
|
|
|
from data.answer import Answer
|
|
from data.proof_file import FileProof
|
|
from data.quests import Quests
|
|
from data.roles import Roles
|
|
from data.users import User
|
|
from data.staff_projects import StaffProjects
|
|
from data.files import Files
|
|
from data import db_session
|
|
|
|
import uuid
|
|
import pymorphy2
|
|
|
|
|
|
def check_password(password=''):
|
|
smb = 'qwertyuiopasdfghjklzxcvbnm'
|
|
if len(password) < 6:
|
|
return 'Пароль должен быть длиннее 6 символов'
|
|
elif False in [True if i.isalpha() and i.lower() in smb or i.isdigit() else False for i in password]:
|
|
return 'Пароль может содержать только буквы латинского алфавита и цифры'
|
|
elif True not in [True if i.isdigit() else False for i in password]:
|
|
return 'Пароль должен содержать буквы разного регистра и цифры'
|
|
elif False not in [True if i.islower() and i.isalpha() else False for i in password]:
|
|
return 'Пароль должен содержать буквы разного регистра и цифры'
|
|
else:
|
|
return 'OK'
|
|
|
|
|
|
def mail(msg, to, topic='Подтверждение почты'):
|
|
with open('../incepted.config', 'r', encoding='utf-8') as file:
|
|
file = loads(file.read())
|
|
login, password = file["mail_login"], file["mail_password"]
|
|
email_server = "smtp.yandex.ru"
|
|
sender = "incepted@yandex.ru"
|
|
em = EmailMessage()
|
|
em.set_content(msg)
|
|
em['To'] = to
|
|
em['From'] = sender
|
|
em['Subject'] = topic
|
|
mailServer = smtplib.SMTP(email_server)
|
|
mailServer.set_debuglevel(1)
|
|
mailServer.ehlo()
|
|
mailServer.starttls()
|
|
mailServer.ehlo()
|
|
mailServer.login(login, password)
|
|
mailServer.ehlo()
|
|
mailServer.send_message(em)
|
|
mailServer.quit()
|
|
|
|
|
|
def init_db_default():
|
|
data_session = db_session.create_session()
|
|
roles = [['admin', 90], ['moderator', 75], ['counselor', 45], ['user', 0]]
|
|
for i in roles:
|
|
role = Roles(
|
|
name=i[0],
|
|
rights=i[1]
|
|
)
|
|
data_session.add(role)
|
|
data_session.commit()
|
|
data_session.close()
|
|
|
|
|
|
def get_user_data(user):
|
|
resp = {
|
|
'id': user.id,
|
|
'name': user.name,
|
|
'surname': user.surname,
|
|
'login': user.login,
|
|
'email': user.email,
|
|
'photo': user.photo,
|
|
'role': user.role
|
|
}
|
|
return resp
|
|
|
|
|
|
def get_projects_data(project):
|
|
data_session = db_session.create_session()
|
|
staff = data_session.query(StaffProjects.user).filter(StaffProjects.project == project.id).all()
|
|
resp = {
|
|
'id': project.id,
|
|
'name': project.name,
|
|
'logo': project.photo,
|
|
'description': project.description,
|
|
'staff': list(map(lambda x: get_user_data(x), data_session.query(User).filter(
|
|
User.id.in_(list(map(lambda x: x[0], staff)))).all())) if staff else []
|
|
}
|
|
resp['staff'].insert(0, get_user_data(data_session.query(User).filter(User.id == project.creator).first()))
|
|
return resp
|
|
|
|
|
|
def save_project_logo(photo):
|
|
filename = f'static/app_files/project_logo/{uuid.uuid4()}.png'
|
|
with open(filename, 'wb') as f:
|
|
photo.save(f)
|
|
return filename
|
|
|
|
|
|
def overdue_quest_project(quest):
|
|
if quest.deadline is None:
|
|
quest.overdue = ''
|
|
elif str(quest.deadline.date()) == str(datetime.datetime.now().date()):
|
|
quest.overdue = 'today'
|
|
elif quest.deadline < datetime.datetime.now():
|
|
quest.overdue = 'yes'
|
|
quest.time_left = 'Просрочено на' + round_date(quest.deadline)
|
|
elif quest.deadline > datetime.datetime.now():
|
|
quest.overdue = 'no'
|
|
quest.time_left = 'Еще есть: ' + round_date(quest.deadline)
|
|
return quest
|
|
|
|
|
|
def round_date(date_time):
|
|
morph = pymorphy2.MorphAnalyzer()
|
|
difference = abs(date_time - datetime.datetime.now()).days
|
|
resp = ''
|
|
if difference // 365:
|
|
resp += f'{difference // 365} {morph.parse("год")[0].make_agree_with_number(difference // 365).word}'
|
|
difference -= 365 * (difference // 365)
|
|
if difference // 30:
|
|
resp += ', ' if resp else ' ' + f'{difference // 30}' \
|
|
f' {morph.parse("месяц")[0].make_agree_with_number(difference // 30).word}'
|
|
difference -= 30 * (difference // 30)
|
|
if difference:
|
|
resp += ', ' if resp else ' ' + f'{difference} {morph.parse("день")[0].make_agree_with_number(difference).word}'
|
|
return f'{resp}'
|
|
|
|
|
|
def save_proof_quest(project, file, user_id):
|
|
data_session = db_session.create_session()
|
|
path = f'static/app_files/all_projects/{str(project.id)}/{str(file.filename)}'
|
|
file_check = data_session.query(Files).filter(Files.path == path).first()
|
|
file.save(path)
|
|
if file_check:
|
|
return file_check.id
|
|
file = Files(
|
|
path=path,
|
|
user=user_id,
|
|
up_date=datetime.datetime.now()
|
|
)
|
|
data_session.add(file)
|
|
data_session.flush()
|
|
data_session.refresh(file)
|
|
file_id = file.id
|
|
data_session.commit()
|
|
data_session.close()
|
|
return file_id
|
|
|
|
|
|
def find_files_answer(file_id):
|
|
data_session = db_session.create_session()
|
|
file = data_session.query(Files).filter(Files.id == file_id).first()
|
|
return {'id': file.id, 'path': file.path, 'user': file.user, 'up_date': file.up_date,
|
|
'current_path': file.path[str(file.path).find('all_projects') + 13:].split('/')}
|
|
|
|
|
|
def file_tree(path):
|
|
tree = []
|
|
data_session = db_session.create_session()
|
|
h = 1
|
|
for i in os.listdir(path):
|
|
if os.path.isfile(f'{path}/{i}'):
|
|
file = data_session.query(Files).filter(Files.path == f'{path}/{i}').first()
|
|
tree.append(
|
|
{
|
|
'path': f'{path}/{i}',
|
|
'type': 'file',
|
|
'object': file if file else None,
|
|
'current_path': f'{path}/{i}'[str(file.path).find('all_projects') + 13:].split('/')
|
|
}
|
|
)
|
|
else:
|
|
tree.append(
|
|
{
|
|
'id': h,
|
|
'name': i,
|
|
'path': f'{path}/{i}',
|
|
'type': 'folder',
|
|
'tree': file_tree(f'{path}/{i}')
|
|
}
|
|
)
|
|
h += 1
|
|
data_session.close()
|
|
return tree
|
|
|
|
|
|
def delete_file_proof_data(file_proof, data_session):
|
|
file = data_session.query(Files).filter(Files.id == file_proof.file).first()
|
|
data_session.delete(file)
|
|
|
|
|
|
def delete_answer_data(answer, data_session):
|
|
file_proofs = data_session.query(FileProof).filter(FileProof.answer == answer.id).all()
|
|
list(map(lambda file: delete_file_proof_data(file, data_session), file_proofs))
|
|
list(map(data_session.delete, file_proofs))
|
|
|
|
|
|
def delete_quest_data(quest, data_session):
|
|
answers = data_session.query(Answer).filter(Answer.quest == quest.id).all()
|
|
list(map(lambda answer: delete_answer_data(answer, data_session), answers))
|
|
list(map(data_session.delete, answers))
|
|
|
|
|
|
def delete_project_data(project, data_session):
|
|
staff = data_session.query(StaffProjects).filter(StaffProjects.project == project.id).all()
|
|
list(map(data_session.delete, staff))
|
|
if 'none_project' not in project.photo:
|
|
os.remove(project.photo)
|
|
quests = data_session.query(Quests).filter(Quests.project == project.id).all()
|
|
list(map(lambda quest: delete_quest_data(quest, data_session), quests))
|
|
list(map(data_session.delete, quests))
|
|
list(map(data_session.delete,
|
|
data_session.query(Files).filter(Files.path.contains(f'all_projects/{str(project.id)}/')).all()))
|
|
shutil.rmtree(f'static/app_files/all_projects/{str(project.id)}')
|
|
data_session.delete(project)
|
|
data_session.commit()
|
|
|
|
|
|
def copy_file_from_template(file, new_project, data_session, current_user):
|
|
path = f'static/app_files/all_projects/{str(new_project.id)}/{str(file.path).split("/")[-1]}'
|
|
shutil.copy(file.path, path)
|
|
new_file = Files(
|
|
path=path,
|
|
user=current_user.id,
|
|
up_date=datetime.datetime.now()
|
|
)
|
|
data_session.add(new_file)
|
|
|
|
|
|
def copy_quests_from_template(quest, new_project, data_session, current_user):
|
|
new_quest = Quests(
|
|
project=new_project.id,
|
|
creator=current_user.id,
|
|
name=quest.name,
|
|
description=quest.description,
|
|
date_create=datetime.datetime.now(),
|
|
deadline=quest.deadline,
|
|
realized=False
|
|
)
|
|
data_session.add(new_quest)
|
|
|
|
|
|
def copy_template(template, new_project, data_session, current_user):
|
|
os.mkdir(f'static/app_files/all_projects/{str(new_project.id)}')
|
|
if 'none_project' not in template.photo:
|
|
filename = f'static/app_files/project_logo/{uuid.uuid4()}.png'
|
|
shutil.copy(template.photo, filename)
|
|
new_project.photo = filename
|
|
else:
|
|
new_project.photo = 'static/images/none_project.png'
|
|
list(map(lambda file: copy_file_from_template(file, new_project, data_session, current_user),
|
|
data_session.query(Files).filter(Files.path.contains(f'all_projects/{str(template.id)}/')).all()))
|
|
list(map(lambda quest: copy_quests_from_template(quest, new_project, data_session, current_user),
|
|
data_session.query(Quests).filter(Quests.project == template.id).all()))
|
|
data_session.commit()
|
|
|
|
|
|
def save_admin_data(data, data_session):
|
|
user = data_session.query(User).filter(User.id == data[0].split('_')[-1]).first()
|
|
if user.role != data[1]:
|
|
user.role = data[1]
|