347 lines
15 KiB
Python
347 lines
15 KiB
Python
import requests
|
||
import json
|
||
import logging
|
||
from django.conf import settings
|
||
from rest_framework.exceptions import APIException
|
||
from daren_project import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class ExternalAPIError(APIException):
|
||
status_code = 500
|
||
default_detail = '外部API调用失败'
|
||
default_code = 'external_api_error'
|
||
|
||
def stream_chat_answer(conversation_id, question, dataset_external_ids, metadata):
|
||
"""流式调用外部聊天API,返回生成器以实时传输回答"""
|
||
try:
|
||
# 构造聊天请求数据
|
||
chat_request_data = {
|
||
"id": "d5d11efa-ea9a-11ef-9933-0242ac120006",
|
||
"model_id": "7a214d0e-e65e-11ef-9f4a-0242ac120006",
|
||
"dataset_id_list": [str(id) for id in dataset_external_ids],
|
||
"multiple_rounds_dialogue": False,
|
||
"dataset_setting": {
|
||
"top_n": 10, "similarity": "0.3",
|
||
"max_paragraph_char_number": 10000,
|
||
"search_mode": "blend",
|
||
"no_references_setting": {
|
||
"value": "{question}",
|
||
"status": "ai_questioning"
|
||
}
|
||
},
|
||
"model_setting": {
|
||
"prompt": "**相关文档内容**:{data} **回答要求**:如果相关文档内容中没有可用信息,请回答\"没有在知识库中查找到相关信息,建议咨询相关技术支持或参考官方文档进行操作\"。请根据相关文档内容回答用户问题。不要输出与用户问题无关的内容。请使用中文回答客户问题。**用户问题**:{question}"
|
||
},
|
||
"problem_optimization": False
|
||
}
|
||
|
||
# 发起聊天请求
|
||
logger.info(f"调用流式聊天API: {settings.API_BASE_URL}/api/application/chat/open")
|
||
chat_response = requests.post(
|
||
url=f"{settings.API_BASE_URL}/api/application/chat/open",
|
||
json=chat_request_data,
|
||
headers={"Content-Type": "application/json"}
|
||
)
|
||
|
||
logger.info(f"聊天API响应状态码: {chat_response.status_code}")
|
||
if chat_response.status_code != 200:
|
||
error_msg = f"聊天API调用失败: {chat_response.text}"
|
||
logger.error(error_msg)
|
||
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
|
||
return
|
||
|
||
chat_data = chat_response.json()
|
||
if chat_data.get('code') != 200 or not chat_data.get('data'):
|
||
error_msg = f"聊天API返回错误: {chat_data}"
|
||
logger.error(error_msg)
|
||
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
|
||
return
|
||
|
||
chat_id = chat_data['data']
|
||
message_url = f"{settings.API_BASE_URL}/api/application/chat_message/{chat_id}"
|
||
logger.info(f"调用消息API: {message_url}")
|
||
|
||
# 发起消息请求(流式)
|
||
message_request = requests.post(
|
||
url=message_url,
|
||
json={"message": question, "re_chat": False, "stream": True},
|
||
headers={"Content-Type": "application/json"},
|
||
stream=True
|
||
)
|
||
|
||
if message_request.status_code != 200:
|
||
error_msg = f"消息API调用失败: {message_request.status_code}, {message_request.text}"
|
||
logger.error(error_msg)
|
||
yield f"data: {json.dumps({'code': 500, 'message': error_msg, 'data': {'is_end': True}})}\n\n"
|
||
return
|
||
|
||
buffer = ""
|
||
for chunk in message_request.iter_content(chunk_size=1):
|
||
if not chunk:
|
||
continue
|
||
|
||
chunk_str = chunk.decode('utf-8')
|
||
buffer += chunk_str
|
||
|
||
if '\n\n' in buffer:
|
||
lines = buffer.split('\n\n')
|
||
for line in lines[:-1]:
|
||
if line.startswith('data: '):
|
||
try:
|
||
json_str = line[6:]
|
||
data = json.loads(json_str)
|
||
if 'content' in data:
|
||
content_part = data['content']
|
||
response_data = {
|
||
'code': 200,
|
||
'message': 'partial',
|
||
'data': {
|
||
'content': content_part,
|
||
'is_end': data.get('is_end', False)
|
||
}
|
||
}
|
||
yield f"data: {json.dumps(response_data)}\n\n"
|
||
|
||
if data.get('is_end', False):
|
||
return
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"JSON解析错误: {e}, 数据: {line}")
|
||
|
||
buffer = lines[-1]
|
||
|
||
if buffer and buffer.startswith('data: '):
|
||
try:
|
||
json_str = buffer[6:]
|
||
data = json.loads(json_str)
|
||
if 'content' in data:
|
||
content_part = data['content']
|
||
response_data = {
|
||
'code': 200,
|
||
'message': 'partial',
|
||
'data': {
|
||
'content': content_part,
|
||
'is_end': data.get('is_end', False)
|
||
}
|
||
}
|
||
yield f"data: {json.dumps(response_data)}\n\n"
|
||
except json.JSONDecodeError:
|
||
logger.error(f"处理剩余数据时JSON解析错误: {buffer}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"流式聊天API处理出错: {str(e)}")
|
||
yield f"data: {json.dumps({'code': 500, 'message': f'流式处理出错: {str(e)}', 'data': {'is_end': True}})}\n\n"
|
||
|
||
def get_chat_answer(dataset_external_ids, question):
|
||
"""非流式调用外部聊天API获取回答"""
|
||
try:
|
||
chat_request_data = {
|
||
"id": "d5d11efa-ea9a-11ef-9933-0242ac120006",
|
||
"model_id": "7a214d0e-e65e-11ef-9f4a-0242ac120006",
|
||
"dataset_id_list": [str(id) for id in dataset_external_ids],
|
||
"multiple_rounds_dialogue": False,
|
||
"dataset_setting": {
|
||
"top_n": 10,
|
||
"similarity": "0.3",
|
||
"max_paragraph_char_number": 10000,
|
||
"search_mode": "blend",
|
||
"no_references_setting": {
|
||
"value": "{question}",
|
||
"status": "ai_questioning"
|
||
}
|
||
},
|
||
"model_setting": {
|
||
"prompt": "**相关文档内容**:{data} **回答要求**:如果相关文档内容中没有可用信息,请回答\"没有在知识库中查找到相关信息,建议咨询相关技术支持或参考官方文档进行操作\"。请根据相关文档内容回答用户问题。不要输出与用户问题无关的内容。请使用中文回答客户问题。**用户问题**:{question}"
|
||
},
|
||
"problem_optimization": False
|
||
}
|
||
|
||
logger.info(f"调用非流式聊天API: {settings.API_BASE_URL}/api/application/chat/open")
|
||
chat_response = requests.post(
|
||
url=f"{settings.API_BASE_URL}/api/application/chat/open",
|
||
json=chat_request_data,
|
||
headers={"Content-Type": "application/json"}
|
||
)
|
||
|
||
logger.info(f"聊天API响应状态码: {chat_response.status_code}")
|
||
if chat_response.status_code != 200:
|
||
logger.error(f"聊天API调用失败: {chat_response.text}")
|
||
raise ExternalAPIError(f"聊天API调用失败: {chat_response.text}")
|
||
|
||
chat_data = chat_response.json()
|
||
if chat_data.get('code') != 200 or not chat_data.get('data'):
|
||
logger.error(f"聊天API返回错误: {chat_data}")
|
||
raise ExternalAPIError(f"聊天API返回错误: {chat_data}")
|
||
|
||
chat_id = chat_data['data']
|
||
message_url = f"{settings.API_BASE_URL}/api/application/chat_message/{chat_id}"
|
||
logger.info(f"调用消息API: {message_url}")
|
||
|
||
message_response = requests.post(
|
||
url=message_url,
|
||
json={"message": question, "re_chat": False, "stream": False},
|
||
headers={"Content-Type": "application/json"}
|
||
)
|
||
|
||
if message_response.status_code != 200:
|
||
logger.error(f"消息API调用失败: {message_response.status_code}, {message_response.text}")
|
||
raise ExternalAPIError(f"消息API调用失败: {message_response.status_code}, {message_response.text}")
|
||
|
||
response_data = message_response.json()
|
||
if response_data.get('code') != 200 or 'data' not in response_data:
|
||
logger.error(f"消息API返回错误: {response_data}")
|
||
raise ExternalAPIError(f"消息API返回错误: {response_data}")
|
||
|
||
answer_content = response_data.get('data', {}).get('content', '')
|
||
return answer_content if answer_content else "无法获取回答内容"
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"聊天API网络错误: {str(e)}")
|
||
raise ExternalAPIError(f"聊天API网络错误: {str(e)}")
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"解析聊天API响应JSON失败: {str(e)}")
|
||
raise ExternalAPIError(f"解析响应数据失败: {str(e)}")
|
||
except Exception as e:
|
||
logger.error(f"调用聊天API失败: {str(e)}")
|
||
raise ExternalAPIError(f"调用聊天API失败: {str(e)}")
|
||
|
||
def generate_conversation_title(question, answer):
|
||
"""调用DeepSeek API生成会话标题"""
|
||
try:
|
||
prompt = f"请根据以下用户问题和AI回答,为本次对话生成一个简洁的标题(不超过20个字):\n\n**用户问题**:\n{question}\n\n**AI回答**:\n{answer}\n\n生成的标题应概括对话主题,语言简洁明了,使用中文。"
|
||
deepseek_url = "https://api.deepseek.com/v1/chat/completions"
|
||
deepseek_data = {
|
||
"model": "deepseek-pro",
|
||
"messages": [
|
||
{"role": "system", "content": "你是一个擅长总结的助手,生成简洁的对话标题。"},
|
||
{"role": "user", "content": prompt}
|
||
],
|
||
"max_tokens": 50,
|
||
"temperature": 0.7
|
||
}
|
||
|
||
logger.info(f"调用DeepSeek API生成标题: {deepseek_url}")
|
||
response = requests.post(
|
||
deepseek_url,
|
||
json=deepseek_data,
|
||
headers={
|
||
"Content-Type": "application/json",
|
||
"Authorization": f"Bearer {settings.DEEPSEEK_API_KEY}"
|
||
}
|
||
)
|
||
|
||
logger.info(f"DeepSeek API响应状态码: {response.status_code}")
|
||
if response.status_code != 200:
|
||
logger.error(f"DeepSeek API调用失败: {response.text}")
|
||
raise ExternalAPIError(f"DeepSeek API调用失败: {response.text}")
|
||
|
||
result = response.json()
|
||
title = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()
|
||
return title if title else None
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"DeepSeek API网络错误: {str(e)}")
|
||
raise ExternalAPIError(f"DeepSeek API网络错误: {str(e)}")
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"解析DeepSeek API响应JSON失败: {str(e)}")
|
||
raise ExternalAPIError(f"解析响应数据失败: {str(e)}")
|
||
except Exception as e:
|
||
logger.error(f"调用DeepSeek API失败: {str(e)}")
|
||
raise ExternalAPIError(f"调用DeepSeek API失败: {str(e)}")
|
||
|
||
def get_hit_test_documents(dataset_id, query_text):
|
||
"""调用知识库hit_test接口获取相关文档信息"""
|
||
try:
|
||
url = f"{settings.API_BASE_URL}/api/dataset/{dataset_id}/hit_test"
|
||
params = {
|
||
"query_text": query_text,
|
||
"top_number": 10,
|
||
"similarity": 0.3,
|
||
"search_mode": "blend"
|
||
}
|
||
|
||
logger.info(f"调用hit_test API: {url}")
|
||
response = requests.get(url=url, params=params)
|
||
|
||
logger.info(f"hit_test API响应状态码: {response.status_code}")
|
||
if response.status_code != 200:
|
||
logger.error(f"hit_test API调用失败: {response.status_code}, {response.text}")
|
||
raise ExternalAPIError(f"hit_test API调用失败: {response.status_code}, {response.text}")
|
||
|
||
result = response.json()
|
||
if result.get('code') != 200:
|
||
logger.error(f"hit_test API业务错误: {result}")
|
||
raise ExternalAPIError(f"hit_test API业务错误: {result}")
|
||
|
||
documents = result.get('data', [])
|
||
return [
|
||
{
|
||
"document_name": doc.get("document_name", ""),
|
||
"dataset_name": doc.get("dataset_name", ""),
|
||
"similarity": doc.get("similarity", 0),
|
||
"comprehensive_score": doc.get("comprehensive_score", 0)
|
||
}
|
||
for doc in documents
|
||
]
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"hit_test API网络错误: {str(e)}")
|
||
raise ExternalAPIError(f"hit_test API网络错误: {str(e)}")
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"解析hit_test API响应JSON失败: {str(e)}")
|
||
raise ExternalAPIError(f"解析响应数据失败: {str(e)}")
|
||
except Exception as e:
|
||
logger.error(f"调用hit_test API失败: {str(e)}")
|
||
raise ExternalAPIError(f"调用hit_test API失败: {str(e)}")
|
||
|
||
def generate_conversation_title_from_deepseek(user_question, assistant_answer):
|
||
"""调用SiliconCloud API生成会话标题,直接基于当前问题和回答内容"""
|
||
try:
|
||
# 从Django设置中获取API密钥
|
||
api_key = settings.SILICON_CLOUD_API_KEY
|
||
if not api_key:
|
||
return "新对话"
|
||
|
||
# 构建提示信息
|
||
prompt = f"请根据用户的问题和助手的回答,生成一个简短的对话标题(不超过20个字)。\n\n用户问题: {user_question}\n\n助手回答: {assistant_answer}"
|
||
|
||
import requests
|
||
|
||
url = "https://api.siliconflow.cn/v1/chat/completions"
|
||
|
||
payload = {
|
||
"model": "deepseek-ai/DeepSeek-V3",
|
||
"stream": False,
|
||
"max_tokens": 512,
|
||
"temperature": 0.7,
|
||
"top_p": 0.7,
|
||
"top_k": 50,
|
||
"frequency_penalty": 0.5,
|
||
"n": 1,
|
||
"stop": [],
|
||
"messages": [
|
||
{
|
||
"role": "user",
|
||
"content": prompt
|
||
}
|
||
]
|
||
}
|
||
headers = {
|
||
"Authorization": f"Bearer {api_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
response = requests.post(url, json=payload, headers=headers)
|
||
response_data = response.json()
|
||
|
||
if response.status_code == 200 and 'choices' in response_data and response_data['choices']:
|
||
title = response_data['choices'][0]['message']['content'].strip()
|
||
return title[:50] # 截断过长的标题
|
||
else:
|
||
logger.error(f"生成标题时出错: {response.text}")
|
||
return "新对话"
|
||
|
||
except Exception as e:
|
||
logger.exception(f"生成对话标题时发生错误: {str(e)}")
|
||
return "新对话"
|