daren/apps/chat/views.py

770 lines
30 KiB
Python
Raw Normal View History

2025-05-23 19:25:35 +08:00
import logging
import json
import traceback
import uuid
from datetime import datetime
from django.db.models import Q, Max, Count
from django.http import HttpResponse, StreamingHttpResponse
from rest_framework import viewsets, status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.decorators import action
from apps.user.models import User
from apps.chat.models import ChatHistory
from apps.chat.serializers import ChatHistorySerializer
2025-05-23 19:54:32 +08:00
from apps.user.authentication import CustomTokenAuthentication
2025-05-23 19:25:35 +08:00
from apps.common.services.chat_service import ChatService
from apps.chat.services.chat_api import (
ExternalAPIError, stream_chat_answer, get_chat_answer,
generate_conversation_title_from_deepseek
)
2025-05-26 11:01:01 +08:00
from apps.chat.services.goal_service import (
get_conversation_summary, get_last_message, generate_recommended_reply
)
2025-05-23 20:35:52 +08:00
from apps.daren_detail.models import CreatorProfile
from apps.daren_detail.serializers import CreatorProfileSerializer, CreatorProfileListSerializer
2025-05-23 19:25:35 +08:00
# Module-level logger named after this module; used by every view below.
logger = logging.getLogger(__name__)
# ViewSet exposing the chat-history endpoints (list/create/update/destroy plus
# the extra @action routes defined below).
class ChatHistoryViewSet(viewsets.ModelViewSet):
# Every endpoint requires an authenticated user.
permission_classes = [IsAuthenticated]
2025-05-23 19:54:32 +08:00
# Requests are authenticated with the project's CustomTokenAuthentication.
authentication_classes = [CustomTokenAuthentication]
2025-05-23 19:25:35 +08:00
serializer_class = ChatHistorySerializer
# Base queryset; get_queryset() narrows it to the requesting user's records.
queryset = ChatHistory.objects.all()
# Fixed knowledge base ID stored on every chat record created by this view.
DEFAULT_KNOWLEDGE_BASE_ID = "b680a4fa-37be-11f0-a7cb-0242ac120002"
def get_queryset(self):
    """Return only the requesting user's chat records that are not soft-deleted."""
    owner = self.request.user
    visible_records = ChatHistory.objects.filter(user=owner, is_deleted=False)
    return visible_records
def list(self, request):
    """Return a paginated overview of the current user's conversations.

    Query parameters:
        page: 1-based page number (default 1).
        page_size: number of conversations per page (default 10).

    Each result carries the conversation id, its message count, the content
    of the record with the highest id in that conversation, and the latest
    ``created_at`` formatted as ``YYYY-MM-DD HH:MM:SS``.

    Responds with 400 when the pagination parameters are not positive
    integers, and with 500 on any unexpected failure.
    """
    try:
        # Reject bad pagination input up front: a negative slice on a
        # queryset raises and would otherwise surface as a 500.
        try:
            page = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 10))
        except (TypeError, ValueError):
            page = page_size = 0
        if page < 1 or page_size < 1:
            return Response({
                'code': 400,
                'message': '分页参数无效: page和page_size必须为正整数',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        # One row per conversation, newest conversation first.
        latest_chats = self.get_queryset().values(
            'conversation_id'
        ).annotate(
            latest_id=Max('id'),
            message_count=Count('id'),
            last_message=Max('created_at')
        ).order_by('-last_message')

        total = latest_chats.count()
        start = (page - 1) * page_size
        end = start + page_size
        # Materialise the page once; it is iterated twice below.
        chats = [*latest_chats[start:end]]

        # Fetch all "latest" records in a single query instead of one
        # query per conversation.
        latest_ids = [chat['latest_id'] for chat in chats]
        records_by_id = {
            record.id: record
            for record in ChatHistory.objects.filter(id__in=latest_ids)
        }

        results = [
            {
                'conversation_id': chat['conversation_id'],
                'message_count': chat['message_count'],
                'last_message': records_by_id[chat['latest_id']].content,
                'last_time': chat['last_message'].strftime('%Y-%m-%d %H:%M:%S'),
            }
            for chat in chats
        ]

        return Response({
            'code': 200,
            'message': '获取成功',
            'data': {
                'total': total,
                'page': page,
                'page_size': page_size,
                'results': results
            }
        })
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"获取聊天记录失败: {str(e)}")
        return Response({
            'code': 500,
            'message': f'获取聊天记录失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def conversation_detail(self, request):
    """Return the detail of the conversation named by ``conversation_id``.

    The lookup is delegated to ``ChatService.get_conversation_detail``.
    Responds with 400 when the query parameter is missing, 404 when the
    service raises ``ValueError``, and 500 on any other exception.
    """
    try:
        conversation_id = request.query_params.get('conversation_id')
        if conversation_id:
            detail = ChatService().get_conversation_detail(request.user, conversation_id)
            success_body = {'code': 200, 'message': '获取成功', 'data': detail}
            return Response(success_body)

        missing_param_body = {
            'code': 400,
            'message': '缺少conversation_id参数',
            'data': None,
        }
        return Response(missing_param_body, status=status.HTTP_400_BAD_REQUEST)
    except ValueError as not_found:
        not_found_body = {'code': 404, 'message': str(not_found), 'data': None}
        return Response(not_found_body, status=status.HTTP_404_NOT_FOUND)
    except Exception as exc:
        logger.error(f"获取对话详情失败: {str(exc)}")
        logger.error(traceback.format_exc())
        failure_body = {
            'code': 500,
            'message': f'获取对话详情失败: {str(exc)}',
            'data': None,
        }
        return Response(failure_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def available_datasets(self, request):
    """Return the available datasets; this view always answers with an empty list."""
    body = {'code': 200, 'message': '获取成功', 'data': []}
    return Response(body)
@action(detail=False, methods=['post'])
def create_conversation(self, request):
    """Generate a fresh conversation id (UUID4 string) and return it.

    Nothing is written to the database here; the id is only logged and
    handed back to the client.
    """
    try:
        new_conversation_id = str(uuid.uuid4())
        logger.info(f"创建新的会话ID: {new_conversation_id}")
        body = {
            'code': 200,
            'message': '会话创建成功',
            'data': {'conversation_id': new_conversation_id},
        }
        return Response(body)
    except Exception as exc:
        logger.error(f"创建会话失败: {str(exc)}")
        logger.error(traceback.format_exc())
        error_body = {
            'code': 500,
            'message': f'创建会话失败: {str(exc)}',
            'data': None,
        }
        return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def create(self, request):
    """Store a user question and return the assistant's answer.

    Request body fields:
        question: required, the user's message.
        conversation_id: required, the conversation the message belongs to.
        title: optional conversation title (default ``'New chat'``).
        stream: optional flag (default ``True``); when truthy the answer is
            sent as a ``text/event-stream`` response, otherwise as one JSON
            response.

    The question is always saved first. In streaming mode an empty assistant
    record is created, filled while chunks arrive and saved when the upstream
    signals ``is_end``. In non-streaming mode the assistant record is created
    once the full answer is available. In both modes a title is generated
    for conversations that do not have a real one yet.

    Responds with 400 when a required field is missing and 500 on failure.
    """
    try:
        # Validate required fields (question first, then conversation_id).
        for required_field in ('question', 'conversation_id'):
            if required_field not in request.data:
                return Response({
                    'code': 400,
                    'message': f'缺少必填字段: {required_field}',
                    'data': None
                }, status=status.HTTP_400_BAD_REQUEST)

        conversation_id = request.data['conversation_id']
        question = request.data['question']
        title = request.data.get('title', 'New chat')

        # Metadata stored on both records and forwarded to the chat API.
        metadata = {}

        # Persist the user's question.
        question_record = ChatHistory.objects.create(
            user=request.user,
            knowledge_base_id=self.DEFAULT_KNOWLEDGE_BASE_ID,
            conversation_id=conversation_id,
            title=title,
            role='user',
            content=question,
            metadata=metadata
        )

        # ID list expected by the external API (always empty here).
        external_id_list = []

        # Form-encoded clients send the flag as text; without this the
        # strings "false" / "0" would be truthy and force streaming.
        use_stream = request.data.get('stream', True)
        if isinstance(use_stream, str):
            use_stream = use_stream.strip().lower() not in ('', '0', 'false', 'no', 'off')

        # conversation_id is client supplied, so every title lookup/update
        # is restricted to the requesting user's own rows. Querysets are
        # lazy, so each use below issues a fresh query.
        own_conversation = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            user=request.user
        )

        if use_stream:
            def stream_response():
                # Placeholder answer record; its content is filled at the end.
                answer_record = ChatHistory.objects.create(
                    user=question_record.user,
                    knowledge_base_id=self.DEFAULT_KNOWLEDGE_BASE_ID,
                    conversation_id=conversation_id,
                    title=title,
                    parent_id=str(question_record.id),
                    role='assistant',
                    content="",
                    metadata=metadata
                )

                start_event = {
                    'code': 200,
                    'message': '开始流式传输',
                    'data': {
                        'id': str(answer_record.id),
                        'conversation_id': conversation_id,
                        'content': '',
                        'is_end': False
                    }
                }
                yield f"data: {json.dumps(start_event)}\n\n"

                full_content = ""
                for data in stream_chat_answer(conversation_id, question, external_id_list, metadata):
                    # Strip the "data:" prefix and the trailing "\n\n";
                    # json.loads tolerates the remaining leading space.
                    parsed_data = json.loads(data[5:-2])

                    if parsed_data['code'] == 200 and 'content' in parsed_data['data']:
                        content_part = parsed_data['data']['content']
                        full_content += content_part

                        response_data = {
                            'code': 200,
                            'message': 'partial',
                            'data': {
                                'id': str(answer_record.id),
                                'conversation_id': conversation_id,
                                'title': title,
                                'content': content_part,
                                'is_end': parsed_data['data']['is_end']
                            }
                        }
                        yield f"data: {json.dumps(response_data)}\n\n"

                        if parsed_data['data']['is_end']:
                            answer_record.content = full_content.strip()
                            answer_record.save()

                            # Reuse an existing real title if the user's
                            # conversation already has one.
                            current_title = own_conversation.exclude(
                                title__in=["New chat", "新对话", ""]
                            ).values_list('title', flat=True).first()

                            if current_title:
                                title_updated = current_title
                            else:
                                try:
                                    generated_title = generate_conversation_title_from_deepseek(
                                        question, full_content.strip()
                                    )
                                    if generated_title:
                                        own_conversation.update(title=generated_title)
                                        title_updated = generated_title
                                    else:
                                        title_updated = "新对话"
                                except Exception as e:
                                    # Title generation is best effort.
                                    logger.error(f"自动生成标题失败: {str(e)}")
                                    title_updated = "新对话"

                            final_response = {
                                'code': 200,
                                'message': '完成',
                                'data': {
                                    'id': str(answer_record.id),
                                    'conversation_id': conversation_id,
                                    'title': title_updated,
                                    'role': 'assistant',
                                    'content': full_content.strip(),
                                    'created_at': answer_record.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                                    'is_end': True
                                }
                            }
                            yield f"data: {json.dumps(final_response)}\n\n"
                            break
                    elif parsed_data['code'] != 200:
                        # Forward the upstream error event unchanged and stop.
                        yield data
                        break

                # Keep whatever was received, e.g. when the upstream stopped
                # without ever sending is_end.
                if full_content:
                    try:
                        answer_record.content = full_content.strip()
                        answer_record.save()
                    except Exception as save_error:
                        logger.error(f"保存部分内容失败: {str(save_error)}")

            response = StreamingHttpResponse(
                stream_response(),
                content_type='text/event-stream',
                status=status.HTTP_201_CREATED
            )
            response['Cache-Control'] = 'no-cache, no-store'
            response['Connection'] = 'keep-alive'
            return response

        logger.info("使用非流式输出模式")
        try:
            answer = get_chat_answer(external_id_list, question)
        except ExternalAPIError as e:
            logger.error(f"获取回答失败: {str(e)}")
            return Response({
                'code': 500,
                'message': f'获取回答失败: {str(e)}',
                'data': None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        if answer is None:
            return Response({
                'code': 500,
                'message': '获取回答失败',
                'data': None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        answer_record = ChatHistory.objects.create(
            user=request.user,
            knowledge_base_id=self.DEFAULT_KNOWLEDGE_BASE_ID,
            conversation_id=conversation_id,
            title=title,
            parent_id=str(question_record.id),
            role='assistant',
            content=answer,
            metadata=metadata
        )

        # Only the first exchange of a conversation gets an auto title.
        # Both records created by this request are excluded; excluding the
        # question alone always left the fresh answer record behind, so the
        # title was never generated.
        is_first_exchange = not own_conversation.exclude(
            id__in=[question_record.id, answer_record.id]
        ).exists()
        should_generate_title = is_first_exchange and (not title or title == 'New chat')
        if should_generate_title:
            try:
                generated_title = generate_conversation_title_from_deepseek(question, answer)
                if generated_title:
                    own_conversation.update(title=generated_title)
                    title = generated_title
            except Exception as e:
                # Title generation is best effort.
                logger.error(f"自动生成标题失败: {str(e)}")

        return Response({
            'code': 200,
            'message': '成功',
            'data': {
                'id': str(answer_record.id),
                'conversation_id': conversation_id,
                'title': title,
                'role': 'assistant',
                'content': answer,
                'created_at': answer_record.created_at.strftime('%Y-%m-%d %H:%M:%S')
            }
        }, status=status.HTTP_201_CREATED)
    except Exception as e:
        logger.error(f"创建聊天记录失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'创建聊天记录失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def _highlight_keyword(self, text, keyword):
"""高亮关键词"""
if not keyword or not text:
return text
return text.replace(keyword, f'<em class="highlight">{keyword}</em>')
def update(self, request, pk=None):
    """Update the content and/or metadata of one of the user's chat records.

    Only ``content`` and ``metadata`` are read from the request body.
    ``content`` replaces the stored value; ``metadata`` must be an object
    and is merged key by key into the stored metadata.

    Responds with 404 when the record does not exist among the user's
    non-deleted records, 400 when ``metadata`` is not an object, and 500 on
    unexpected failure.
    """
    try:
        record = self.get_queryset().filter(id=pk).first()
        if not record:
            return Response({
                'code': 404,
                'message': '记录不存在或无权限',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        data = request.data

        # dict.update() on a non-mapping raises; report it as a client
        # error instead of letting it surface as a 500.
        if 'metadata' in data and not isinstance(data['metadata'], dict):
            return Response({
                'code': 400,
                'message': 'metadata必须是对象',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        if 'content' in data:
            record.content = data['content']
        if 'metadata' in data:
            # Build a new dict rather than mutating the stored one in place.
            record.metadata = {**(record.metadata or {}), **data['metadata']}

        record.save()

        return Response({
            'code': 200,
            'message': '更新成功',
            'data': {
                'id': str(record.id),
                'conversation_id': record.conversation_id,
                'role': record.role,
                'content': record.content,
                'metadata': record.metadata,
                # Server time at response, not a value read from the record.
                'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }
        })
    except Exception as e:
        logger.error(f"更新聊天记录失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'更新聊天记录失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def destroy(self, request, pk=None):
    """Soft-delete one of the user's chat records.

    Responds with 404 when no matching non-deleted record belongs to the
    user, and 500 on unexpected failure.
    """
    try:
        target = self.get_queryset().filter(id=pk).first()
        if target:
            target.soft_delete()
            deleted_body = {'code': 200, 'message': '删除成功', 'data': None}
            return Response(deleted_body)

        missing_body = {'code': 404, 'message': '记录不存在或无权限', 'data': None}
        return Response(missing_body, status=status.HTTP_404_NOT_FOUND)
    except Exception as exc:
        logger.error(f"删除聊天记录失败: {str(exc)}")
        logger.error(traceback.format_exc())
        error_body = {
            'code': 500,
            'message': f'删除聊天记录失败: {str(exc)}',
            'data': None,
        }
        return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def search(self, request):
    """Search the user's chat records.

    Query parameters:
        keyword: optional, case-insensitive substring match on content.
        start_date / end_date: optional bounds applied to ``created_at``.
        page: 1-based page number (default 1).
        page_size: number of records per page (default 10).

    Results are ordered newest first. When a keyword is given, each result
    also carries the highlighted content under ``highlights``.

    Responds with 400 when the pagination parameters are not positive
    integers, and with 500 on unexpected failure.
    """
    try:
        keyword = request.query_params.get('keyword', '').strip()
        start_date = request.query_params.get('start_date')
        end_date = request.query_params.get('end_date')

        # Reject bad pagination input up front: a negative slice on a
        # queryset raises and would otherwise surface as a 500.
        try:
            page = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 10))
        except (TypeError, ValueError):
            page = page_size = 0
        if page < 1 or page_size < 1:
            return Response({
                'code': 400,
                'message': '分页参数无效: page和page_size必须为正整数',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        query = self.get_queryset()
        if keyword:
            query = query.filter(Q(content__icontains=keyword))
        if start_date:
            query = query.filter(created_at__gte=start_date)
        if end_date:
            query = query.filter(created_at__lte=end_date)

        total = query.count()
        start = (page - 1) * page_size
        end = start + page_size
        records = query.order_by('-created_at')[start:end]

        results = [
            {
                'id': str(record.id),
                'conversation_id': record.conversation_id,
                'role': record.role,
                'content': record.content,
                'created_at': record.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'metadata': record.metadata,
                'highlights': {'content': self._highlight_keyword(record.content, keyword)} if keyword else {}
            }
            for record in records
        ]

        return Response({
            'code': 200,
            'message': '搜索成功',
            'data': {
                'total': total,
                'page': page,
                'page_size': page_size,
                'results': results
            }
        })
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"搜索聊天记录失败: {str(e)}")
        return Response({
            'code': 500,
            'message': f'搜索失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['delete'])
def delete_conversation(self, request):
    """Soft-delete every record of the conversation named by ``conversation_id``.

    Only the user's own non-deleted records are considered. Responds with
    400 when the parameter is missing, 404 when nothing matches, and 500 on
    unexpected failure. On success the number of matched records is
    returned as ``deleted_count``.
    """
    try:
        conversation_id = request.query_params.get('conversation_id')
        if not conversation_id:
            missing_param_body = {
                'code': 400,
                'message': '缺少必要参数: conversation_id',
                'data': None,
            }
            return Response(missing_param_body, status=status.HTTP_400_BAD_REQUEST)

        conversation_records = self.get_queryset().filter(conversation_id=conversation_id)
        if not conversation_records.exists():
            not_found_body = {
                'code': 404,
                'message': '未找到该会话或无权限访问',
                'data': None,
            }
            return Response(not_found_body, status=status.HTTP_404_NOT_FOUND)

        # Count before deleting: the queryset only matches non-deleted rows.
        deleted_count = conversation_records.count()
        for entry in conversation_records:
            entry.soft_delete()

        summary = {'conversation_id': conversation_id, 'deleted_count': deleted_count}
        return Response({'code': 200, 'message': '删除成功', 'data': summary})
    except Exception as exc:
        logger.error(f"删除会话失败: {str(exc)}")
        logger.error(traceback.format_exc())
        error_body = {
            'code': 500,
            'message': f'删除会话失败: {str(exc)}',
            'data': None,
        }
        return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'], url_path='generate-conversation-title')
def generate_conversation_title(self, request):
    """Set the title of a conversation to the ``title`` query parameter.

    Checks run in this order: ``conversation_id`` present (else 400), the
    user owns at least one non-deleted record of it (else 404), ``title``
    present (else 400). The title is then written to all of the user's
    records that share the conversation id. Unexpected failures give 500.
    """
    try:
        conversation_id = request.query_params.get('conversation_id')
        if not conversation_id:
            body = {'code': 400, 'message': '缺少conversation_id参数', 'data': None}
            return Response(body, status=status.HTTP_400_BAD_REQUEST)

        # Make sure the conversation exists for this user.
        owned_messages = self.get_queryset().filter(
            conversation_id=conversation_id,
            is_deleted=False,
            user=request.user,
        ).order_by('created_at')
        if not owned_messages.exists():
            body = {'code': 404, 'message': '对话不存在或无权访问', 'data': None}
            return Response(body, status=status.HTTP_404_NOT_FOUND)

        # The new title must be supplied by the caller.
        custom_title = request.query_params.get('title')
        if not custom_title:
            body = {'code': 400, 'message': '缺少title参数', 'data': None}
            return Response(body, status=status.HTTP_400_BAD_REQUEST)

        # Apply the title to every record of the conversation owned by the user.
        user_conversation = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            user=request.user,
        )
        user_conversation.update(title=custom_title)

        payload = {'conversation_id': conversation_id, 'title': custom_title}
        return Response({'code': 200, 'message': '更新会话标题成功', 'data': payload})
    except Exception as exc:
        logger.error(f"更新会话标题失败: {str(exc)}")
        logger.error(traceback.format_exc())
        error_body = {
            'code': 500,
            'message': f"更新会话标题失败: {str(exc)}",
            'data': None,
        }
        return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-05-23 20:35:52 +08:00
@action(detail=False, methods=['get'])
def get_creator_by_id(self, request):
    """Return basic profile information for the creator named by ``id``.

    Responds with 400 when ``id`` is missing, 404 when no ``CreatorProfile``
    has that id, and 500 on unexpected failure. Empty ``mcn`` and
    ``pricing`` values are rendered as ``'--'``; empty
    ``e_commerce_platforms`` becomes an empty list.
    """
    try:
        creator_id = request.query_params.get('id')
        if not creator_id:
            body = {'code': 400, 'message': '缺少必要参数: id', 'data': None}
            return Response(body, status=status.HTTP_400_BAD_REQUEST)

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            body = {'code': 404, 'message': '未找到该达人信息', 'data': None}
            return Response(body, status=status.HTTP_404_NOT_FOUND)

        # Make the avatar URL absolute only when the creator has an avatar set.
        avatar_url = creator.get_avatar_url()
        if avatar_url and request and creator.avatar:
            avatar_url = request.build_absolute_uri(avatar_url)

        if creator.pricing:
            pricing_label = f"${creator.pricing}/video"
        else:
            pricing_label = '--'

        # Assemble the response payload.
        profile = {
            'id': creator.id,
            'name': creator.name,
            'avatar_url': avatar_url,
            'category': creator.category,
            'mcn': creator.mcn or '--',
            'pricing': pricing_label,
            'collab': creator.e_commerce_platforms or [],
        }
        return Response({'code': 200, 'message': '获取成功', 'data': profile})
    except Exception as exc:
        logger.error(f"获取达人信息失败: {str(exc)}")
        logger.error(traceback.format_exc())
        error_body = {
            'code': 500,
            'message': f'获取达人信息失败: {str(exc)}',
            'data': None,
        }
        return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-05-26 11:01:01 +08:00
@action(detail=False, methods=['get'])
def generate_summary(self, request):
    """Return the summary of the conversation named by ``conversation_id``.

    The summary comes from ``get_conversation_summary``. Responds with 400
    when the parameter is missing, 404 when the helper returns ``None``, and
    500 on unexpected failure.
    """
    try:
        conversation_id = request.query_params.get('conversation_id')
        if not conversation_id:
            body = {'code': 400, 'message': '缺少必要参数: conversation_id', 'data': None}
            return Response(body, status=status.HTTP_400_BAD_REQUEST)

        # NOTE(review): the helper receives only the id, not the user —
        # confirm that it enforces ownership of the conversation.
        summary = get_conversation_summary(conversation_id)
        if summary is not None:
            payload = {'conversation_id': conversation_id, 'summary': summary}
            return Response({'code': 200, 'message': '获取摘要成功', 'data': payload})

        body = {'code': 404, 'message': '未找到对话或无法生成摘要', 'data': None}
        return Response(body, status=status.HTTP_404_NOT_FOUND)
    except Exception as exc:
        logger.error(f"获取对话摘要失败: {str(exc)}")
        logger.error(traceback.format_exc())
        error_body = {
            'code': 500,
            'message': f'获取对话摘要失败: {str(exc)}',
            'data': None,
        }
        return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['post'])
def generate_reply(self, request):
    """Generate a recommended reply for a conversation and a stated goal.

    Request body fields ``conversation_id`` and ``goal_description`` are
    both required (400 otherwise, checked in that order). The conversation
    summary and last message are fetched through the goal-service helpers;
    a missing last message gives 404. An error reported by
    ``generate_recommended_reply`` or any unexpected exception gives 500.
    """
    try:
        payload = request.data
        conversation_id = payload.get('conversation_id')
        goal_description = payload.get('goal_description')

        # Validate required parameters in a fixed order.
        required_params = (
            ('conversation_id', conversation_id),
            ('goal_description', goal_description),
        )
        for param_name, param_value in required_params:
            if not param_value:
                body = {'code': 400, 'message': f'缺少必要参数: {param_name}', 'data': None}
                return Response(body, status=status.HTTP_400_BAD_REQUEST)

        # Fetch the summary first, then the last message.
        conversation_summary = get_conversation_summary(conversation_id)
        last_message = get_last_message(conversation_id)
        if last_message is None:
            body = {'code': 404, 'message': '未找到对话或无法获取最后一条消息', 'data': None}
            return Response(body, status=status.HTTP_404_NOT_FOUND)

        # The helper returns a (reply, error) pair.
        reply, error = generate_recommended_reply(
            request.user, goal_description, conversation_summary, last_message
        )
        if error:
            body = {'code': 500, 'message': f'生成推荐回复失败: {error}', 'data': None}
            return Response(body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        result = {'conversation_id': conversation_id, 'reply': reply}
        return Response({'code': 200, 'message': '生成推荐回复成功', 'data': result})
    except Exception as exc:
        logger.error(f"生成推荐回复失败: {str(exc)}")
        logger.error(traceback.format_exc())
        error_body = {
            'code': 500,
            'message': f'生成推荐回复失败: {str(exc)}',
            'data': None,
        }
        return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)