import asyncio
from openai import AsyncOpenAI
from dotenv import load_dotenv
from typing import AsyncGenerator
from app.models.ChatIn import ChatIn
from fastapi import Request
from app.settings.config import settings
import json
import requests
from chinese_calendar import is_holiday
from datetime import datetime
from typing import Optional
import time
import random
from anyio import sleep
load_dotenv()
async_client = AsyncOpenAI(api_key=settings.DEEPSEEK_API_KEY, base_url=settings.DEEPSEEK_API_URL)
# 知识库接口
KNOWLEDGE_URL = "http://172.21.11.20:8886/v3/chat"
# KNOWLEDGE_URL = "http://192.168.130.144:8888/v3/chat"
# 知识库应用id
BOT_ID = "7550848889243303936"
# 知识库token
KNOWL_TOKEN = "Bearer pat_5cb11052e7b4b517015467902cd7775742120fc88fe66b926c93fde3a39843c7"
#分类提示词
CATEGORY_PROMPT = """你是一个分类助手,请根据用户的问题判断属于以下哪一类:
如果用户的问题涉及保定市某个景区当前的人数、客流量、拥挤程度或是否适合前往(例如:“某个保定市景区现在人多么”、“某个保定市景区现在适不适合去”、"现在可以去吗"、"现在适合去吗"、"某个景区实时客流量"、"某个景区客流",注意:只有涉及实时、现在等当前时间的,如果是明天、后天等未来时间的不包括在内),请返回:游玩判断。
如果用户的问题涉及比较两个或多个保定市景区的人流量(例如:“A景区和B景区哪个更人少”、“A景区和B景区的人流量比较”),请返回:多景区比较。
如果用户的问题不属于上述情况,请返回:保定文旅。
注意:
只处理与保定市景区相关的问题
如果涉及到雄安(包括雄县、安新、蓉城三县)或者定州的景区,包括但不限于白洋淀、宋辽古战道、南阳遗址、开元寺等这些景区,不属于保定市范围”
仅返回“游玩判断”、“多景区比较”或“保定文旅”这三个分类结果之一
不回复任何其他内容
不进行额外解释或对话"""
#提取景区名称提示词
EXTRACT_PROMPT = """你是一名景区名称精准匹配助手。用户的问题中可能只包含景区简称、别称或部分关键词,你需要根据下面的完整景区名称列表,把用户提到的景区准确匹配到唯一最符合的完整名称并仅返回该名称,不要输出其他文字。如果用户没有提到任何景区,返回空字符串。注意:如果包含多个景区名称,固定返回最后一个。
完整景区名称列表:
仙人峪景区
空中草原景区
白石山景区
阜平云花溪谷-玫瑰谷
保定军校纪念馆
直隶总督署博物馆
冉庄地道战遗址
刘伶醉景区
曲阳北岳庙景区
唐县华峪山庄
古莲花池
阜平天生桥景区
涿州三义宫
易水湖景区
晋察冀边区革命纪念馆
安国市毛主席视察纪念馆
清西陵景区
满城汉墓景区
灵山聚龙洞旅游风景区
易县狼牙山风景区
留法勤工俭学运动纪念馆
白求恩柯棣华纪念馆
唐县秀水峪
腰山王氏庄园
安国市药王庙景区
虎山风景区
唐县西胜沟景区
野三坡景区
鱼谷洞景区
昌利农业示范园景区
蒙牛乳业工业景区
金木国际产业园
顺平享水溪
顺平三妙峰景区
安国市中药文化博物馆
清苑古城香文化体验馆展厅
安国数字中药都
天香工业游景区
唐县潭瀑峡景区
顾家台骆驼湾景区
中药都药博园
秋闲阁艺术馆
华海·中央步行街
恋乡·太行水镇旅游综合体景区
保定宴饮食博物馆
绿建科技工业旅游景区
燕都古城景区
台湾农业工园景区
尧母文化园
永济桥景区
保定西大街
卓正神农现代农业示范园
和道国际箱包城旅游景区
古镇大激店旅游区
大平台景区
七山旅游景区
涿州清行宫
辽塔文化园
京作壹号酒庄
唐尧古镇
大慈阁
淮军公所博物馆
中国古动物馆(保定自然博物馆)
保定市博物馆
保定市图书馆
"""
# 客流查询后回答的提示词
ANSWER_PROMPT = """
**景区游览指南生成要求**
输入数据格式:
- 用户问题:{msg}
- 查询到的数据:{data}(包含在园人数、舒适度等级、停车场数据等,可能包含知识库内容)
- 输出语言:{language}
输出要求:
1. 标题格式:
**完整景区名称**游览建议
2. 实时客流展示:
- 展示当前在园人数
- 按以下标准显示舒适度:
舒适:舒适
较舒适:较舒适
一般:一般
较拥挤:较拥挤
拥挤:拥挤
- 附加简短体验描述(1-2句)
3. 周边停车指南:
- 停车场名称
- 距离(米)
- 步行时间(分钟)
- 当前余位/总车位
- 收费标准(注明"具体以现场公示为准")
4. 出行与游览建议:
- 推荐程度(强烈推荐/推荐/谨慎考虑/暂缓前往)
- 具体建议(包含错峰时间、交通方式、路线建议)
- 建议游览时长(根据舒适度调整)
- 替代景区推荐(当拥挤时)
5. 温馨提示:
- 安全提醒
- 介绍景区特色
- 其他注意事项
6. 数据缺失时的通用建议:
- 标题:**景区名称**游览建议
- 实时客流提示
- 常规交通建议
- 游览路线说明
- 附近景区推荐
语言风格要求:
- 使用生动形象的语言描述,适当添加emoji表情符号增强表现力
- 采用"您""记得""推荐"等暖心词汇
- 禁用专业术语
- 使用**加粗**强调关键信息
- 结尾提示:"动态信息请以现场为准,祝您旅途愉快!"
数据要求:
- 仅使用查询到的数据
- 不虚构未提供的信息
- 如果舒适度为拥挤则只输出舒适度等级而不输出实际的在园人数
- 不输出数据更新时间
- 如果包含知识库内容则严格参考内容,如果没有则自行生成
- 不允许出现知识库字眼,只需要告诉用户是你查询所得到的信息
- 注意:如果已闭园则不展示周边停车指南那一项
"""
# - 标题字体大小为 18–20px,加粗显示
# - 正文内容字体大小为 15–16px
# - 行间距为 1.6–1.8 倍字体大小
# - 段落与模块之间上下边距应为10–16px
# - 不能使用小于 14px 的字体
# - 不要解释排版等内容
async def classify(msg: str) -> str:
print(f"Starting classification for message: {msg}")
try:
response = await async_client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "system", "content": CATEGORY_PROMPT}, {"role": "user", "content": msg}]
)
result = response.choices[0].message.content
print(f"Classification result: {result}")
return result
except Exception as e:
print(f"Error in classification: {e}")
raise
async def ai_chat_stream(inp: ChatIn, conversation_history: list) -> AsyncGenerator[str, None]:
chat_prompt = f"""
你是一个专门服务河北省保定市旅游的AI助手,包括但不限于提供以下精准服务:
1. 行程规划:根据游客的停留天数(1-7天)、预算范围(经济型/中档/豪华)、兴趣偏好(历史文化/自然风光/美食体验)提供定制化行程方案
2. 景点推荐:详细介绍保定市3A级以上旅游景区(如野三坡、清西陵等)的开放时间、门票价格、最佳游览季节和交通方式
3. 特色推荐:提供保定驴肉火烧、槐茂酱菜等地方特色美食的具体店铺地址和人均消费
4. 实用信息:提供保定市区及周边县市的公共交通线路、出租车参考价格等实用旅行信息
5. 景区介绍相关:提供包括但不限于保定市旅游景区的历史背景、景点特色、开放时间、门票价格、最佳游览季节和交通方式等等内容
5. 如果content中包含知识库查询到的景区内容,则需要严格参考查询到的内容,如果没有则正常按照上述要求回答
6. 注意:如果涉及到雄安(包括雄县、安新、蓉城三县)时,或者包括但不限于白洋淀、宋辽古战道、南阳遗址等这些景区的相关问题,固定回复:“您搜索的区域归雄安新区管辖,请查询该地官方宣传渠道或实地到相关部门进行咨询,感谢您的理解”。如果涉及到定州时,或者包括但不限于开元寺、定州文庙等这些景区的相关问题,固定回复:“您搜索的区域归定州市管辖,请查询该地官方宣传渠道或实地到相关部门进行咨询,感谢您的理解”
7. 输出的内容中严禁出现任何有关雄安或者定州的景区、非遗项目等信息,包括但不限于白洋淀、宋辽古战道、南阳遗址、开元寺、定州文庙等这些景区,以及白洋淀苇编、雄县古乐、雄县鹰爪翻子拳、定州缂丝、雄州黑陶等非遗项目。
**服务要求**:
- 使用生动形象的语言描述,适当添加emoji表情符号增强表现力
- 采用Markdown语法组织内容,合理使用**加粗**、*斜体*等格式
- 分点说明时使用清晰的列表格式
- 重要信息使用高亮标记
**服务限制**:
- 地理范围:仅限保定市行政区划内(含下辖县市)的文化相关和旅游相关信息
- 语言输出:严格使用用户指定的{inp.language}语言回复
- 问题边界:对非保定文旅相关问题统一回复"我是您的保定旅行助手,专注于解答有关保定市的旅行问题哦~"
"""
messages = [{"role": "system", "content": chat_prompt}] + conversation_history
messages.append({"role": "user", "content": inp.message})
full_response = ""
response = None
try:
response = await async_client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
last_chunk_time = time.time()
empty_chunk_count = 0
# 使用异步方式处理同步流
async for chunk in response:
# 检查单块间隔是否超时(例如 10 秒没新数据)
if time.time() - last_chunk_time > 10:
print("ai_chat_stream函数deepseek响应超时")
break
delta = chunk.choices[0].delta
if hasattr(delta, "content") and delta.content:
last_chunk_time = time.time()
empty_chunk_count = 0
full_response += delta.content
yield delta.content
else:
empty_chunk_count += 1
# 连续 3 次空数据判定为结束
if empty_chunk_count >= 3:
break
# 生成推荐问题
if full_response:
recommended_questions = await generate_recommended_questions(inp.message, full_response)
if recommended_questions:
# 添加分隔符和标题
yield "\n\n### 您可能还想了解:"
# 逐个返回推荐问题
for i, question in enumerate(recommended_questions, 1):
yield f"\n{i}. {question}\n"
# 添加结束标记
yield " "
done_sent = True
except GeneratorExit:
# -------------------------- 内层客户端断开日志 --------------------------
print(f"[客户端断开-AI流中断] 用户ID: {inp.user_id},AI响应未完成,已终止底层流")
client_ip = getattr(inp, "client_ip", "未知IP") # 若 inp 无 client_ip,可从外层 request 传参
print(f"[AI流中断详情] IP: {client_ip},未完成响应长度: {len(full_response)} 字符")
# 关闭底层 AI 流(原有逻辑)
if response and hasattr(response, "aclose"):
await response.aclose() # type: ignore
raise # 重新抛出,让外层捕获
except Exception as e:
error_msg = f"当前访问人数过多,请稍后重试: {str(e)}"
print(error_msg)
yield " "
finally:
if response and hasattr(response, "aclose"):
try:
await response.aclose()
print(f"[AI流最终关闭] 用户ID: {inp.user_id}(finally块)")
except Exception as e:
print(f"[AI流最终关闭失败] 用户ID: {inp.user_id},错误: {str(e)}")
print("ai_chat_stream: 进入finally,强制发送DONE")
yield " "
if full_response:
conversation_history.append({"role": "assistant", "content": full_response})
# 限制对话历史长度为10条(5轮对话)
if len(conversation_history) > 10:
conversation_history = conversation_history[-10:]
print("AI chat stream finished.")
def get_formatted_prompt(user_language,msg,data):
# 使用format方法替换占位符
return ANSWER_PROMPT.format(language=user_language, msg=msg, data=data)
async def get_ai_quick_question(questions, inp):
await asyncio.sleep(0.5)
full_response = ""
# 处理快捷问题
question_content = questions.strip('"')
# 每次输出随机1-10个字符
chunk_size = random.randint(5, 15)
for i in range(0, len(question_content), chunk_size):
chunk = question_content[i:i + chunk_size]
full_response += chunk
yield f"{chunk}"
await asyncio.sleep(0.05)
if full_response:
recommended_questions = await generate_recommended_questions(inp.message, full_response)
if recommended_questions:
# 添加分隔符和标题
yield "\n\n### 您可能还想了解:"
# 逐个返回推荐问题
for i, question in enumerate(recommended_questions, 1):
yield f"\n{i}. {question}\n"
async def gen_markdown_stream(msg: str, data: str, language: str, conversation_history: list) -> AsyncGenerator[str, None]:
prompt = get_formatted_prompt(language,msg,data) # 将变量替换到提示词中
messages = conversation_history + [{"role": "user", "content": prompt}]
full_response = ""
done_sent = False
try:
response = await async_client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
last_chunk_time = time.time()
empty_chunk_count = 0
# 使用异步方式处理同步流
async for chunk in response:
# 检查单块间隔是否超时(例如 10 秒没新数据)
if time.time() - last_chunk_time > 10:
print("gen_markdown_stream函数deepseek响应超时")
break
delta = chunk.choices[0].delta
if hasattr(delta, "content") and delta.content:
last_chunk_time = time.time()
empty_chunk_count = 0
full_response += delta.content
yield delta.content
else:
empty_chunk_count += 1
# 连续 3 次空数据判定为结束
if empty_chunk_count >= 3:
break
# 生成推荐问题
if full_response:
recommended_questions = await generate_recommended_questions(msg, full_response)
if recommended_questions:
# 添加分隔符和标题
yield "\n\n### 您可能还想了解:"
# 逐个返回推荐问题
for i, question in enumerate(recommended_questions, 1):
yield f"\n{i}. {question}\n"
# 添加结束标记
yield " "
done_sent = True
except Exception as e:
error_msg = f"当前访问人数过多,请稍后重试: {str(e)}"
print(error_msg)
yield " "
done_sent = True
finally:
if not done_sent:
print("ai_chat_stream: 进入finally,强制发送DONE")
yield " "
done_sent = True
if full_response:
conversation_history.append({"role": "assistant", "content": full_response})
# 限制对话历史长度为10条(5轮对话)
if len(conversation_history) > 10:
conversation_history = conversation_history[-10:]
print("Markdown stream finished.")
async def extract_spot(msg) -> str:
# 如果msg是列表,则将其内容连接成字符串
if isinstance(msg, list):
msg_content = '\n'.join(msg)
else:
msg_content = msg
try:
response = await async_client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "system", "content": EXTRACT_PROMPT}, {"role": "user", "content": msg_content}]
)
result = response.choices[0].message.content
print(f"Extracted spot: {result}")
return result
except Exception as e:
print(f"Error in spot extraction: {e}")
raise
async def query_flow(request: Request, spot: str, redis_client = None) -> str:
if not spot:
print("No spot found, returning default message.")
return "**未找到景区信息,请检查名称是否正确。\n\n(内容由AI生成,仅供参考)"
# 获取当前日期
now = datetime.now()
# 根据是否为节假日设置状态条件
if is_holiday(now):
redis_key_str = "yes"
else:
# 非节假日添加status=0的条件
redis_key_str = "no"
cache_key = f"flow:{spot}:{redis_key_str}"
if redis_client is None:
redis_client = request.app.state.redis_client
# Step 1: Redis 缓存查询
try:
cached = await redis_client.get(cache_key)
if cached:
return cached
else:
return f"未找到景区【{spot}】的客流相关信息,在园人数和舒适度未知;停车场信息:暂无数据。"
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
# Step 2: MySQL 查询(使用连接池)
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
search_spot = f"%{spot}%"
# 查询景区基本信息以获取ID
id_query = "SELECT id FROM cyjcpt_bd.zhly_scenic_basic WHERE `name` LIKE %s LIMIT 1"
await cur.execute(id_query, (search_spot,))
id_row = await cur.fetchone()
scenic_id = id_row[0] if id_row else None
# 查询景区客流信息
query = """SELECT ABS(SUM(t1.init_num)-SUM(t1.out_num)) AS in_num , t3.realtime_load_capacity AS capacity
FROM equipment_passenger_flow.flow_current_video t1
LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address
LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id
WHERE DATE_FORMAT(t1.create_time,'%%Y%%m%%d') = DATE_FORMAT(NOW(),'%%Y%%m%%d') AND t3.id = {scenic_id_condition}"""
formatted_flow_query = query.format(scenic_id_condition=scenic_id)
await cur.execute(formatted_flow_query)
row = await cur.fetchone()
# 查询停车场信息
park_query = """SELECT t3.park_name AS park_name, IFNULL(t3.rate_info,'暂无收费标准信息') AS rate_info, t3.total_count AS total_count, t4.space AS space, t1.distance_value AS distance_value
FROM cyjcpt_bd.scenic_pack_distance t1
LEFT JOIN cyjcpt_bd.zhly_scenic_basic t2 ON t1.scenic_id = t2.id
LEFT JOIN cyjcpt_bd.park_info t3 ON t1.park_code = t3.park_code
LEFT JOIN equipment_passenger_flow.park_current t4 ON t1.park_code = t4.park_code
WHERE t2.id = {scenic_id_condition} AND t1.distance_value <= 1000 AND t3.total_count != 0 {status_condition} ORDER BY t1.distance_value ASC LIMIT 2"""
#获取当前日期
now = datetime.now()
# 根据是否为节假日设置状态条件
if is_holiday(now):
# 节假日不添加status条件,查询全部
status_condition = ""
else:
# 非节假日添加status=0的条件
status_condition = "AND t3.`status` = '0'"
# 格式化SQL查询,插入状态条件
formatted_park_query = park_query.format(scenic_id_condition=scenic_id, status_condition=status_condition)
await cur.execute(formatted_park_query)
park_rows = await cur.fetchall()
except Exception as e:
print(f"[MySQL] 查询失败: {e}")
return f"**未找到景区【{spot}】的信息,请检查名称是否正确。\n\n(内容仅供参考)"
result = ""
if row and all(v is not None for v in row):
# 使用变量名访问客流数据
in_num, capacity = row
#in_num/capacity 计算承载率,取两位小数
hold_rate = round(in_num / capacity, 2)
""" 小于0.3:舒适
0.3-0.5:较舒适
0.5-0.7:一般
0.7-0.8:较拥挤
<0.9 拥挤
大于0.9:爆满"""
hold_level = "舒适" if hold_rate < 0.3 else "较舒适" if hold_rate < 0.5 else "一般" if hold_rate < 0.7 else "较拥挤" if hold_rate < 0.8 else "拥挤" if hold_rate < 0.9 else "爆满"
result = f"{spot} 客流:在园人数: {in_num};舒适度:{hold_level};"
else:
result = f"未找到景区【{spot}】的客流相关信息,在园人数和舒适度未知;"
if park_rows:
for park_row in park_rows:
# 使用变量名访问停车场数据
park_name, rate_info, total_count, space, distance_value = park_row
result += f"\n停车场信息:名称:{park_name} ,距离:{distance_value}米,空余车位:{space if space is not None else '未知'},总车位:{total_count},收费标准:{rate_info if rate_info else '暂无收费标准信息'}。"
else:
result += "停车场信息:暂无数据。"
if scenic_id:
result += f"注:全部内容输出完以后,最后输出一段固定内容,内容为:
;"
try:
await redis_client.setex(cache_key, 120, result)
except Exception as e:
print(f"[Redis] 写缓存失败: {e}")
return result
async def handle_quick_question(inp: ChatIn, question_content: str) -> AsyncGenerator[str, None]:
chat_prompt = f"""
你是一个专门格式化内容的AI助手,不能修改内容,仅修改格式,
负责将接收到的包含html标签内容进行格式化,要求是将能够转换成markdown语法的内容中的html标签转换成markdown语法,
不能转换的保留html标签。注意:不修改和处理
标签。
"""
# 只包含系统提示和问题内容,不包含历史记录
messages = [
{"role": "system", "content": chat_prompt},
{"role": "user", "content": question_content}
]
full_response = ""
try:
response = await async_client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
last_chunk_time = time.time()
empty_chunk_count = 0
# 使用异步方式处理同步流
async for chunk in response:
# 检查单块间隔是否超时(例如 10 秒没新数据)
if time.time() - last_chunk_time > 10:
print("ai_chat_stream函数deepseek响应超时")
break
delta = chunk.choices[0].delta
if hasattr(delta, "content") and delta.content:
last_chunk_time = time.time() # 重置计时器
empty_chunk_count = 0
full_response += delta.content
yield delta.content
else:
empty_chunk_count += 1
# 连续 3 次空数据判定为结束
if empty_chunk_count >= 3:
break
# 生成推荐问题
if full_response:
recommended_questions = await generate_recommended_questions(inp.message, full_response)
if recommended_questions:
# 添加分隔符和标题
yield "\n\n### 您可能还想了解:"
# 逐个返回推荐问题
for i, question in enumerate(recommended_questions, 1):
yield f"\n{i}. {question}\n"
# 添加结束标记
yield " "
except Exception as e:
error_msg = f"当前访问人数过多,请稍后重试: {str(e)}"
print(error_msg)
yield " "
# 在chat_service.py中添加推荐问题生成函数
async def generate_recommended_questions(user_msg: str, ai_response: str) -> list:
"""基于用户问题和AI回答生成1-3个纵向延伸的推荐问题"""
prompt = f"""
基于用户的问题和AI的回答,生成1-3个相关的纵向延伸问题,帮助用户深入了解相关内容。
用户问题: {user_msg}
AI回答: {ai_response}
注意:如果上面的用户问题或者AI回答的内容超出了保定市范围,例如是雄安(包括雄县、安新、蓉城三县)或者定州的内容,或者包括但不限于白洋淀、宋辽古战道、南阳遗址、定州文庙、开元寺等这些属于雄安或者定州的景区的相关问题,延伸问题要生成保定市范围内的文旅方向内容,不需要再跟原回答有关,不要再出现雄安和定州的内容
要求:
1. 问题需与当前话题高度相关,具有延续性
2. 问题应具有探索性,引导用户深入了解
3. 用简洁明了的中文表达
4. 只返回问题列表,每个问题占一行,不添加编号和其他内容
"""
try:
response = await asyncio.wait_for(
async_client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一个问题推荐助手,擅长基于对话内容生成相关的延伸问题"},
{"role": "user", "content": prompt}
]
),
timeout=30.0
)
questions = response.choices[0].message.content.strip().split('\n')
# 过滤空行并限制数量为1-3个
return [q for q in questions if q][:3]
except asyncio.TimeoutError:
print("生成推荐问题超时")
return []
except Exception as e:
print(f"生成推荐问题失败: {e}")
return []
# 添加新函数用于获取所有景区客流数据
async def get_all_scenic_flow_data(request: Request, redis_client = None) -> list:
"""
查询所有景区的客流数据,计算承载率,并按承载率倒序排列
"""
# Redis 缓存查询
cache_key = "all_scenic_flow_data"
if redis_client is None:
redis_client = request.app.state.redis_client
try:
cached = await redis_client.get(cache_key)
if cached:
# 缓存命中,直接返回缓存数据
return json.loads(cached)
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# 查询所有景区客流信息
query = """
SELECT
t3.id AS id,
t3.`name` AS scenic_name,
SUM(t1.init_num) AS enter_num,
SUM(t1.out_num) AS leave_num,
t3.realtime_load_capacity AS max_capacity
FROM
equipment_passenger_flow.flow_current_video t1
LEFT JOIN
cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address
LEFT JOIN
cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id
WHERE t3.`name` IS NOT NULL AND DATE_FORMAT(t1.create_time,'%Y%m%d') = DATE_FORMAT(NOW(),'%Y%m%d')
GROUP BY
t3.`name`
ORDER BY
(SUM(t1.init_num) - SUM(t1.out_num)) / t3.realtime_load_capacity DESC
"""
await cur.execute(query)
rows = await cur.fetchall()
# 处理结果
result = []
for row in rows:
id,scenic_name, enter_num, leave_num, max_capacity = row
in_park_num = abs(enter_num - leave_num) # 确保是正数
if in_park_num > max_capacity:
in_park_num = max_capacity
# 避免除以零的情况
if max_capacity > 0:
capacity_rate = in_park_num / max_capacity
else:
capacity_rate = 0
result.append({
"id": id,
"scenic_name": scenic_name,
"enter_num": enter_num,
"leave_num": leave_num,
"in_park_num": in_park_num,
"max_capacity": max_capacity,
"capacity_rate": capacity_rate
})
# 将结果存入Redis缓存,过期时间1分钟
try:
await redis_client.setex(cache_key, 60, json.dumps(result))
except Exception as e:
print(f"[Redis] 写缓存失败: {e}")
return result
except Exception as e:
print(f"[MySQL] 查询所有景区客流数据失败: {e}")
return []
# 在文件末尾添加新函数
async def get_scenic_detail_data(request: Request, id: int, redis_client = None) -> dict:
"""
查询单个景区的详细信息,包含舒适度判断
"""
# Redis 缓存查询
cache_key = f"scenic_detail:{id}"
if redis_client is None:
redis_client = request.app.state.redis_client
try:
cached = await redis_client.get(cache_key)
if cached:
# 缓存命中,直接返回缓存数据
return json.loads(cached)
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# 查询单个景区客流信息
query = """
SELECT
t3.`name` AS scenic_name,
COALESCE(SUM(t1.init_num), 0) AS enter_num,
COALESCE(SUM(t1.out_num), 0) AS leave_num,
COALESCE(t3.realtime_load_capacity, 0) AS max_capacity
FROM
equipment_passenger_flow.flow_current_video t1
LEFT JOIN
cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address
LEFT JOIN
cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id
WHERE
t3.id = %s
"""
await cur.execute(query, (id,))
row = await cur.fetchone()
if not row:
return None
scenic_name, enter_num, leave_num, max_capacity = row
# 计算在园人数
in_park_num = abs(enter_num - leave_num) # 确保是正数
if in_park_num > max_capacity:
in_park_num = max_capacity
# 计算承载率和舒适度
if max_capacity > 0:
capacity_rate = in_park_num / max_capacity
# 根据承载率判断舒适度
if capacity_rate < 0.3:
comfort_level = "舒适"
elif capacity_rate < 0.5:
comfort_level = "较舒适"
elif capacity_rate < 0.7:
comfort_level = "一般"
elif capacity_rate < 0.8:
comfort_level = "较拥挤"
elif capacity_rate < 0.9:
comfort_level = "拥挤"
else:
comfort_level = "爆满"
else:
capacity_rate = 0.0
comfort_level = "舒适"
result = {
"scenic_name": scenic_name,
"enter_num": enter_num or 0,
"leave_num": leave_num or 0,
"in_park_num": in_park_num,
"max_capacity": max_capacity or 0,
"capacity_rate": round(capacity_rate, 4),
"comfort_level": comfort_level
}
# 将结果存入Redis缓存,过期时间1分钟
try:
await redis_client.setex(cache_key, 60, json.dumps(result))
except Exception as e:
print(f"[Redis] 写缓存失败: {e}")
return result
except Exception as e:
print(f"[MySQL] 查询景区详情数据失败: {e}")
return None
# 在chat_service.py末尾添加新的停车场查询函数
async def get_scenic_parking_data(request: Request, scenic_id: int, distance: int, redis_client = None) -> list:
"""
查询景区附近的停车场信息
Args:
request: FastAPI请求对象
scenic_id: 景区id
distance: 查询距离(米),>=1000时查询全部
redis_client: Redis客户端实例(可选)
Returns:
list: 停车场信息列表,按距离排序
"""
# 获取当前日期
now = datetime.now()
# 根据是否为节假日设置状态条件
if is_holiday(now):
redis_key_str = "yes"
else:
# 非节假日添加status=0的条件
redis_key_str = "no"
# Redis 缓存查询
cache_key = f"scenic_parking:{scenic_id}:{distance}:{redis_key_str}"
if redis_client is None:
redis_client = request.app.state.redis_client
try:
cached = await redis_client.get(cache_key)
if cached:
# 缓存命中,直接返回缓存数据
return json.loads(cached)
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# 构建基础查询
base_query = """
SELECT
t3.park_name AS park_name,
t3.total_count AS total_parking_spaces,
COALESCE(t4.space, 0) AS available_spaces,
t1.distance_value AS distance_meters,
t3.gould_coordinate_x AS lon,
t3.gould_coordinate_y AS lat,
t3.`status` AS park_type
FROM
cyjcpt_bd.scenic_pack_distance t1
LEFT JOIN
cyjcpt_bd.park_info t3 ON t1.park_code = t3.park_code
LEFT JOIN
equipment_passenger_flow.park_current t4 ON t1.park_code = t4.park_code
WHERE
t1.scenic_id = {scenic_id_condition}
{status_condition}
{distance_value}
AND t3.total_count > 0
ORDER BY t1.distance_value ASC
"""
# 获取当前日期
now = datetime.now()
# 根据是否为节假日设置状态条件
if is_holiday(now):
# 节假日不添加status条件,查询全部
status_condition = ""
else:
# 非节假日添加status=0的条件
status_condition = "AND t3.`status` = '0'"
# 根据距离参数构建WHERE条件
if distance >= 1000:
# 距离>=1000米,查询全部停车场
where_condition = ""
else:
# 距离<1000米,按指定距离查询
where_condition = " AND t1.distance_value <= %s"
# 格式化SQL查询,插入状态条件
formatted_park_base = base_query.format(scenic_id_condition=scenic_id,
status_condition=status_condition,distance_value=where_condition)
await cur.execute(formatted_park_base)
rows = await cur.fetchall()
# 处理结果
result = []
for row in rows:
park_name, total_spaces, available_spaces, distance_meters, lon, lat, park_type = row
result.append({
"park_name": park_name,
"total_parking_spaces": total_spaces or 0,
"available_spaces": available_spaces or 0,
"distance_meters": distance_meters or 0,
"lon": lon or 0,
"lat": lat or 0,
"park_type": park_type
})
# 将结果存入Redis缓存,过期时间1分钟
try:
await redis_client.setex(cache_key, 60, json.dumps(result))
except Exception as e:
print(f"[Redis] 写缓存失败: {e}")
return result
except Exception as e:
print(f"[MySQL] 查询景区停车场数据失败: {e}")
return []
# 添加用于获取完整响应数据的新函数
def fetch_and_parse_markdown(user_id: int, question: str) -> str:
"""
功能:发送请求、解析SSE流、提取完整知识库内容、处理乱码
返回:纯净的知识库markdown内容(与原fetch_and_parse_markdown返回格式一致)
"""
# 1. 新接口基础配置
HEADERS = {
"Authorization": KNOWL_TOKEN,
"Content-Type": "application/json",
"Accept": "text/event-stream",
"Accept-Charset": "utf-8"
}
# 请求体(user_id和question动态传入,其他参数固定)
PAYLOAD = {
"bot_id": BOT_ID, # 接口固定bot_id
"user_id": str(user_id), # 转为字符串适配接口
"additional_messages": [
{
"role": "user",
"type": "question",
"content": question, # 用户查询问题
"content_type": "text"
}
],
"stream": False,
"auto_save_history": True,
"enable_card": True
}
full_answer = "" # 核心:拼接你需要的「最终完整回答」
current_event_type: Optional[str] = None
try:
# 2. 发送SSE请求并流式接收
with requests.post(
url=KNOWLEDGE_URL,
headers=HEADERS,
data=json.dumps(PAYLOAD, ensure_ascii=False), # 请求体UTF-8编码
stream=True,
timeout=60
) as response:
response.raise_for_status() # 检查请求是否成功
# 3. 逐行解析SSE流
for line_bytes in response.iter_lines():
if not line_bytes: # 跳过空行(事件分隔符)
continue
# 4. 处理乱码(重点解决双重编码问题)
try:
# 优先UTF-8解码(正常情况)
line = line_bytes.decode("utf-8", errors="strict")
except UnicodeDecodeError:
# 修复"UTF-8→ISO-8859-1"双重编码(常见中文乱码原因)
line = line_bytes.decode("iso-8859-1").encode("iso-8859-1").decode("utf-8")
# 5. 提取事件类型(如 conversation.message.delta)
if line.startswith("event:"):
current_event_type = line.split(":", 1)[1].strip()
continue
# 6. 提取事件数据(只关注回答片段)
if line.startswith("data:"):
data_str = line.split(":", 1)[1].strip()
if not data_str:
continue
try:
# 解析JSON数据(处理可能的编码问题)
try:
event_data = json.loads(data_str)
except UnicodeDecodeError:
data_str_fixed = data_str.encode("iso-8859-1").decode("utf-8")
event_data = json.loads(data_str_fixed)
# 7. 核心:拼接回答片段(只取 "conversation.message.delta" 事件的 answer 内容)
if (current_event_type == "conversation.message.delta"
and event_data.get("type") == "answer"):
# 提取当前片段(如"直"、"隶"、"总督署")
answer_chunk = event_data.get("content", "").strip()
# 修复片段中的乱码(兜底)
try:
answer_chunk = answer_chunk.encode("iso-8859-1").decode("utf-8")
except:
pass
full_answer += answer_chunk # 拼接成完整回答
except json.JSONDecodeError:
continue # 跳过无效JSON,不影响整体
# 8. 清理最终回答(去除多余空格/空行)
full_answer = full_answer.strip()
print("【调试】,知识库内容:", full_answer)
return full_answer
except requests.exceptions.RequestException as e:
error_msg = f"请求错误: {e}"
print(error_msg)
return "" # 错误时返回空字符串
except Exception as e:
error_msg = f"解析错误: {e}"
print(error_msg)
return ""
# 添加用于多景区比较的新提示词
MULTI_SCENIC_EXTRACT_PROMPT = """你是一名景区名称提取助手。用户的问题中可能包含多个景区名称,请根据下面的完整景区名称列表,准确提取用户提到的所有景区名称并返回,每个景区名称占一行。如果用户没有提到任何景区,返回空字符串。
完整景区名称列表:
仙人峪景区
空中草原景区
白石山景区
阜平云花溪谷-玫瑰谷
保定军校纪念馆
直隶总督署博物馆
冉庄地道战遗址
刘伶醉景区
曲阳北岳庙景区
唐县华峪山庄
古莲花池
阜平天生桥景区
涿州三义宫
易水湖景区
晋察冀边区革命纪念馆
安国市毛主席视察纪念馆
清西陵景区
满城汉墓景区
灵山聚龙洞旅游风景区
易县狼牙山风景区
留法勤工俭学运动纪念馆
白求恩柯棣华纪念馆
唐县秀水峪
腰山王氏庄园
安国市药王庙景区
虎山风景区
唐县西胜沟景区
野三坡景区
鱼谷洞景区
昌利农业示范园景区
蒙牛乳业工业景区
金木国际产业园
顺平享水溪
顺平三妙峰景区
安国市中药文化博物馆
清苑古城香文化体验馆展厅
安国数字中药都
天香工业游景区
唐县潭瀑峡景区
顾家台骆驼湾景区
中药都药博园
秋闲阁艺术馆
华海·中央步行街
恋乡·太行水镇旅游综合体景区
保定宴饮食博物馆
绿建科技工业旅游景区
燕都古城景区
台湾农业工园景区
尧母文化园
永济桥景区
保定西大街
卓正神农现代农业示范园
和道国际箱包城旅游景区
古镇大激店旅游区
大平台景区
七山旅游景区
涿州清行宫
辽塔文化园
京作壹号酒庄
唐尧古镇
大慈阁
淮军公所博物馆
中国古动物馆(保定自然博物馆)
保定市博物馆
保定市图书馆
"""
async def extract_multi_scenic(msg) -> list:
# 如果msg是列表,则将其内容连接成字符串
if isinstance(msg, list):
msg_content = '\n'.join(msg)
else:
msg_content = msg
print(f"Starting multi scenic extraction for message: {msg_content}")
try:
response = await async_client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "system", "content": MULTI_SCENIC_EXTRACT_PROMPT}, {"role": "user", "content": msg_content}]
)
result = response.choices[0].message.content
print(f"Extracted scenics: {result}")
# 将结果按行分割并过滤空行
scenics = [line.strip() for line in result.split('\n') if line.strip()]
return scenics
except Exception as e:
print(f"Error in multi scenic extraction: {e}")
return []
async def query_multi_scenic_flow(request: Request, scenics: list, msg: str, redis_client = None) -> str:
if not scenics:
print("No scenics found, returning default message.")
return "**未找到景区信息,请检查名称是否正确。**\n\n(内容由AI生成,仅供参考)"
# 查询多个景区的客流数据
results = []
for scenic in scenics:
data = await query_flow(request, scenic, redis_client)
results.append({
"scenic": scenic,
"data": data
})
# 生成比较结果
if len(results) == 1:
return results[0]["data"]
elif len(results) >= 2:
# 构造比较提示词
comparison_prompt = f"请结合数据,基于用户的问题:{ msg} 给于简短的建议,注意:需要将原始数据返回\n\n"
for result in results:
comparison_prompt += f"**{result['scenic']}**: {result['data']}\n\n"
try:
response = await asyncio.wait_for(
async_client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "user", "content": comparison_prompt}]
),
timeout=30.0
)
comparison_result = response.choices[0].message.content
return comparison_result
except asyncio.TimeoutError:
print("比较景区超时")
result_str = "\n\n".join([f"**{r['scenic']}**:\n{r['data']}" for r in results])
return result_str
except Exception as e:
print(f"Error in multi scenic comparison: {e}")
# 如果AI比较失败,返回原始数据
result_str = "\n\n".join([f"**{r['scenic']}**:\n{r['data']}" for r in results])
return result_str
return "**未找到景区信息,请检查名称是否正确。**\n\n(内容由AI生成,仅供参考)"
# 在文件末尾添加新函数用于获取所有厕所信息
async def get_all_toilet_data(request: Request, redis_client = None) -> list:
"""
查询所有厕所信息
"""
try:
cache_key = "all_toilet_list"
if redis_client is None:
redis_client = request.app.state.redis_client
try:
cached = await redis_client.get(cache_key)
if cached:
# 缓存命中,直接返回缓存数据
return json.loads(cached)
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# 查询所有厕所信息
query = """
SELECT
id,
banner,
title,
is_recommend,
is_ai,
address,
longitude,
latitude,
open_time,
score,
nan_num,
nv_num,
peitao,
is_power,
is_aixin,
createtime,
updatetime
FROM
cyjcpt_bd.ai_toilet_info
ORDER BY
id
"""
await cur.execute(query)
rows = await cur.fetchall()
# 处理结果
result = []
for row in rows:
(
id,
banner,
title,
is_recommend,
is_ai,
address,
longitude,
latitude,
open_time,
score,
nan_num,
nv_num,
peitao,
is_power,
is_aixin,
createtime,
updatetime
) = row
result.append({
"id": id,
"banner": banner,
"title": title,
"is_recommend": is_recommend,
"is_ai": is_ai,
"address": address,
"longitude": longitude,
"latitude": latitude,
"open_time": open_time,
"score": score,
"nan_num": nan_num,
"nv_num": nv_num,
"peitao": peitao,
"is_power": is_power,
"is_aixin": is_aixin,
"createtime": createtime,
"updatetime": updatetime
})
return result
except Exception as e:
print(f"[MySQL] 查询所有厕所数据失败: {e}")
return []