Files
v2/back/main.py
2026-01-31 17:31:53 +08:00

844 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 这里是Sycamore whisper的后端代码喵
# 但愿比V1写的好喵
from flask import Flask, jsonify, request, abort, send_from_directory
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import foreign
import os
import uuid
import json
from datetime import datetime
app = Flask(__name__)
# 跨域策略,仅/api/*允许所有来源的请求
CORS(app, resources={r"/api/*": {"origins": "*"}})
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data', 'db.sqlite')
IMG_DIR = os.path.join(BASE_DIR, 'data', 'img')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# 全局配置变量
NEED_AUDIT = True
FILE_SIZE_LIMIT_MB = 10.0
FILE_FORMATS = ["png", "jpg", "jpeg", "gif", "webp"]
def now_time():
return datetime.now()
# --- 定义数据库结构 ---
class SiteSettings(db.Model):
__tablename__ = 'site_settings'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(255))
icon = db.Column(db.String(255))
footer_text = db.Column(db.Text)
repo_link = db.Column(db.String(255))
enable_repo_button = db.Column(db.Boolean, default=False)
enable_notice = db.Column(db.Boolean, default=False)
need_audit = db.Column(db.Boolean, default=True)
about = db.Column(db.Text)
file_size_limit = db.Column(db.Float) # MB
file_formats = db.Column(db.Text) # JSON list
class Identity(db.Model):
__tablename__ = 'identity'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
token = db.Column(db.String(36), unique=True, nullable=False)
class Submission(db.Model):
__tablename__ = 'submissions'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
content = db.Column(db.Text, nullable=False)
identity_token = db.Column(db.String(36), nullable=True)
status = db.Column(db.String(20), default='Pending', index=True)
created_at = db.Column(db.DateTime, default=now_time)
updated_at = db.Column(db.DateTime, default=now_time, onupdate=now_time)
upvotes = db.Column(db.Integer, default=0)
downvotes = db.Column(db.Integer, default=0)
comments = db.relationship('Comment', backref='submission', lazy=True, cascade='all, delete-orphan')
hashtags = db.relationship(
'Hashtag',
primaryjoin="and_(Hashtag.type==0, foreign(Hashtag.target_id)==Submission.id)",
cascade='all, delete-orphan',
lazy=True,
overlaps="hashtags"
)
class Comment(db.Model):
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
submission_id = db.Column(db.Integer, db.ForeignKey('submissions.id'), nullable=False, index=True)
nickname = db.Column(db.String(50), default='匿名用户')
content = db.Column(db.Text, nullable=False)
identity_token = db.Column(db.String(36), nullable=True)
created_at = db.Column(db.DateTime, default=now_time)
parent_comment_id = db.Column(db.Integer, db.ForeignKey('comments.id'), nullable=True)
hashtags = db.relationship(
'Hashtag',
primaryjoin="and_(Hashtag.type==1, foreign(Hashtag.target_id)==Comment.id)",
cascade='all, delete-orphan',
lazy=True,
overlaps="hashtags"
)
class Report(db.Model):
__tablename__ = 'reports'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
submission_id = db.Column(db.Integer, nullable=False)
title = db.Column(db.String(200), nullable=False)
content = db.Column(db.Text, nullable=False)
identity_token = db.Column(db.String(36), nullable=True)
status = db.Column(db.String(20), default='Pending')
created_at = db.Column(db.DateTime, default=lambda: datetime.now())
class Hashtag(db.Model):
__tablename__ = 'hashtags'
__table_args__ = (
db.Index('ix_hashtags_type_target', 'type', 'target_id'),
)
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
type = db.Column(db.Integer, nullable=False) # 0: Submission, 1: Comment
target_id = db.Column(db.Integer, nullable=False)
name = db.Column(db.String(255), nullable=False, index=True)
class DenyWord(db.Model):
__tablename__ = 'deny_words'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
word = db.Column(db.String(255), unique=True, nullable=False)
class ImgFile(db.Model):
__tablename__ = 'img_files'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
path = db.Column(db.String(255), nullable=False)
name = db.Column(db.String(255), nullable=True)
identity_token = db.Column(db.String(36), nullable=True)
class SiteNotice(db.Model):
__tablename__ = 'site_notice'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
type = db.Column(db.String(10), default='md', nullable=False)
content = db.Column(db.Text, default='', nullable=False)
version = db.Column(db.Integer, default=0, nullable=False)
created_at = db.Column(db.DateTime, default=now_time)
updated_at = db.Column(db.DateTime, default=now_time, onupdate=now_time)
# 初始化数据库函数
def init_db():
if not os.path.exists('./data'):
os.makedirs('./data')
if not os.path.exists(IMG_DIR):
os.makedirs(IMG_DIR)
with app.app_context():
db.create_all()
try:
existing = SiteNotice.query.first()
if not existing:
n = SiteNotice(type='md', content='', version=0)
db.session.add(n)
db.session.commit()
except Exception:
db.session.rollback()
def load_config():
global NEED_AUDIT, FILE_SIZE_LIMIT_MB, FILE_FORMATS
with app.app_context():
try:
settings = SiteSettings.query.first()
if settings:
if hasattr(settings, 'need_audit') and settings.need_audit is not None:
NEED_AUDIT = settings.need_audit
if getattr(settings, 'file_size_limit', None) is not None:
try:
FILE_SIZE_LIMIT_MB = float(settings.file_size_limit)
except Exception:
pass
if getattr(settings, 'file_formats', None):
try:
raw = settings.file_formats
if isinstance(raw, str):
parsed = json.loads(raw)
else:
parsed = raw
if isinstance(parsed, list):
FILE_FORMATS = [str(x).strip().lstrip('.').lower() for x in parsed if str(x).strip()]
except Exception:
pass
except Exception as e:
print(f"Warning: Failed to load settings: {e}")
global DENY_WORDS_CACHE
with app.app_context():
try:
words = db.session.query(DenyWord.word).all()
# words 是 list of tuples [('word1',), ('word2',)]
DENY_WORDS_CACHE = [w[0] for w in words]
print(f"Loaded {len(DENY_WORDS_CACHE)} deny words.")
except Exception as e:
print(f"Warning: Failed to load deny words: {e}")
def is_identity_valid(token):
if not token:
return True
return Identity.query.filter_by(token=token).first() is not None
def normalize_identity(token):
if token and not is_identity_valid(token):
return False, None
return True, token if token else None
def find_deny_word(content):
try:
words = DENY_WORDS_CACHE
except Exception:
words = []
for word in words:
if word in content:
return word
return None
def save_hashtags(tag_type, target_id, hashtopic):
if not hashtopic:
return
for tag in hashtopic:
new_tag = Hashtag(
type=tag_type,
target_id=target_id,
name=tag
)
db.session.add(new_tag)
def detect_image_type(header: bytes):
if header.startswith(b"\xFF\xD8\xFF"):
return "jpeg"
if header.startswith(b"\x89PNG\r\n\x1a\n"):
return "png"
if header.startswith(b"GIF87a") or header.startswith(b"GIF89a"):
return "gif"
if header.startswith(b"RIFF") and len(header) >= 12 and header[8:12] == b"WEBP":
return "webp"
return None
# --- 用户普通api端点 ---
@app.route('/api/settings', methods=['GET'])
def get_settings():
settings = SiteSettings.query.first()
if settings:
data = {
"title": settings.title,
"icon": settings.icon,
"footer_text": settings.footer_text,
"repo_link": settings.repo_link,
"enable_repo_button": settings.enable_repo_button,
"enable_notice": settings.enable_notice,
"file_size_limit": settings.file_size_limit,
"file_formats": settings.file_formats
}
return jsonify({
"code": 1000,
"data": data
})
else:
return jsonify({"code": 2002,"data":{}})
@app.route('/api/about', methods=['GET'])
def get_about():
settings = SiteSettings.query.first()
if settings and settings.about:
return jsonify({
"code": 1000,
"data": settings.about
})
else:
# about在初始化时不会被设置避免管理面板报错返回默认文本
return jsonify({
"code": 1000,
"data": "# 默认关于页面\n关于页面未设置,请前往管理面板操作。"
})
@app.route('/api/site_notice', methods=['GET'])
def get_site_notice():
try:
notice = SiteNotice.query.order_by(SiteNotice.id.asc()).first()
if not notice:
notice = SiteNotice(type='md', content='', version=0)
db.session.add(notice)
db.session.commit()
data = {
"type": notice.type if notice.type in ['md', 'url'] else 'md',
"content": notice.content or '',
"version": int(notice.version or 0),
}
return jsonify({"code": 1000, "data": data})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/get_id_token', methods=['GET'])
def get_id_token():
try:
token = str(uuid.uuid4())
new_identity = Identity(token=token)
db.session.add(new_identity)
db.session.commit()
return jsonify({
"code": 1000,
"data": token
})
except Exception as e:
return jsonify({
"code": 2003,
"data": f"验证失败: {str(e)}"
})
@app.route('/api/verify_token', methods=['POST'])
def verify_token():
try:
data = request.get_json()
if not data or 'token' not in data:
return jsonify({
"code": 2000,
"data": "参数错误"
})
token = data['token']
identity = Identity.query.filter_by(token=token).first()
if identity:
return jsonify({
"code": 1000,
"data": True
})
else:
return jsonify({
"code": 1000,
"data": False
})
except Exception as e:
return jsonify({
"code": 2003,
"data": f"验证失败: {str(e)}"
})
@app.route('/api/post', methods=['POST'])
def submit_post():
try:
data = request.get_json()
if not data or 'content' not in data:
return jsonify({"code": 2000, "data": "内容不能为空"})
content = data['content']
nickname = data.get('nickname') or '匿名用户'
hashtopic = data.get('hashtopic', [])
if not isinstance(hashtopic, list):
return jsonify({"code": 2000, "data": "hashtopic必须为列表"})
identity_token = data.get('identity')
# 违禁词检测
if find_deny_word(content):
return jsonify({"code": 2005, "data": "提交内容包含违禁词"})
# Identity 验证
ok, identity_token = normalize_identity(identity_token)
if not ok:
return jsonify({"code": 2004, "data": "无效的 Identity Token"})
# 保存
now = now_time()
new_post = Submission(
content=content,
identity_token=identity_token,
status='Pending' if NEED_AUDIT else 'Pass',
created_at=now,
updated_at=now,
)
db.session.add(new_post)
db.session.flush()
# 保存 Hashtags
save_hashtags(0, new_post.id, hashtopic)
db.session.commit()
code = 1002 if new_post.status == 'Pending' else 1001
return jsonify({"code": code, "data": {"id": new_post.id}})
except Exception as e:
db.session.rollback()
return jsonify({"code": 2003, "data": f"投稿失败: {str(e)}"})
@app.route('/api/comment', methods=['POST'])
def submit_comment():
try:
data = request.get_json()
if not data or 'content' not in data or 'submission_id' not in data:
return jsonify({"code": 2000, "data": "参数错误"})
submission_id = data['submission_id']
content = data['content']
nickname = data.get('nickname') or '匿名用户'
parent_comment_id = data.get('parent_comment_id', 0)
try:
parent_comment_id = int(parent_comment_id)
except Exception:
parent_comment_id = 0
hashtopic = data.get('hashtopic', [])
if not isinstance(hashtopic, list):
return jsonify({"code": 2000, "data": "hashtopic必须为列表"})
identity_token = data.get('identity')
# 检查 submission 是否存在
if not db.session.get(Submission, submission_id):
return jsonify({"code": 2002, "data": "投稿不存在"})
# 违禁词检测
if find_deny_word(content):
return jsonify({"code": 2005, "data": "提交内容包含违禁词"})
# Identity 验证
ok, identity_token = normalize_identity(identity_token)
if not ok:
return jsonify({"code": 2004, "data": "无效的 Identity Token"})
new_comment = Comment(
submission_id=submission_id,
nickname=nickname,
content=content,
identity_token=identity_token,
parent_comment_id=None if parent_comment_id == 0 else parent_comment_id
)
db.session.add(new_comment)
db.session.flush()
# 保存 Hashtags
save_hashtags(1, new_comment.id, hashtopic)
db.session.commit()
return jsonify({"code": 1001, "data": {"id": new_comment.id}})
except Exception as e:
db.session.rollback()
return jsonify({"code": 2003, "data": f"评论失败: {str(e)}"})
@app.route('/api/report', methods=['POST'])
def submit_report():
try:
data = request.get_json()
if not data or 'id' not in data or 'title' not in data or 'content' not in data:
return jsonify({"code": 2000, "data": "参数错误"})
submission_id = data.get('id')
title = data.get('title')
content = data.get('content')
title = str(title).strip() if title is not None else ''
content = str(content).strip() if content is not None else ''
if not title or not content:
return jsonify({"code": 2000, "data": "参数错误"})
submission = db.session.get(Submission, submission_id)
if not submission:
return jsonify({"code": 2002, "data": "投稿不存在"})
identity_token = data.get('identity')
ok, identity_token = normalize_identity(identity_token)
if not ok:
return jsonify({"code": 2004, "data": "无效的Identity Token"})
report = Report(
submission_id=submission_id,
title=title,
content=content,
identity_token=identity_token,
status='Pending',
)
db.session.add(report)
db.session.commit()
return jsonify({"code": 1001, "data": {"id": report.id}})
except Exception as e:
return jsonify({"code": 2003, "data": f"投诉失败: {str(e)}"})
@app.route('/api/get/comment', methods=['GET'])
def get_comments():
try:
submission_id = request.args.get("id", type=int)
if not submission_id:
return jsonify({"code": 2000, "data": "参数错误"})
submission = db.session.get(Submission, submission_id)
if not submission or submission.status != "Pass":
return jsonify({"code": 2002, "data": "投稿不存在"})
page = request.args.get("page", 1, type=int)
if page < 1:
page = 1
per_page = 5
pagination = Comment.query.filter_by(submission_id=submission_id)\
.order_by(Comment.id.asc())\
.paginate(page=page, per_page=per_page, error_out=False)
data = [{
"id": c.id,
"nickname": c.nickname,
"content": c.content,
"parent_comment_id": c.parent_comment_id if c.parent_comment_id is not None else 0,
"time": c.created_at.isoformat() if c.created_at else None
} for c in pagination.items]
return jsonify({"code": 1000, "data": {"comments": data, "total_pages": pagination.pages}})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/upload_pic', methods=['POST'])
def upload_pic():
try:
if 'file' not in request.files:
return jsonify({"code": 2000, "data": "参数错误"})
file = request.files['file']
if file.filename == '':
return jsonify({"code": 2000, "data": "参数错误"})
file.seek(0, os.SEEK_END)
file_length = file.tell()
file.seek(0)
if FILE_SIZE_LIMIT_MB is not None:
limit_bytes = float(FILE_SIZE_LIMIT_MB) * 1024 * 1024
if file_length > limit_bytes:
return jsonify({"code": 2006, "data": "上传的图片超出限制大小"})
ext = os.path.splitext(file.filename)[1].lstrip('.').lower()
if not ext or (FILE_FORMATS and ext not in FILE_FORMATS):
return jsonify({"code": 2007, "data": "上传的文件类型不支持"})
header = file.read(512)
file.seek(0)
detected = detect_image_type(header)
ext_map = {
'jpg': 'jpeg',
'jpeg': 'jpeg',
'png': 'png',
'gif': 'gif',
'webp': 'webp',
}
expected = ext_map.get(ext)
if not expected or detected != expected:
return jsonify({"code": 2007, "data": "上传的文件类型不支持"})
filename = f"{uuid.uuid4().hex}.{ext}"
filepath = os.path.join(IMG_DIR, filename)
file.save(filepath)
identity_token = request.form.get('identity_token') or None
name = file.filename or None
db.session.add(ImgFile(path=filename, name=name, identity_token=identity_token))
db.session.commit()
return jsonify({"code": 1001, "data": f"/api/files/{filename}"})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/files/<path:file_name>', methods=['GET'])
def serve_file(file_name):
return send_from_directory(IMG_DIR, file_name)
@app.route('/api/file_name', methods=['GET'])
def get_file_name():
try:
path = request.args.get("path")
if not path:
return jsonify({"code": 2000, "data": "参数错误"})
record = ImgFile.query.filter_by(path=path).first()
if not record:
return jsonify({"code": 2002, "data": "数据不存在"})
return jsonify({"code": 1000, "data": record.name or ""})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/up', methods=['POST'])
def upvote():
try:
data = request.get_json()
if not data or 'id' not in data or 'type' not in data:
return jsonify({"code": 2000, "data": "参数错误"})
item_id = data['id']
item_type = data['type']
if item_type == 'submission':
item = db.session.get(Submission, item_id)
if not item:
return jsonify({"code": 2002, "data": "对象不存在"})
item.upvotes += 1
db.session.commit()
else:
return jsonify({"code": 2000, "data": "参数错误"})
return jsonify({"code": 1000, "data": ""})
except Exception as e:
return jsonify({"code": 2003, "data": f"点赞失败: {str(e)}"})
@app.route('/api/down', methods=['POST'])
def downvote():
try:
data = request.get_json()
if not data or 'id' not in data or 'type' not in data:
return jsonify({"code": 2000, "data": "参数错误"})
item_id = data['id']
item_type = data['type']
if item_type == 'submission':
item = db.session.get(Submission, item_id)
if not item:
return jsonify({"code": 2002, "data": "对象不存在"})
item.downvotes += 1
db.session.commit()
else:
return jsonify({"code": 2000, "data": "参数错误"})
return jsonify({"code": 1000, "data": ""})
except Exception as e:
return jsonify({"code": 2003, "data": f"点踩失败: {str(e)}"})
@app.route('/api/test', methods=['GET'])
def test_api():
return jsonify({"code": 1000, "data": ""})
@app.route('/api/statics', methods=['GET'])
def get_statics():
try:
post_count = Submission.query.count()
comment_count = Comment.query.count()
image_count = ImgFile.query.count()
return jsonify({
"code": 1000,
"data": {
"posts": post_count,
"comments": comment_count,
"images": image_count
}
})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
# 注意区分哦这两个路由一个是通过页码获取10个投稿的列表一个是通过id获取单个投稿详情
@app.route('/api/posts_info', methods=['GET'])
def get_posts_info():
try:
page = request.args.get("page", 1, type=int)
if page < 1:
page = 1
per_page = 10
# 相比v1的优化使用数据库层面的过滤和分页避免一次性加载所有数据
pagination = Submission.query.filter_by(status='Pass')\
.order_by(Submission.id.desc())\
.paginate(page=page, per_page=per_page, error_out=False)
page_posts = pagination.items
data = []
for s in page_posts:
data.append({
"id": s.id,
"content": s.content,
"upvotes": s.upvotes,
"downvotes": s.downvotes,
"created_at": s.created_at.isoformat() if s.created_at else None,
"time": s.created_at.isoformat() if s.created_at else None,
"modified": 0 if (not s.updated_at or not s.created_at or s.updated_at == s.created_at) else 1,
"comment_count": len(s.comments),
"total_pages": pagination.total,
})
return jsonify({
"code": 1000,
"data": data
})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/get_posts_by_tag', methods=['GET'])
def get_posts_by_tag():
try:
tag = request.args.get("tag")
if not tag:
return jsonify({"code": 2000, "data": "参数错误"})
tag = str(tag).strip().strip('"').strip("'")
if tag.startswith('#'):
tag = tag[1:]
if not tag:
return jsonify({"code": 2000, "data": "参数错误"})
page = request.args.get("page", 1, type=int)
if page < 1:
page = 1
per_page = 10
submission_ids_query = db.session.query(
Hashtag.target_id.label('submission_id')
).filter(
Hashtag.type == 0,
Hashtag.name == tag
)
comment_submission_ids_query = db.session.query(
Comment.submission_id.label('submission_id')
).join(
Hashtag,
db.and_(Hashtag.type == 1, Hashtag.target_id == Comment.id)
).filter(
Hashtag.name == tag
)
union_subq = submission_ids_query.union(comment_submission_ids_query).subquery()
query = Submission.query.filter(
Submission.id.in_(db.select(union_subq.c.submission_id)),
Submission.status == 'Pass'
).order_by(Submission.id.desc())
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
data = []
for s in pagination.items:
data.append({
"id": s.id,
"content": s.content,
"upvotes": s.upvotes,
"downvotes": s.downvotes,
"created_at": s.created_at.isoformat() if s.created_at else None,
"time": s.created_at.isoformat() if s.created_at else None,
"modified": 0 if (not s.updated_at or not s.created_at or s.updated_at == s.created_at) else 1,
"comment_count": len(s.comments),
"total_pages": pagination.total,
})
return jsonify({"code": 1000, "data": data})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/post_info', methods=['GET'])
def get_post_info():
try:
post_id = request.args.get("id", type=int)
if not post_id:
return jsonify({"code": 2000, "data": "参数错误"})
submission = Submission.query.filter_by(id=post_id, status='Pass').first()
if not submission:
return jsonify({"code": 2002, "data": "投稿不存在"})
data = {
"id": submission.id,
"content": submission.content,
"upvotes": submission.upvotes,
"downvotes": submission.downvotes,
"created_at": submission.created_at.isoformat() if submission.created_at else None,
"time": submission.created_at.isoformat() if submission.created_at else None,
"modified": 0 if (not submission.updated_at or not submission.created_at or submission.updated_at == submission.created_at) else 1,
"comment_count": len(submission.comments),
}
return jsonify({"code": 1000, "data": data})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/hot_topics', methods=['GET'])
def get_hot_topics():
try:
submission_tags = db.session.query(
Hashtag.name.label('name')
).join(
Submission,
db.and_(Hashtag.type == 0, Hashtag.target_id == Submission.id)
).filter(
Submission.status == 'Pass'
)
comment_tags = db.session.query(
Hashtag.name.label('name')
).join(
Comment,
db.and_(Hashtag.type == 1, Hashtag.target_id == Comment.id)
).join(
Submission,
Comment.submission_id == Submission.id
).filter(
Submission.status == 'Pass'
)
union_subq = submission_tags.union_all(comment_tags).subquery()
rows = db.session.query(
union_subq.c.name,
db.func.count(union_subq.c.name).label('count')
).group_by(
union_subq.c.name
).order_by(
db.func.count(union_subq.c.name).desc(),
union_subq.c.name.asc()
).limit(3).all()
data = [{"name": name, "count": int(count)} for name, count in rows]
return jsonify({"code": 1000, "data": {"list": data}})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
@app.route('/api/tag_suggest', methods=['GET'])
def tag_suggest():
try:
prefix = request.args.get("prefix", "")
if prefix is None:
prefix = ""
prefix = str(prefix).strip().lstrip('#')
if not prefix:
return jsonify({"code": 1000, "data": {"list": []}})
limit = request.args.get("limit", 5, type=int)
if limit < 1:
limit = 1
if limit > 10:
limit = 10
rows = db.session.query(
Hashtag.name,
db.func.count(Hashtag.name).label('count')
).filter(
Hashtag.name.like(f"{prefix}%")
).group_by(
Hashtag.name
).order_by(
db.func.count(Hashtag.name).desc(),
Hashtag.name.asc()
).limit(limit).all()
data = [name for name, _ in rows]
return jsonify({"code": 1000, "data": {"list": data}})
except Exception as e:
return jsonify({"code": 2003, "data": str(e)})
# --- 彩蛋 ---
@app.route('/api/teapot', methods=['GET'])
def return_418():
abort(418)
# --- 用户的管理api端点 ---
# TODO: 用户管理端点
# --- 管理员api端点 ---
# TODO: 添加管理员端点
# 主函数
if __name__ == '__main__':
init_db()
load_config()
app.run(debug=True, port=5000)