保定ai问答主体项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
bd_ai_fastapi/app/api/chat/ai/chat_service.py

323 lines
12 KiB

import os, openai, aiomysql
import random
from openai import OpenAI
from dotenv import load_dotenv
import redis.asyncio as redis
from typing import AsyncGenerator
from app.models.ChatIn import ChatIn
from fastapi import Request
from app.settings.config import settings
load_dotenv()
client = OpenAI(api_key=settings.DEEPSEEK_API_KEY, base_url=settings.DEEPSEEK_API_URL)
#分类提示词
CATEGORY_PROMPT = """你是一个分类助手,请根据用户的问题判断属于哪一类:
1. 如果用户是问“某个保定市景区现在适不适合去”,或者“某个保定市景区现在人多么”此类涉及某个景区人数或者客流量的,注意只有保定的景区,请返回:游玩判断。
2. 其他均返回保定文旅。
只能返回以上两个分类词之一,不能多说话,不回复其他多余内容。"""
#提取景区名称提示词
EXTRACT_PROMPT = """你是一名景区名称精准匹配助手。用户的问题中可能只包含景区简称、别称或部分关键词,你需要根据下面的完整景区名称列表,把用户提到的景区准确匹配到唯一最符合的完整名称并仅返回该名称,不要输出其他文字。如果用户没有提到任何景区,返回空字符串。
完整景区名称列表:
仙人峪景区
空中草原景区
白石山景区
阜平云花溪谷-玫瑰谷
保定军校纪念馆
保定直隶总督署博物馆
冉庄地道战遗址
刘伶醉景区
曲阳北岳庙景区
唐县华峪山庄
古莲花池
阜平天生桥景区
涿州三义宫
易水湖景区
晋察冀边区革命纪念馆
安国市毛主席视察纪念馆
清西陵景区
满城汉墓景区
灵山聚龙洞旅游风景区
易县狼牙山风景区
留法勤工俭学纪念馆
白求恩柯棣华纪念馆
唐县秀水峪
腰山王氏庄园
安国市药王庙景区
虎山风景区
唐县西胜沟景区
野三坡景区
鱼谷洞景区
昌利农业示范园景区
蒙牛乳业工业景区
金木国际产业园
顺平享水溪
顺平三妙峰景区
安国市中药文化博物馆
清苑古城香文化体验馆展厅
安国数字中药都
天香工业游景区
唐县潭瀑峡景区
顾家台骆驼湾景区
中药都药博园
秋闲阁艺术馆
华海·中央步行街
恋乡·太行水镇旅游综合体景区
保定宴饮食博物馆
绿建科技工业旅游景区
燕都古城景区
台湾农业工园景区
尧母文化园
永济桥景区
保定西大街
卓正神农现代农业示范园
和道国际箱包城旅游景区
古镇大激店旅游区
大平台景区
七山旅游景区
涿州清行宫
辽塔文化园
京作壹号酒庄
唐尧古镇
大慈阁"""
# 客流查询后回答的提示词
ANSWER_PROMPT = """
**景区游览指南生成要求**
输入数据格式:
- 用户问题:{msg}
- 查询到的数据:{data}(包含景区名称、在园人数、最大承载量、进出人数等)
- 输出语言:{language}
输出要求:
1. 标题格式:
**完整景区名称**游览建议
2. 实时客流展示:
- 计算当前在园人数(进入人数-离开人数的绝对值)
- 计算承载率(在园人数/最大承载量)
- 按以下标准显示舒适度等级:
<30%:<font color="green">舒适</font>
30%-50%:<font color="blue">较舒适</font>
50%-70%:<font color="blue">一般</font>
70%-90%:<font color="blue">较拥挤</font>
>90%:<font color="red">拥挤</font>
- 附加简短体验描述(1-2句)
3. 周边停车指南:
- 停车场名称
- 距离(米)
- 步行时间(分钟)
- 当前余位/总车位
- 收费标准(注明"具体以现场公示为准"
4. 出行与游览建议:
- 推荐程度(强烈推荐/推荐/谨慎考虑/暂缓前往)
- 具体建议(包含错峰时间、交通方式、路线建议)
- 建议游览时长(根据舒适度调整)
- 替代景区推荐(当拥挤时)
5. 温馨提示:
- 安全提醒
- 天气影响
- 其他注意事项
6. 数据缺失时的通用建议:
- 标题:**景区名称**游览建议
- 实时客流提示
- 常规交通建议
- 游览路线说明
- 附近景区推荐
语言风格要求:
- 使用生动形象的语言描述,适当添加emoji表情符号增强表现力
- 采用"""记得""推荐"等暖心词汇
- 禁用专业术语
- 使用**加粗**强调关键信息
- 结尾提示:"动态信息请以现场为准,祝您旅途愉快!"
数据要求:
- 仅使用查询到的数据
- 不虚构未提供的信息
- 数字数据仅展示计算后的在园人数,不显示例如进入人数、离开人数、承载量和承载率等原始数据字段
- 不输出数据更新时间
- 标题字体大小为 18–20px,加粗显示
- 正文内容字体大小为 15–16px
- 行间距为 1.6–1.8 倍字体大小
- 段落与模块之间上下边距应为10–16px
- 不能使用小于 14px 的字体
"""
async def classify(msg: str) -> str:
print(f"Starting classification for message: {msg}")
try:
response = client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "system", "content": CATEGORY_PROMPT}, {"role": "user", "content": msg}]
)
result = response.choices[0].message.content
print(f"Classification result: {result}")
return result
except Exception as e:
print(f"Error in classification: {e}")
raise
async def ai_chat_stream(inp: ChatIn, conversation_history: list) -> AsyncGenerator[str, None]:
chat_prompt = f"""
你是一个专门服务河北省保定市旅游的AI助手,主要提供以下精准服务:
1. 行程规划:根据游客的停留天数(1-7天)、预算范围(经济型/中档/豪华)、兴趣偏好(历史文化/自然风光/美食体验)提供定制化行程方案
2. 景点推荐:详细介绍保定市3A级以上旅游景区(如白洋淀、野三坡、清西陵等)的开放时间、门票价格、最佳游览季节和交通方式
3. 特色推荐:提供保定驴肉火烧、槐茂酱菜等地方特色美食的具体店铺地址和人均消费
4. 实用信息:提供保定市区及周边县市的公共交通线路、出租车参考价格、天气情况等实用旅行信息
**服务要求**:
- 使用生动形象的语言描述,适当添加emoji表情符号增强表现力
- 采用Markdown语法组织内容,合理使用**加粗**、*斜体*等格式
- 分点说明时使用清晰的列表格式
- 重要信息使用高亮标记
**服务限制**:
- 地理范围:仅限保定市行政区划内(含下辖县市)的文旅信息
- 语言输出:严格使用用户指定的{inp.language}语言回复
- 问题边界:对非保定文旅相关问题统一回复"我是您的保定旅行助手,专注于解答有关保定市的旅行问题哦~"
"""
messages = [{"role": "system", "content": chat_prompt}] + conversation_history
messages.append({"role": "user", "content": inp.message})
print(f"Starting AI chat stream with input: {inp.message}")
full_response = ""
try:
response = client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
# 使用异步方式处理同步流
import asyncio
for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, "content") and delta.content:
full_response += delta.content
yield delta.content
import sys
sys.stdout.flush()
# 短暂让出控制权,避免阻塞
await asyncio.sleep(0)
# 添加结束标记
yield " "
except Exception as e:
error_msg = f"当前访问人数过多,请稍后重试: {str(e)}"
print(error_msg)
yield error_msg
if full_response:
conversation_history.append({"role": "assistant", "content": full_response})
print("AI chat stream finished.")
def get_formatted_prompt(user_language,msg,data):
# 使用format方法替换占位符
return ANSWER_PROMPT.format(language=user_language, msg=msg, data=data)
async def gen_markdown_stream(msg: str, data: str, language: str, conversation_history: list) -> AsyncGenerator[str, None]:
prompt = get_formatted_prompt(language,msg,data) # 将变量替换到提示词中
messages = conversation_history + [{"role": "user", "content": prompt}]
print(f"Starting markdown stream with message: {msg} and data: {data}")
full_response = ""
try:
response = client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
# 使用异步方式处理同步流
import asyncio
for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, "content") and delta.content:
full_response += delta.content
yield delta.content
import sys
sys.stdout.flush()
# 短暂让出控制权,避免阻塞
await asyncio.sleep(0)
# 添加结束标记
yield " "
except Exception as e:
error_msg = f"当前访问人数过多,请稍后重试: {str(e)}"
print(error_msg)
yield error_msg
if full_response:
conversation_history.append({"role": "assistant", "content": full_response})
print("Markdown stream finished.")
async def extract_spot(msg: str) -> str:
print(f"Starting spot extraction for message: {msg}")
try:
response = client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "system", "content": EXTRACT_PROMPT}, {"role": "user", "content": msg}]
)
result = response.choices[0].message.content
print(f"Extracted spot: {result}")
return result
except Exception as e:
print(f"Error in spot extraction: {e}")
raise
async def query_flow(request: Request, spot: str) -> str:
if not spot:
print("No spot found, returning default message.")
return "**未找到景区信息,请检查名称是否正确。\n\n(内容由AI生成,仅供参考)"
cache_key = f"flow:{spot}"
# Step 1: Redis 缓存查询
print(f"Querying Redis cache for key: {cache_key}")
try:
redis_client = request.app.state.redis_client
cached = await redis_client.get(cache_key)
if cached:
print(f"Found cached data for key: {cache_key}")
return cached
except Exception as e:
print(f"[Redis] 查询缓存失败: {e}")
# Step 2: MySQL 查询(使用连接池)
print(f"Querying MySQL for spot: {spot}")
try:
pool = request.app.state.mysql_pool
async with pool.acquire() as conn:
async with conn.cursor() as cur:
query = "SELECT SUM(t1.init_num) AS init_num, SUM(t1.out_num) AS out_num,t3.realtime_load_capacity FROM equipment_passenger_flow.flow_current_video t1 LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id WHERE t3.`name` LIKE %s"
search_spot = f"%{spot}%"
await cur.execute(query, (search_spot,))
row = await cur.fetchone()
except Exception as e:
print(f"[MySQL] 查询失败: {e}")
return f"**未找到景区【{spot}】的信息,请检查名称是否正确。\n\n(内容仅供参考)"
print("数据库查询结果", row)
if not row:
print(f"No data found for spot: {spot}")
return f"**未找到景区【{spot}】的信息,请检查名称是否正确。\n\n(内容仅供参考)"
#随机生成剩余车位数量进行测试
car_num = random.randint(0, 100)
jl_num = random.randint(50, 200)
# 修改结果拼接部分,使用新的列名
result = f"**{spot} 客流**\n\n进入人数: {row[0]}\n离开人数: {row[1]}\n\n景区瞬时承载量:{row[2]};停车场名称:临时地上停车场 ,距离:{jl_num}米,空余车位:{car_num},总车位:100,收费标准:30分钟内免费,超出后3元/小时。(内容仅供参考)"
# Step 3: 写入 Redis 缓存
print(f"Writing data to Redis cache for key: {cache_key}")
try:
await redis_client.setex(cache_key, 120, result)
except Exception as e:
print(f"[Redis] 写缓存失败: {e}")
return result