Comment system for Hugo
https://labertasche.tuxstash.de/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
234 lines
7.4 KiB
234 lines
7.4 KiB
3 years ago
|
#!/usr/bin/env python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
# /**********************************************************************************
|
||
|
# * _author : Domeniko Gentner
|
||
|
# * _mail : code@tuxstash.de
|
||
|
# * _repo : https://git.tuxstash.de/gothseidank/labertasche
|
||
|
# * _license : This project is under MIT License
|
||
|
# *********************************************************************************/
|
||
|
from . import bp_dbupgrades
|
||
|
from flask_cors import cross_origin
|
||
|
from flask_login import login_required
|
||
|
from flask import render_template, jsonify, make_response
|
||
|
from pathlib import Path
|
||
|
from labertasche.database import labertasche_db as db
|
||
|
from labertasche.models import TProjects, TComments, TLocation, TEmail, TVersion
|
||
|
from labertasche.helper import Settings
|
||
|
from json import dump, load
|
||
|
from shutil import copy, make_archive
|
||
|
from re import search
|
||
|
from secrets import compare_digest
|
||
|
from datetime import datetime
|
||
|
|
||
|
|
||
|
def get_backup_folder() -> Path:
    """Return the absolute path of the v1 backup directory (``./backup/v1``)."""
    return Path('.').absolute() / "backup" / "v1"
|
||
|
|
||
|
|
||
|
@cross_origin()
@bp_dbupgrades.route('/db_v2/')
@login_required
def upgrade_db_to_v2():
    """Render the v1 -> v2 database upgrade page.

    Probes the TVersion table to detect whether the upgrade already ran:
    on a v1 schema the table does not exist, so the query raises and
    ``status`` stays False ("not yet upgraded"). Any existing version row
    means the upgrade is done.
    """
    status = False
    try:
        # TVersion only exists on an upgraded (v2) database.
        if db.session.query(TVersion).first():
            status = True
    except Exception as e:
        # Best-effort probe: a missing table on a v1 database is expected,
        # so only the exception class is echoed for debugging.
        print(e.__class__)

    return render_template("db-upgrades.html", title="DB upgrade V1 to V2",
                           prev_version=1, new_version=2, status=status)
|
||
|
|
||
|
|
||
|
@cross_origin()
@bp_dbupgrades.route('/db_v2/backup/', methods=['GET'])
@login_required
def upgrade_db_to_v2_backup():
    """Create the backup directory used by the v1 -> v2 upgrade.

    Returns JSON ``{'status': 'ok'}`` with HTTP 200 on success, or
    ``{'status': 'exception', 'msg': ...}`` with HTTP 400 when the
    directory cannot be created.
    """
    path = get_backup_folder()

    # Create path for backup
    try:
        if not path.exists():
            # Permission bits must be octal: the original decimal 755
            # (== 0o1363) produced nonsensical mode bits.
            path.mkdir(mode=0o755, exist_ok=True, parents=True)
    except OSError as e:
        return make_response(jsonify(status='exception', msg=str(e)), 400)

    return make_response(jsonify(status="ok"), 200)
|
||
|
|
||
|
|
||
|
@cross_origin()
@bp_dbupgrades.route('/db_v2/export/')
@login_required
def upgrade_db_to_v2_export():
    """Export the v1 tables to JSON, copy the sqlite file and zip the backup.

    Writes ``locations.json``, ``emails.json`` and ``comments.json`` into the
    backup folder, copies the sqlite database file next to them (when the
    configured URI is sqlite), then archives the whole folder as a zip.

    Returns JSON ``{'status': 'ok'}`` on success, or an error status with
    HTTP 400 when writing the JSON files or copying the database fails.
    """
    path = get_backup_folder()

    # make sure nothing is pending
    db.session.commit()

    # Export tables
    t_locations = db.session.query(TLocation.id_location, TLocation.location).all()
    t_emails = db.session.query(TEmail.id_email, TEmail.email, TEmail.is_allowed, TEmail.is_blocked).all()
    t_comments = db.session.query(TComments.comments_id, TComments.location_id, TComments.email,
                                  TComments.content, TComments.created_on, TComments.is_published,
                                  TComments.is_spam, TComments.spam_score, TComments.replied_to,
                                  TComments.confirmation, TComments.deletion, TComments.gravatar).all()

    locations = [{
        "id_location": loc.id_location,
        "location": loc.location
    } for loc in t_locations]

    emails = [{
        "id_email": mail.id_email,
        "email": mail.email,
        "is_allowed": mail.is_allowed,
        "is_blocked": mail.is_blocked
    } for mail in t_emails]

    comments = [{
        "comments_id": comment.comments_id,
        "location_id": comment.location_id,
        "email": comment.email,
        "content": comment.content,
        # str(datetime) yields an ISO-like form that the import step
        # parses back with datetime.fromisoformat().
        "created_on": str(comment.created_on),
        "is_published": comment.is_published,
        "is_spam": comment.is_spam,
        "spam_score": comment.spam_score,
        "replied_to": comment.replied_to,
        "confirmation": comment.confirmation,
        "deletion": comment.deletion,
        "gravatar": comment.gravatar
    } for comment in t_comments]

    # Output jsons
    try:
        for filename, data in (("locations.json", locations),
                               ("emails.json", emails),
                               ("comments.json", comments)):
            with (path / filename).open('w') as fp:
                dump(data, fp, indent=4, sort_keys=True)
    except Exception as e:
        return make_response(jsonify(status='exception-write-json', msg=str(e)), 400)

    # Copy database
    try:
        settings = Settings()
        db_uri = settings.system['database_uri']
        # Plain prefix check: the URI scheme is configuration, not a secret,
        # so the constant-time compare_digest() used before was unnecessary.
        if db_uri.startswith("sqlite"):
            # Extract the filesystem path after the "///" of the sqlite URI.
            m = search("([/]{3})(.*)", db_uri)
            new_db = get_backup_folder() / "labertasche.db"
            old_db = Path(m.group(2)).absolute()
            copy(old_db, new_db)
    except Exception as e:
        return make_response(jsonify(status='exception-copy-db', msg=str(e)), 400)

    # make_archive expects a string base name for the archive to create.
    make_archive(str(path), "zip", path)

    return make_response(jsonify(status='ok'), 200)
|
||
|
|
||
|
|
||
|
@cross_origin()
@bp_dbupgrades.route('/db_v2/recreate/')
@login_required
def upgrade_db_to_v2_recreate():
    """Wipe the database and rebuild all tables from the current models.

    Returns JSON ``{'status': 'ok'}`` with HTTP 200 on success, or
    ``{'status': 'exception', 'msg': ...}`` with HTTP 400 when SQLAlchemy
    raises during drop/create.
    """
    try:
        # Drop the old v1 schema entirely, then build the v2 one.
        db.drop_all()
        db.session.flush()
        db.session.commit()
        db.create_all()
    except Exception as e:
        return make_response(jsonify(status='exception', msg=str(e)), 400)
    else:
        return make_response(jsonify(status='ok'), 200)
|
||
|
|
||
|
|
||
|
@cross_origin()
@bp_dbupgrades.route('/db_v2/import/')
@login_required
def upgrade_db_to_v2_import():
    """Import the v1 JSON export into the freshly created v2 schema.

    Reads ``locations.json``, ``emails.json`` and ``comments.json`` from the
    backup folder, creates the single default project (id 1) from the current
    settings plus the TVersion marker row, and re-inserts every exported row
    attached to that project.

    Returns JSON ``{'status': 'ok'}`` on success, or an error status with
    HTTP 400 when a JSON file is missing or the database insert fails.
    """
    path = get_backup_folder()
    settings = Settings()

    # Load the three JSON exports written by the export step.
    try:
        with (path / 'locations.json').absolute().open('r') as fp:
            locations = load(fp)

        with (path / 'emails.json').absolute().open('r') as fp:
            mails = load(fp)

        with (path / 'comments.json').absolute().open('r') as fp:
            comments = load(fp)

    except FileNotFoundError as e:
        return make_response(jsonify(status='exception-filenotfound', msg=str(e)), 400)

    # Create project: all former global settings migrate into one default project.
    default_project = {
        "id_project": 1,
        "name": "default",
        "weburl": settings.system['web_url'],
        "blogurl": settings.system['blog_url'],
        "output": settings.system['output'],
        "sendotp": settings.system['send_otp_to_publish'],
        "gravatar_cache": settings.gravatar['cache'],
        "gravatar_cache_dir": settings.gravatar['static_dir'],
        "gravatar_size": settings.gravatar['size'],
        "addon_smileys": settings.addons['smileys']
    }

    # Create db version, so we can track it in the future
    version = {
        "id_version": 1,
        "version": 2
    }

    try:
        # Add to db
        db.session.add(TVersion(**version))
        db.session.add(TProjects(**default_project))

        # walk json and re-add to database with project set to project 1
        for each in mails:
            each['project_id'] = 1
            db.session.add(TEmail(**each))

        for each in locations:
            each['project_id'] = 1
            db.session.add(TLocation(**each))

        for each in comments:
            each['project_id'] = 1
            # created_on was serialized with str(); fromisoformat parses it back.
            each['created_on'] = datetime.fromisoformat(each['created_on'])
            db.session.add(TComments(**each))

        # flush() pushes the pending INSERTs, commit() makes them permanent.
        # (The original flushed *after* commit, which is a no-op.)
        db.session.flush()
        db.session.commit()
    except Exception as e:
        return make_response(jsonify(status='exception-database', msg=str(e)), 400)

    return make_response(jsonify(status='ok'), 200)
|