热门问题相关、推荐问题、跳转景区

main
zc 2 months ago
parent 54a52fe536
commit 1f9629dd1c
  1. 256
      app/api/chat/ai/chat_router.py
  2. 267
      app/api/chat/ai/chat_service.py
  3. 20
      app/models/ChatIn.py
  4. 14
      app/models/hot_question.py
  5. 2
      app/settings/config.py

@ -4,14 +4,23 @@ from starlette.requests import Request
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator, List
from .chat_service import classify, extract_spot, query_flow, gen_markdown_stream, ai_chat_stream, handle_quick_question
from app.models.ChatIn import ChatIn
from app.models.ChatIn import ChatIn, AllScenicFlowRequest, ScenicDetailRequest, ScenicParkingRequest
from app.models.quick_question import QuickQuestion
from app.models.hot_question import HotQuestion
from pydantic import BaseModel
import hmac
import hashlib
import time
import json
from app.settings.config import settings
# 更新导入语句,添加新函数
from app.api.chat.ai.chat_service import (
query_flow,
handle_quick_question,
get_all_scenic_flow_data,
get_scenic_detail_data,
get_scenic_parking_data
)
router = APIRouter()
@ -80,6 +89,9 @@ async def h5_chat_stream(request: Request, inp: ChatIn):
# 只有非快捷问题才保存对话历史
if not is_quick_question:
await redis_client.setex(conversation_history_key, CONVERSATION_EXPIRE_TIME, json.dumps(conversation_history))
# 记录热门问题(包括快捷问题)
await record_hot_question(inp.message)
except Exception as e:
print(f"Error in content_stream: {e}")
raise
@ -125,6 +137,22 @@ def verify_timestamp(timestamp: int) -> bool:
return abs(current_timestamp - timestamp) <= TIMESTAMP_TOLERANCE * 1000
async def record_hot_question(question: str):
"""记录热门问题,如果存在则次数加1,不存在则新增"""
try:
# 查找是否存在该问题
hot_q = await HotQuestion.filter(title=question).first()
if hot_q:
# 已存在,次数加1
hot_q.num += 1
await hot_q.save()
else:
# 不存在,新增记录
await HotQuestion.create(title=question, num=1)
except Exception as e:
print(f"记录热门问题失败: {e}")
# 定义获取问题的响应模型
class QuestionResponse(BaseModel):
id: int
@ -134,8 +162,224 @@ class QuestionResponse(BaseModel):
orm_mode = True
@router.get("/getQuestion", summary="获取开启的前4个问题")
async def get_question():
# 查询状态为正常(0)的问题,按order_num正序排序,取前4条
questions = await QuickQuestion.filter(status="0").order_by("order_num").limit(4).values("title","subtitle","logo","label")
return questions
@router.post("/get_question", summary="获取开启的前4个问题")
async def get_question(request: Request, req: AllScenicFlowRequest):
# 验签逻辑
if not req.sign:
raise HTTPException(status_code=401, detail="缺少签名参数")
if not verify_timestamp(req.timestamp):
raise HTTPException(status_code=401, detail="时间戳无效")
try:
# 查询状态为正常(0)的问题,按order_num正序排序,取前4条
questions = await QuickQuestion.filter(status="0").order_by("order_num").limit(4).values("title","subtitle","logo","label")
return {
"code": 200,
"message": "查询成功",
"data": questions
}
except Exception as e:
print(f"查询快捷问题失败: {e}")
return {
"code": 500,
"message": f"查询失败: {str(e)}",
"data": []
}
class HotQuestionResponse(BaseModel):
id: int
title: str
num: int
update_time: str
class Config:
orm_mode = True
@router.post("/get_hot_questions", summary="获取热门问题top10")
async def get_hot_questions(request: Request, req: AllScenicFlowRequest):
# 验签逻辑
if not req.sign:
raise HTTPException(status_code=401, detail="缺少签名参数")
if not verify_timestamp(req.timestamp):
raise HTTPException(status_code=401, detail="时间戳无效")
"""
获取热门问题top10按次数倒序排列
"""
try:
# 查询热门问题,按次数倒序排序,取前10条
hot_questions = await HotQuestion \
.filter() \
.order_by("-num") \
.limit(10) \
.values("id", "title", "num", "update_time")
# 格式化时间
for q in hot_questions:
q["update_time"] = q["update_time"].strftime("%Y-%m-%d %H:%M:%S")
return {
"code": 200,
"message": "查询成功",
"data": hot_questions
}
except Exception as e:
print(f"查询热门问题失败: {e}")
return {
"code": 500,
"message": f"查询失败: {str(e)}",
"data": []
}
@router.post("/get_all_scenic_flow")
async def get_all_scenic_flow(request: Request, req: AllScenicFlowRequest):
"""
查询所有景区的进入人数离开人数计算承载率并按承载率倒序排列
"""
# 验签逻辑
if not req.sign:
raise HTTPException(status_code=401, detail="缺少签名参数")
if not verify_timestamp(req.timestamp):
raise HTTPException(status_code=401, detail="时间戳无效")
# 构建验证数据(无其他参数,仅包含timestamp)
data = {"timestamp": req.timestamp}
if not verify_signature(data, req.sign):
raise HTTPException(status_code=401, detail="无效的签名")
try:
data = await get_all_scenic_flow_data(request)
if not data:
return {
"code": 404,
"message": "未找到景区客流数据",
"data": []
}
return {
"code": 200,
"message": "查询成功",
"data": data
}
except Exception as e:
print(f"查询所有景区客流数据异常: {e}")
return {
"code": 500,
"message": f"查询异常: {str(e)}",
"data": []
}
# 在现有路由下方添加新接口
@router.post("/get_scenic_detail")
async def get_scenic_detail(request: Request, req: ScenicDetailRequest):
"""
查询单个景区的详细信息包含舒适度判断
"""
# 验签逻辑
if not req.sign:
raise HTTPException(status_code=401, detail="缺少签名参数")
if not verify_timestamp(req.timestamp):
raise HTTPException(status_code=401, detail="时间戳无效")
# 构建验证数据
data = {"id": req.id, "timestamp": req.timestamp}
if not verify_signature(data, req.sign):
raise HTTPException(status_code=401, detail="无效的签名")
if not req.id:
return {
"code": 400,
"message": "景区id不能为空",
"data": None
}
try:
data = await get_scenic_detail_data(request, req.id)
if not data:
return {
"code": 404,
"message": f"未找到景区信息",
"data": None
}
return {
"code": 200,
"message": "查询成功",
"data": data
}
except Exception as e:
print(f"查询景区详情异常: {e}")
return {
"code": 500,
"message": f"查询异常: {str(e)}",
"data": None
}
# 在现有路由下方添加新接口 - 景区停车场查询
@router.post("/get_scenic_parking")
async def get_scenic_parking(request: Request, req: ScenicParkingRequest):
"""
查询景区附近的停车场信息
"""
# 验签逻辑
if not req.sign:
raise HTTPException(status_code=401, detail="缺少签名参数")
if not verify_timestamp(req.timestamp):
raise HTTPException(status_code=401, detail="时间戳无效")
# 构建验证数据
data = {
"scenic_name": req.scenic_name,
"distance": req.distance,
"timestamp": req.timestamp
}
if not verify_signature(data, req.sign):
raise HTTPException(status_code=401, detail="无效的签名")
if not req.scenic_name:
return {
"code": 400,
"message": "景区名称不能为空",
"data": []
}
if req.distance <= 0:
return {
"code": 400,
"message": "查询距离必须大于0",
"data": []
}
try:
data = await get_scenic_parking_data(request, req.scenic_name, req.distance)
if not data:
return {
"code": 404,
"message": f"未找到【{req.scenic_name}】附近的停车场信息",
"data": []
}
return {
"code": 200,
"message": "查询成功",
"data": data
}
except Exception as e:
print(f"查询景区停车场数据异常: {e}")
return {
"code": 500,
"message": f"查询异常: {str(e)}",
"data": []
}

@ -124,7 +124,7 @@ ANSWER_PROMPT = """
5. 温馨提示
- 安全提醒
- 天气影响
- 介绍景区特色
- 其他注意事项
6. 数据缺失时的通用建议
@ -144,13 +144,14 @@ ANSWER_PROMPT = """
数据要求
- 仅使用查询到的数据
- 不虚构未提供的信息
- 数字数据仅展示计算后的在园人数不显示例如进入人数离开人数承载量和承载率等原始数据字段
- 数字数据仅展示计算后的在园人数不显示例如进入人数离开人数承载量和承载率等原始数据字段如果承载率超过100%则只输出舒适度等级而不输出实际的在园人数
- 不输出数据更新时间
- 标题字体大小为 1820px加粗显示
- 正文内容字体大小为 1516px
- 行间距为 1.61.8 倍字体大小
- 段落与模块之间上下边距应为1016px
- 不能使用小于 14px 的字体
- 不要解释排版等内容
"""
async def classify(msg: str) -> str:
@ -208,6 +209,16 @@ async def ai_chat_stream(inp: ChatIn, conversation_history: list) -> AsyncGenera
sys.stdout.flush()
# 短暂让出控制权,避免阻塞
await asyncio.sleep(0)
# 生成推荐问题
if full_response:
recommended_questions = await generate_recommended_questions(inp.message, full_response)
if recommended_questions:
# 添加分隔符和标题
yield "\n\n### 您可能还想了解:"
# 逐个返回推荐问题
for i, question in enumerate(recommended_questions, 1):
yield f"\n{i}. {question}"
# 添加结束标记
yield " "
except Exception as e:
@ -247,6 +258,15 @@ async def gen_markdown_stream(msg: str, data: str, language: str, conversation_h
sys.stdout.flush()
# 短暂让出控制权,避免阻塞
await asyncio.sleep(0)
# 生成推荐问题
if full_response:
recommended_questions = await generate_recommended_questions(msg, full_response)
if recommended_questions:
# 添加分隔符和标题
yield "\n\n### 您可能还想了解:"
# 逐个返回推荐问题
for i, question in enumerate(recommended_questions, 1):
yield f"\n{i}. {question}"
# 添加结束标记
yield " "
except Exception as e:
@ -295,7 +315,7 @@ async def query_flow(request: Request, spot: str) -> str:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# 查询景区客流信息
query = "SELECT SUM(t1.init_num) AS init_num, SUM(t1.out_num) AS out_num,t3.realtime_load_capacity FROM equipment_passenger_flow.flow_current_video t1 LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id WHERE t3.`name` LIKE %s"
query = "SELECT t3.id AS id,SUM(t1.init_num) AS init_num, SUM(t1.out_num) AS out_num,t3.realtime_load_capacity FROM equipment_passenger_flow.flow_current_video t1 LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id WHERE t3.`name` LIKE %s"
search_spot = f"%{spot}%"
await cur.execute(query, (search_spot,))
row = await cur.fetchone()
@ -309,7 +329,7 @@ async def query_flow(request: Request, spot: str) -> str:
return f"**未找到景区【{spot}】的信息,请检查名称是否正确。\n\n(内容仅供参考)"
result = ""
if row :
result = f"{spot} 客流\n\n进入人数: {row[0]}\n离开人数: {row[1]}\n\n景区瞬时承载量:{row[2]}"
result = f"{spot} 客流\n\n进入人数: {row[1]}\n离开人数: {row[2]}\n\n景区瞬时承载量:{row[3]};注:全部内容输出完以后,最后输出一段固定内容,内容为:<p data-type=\"keliu\" data-id=\"{row[0]}\"></p>"
else:
result = f"未找到景区【{spot}】的客流相关信息"
if park_row:
@ -362,4 +382,241 @@ async def handle_quick_question(inp: ChatIn, question_content: str) -> AsyncGene
yield error_msg
# 不保存快捷问题的对话历史
print("Quick question handling finished.")
print("Quick question handling finished.")
# 在chat_service.py中添加推荐问题生成函数
async def generate_recommended_questions(user_msg: str, ai_response: str) -> list:
"""基于用户问题和AI回答生成1-3个纵向延伸的推荐问题"""
prompt = f"""
基于用户的问题和AI的回答生成1-3个相关的纵向延伸问题帮助用户深入了解相关内容
用户问题: {user_msg}
AI回答: {ai_response}
要求:
1. 问题需与当前话题高度相关具有延续性
2. 问题应具有探索性引导用户深入了解
3. 用简洁明了的中文表达
4. 只返回问题列表每个问题占一行不添加编号和其他内容
5. 每个问题使用固定的格式<p class="anser-tuijian">{{问题}}</p>
"""
try:
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一个问题推荐助手,擅长基于对话内容生成相关的延伸问题"},
{"role": "user", "content": prompt}
]
)
questions = response.choices[0].message.content.strip().split('\n')
# 过滤空行并限制数量为1-3个
return [q for q in questions if q][:3]
except Exception as e:
print(f"生成推荐问题失败: {e}")
return []
# 添加新函数用于获取所有景区客流数据
async def get_all_scenic_flow_data(request: Request) -> list:
"""
查询所有景区的客流数据计算承载率并按承载率倒序排列
"""
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# 查询所有景区客流信息
query = """
SELECT
t3.id AS id,
t3.`name` AS scenic_name,
SUM(t1.init_num) AS enter_num,
SUM(t1.out_num) AS leave_num,
t3.realtime_load_capacity AS max_capacity
FROM
equipment_passenger_flow.flow_current_video t1
LEFT JOIN
cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address
LEFT JOIN
cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id
WHERE t3.`name` IS NOT NULL
GROUP BY
t3.`name`, t3.realtime_load_capacity
ORDER BY
(SUM(t1.init_num) - SUM(t1.out_num)) / t3.realtime_load_capacity DESC
"""
await cur.execute(query)
rows = await cur.fetchall()
# 处理结果
result = []
for row in rows:
id,scenic_name, enter_num, leave_num, max_capacity = row
in_park_num = abs(enter_num - leave_num) # 确保是正数
# 避免除以零的情况
if max_capacity > 0:
capacity_rate = in_park_num / max_capacity
else:
capacity_rate = 0
result.append({
"id": id,
"scenic_name": scenic_name,
"enter_num": enter_num,
"leave_num": leave_num,
"in_park_num": in_park_num,
"max_capacity": max_capacity,
"capacity_rate": capacity_rate
})
return result
except Exception as e:
print(f"[MySQL] 查询所有景区客流数据失败: {e}")
return []
# 在文件末尾添加新函数
async def get_scenic_detail_data(request: Request, id: int) -> dict:
"""
查询单个景区的详细信息包含舒适度判断
"""
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# 查询单个景区客流信息
query = """
SELECT
t3.`name` AS scenic_name,
COALESCE(SUM(t1.init_num), 0) AS enter_num,
COALESCE(SUM(t1.out_num), 0) AS leave_num,
COALESCE(t3.realtime_load_capacity, 0) AS max_capacity
FROM
equipment_passenger_flow.flow_current_video t1
LEFT JOIN
cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address
LEFT JOIN
cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id
WHERE
t3.id = %s
GROUP BY
t3.`name`, t3.realtime_load_capacity
"""
await cur.execute(query, (id,))
row = await cur.fetchone()
if not row:
return None
scenic_name, enter_num, leave_num, max_capacity = row
# 计算在园人数
in_park_num = max(0, enter_num - leave_num)
# 计算承载率和舒适度
if max_capacity > 0:
capacity_rate = in_park_num / max_capacity
capacity_percentage = round(capacity_rate * 100, 1)
# 根据承载率判断舒适度
if capacity_rate < 0.3:
comfort_level = "舒适"
elif capacity_rate < 0.5:
comfort_level = "较舒适"
elif capacity_rate < 0.7:
comfort_level = "一般"
elif capacity_rate < 0.9:
comfort_level = "较拥挤"
else:
comfort_level = "拥挤"
else:
capacity_rate = 0.0
capacity_percentage = 0.0
return {
"scenic_name": scenic_name,
"enter_num": enter_num or 0,
"leave_num": leave_num or 0,
"in_park_num": in_park_num,
"max_capacity": max_capacity or 0,
"capacity_rate": round(capacity_rate, 4),
"comfort_level": comfort_level
}
except Exception as e:
print(f"[MySQL] 查询景区详情数据失败: {e}")
return None
# 在chat_service.py末尾添加新的停车场查询函数
async def get_scenic_parking_data(request: Request, scenic_name: str, distance: int) -> list:
"""
查询景区附近的停车场信息
Args:
request: FastAPI请求对象
scenic_name: 景区名称
distance: 查询距离()>=1000时查询全部
Returns:
list: 停车场信息列表按距离排序
"""
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# 构建基础查询
base_query = """
SELECT
t3.park_name AS park_name,
t3.total_count AS total_parking_spaces,
COALESCE(t4.space, 0) AS available_spaces,
t1.distance_value AS distance_meters
FROM
cyjcpt_bd.scenic_pack_distance t1
LEFT JOIN
cyjcpt_bd.zhly_scenic_basic t2 ON t1.scenic_id = t2.id
LEFT JOIN
cyjcpt_bd.park_info t3 ON t1.park_code = t3.park_code
LEFT JOIN
equipment_passenger_flow.park_current t4 ON t1.park_code = t4.park_code
WHERE
t2.`name` LIKE %s
AND t3.total_count > 0
"""
# 根据距离参数构建WHERE条件
if distance >= 1000:
# 距离>=1000米,查询全部停车场
where_condition = ""
params = (f"%{scenic_name}%",)
else:
# 距离<1000米,按指定距离查询
where_condition = " AND t1.distance_value <= %s"
params = (f"%{scenic_name}%", distance)
# 完整的查询语句
query = base_query + where_condition + " ORDER BY t1.distance_value ASC"
await cur.execute(query, params)
rows = await cur.fetchall()
# 处理结果
result = []
for row in rows:
park_name, total_spaces, available_spaces, distance_meters = row
result.append({
"park_name": park_name,
"total_parking_spaces": total_spaces or 0,
"available_spaces": available_spaces or 0,
"distance_meters": distance_meters or 0
})
return result
except Exception as e:
print(f"[MySQL] 查询景区停车场数据失败: {e}")
return []

@ -5,4 +5,22 @@ class ChatIn(BaseModel):
message: str
sign: str
timestamp: int
user_id: int
user_id: int
class AllScenicFlowRequest(BaseModel):
timestamp: int
sign: str
class ScenicDetailRequest(BaseModel):
id: int
timestamp: int
sign: str
class ScenicParkingRequest(BaseModel):
scenic_name: str
distance: int = 1000
timestamp: int
sign: str

@ -0,0 +1,14 @@
from tortoise.models import Model
from tortoise import fields
from datetime import datetime
class HotQuestion(Model):
id = fields.IntField(pk=True, description="ID")
title = fields.CharField(max_length=255, description="问题")
num = fields.IntField(default=0, description="次数")
update_time = fields.DatetimeField(auto_now=True, description="更新时间")
class Meta:
table = "hot_question"
table_description = "热门问题表"

@ -100,7 +100,7 @@ class Settings(BaseSettings):
},
"apps": {
"models": {
"models": ["app.models", "aerich.models","app.models.quick_question"],
"models": ["app.models", "aerich.models", "app.models.quick_question", "app.models.hot_question"],
"default_connection": "mysql",
},
},

Loading…
Cancel
Save