daren/apps/chat/services/chat_api.py

259 lines
11 KiB
Python
Raw Normal View History

2025-05-23 19:25:35 +08:00
import requests
import json
import logging
from django.conf import settings
from rest_framework.exceptions import APIException
from daren import settings
logger = logging.getLogger(__name__)
class ExternalAPIError(APIException):
status_code = 500
default_detail = '外部API调用失败'
default_code = 'external_api_error'
def stream_chat_answer(conversation_id, question, dataset_external_ids, metadata):
"""流式调用外部聊天API返回生成器以实时传输回答"""
try:
# 构造聊天请求数据
chat_request_data = {
"id": "d5d11efa-ea9a-11ef-9933-0242ac120006",
"model_id": "7a214d0e-e65e-11ef-9f4a-0242ac120006",
"dataset_id_list": [str(id) for id in dataset_external_ids],
"multiple_rounds_dialogue": False,
"dataset_setting": {
"top_n": 10, "similarity": "0.3",
"max_paragraph_char_number": 10000,
"search_mode": "blend",
"no_references_setting": {
"value": "{question}",
"status": "ai_questioning"
}
},
"model_setting": {
"prompt": "**相关文档内容**{data} **回答要求**:如果相关文档内容中没有可用信息,请回答\"没有在知识库中查找到相关信息,建议咨询相关技术支持或参考官方文档进行操作\"。请根据相关文档内容回答用户问题。不要输出与用户问题无关的内容。请使用中文回答客户问题。**用户问题**{question}"
},
"problem_optimization": False
}
# 发起聊天请求
logger.info(f"调用流式聊天API: {settings.API_BASE_URL}/api/application/chat/open")
chat_response = requests.post(
url=f"{settings.API_BASE_URL}/api/application/chat/open",
json=chat_request_data,
headers={"Content-Type": "application/json"}
)
logger.info(f"聊天API响应状态码: {chat_response.status_code}")
if chat_response.status_code != 200:
error_msg = f"聊天API调用失败: {chat_response.text}"
logger.error(error_msg)
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
return
chat_data = chat_response.json()
if chat_data.get('code') != 200 or not chat_data.get('data'):
error_msg = f"聊天API返回错误: {chat_data}"
logger.error(error_msg)
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
return
chat_id = chat_data['data']
message_url = f"{settings.API_BASE_URL}/api/application/chat_message/{chat_id}"
logger.info(f"调用消息API: {message_url}")
# 发起消息请求(流式)
message_request = requests.post(
url=message_url,
json={"message": question, "re_chat": False, "stream": True},
headers={"Content-Type": "application/json"},
stream=True
)
if message_request.status_code != 200:
error_msg = f"消息API调用失败: {message_request.status_code}, {message_request.text}"
logger.error(error_msg)
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
return
buffer = ""
for chunk in message_request.iter_content(chunk_size=1):
if not chunk:
continue
chunk_str = chunk.decode('utf-8')
buffer += chunk_str
if '\n\n' in buffer:
lines = buffer.split('\n\n')
for line in lines[:-1]:
if line.startswith('data: '):
try:
json_str = line[6:]
data = json.loads(json_str)
if 'content' in data:
content_part = data['content']
response_data = {
'code': 200,
'message': 'partial',
'data': {
'content': content_part,
'is_end': data.get('is_end', False)
}
}
yield f"data: {json.dumps(response_data)}\n\n"
if data.get('is_end', False):
return
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}, 数据: {line}")
buffer = lines[-1]
if buffer and buffer.startswith('data: '):
try:
json_str = buffer[6:]
data = json.loads(json_str)
if 'content' in data:
content_part = data['content']
response_data = {
'code': 200,
'message': 'partial',
'data': {
'content': content_part,
'is_end': data.get('is_end', False)
}
}
yield f"data: {json.dumps(response_data)}\n\n"
except json.JSONDecodeError:
logger.error(f"处理剩余数据时JSON解析错误: {buffer}")
except Exception as e:
logger.error(f"流式聊天API处理出错: {str(e)}")
yield f"data: {json.dumps({'code': 500, 'message': f'流式处理出错: {str(e)}', 'data': {'is_end': True}})}\n\n"
def get_chat_answer(dataset_external_ids, question):
"""非流式调用外部聊天API获取回答"""
try:
chat_request_data = {
"id": "d5d11efa-ea9a-11ef-9933-0242ac120006",
"model_id": "7a214d0e-e65e-11ef-9f4a-0242ac120006",
"dataset_id_list": [str(id) for id in dataset_external_ids],
"multiple_rounds_dialogue": False,
"dataset_setting": {
"top_n": 10,
"similarity": "0.3",
"max_paragraph_char_number": 10000,
"search_mode": "blend",
"no_references_setting": {
"value": "{question}",
"status": "ai_questioning"
}
},
"model_setting": {
"prompt": "**相关文档内容**{data} **回答要求**:如果相关文档内容中没有可用信息,请回答\"没有在知识库中查找到相关信息,建议咨询相关技术支持或参考官方文档进行操作\"。请根据相关文档内容回答用户问题。不要输出与用户问题无关的内容。请使用中文回答客户问题。**用户问题**{question}"
},
"problem_optimization": False
}
logger.info(f"调用非流式聊天API: {settings.API_BASE_URL}/api/application/chat/open")
chat_response = requests.post(
url=f"{settings.API_BASE_URL}/api/application/chat/open",
json=chat_request_data,
headers={"Content-Type": "application/json"}
)
logger.info(f"聊天API响应状态码: {chat_response.status_code}")
if chat_response.status_code != 200:
logger.error(f"聊天API调用失败: {chat_response.text}")
raise ExternalAPIError(f"聊天API调用失败: {chat_response.text}")
chat_data = chat_response.json()
if chat_data.get('code') != 200 or not chat_data.get('data'):
logger.error(f"聊天API返回错误: {chat_data}")
raise ExternalAPIError(f"聊天API返回错误: {chat_data}")
chat_id = chat_data['data']
message_url = f"{settings.API_BASE_URL}/api/application/chat_message/{chat_id}"
logger.info(f"调用消息API: {message_url}")
message_response = requests.post(
url=message_url,
json={"message": question, "re_chat": False, "stream": False},
headers={"Content-Type": "application/json"}
)
if message_response.status_code != 200:
logger.error(f"消息API调用失败: {message_response.status_code}, {message_response.text}")
raise ExternalAPIError(f"消息API调用失败: {message_response.status_code}, {message_response.text}")
response_data = message_response.json()
if response_data.get('code') != 200 or 'data' not in response_data:
logger.error(f"消息API返回错误: {response_data}")
raise ExternalAPIError(f"消息API返回错误: {response_data}")
answer_content = response_data.get('data', {}).get('content', '')
return answer_content if answer_content else "无法获取回答内容"
except requests.exceptions.RequestException as e:
logger.error(f"聊天API网络错误: {str(e)}")
raise ExternalAPIError(f"聊天API网络错误: {str(e)}")
except json.JSONDecodeError as e:
logger.error(f"解析聊天API响应JSON失败: {str(e)}")
raise ExternalAPIError(f"解析响应数据失败: {str(e)}")
except Exception as e:
logger.error(f"调用聊天API失败: {str(e)}")
raise ExternalAPIError(f"调用聊天API失败: {str(e)}")
def generate_conversation_title_from_deepseek(user_question, assistant_answer):
"""调用SiliconCloud API生成会话标题直接基于当前问题和回答内容"""
try:
# 从Django设置中获取API密钥
api_key = settings.SILICON_CLOUD_API_KEY
if not api_key:
return "新对话"
# 构建提示信息
prompt = f"请根据用户的问题和助手的回答生成一个简短的对话标题不超过20个字\n\n用户问题: {user_question}\n\n助手回答: {assistant_answer}"
import requests
url = "https://api.siliconflow.cn/v1/chat/completions"
payload = {
"model": "deepseek-ai/DeepSeek-V3",
"stream": False,
"max_tokens": 512,
"temperature": 0.7,
"top_p": 0.7,
"top_k": 50,
"frequency_penalty": 0.5,
"n": 1,
"stop": [],
"messages": [
{
"role": "user",
"content": prompt
}
]
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
response = requests.post(url, json=payload, headers=headers)
response_data = response.json()
if response.status_code == 200 and 'choices' in response_data and response_data['choices']:
title = response_data['choices'][0]['message']['content'].strip()
return title[:50] # 截断过长的标题
else:
logger.error(f"生成标题时出错: {response.text}")
return "新对话"
except Exception as e:
logger.exception(f"生成对话标题时发生错误: {str(e)}")
return "新对话"