From eb8b7e95248b9412c256e4cc2843e6eeca337f43 Mon Sep 17 00:00:00 2001 From: zc Date: Thu, 21 Aug 2025 21:15:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BA=86=E7=9F=A5=E8=AF=86?= =?UTF-8?q?=E5=BA=93=E7=9A=84=E8=B0=83=E7=94=A8=EF=BC=8C=E4=BD=86=E6=98=AF?= =?UTF-8?q?=E7=8E=B0=E5=9C=A8=E5=A4=AA=E6=85=A2=E4=BC=9A=E5=87=BA=E7=8E=B0?= =?UTF-8?q?=E8=B6=85=E6=97=B6=EF=BC=8C=E9=9C=80=E8=A6=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/chat/ai/chat_router.py | 12 ++- app/api/chat/ai/chat_service.py | 163 ++++++++++++++++++++++++++------ requirements.txt | 3 +- 3 files changed, 145 insertions(+), 33 deletions(-) diff --git a/app/api/chat/ai/chat_router.py b/app/api/chat/ai/chat_router.py index d65c486..c572682 100644 --- a/app/api/chat/ai/chat_router.py +++ b/app/api/chat/ai/chat_router.py @@ -3,7 +3,8 @@ from fastapi import APIRouter, HTTPException from starlette.requests import Request from fastapi.responses import StreamingResponse from typing import AsyncGenerator, List -from .chat_service import classify, extract_spot, query_flow, gen_markdown_stream, ai_chat_stream, handle_quick_question +from .chat_service import classify, extract_spot, query_flow, gen_markdown_stream, ai_chat_stream, \ + handle_quick_question, fetch_and_parse_markdown from app.models.ChatIn import ChatIn, AllScenicFlowRequest, ScenicDetailRequest, ScenicParkingRequest from app.models.quick_question import QuickQuestion from app.models.hot_question import HotQuestion @@ -79,9 +80,18 @@ async def h5_chat_stream(request: Request, inp: ChatIn): if cat == "游玩判断": spot = await extract_spot(inp.message) data = await query_flow(request, spot) + knowledge = await fetch_and_parse_markdown(user_id,spot) + #如果知识库返回的内容不包含"知识库内未找到相应资源"则拼接字符串 + if "知识库内未找到相应资源" not in knowledge: + data += "知识库查询到的景区内容:"+ knowledge async for chunk in gen_markdown_stream(inp.message, data, inp.language, conversation_history): yield chunk else: + spot = await extract_spot(inp.message) + if spot: + knowledge = await fetch_and_parse_markdown(user_id, spot) + if "知识库内未找到相应资源" not in knowledge: + inp.message += ";知识库查询到的景区内容:"+ knowledge async for chunk in ai_chat_stream(inp, conversation_history): yield chunk diff --git a/app/api/chat/ai/chat_service.py b/app/api/chat/ai/chat_service.py index 0f05d66..80f0cc0 100644 --- a/app/api/chat/ai/chat_service.py +++ b/app/api/chat/ai/chat_service.py @@ -1,14 +1,15 @@ -import os, openai, aiomysql -import random - from openai import OpenAI from dotenv import load_dotenv -import redis.asyncio as redis from typing import AsyncGenerator from app.models.ChatIn import ChatIn from fastapi import Request from app.settings.config import settings +import json +import re +from typing import List +import requests + load_dotenv() @@ -99,14 +100,13 @@ ANSWER_PROMPT = """ **完整景区名称**游览建议 2. 实时客流展示: -- 计算当前在园人数(进入人数-离开人数的绝对值) -- 计算承载率(在园人数/最大承载量) +- 展示当前在园人数 - 按以下标准显示舒适度等级: - <30%:舒适 - 30%-50%:较舒适 - 50%-70%:一般 - 70%-90%:较拥挤 - >90%:拥挤 + 舒适:舒适 + 较舒适:较舒适 + 一般:一般 + 较拥挤:较拥挤 + 拥挤:拥挤 - 附加简短体验描述(1-2句) 3. 周边停车指南: @@ -144,16 +144,15 @@ ANSWER_PROMPT = """ 数据要求: - 仅使用查询到的数据 - 不虚构未提供的信息 -- 数字数据仅展示计算后的在园人数,不显示例如进入人数、离开人数、承载量和承载率等原始数据字段,如果承载率超过100%则只输出舒适度等级而不输出实际的在园人数 +- 如果舒适度为拥挤则只输出舒适度等级而不输出实际的在园人数 - 不输出数据更新时间 -- 标题字体大小为 18–20px,加粗显示 -- 正文内容字体大小为 15–16px -- 行间距为 1.6–1.8 倍字体大小 -- 段落与模块之间上下边距应为10–16px -- 不能使用小于 14px 的字体 -- 不要解释排版等内容 """ - +# - 标题字体大小为 18–20px,加粗显示 +# - 正文内容字体大小为 15–16px +# - 行间距为 1.6–1.8 倍字体大小 +# - 段落与模块之间上下边距应为10–16px +# - 不能使用小于 14px 的字体 +# - 不要解释排版等内容 async def classify(msg: str) -> str: print(f"Starting classification for message: {msg}") try: @@ -174,7 +173,8 @@ async def ai_chat_stream(inp: ChatIn, conversation_history: list) -> AsyncGenera 1. 行程规划:根据游客的停留天数(1-7天)、预算范围(经济型/中档/豪华)、兴趣偏好(历史文化/自然风光/美食体验)提供定制化行程方案 2. 景点推荐:详细介绍保定市3A级以上旅游景区(如白洋淀、野三坡、清西陵等)的开放时间、门票价格、最佳游览季节和交通方式 3. 特色推荐:提供保定驴肉火烧、槐茂酱菜等地方特色美食的具体店铺地址和人均消费 - 4. 实用信息:提供保定市区及周边县市的公共交通线路、出租车参考价格、天气情况等实用旅行信息 + 4. 实用信息:提供保定市区及周边县市的公共交通线路、出租车参考价格等实用旅行信息 + 5. 如果content中包含知识库查询到的景区内容,则需要严格参考查询到的内容,如果没有则正常按照上述要求回答 **服务要求**: - 使用生动形象的语言描述,适当添加emoji表情符号增强表现力 @@ -315,26 +315,48 @@ async def query_flow(request: Request, spot: str) -> str: async with pool.acquire() as conn: async with conn.cursor() as cur: # 查询景区客流信息 - query = "SELECT t3.id AS id,SUM(t1.init_num) AS init_num, SUM(t1.out_num) AS out_num,t3.realtime_load_capacity FROM equipment_passenger_flow.flow_current_video t1 LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id WHERE t3.`name` LIKE %s" + query = "SELECT ABS(SUM(t1.init_num)-SUM(t1.out_num)) AS in_num , t3.realtime_load_capacity AS capacity FROM equipment_passenger_flow.flow_current_video t1 LEFT JOIN cyjcpt_bd.zhly_video_manage t2 ON t1.mac_address = t2.mac_address LEFT JOIN cyjcpt_bd.zhly_scenic_basic t3 ON t2.video_scenic_id = t3.id WHERE t3.`name` LIKE %s" search_spot = f"%{spot}%" await cur.execute(query, (search_spot,)) row = await cur.fetchone() - # 查询停车场信息 - park_query = "SELECT t3.park_name AS park_name,IFNULL(t3.rate_info,'暂无收费标准信息') AS rate_info,t3.total_count AS total_count,t4.space AS space,t1.distance_value AS distance_value FROM cyjcpt_bd.scenic_pack_distance t1 LEFT JOIN cyjcpt_bd.zhly_scenic_basic t2 ON t1.scenic_id = t2.id LEFT JOIN cyjcpt_bd.park_info t3 ON t1.park_code = t3.park_code LEFT JOIN equipment_passenger_flow.park_current t4 ON t1.park_code = t4.park_code WHERE t2.`name` LIKE %s AND t1.distance_value <= 1000 AND t3.total_count != 0 ORDER BY t1.distance_value ASC LIMIT 2" + + # 查询停车场信息 + park_query = "SELECT t3.park_name AS park_name, IFNULL(t3.rate_info,'暂无收费标准信息') AS rate_info, t3.total_count AS total_count, t4.space AS space, t1.distance_value AS distance_value FROM cyjcpt_bd.scenic_pack_distance t1 LEFT JOIN cyjcpt_bd.zhly_scenic_basic t2 ON t1.scenic_id = t2.id LEFT JOIN cyjcpt_bd.park_info t3 ON t1.park_code = t3.park_code LEFT JOIN equipment_passenger_flow.park_current t4 ON t1.park_code = t4.park_code WHERE t2.`name` LIKE %s AND t1.distance_value <= 1000 AND t3.total_count != 0 ORDER BY t1.distance_value ASC LIMIT 2" park_search_spot = f"%{spot}%" await cur.execute(park_query, (park_search_spot,)) - park_row = await cur.fetchall() + park_rows = await cur.fetchall() + + # 先查询景区基本信息以获取ID + id_query = "SELECT id FROM cyjcpt_bd.zhly_scenic_basic WHERE `name` LIKE %s LIMIT 1" + await cur.execute(id_query, (search_spot,)) + id_row = await cur.fetchone() + scenic_id = id_row[0] if id_row else None except Exception as e: print(f"[MySQL] 查询失败: {e}") return f"**未找到景区【{spot}】的信息,请检查名称是否正确。\n\n(内容仅供参考)" + result = "" - if row : - result = f"{spot} 客流\n\n进入人数: {row[1]}\n离开人数: {row[2]}\n\n景区瞬时承载量:{row[3]};注:全部内容输出完以后,最后输出一段固定内容,内容为:

;" + if row and scenic_id and all(v is not None for v in row): + # 使用变量名访问客流数据 + in_num, capacity = row + #in_num/capacity 计算承载率,取两位小数 + hold_rate = round(in_num / capacity, 2) + """ 小于0.3:舒适 + 0.3-0.5:较舒适 + 0.5-0.7:一般 + 0.7-0.9:较拥挤 + 大于0.9%:拥挤""" + hold_level = "舒适" if hold_rate < 0.3 else "较舒适" if hold_rate < 0.5 else "一般" if hold_rate < 0.7 else "较拥挤" if hold_rate < 0.9 else "拥挤" + result = f"{spot} 客流\n\n在园人数: {in_num}\n\n舒适度等级:{hold_level};\n\n注:全部内容输出完以后,最后输出一段固定内容,内容为:
;" + elif scenic_id: + result = f"未找到景区【{spot}】的客流相关信息;注:全部内容输出完以后,最后输出一段固定内容,内容为:
;" else: result = f"未找到景区【{spot}】的客流相关信息" - if park_row: - for park_row in park_row: - result += f"停车场名称:{park_row[0]} ,距离:{park_row[4]}米,空余车位:{park_row[3]},总车位:{park_row[2]},收费标准:{park_row[1]}。" + if park_rows: + for park_row in park_rows: + # 使用变量名访问停车场数据 + park_name, rate_info, total_count, space, distance_value = park_row + result += f"停车场名称:{park_name} ,距离:{distance_value}米,空余车位:{space},总车位:{total_count},收费标准:{rate_info}。" else: result += "停车场信息:暂无数据。" try: @@ -348,7 +370,7 @@ async def handle_quick_question(inp: ChatIn, question_content: str) -> AsyncGene chat_prompt = f""" 你是一个专门格式化内容的AI助手,不能修改内容,仅修改格式, 负责将接收到的包含html标签内容进行格式化,要求是将能够转换成markdown语法的内容中的html标签转换成markdown语法, - 不能转换的保留html标签。 + 不能转换的保留html标签。注意:不修改和处理标签。 """ # 只包含系统提示和问题内容,不包含历史记录 messages = [ @@ -619,4 +641,83 @@ async def get_scenic_parking_data(request: Request, scenic_name: str, distance: except Exception as e: print(f"[MySQL] 查询景区停车场数据失败: {e}") - return [] \ No newline at end of file + return [] + +# 添加用于获取完整响应数据的新函数 +async def fetch_and_parse_markdown(user_id: int, question: str) -> str: + """ + 只提取最终完整的markdown内容(过滤流式中间片段) + """ + import httpx + import asyncio + + encoded_question = requests.utils.quote(question) + url = f"http://cjy.aitto.net:45678/api/v3/user_share_chat_completions?random={user_id}&api_key=cjy-626e50140e934936b8c82a3be5f6dea3&app_code=f5b3d4ba-7e7a-11f0-9de7-00e04f309c26&user_input={encoded_question}" + + all_markdowns: List[str] = [] + final_content = "" # 存储最终完整内容 + + try: + async with httpx.AsyncClient() as client: + async with client.stream("GET", url, timeout=30.0) as response: + async for line in response.aiter_lines(): + if not line: + continue + line_str = line.strip() + if not line_str.startswith("data:"): + continue + + data_str = line_str[5:].strip() + try: + data_json = json.loads(data_str) + vis_content = data_json.get("vis", "") + + # 提取所有markdown内容 + code_blocks = re.findall(r'```(.*?)```', vis_content, re.DOTALL) + for block in code_blocks: + block_parts = block.split('\n', 1) + if len(block_parts) < 2: + continue + block_type, block_content = block_parts + block_content = block_content.strip() + + try: + items = json.loads(block_content) + if isinstance(items, list): + for item in items: + if isinstance(item, dict) and "markdown" in item: + md_content = item["markdown"].strip() + all_markdowns.append(md_content) + + # 处理嵌套的markdown + nested_blocks = re.findall(r'```(.*?)```', md_content, re.DOTALL) + for nested in nested_blocks: + nested_parts = nested.split('\n', 1) + if len(nested_parts) >= 2: + nested_content = nested_parts[1].strip() + try: + nested_items = json.loads(nested_content) + if isinstance(nested_items, list): + for ni in nested_items: + if isinstance(ni, dict) and "markdown" in ni: + nested_md = ni["markdown"].strip() + all_markdowns.append(nested_md) + except json.JSONDecodeError: + continue + except json.JSONDecodeError: + continue + except json.JSONDecodeError: + continue + + except httpx.RequestError as e: + print(f"请求错误: {e}") + return "" + + # 核心逻辑:筛选出最长且完整的内容(流式响应中最后完成的内容通常最长) + if all_markdowns: + # 按长度倒序排序,取最长的非空内容 + all_markdowns = [md for md in all_markdowns if md] # 过滤空字符串 + if all_markdowns: + final_content = max(all_markdowns, key=len) + + return final_content diff --git a/requirements.txt b/requirements.txt index 0fcec98..3336aaa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -61,4 +61,5 @@ websockets==14.1 openai==1.97.1 aiomysql==0.2.0 redis==6.2.0 -sqlalchemy==2.0.42 \ No newline at end of file +sqlalchemy==2.0.42 +requests~=2.32.4 \ No newline at end of file