# 这里是Sycamore whisper的后端代码喵! # 但愿比V1写的好喵(逃 from flask import Flask, jsonify, request, abort, send_from_directory from flask_cors import CORS from flask_sqlalchemy import SQLAlchemy from sqlalchemy.orm import foreign import os import uuid import json import filetype from PIL import Image from PIL import UnidentifiedImageError from datetime import datetime app = Flask(__name__) # 跨域策略,仅/api/*允许所有来源的请求 CORS(app, resources={r"/api/*": {"origins": "*"}}) BASE_DIR = os.path.dirname(os.path.abspath(__file__)) DB_PATH = os.path.join(BASE_DIR, 'data', 'db.sqlite') IMG_DIR = os.path.join(BASE_DIR, 'data', 'img') app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) # 全局配置变量 NEED_AUDIT = True FILE_SIZE_LIMIT_MB = 10.0 FILE_FORMATS = ["image/png", "image/jpeg", "image/gif", "image/webp"] def now_time(): return datetime.now() # --- 定义数据库结构 --- class SiteSettings(db.Model): __tablename__ = 'site_settings' id = db.Column(db.Integer, primary_key=True, autoincrement=True) title = db.Column(db.String(255)) icon = db.Column(db.String(255)) footer_text = db.Column(db.Text) repo_link = db.Column(db.String(255)) enable_repo_button = db.Column(db.Boolean, default=False) enable_notice = db.Column(db.Boolean, default=False) need_audit = db.Column(db.Boolean, default=True) about = db.Column(db.Text) file_size_limit = db.Column(db.Float) # MB file_formats = db.Column(db.Text) # JSON list class Identity(db.Model): __tablename__ = 'identity' id = db.Column(db.Integer, primary_key=True, autoincrement=True) token = db.Column(db.String(36), unique=True, nullable=False) class Submission(db.Model): __tablename__ = 'submissions' id = db.Column(db.Integer, primary_key=True, autoincrement=True) content = db.Column(db.Text, nullable=False) identity_token = db.Column(db.String(36), nullable=True) status = db.Column(db.String(20), default='Pending', index=True) created_at = db.Column(db.DateTime, default=now_time) updated_at = db.Column(db.DateTime, default=now_time, onupdate=now_time) upvotes = db.Column(db.Integer, default=0) downvotes = db.Column(db.Integer, default=0) comments = db.relationship('Comment', backref='submission', lazy=True, cascade='all, delete-orphan') hashtags = db.relationship( 'Hashtag', primaryjoin="and_(Hashtag.type==0, foreign(Hashtag.target_id)==Submission.id)", cascade='all, delete-orphan', lazy=True, overlaps="hashtags" ) class Comment(db.Model): __tablename__ = 'comments' id = db.Column(db.Integer, primary_key=True, autoincrement=True) submission_id = db.Column(db.Integer, db.ForeignKey('submissions.id'), nullable=False, index=True) nickname = db.Column(db.String(50), default='匿名用户') content = db.Column(db.Text, nullable=False) identity_token = db.Column(db.String(36), nullable=True) created_at = db.Column(db.DateTime, default=now_time) parent_comment_id = db.Column(db.Integer, db.ForeignKey('comments.id'), nullable=True) hashtags = db.relationship( 'Hashtag', primaryjoin="and_(Hashtag.type==1, foreign(Hashtag.target_id)==Comment.id)", cascade='all, delete-orphan', lazy=True, overlaps="hashtags" ) class Report(db.Model): __tablename__ = 'reports' id = db.Column(db.Integer, primary_key=True, autoincrement=True) submission_id = db.Column(db.Integer, nullable=False) title = db.Column(db.String(200), nullable=False) content = db.Column(db.Text, nullable=False) identity_token = db.Column(db.String(36), nullable=True) status = db.Column(db.String(20), default='Pending') created_at = db.Column(db.DateTime, default=lambda: datetime.now()) class Hashtag(db.Model): __tablename__ = 'hashtags' __table_args__ = ( db.Index('ix_hashtags_type_target', 'type', 'target_id'), ) id = db.Column(db.Integer, primary_key=True, autoincrement=True) type = db.Column(db.Integer, nullable=False) # 0: Submission, 1: Comment target_id = db.Column(db.Integer, nullable=False) name = db.Column(db.String(255), nullable=False, index=True) class DenyWord(db.Model): __tablename__ = 'deny_words' id = db.Column(db.Integer, primary_key=True, autoincrement=True) word = db.Column(db.String(255), unique=True, nullable=False) class ImgFile(db.Model): __tablename__ = 'img_files' id = db.Column(db.Integer, primary_key=True, autoincrement=True) path = db.Column(db.String(255), nullable=False) name = db.Column(db.String(255), nullable=True) identity_token = db.Column(db.String(36), nullable=True) class SiteNotice(db.Model): __tablename__ = 'site_notice' id = db.Column(db.Integer, primary_key=True, autoincrement=True) type = db.Column(db.String(10), default='md', nullable=False) content = db.Column(db.Text, default='', nullable=False) version = db.Column(db.Integer, default=0, nullable=False) created_at = db.Column(db.DateTime, default=now_time) updated_at = db.Column(db.DateTime, default=now_time, onupdate=now_time) # 初始化数据库函数 def init_db(): if not os.path.exists('./data'): os.makedirs('./data') if not os.path.exists(IMG_DIR): os.makedirs(IMG_DIR) with app.app_context(): db.create_all() try: existing = SiteNotice.query.first() if not existing: n = SiteNotice(type='md', content='', version=0) db.session.add(n) db.session.commit() except Exception: db.session.rollback() def load_config(): global NEED_AUDIT, FILE_SIZE_LIMIT_MB, FILE_FORMATS with app.app_context(): try: settings = SiteSettings.query.first() if settings: if hasattr(settings, 'need_audit') and settings.need_audit is not None: NEED_AUDIT = settings.need_audit if getattr(settings, 'file_size_limit', None) is not None: try: FILE_SIZE_LIMIT_MB = float(settings.file_size_limit) except Exception: pass if getattr(settings, 'file_formats', None): try: raw = settings.file_formats if isinstance(raw, str): parsed = json.loads(raw) else: parsed = raw if isinstance(parsed, list): FILE_FORMATS = [str(x).strip().lower() for x in parsed if str(x).strip()] except Exception: pass except Exception as e: print(f"Warning: Failed to load settings: {e}") global DENY_WORDS_CACHE with app.app_context(): try: words = db.session.query(DenyWord.word).all() # words 是 list of tuples [('word1',), ('word2',)] DENY_WORDS_CACHE = [w[0] for w in words] print(f"Loaded {len(DENY_WORDS_CACHE)} deny words.") except Exception as e: print(f"Warning: Failed to load deny words: {e}") def is_identity_valid(token): if not token: return True return Identity.query.filter_by(token=token).first() is not None def normalize_identity(token): if token and not is_identity_valid(token): return False, None return True, token if token else None def find_deny_word(content): try: words = DENY_WORDS_CACHE except Exception: words = [] for word in words: if word in content: return word return None def save_hashtags(tag_type, target_id, hashtopic): if not hashtopic: return for tag in hashtopic: new_tag = Hashtag( type=tag_type, target_id=target_id, name=tag ) db.session.add(new_tag) def mime_allowed(mime, rules): if not rules: return True if not mime: return False mime = mime.lower() for rule in rules: rule = str(rule).strip().lower() if not rule: continue if rule == 'image/*': if mime.startswith('image/'): return True if mime == rule: return True return False # --- 用户普通api端点 --- @app.route('/api/settings', methods=['GET']) def get_settings(): settings = SiteSettings.query.first() if settings: data = { "title": settings.title, "icon": settings.icon, "footer_text": settings.footer_text, "repo_link": settings.repo_link, "enable_repo_button": settings.enable_repo_button, "enable_notice": settings.enable_notice, "file_size_limit": settings.file_size_limit, "file_formats": settings.file_formats } return jsonify({ "code": 1000, "data": data }) else: return jsonify({"code": 2002,"data":{}}) @app.route('/api/about', methods=['GET']) def get_about(): settings = SiteSettings.query.first() if settings and settings.about: return jsonify({ "code": 1000, "data": settings.about }) else: # about在初始化时不会被设置,避免报错,返回默认文本 return jsonify({ "code": 1000, "data": "# 默认关于页面\n关于页面未设置,请前往管理面板操作。" }) @app.route('/api/site_notice', methods=['GET']) def get_site_notice(): try: notice = SiteNotice.query.order_by(SiteNotice.id.asc()).first() if not notice: notice = SiteNotice(type='md', content='', version=0) db.session.add(notice) db.session.commit() data = { "type": notice.type if notice.type in ['md', 'url'] else 'md', "content": notice.content or '', "version": int(notice.version or 0), } return jsonify({"code": 1000, "data": data}) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) @app.route('/api/get_id_token', methods=['GET']) def get_id_token(): try: token = str(uuid.uuid4()) new_identity = Identity(token=token) db.session.add(new_identity) db.session.commit() return jsonify({ "code": 1000, "data": token }) except Exception as e: return jsonify({ "code": 2003, "data": f"验证失败: {str(e)}" }) @app.route('/api/verify_token', methods=['POST']) def verify_token(): try: data = request.get_json() if not data or 'token' not in data: return jsonify({ "code": 2000, "data": "参数错误" }) token = data['token'] identity = Identity.query.filter_by(token=token).first() if identity: return jsonify({ "code": 1000, "data": True }) else: return jsonify({ "code": 1000, "data": False }) except Exception as e: return jsonify({ "code": 2003, "data": f"验证失败: {str(e)}" }) @app.route('/api/post', methods=['POST']) def submit_post(): try: data = request.get_json() if not data or 'content' not in data: return jsonify({"code": 2000, "data": "内容不能为空"}) content = data['content'] nickname = data.get('nickname') or '匿名用户' hashtopic = data.get('hashtopic', []) if not isinstance(hashtopic, list): return jsonify({"code": 2000, "data": "hashtopic必须为列表"}) identity_token = data.get('identity') # 违禁词检测 if find_deny_word(content): return jsonify({"code": 2005, "data": "提交内容包含违禁词"}) # Identity 验证 ok, identity_token = normalize_identity(identity_token) if not ok: return jsonify({"code": 2004, "data": "无效的 Identity Token"}) # 保存 now = now_time() new_post = Submission( content=content, identity_token=identity_token, status='Pending' if NEED_AUDIT else 'Pass', created_at=now, updated_at=now, ) db.session.add(new_post) db.session.flush() # 保存 Hashtags save_hashtags(0, new_post.id, hashtopic) db.session.commit() code = 1002 if new_post.status == 'Pending' else 1000 return jsonify({"code": code, "data": {"id": new_post.id}}) except Exception as e: db.session.rollback() return jsonify({"code": 2003, "data": f"投稿失败: {str(e)}"}) @app.route('/api/comment', methods=['POST']) def submit_comment(): try: data = request.get_json() if not data or 'content' not in data or 'submission_id' not in data: return jsonify({"code": 2000, "data": "参数错误"}) submission_id = data['submission_id'] content = data['content'] nickname = data.get('nickname') or '匿名用户' parent_comment_id = data.get('parent_comment_id', 0) try: parent_comment_id = int(parent_comment_id) except Exception: parent_comment_id = 0 hashtopic = data.get('hashtopic', []) if not isinstance(hashtopic, list): return jsonify({"code": 2000, "data": "hashtopic必须为列表"}) identity_token = data.get('identity') # 检查 submission 是否存在 if not db.session.get(Submission, submission_id): return jsonify({"code": 2002, "data": "投稿不存在"}) # 违禁词检测 if find_deny_word(content): return jsonify({"code": 2005, "data": "提交内容包含违禁词"}) # Identity 验证 ok, identity_token = normalize_identity(identity_token) if not ok: return jsonify({"code": 2004, "data": "无效的 Identity Token"}) new_comment = Comment( submission_id=submission_id, nickname=nickname, content=content, identity_token=identity_token, parent_comment_id=None if parent_comment_id == 0 else parent_comment_id ) db.session.add(new_comment) db.session.flush() # 保存 Hashtags save_hashtags(1, new_comment.id, hashtopic) db.session.commit() return jsonify({"code": 1000, "data": {"id": new_comment.id}}) except Exception as e: db.session.rollback() return jsonify({"code": 2003, "data": f"评论失败: {str(e)}"}) @app.route('/api/report', methods=['POST']) def submit_report(): try: data = request.get_json() if not data or 'id' not in data or 'title' not in data or 'content' not in data: return jsonify({"code": 2000, "data": "参数错误"}) submission_id = data.get('id') title = data.get('title') content = data.get('content') title = str(title).strip() if title is not None else '' content = str(content).strip() if content is not None else '' if not title or not content: return jsonify({"code": 2000, "data": "参数错误"}) submission = db.session.get(Submission, submission_id) if not submission: return jsonify({"code": 2002, "data": "投稿不存在"}) identity_token = data.get('identity') ok, identity_token = normalize_identity(identity_token) if not ok: return jsonify({"code": 2004, "data": "无效的Identity Token"}) report = Report( submission_id=submission_id, title=title, content=content, identity_token=identity_token, status='Pending', ) db.session.add(report) db.session.commit() return jsonify({"code": 1000, "data": {"id": report.id}}) except Exception as e: return jsonify({"code": 2003, "data": f"投诉失败: {str(e)}"}) @app.route('/api/get/comment', methods=['GET']) def get_comments(): try: submission_id = request.args.get("id", type=int) if not submission_id: return jsonify({"code": 2000, "data": "参数错误"}) submission = db.session.get(Submission, submission_id) if not submission or submission.status != "Pass": return jsonify({"code": 2002, "data": "投稿不存在"}) page = request.args.get("page", 1, type=int) if page < 1: page = 1 per_page = 5 pagination = Comment.query.filter_by(submission_id=submission_id)\ .order_by(Comment.id.asc())\ .paginate(page=page, per_page=per_page, error_out=False) data = [{ "id": c.id, "nickname": c.nickname, "content": c.content, "parent_comment_id": c.parent_comment_id if c.parent_comment_id is not None else 0, "time": c.created_at.isoformat() if c.created_at else None } for c in pagination.items] return jsonify({"code": 1000, "data": {"comments": data, "total_pages": pagination.pages}}) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) @app.route('/api/upload_pic', methods=['POST']) def upload_pic(): try: if 'file' not in request.files: return jsonify({"code": 2000, "data": "参数错误"}) file = request.files['file'] if file.filename == '': return jsonify({"code": 2000, "data": "参数错误"}) file.seek(0, os.SEEK_END) file_length = file.tell() file.seek(0) if FILE_SIZE_LIMIT_MB is not None: limit_bytes = float(FILE_SIZE_LIMIT_MB) * 1024 * 1024 if file_length > limit_bytes: return jsonify({"code": 2006, "data": f"上传的图片超出{FILE_SIZE_LIMIT_MB}MB限制大小"}) ext = os.path.splitext(file.filename)[1].lstrip('.').lower() kind = filetype.guess(file.read(5120)) file.seek(0) detected_mime = kind.mime if kind else None if not detected_mime or not mime_allowed(detected_mime, FILE_FORMATS): return jsonify({"code": 2007, "data": "上传的文件类型不支持"}) try: file.seek(0) img = Image.open(file) img.verify() file.seek(0) except (UnidentifiedImageError, OSError): file.seek(0) return jsonify({"code": 2008, "data": "上传的文件损坏"}) except Exception: file.seek(0) return jsonify({"code": 2008, "data": "上传的文件损坏"}) if not ext and kind: ext = kind.extension filename = f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex filepath = os.path.join(IMG_DIR, filename) file.save(filepath) identity_token = request.form.get('identity_token') or None name = file.filename or None db.session.add(ImgFile(path=filename, name=name, identity_token=identity_token)) db.session.commit() return jsonify({"code": 1000, "data": f"/api/files/{filename}"}) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) @app.route('/api/files/', methods=['GET']) def serve_file(file_name): return send_from_directory(IMG_DIR, file_name) @app.route('/api/file_name', methods=['GET']) def get_file_name(): try: path = request.args.get("path") if not path: return jsonify({"code": 2000, "data": "参数错误"}) record = ImgFile.query.filter_by(path=path).first() if not record: return jsonify({"code": 2002, "data": "数据不存在"}) return jsonify({"code": 1000, "data": record.name or ""}) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) @app.route('/api/up', methods=['POST']) def upvote(): try: data = request.get_json() if not data or 'id' not in data or 'type' not in data: return jsonify({"code": 2000, "data": "参数错误"}) item_id = data['id'] item_type = data['type'] if item_type == 'submission': item = db.session.get(Submission, item_id) if not item: return jsonify({"code": 2002, "data": "对象不存在"}) item.upvotes += 1 db.session.commit() else: return jsonify({"code": 2000, "data": "参数错误"}) return jsonify({"code": 1000, "data": ""}) except Exception as e: return jsonify({"code": 2003, "data": f"点赞失败: {str(e)}"}) @app.route('/api/down', methods=['POST']) def downvote(): try: data = request.get_json() if not data or 'id' not in data or 'type' not in data: return jsonify({"code": 2000, "data": "参数错误"}) item_id = data['id'] item_type = data['type'] if item_type == 'submission': item = db.session.get(Submission, item_id) if not item: return jsonify({"code": 2002, "data": "对象不存在"}) item.downvotes += 1 db.session.commit() else: return jsonify({"code": 2000, "data": "参数错误"}) return jsonify({"code": 1000, "data": ""}) except Exception as e: return jsonify({"code": 2003, "data": f"点踩失败: {str(e)}"}) @app.route('/api/test', methods=['GET']) def test_api(): return jsonify({"code": 1000, "data": ""}) @app.route('/api/statics', methods=['GET']) def get_statics(): try: post_count = Submission.query.count() comment_count = Comment.query.count() image_count = ImgFile.query.count() return jsonify({ "code": 1000, "data": { "posts": post_count, "comments": comment_count, "images": image_count } }) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) # 注意区分哦,这两个路由一个是通过页码获取10个投稿的列表,一个是通过id获取单个投稿详情 @app.route('/api/posts_info', methods=['GET']) def get_posts_info(): try: page = request.args.get("page", 1, type=int) if page < 1: page = 1 per_page = 10 # 相比v1的优化:使用数据库层面的过滤和分页,避免一次性加载所有数据 pagination = Submission.query.filter_by(status='Pass')\ .order_by(Submission.id.desc())\ .paginate(page=page, per_page=per_page, error_out=False) page_posts = pagination.items data = [] for s in page_posts: data.append({ "id": s.id, "content": s.content, "upvotes": s.upvotes, "downvotes": s.downvotes, "created_at": s.created_at.isoformat() if s.created_at else None, "time": s.created_at.isoformat() if s.created_at else None, "modified": 0 if (not s.updated_at or not s.created_at or s.updated_at == s.created_at) else 1, "comment_count": len(s.comments), "total_pages": pagination.total, }) return jsonify({ "code": 1000, "data": data }) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) @app.route('/api/get_posts_by_tag', methods=['GET']) def get_posts_by_tag(): try: tag = request.args.get("tag") if not tag: return jsonify({"code": 2000, "data": "参数错误"}) tag = str(tag).strip().strip('"').strip("'") if tag.startswith('#'): tag = tag[1:] if not tag: return jsonify({"code": 2000, "data": "参数错误"}) page = request.args.get("page", 1, type=int) if page < 1: page = 1 per_page = 10 submission_ids_query = db.session.query( Hashtag.target_id.label('submission_id') ).filter( Hashtag.type == 0, Hashtag.name == tag ) comment_submission_ids_query = db.session.query( Comment.submission_id.label('submission_id') ).join( Hashtag, db.and_(Hashtag.type == 1, Hashtag.target_id == Comment.id) ).filter( Hashtag.name == tag ) union_subq = submission_ids_query.union(comment_submission_ids_query).subquery() query = Submission.query.filter( Submission.id.in_(db.select(union_subq.c.submission_id)), Submission.status == 'Pass' ).order_by(Submission.id.desc()) pagination = query.paginate(page=page, per_page=per_page, error_out=False) data = [] for s in pagination.items: data.append({ "id": s.id, "content": s.content, "upvotes": s.upvotes, "downvotes": s.downvotes, "created_at": s.created_at.isoformat() if s.created_at else None, "time": s.created_at.isoformat() if s.created_at else None, "modified": 0 if (not s.updated_at or not s.created_at or s.updated_at == s.created_at) else 1, "comment_count": len(s.comments), "total_pages": pagination.total, }) return jsonify({"code": 1000, "data": data}) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) @app.route('/api/post_info', methods=['GET']) def get_post_info(): try: post_id = request.args.get("id", type=int) if not post_id: return jsonify({"code": 2000, "data": "参数错误"}) submission = Submission.query.filter_by(id=post_id, status='Pass').first() if not submission: return jsonify({"code": 2002, "data": "投稿不存在"}) data = { "id": submission.id, "content": submission.content, "upvotes": submission.upvotes, "downvotes": submission.downvotes, "created_at": submission.created_at.isoformat() if submission.created_at else None, "time": submission.created_at.isoformat() if submission.created_at else None, "modified": 0 if (not submission.updated_at or not submission.created_at or submission.updated_at == submission.created_at) else 1, "comment_count": len(submission.comments), } return jsonify({"code": 1000, "data": data}) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) @app.route('/api/hot_topics', methods=['GET']) def get_hot_topics(): try: submission_tags = db.session.query( Hashtag.name.label('name') ).join( Submission, db.and_(Hashtag.type == 0, Hashtag.target_id == Submission.id) ).filter( Submission.status == 'Pass' ) comment_tags = db.session.query( Hashtag.name.label('name') ).join( Comment, db.and_(Hashtag.type == 1, Hashtag.target_id == Comment.id) ).join( Submission, Comment.submission_id == Submission.id ).filter( Submission.status == 'Pass' ) union_subq = submission_tags.union_all(comment_tags).subquery() rows = db.session.query( union_subq.c.name, db.func.count(union_subq.c.name).label('count') ).group_by( union_subq.c.name ).order_by( db.func.count(union_subq.c.name).desc(), union_subq.c.name.asc() ).limit(3).all() data = [{"name": name, "count": int(count)} for name, count in rows] return jsonify({"code": 1000, "data": {"list": data}}) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) @app.route('/api/tag_suggest', methods=['GET']) def tag_suggest(): try: prefix = request.args.get("prefix", "") if prefix is None: prefix = "" prefix = str(prefix).strip().lstrip('#') if not prefix: return jsonify({"code": 1000, "data": {"list": []}}) limit = request.args.get("limit", 5, type=int) if limit < 1: limit = 1 if limit > 10: limit = 10 rows = db.session.query( Hashtag.name, db.func.count(Hashtag.name).label('count') ).filter( Hashtag.name.like(f"{prefix}%") ).group_by( Hashtag.name ).order_by( db.func.count(Hashtag.name).desc(), Hashtag.name.asc() ).limit(limit).all() data = [name for name, _ in rows] return jsonify({"code": 1000, "data": {"list": data}}) except Exception as e: return jsonify({"code": 2003, "data": str(e)}) # --- 彩蛋 --- @app.route('/api/teapot', methods=['GET']) def return_418(): abort(418) # --- 用户的管理api端点 --- # TODO: 用户管理端点 # --- 管理员api端点 --- # TODO: 添加管理员端点 # 主函数 if __name__ == '__main__': init_db() load_config() app.run(debug=True, port=5000)