from openai import OpenAI from dotenv import load_dotenv from typing import AsyncGenerator from app.models.ChatIn import ChatIn from fastapi import Request from app.settings.config import settings import json import re from typing import List import requests load_dotenv() client = OpenAI(api_key=settings.DEEPSEEK_API_KEY, base_url=settings.DEEPSEEK_API_URL) #分类提示词 CATEGORY_PROMPT = """你是一个分类助手,请根据用户的问题判断属于哪一类: 1. 如果用户是问“某个保定市景区现在适不适合去”,或者“某个保定市景区现在人多么”此类涉及某个景区人数或者客流量的,注意只有保定的景区,请返回:游玩判断。 2. 其他均返回保定文旅。 只能返回以上两个分类词之一,不能多说话,不回复其他多余内容。""" #提取景区名称提示词 EXTRACT_PROMPT = """你是一名景区名称精准匹配助手。用户的问题中可能只包含景区简称、别称或部分关键词,你需要根据下面的完整景区名称列表,把用户提到的景区准确匹配到唯一最符合的完整名称并仅返回该名称,不要输出其他文字。如果用户没有提到任何景区,返回空字符串。 完整景区名称列表: 仙人峪景区 空中草原景区 白石山景区 阜平云花溪谷-玫瑰谷 保定军校纪念馆 保定直隶总督署博物馆 冉庄地道战遗址 刘伶醉景区 曲阳北岳庙景区 唐县华峪山庄 古莲花池 阜平天生桥景区 涿州三义宫 易水湖景区 晋察冀边区革命纪念馆 安国市毛主席视察纪念馆 清西陵景区 满城汉墓景区 灵山聚龙洞旅游风景区 易县狼牙山风景区 留法勤工俭学纪念馆 白求恩柯棣华纪念馆 唐县秀水峪 腰山王氏庄园 安国市药王庙景区 虎山风景区 唐县西胜沟景区 野三坡景区 鱼谷洞景区 昌利农业示范园景区 蒙牛乳业工业景区 金木国际产业园 顺平享水溪 顺平三妙峰景区 安国市中药文化博物馆 清苑古城香文化体验馆展厅 安国数字中药都 天香工业游景区 唐县潭瀑峡景区 顾家台骆驼湾景区 中药都药博园 秋闲阁艺术馆 华海·中央步行街 恋乡·太行水镇旅游综合体景区 保定宴饮食博物馆 绿建科技工业旅游景区 燕都古城景区 台湾农业工园景区 尧母文化园 永济桥景区 保定西大街 卓正神农现代农业示范园 和道国际箱包城旅游景区 古镇大激店旅游区 大平台景区 七山旅游景区 涿州清行宫 辽塔文化园 京作壹号酒庄 唐尧古镇 大慈阁""" # 客流查询后回答的提示词 ANSWER_PROMPT = """ **景区游览指南生成要求** 输入数据格式: - 用户问题:{msg} - 查询到的数据:{data}(包含景区名称、在园人数、最大承载量、进出人数等) - 输出语言:{language} 输出要求: 1. 标题格式: **完整景区名称**游览建议 2. 实时客流展示: - 展示当前在园人数 - 按以下标准显示舒适度等级: 舒适:舒适 较舒适:较舒适 一般:一般 较拥挤:较拥挤 拥挤:拥挤 - 附加简短体验描述(1-2句) 3. 周边停车指南: - 停车场名称 - 距离(米) - 步行时间(分钟) - 当前余位/总车位 - 收费标准(注明"具体以现场公示为准") 4. 出行与游览建议: - 推荐程度(强烈推荐/推荐/谨慎考虑/暂缓前往) - 具体建议(包含错峰时间、交通方式、路线建议) - 建议游览时长(根据舒适度调整) - 替代景区推荐(当拥挤时) 5. 温馨提示: - 安全提醒 - 介绍景区特色 - 其他注意事项 6. 数据缺失时的通用建议: - 标题:**景区名称**游览建议 - 实时客流提示 - 常规交通建议 - 游览路线说明 - 附近景区推荐 语言风格要求: - 使用生动形象的语言描述,适当添加emoji表情符号增强表现力 - 采用"您""记得""推荐"等暖心词汇 - 禁用专业术语 - 使用**加粗**强调关键信息 - 结尾提示:"动态信息请以现场为准,祝您旅途愉快!" 数据要求: - 仅使用查询到的数据 - 不虚构未提供的信息 - 如果舒适度为拥挤则只输出舒适度等级而不输出实际的在园人数 - 不输出数据更新时间 """ # - 标题字体大小为 18–20px,加粗显示 # - 正文内容字体大小为 15–16px # - 行间距为 1.6–1.8 倍字体大小 # - 段落与模块之间上下边距应为10–16px # - 不能使用小于 14px 的字体 # - 不要解释排版等内容 async def classify(msg: str) -> str: print(f"Starting classification for message: {msg}") try: response = client.chat.completions.create( model="deepseek-chat", messages=[{"role": "system", "content": CATEGORY_PROMPT}, {"role": "user", "content": msg}] ) result = response.choices[0].message.content print(f"Classification result: {result}") return result except Exception as e: print(f"Error in classification: {e}") raise async def ai_chat_stream(inp: ChatIn, conversation_history: list) -> AsyncGenerator[str, None]: chat_prompt = f""" 你是一个专门服务河北省保定市旅游的AI助手,主要提供以下精准服务: 1. 行程规划:根据游客的停留天数(1-7天)、预算范围(经济型/中档/豪华)、兴趣偏好(历史文化/自然风光/美食体验)提供定制化行程方案 2. 景点推荐:详细介绍保定市3A级以上旅游景区(如白洋淀、野三坡、清西陵等)的开放时间、门票价格、最佳游览季节和交通方式 3. 特色推荐:提供保定驴肉火烧、槐茂酱菜等地方特色美食的具体店铺地址和人均消费 4. 实用信息:提供保定市区及周边县市的公共交通线路、出租车参考价格等实用旅行信息 5. 如果content中包含知识库查询到的景区内容,则需要严格参考查询到的内容,如果没有则正常按照上述要求回答 **服务要求**: - 使用生动形象的语言描述,适当添加emoji表情符号增强表现力 - 采用Markdown语法组织内容,合理使用**加粗**、*斜体*等格式 - 分点说明时使用清晰的列表格式 - 重要信息使用高亮标记 **服务限制**: - 地理范围:仅限保定市行政区划内(含下辖县市)的文旅信息 - 语言输出:严格使用用户指定的{inp.language}语言回复 - 问题边界:对非保定文旅相关问题统一回复"我是您的保定旅行助手,专注于解答有关保定市的旅行问题哦~" """ messages = [{"role": "system", "content": chat_prompt}] + conversation_history messages.append({"role": "user", "content": inp.message}) print(f"Starting AI chat stream with input: {inp.message}") full_response = "" try: response = client.chat.completions.create( model="deepseek-chat", messages=messages, stream=True ) # 使用异步方式处理同步流 import asyncio for chunk in response: delta = chunk.choices[0].delta if hasattr(delta, "content") and delta.content: full_response += delta.content yield delta.content import sys sys.stdout.flush() # 短暂让出控制权,避免阻塞 await asyncio.sleep(0) # 生成推荐问题 if full_response: recommended_questions = await generate_recommended_questions(inp.message, full_response) if recommended_questions: # 添加分隔符和标题 yield "\n\n### 您可能还想了解:" # 逐个返回推荐问题 for i, question in enumerate(recommended_questions, 1): yield f"\n{i}. {question}" # 添加结束标记 yield " " except Exception as e: error_msg = f"当前访问人数过多,请稍后重试: {str(e)}" print(error_msg) yield error_msg if full_response: conversation_history.append({"role": "assistant", "content": full_response}) print("AI chat stream finished.") def get_formatted_prompt(user_language,msg,data): # 使用format方法替换占位符 return ANSWER_PROMPT.format(language=user_language, msg=msg, data=data) async def gen_markdown_stream(msg: str, data: str, language: str, conversation_history: list) -> AsyncGenerator[str, None]: prompt = get_formatted_prompt(language,msg,data) # 将变量替换到提示词中 messages = conversation_history + [{"role": "user", "content": prompt}] print(f"Starting markdown stream with message: {msg} and data: {data}") full_response = "" try: response = client.chat.completions.create( model="deepseek-chat", messages=messages, stream=True ) # 使用异步方式处理同步流 import asyncio for chunk in response: delta = chunk.choices[0].delta if hasattr(delta, "content") and delta.content: full_response += delta.content yield delta.content import sys sys.stdout.flush() # 短暂让出控制权,避免阻塞 await asyncio.sleep(0) # 生成推荐问题 if full_response: recommended_questions = await generate_recommended_questions(msg, full_response) if recommended_questions: # 添加分隔符和标题 yield "\n\n### 您可能还想了解:" # 逐个返回推荐问题 for i, question in enumerate(recommended_questions, 1): yield f"\n{i}. {question}" # 添加结束标记 yield " " except Exception as e: error_msg = f"当前访问人数过多,请稍后重试: {str(e)}" print(error_msg) yield error_msg if full_response: conversation_history.append({"role": "assistant", "content": full_response}) print("Markdown stream finished.") async def extract_spot(msg: str) -> str: print(f"Starting spot extraction for message: {msg}") try: response = client.chat.completions.create( model="deepseek-chat", messages=[{"role": "system", "content": EXTRACT_PROMPT}, {"role": "user", "content": msg}] ) result = response.choices[0].message.content print(f"Extracted spot: {result}") return result except Exception as e: print(f"Error in spot extraction: {e}") raise async def query_flow(request: Request, spot: str) -> str: if not spot: print("No spot found, returning default message.") return "**未找到景区信息,请检查名称是否正确。\n\n(内容由AI生成,仅供参考)" cache_key = f"flow:{spot}" # Step 1: Redis 缓存查询 print(f"Querying Redis cache for key: {cache_key}") try: redis_client = request.app.state.redis_client cached = await redis_client.get(cache_key) if cached: print(f"Found cached data for key: {cache_key}") return cached except Exception as e: print(f"[Redis] 查询缓存失败: {e}") # Step 2: MySQL 查询(使用连接池) try: pool = request.app.state.mysql_pool async with pool.acquire() as conn: async with conn.cursor() as cur: # 查询景区客流信息 query = "SELECT ABS(SUM(t1.init_num)-SUM(t1.out_num)) AS in_num , t3.realtime_load_capacity AS capacity FROM equipment_passenger_flow.flow_current_video t1 LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id WHERE t3.`name` LIKE %s" search_spot = f"%{spot}%" await cur.execute(query, (search_spot,)) row = await cur.fetchone() # 查询停车场信息 park_query = "SELECT t3.park_name AS park_name, IFNULL(t3.rate_info,'暂无收费标准信息') AS rate_info, t3.total_count AS total_count, t4.space AS space, t1.distance_value AS distance_value FROM cyjcpt_bd.scenic_pack_distance t1 LEFT JOIN cyjcpt_bd.zhly_scenic_basic t2 ON t1.scenic_id = t2.id LEFT JOIN cyjcpt_bd.park_info t3 ON t1.park_code = t3.park_code LEFT JOIN equipment_passenger_flow.park_current t4 ON t1.park_code = t4.park_code WHERE t2.`name` LIKE %s AND t1.distance_value <= 1000 AND t3.total_count != 0 ORDER BY t1.distance_value ASC LIMIT 2" park_search_spot = f"%{spot}%" await cur.execute(park_query, (park_search_spot,)) park_rows = await cur.fetchall() # 先查询景区基本信息以获取ID id_query = "SELECT id FROM cyjcpt_bd.zhly_scenic_basic WHERE `name` LIKE %s LIMIT 1" await cur.execute(id_query, (search_spot,)) id_row = await cur.fetchone() scenic_id = id_row[0] if id_row else None except Exception as e: print(f"[MySQL] 查询失败: {e}") return f"**未找到景区【{spot}】的信息,请检查名称是否正确。\n\n(内容仅供参考)" result = "" if row and scenic_id and all(v is not None for v in row): # 使用变量名访问客流数据 in_num, capacity = row #in_num/capacity 计算承载率,取两位小数 hold_rate = round(in_num / capacity, 2) """ 小于0.3:舒适 0.3-0.5:较舒适 0.5-0.7:一般 0.7-0.9:较拥挤 大于0.9%:拥挤""" hold_level = "舒适" if hold_rate < 0.3 else "较舒适" if hold_rate < 0.5 else "一般" if hold_rate < 0.7 else "较拥挤" if hold_rate < 0.9 else "拥挤" result = f"{spot} 客流\n\n在园人数: {in_num}\n\n舒适度等级:{hold_level};\n\n注:全部内容输出完以后,最后输出一段固定内容,内容为:
{{问题}}
""" try: response = client.chat.completions.create( model="deepseek-chat", messages=[ {"role": "system", "content": "你是一个问题推荐助手,擅长基于对话内容生成相关的延伸问题"}, {"role": "user", "content": prompt} ] ) questions = response.choices[0].message.content.strip().split('\n') # 过滤空行并限制数量为1-3个 return [q for q in questions if q][:3] except Exception as e: print(f"生成推荐问题失败: {e}") return [] # 添加新函数用于获取所有景区客流数据 async def get_all_scenic_flow_data(request: Request) -> list: """ 查询所有景区的客流数据,计算承载率,并按承载率倒序排列 """ try: pool = request.app.state.mysql_pool async with pool.acquire() as conn: async with conn.cursor() as cur: # 查询所有景区客流信息 query = """ SELECT t3.id AS id, t3.`name` AS scenic_name, SUM(t1.init_num) AS enter_num, SUM(t1.out_num) AS leave_num, t3.realtime_load_capacity AS max_capacity FROM equipment_passenger_flow.flow_current_video t1 LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id WHERE t3.`name` IS NOT NULL GROUP BY t3.`name`, t3.realtime_load_capacity ORDER BY (SUM(t1.init_num) - SUM(t1.out_num)) / t3.realtime_load_capacity DESC """ await cur.execute(query) rows = await cur.fetchall() # 处理结果 result = [] for row in rows: id,scenic_name, enter_num, leave_num, max_capacity = row in_park_num = abs(enter_num - leave_num) # 确保是正数 # 避免除以零的情况 if max_capacity > 0: capacity_rate = in_park_num / max_capacity else: capacity_rate = 0 result.append({ "id": id, "scenic_name": scenic_name, "enter_num": enter_num, "leave_num": leave_num, "in_park_num": in_park_num, "max_capacity": max_capacity, "capacity_rate": capacity_rate }) return result except Exception as e: print(f"[MySQL] 查询所有景区客流数据失败: {e}") return [] # 在文件末尾添加新函数 async def get_scenic_detail_data(request: Request, id: int) -> dict: """ 查询单个景区的详细信息,包含舒适度判断 """ try: pool = request.app.state.mysql_pool async with pool.acquire() as conn: async with conn.cursor() as cur: # 查询单个景区客流信息 query = """ SELECT t3.`name` AS scenic_name, COALESCE(SUM(t1.init_num), 0) AS enter_num, COALESCE(SUM(t1.out_num), 0) AS leave_num, COALESCE(t3.realtime_load_capacity, 0) AS max_capacity FROM equipment_passenger_flow.flow_current_video t1 LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id WHERE t3.id = %s GROUP BY t3.`name`, t3.realtime_load_capacity """ await cur.execute(query, (id,)) row = await cur.fetchone() if not row: return None scenic_name, enter_num, leave_num, max_capacity = row # 计算在园人数 in_park_num = max(0, enter_num - leave_num) # 计算承载率和舒适度 if max_capacity > 0: capacity_rate = in_park_num / max_capacity capacity_percentage = round(capacity_rate * 100, 1) # 根据承载率判断舒适度 if capacity_rate < 0.3: comfort_level = "舒适" elif capacity_rate < 0.5: comfort_level = "较舒适" elif capacity_rate < 0.7: comfort_level = "一般" elif capacity_rate < 0.9: comfort_level = "较拥挤" else: comfort_level = "拥挤" else: capacity_rate = 0.0 capacity_percentage = 0.0 return { "scenic_name": scenic_name, "enter_num": enter_num or 0, "leave_num": leave_num or 0, "in_park_num": in_park_num, "max_capacity": max_capacity or 0, "capacity_rate": round(capacity_rate, 4), "comfort_level": comfort_level } except Exception as e: print(f"[MySQL] 查询景区详情数据失败: {e}") return None # 在chat_service.py末尾添加新的停车场查询函数 async def get_scenic_parking_data(request: Request, scenic_name: str, distance: int) -> list: """ 查询景区附近的停车场信息 Args: request: FastAPI请求对象 scenic_name: 景区名称 distance: 查询距离(米),>=1000时查询全部 Returns: list: 停车场信息列表,按距离排序 """ try: pool = request.app.state.mysql_pool async with pool.acquire() as conn: async with conn.cursor() as cur: # 构建基础查询 base_query = """ SELECT t3.park_name AS park_name, t3.total_count AS total_parking_spaces, COALESCE(t4.space, 0) AS available_spaces, t1.distance_value AS distance_meters FROM cyjcpt_bd.scenic_pack_distance t1 LEFT JOIN cyjcpt_bd.zhly_scenic_basic t2 ON t1.scenic_id = t2.id LEFT JOIN cyjcpt_bd.park_info t3 ON t1.park_code = t3.park_code LEFT JOIN equipment_passenger_flow.park_current t4 ON t1.park_code = t4.park_code WHERE t2.`name` LIKE %s AND t3.total_count > 0 """ # 根据距离参数构建WHERE条件 if distance >= 1000: # 距离>=1000米,查询全部停车场 where_condition = "" params = (f"%{scenic_name}%",) else: # 距离<1000米,按指定距离查询 where_condition = " AND t1.distance_value <= %s" params = (f"%{scenic_name}%", distance) # 完整的查询语句 query = base_query + where_condition + " ORDER BY t1.distance_value ASC" await cur.execute(query, params) rows = await cur.fetchall() # 处理结果 result = [] for row in rows: park_name, total_spaces, available_spaces, distance_meters = row result.append({ "park_name": park_name, "total_parking_spaces": total_spaces or 0, "available_spaces": available_spaces or 0, "distance_meters": distance_meters or 0 }) return result except Exception as e: print(f"[MySQL] 查询景区停车场数据失败: {e}") return [] # 添加用于获取完整响应数据的新函数 async def fetch_and_parse_markdown(user_id: int, question: str) -> str: """ 只提取最终完整的markdown内容(过滤流式中间片段) """ import httpx import asyncio encoded_question = requests.utils.quote(question) url = f"http://cjy.aitto.net:45678/api/v3/user_share_chat_completions?random={user_id}&api_key=cjy-626e50140e934936b8c82a3be5f6dea3&app_code=f5b3d4ba-7e7a-11f0-9de7-00e04f309c26&user_input={encoded_question}" all_markdowns: List[str] = [] final_content = "" # 存储最终完整内容 try: async with httpx.AsyncClient() as client: async with client.stream("GET", url, timeout=30.0) as response: async for line in response.aiter_lines(): if not line: continue line_str = line.strip() if not line_str.startswith("data:"): continue data_str = line_str[5:].strip() try: data_json = json.loads(data_str) vis_content = data_json.get("vis", "") # 提取所有markdown内容 code_blocks = re.findall(r'```(.*?)```', vis_content, re.DOTALL) for block in code_blocks: block_parts = block.split('\n', 1) if len(block_parts) < 2: continue block_type, block_content = block_parts block_content = block_content.strip() try: items = json.loads(block_content) if isinstance(items, list): for item in items: if isinstance(item, dict) and "markdown" in item: md_content = item["markdown"].strip() all_markdowns.append(md_content) # 处理嵌套的markdown nested_blocks = re.findall(r'```(.*?)```', md_content, re.DOTALL) for nested in nested_blocks: nested_parts = nested.split('\n', 1) if len(nested_parts) >= 2: nested_content = nested_parts[1].strip() try: nested_items = json.loads(nested_content) if isinstance(nested_items, list): for ni in nested_items: if isinstance(ni, dict) and "markdown" in ni: nested_md = ni["markdown"].strip() all_markdowns.append(nested_md) except json.JSONDecodeError: continue except json.JSONDecodeError: continue except json.JSONDecodeError: continue except httpx.RequestError as e: print(f"请求错误: {e}") return "" # 核心逻辑:筛选出最长且完整的内容(流式响应中最后完成的内容通常最长) if all_markdowns: # 按长度倒序排序,取最长的非空内容 all_markdowns = [md for md in all_markdowns if md] # 过滤空字符串 if all_markdowns: final_content = max(all_markdowns, key=len) return final_content