Files
v2/back/main.py
2026-01-23 13:51:53 +08:00

341 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 这里是Sycamore whisper的后端代码喵
# 但愿比V1写的好喵
from flask import Flask, jsonify, request
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
import os
import uuid
import json
from datetime import datetime
app = Flask(__name__)
# CORS policy: only routes under /api/* accept requests from any origin.
CORS(app, resources={r"/api/*": {"origins": "*"}})
# The SQLite file is addressed relative to this source file
# (<dir of this file>/data/db.sqlite), not the current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data', 'db.sqlite')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# Global configuration variable. When True, submit_post() stores new posts
# with status 'Pending'; otherwise with status 'Pass'. load_config() may
# overwrite it with the value stored in the site_settings table.
NEED_AUDIT = True
# --- 定义数据库结构 ---
class SiteSettings(db.Model):
    """Site-wide settings; the endpoints in this file read the first row only."""

    __tablename__ = 'site_settings'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # title .. enable_notice are the fields returned by GET /api/settings.
    title = db.Column(db.String(255))
    icon = db.Column(db.String(255))
    footer_text = db.Column(db.Text)
    repo_link = db.Column(db.String(255))
    enable_repo_button = db.Column(db.Boolean, default=False)
    enable_notice = db.Column(db.Boolean, default=False)
    # Copied into the module-level NEED_AUDIT flag by load_config().
    need_audit = db.Column(db.Boolean, default=True)
    # Text returned by GET /api/about; may be unset (NULL).
    about = db.Column(db.Text)
class Identity(db.Model):
    """An identity token issued by GET /api/get_id_token."""

    __tablename__ = 'identity'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # Stores str(uuid.uuid4()), which is 36 characters long.
    token = db.Column(db.String(36), unique=True, nullable=False)
class Submission(db.Model):
    """A user post, optionally linked to an identity token."""

    __tablename__ = 'submissions'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    content = db.Column(db.Text, nullable=False)
    hashtopic = db.Column(db.Text)  # JSON string
    # Plain string copy of Identity.token (no foreign key); NULL when anonymous.
    identity_token = db.Column(db.String(36), nullable=True)
    # submit_post() writes 'Pending' or 'Pass' here depending on NEED_AUDIT.
    status = db.Column(db.String(20), default='Pending')
    # datetime.now() without a tz argument yields a naive local-time timestamp.
    created_at = db.Column(db.DateTime, default=lambda: datetime.now())
    upvotes = db.Column(db.Integer, default=0)
    downvotes = db.Column(db.Integer, default=0)
    # Deleting a submission also deletes its comments (delete-orphan cascade).
    comments = db.relationship('Comment', backref='submission', lazy=True, cascade='all, delete-orphan')
class Comment(db.Model):
    """A comment attached to a Submission."""

    __tablename__ = 'comments'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    submission_id = db.Column(db.Integer, db.ForeignKey('submissions.id'), nullable=False)
    content = db.Column(db.Text, nullable=False)
    hashtopic = db.Column(db.Text)  # JSON string
    # Plain string copy of Identity.token (no foreign key); NULL when anonymous.
    identity_token = db.Column(db.String(36), nullable=True)
    # datetime.now() without a tz argument yields a naive local-time timestamp.
    created_at = db.Column(db.DateTime, default=lambda: datetime.now())
    upvotes = db.Column(db.Integer, default=0)
    downvotes = db.Column(db.Integer, default=0)
    # Self-referential link to another comment.
    # NOTE(review): presumably intended for threaded replies - TODO confirm.
    parent_comment_id = db.Column(db.Integer, db.ForeignKey('comments.id'), nullable=True)
class DenyWord(db.Model):
    """A banned word; submit_post() rejects content containing it as a substring."""

    __tablename__ = 'deny_words'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    word = db.Column(db.String(255), unique=True, nullable=False)
# 初始化数据库函数
def init_db():
    """Create the database directory and all tables if they do not exist yet.

    The directory is derived from ``DB_PATH`` so that it always matches the
    location used in ``SQLALCHEMY_DATABASE_URI``. Creating ``./data`` relative
    to the current working directory would put the folder in the wrong place
    whenever the script is launched from another directory, and
    ``db.create_all()`` would then fail to open the SQLite file.
    """
    # exist_ok=True replaces the racy "check, then create" sequence.
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    with app.app_context():
        db.create_all()
def load_config():
    """Refresh the module-level NEED_AUDIT flag from the first settings row.

    The flag is left untouched when there is no settings row or when its
    ``need_audit`` value is NULL. Any failure is reported with a printed
    warning and is never raised to the caller.
    """
    global NEED_AUDIT
    with app.app_context():
        try:
            row = SiteSettings.query.first()
            stored_flag = getattr(row, 'need_audit', None) if row else None
            if stored_flag is not None:
                NEED_AUDIT = stored_flag
        except Exception as exc:
            print(f"Warning: Failed to load settings: {exc}")
# --- 用户普通api端点 ---
@app.route('/api/settings', methods=['GET'])
def get_settings():
    """Return the public site settings.

    Responds with code 1000 and the settings fields when a settings row
    exists, otherwise with code 2002 and an empty object.
    """
    row = SiteSettings.query.first()
    if not row:
        return jsonify({"code": 2002, "data": {}})

    # Only these columns are exposed; need_audit and about are left out.
    public_fields = (
        "title",
        "icon",
        "footer_text",
        "repo_link",
        "enable_repo_button",
        "enable_notice",
    )
    payload = {field: getattr(row, field) for field in public_fields}
    return jsonify({"code": 1000, "data": payload})
@app.route('/api/about', methods=['GET'])
def get_about():
    """Return the about-page text, always with code 1000.

    The about text is not set during initialisation, so a default text is
    returned whenever no settings row exists or its ``about`` value is empty.
    """
    default_about = "# 默认关于页面\n关于页面未设置,请前往管理面板操作。"
    row = SiteSettings.query.first()
    about_text = row.about if row and row.about else default_about
    return jsonify({"code": 1000, "data": about_text})
@app.route('/api/get_id_token', methods=['GET'])
def get_id_token():
    """Issue a new identity token and persist it.

    Returns:
        JSON with code 1000 and the new UUID4 string on success, or code 2003
        and an error description when the token could not be stored.
    """
    try:
        token = str(uuid.uuid4())
        db.session.add(Identity(token=token))
        db.session.commit()
    except Exception as e:
        # Discard the failed transaction so the session is usable again.
        db.session.rollback()
        # The previous text ("验证失败", i.e. "verification failed") was copied
        # from verify_token; this endpoint generates a token.
        return jsonify({
            "code": 2003,
            "data": f"生成失败: {str(e)}"
        })
    return jsonify({
        "code": 1000,
        "data": token
    })
@app.route('/api/verify_token', methods=['POST'])
def verify_token():
    """Report whether the posted identity token exists.

    Expects a JSON body with a ``token`` key. Responds with code 1000 and a
    boolean, code 2000 when the body or the key is missing, or code 2003
    when an exception is raised while handling the request.
    """
    try:
        payload = request.get_json()
        if not payload or 'token' not in payload:
            return jsonify({"code": 2000, "data": "参数错误"})

        match = Identity.query.filter_by(token=payload['token']).first()
        return jsonify({"code": 1000, "data": match is not None})
    except Exception as exc:
        return jsonify({"code": 2003, "data": f"验证失败: {str(exc)}"})
@app.route('/api/post', methods=['POST'])
def submit_post():
    """Create a new submission from a JSON body.

    Body fields:
        content: required, non-empty string.
        hashtopic: optional list, stored as a JSON string.
        identity: optional identity token; must exist when provided.

    Response codes:
        1001 stored with status 'Pass', 1002 stored with status 'Pending',
        2000 invalid parameters, 2004 unknown identity token,
        2005 content contains a deny word, 2003 unexpected failure.
    """
    try:
        # silent=True turns a malformed or non-JSON body into None, so it is
        # reported as a parameter error and not as a generic 2003 failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"code": 2000, "data": "内容不能为空"})

        # The key being present is not enough: "" / null / numbers are not
        # valid content, and non-strings would break the deny-word scan.
        content = data.get('content')
        if not isinstance(content, str) or not content.strip():
            return jsonify({"code": 2000, "data": "内容不能为空"})

        hashtopic = data.get('hashtopic', [])
        if not isinstance(hashtopic, list):
            return jsonify({"code": 2000, "data": "hashtopic必须为列表"})
        identity_token = data.get('identity')

        # Deny-word check (substring match against every stored word).
        if any(dw.word in content for dw in DenyWord.query.all()):
            return jsonify({"code": 2005, "data": "提交内容包含违禁词"})

        # Identity check: falsy values mean "anonymous" and are stored as NULL.
        if identity_token:
            if not Identity.query.filter_by(token=identity_token).first():
                return jsonify({"code": 2004, "data": "无效的 Identity Token"})
        else:
            identity_token = None

        new_post = Submission(
            content=content,
            # json.dumps([]) is already '[]', so no special case is needed.
            hashtopic=json.dumps(hashtopic),
            identity_token=identity_token,
            status='Pending' if NEED_AUDIT else 'Pass'
        )
        db.session.add(new_post)
        db.session.commit()
        code = 1002 if new_post.status == 'Pending' else 1001
        return jsonify({"code": code, "data": {"id": new_post.id}})
    except Exception as e:
        # Discard the failed transaction so the session is usable again.
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"投稿失败: {str(e)}"})
@app.route('/api/comment', methods=['POST'])
def submit_comment():
    """Create a comment on an existing submission from a JSON body.

    Body fields:
        submission_id: required id of the target submission.
        content: required, non-empty string.
        hashtopic: optional list, stored as a JSON string.
        identity: optional identity token; must exist when provided.

    Response codes:
        1000 stored, 2000 invalid parameters, 2004 unknown submission or
        identity token, 2003 unexpected failure.
    """
    try:
        # silent=True turns a malformed or non-JSON body into None, so it is
        # reported as a parameter error and not as a generic 2003 failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'content' not in data or 'submission_id' not in data:
            return jsonify({"code": 2000, "data": "参数错误"})
        submission_id = data['submission_id']
        content = data['content']
        # Reject empty or non-string content up front, before touching the DB.
        if not isinstance(content, str) or not content.strip():
            return jsonify({"code": 2000, "data": "参数错误"})
        hashtopic = data.get('hashtopic', [])
        if not isinstance(hashtopic, list):
            return jsonify({"code": 2000, "data": "hashtopic必须为列表"})
        identity_token = data.get('identity')

        # Check that the submission exists. Session.get replaces the legacy
        # Query.get and matches the lookup style of the vote endpoints.
        if not db.session.get(Submission, submission_id):
            return jsonify({"code": 2004, "data": "投稿不存在"})

        # NOTE(review): unlike submit_post, comments are not checked against
        # the deny-word list - confirm whether that is intentional.

        # Identity check: falsy values mean "anonymous" and are stored as NULL.
        if identity_token:
            if not Identity.query.filter_by(token=identity_token).first():
                return jsonify({"code": 2004, "data": "无效的 Identity Token"})
        else:
            identity_token = None

        new_comment = Comment(
            submission_id=submission_id,
            content=content,
            # json.dumps([]) is already '[]', so no special case is needed.
            hashtopic=json.dumps(hashtopic),
            identity_token=identity_token
        )
        db.session.add(new_comment)
        db.session.commit()
        return jsonify({"code": 1000, "data": ""})
    except Exception as e:
        # Discard the failed transaction so the session is usable again.
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"评论失败: {str(e)}"})
@app.route('/api/up', methods=['POST'])
def upvote():
    """Increment the upvote counter of a submission or a comment.

    Body fields:
        id: primary key of the target row.
        type: 'submission' or 'comment'.

    Response codes:
        1000 counted, 2000 invalid parameters, 2004 target not found
        (including an unknown ``type``), 2003 unexpected failure.
    """
    try:
        # silent=True turns a malformed or non-JSON body into None, so it is
        # reported as a parameter error and not as a generic 2003 failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'id' not in data or 'type' not in data:
            return jsonify({"code": 2000, "data": "参数错误"})

        item_type = data['type']
        if item_type == 'submission':
            model = Submission
        elif item_type == 'comment':
            model = Comment
        else:
            model = None

        item = db.session.get(model, data['id']) if model is not None else None
        if not item:
            return jsonify({"code": 2004, "data": "对象不存在"})

        # Assign a SQL expression so the UPDATE computes "upvotes + 1" inside
        # the database; a Python-side "+= 1" loses votes when two requests
        # read the same old value. COALESCE treats a NULL counter as 0.
        item.upvotes = db.func.coalesce(model.upvotes, 0) + 1
        db.session.commit()
        return jsonify({"code": 1000, "data": ""})
    except Exception as e:
        # Discard the failed transaction so the session is usable again.
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"点赞失败: {str(e)}"})
@app.route('/api/down', methods=['POST'])
def downvote():
    """Increment the downvote counter of a submission or a comment.

    Body fields:
        id: primary key of the target row.
        type: 'submission' or 'comment'.

    Response codes:
        1000 counted, 2000 invalid parameters, 2004 target not found
        (including an unknown ``type``), 2003 unexpected failure.
    """
    try:
        # silent=True turns a malformed or non-JSON body into None, so it is
        # reported as a parameter error and not as a generic 2003 failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'id' not in data or 'type' not in data:
            return jsonify({"code": 2000, "data": "参数错误"})

        item_type = data['type']
        if item_type == 'submission':
            model = Submission
        elif item_type == 'comment':
            model = Comment
        else:
            model = None

        item = db.session.get(model, data['id']) if model is not None else None
        if not item:
            return jsonify({"code": 2004, "data": "对象不存在"})

        # Assign a SQL expression so the UPDATE computes "downvotes + 1"
        # inside the database; a Python-side "+= 1" loses votes when two
        # requests read the same old value. COALESCE treats NULL as 0.
        item.downvotes = db.func.coalesce(model.downvotes, 0) + 1
        db.session.commit()
        return jsonify({"code": 1000, "data": ""})
    except Exception as e:
        # Discard the failed transaction so the session is usable again.
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"点踩失败: {str(e)}"})
@app.route('/api/test', methods=['GET'])
def test_api():
    """Connectivity probe: always answers with code 1000 and empty data."""
    ok_payload = {"code": 1000, "data": ""}
    return jsonify(ok_payload)
@app.route('/api/statics', methods=['GET'])
def get_statics():
    """Return the total number of posts, comments and images.

    Responds with code 1000 and the counters, or with code 2003 and the
    exception text when counting fails.
    """
    try:
        totals = {
            "posts": Submission.query.count(),
            "comments": Comment.query.count(),
            # TODO: image upload is not implemented yet; 114514 is a placeholder.
            "images": 114514,
        }
        return jsonify({"code": 1000, "data": totals})
    except Exception as exc:
        return jsonify({"code": 2003, "data": str(exc)})
# --- 用户的管理api端点 ---
# TODO: 用户管理端点
# --- 管理员api端点 ---
# TODO: 添加管理员端点
# 主函数
if __name__ == '__main__':
    # Prepare the data directory and tables, then load NEED_AUDIT from the
    # settings table before the server starts accepting requests.
    init_db()
    load_config()
    # NOTE(review): debug=True turns on Flask's debug mode (interactive
    # debugger and reloader); confirm this entry point is never used for a
    # publicly reachable deployment.
    app.run(debug=True, port=5000)