From 7ffb0de74c50ad9133d6f4a296f1b51e2435f385 Mon Sep 17 00:00:00 2001 From: wanjia Date: Fri, 11 Apr 2025 15:22:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96DeepSeek=20API=E8=B0=83?= =?UTF-8?q?=E7=94=A8=EF=BC=8C=E6=94=B9=E8=BF=9B=E6=8E=A8=E8=8D=90=E5=9B=9E?= =?UTF-8?q?=E5=A4=8D=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- role_based_system/settings.py | 4 + user_management/gmail_integration.py | 201 +++++++++++++-- ...08_gmailcredential_gmail_email_and_more.py | 18 ++ user_management/models.py | 5 +- user_management/views.py | 239 +++++++++++++++++- 5 files changed, 447 insertions(+), 20 deletions(-) create mode 100644 user_management/migrations/0008_gmailcredential_gmail_email_and_more.py diff --git a/role_based_system/settings.py b/role_based_system/settings.py index cc8939b..b00e85e 100644 --- a/role_based_system/settings.py +++ b/role_based_system/settings.py @@ -316,4 +316,8 @@ USE_I18N = True USE_L10N = True USE_TZ = True # 将此项设置为True以启用时区支持 +# DeepSeek API配置 +DEEPSEEK_API_KEY = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf" +# 注意:这里需要更新为有效的DeepSeek API密钥 + diff --git a/user_management/gmail_integration.py b/user_management/gmail_integration.py index e726464..3ef4e42 100644 --- a/user_management/gmail_integration.py +++ b/user_management/gmail_integration.py @@ -17,6 +17,7 @@ from email.mime.base import MIMEBase from email import encoders import warnings import mimetypes +import requests # 忽略oauth2client相关的所有警告 warnings.filterwarnings('ignore', message='file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth') @@ -28,7 +29,7 @@ from apiclient import discovery from httplib2 import Http from oauth2client import file, client, tools -from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment +from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment, UserProfile logger = logging.getLogger(__name__) @@ -1446,7 +1447,7 @@ class GmailIntegration: """处理新收到的邮件""" try: # 导入所需模型 - from .models import GmailTalentMapping, GmailAttachment, KnowledgeBase, ChatHistory + from .models import GmailTalentMapping, GmailAttachment, KnowledgeBase, ChatHistory, UserProfile # 获取邮件详情 message = gmail_integration.gmail_service.users().messages().get( @@ -1567,23 +1568,27 @@ class GmailIntegration: date_str = email_data.get('date', '') try: date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') - # 不应用时区转换 - aware_date = date_obj + # 确保时区处理正确 + from django.utils import timezone + aware_date = timezone.make_aware(date_obj) if not timezone.is_aware(date_obj) else date_obj except (ValueError, TypeError): logger.warning(f"无法解析邮件日期: {date_str},使用当前时间") - aware_date = datetime.now() + aware_date = timezone.now() - # 查找适合的parent_id: 使用ID排序而非时间 + # 查找适合的parent_id: 使用创建时间排序 try: - previous_messages = ChatHistory.objects.filter( + # 查找该对话中最新的消息 + latest_message = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False - ).order_by('-id')[:1] + ).order_by('-created_at').first() parent_id = None - if previous_messages: - parent_id = str(previous_messages[0].id) - logger.info(f"找到父消息ID: {parent_id}") + if latest_message: + parent_id = str(latest_message.id) + logger.info(f"找到最新消息ID作为parent_id: {parent_id}, 创建时间: {latest_message.created_at}") + else: + logger.info(f"对话 {conversation_id} 没有现有消息,不设置parent_id") except Exception as e: logger.error(f"查找父消息失败: {str(e)}") logger.error(traceback.format_exc()) @@ -1620,16 +1625,16 @@ class GmailIntegration: if attachment_records: metadata['message_attachments'] = attachment_records - # 创建聊天记录,不指定时间 + # 使用之前查找到的parent_id和aware_date创建聊天记录 chat_message = ChatHistory.objects.create( user=gmail_integration.user, knowledge_base=knowledge_base, conversation_id=conversation_id, - parent_id=parent_id, - role=role, # 使用上面确定的role变量,而不是硬编码为'assistant' + parent_id=parent_id, # 使用之前查找到的parent_id + role=role, # 使用上面确定的role变量 content=f"[{email_data['subject']}] {email_data['body']}", - metadata=metadata - # 不设置created_at,使用数据库默认时间 + metadata=metadata, + created_at=aware_date # 设置正确的创建时间 ) # 更新知识库文档 @@ -1695,6 +1700,85 @@ class GmailIntegration: logger.info(f"已创建系统通知记录: 用户 {gmail_integration.user.id} 的新Gmail消息") except Exception as notification_error: logger.error(f"创建系统通知记录失败: {str(notification_error)}") + + # 如果消息是达人发送的,并且用户启用了自动推荐回复功能,则生成推荐回复 + if role == 'user' and talent_mapping: + try: + # 检查用户是否启用了自动推荐回复功能 + user_profile, created = UserProfile.objects.get_or_create(user=gmail_integration.user) + + if user_profile.auto_recommend_reply: + logger.info(f"用户 {gmail_integration.user.id} 已启用自动推荐回复功能,生成推荐回复") + + # 获取对话历史以传递给DeepSeek API + conversation_messages = ChatHistory.objects.filter( + conversation_id=conversation_id, + is_deleted=False + ).order_by('created_at') + + # 构建对话历史 + conversation_history = [] + for message in conversation_messages: + conversation_history.append({ + 'role': 'user' if message.role == 'user' else 'assistant', + 'content': message.content + }) + + # 限制对话历史长度,只保留最近的5条消息,避免超出token限制 + recent_messages = conversation_history[-5:] if len(conversation_history) > 5 else conversation_history + messages.extend(recent_messages) + + # 确保最后一条消息是用户消息,如果不是,添加一个提示 + if not recent_messages or recent_messages[-1]['role'] != 'user': + # 添加一个系统消息作为用户的最后一条消息 + messages.append({ + "role": "user", + "content": "请针对我之前的消息提供详细的回复建议。" + }) + + # 调用DeepSeek API生成推荐回复 + recommended_reply = self._get_recommended_reply_from_deepseek(conversation_history) + + if recommended_reply: + # 创建推荐回复通知 + recommend_notification_data = { + "type": "notification", + "data": { + "message_type": "recommended_reply", + "conversation_id": conversation_id, + "message": { + "id": str(chat_message.id), + "role": role, + "content": f"[{email_data['subject']}] {email_data['body'][:100]}{'...' if len(email_data['body']) > 100 else ''}", + "sender": sender_email, + "subject": email_data['subject'], + "recommended_reply": recommended_reply + } + } + } + + # 发送推荐回复WebSocket通知 + async_to_sync(channel_layer.group_send)( + f"notification_user_{gmail_integration.user.id}", + recommend_notification_data + ) + logger.info(f"已发送推荐回复通知: 用户 {gmail_integration.user.id}") + + # 创建推荐回复系统通知 + Notification.objects.create( + sender=gmail_integration.user, + receiver=gmail_integration.user, + title="新推荐回复", + content=f"系统为来自 {sender_email} 的邮件生成了推荐回复", + type="system_notice", + related_resource=conversation_id + ) + logger.info(f"已创建推荐回复系统通知: 用户 {gmail_integration.user.id}") + else: + logger.warning(f"生成推荐回复失败: 用户 {gmail_integration.user.id}, 对话 {conversation_id}") + except Exception as recommend_error: + logger.error(f"处理推荐回复失败: {str(recommend_error)}") + logger.error(traceback.format_exc()) except Exception as ws_error: logger.error(f"发送WebSocket通知失败: {str(ws_error)}") logger.error(traceback.format_exc()) @@ -1707,7 +1791,90 @@ class GmailIntegration: logger.error(f"处理新邮件失败: {str(e)}") logger.error(traceback.format_exc()) return False - + + def _get_recommended_reply_from_deepseek(self, conversation_history): + """调用DeepSeek V3 API生成推荐回复""" + try: + # 使用有效的API密钥 + api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf" + # 如果上面的密钥不正确,可以尝试从环境变量或数据库中获取 + # 从Django设置中获取密钥 + from django.conf import settings + if hasattr(settings, 'DEEPSEEK_API_KEY') and settings.DEEPSEEK_API_KEY: + api_key = settings.DEEPSEEK_API_KEY + + url = "https://api.siliconflow.cn/v1/chat/completions" + + # 直接使用默认系统消息,不进行复杂处理,尽量模仿文档示例 + system_message = { + "role": "system", + "content": "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。即使用户消息很短或不明确,也必须提供有实质内容的回复。禁止返回空白内容。回复应该有至少100个字符。" + } + + messages = [system_message] + + # 限制对话历史长度,只保留最近的5条消息,避免超出token限制 + recent_messages = conversation_history[-5:] if len(conversation_history) > 5 else conversation_history + messages.extend(recent_messages) + + # 确保最后一条消息是用户消息,如果不是,添加一个提示 + if not recent_messages or recent_messages[-1]['role'] != 'user': + # 添加一个系统消息作为用户的最后一条消息 + messages.append({ + "role": "user", + "content": "请针对我之前的消息提供详细的回复建议。" + }) + + # 完全按照文档提供的参数格式构建请求 + payload = { + "model": "deepseek-ai/DeepSeek-V3", + "messages": messages, + "stream": False, + "max_tokens": 1024, # 增加token上限 + "temperature": 0.7, # 提高多样性 + "top_p": 0.9, + "top_k": 50, + "frequency_penalty": 0.5, + "presence_penalty": 0.2, # 添加新参数 + "n": 1, + "stop": [], + "response_format": { + "type": "text" + } + } + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + logger.info(f"开始调用DeepSeek API生成推荐回复") + response = requests.post(url, json=payload, headers=headers) + + if response.status_code != 200: + logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}") + return None + + result = response.json() + logger.debug(f"DeepSeek API返回: {result}") + + # 提取回复内容 + if 'choices' in result and len(result['choices']) > 0: + reply = result['choices'][0]['message']['content'] + # 如果返回的内容为空,直接返回None + if not reply or reply.strip() == '': + logger.warning("DeepSeek API返回的回复内容为空") + return None + return reply + + logger.warning(f"DeepSeek API返回格式异常: {result}") + return None + + except Exception as e: + logger.error(f"调用DeepSeek API失败: {str(e)}") + logger.error(traceback.format_exc()) + return None + def get_attachment_by_conversation(self, conversation_id): """获取特定对话的所有附件""" try: diff --git a/user_management/migrations/0008_gmailcredential_gmail_email_and_more.py b/user_management/migrations/0008_gmailcredential_gmail_email_and_more.py new file mode 100644 index 0000000..ea9131f --- /dev/null +++ b/user_management/migrations/0008_gmailcredential_gmail_email_and_more.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.5 on 2025-04-11 04:37 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('user_management', '0007_alter_gmailcredential_options_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='userprofile', + name='auto_recommend_reply', + field=models.BooleanField(default=False, help_text='是否启用自动推荐回复功能'), + ), + ] diff --git a/user_management/models.py b/user_management/models.py index e7d8137..b7d43fa 100644 --- a/user_management/models.py +++ b/user_management/models.py @@ -396,12 +396,13 @@ class UserProfile(models.Model): user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='profile') department = models.CharField(max_length=100, blank=True, help_text="部门") group = models.CharField(max_length=100, blank=True, help_text="小组") + auto_recommend_reply = models.BooleanField(default=False, help_text="是否启用自动推荐回复功能") class Meta: db_table = 'user_profiles' - + def __str__(self): - return f"{self.user.username}'s profile" + return f"{self.user.username}的个人资料" class KnowledgeBasePermission(models.Model): """知识库权限模型 - 实现知识库和用户的多对多关系""" diff --git a/user_management/views.py b/user_management/views.py index b96e24e..8397821 100644 --- a/user_management/views.py +++ b/user_management/views.py @@ -59,7 +59,8 @@ from .models import ( KnowledgeBaseDocument, GmailCredential, GmailTalentMapping, - GmailAttachment + GmailAttachment, + UserProfile ) from .serializers import ( UserSerializer, @@ -1656,6 +1657,242 @@ class ChatHistoryViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet): 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + @action(detail=False, methods=['get'], url_path='generate-recommended-reply') + def generate_recommended_reply(self, request): + """获取达人消息并生成推荐回复""" + try: + conversation_id = request.query_params.get('conversation_id') + if not conversation_id: + return Response({ + 'code': 400, + 'message': '缺少conversation_id参数', + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + # 获取对话历史,确保按时间顺序排序 + messages = self.get_queryset().filter( + conversation_id=conversation_id, + is_deleted=False + ).order_by('created_at') + + if not messages.exists(): + return Response({ + 'code': 404, + 'message': '对话不存在', + 'data': None + }, status=status.HTTP_404_NOT_FOUND) + + # 获取最新的消息,检查是否是达人发送的 + latest_message = messages.last() + + # 如果最后一条不是用户消息,尝试查找最新的用户消息 + if latest_message.role != 'user': + user_messages = messages.filter(role='user') + if user_messages.exists(): + latest_message = user_messages.order_by('-created_at').first() + else: + return Response({ + 'code': 400, + 'message': '对话中没有达人发送的消息,无法生成推荐回复', + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + # 提取对话历史以传递给DeepSeek API + conversation_history = [] + for message in messages: + conversation_history.append({ + 'role': 'user' if message.role == 'user' else 'assistant', + 'content': message.content + }) + + # 调用DeepSeek V3 API生成推荐回复 + recommended_reply = self._get_recommended_reply_from_deepseek(conversation_history) + + if not recommended_reply: + return Response({ + 'code': 400, # 改为400,表示可以重试 + 'message': 'API暂时无法生成推荐回复,请稍后再试', + 'data': { + 'conversation_id': conversation_id, + 'latest_message': { + 'id': str(latest_message.id), + 'content': latest_message.content, + 'role': latest_message.role, + 'created_at': latest_message.created_at.strftime('%Y-%m-%d %H:%M:%S') + }, + 'recommended_reply': None + } + }, status=status.HTTP_400_BAD_REQUEST) + + return Response({ + 'code': 200, + 'message': '生成推荐回复成功', + 'data': { + 'conversation_id': conversation_id, + 'latest_message': { + 'id': str(latest_message.id), + 'content': latest_message.content, + 'role': latest_message.role, + 'created_at': latest_message.created_at.strftime('%Y-%m-%d %H:%M:%S') + }, + 'recommended_reply': recommended_reply + } + }) + + except Exception as e: + logger.error(f"生成推荐回复失败: {str(e)}") + logger.error(traceback.format_exc()) + return Response({ + 'code': 500, + 'message': f"生成推荐回复失败: {str(e)}", + 'data': None + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + def _get_recommended_reply_from_deepseek(self, conversation_history): + """调用DeepSeek V3 API生成推荐回复""" + try: + # 使用有效的API密钥 + api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf" + # 如果上面的密钥不正确,可以尝试从环境变量或数据库中获取 + # 从Django设置中获取密钥 + from django.conf import settings + if hasattr(settings, 'DEEPSEEK_API_KEY') and settings.DEEPSEEK_API_KEY: + api_key = settings.DEEPSEEK_API_KEY + + url = "https://api.siliconflow.cn/v1/chat/completions" + + # 直接使用默认系统消息,不进行复杂处理,尽量模仿文档示例 + system_message = { + "role": "system", + "content": "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。即使用户消息很短或不明确,也必须提供有实质内容的回复。禁止返回空白内容。回复应该有至少100个字符。" + } + + messages = [system_message] + + + # 限制对话历史长度,只保留最近的5条消息,避免超出token限制 + recent_messages = conversation_history[-5:] if len(conversation_history) > 5 else conversation_history + messages.extend(recent_messages) + + # 确保最后一条消息是用户消息,如果不是,添加一个提示 + if not recent_messages or recent_messages[-1]['role'] != 'user': + # 添加一个系统消息作为用户的最后一条消息 + messages.append({ + "role": "user", + "content": "请针对我之前的消息提供详细的回复建议。" + }) + + # 完全按照文档提供的参数格式构建请求 + payload = { + "model": "deepseek-ai/DeepSeek-V3", + "messages": messages, + "stream": False, + "max_tokens": 1024, # 增加token上限 + "temperature": 0.7, # 提高多样性 + "top_p": 0.9, + "top_k": 50, + "frequency_penalty": 0.5, + "presence_penalty": 0.2, # 添加新参数 + "n": 1, + "stop": [], + "response_format": { + "type": "text" + } + } + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + logger.info(f"开始调用DeepSeek API生成推荐回复") + response = requests.post(url, json=payload, headers=headers) + + if response.status_code != 200: + logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}") + return None + + result = response.json() + logger.debug(f"DeepSeek API返回: {result}") + + # 提取回复内容 + if 'choices' in result and len(result['choices']) > 0: + reply = result['choices'][0]['message']['content'] + # 如果返回的内容为空,直接返回None + if not reply or reply.strip() == '': + logger.warning("DeepSeek API返回的回复内容为空") + return None + return reply + + logger.warning(f"DeepSeek API返回格式异常: {result}") + return None + + except Exception as e: + logger.error(f"调用DeepSeek API失败: {str(e)}") + logger.error(traceback.format_exc()) + return None + + def _generate_fallback_reply(self, messages): + """此功能已禁用,不再生成备用回复""" + return None + + @action(detail=False, methods=['post'], url_path='auto-recommend-reply') + def auto_recommend_reply(self, request): + """ + 设置自动推荐回复功能 + 当收到达人消息时,自动生成推荐回复并通过WebSocket发送通知 + """ + try: + # 获取是否启用自动推荐回复的设置 + enable_auto_recommend = request.data.get('enable_auto_recommend', False) + user = request.user + + # 更新用户配置 + user_profile, created = UserProfile.objects.get_or_create(user=user) + user_profile.auto_recommend_reply = enable_auto_recommend + user_profile.save() + + return Response({ + 'code': 200, + 'message': '设置自动推荐回复成功', + 'data': { + 'enable_auto_recommend': enable_auto_recommend + } + }) + + except Exception as e: + logger.error(f"设置自动推荐回复失败: {str(e)}") + logger.error(traceback.format_exc()) + return Response({ + 'code': 500, + 'message': f"设置自动推荐回复失败: {str(e)}", + 'data': None + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + @action(detail=False, methods=['get'], url_path='get-auto-recommend-setting') + def get_auto_recommend_setting(self, request): + """获取自动推荐回复设置""" + try: + user = request.user + user_profile, created = UserProfile.objects.get_or_create(user=user) + + return Response({ + 'code': 200, + 'message': '获取自动推荐回复设置成功', + 'data': { + 'enable_auto_recommend': getattr(user_profile, 'auto_recommend_reply', False) + } + }) + + except Exception as e: + logger.error(f"获取自动推荐回复设置失败: {str(e)}") + logger.error(traceback.format_exc()) + return Response({ + 'code': 500, + 'message': f"获取自动推荐回复设置失败: {str(e)}", + 'data': None + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + class KnowledgeBaseViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet): serializer_class = KnowledgeBaseSerializer