daren/apps/chat/services/chat_api.py
2025-05-23 19:25:35 +08:00

259 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import logging
from django.conf import settings
from rest_framework.exceptions import APIException
from daren import settings
logger = logging.getLogger(__name__)
class ExternalAPIError(APIException):
status_code = 500
default_detail = '外部API调用失败'
default_code = 'external_api_error'
def stream_chat_answer(conversation_id, question, dataset_external_ids, metadata):
"""流式调用外部聊天API返回生成器以实时传输回答"""
try:
# 构造聊天请求数据
chat_request_data = {
"id": "d5d11efa-ea9a-11ef-9933-0242ac120006",
"model_id": "7a214d0e-e65e-11ef-9f4a-0242ac120006",
"dataset_id_list": [str(id) for id in dataset_external_ids],
"multiple_rounds_dialogue": False,
"dataset_setting": {
"top_n": 10, "similarity": "0.3",
"max_paragraph_char_number": 10000,
"search_mode": "blend",
"no_references_setting": {
"value": "{question}",
"status": "ai_questioning"
}
},
"model_setting": {
"prompt": "**相关文档内容**{data} **回答要求**:如果相关文档内容中没有可用信息,请回答\"没有在知识库中查找到相关信息,建议咨询相关技术支持或参考官方文档进行操作\"。请根据相关文档内容回答用户问题。不要输出与用户问题无关的内容。请使用中文回答客户问题。**用户问题**{question}"
},
"problem_optimization": False
}
# 发起聊天请求
logger.info(f"调用流式聊天API: {settings.API_BASE_URL}/api/application/chat/open")
chat_response = requests.post(
url=f"{settings.API_BASE_URL}/api/application/chat/open",
json=chat_request_data,
headers={"Content-Type": "application/json"}
)
logger.info(f"聊天API响应状态码: {chat_response.status_code}")
if chat_response.status_code != 200:
error_msg = f"聊天API调用失败: {chat_response.text}"
logger.error(error_msg)
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
return
chat_data = chat_response.json()
if chat_data.get('code') != 200 or not chat_data.get('data'):
error_msg = f"聊天API返回错误: {chat_data}"
logger.error(error_msg)
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
return
chat_id = chat_data['data']
message_url = f"{settings.API_BASE_URL}/api/application/chat_message/{chat_id}"
logger.info(f"调用消息API: {message_url}")
# 发起消息请求(流式)
message_request = requests.post(
url=message_url,
json={"message": question, "re_chat": False, "stream": True},
headers={"Content-Type": "application/json"},
stream=True
)
if message_request.status_code != 200:
error_msg = f"消息API调用失败: {message_request.status_code}, {message_request.text}"
logger.error(error_msg)
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
return
buffer = ""
for chunk in message_request.iter_content(chunk_size=1):
if not chunk:
continue
chunk_str = chunk.decode('utf-8')
buffer += chunk_str
if '\n\n' in buffer:
lines = buffer.split('\n\n')
for line in lines[:-1]:
if line.startswith('data: '):
try:
json_str = line[6:]
data = json.loads(json_str)
if 'content' in data:
content_part = data['content']
response_data = {
'code': 200,
'message': 'partial',
'data': {
'content': content_part,
'is_end': data.get('is_end', False)
}
}
yield f"data: {json.dumps(response_data)}\n\n"
if data.get('is_end', False):
return
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}, 数据: {line}")
buffer = lines[-1]
if buffer and buffer.startswith('data: '):
try:
json_str = buffer[6:]
data = json.loads(json_str)
if 'content' in data:
content_part = data['content']
response_data = {
'code': 200,
'message': 'partial',
'data': {
'content': content_part,
'is_end': data.get('is_end', False)
}
}
yield f"data: {json.dumps(response_data)}\n\n"
except json.JSONDecodeError:
logger.error(f"处理剩余数据时JSON解析错误: {buffer}")
except Exception as e:
logger.error(f"流式聊天API处理出错: {str(e)}")
yield f"data: {json.dumps({'code': 500, 'message': f'流式处理出错: {str(e)}', 'data': {'is_end': True}})}\n\n"
def get_chat_answer(dataset_external_ids, question):
"""非流式调用外部聊天API获取回答"""
try:
chat_request_data = {
"id": "d5d11efa-ea9a-11ef-9933-0242ac120006",
"model_id": "7a214d0e-e65e-11ef-9f4a-0242ac120006",
"dataset_id_list": [str(id) for id in dataset_external_ids],
"multiple_rounds_dialogue": False,
"dataset_setting": {
"top_n": 10,
"similarity": "0.3",
"max_paragraph_char_number": 10000,
"search_mode": "blend",
"no_references_setting": {
"value": "{question}",
"status": "ai_questioning"
}
},
"model_setting": {
"prompt": "**相关文档内容**{data} **回答要求**:如果相关文档内容中没有可用信息,请回答\"没有在知识库中查找到相关信息,建议咨询相关技术支持或参考官方文档进行操作\"。请根据相关文档内容回答用户问题。不要输出与用户问题无关的内容。请使用中文回答客户问题。**用户问题**{question}"
},
"problem_optimization": False
}
logger.info(f"调用非流式聊天API: {settings.API_BASE_URL}/api/application/chat/open")
chat_response = requests.post(
url=f"{settings.API_BASE_URL}/api/application/chat/open",
json=chat_request_data,
headers={"Content-Type": "application/json"}
)
logger.info(f"聊天API响应状态码: {chat_response.status_code}")
if chat_response.status_code != 200:
logger.error(f"聊天API调用失败: {chat_response.text}")
raise ExternalAPIError(f"聊天API调用失败: {chat_response.text}")
chat_data = chat_response.json()
if chat_data.get('code') != 200 or not chat_data.get('data'):
logger.error(f"聊天API返回错误: {chat_data}")
raise ExternalAPIError(f"聊天API返回错误: {chat_data}")
chat_id = chat_data['data']
message_url = f"{settings.API_BASE_URL}/api/application/chat_message/{chat_id}"
logger.info(f"调用消息API: {message_url}")
message_response = requests.post(
url=message_url,
json={"message": question, "re_chat": False, "stream": False},
headers={"Content-Type": "application/json"}
)
if message_response.status_code != 200:
logger.error(f"消息API调用失败: {message_response.status_code}, {message_response.text}")
raise ExternalAPIError(f"消息API调用失败: {message_response.status_code}, {message_response.text}")
response_data = message_response.json()
if response_data.get('code') != 200 or 'data' not in response_data:
logger.error(f"消息API返回错误: {response_data}")
raise ExternalAPIError(f"消息API返回错误: {response_data}")
answer_content = response_data.get('data', {}).get('content', '')
return answer_content if answer_content else "无法获取回答内容"
except requests.exceptions.RequestException as e:
logger.error(f"聊天API网络错误: {str(e)}")
raise ExternalAPIError(f"聊天API网络错误: {str(e)}")
except json.JSONDecodeError as e:
logger.error(f"解析聊天API响应JSON失败: {str(e)}")
raise ExternalAPIError(f"解析响应数据失败: {str(e)}")
except Exception as e:
logger.error(f"调用聊天API失败: {str(e)}")
raise ExternalAPIError(f"调用聊天API失败: {str(e)}")
def generate_conversation_title_from_deepseek(user_question, assistant_answer):
"""调用SiliconCloud API生成会话标题直接基于当前问题和回答内容"""
try:
# 从Django设置中获取API密钥
api_key = settings.SILICON_CLOUD_API_KEY
if not api_key:
return "新对话"
# 构建提示信息
prompt = f"请根据用户的问题和助手的回答生成一个简短的对话标题不超过20个字\n\n用户问题: {user_question}\n\n助手回答: {assistant_answer}"
import requests
url = "https://api.siliconflow.cn/v1/chat/completions"
payload = {
"model": "deepseek-ai/DeepSeek-V3",
"stream": False,
"max_tokens": 512,
"temperature": 0.7,
"top_p": 0.7,
"top_k": 50,
"frequency_penalty": 0.5,
"n": 1,
"stop": [],
"messages": [
{
"role": "user",
"content": prompt
}
]
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
response = requests.post(url, json=payload, headers=headers)
response_data = response.json()
if response.status_code == 200 and 'choices' in response_data and response_data['choices']:
title = response_data['choices'][0]['message']['content'].strip()
return title[:50] # 截断过长的标题
else:
logger.error(f"生成标题时出错: {response.text}")
return "新对话"
except Exception as e:
logger.exception(f"生成对话标题时发生错误: {str(e)}")
return "新对话"