import logging import json import traceback import uuid from datetime import datetime from django.db.models import Q, Max, Count from django.http import HttpResponse, StreamingHttpResponse from rest_framework import viewsets, status from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.decorators import action from apps.user.models import User from apps.knowledge_base.models import KnowledgeBase from apps.chat.models import ChatHistory, NegotiationChat from apps.chat.serializers import ChatHistorySerializer from apps.common.services.chat_service import ChatService from apps.expertproducts.models import Negotiation from apps.chat.services.chat_api import ( ExternalAPIError, stream_chat_answer, get_chat_answer, generate_conversation_title, get_hit_test_documents, generate_conversation_title_from_deepseek ) from apps.user.authentication import CustomTokenAuthentication # from apps.permissions.services.permission_service import KnowledgeBasePermissionMixin logger = logging.getLogger(__name__) # class ChatHistoryViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet): class ChatHistoryViewSet(viewsets.ModelViewSet): permission_classes = [IsAuthenticated] authentication_classes = [CustomTokenAuthentication] serializer_class = ChatHistorySerializer queryset = ChatHistory.objects.all() def check_knowledge_base_permission(self, kb, user, permission_type): """ 兼容方法:始终返回True表示有权限 此方法替代了原来的权限检查,现在我们不再检查知识库权限 """ return True def get_queryset(self): """确保用户只能看到自己的未删除的聊天记录""" user = self.request.user queryset = ChatHistory.objects.filter( user=user, is_deleted=False ) # 获取status筛选参数 status_filter = self.request.query_params.get('status') if status_filter: # 查找与谈判关联的对话ID negotiation_chats = NegotiationChat.objects.filter( negotiation__status=status_filter ).values_list('conversation_id', flat=True) # 筛选这些对话ID对应的聊天记录 queryset = queryset.filter(conversation_id__in=negotiation_chats) return queryset def list(self, request): """获取对话列表概览""" try: page = int(request.query_params.get('page', 1)) page_size = int(request.query_params.get('page_size', 10)) latest_chats = self.get_queryset().values( 'conversation_id' ).annotate( latest_id=Max('id'), message_count=Count('id'), last_message=Max('created_at') ).order_by('-last_message') total = latest_chats.count() start = (page - 1) * page_size end = start + page_size chats = latest_chats[start:end] results = [] for chat in chats: latest_record = ChatHistory.objects.get(id=chat['latest_id']) dataset_info = [] if latest_record.metadata: dataset_id_list = latest_record.metadata.get('dataset_id_list', []) dataset_names = latest_record.metadata.get('dataset_names', []) if dataset_id_list: if dataset_names and len(dataset_names) == len(dataset_id_list): dataset_info = [ {'id': str(id), 'name': name} for id, name in zip(dataset_id_list, dataset_names) ] else: datasets = KnowledgeBase.objects.filter(id__in=dataset_id_list) dataset_info = [ {'id': str(ds.id), 'name': ds.name} for ds in datasets ] results.append({ 'conversation_id': chat['conversation_id'], 'message_count': chat['message_count'], 'last_message': latest_record.content, 'last_time': chat['last_message'].strftime('%Y-%m-%d %H:%M:%S'), 'dataset_id_list': [ds['id'] for ds in dataset_info], 'datasets': dataset_info }) return Response({ 'code': 200, 'message': '获取成功', 'data': { 'total': total, 'page': page, 'page_size': page_size, 'results': results } }) except Exception as e: logger.error(f"获取聊天记录失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'获取聊天记录失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @action(detail=False, methods=['get']) def conversation_detail(self, request): """获取特定对话的详细信息,并返回creator和product信息""" try: conversation_id = request.query_params.get('conversation_id') if not conversation_id: return Response({ 'code': 400, 'message': '缺少conversation_id参数', 'data': None }, status=status.HTTP_400_BAD_REQUEST) # 查找对话记录 chat_records = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).order_by('created_at') if not chat_records.exists(): return Response({ 'code': 404, 'message': '对话不存在或已被删除', 'data': None }, status=status.HTTP_404_NOT_FOUND) # 尝试查找关联的谈判记录 negotiation_chat = NegotiationChat.objects.filter( conversation_id=conversation_id ).select_related('negotiation').first() # 准备基本的返回数据 result = { 'conversation_id': conversation_id, 'messages': [] } # 如果存在谈判关联,添加谈判相关信息 if negotiation_chat: negotiation = negotiation_chat.negotiation # 获取关联的达人信息 creator_id = negotiation_chat.creator_id from apps.daren_detail.models import CreatorProfile creator = CreatorProfile.objects.filter(id=creator_id).first() # 获取关联的产品信息 product_id = negotiation_chat.product_id from apps.brands.models import Product product = Product.objects.filter(id=product_id).first() # 添加谈判相关信息 result['negotiation'] = { 'id': str(negotiation.id), 'status': negotiation.status, 'current_round': negotiation.current_round, 'context': negotiation.context, 'creator': { 'id': creator.id if creator else None, 'name': creator.name if creator else "未知", 'email': creator.email if creator and hasattr(creator, 'email') else None } if creator else {"id": None, "name": "未知"}, 'product': { 'id': str(product.id) if product else None, 'name': product.name if product else "未知", 'description': product.description if product and hasattr(product, 'description') else "", 'brand': product.brand.name if product and hasattr(product, 'brand') else "" } if product else {"id": None, "name": "未知"} } # 整理对话消息 for record in chat_records: result['messages'].append({ 'id': str(record.id), 'role': record.role, 'content': record.content, 'created_at': record.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'metadata': record.metadata }) return Response({ 'code': 200, 'message': '获取成功', 'data': result }) except ValueError as e: return Response({ 'code': 404, 'message': str(e), 'data': None }, status=status.HTTP_404_NOT_FOUND) except Exception as e: logger.error(f"获取对话详情失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'获取对话详情失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @action(detail=False, methods=['get']) def available_datasets(self, request): """获取用户可访问的知识库列表""" try: user = request.user # 直接返回所有知识库,不做权限检查 accessible_datasets = KnowledgeBase.objects.all() return Response({ 'code': 200, 'message': '获取成功', 'data': [ { 'id': str(ds.id), 'name': ds.name, 'type': ds.type, 'department': ds.department, 'description': ds.desc } for ds in accessible_datasets ] }) except Exception as e: logger.error(f"获取可用知识库列表失败: {str(e)}") return Response({ 'code': 500, 'message': f'获取可用知识库列表失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @action(detail=False, methods=['post']) def create_conversation(self, request): """创建会话 - 关联到谈判ID""" try: data = request.data # 检查谈判ID negotiation_id = data.get('negotiation_id') if not negotiation_id: return Response({ 'code': 400, 'message': '缺少必填字段: negotiation_id', 'data': None }, status=status.HTTP_400_BAD_REQUEST) # 验证谈判存在性 try: negotiation = Negotiation.objects.get(id=negotiation_id) except Negotiation.DoesNotExist: return Response({ 'code': 404, 'message': f'谈判记录不存在: {negotiation_id}', 'data': None }, status=status.HTTP_404_NOT_FOUND) # 获取达人邮箱 creator = negotiation.creator creator_email = creator.email if hasattr(creator, 'email') else None # 创建一个新的会话ID conversation_id = str(uuid.uuid4()) logger.info(f"创建新的会话ID: {conversation_id}") # 创建谈判关联 NegotiationChat.objects.create( negotiation=negotiation, conversation_id=conversation_id, creator_id=creator.id, product_id=negotiation.product.id ) # 准备metadata metadata = { 'negotiation_id': str(negotiation_id), 'status': negotiation.status, 'creator_id': str(creator.id), 'creator_email': creator_email } return Response({ 'code': 200, 'message': '会话创建成功', 'data': { 'conversation_id': conversation_id, 'negotiation_id': str(negotiation_id), 'status': negotiation.status, 'creator': { 'id': str(creator.id), 'name': creator.name if hasattr(creator, 'name') else "未知", 'email': creator_email } } }) except Exception as e: logger.error(f"创建会话失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'创建会话失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def create(self, request): """创建聊天记录""" try: chat_service = ChatService() question_record, conversation_id, metadata, knowledge_bases, external_id_list = chat_service.create_chat_record( request.user, request.data, request.data.get('conversation_id') ) use_stream = request.data.get('stream', True) title = request.data.get('title', 'New chat') if use_stream: def stream_response(): answer_record = ChatHistory.objects.create( user=question_record.user, knowledge_base=knowledge_bases[0], conversation_id=conversation_id, title=title, parent_id=str(question_record.id), role='assistant', content="", metadata=metadata ) yield f"data: {json.dumps({'code': 200, 'message': '开始流式传输', 'data': {'id': str(answer_record.id), 'conversation_id': conversation_id, 'content': '', 'is_end': False}})}\n\n" full_content = "" for data in stream_chat_answer(conversation_id, request.data['question'], external_id_list, metadata): parsed_data = json.loads(data[5:-2]) # 移除"data: "和"\n\n" if parsed_data['code'] == 200 and 'content' in parsed_data['data']: content_part = parsed_data['data']['content'] full_content += content_part response_data = { 'code': 200, 'message': 'partial', 'data': { 'id': str(answer_record.id), 'conversation_id': conversation_id, 'title': title, 'content': content_part, 'is_end': parsed_data['data']['is_end'] } } yield f"data: {json.dumps(response_data)}\n\n" if parsed_data['data']['is_end']: answer_record.content = full_content.strip() answer_record.save() current_title = ChatHistory.objects.filter( conversation_id=conversation_id ).exclude( title__in=["New chat", "新对话", ""] ).values_list('title', flat=True).first() if current_title: title_updated = current_title else: try: generated_title = generate_conversation_title( request.data['question'], full_content.strip() ) if generated_title: ChatHistory.objects.filter( conversation_id=conversation_id ).update(title=generated_title) title_updated = generated_title else: title_updated = "新对话" except ExternalAPIError as e: logger.error(f"自动生成标题失败: {str(e)}") title_updated = "新对话" final_response = { 'code': 200, 'message': '完成', 'data': { 'id': str(answer_record.id), 'conversation_id': conversation_id, 'title': title_updated, 'dataset_id_list': metadata.get('dataset_id_list', []), 'dataset_names': metadata.get('dataset_names', []), 'role': 'assistant', 'content': full_content.strip(), 'created_at': answer_record.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'is_end': True } } yield f"data: {json.dumps(final_response)}\n\n" break elif parsed_data['code'] != 200: yield data break if full_content: try: answer_record.content = full_content.strip() answer_record.save() except Exception as save_error: logger.error(f"保存部分内容失败: {str(save_error)}") response = StreamingHttpResponse( stream_response(), content_type='text/event-stream', status=status.HTTP_201_CREATED ) response['Cache-Control'] = 'no-cache, no-store' response['Connection'] = 'keep-alive' return response else: logger.info("使用非流式输出模式") try: answer = get_chat_answer(external_id_list, request.data['question']) except ExternalAPIError as e: logger.error(f"获取回答失败: {str(e)}") return Response({ 'code': 500, 'message': f'获取回答失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) if answer is None: return Response({ 'code': 500, 'message': '获取回答失败', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) answer_record = ChatHistory.objects.create( user=request.user, knowledge_base=knowledge_bases[0], conversation_id=conversation_id, title=title, parent_id=str(question_record.id), role='assistant', content=answer, metadata=metadata ) existing_records = ChatHistory.objects.filter(conversation_id=conversation_id) should_generate_title = not existing_records.exclude(id=question_record.id).exists() and (not title or title == 'New chat') if should_generate_title: try: generated_title = generate_conversation_title( request.data['question'], answer ) if generated_title: ChatHistory.objects.filter(conversation_id=conversation_id).update(title=generated_title) title = generated_title except ExternalAPIError as e: logger.error(f"自动生成标题失败: {str(e)}") return Response({ 'code': 200, 'message': '成功', 'data': { 'id': str(answer_record.id), 'conversation_id': conversation_id, 'title': title, 'dataset_id_list': metadata.get('dataset_id_list', []), 'dataset_names': metadata.get('dataset_names', []), 'role': 'assistant', 'content': answer, 'created_at': answer_record.created_at.strftime('%Y-%m-%d %H:%M:%S') } }, status=status.HTTP_201_CREATED) except ValueError as e: return Response({ 'code': 400, 'message': str(e), 'data': None }, status=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.error(f"创建聊天记录失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'创建聊天记录失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # @action(detail=False, methods=['post']) # def hit_test(self, request): # """获取问题与知识库文档的匹配度""" # try: # data = request.data # if 'question' not in data or 'dataset_id_list' not in data or not data['dataset_id_list']: # return Response({ # 'code': 400, # 'message': '缺少必填字段: question 或 dataset_id_list', # 'data': None # }, status=status.HTTP_400_BAD_REQUEST) # question = data['question'] # dataset_ids = data['dataset_id_list'] # if not isinstance(dataset_ids, list): # try: # dataset_ids = json.loads(dataset_ids) # if not isinstance(dataset_ids, list): # dataset_ids = [dataset_ids] # except (json.JSONDecodeError, TypeError): # dataset_ids = [dataset_ids] # external_id_list = [] # for kb_id in dataset_ids: # kb = KnowledgeBase.objects.filter(id=kb_id).first() # if not kb: # return Response({ # 'code': 404, # 'message': f'知识库不存在: {kb_id}', # 'data': None # }, status=status.HTTP_404_NOT_FOUND) # if not self.check_knowledge_base_permission(kb, request.user, 'read'): # return Response({ # 'code': 403, # 'message': f'无权访问知识库: {kb.name}', # 'data': None # }, status=status.HTTP_403_FORBIDDEN) # if kb.external_id: # external_id_list.append(str(kb.external_id)) # if not external_id_list: # return Response({ # 'code': 400, # 'message': '没有有效的知识库external_id', # 'data': None # }, status=status.HTTP_400_BAD_REQUEST) # all_documents = [] # for dataset_id in external_id_list: # try: # doc_info = get_hit_test_documents(dataset_id, question) # if doc_info: # all_documents.extend(doc_info) # except ExternalAPIError as e: # logger.error(f"调用hit_test失败: 知识库ID={dataset_id}, 错误={str(e)}") # continue # 宽松处理,跳过失败的知识库 # all_documents = sorted(all_documents, key=lambda x: x.get('similarity', 0), reverse=True) # return Response({ # 'code': 200, # 'message': '成功', # 'data': { # 'question': question, # 'matched_documents': all_documents, # 'total_count': len(all_documents) # } # }) # except Exception as e: # logger.error(f"hit_test接口调用失败: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # return Response({ # 'code': 500, # 'message': f'hit_test接口调用失败: {str(e)}', # 'data': None # }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def _highlight_keyword(self, text, keyword): """高亮关键词""" if not keyword or not text: return text return text.replace(keyword, f'{keyword}') def update(self, request, pk=None): """更新聊天记录""" try: record = self.get_queryset().filter(id=pk).first() if not record: return Response({ 'code': 404, 'message': '记录不存在或无权限', 'data': None }, status=status.HTTP_404_NOT_FOUND) data = request.data updateable_fields = ['content', 'metadata'] if 'content' in data: record.content = data['content'] if 'metadata' in data: current_metadata = record.metadata or {} current_metadata.update(data['metadata']) record.metadata = current_metadata record.save() return Response({ 'code': 200, 'message': '更新成功', 'data': { 'id': str(record.id), 'conversation_id': record.conversation_id, 'role': record.role, 'content': record.content, 'metadata': record.metadata, 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: logger.error(f"更新聊天记录失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'更新聊天记录失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def destroy(self, request, pk=None): """删除聊天记录(软删除)""" try: record = self.get_queryset().filter(id=pk).first() if not record: return Response({ 'code': 404, 'message': '记录不存在或无权限', 'data': None }, status=status.HTTP_404_NOT_FOUND) record.soft_delete() return Response({ 'code': 200, 'message': '删除成功', 'data': None }) except Exception as e: logger.error(f"删除聊天记录失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'删除聊天记录失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @action(detail=False, methods=['get']) def search(self, request): """搜索聊天记录""" try: keyword = request.query_params.get('keyword', '').strip() dataset_id = request.query_params.get('dataset_id') start_date = request.query_params.get('start_date') end_date = request.query_params.get('end_date') page = int(request.query_params.get('page', 1)) page_size = int(request.query_params.get('page_size', 10)) # 获取status筛选参数 status_filter = request.query_params.get('status') query = self.get_queryset() if keyword: query = query.filter( Q(content__icontains=keyword) | Q(knowledge_base__name__icontains=keyword) ) if dataset_id: # 移除权限检查,直接筛选知识库ID query = query.filter(knowledge_base__id=dataset_id) # 添加status筛选条件 if status_filter: # 查找与谈判关联的对话ID negotiation_chats = NegotiationChat.objects.filter( negotiation__status=status_filter ).values_list('conversation_id', flat=True) # 筛选这些对话ID对应的聊天记录 query = query.filter(conversation_id__in=negotiation_chats) if start_date: query = query.filter(created_at__gte=start_date) if end_date: query = query.filter(created_at__lte=end_date) total = query.count() start = (page - 1) * page_size end = start + page_size records = query.order_by('-created_at')[start:end] results = [ { 'id': str(record.id), 'conversation_id': record.conversation_id, 'dataset_id': str(record.knowledge_base.id), 'dataset_name': record.knowledge_base.name, 'role': record.role, 'content': record.content, 'created_at': record.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'metadata': record.metadata, 'highlights': {'content': self._highlight_keyword(record.content, keyword)} if keyword else {} } for record in records ] return Response({ 'code': 200, 'message': '搜索成功', 'data': { 'total': total, 'page': page, 'page_size': page_size, 'results': results } }) except Exception as e: logger.error(f"搜索聊天记录失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'搜索失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @action(detail=False, methods=['delete']) def delete_conversation(self, request): """通过conversation_id删除一组会话""" try: conversation_id = request.query_params.get('conversation_id') if not conversation_id: return Response({ 'code': 400, 'message': '缺少必要参数: conversation_id', 'data': None }, status=status.HTTP_400_BAD_REQUEST) records = self.get_queryset().filter(conversation_id=conversation_id) if not records.exists(): return Response({ 'code': 404, 'message': '未找到该会话或无权限访问', 'data': None }, status=status.HTTP_404_NOT_FOUND) records_count = records.count() for record in records: record.soft_delete() return Response({ 'code': 200, 'message': '删除成功', 'data': { 'conversation_id': conversation_id, 'deleted_count': records_count } }) except Exception as e: logger.error(f"删除会话失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'删除会话失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @action(detail=False, methods=['get'], url_path='generate-conversation-title') def generate_conversation_title(self, request): """更新会话标题""" try: conversation_id = request.query_params.get('conversation_id') if not conversation_id: return Response({ 'code': 400, 'message': '缺少conversation_id参数', 'data': None }, status=status.HTTP_400_BAD_REQUEST) # 检查对话是否存在 messages = self.get_queryset().filter( conversation_id=conversation_id, is_deleted=False, user=request.user ).order_by('created_at') if not messages.exists(): return Response({ 'code': 404, 'message': '对话不存在或无权访问', 'data': None }, status=status.HTTP_404_NOT_FOUND) # 检查是否有自定义标题参数 custom_title = request.query_params.get('title') if not custom_title: return Response({ 'code': 400, 'message': '缺少title参数', 'data': None }, status=status.HTTP_400_BAD_REQUEST) # 更新所有相关记录的标题 ChatHistory.objects.filter( conversation_id=conversation_id, user=request.user ).update(title=custom_title) return Response({ 'code': 200, 'message': '更新会话标题成功', 'data': { 'conversation_id': conversation_id, 'title': custom_title } }) except Exception as e: logger.error(f"更新会话标题失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f"更新会话标题失败: {str(e)}", 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @action(detail=False, methods=['post']) def send_email_to_creator(self, request): """通过对话ID发送邮件给达人""" try: data = request.data conversation_id = data.get('conversation_id') if not conversation_id: return Response({ 'code': 400, 'message': '缺少必填字段: conversation_id', 'data': None }, status=status.HTTP_400_BAD_REQUEST) # 验证邮件参数 subject = data.get('subject', '') # 设置默认主题 body = data.get('body') from_email = data.get('from_email') if not all([body, from_email]): return Response({ 'code': 400, 'message': '缺少必要的邮件参数,请提供body和from_email字段', 'data': None }, status=status.HTTP_400_BAD_REQUEST) # 查找关联的谈判聊天记录 negotiation_chat = NegotiationChat.objects.filter( conversation_id=conversation_id ).first() if not negotiation_chat: return Response({ 'code': 404, 'message': f'找不到与对话ID关联的谈判聊天记录: {conversation_id}', 'data': None }, status=status.HTTP_404_NOT_FOUND) # 获取达人ID并查询达人信息 creator_id = negotiation_chat.creator_id from apps.daren_detail.models import CreatorProfile creator = CreatorProfile.objects.filter(id=creator_id).first() if not creator: return Response({ 'code': 404, 'message': f'找不到达人信息: {creator_id}', 'data': None }, status=status.HTTP_404_NOT_FOUND) # 获取达人邮箱 creator_email = creator.email if not creator_email: return Response({ 'code': 404, 'message': '达人没有关联的邮箱地址', 'data': None }, status=status.HTTP_404_NOT_FOUND) # 处理附件 attachments = [] for file_key, file_obj in request.FILES.items(): if file_obj: # 保存临时文件 import os from django.conf import settings tmp_path = os.path.join(settings.MEDIA_ROOT, 'tmp', f'{file_key}_{file_obj.name}') os.makedirs(os.path.dirname(tmp_path), exist_ok=True) with open(tmp_path, 'wb+') as destination: for chunk in file_obj.chunks(): destination.write(chunk) attachments.append({ 'path': tmp_path, 'filename': file_obj.name }) # 直接调用Gmail服务发送邮件,而不是通过视图 from apps.gmail.services.gmail_service import GmailService success, result = GmailService.send_email( request.user, from_email, creator_email, subject or f'与{creator.name}的谈判', body or '', attachments ) if success: # 发送成功后,将邮件记录添加到对话历史 # 查询一个有效的知识库ID default_kb = KnowledgeBase.objects.first() if not default_kb: logger.warning("未找到有效的知识库,无法记录邮件发送历史") # 返回成功但提示无法记录历史 return Response({ 'code': 200, 'message': '邮件发送成功,但未能记录到对话历史', 'data': { 'conversation_id': conversation_id, 'creator_email': creator_email, 'creator_name': creator.name, 'message_id': result } }) ChatHistory.objects.create( user=request.user, knowledge_base=default_kb, # 使用查询到的默认知识库 conversation_id=conversation_id, role='system', content=f"发送邮件给 {creator.name}: {subject}\n\n{body}", metadata={ 'email_sent': True, 'to': creator_email, 'from': from_email, 'subject': subject, 'negotiation_id': str(negotiation_chat.negotiation.id) if negotiation_chat.negotiation else None, 'status': negotiation_chat.negotiation.status if negotiation_chat.negotiation else None, 'has_attachments': len(attachments) > 0 } ) return Response({ 'code': 200, 'message': '邮件发送成功', 'data': { 'conversation_id': conversation_id, 'creator_email': creator_email, 'creator_name': creator.name, 'message_id': result } }) else: # 返回发送失败信息 return Response({ 'code': 500, 'message': f'邮件发送失败: {result}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) except Exception as e: logger.error(f"发送邮件给达人失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'发送邮件给达人失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @action(detail=False, methods=['get']) def get_negotiation_chats(self, request): """获取谈判聊天列表,返回NegotiationChat表的数据及关联的达人和产品信息""" try: # 获取分页和筛选参数 page = int(request.query_params.get('page', 1)) page_size = int(request.query_params.get('page_size', 10)) status_filter = request.query_params.get('status') # 查询NegotiationChat表 query = NegotiationChat.objects.all().select_related('negotiation') # 如果提供了status参数,则过滤谈判状态 if status_filter: query = query.filter(negotiation__status=status_filter) # 获取总数量 total = query.count() # 分页 start = (page - 1) * page_size end = start + page_size chats = query.order_by('-updated_at')[start:end] results = [] for chat in chats: # 获取关联的谈判 negotiation = chat.negotiation # 获取关联的达人信息 creator_id = chat.creator_id from apps.daren_detail.models import CreatorProfile creator = CreatorProfile.objects.filter(id=creator_id).first() # 获取关联的产品信息 product_id = chat.product_id from apps.brands.models import Product product = Product.objects.filter(id=product_id).first() # 获取最新的聊天记录 latest_message = ChatHistory.objects.filter( conversation_id=chat.conversation_id, is_deleted=False ).order_by('-created_at').first() # 计算聊天记录数量 message_count = ChatHistory.objects.filter( conversation_id=chat.conversation_id, is_deleted=False ).count() # 构建响应数据 chat_data = { 'conversation_id': chat.conversation_id, 'negotiation_id': str(negotiation.id), 'negotiation_status': negotiation.status, 'current_round': negotiation.current_round, 'context': negotiation.context, 'creator': { 'id': creator.id if creator else None, 'name': creator.name if creator else "未知", 'email': creator.email if creator and hasattr(creator, 'email') else None, } if creator else {"id": None, "name": "未知"}, 'product': { 'id': str(product.id) if product else None, 'name': product.name if product else "未知", 'description': product.description if product and hasattr(product, 'description') else "", 'brand': product.brand.name if product and hasattr(product, 'brand') else "" } if product else {"id": None, "name": "未知"}, 'message_count': message_count, 'last_message': latest_message.content if latest_message else '', 'last_time': latest_message.created_at.strftime('%Y-%m-%d %H:%M:%S') if latest_message else chat.updated_at.strftime('%Y-%m-%d %H:%M:%S'), 'created_at': chat.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'updated_at': chat.updated_at.strftime('%Y-%m-%d %H:%M:%S') } results.append(chat_data) return Response({ 'code': 200, 'message': '获取成功', 'data': { 'total': total, 'page': page, 'page_size': page_size, 'results': results } }) except Exception as e: logger.error(f"获取谈判聊天列表失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'获取谈判聊天列表失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)