Files
v2/back/main.py
2026-01-26 23:47:37 +08:00

522 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 这里是Sycamore whisper的后端代码喵
# 但愿比V1写的好喵
from flask import Flask, jsonify, request, abort, send_from_directory
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
import os
import uuid
import json
from datetime import datetime
app = Flask(__name__)
# CORS policy: only the /api/* routes accept requests from any origin.
CORS(app, resources={r"/api/*": {"origins": "*"}})
# All persistent data (SQLite database and uploaded images) lives under
# <directory of this file>/data.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data', 'db.sqlite')
IMG_DIR = os.path.join(BASE_DIR, 'data', 'img')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# Global runtime configuration. These are the defaults; load_config() replaces
# them with the values stored in the site_settings table when present.
NEED_AUDIT = True
FILE_SIZE_LIMIT_MB = 10.0
FILE_FORMATS = ["png", "jpg", "jpeg", "gif", "webp"]
# Deny-word cache, refilled by load_config(). It is defined here so the
# submission/comment handlers never hit a NameError when load_config() has not
# run (it is only called from the __main__ guard) or failed to read the table.
DENY_WORDS_CACHE = []
# --- 定义数据库结构 ---
class SiteSettings(db.Model):
    """Site-wide settings, stored in the ``site_settings`` table.

    Code in this file only ever reads the first row
    (``SiteSettings.query.first()``).
    """
    __tablename__ = 'site_settings'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # The next six columns are what the /api/settings handler returns.
    title = db.Column(db.String(255))
    icon = db.Column(db.String(255))
    footer_text = db.Column(db.Text)
    repo_link = db.Column(db.String(255))
    enable_repo_button = db.Column(db.Boolean, default=False)
    enable_notice = db.Column(db.Boolean, default=False)
    # Copied into the NEED_AUDIT global by load_config().
    need_audit = db.Column(db.Boolean, default=True)
    # Text served by /api/about; the handler falls back to a default when empty.
    about = db.Column(db.Text)
    # Copied into FILE_SIZE_LIMIT_MB / FILE_FORMATS by load_config().
    file_size_limit = db.Column(db.Float) # MB
    file_formats = db.Column(db.Text) # JSON list
class Identity(db.Model):
    """An issued identity token, stored in the ``identity`` table."""
    __tablename__ = 'identity'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # /api/get_id_token stores str(uuid.uuid4()) here.
    token = db.Column(db.String(36), unique=True, nullable=False)
class Submission(db.Model):
    """A user submission (post), stored in the ``submissions`` table."""
    __tablename__ = 'submissions'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    content = db.Column(db.Text, nullable=False)
    # Token of the submitting identity; NULL when the post was made without one.
    identity_token = db.Column(db.String(36), nullable=True)
    # The handlers in this file write 'Pending' or 'Pass'; only 'Pass' rows are
    # listed by /api/10_info and readable through /api/get/comment.
    status = db.Column(db.String(20), default='Pending')
    # Naive local time taken when the row is created.
    created_at = db.Column(db.DateTime, default=lambda: datetime.now())
    upvotes = db.Column(db.Integer, default=0)
    downvotes = db.Column(db.Integer, default=0)
    # Comments are deleted together with their submission (delete-orphan cascade).
    comments = db.relationship('Comment', backref='submission', lazy=True, cascade='all, delete-orphan')
class Comment(db.Model):
    """A comment on a submission, stored in the ``comments`` table."""
    __tablename__ = 'comments'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    submission_id = db.Column(db.Integer, db.ForeignKey('submissions.id'), nullable=False)
    nickname = db.Column(db.String(50), default='匿名用户')
    content = db.Column(db.Text, nullable=False)
    # Token of the commenting identity; NULL when none was supplied.
    identity_token = db.Column(db.String(36), nullable=True)
    # Naive local time taken when the row is created.
    created_at = db.Column(db.DateTime, default=lambda: datetime.now())
    # Self-reference for replies; submit_comment stores NULL when the request
    # gives 0 (or nothing), and /api/get/comment reports NULL back as 0.
    parent_comment_id = db.Column(db.Integer, db.ForeignKey('comments.id'), nullable=True)
class Hashtag(db.Model):
    """A hashtag attached to a submission or a comment (``hashtags`` table)."""
    __tablename__ = 'hashtags'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    type = db.Column(db.Integer, nullable=False) # 0: Submission, 1: Comment
    # Id of the Submission or Comment selected by `type`; no foreign key is declared.
    target_id = db.Column(db.Integer, nullable=False)
    name = db.Column(db.String(255), nullable=False)
class DenyWord(db.Model):
    """A forbidden word (``deny_words`` table); load_config() caches all of them."""
    __tablename__ = 'deny_words'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # Posts and comments whose content contains this substring are rejected.
    word = db.Column(db.String(255), unique=True, nullable=False)
class ImgFile(db.Model):
    """Record of an uploaded image (``img_files`` table)."""
    __tablename__ = 'img_files'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # Generated file name ("<uuid hex>.<ext>") of the file saved inside IMG_DIR.
    path = db.Column(db.String(255), nullable=False)
    # File name supplied by the uploader.
    name = db.Column(db.String(255), nullable=True)
    # Value of the optional `identity_token` form field sent with the upload.
    identity_token = db.Column(db.String(36), nullable=True)
# 初始化数据库函数
def init_db():
    """Create the data directories and all database tables if they are missing.

    The directories are derived from DB_PATH and IMG_DIR, so they are always
    the ones the application actually uses, regardless of the process working
    directory.
    """
    # exist_ok=True avoids the check-then-create race of os.path.exists().
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    os.makedirs(IMG_DIR, exist_ok=True)
    with app.app_context():
        db.create_all()
def load_config():
    """Load runtime configuration and the deny-word cache from the database.

    Overwrites the module globals NEED_AUDIT, FILE_SIZE_LIMIT_MB and
    FILE_FORMATS from the first SiteSettings row, for every column that holds a
    usable value, then rebuilds DENY_WORDS_CACHE from the deny_words table.
    Failures while reading or parsing are reported with a printed warning and
    leave the previous value in place.
    """
    global NEED_AUDIT, FILE_SIZE_LIMIT_MB, FILE_FORMATS, DENY_WORDS_CACHE
    with app.app_context():
        try:
            settings = SiteSettings.query.first()
            if settings:
                if settings.need_audit is not None:
                    NEED_AUDIT = settings.need_audit
                if settings.file_size_limit is not None:
                    try:
                        FILE_SIZE_LIMIT_MB = float(settings.file_size_limit)
                    except (TypeError, ValueError) as e:
                        print(f"Warning: Ignoring invalid file_size_limit: {e}")
                if settings.file_formats:
                    try:
                        raw = settings.file_formats
                        # The column stores a JSON list encoded as text.
                        parsed = json.loads(raw) if isinstance(raw, str) else raw
                        if isinstance(parsed, list):
                            # Normalise ".PNG" / " png " to "png"; drop blanks.
                            normalized = [str(x).strip().lstrip('.').lower() for x in parsed]
                            FILE_FORMATS = [fmt for fmt in normalized if fmt]
                        else:
                            print("Warning: Ignoring file_formats: not a JSON list")
                    except (TypeError, ValueError) as e:
                        # json.JSONDecodeError is a ValueError subclass.
                        print(f"Warning: Ignoring invalid file_formats: {e}")
        except Exception as e:
            print(f"Warning: Failed to load settings: {e}")
    with app.app_context():
        try:
            # Each row is a 1-tuple: [('word1',), ('word2',)].
            words = db.session.query(DenyWord.word).all()
            DENY_WORDS_CACHE = [w[0] for w in words]
            print(f"Loaded {len(DENY_WORDS_CACHE)} deny words.")
        except Exception as e:
            # Keep a previously loaded list, but make sure the global exists so
            # the request handlers that iterate it cannot raise NameError.
            DENY_WORDS_CACHE = globals().get('DENY_WORDS_CACHE', [])
            print(f"Warning: Failed to load deny words: {e}")
# --- 用户普通api端点 ---
@app.route('/api/settings', methods=['GET'])
def get_settings():
    """Return the public site settings.

    Responds with code 1000 and the six public fields of the first
    SiteSettings row, or with code 2002 and an empty object when no row exists.
    """
    row = SiteSettings.query.first()
    if not row:
        return jsonify({"code": 2002,"data":{}})
    public_fields = (
        "title",
        "icon",
        "footer_text",
        "repo_link",
        "enable_repo_button",
        "enable_notice",
    )
    payload = {field: getattr(row, field) for field in public_fields}
    return jsonify({"code": 1000, "data": payload})
@app.route('/api/about', methods=['GET'])
def get_about():
    """Return the "about" page text with code 1000.

    The about text is not set during initialisation, so a default text is
    served when there is no settings row or its `about` column is empty.
    """
    row = SiteSettings.query.first()
    if row and row.about:
        about_text = row.about
    else:
        about_text = "# 默认关于页面\n关于页面未设置,请前往管理面板操作。"
    return jsonify({"code": 1000, "data": about_text})
@app.route('/api/get_id_token', methods=['GET'])
def get_id_token():
    """Issue a new identity token.

    Generates a UUID4 string, stores it as an Identity row and returns it with
    code 1000. Any exception is reported with code 2003 and its message.
    """
    try:
        fresh_token = str(uuid.uuid4())
        db.session.add(Identity(token=fresh_token))
        db.session.commit()
        success = {"code": 1000, "data": fresh_token}
        return jsonify(success)
    except Exception as exc:
        failure = {"code": 2003, "data": f"验证失败: {str(exc)}"}
        return jsonify(failure)
@app.route('/api/verify_token', methods=['POST'])
def verify_token():
    """Report whether the posted identity token is known.

    Expects a JSON body with a `token` key. Responds with code 1000 and a
    boolean, code 2000 when the key is missing, or code 2003 with the error
    message when an exception occurs.
    """
    try:
        payload = request.get_json()
        if not payload or 'token' not in payload:
            return jsonify({"code": 2000, "data": "参数错误"})
        match = Identity.query.filter_by(token=payload['token']).first()
        is_known = match is not None
        return jsonify({"code": 1000, "data": is_known})
    except Exception as exc:
        return jsonify({"code": 2003, "data": f"验证失败: {str(exc)}"})
@app.route('/api/post', methods=['POST'])
def submit_post():
    """Create a submission together with its hashtags.

    JSON body: `content` (required, non-blank string), `hashtopic` (optional
    list of tag names) and `identity` (optional identity token). A `nickname`
    key may be sent but is ignored, because Submission has no nickname column.

    Responses: code 1002 when the post is stored as 'Pending', 1001 when it is
    stored as 'Pass' (both with the new id); 2000 for invalid parameters; 2004
    for an unknown identity token; 2005 when the content contains a deny word;
    2003 with the error message when an exception occurs.
    """
    try:
        data = request.get_json()
        if not isinstance(data, dict) or 'content' not in data:
            return jsonify({"code": 2000, "data": "内容不能为空"})
        content = data['content']
        # Reject blank or non-string content up front; the deny-word scan below
        # would otherwise fail with a TypeError on non-strings.
        if not isinstance(content, str) or not content.strip():
            return jsonify({"code": 2000, "data": "内容不能为空"})
        hashtopic = data.get('hashtopic', [])
        if not isinstance(hashtopic, list):
            return jsonify({"code": 2000, "data": "hashtopic必须为列表"})
        identity_token = data.get('identity')
        # Deny-word check (plain substring match).
        if any(word in content for word in DENY_WORDS_CACHE):
            return jsonify({"code": 2005, "data": "提交内容包含违禁词"})
        # Identity check: a supplied token must be a known one.
        if identity_token:
            if not Identity.query.filter_by(token=identity_token).first():
                return jsonify({"code": 2004, "data": "无效的 Identity Token"})
        else:
            identity_token = None
        status = 'Pending' if NEED_AUDIT else 'Pass'
        new_post = Submission(
            content=content,
            identity_token=identity_token,
            status=status
        )
        db.session.add(new_post)
        # flush() assigns new_post.id without committing, so the post and its
        # hashtags are stored (or discarded) in a single transaction.
        db.session.flush()
        for tag in hashtopic:
            db.session.add(Hashtag(
                type=0,  # 0 for Submission
                target_id=new_post.id,
                name=tag
            ))
        db.session.commit()
        code = 1002 if status == 'Pending' else 1001
        return jsonify({"code": code, "data": {"id": new_post.id}})
    except Exception as e:
        # Discard anything left half-written in the session.
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"投稿失败: {str(e)}"})
@app.route('/api/comment', methods=['POST'])
def submit_comment():
    """Create a comment on a submission together with its hashtags.

    JSON body: `submission_id` and `content` (required), `nickname` (optional,
    defaults to '匿名用户'), `parent_comment_id` (optional; 0, missing or
    non-numeric means a top-level comment), `hashtopic` (optional list of tag
    names) and `identity` (optional identity token).

    Responses: code 1001 with the new id; 2000 for invalid parameters; 2002
    when the submission or the parent comment does not exist; 2004 for an
    unknown identity token; 2005 when the content contains a deny word; 2003
    with the error message when an exception occurs.
    """
    try:
        data = request.get_json()
        if not isinstance(data, dict) or 'content' not in data or 'submission_id' not in data:
            return jsonify({"code": 2000, "data": "参数错误"})
        submission_id = data['submission_id']
        content = data['content']
        # Reject blank or non-string content up front; the deny-word scan below
        # would otherwise fail with a TypeError on non-strings.
        if not isinstance(content, str) or not content.strip():
            return jsonify({"code": 2000, "data": "内容不能为空"})
        nickname = data.get('nickname') or '匿名用户'
        try:
            parent_comment_id = int(data.get('parent_comment_id', 0))
        except (TypeError, ValueError):
            parent_comment_id = 0
        hashtopic = data.get('hashtopic', [])
        if not isinstance(hashtopic, list):
            return jsonify({"code": 2000, "data": "hashtopic必须为列表"})
        identity_token = data.get('identity')
        # The target submission must exist.
        submission = db.session.get(Submission, submission_id)
        if not submission:
            return jsonify({"code": 2002, "data": "投稿不存在"})
        # A reply must point at an existing comment of the same submission;
        # otherwise the stored thread would contain dangling or cross-post links.
        if parent_comment_id != 0:
            parent = db.session.get(Comment, parent_comment_id)
            if not parent or parent.submission_id != submission.id:
                return jsonify({"code": 2002, "data": "父评论不存在"})
        # Deny-word check (plain substring match).
        if any(word in content for word in DENY_WORDS_CACHE):
            return jsonify({"code": 2005, "data": "提交内容包含违禁词"})
        # Identity check: a supplied token must be a known one.
        if identity_token:
            if not Identity.query.filter_by(token=identity_token).first():
                return jsonify({"code": 2004, "data": "无效的 Identity Token"})
        else:
            identity_token = None
        new_comment = Comment(
            submission_id=submission.id,
            nickname=nickname,
            content=content,
            identity_token=identity_token,
            parent_comment_id=None if parent_comment_id == 0 else parent_comment_id
        )
        db.session.add(new_comment)
        # flush() assigns new_comment.id without committing, so the comment and
        # its hashtags are stored (or discarded) in a single transaction.
        db.session.flush()
        for tag in hashtopic:
            db.session.add(Hashtag(
                type=1,  # 1 for Comment
                target_id=new_comment.id,
                name=tag
            ))
        db.session.commit()
        return jsonify({"code": 1001, "data": {"id": new_comment.id}})
    except Exception as e:
        # Discard anything left half-written in the session.
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"评论失败: {str(e)}"})
@app.route('/api/get/comment', methods=['GET'])
def get_comments():
    """List the comments of an approved submission.

    Query parameter `id` is the submission id. Responds with code 1000 and a
    list of comments, 2000 when `id` is missing or not a non-zero integer, 2002
    when the submission does not exist or its status is not "Pass", and 2003
    with the error message when an exception occurs.
    """
    try:
        submission_id = request.args.get("id", type=int)
        if not submission_id:
            return jsonify({"code": 2000, "data": "参数错误"})
        post = db.session.get(Submission, submission_id)
        if post is None or post.status != "Pass":
            return jsonify({"code": 2002, "data": "投稿不存在"})
        listing = []
        for comment in post.comments:
            listing.append({
                "id": comment.id,
                "nickname": comment.nickname,
                "content": comment.content,
                # Top-level comments (NULL parent) are reported as 0.
                "parent_comment_id": comment.parent_comment_id or 0,
            })
        return jsonify({"code": 1000, "data": listing})
    except Exception as exc:
        return jsonify({"code": 2003, "data": str(exc)})
@app.route('/api/upload_pic', methods=['POST'])
def upload_pic():
    """Store an uploaded image and return its download URL.

    Expects a multipart form with a `file` part and an optional
    `identity_token` field. The file is saved in IMG_DIR under a generated
    "<uuid hex>.<ext>" name and recorded as an ImgFile row.

    Responses: code 1001 with "/api/files/<generated name>"; 2000 when the file
    part or its name is missing; 2006 when the file exceeds FILE_SIZE_LIMIT_MB;
    2007 when the extension is missing or not in FILE_FORMATS; 2003 with the
    error message when an exception occurs.
    """
    try:
        if 'file' not in request.files:
            return jsonify({"code": 2000, "data": "参数错误"})
        file = request.files['file']
        if not file.filename:
            return jsonify({"code": 2000, "data": "参数错误"})
        # Measure the upload by seeking to its end, then rewind for save().
        file.seek(0, os.SEEK_END)
        file_length = file.tell()
        file.seek(0)
        if FILE_SIZE_LIMIT_MB is not None:
            limit_bytes = float(FILE_SIZE_LIMIT_MB) * 1024 * 1024
            if file_length > limit_bytes:
                return jsonify({"code": 2006, "data": "上传的图片超出限制大小"})
        ext = os.path.splitext(file.filename)[1].lstrip('.').lower()
        if not ext or (FILE_FORMATS and ext not in FILE_FORMATS):
            return jsonify({"code": 2007, "data": "上传的文件类型不支持"})
        # The client-supplied name never reaches the file system; only the
        # extension is reused.
        filename = f"{uuid.uuid4().hex}.{ext}"
        filepath = os.path.join(IMG_DIR, filename)
        file.save(filepath)
        identity_token = request.form.get('identity_token') or None
        try:
            db.session.add(ImgFile(path=filename, name=file.filename, identity_token=identity_token))
            db.session.commit()
        except Exception:
            # Do not leave an untracked file behind when the record cannot be
            # stored; the outer handler turns the error into a 2003 response.
            db.session.rollback()
            if os.path.exists(filepath):
                os.remove(filepath)
            raise
        return jsonify({"code": 1001, "data": f"/api/files/{filename}"})
    except Exception as e:
        return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/files/<path:file_name>', methods=['GET'])
def serve_file(file_name):
    """Serve the file named `file_name` from IMG_DIR."""
    response = send_from_directory(IMG_DIR, file_name)
    return response
@app.route('/api/file_name', methods=['GET'])
def get_file_name():
    """Look up the uploader-supplied name of a stored image.

    Query parameter `path` is the stored file name. Responds with code 1000 and
    the recorded name ("" when none was recorded), 2000 when `path` is missing,
    2002 when no record matches, and 2003 with the error message when an
    exception occurs.
    """
    try:
        stored_path = request.args.get("path")
        if not stored_path:
            return jsonify({"code": 2000, "data": "参数错误"})
        entry = ImgFile.query.filter_by(path=stored_path).first()
        if entry:
            return jsonify({"code": 1000, "data": entry.name or ""})
        return jsonify({"code": 2002, "data": "数据不存在"})
    except Exception as exc:
        return jsonify({"code": 2003, "data": str(exc)})
@app.route('/api/up', methods=['POST'])
def upvote():
    """Add one upvote to a submission.

    JSON body: `id` (submission id) and `type`, which must be 'submission'.
    Responds with code 1000 on success, 2000 for missing keys or any other
    `type`, 2002 when the submission does not exist, and 2003 with the error
    message when an exception occurs.
    """
    try:
        payload = request.get_json()
        if not payload or 'id' not in payload or 'type' not in payload:
            return jsonify({"code": 2000, "data": "参数错误"})
        target_id = payload['id']
        target_kind = payload['type']
        # Only submissions can be voted on.
        if target_kind != 'submission':
            return jsonify({"code": 2000, "data": "参数错误"})
        target = db.session.get(Submission, target_id)
        if not target:
            return jsonify({"code": 2002, "data": "对象不存在"})
        target.upvotes = target.upvotes + 1
        db.session.commit()
        return jsonify({"code": 1000, "data": ""})
    except Exception as exc:
        return jsonify({"code": 2003, "data": f"点赞失败: {str(exc)}"})
@app.route('/api/down', methods=['POST'])
def downvote():
    """Add one downvote to a submission.

    JSON body: `id` (submission id) and `type`, which must be 'submission'.
    Responds with code 1000 on success, 2000 for missing keys or any other
    `type`, 2002 when the submission does not exist, and 2003 with the error
    message when an exception occurs.
    """
    try:
        payload = request.get_json()
        if not payload or 'id' not in payload or 'type' not in payload:
            return jsonify({"code": 2000, "data": "参数错误"})
        target_id = payload['id']
        target_kind = payload['type']
        # Only submissions can be voted on.
        if target_kind != 'submission':
            return jsonify({"code": 2000, "data": "参数错误"})
        target = db.session.get(Submission, target_id)
        if not target:
            return jsonify({"code": 2002, "data": "对象不存在"})
        target.downvotes = target.downvotes + 1
        db.session.commit()
        return jsonify({"code": 1000, "data": ""})
    except Exception as exc:
        return jsonify({"code": 2003, "data": f"点踩失败: {str(exc)}"})
@app.route('/api/test', methods=['GET'])
def test_api():
    """Connectivity check: always answers with code 1000 and empty data."""
    payload = {"code": 1000, "data": ""}
    return jsonify(payload)
@app.route('/api/statics', methods=['GET'])
def get_statics():
    """Return row counts for submissions, comments and uploaded images.

    Responds with code 1000 and the three counts, or code 2003 with the error
    message when an exception occurs.
    """
    try:
        counts = {
            "posts": Submission.query.count(),
            "comments": Comment.query.count(),
            "images": ImgFile.query.count(),
        }
        return jsonify({"code": 1000, "data": counts})
    except Exception as exc:
        return jsonify({"code": 2003, "data": str(exc)})
@app.route('/api/10_info', methods=['GET'])
def get_10_info():
    """Return one page of approved submissions, newest id first.

    Query parameter `page` is 1-based; missing, non-integer or smaller values
    are treated as 1. Each page holds 10 submissions whose status is 'Pass'.
    Every item carries `total_pages`, the number of pages available. Responds
    with code 1000 and the list, or code 2003 with the error message when an
    exception occurs.
    """
    try:
        page = max(request.args.get("page", 1, type=int), 1)
        per_page = 10
        # Improvement over v1: filter and paginate in the database instead of
        # loading every row at once.
        pagination = (
            Submission.query.filter_by(status='Pass')
            .order_by(Submission.id.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )
        # `pagination.pages` is the page count; `pagination.total` is the
        # number of matching rows and is not what `total_pages` promises.
        total_pages = pagination.pages
        data = [
            {
                "id": s.id,
                "content": s.content,
                "upvotes": s.upvotes,
                "downvotes": s.downvotes,
                "created_at": s.created_at.isoformat() if s.created_at else None,
                "comment_count": len(s.comments),
                "total_pages": total_pages,
            }
            for s in pagination.items
        ]
        return jsonify({
            "code": 1000,
            "data": data
        })
    except Exception as e:
        return jsonify({"code": 2003, "data": str(e)})
# --- 菜单 ---
@app.route('/get/teapot', methods=['GET'])
def return_418():
    """Always abort the request with HTTP status 418."""
    teapot_status = 418
    abort(teapot_status)
# --- 用户的管理api端点 ---
# TODO: 用户管理端点
# --- 管理员api端点 ---
# TODO: 添加管理员端点
# 主函数
def main():
    """Prepare storage, load the configuration and start the Flask server."""
    init_db()
    load_config()
    app.run(debug=True, port=5000)


if __name__ == '__main__':
    main()