# Backend code for Sycamore Whisper.
# Hopefully better written than V1.
import json
import os
import uuid
from datetime import datetime

from flask import Flask, jsonify, request, abort, send_from_directory
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
# CORS policy: only /api/* accepts requests from any origin.
CORS(app, resources={r"/api/*": {"origins": "*"}})

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data', 'db.sqlite')
IMG_DIR = os.path.join(BASE_DIR, 'data', 'img')

app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# Global configuration; load_config() overrides these from the database.
NEED_AUDIT = True
FILE_SIZE_LIMIT_MB = 10.0
FILE_FORMATS = ["png", "jpg", "jpeg", "gif", "webp"]
# Initialised here so the endpoints that read it work even when load_config()
# has not run (it is only called from the __main__ guard).
DENY_WORDS_CACHE = []


# --- Database schema ---
class SiteSettings(db.Model):
    """Site-wide settings row; the endpoints read the first row only."""
    __tablename__ = 'site_settings'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    title = db.Column(db.String(255))
    icon = db.Column(db.String(255))
    footer_text = db.Column(db.Text)
    repo_link = db.Column(db.String(255))
    enable_repo_button = db.Column(db.Boolean, default=False)
    enable_notice = db.Column(db.Boolean, default=False)
    need_audit = db.Column(db.Boolean, default=True)
    about = db.Column(db.Text)
    file_size_limit = db.Column(db.Float)  # MB
    file_formats = db.Column(db.Text)  # JSON list


class Identity(db.Model):
    """An identity token issued by /api/get_id_token."""
    __tablename__ = 'identity'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    token = db.Column(db.String(36), unique=True, nullable=False)


class Submission(db.Model):
    """A post; only rows with status 'Pass' are returned by the read endpoints."""
    __tablename__ = 'submissions'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    content = db.Column(db.Text, nullable=False)
    identity_token = db.Column(db.String(36), nullable=True)
    status = db.Column(db.String(20), default='Pending')
    created_at = db.Column(db.DateTime, default=lambda: datetime.now())
    upvotes = db.Column(db.Integer, default=0)
    downvotes = db.Column(db.Integer, default=0)
    comments = db.relationship('Comment', backref='submission', lazy=True,
                               cascade='all, delete-orphan')


class Comment(db.Model):
    """A comment on a submission, optionally replying to another comment."""
    __tablename__ = 'comments'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    submission_id = db.Column(db.Integer, db.ForeignKey('submissions.id'), nullable=False)
    nickname = db.Column(db.String(50), default='匿名用户')
    content = db.Column(db.Text, nullable=False)
    identity_token = db.Column(db.String(36), nullable=True)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now())
    parent_comment_id = db.Column(db.Integer, db.ForeignKey('comments.id'), nullable=True)


class Report(db.Model):
    """A report filed against a submission."""
    __tablename__ = 'reports'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    submission_id = db.Column(db.Integer, nullable=False)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    identity_token = db.Column(db.String(36), nullable=True)
    status = db.Column(db.String(20), default='Pending')
    created_at = db.Column(db.DateTime, default=lambda: datetime.now())


class Hashtag(db.Model):
    """A hashtag attached to a submission or a comment."""
    __tablename__ = 'hashtags'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    type = db.Column(db.Integer, nullable=False)  # 0: Submission, 1: Comment
    target_id = db.Column(db.Integer, nullable=False)
    name = db.Column(db.String(255), nullable=False)


class DenyWord(db.Model):
    """A banned word; content containing it is rejected on submit."""
    __tablename__ = 'deny_words'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    word = db.Column(db.String(255), unique=True, nullable=False)


class ImgFile(db.Model):
    """Record of an uploaded image: stored file name plus original name."""
    __tablename__ = 'img_files'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    path = db.Column(db.String(255), nullable=False)
    name = db.Column(db.String(255), nullable=True)
    identity_token = db.Column(db.String(36), nullable=True)


def init_db():
    """Create the data directories and all database tables."""
    # Create the directory that actually holds DB_PATH instead of './data',
    # which depended on the current working directory.
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    os.makedirs(IMG_DIR, exist_ok=True)
    with app.app_context():
        db.create_all()


def load_config():
    """Load settings and deny words from the database into the module globals.

    Failures are reported with a printed warning and leave the current
    values untouched; nothing is raised to the caller.
    """
    global NEED_AUDIT, FILE_SIZE_LIMIT_MB, FILE_FORMATS, DENY_WORDS_CACHE

    with app.app_context():
        try:
            settings = SiteSettings.query.first()
            if settings:
                need_audit = getattr(settings, 'need_audit', None)
                if need_audit is not None:
                    NEED_AUDIT = need_audit

                size_limit = getattr(settings, 'file_size_limit', None)
                if size_limit is not None:
                    try:
                        FILE_SIZE_LIMIT_MB = float(size_limit)
                    except (TypeError, ValueError) as e:
                        print(f"Warning: Invalid file_size_limit ignored: {e}")

                raw_formats = getattr(settings, 'file_formats', None)
                if raw_formats:
                    try:
                        # Stored as a JSON list; tolerate an already-decoded value.
                        if isinstance(raw_formats, str):
                            parsed = json.loads(raw_formats)
                        else:
                            parsed = raw_formats
                        if isinstance(parsed, list):
                            FILE_FORMATS = [
                                str(x).strip().lstrip('.').lower()
                                for x in parsed
                                if str(x).strip()
                            ]
                    except (TypeError, ValueError) as e:
                        print(f"Warning: Invalid file_formats ignored: {e}")
        except Exception as e:
            print(f"Warning: Failed to load settings: {e}")

    with app.app_context():
        try:
            # The query yields one-element rows: [('word1',), ('word2',)]
            words = db.session.query(DenyWord.word).all()
            DENY_WORDS_CACHE = [w[0] for w in words]
            print(f"Loaded {len(DENY_WORDS_CACHE)} deny words.")
        except Exception as e:
            print(f"Warning: Failed to load deny words: {e}")


# --- Private helpers shared by the endpoints ---
def _contains_deny_word(text):
    """Return True when text contains any cached deny word."""
    # Empty words are skipped: '' is a substring of every string and would
    # otherwise reject all content.
    return any(word and word in text for word in DENY_WORDS_CACHE)


def _resolve_identity(token):
    """Return (token_or_None, is_valid) for an optional identity token.

    A missing or empty token is valid and normalised to None; a non-empty
    token is valid only when a matching Identity row exists.
    """
    if not token:
        return None, True
    known = Identity.query.filter_by(token=token).first() is not None
    return token, known


def _add_hashtags(target_type, target_id, tags):
    """Add Hashtag rows to the session; non-string and blank tags are skipped."""
    for tag in tags:
        if isinstance(tag, str) and tag.strip():
            db.session.add(Hashtag(type=target_type, target_id=target_id, name=tag.strip()))


def _vote(counter_attr, failure_label):
    """Increment a vote counter on a submission named in the JSON body.

    counter_attr is 'upvotes' or 'downvotes'; failure_label prefixes the
    error text returned with code 2003.
    """
    try:
        data = request.get_json()
        if not data or 'id' not in data or 'type' not in data:
            return jsonify({"code": 2000, "data": "参数错误"})
        if data['type'] != 'submission':
            return jsonify({"code": 2000, "data": "参数错误"})
        item = db.session.get(Submission, data['id'])
        if not item:
            return jsonify({"code": 2002, "data": "对象不存在"})
        # Treat a NULL counter as zero so the increment cannot fail.
        setattr(item, counter_attr, (getattr(item, counter_attr) or 0) + 1)
        db.session.commit()
        return jsonify({"code": 1000, "data": ""})
    except Exception as e:
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"{failure_label}: {str(e)}"})


# --- Public user API endpoints ---
@app.route('/api/settings', methods=['GET'])
def get_settings():
    """Return the public site settings, or code 2002 when no row exists."""
    settings = SiteSettings.query.first()
    if not settings:
        return jsonify({"code": 2002, "data": {}})
    data = {
        "title": settings.title,
        "icon": settings.icon,
        "footer_text": settings.footer_text,
        "repo_link": settings.repo_link,
        "enable_repo_button": settings.enable_repo_button,
        "enable_notice": settings.enable_notice,
    }
    return jsonify({"code": 1000, "data": data})


@app.route('/api/about', methods=['GET'])
def get_about():
    """Return the about-page text, or a default placeholder when unset."""
    settings = SiteSettings.query.first()
    if settings and settings.about:
        return jsonify({"code": 1000, "data": settings.about})
    # "about" is not set during initialisation; return default text so the
    # admin panel does not error out.
    return jsonify({
        "code": 1000,
        "data": "# 默认关于页面\n关于页面未设置,请前往管理面板操作。"
    })


@app.route('/api/get_id_token', methods=['GET'])
def get_id_token():
    """Create, store and return a new UUID4 identity token."""
    try:
        token = str(uuid.uuid4())
        db.session.add(Identity(token=token))
        db.session.commit()
        return jsonify({"code": 1000, "data": token})
    except Exception as e:
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"验证失败: {str(e)}"})


@app.route('/api/verify_token', methods=['POST'])
def verify_token():
    """Report whether the token in the JSON body matches a stored identity."""
    try:
        data = request.get_json()
        if not data or 'token' not in data:
            return jsonify({"code": 2000, "data": "参数错误"})
        identity = Identity.query.filter_by(token=data['token']).first()
        return jsonify({"code": 1000, "data": identity is not None})
    except Exception as e:
        return jsonify({"code": 2003, "data": f"验证失败: {str(e)}"})


@app.route('/api/post', methods=['POST'])
def submit_post():
    """Create a submission from the JSON body.

    Reads 'content' (required, non-empty string), optional 'hashtopic' (list)
    and optional 'identity'. Returns code 1002 when the post is pending
    audit, 1001 when it is published directly.
    """
    try:
        data = request.get_json()
        if not isinstance(data, dict) or 'content' not in data:
            return jsonify({"code": 2000, "data": "内容不能为空"})
        content = data['content']
        # The key being present is not enough: reject empty/blank or
        # non-string content, as the error text promises.
        if not isinstance(content, str) or not content.strip():
            return jsonify({"code": 2000, "data": "内容不能为空"})

        hashtopic = data.get('hashtopic', [])
        if not isinstance(hashtopic, list):
            return jsonify({"code": 2000, "data": "hashtopic必须为列表"})

        # Deny-word check
        if _contains_deny_word(content):
            return jsonify({"code": 2005, "data": "提交内容包含违禁词"})

        # Identity validation
        identity_token, identity_ok = _resolve_identity(data.get('identity'))
        if not identity_ok:
            return jsonify({"code": 2004, "data": "无效的 Identity Token"})

        new_post = Submission(
            content=content,
            identity_token=identity_token,
            status='Pending' if NEED_AUDIT else 'Pass'
        )
        db.session.add(new_post)
        # flush() assigns new_post.id without committing, so the post and its
        # hashtags are saved in a single transaction.
        db.session.flush()
        _add_hashtags(0, new_post.id, hashtopic)  # 0 for Submission
        db.session.commit()

        code = 1002 if new_post.status == 'Pending' else 1001
        return jsonify({"code": code, "data": {"id": new_post.id}})
    except Exception as e:
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"投稿失败: {str(e)}"})


@app.route('/api/comment', methods=['POST'])
def submit_comment():
    """Create a comment on an existing submission from the JSON body.

    Reads 'content' and 'submission_id' (required), plus optional 'nickname',
    'parent_comment_id' (0 or unparsable means top-level), 'hashtopic' (list)
    and 'identity'.
    """
    try:
        data = request.get_json()
        if not isinstance(data, dict) or 'content' not in data or 'submission_id' not in data:
            return jsonify({"code": 2000, "data": "参数错误"})
        submission_id = data['submission_id']
        content = data['content']
        if not isinstance(content, str) or not content.strip():
            return jsonify({"code": 2000, "data": "参数错误"})
        nickname = data.get('nickname') or '匿名用户'

        try:
            parent_comment_id = int(data.get('parent_comment_id', 0))
        except (TypeError, ValueError, OverflowError):
            parent_comment_id = 0

        hashtopic = data.get('hashtopic', [])
        if not isinstance(hashtopic, list):
            return jsonify({"code": 2000, "data": "hashtopic必须为列表"})

        # The target submission must exist.
        if not db.session.get(Submission, submission_id):
            return jsonify({"code": 2002, "data": "投稿不存在"})

        # Deny-word check
        if _contains_deny_word(content):
            return jsonify({"code": 2005, "data": "提交内容包含违禁词"})

        # Identity validation
        identity_token, identity_ok = _resolve_identity(data.get('identity'))
        if not identity_ok:
            return jsonify({"code": 2004, "data": "无效的 Identity Token"})

        new_comment = Comment(
            submission_id=submission_id,
            nickname=nickname,
            content=content,
            identity_token=identity_token,
            parent_comment_id=None if parent_comment_id == 0 else parent_comment_id
        )
        db.session.add(new_comment)
        # Same single-transaction pattern as submit_post.
        db.session.flush()
        _add_hashtags(1, new_comment.id, hashtopic)  # 1 for Comment
        db.session.commit()

        return jsonify({"code": 1001, "data": {"id": new_comment.id}})
    except Exception as e:
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"评论失败: {str(e)}"})


@app.route('/api/report', methods=['POST'])
def submit_report():
    """File a report against an existing submission.

    Requires 'id', 'title' and 'content' in the JSON body; 'identity' is
    optional. Title and content are stored stripped.
    """
    try:
        data = request.get_json()
        if not data or 'id' not in data or 'title' not in data or 'content' not in data:
            return jsonify({"code": 2000, "data": "参数错误"})
        submission_id = data.get('id')
        title = data.get('title')
        content = data.get('content')

        if not db.session.get(Submission, submission_id):
            return jsonify({"code": 2002, "data": "投稿不存在"})

        identity_token, identity_ok = _resolve_identity(data.get('identity'))
        if not identity_ok:
            return jsonify({"code": 2004, "data": "无效的Identity Token"})

        report = Report(
            submission_id=submission_id,
            title=str(title).strip() if title is not None else '',
            content=str(content).strip() if content is not None else '',
            identity_token=identity_token,
            status='Pending',
        )
        db.session.add(report)
        db.session.commit()
        return jsonify({"code": 1001, "data": {"id": report.id}})
    except Exception as e:
        db.session.rollback()
        return jsonify({"code": 2003, "data": f"投诉失败: {str(e)}"})


@app.route('/api/get/comment', methods=['GET'])
def get_comments():
    """Return one page (5 per page) of comments for a published submission."""
    try:
        submission_id = request.args.get("id", type=int)
        if not submission_id:
            return jsonify({"code": 2000, "data": "参数错误"})

        submission = db.session.get(Submission, submission_id)
        if not submission or submission.status != "Pass":
            return jsonify({"code": 2002, "data": "投稿不存在"})

        page = max(request.args.get("page", 1, type=int), 1)
        per_page = 5
        pagination = (
            Comment.query.filter_by(submission_id=submission_id)
            .order_by(Comment.id.asc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )
        comments = [
            {
                "id": c.id,
                "nickname": c.nickname,
                "content": c.content,
                # Top-level comments are reported with parent id 0.
                "parent_comment_id": c.parent_comment_id if c.parent_comment_id is not None else 0,
            }
            for c in pagination.items
        ]
        return jsonify({"code": 1000, "data": {"comments": comments, "total_pages": pagination.pages}})
    except Exception as e:
        return jsonify({"code": 2003, "data": str(e)})


@app.route('/api/upload_pic', methods=['POST'])
def upload_pic():
    """Store an uploaded image under IMG_DIR and return its /api/files/ URL.

    Rejects files above FILE_SIZE_LIMIT_MB (code 2006) and extensions not in
    FILE_FORMATS (code 2007).
    """
    try:
        if 'file' not in request.files:
            return jsonify({"code": 2000, "data": "参数错误"})
        file = request.files['file']
        if file.filename == '':
            return jsonify({"code": 2000, "data": "参数错误"})

        # Measure the upload by seeking to the end, then rewind for save().
        file.seek(0, os.SEEK_END)
        file_length = file.tell()
        file.seek(0)
        if FILE_SIZE_LIMIT_MB is not None:
            limit_bytes = float(FILE_SIZE_LIMIT_MB) * 1024 * 1024
            if file_length > limit_bytes:
                return jsonify({"code": 2006, "data": "上传的图片超出限制大小"})

        ext = os.path.splitext(file.filename)[1].lstrip('.').lower()
        if not ext or (FILE_FORMATS and ext not in FILE_FORMATS):
            return jsonify({"code": 2007, "data": "上传的文件类型不支持"})

        # The stored name is a fresh UUID, so the client-supplied file name
        # never reaches the filesystem path.
        filename = f"{uuid.uuid4().hex}.{ext}"
        file.save(os.path.join(IMG_DIR, filename))

        # NOTE(review): unlike the other endpoints this token is stored
        # without checking it against Identity, and the form key is
        # 'identity_token' rather than 'identity' - confirm this is intended.
        identity_token = request.form.get('identity_token') or None
        name = file.filename or None
        db.session.add(ImgFile(path=filename, name=name, identity_token=identity_token))
        db.session.commit()
        return jsonify({"code": 1001, "data": f"/api/files/{filename}"})
    except Exception as e:
        db.session.rollback()
        return jsonify({"code": 2003, "data": str(e)})


# The URL variable is required: the view takes file_name, and upload_pic
# hands out URLs of the form /api/files/<stored name>.
@app.route('/api/files/<file_name>', methods=['GET'])
def serve_file(file_name):
    """Serve a stored image from IMG_DIR."""
    return send_from_directory(IMG_DIR, file_name)


@app.route('/api/file_name', methods=['GET'])
def get_file_name():
    """Return the original upload name for the stored file given by 'path'."""
    try:
        path = request.args.get("path")
        if not path:
            return jsonify({"code": 2000, "data": "参数错误"})
        record = ImgFile.query.filter_by(path=path).first()
        if not record:
            return jsonify({"code": 2002, "data": "数据不存在"})
        return jsonify({"code": 1000, "data": record.name or ""})
    except Exception as e:
        return jsonify({"code": 2003, "data": str(e)})


@app.route('/api/up', methods=['POST'])
def upvote():
    """Add one upvote to the submission named in the JSON body."""
    return _vote('upvotes', "点赞失败")


@app.route('/api/down', methods=['POST'])
def downvote():
    """Add one downvote to the submission named in the JSON body."""
    return _vote('downvotes', "点踩失败")


@app.route('/api/test', methods=['GET'])
def test_api():
    """Connectivity check; always returns code 1000."""
    return jsonify({"code": 1000, "data": ""})


@app.route('/api/statics', methods=['GET'])
def get_statics():
    """Return total counts of submissions, comments and uploaded images."""
    try:
        counts = {
            "posts": Submission.query.count(),
            "comments": Comment.query.count(),
            "images": ImgFile.query.count(),
        }
        return jsonify({"code": 1000, "data": counts})
    except Exception as e:
        return jsonify({"code": 2003, "data": str(e)})


@app.route('/api/posts_info', methods=['GET'])
def get_posts_info():
    """Return one page (10 per page) of published submissions, newest first."""
    try:
        page = max(request.args.get("page", 1, type=int), 1)
        per_page = 10
        # Improvement over V1: filter and paginate in the database instead of
        # loading every row at once.
        pagination = (
            Submission.query.filter_by(status='Pass')
            .order_by(Submission.id.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )
        data = [
            {
                "id": s.id,
                "content": s.content,
                "upvotes": s.upvotes,
                "downvotes": s.downvotes,
                "created_at": s.created_at.isoformat() if s.created_at else None,
                "comment_count": len(s.comments),
                # Page count, not item count (pagination.total), to match the
                # key name and what get_comments returns under the same key.
                "total_pages": pagination.pages,
            }
            for s in pagination.items
        ]
        return jsonify({"code": 1000, "data": data})
    except Exception as e:
        return jsonify({"code": 2003, "data": str(e)})


@app.route('/api/post_info', methods=['GET'])
def get_post_info():
    """Return a single published submission identified by the 'id' argument."""
    try:
        post_id = request.args.get("id", type=int)
        if not post_id:
            return jsonify({"code": 2000, "data": "参数错误"})

        submission = Submission.query.filter_by(id=post_id, status='Pass').first()
        if not submission:
            return jsonify({"code": 2002, "data": "投稿不存在"})

        data = {
            "id": submission.id,
            "content": submission.content,
            "upvotes": submission.upvotes,
            "downvotes": submission.downvotes,
            "created_at": submission.created_at.isoformat() if submission.created_at else None,
            "comment_count": len(submission.comments),
        }
        return jsonify({"code": 1000, "data": data})
    except Exception as e:
        return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/hot_topics', methods=['GET'])
def get_hot_topics():
    """Return the five most frequently used hashtag names with their counts."""
    try:
        usage = db.func.count(Hashtag.name)
        rows = (
            db.session.query(Hashtag.name, usage.label('count'))
            .group_by(Hashtag.name)
            .order_by(usage.desc())
            .limit(5)
            .all()
        )
        topics = [{"name": name, "count": int(count)} for name, count in rows]
        return jsonify({"code": 1000, "data": {"list": topics}})
    except Exception as e:
        return jsonify({"code": 2003, "data": str(e)})


# --- Easter egg ---
@app.route('/api/teapot', methods=['GET'])
def return_418():
    """Always abort with HTTP 418."""
    abort(418)


# --- User management API endpoints ---
# TODO: user management endpoints

# --- Admin API endpoints ---
# TODO: add admin endpoints

# Entry point
if __name__ == '__main__':
    init_db()
    load_config()
    # Debug mode enables the interactive Werkzeug debugger, which allows
    # arbitrary code execution; keep it off unless explicitly requested
    # through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, port=5000)