优化mysql连接数,以及几个查询接口增加了redis缓存

main
zc 1 month ago
parent 5b1718b4dc
commit 560500b66b
  1. 112
      app/__init__.py
  2. 55
      app/api/chat/ai/chat_service.py

@ -1,9 +1,11 @@
import os
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from tortoise import Tortoise
from redis.asyncio import Redis
from fastapi.staticfiles import StaticFiles
from aiomysql import create_pool, OperationalError as AiomysqlOperationalError
from app.core.exceptions import SettingNotFound
from app.core.init_app import (
@ -12,59 +14,110 @@ from app.core.init_app import (
register_exceptions,
register_routers,
)
# 导入限流中间件
#from app.core.ratelimit import RateLimitMiddleware
from aiomysql import create_pool
try:
from app.settings.config import settings
except ImportError:
raise SettingNotFound("Can not import settings")
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化资源
app.state.mysql_pool = None
app.state.redis_client = None
try:
# 初始化 MySQL 连接池(支持100-200并发的配置)
app.state.mysql_pool = await create_pool(
host=settings.FLOW_MYSQL_HOST,
port=settings.FLOW_MYSQL_PORT,
user=settings.FLOW_MYSQL_USER,
password=settings.FLOW_MYSQL_PASSWORD,
db=settings.FLOW_MYSQL_DB,
minsize=10,
maxsize=50,
autocommit=True,
pool_recycle=300
# 核心并发参数
minsize=20, # 初始保持20个连接(避免频繁创建销毁)
maxsize=80, # 最大连接数(关键参数)
# 连接管理优化
pool_recycle=180, # 180秒回收连接(小于MySQL的wait_timeout)
connect_timeout=10, # 连接超时时间
read_timeout=15, # 读操作超时(应对复杂查询)
write_timeout=15, # 写操作超时
# 其他优化
charset='utf8mb4',
echo=False,
# 连接复用策略
maxcached=60, # 最多缓存60个空闲连接
maxusage=None, # 不限制连接使用次数
setsession=None, # 连接建立时执行的SQL(如SET time_zone = '+8:00')
)
# 简单测试连接有效性
# 验证连接有效性
async with app.state.mysql_pool.acquire() as conn:
await conn.ping()
print("✅ MySQL 连接成功")
logger.info("✅ MySQL 连接池初始化成功")
except AiomysqlOperationalError as e:
logger.error(f"❌ MySQL 连接池初始化失败: 数据库操作错误 - {str(e)}")
raise
except Exception as e:
print(f"❌ MySQL 连接失败: {e}")
logger.error(f"❌ MySQL 连接池初始化失败: 未知错误 - {str(e)}")
raise
try:
# 初始化 Redis 连接
# 初始化 Redis 连接(增强并发支持)
app.state.redis_client = Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB,
decode_responses=True,
password=settings.REDIS_PASSWORD
password=settings.REDIS_PASSWORD,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True,
# 连接池参数(Redis默认有连接池,这里调整)
max_connections=100, # Redis连接池最大连接数
)
# 测试 Redis 连接
await app.state.redis_client.ping()
print("✅ Redis 连接成功")
logger.info("✅ Redis 连接成功")
except Exception as e:
print(f"❌ Redis 连接失败: {e}")
# 执行其他初始化操作
await init_data()
logger.error(f"❌ Redis 连接失败: {str(e)}")
try:
await init_data()
except Exception as e:
logger.error(f"❌ 初始化数据失败: {str(e)}")
raise
yield
# 关闭 Redis 连接
if hasattr(app.state, 'redis_client'):
await app.state.redis_client.close()
print("🛑 Redis 连接关闭")
# 关闭 Tortoise 连接
await Tortoise.close_connections()
# 资源清理
if app.state.redis_client:
try:
await app.state.redis_client.close()
logger.info("🛑 Redis 连接已关闭")
except Exception as e:
logger.warning(f" Redis 关闭失败: {str(e)}")
if app.state.mysql_pool:
try:
app.state.mysql_pool.close()
await app.state.mysql_pool.wait_closed()
logger.info("🛑 MySQL 连接池已关闭")
except Exception as e:
logger.warning(f" MySQL 连接池关闭失败: {str(e)}")
try:
await Tortoise.close_connections()
logger.info("🛑 Tortoise 连接已关闭")
except Exception as e:
logger.warning(f" Tortoise 连接关闭失败: {str(e)}")
def create_app() -> FastAPI:
@ -76,13 +129,16 @@ def create_app() -> FastAPI:
middleware=make_middlewares(),
lifespan=lifespan,
)
# 配置静态文件服务
app.mount("/resource", StaticFiles(directory=os.path.join(settings.BASE_DIR, 'web', 'public', 'resource')), name="resource")
static_dir = os.path.join(settings.BASE_DIR, 'web', 'public', 'resource')
if os.path.exists(static_dir):
app.mount("/resource", StaticFiles(directory=static_dir), name="resource")
else:
logger.warning(f" 静态文件目录不存在: {static_dir}")
register_exceptions(app)
register_routers(app, prefix="/api")
return app
app = create_app()
app = create_app()

@ -491,6 +491,17 @@ async def get_all_scenic_flow_data(request: Request) -> list:
"""
查询所有景区的客流数据计算承载率并按承载率倒序排列
"""
# Redis 缓存查询
cache_key = "all_scenic_flow_data"
try:
redis_client = request.app.state.redis_client
cached = await redis_client.get(cache_key)
if cached:
# 缓存命中,直接返回缓存数据
return json.loads(cached)
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
@ -540,6 +551,12 @@ async def get_all_scenic_flow_data(request: Request) -> list:
"capacity_rate": capacity_rate
})
# 将结果存入Redis缓存,过期时间1分钟
try:
await redis_client.setex(cache_key, 60, json.dumps(result))
except Exception as e:
print(f"[Redis] 写缓存失败: {e}")
return result
except Exception as e:
@ -551,6 +568,17 @@ async def get_scenic_detail_data(request: Request, id: int) -> dict:
"""
查询单个景区的详细信息包含舒适度判断
"""
# Redis 缓存查询
cache_key = f"scenic_detail:{id}"
try:
redis_client = request.app.state.redis_client
cached = await redis_client.get(cache_key)
if cached:
# 缓存命中,直接返回缓存数据
return json.loads(cached)
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
@ -603,7 +631,7 @@ async def get_scenic_detail_data(request: Request, id: int) -> dict:
capacity_rate = 0.0
comfort_level = "舒适"
return {
result = {
"scenic_name": scenic_name,
"enter_num": enter_num or 0,
"leave_num": leave_num or 0,
@ -613,6 +641,14 @@ async def get_scenic_detail_data(request: Request, id: int) -> dict:
"comfort_level": comfort_level
}
# 将结果存入Redis缓存,过期时间1分钟
try:
await redis_client.setex(cache_key, 60, json.dumps(result))
except Exception as e:
print(f"[Redis] 写缓存失败: {e}")
return result
except Exception as e:
print(f"[MySQL] 查询景区详情数据失败: {e}")
return None
@ -631,6 +667,17 @@ async def get_scenic_parking_data(request: Request, scenic_id: int, distance: in
Returns:
list: 停车场信息列表按距离排序
"""
# Redis 缓存查询
cache_key = f"scenic_parking:{scenic_id}:{distance}"
try:
redis_client = request.app.state.redis_client
cached = await redis_client.get(cache_key)
if cached:
# 缓存命中,直接返回缓存数据
return json.loads(cached)
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
@ -697,6 +744,12 @@ async def get_scenic_parking_data(request: Request, scenic_id: int, distance: in
"lat": lat or 0
})
# 将结果存入Redis缓存,过期时间1分钟
try:
await redis_client.setex(cache_key, 60, json.dumps(result))
except Exception as e:
print(f"[Redis] 写缓存失败: {e}")
return result
except Exception as e:

Loading…
Cancel
Save