import os import json import uuid import base64 import pickle import logging import traceback # 确保导入traceback模块 from pathlib import Path import dateutil.parser as parser from datetime import datetime, timedelta from bs4 import BeautifulSoup from django.utils import timezone from django.conf import settings # 添加Django设置导入 from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders import warnings import mimetypes import requests import django.db.utils import time from django.db.models import Q import socket # 忽略oauth2client相关的所有警告 warnings.filterwarnings('ignore', message='file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth') warnings.filterwarnings('ignore', category=DeprecationWarning, module='oauth2client') warnings.filterwarnings('ignore', message='.*google-auth.*') warnings.filterwarnings('ignore', message='.*oauth2client.*') from apiclient import discovery from httplib2 import Http from oauth2client import file, client, tools from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment, UserProfile # Gmail API调用超时设置(秒) GMAIL_REQUEST_TIMEOUT = getattr(settings, 'GMAIL_REQUEST_TIMEOUT', 30) logger = logging.getLogger(__name__) # Gmail服务单例管理器 class GmailServiceManager: _instances = {} # 以用户ID和Gmail凭证ID为键存储Gmail服务实例 @classmethod def get_instance(cls, user, gmail_credential_id=None): """ 获取用户的Gmail服务实例,如果不存在则创建 参数: user: 用户对象 gmail_credential_id: 指定Gmail凭证ID,如果为None,则使用默认凭证 """ user_id = str(user.id) # 生成实例键,组合用户ID和Gmail凭证ID if gmail_credential_id: instance_key = f"{user_id}:{gmail_credential_id}" else: instance_key = user_id if instance_key not in cls._instances: try: # 从数据库获取认证信息 if gmail_credential_id: # 获取指定ID的凭证 credential = GmailCredential.objects.filter( id=gmail_credential_id, user=user, is_active=True ).first() else: # 获取默认凭证,优先获取is_default=True的凭证 credential = GmailCredential.objects.filter( user=user, is_active=True, is_default=True ).first() # 如果没有找到默认凭证,则获取最近更新的一个凭证 if not credential: credential = GmailCredential.objects.filter( user=user, is_active=True ).order_by('-updated_at').first() if credential and credential.credentials: # 反序列化凭证 creds = pickle.loads(credential.credentials) # 检查凭证是否有效 if not creds.invalid: # 初始化服务 gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http())) # 存储实例 cls._instances[instance_key] = { 'service': gmail_service, 'credentials': creds, 'timestamp': timezone.now(), 'user': user, 'gmail_credential': credential } logger.info(f"创建用户 {user.username} 的Gmail服务单例,Gmail账号: {credential.gmail_email or '未知'},名称: {credential.name}") return cls._instances[instance_key] except Exception as e: logger.error(f"创建Gmail服务单例失败: {e}") else: # 检查实例是否过期(超过30分钟) instance = cls._instances[instance_key] time_diff = timezone.now() - instance['timestamp'] if time_diff.total_seconds() > 1800: # 30分钟过期 del cls._instances[instance_key] return cls.get_instance(user, gmail_credential_id) # 递归调用,重新创建 # 更新时间戳 cls._instances[instance_key]['timestamp'] = timezone.now() credential = instance.get('gmail_credential') gmail_email = credential.gmail_email if credential else '未知' credential_name = credential.name if credential else '默认' logger.info(f"复用用户 {user.username} 的Gmail服务单例,Gmail账号: {gmail_email},名称: {credential_name}") return cls._instances[instance_key] return None @classmethod def get_credential_instance(cls, credential): """通过GmailCredential对象获取服务实例""" if not credential or not credential.user: return None return cls.get_instance(credential.user, str(credential.id)) @classmethod def update_instance(cls, user, credentials, service, gmail_credential=None): """更新用户的Gmail服务实例""" user_id = str(user.id) # 确定实例键 if gmail_credential: instance_key = f"{user_id}:{gmail_credential.id}" else: instance_key = user_id cls._instances[instance_key] = { 'service': service, 'credentials': credentials, 'timestamp': timezone.now(), 'user': user, 'gmail_credential': gmail_credential } @classmethod def clear_instance(cls, user, gmail_credential_id=None): """清除用户的Gmail服务实例""" user_id = str(user.id) # 清除特定凭证的实例 if gmail_credential_id: instance_key = f"{user_id}:{gmail_credential_id}" if instance_key in cls._instances: del cls._instances[instance_key] else: # 清除该用户的所有实例 keys_to_delete = [] for key in cls._instances.keys(): if key == user_id or key.startswith(f"{user_id}:"): keys_to_delete.append(key) for key in keys_to_delete: del cls._instances[key] @classmethod def get_all_user_instances(cls, user): """获取用户的所有Gmail服务实例""" user_id = str(user.id) user_instances = {} # 收集该用户所有的实例 for key, instance in cls._instances.items(): if key == user_id or key.startswith(f"{user_id}:"): credential = instance.get('gmail_credential') if credential: user_instances[str(credential.id)] = instance return user_instances @classmethod def clear_all_instances_by_email(cls, gmail_email): """ 清除与特定Gmail邮箱关联的所有服务实例 参数: gmail_email: Gmail邮箱地址 返回: int: 清除的实例数量 """ try: # 导入GmailCredential模型 from .models import GmailCredential # 查找与该邮箱关联的所有凭证 credentials = GmailCredential.objects.filter( gmail_email=gmail_email, is_active=True ).select_related('user') # 记录清除的实例数量 cleared_count = 0 # 清除每个凭证关联的实例 for credential in credentials: user = credential.user # 清除特定用户和凭证的实例 if cls.clear_instance(user, str(credential.id)): cleared_count += 1 # 同时清除用户的默认实例 user_id = str(user.id) if user_id in cls._instances: del cls._instances[user_id] cleared_count += 1 return cleared_count except Exception as e: import logging logger = logging.getLogger(__name__) logger.error(f"清除Gmail邮箱 {gmail_email} 的服务实例时出错: {str(e)}") return 0 class GmailIntegration: """Gmail集成类""" # Gmail API 权限范围 SCOPES = ['https://mail.google.com/'] def __init__(self, user, email=None, client_secret_json=None, use_proxy=True, proxy_url='http://127.0.0.1:7890', gmail_credential_id=None): self.user = user self.user_email = user.email if user else None self.email = email # 目标邮箱 self.client_secret = client_secret_json self.credentials = None self.gmail_service = None self.use_proxy = use_proxy self.proxy_url = proxy_url self.gmail_credential_id = gmail_credential_id self.gmail_credential = None # 设置代理 if self.use_proxy: logger.info(f"设置Gmail API代理: {proxy_url}") os.environ['HTTP_PROXY'] = proxy_url os.environ['HTTPS_PROXY'] = proxy_url # 设置Token文件存储路径 if user: # 使用用户ID创建唯一的token存储路径 token_file = f"gmail_token_{user.id}.json" if gmail_credential_id: token_file = f"gmail_token_{user.id}_{gmail_credential_id}.json" self.token_storage_path = os.path.join("gmail_tokens", token_file) # 确保目录存在 os.makedirs("gmail_tokens", exist_ok=True) logger.info(f"设置Token存储路径: {self.token_storage_path}") # 尝试从数据库加载凭证 try: if gmail_credential_id: # 加载指定ID的凭证 gmail_cred = GmailCredential.objects.filter( id=gmail_credential_id, user=user, is_active=True ).first() else: # 加载默认凭证 gmail_cred = GmailCredential.objects.filter( user=user, is_active=True, is_default=True ).first() # 如果没有默认凭证,加载最新的一个 if not gmail_cred: gmail_cred = GmailCredential.objects.filter( user=user, is_active=True ).order_by('-updated_at').first() if gmail_cred and gmail_cred.credentials: self.gmail_credential = gmail_cred logger.info(f"从数据库加载用户 {user.username} 的Gmail凭证 (ID: {gmail_cred.id}, Email: {gmail_cred.gmail_email or '未知'}, 名称: {gmail_cred.name})") # 使用新方法加载凭证 creds = self._load_credentials_from_storage(gmail_cred.credentials) if creds: self.credentials = creds # 初始化Gmail服务 self.gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http())) logger.info("从数据库凭证初始化Gmail服务成功") else: logger.error("无法从数据库中加载有效凭证") # 标记需要重新认证 if hasattr(gmail_cred, 'needs_reauth'): gmail_cred.needs_reauth = True gmail_cred.save() logger.info(f"已将Gmail账号 {gmail_cred.gmail_email} 标记为需要重新认证") except Exception as e: logger.error(f"从数据库加载凭证失败: {str(e)}") # 继续使用文件方式 else: self.token_storage_path = "gmail_token.json" logger.warning("未提供用户,将使用默认Token存储路径") # 存储对象 self.storage = file.Storage(self.token_storage_path) def authenticate(self): """获取Gmail API凭证并认证""" try: # 优先尝试使用单例服务 if self.user: instance = GmailServiceManager.get_instance(self.user, self.gmail_credential_id) if instance: self.gmail_service = instance['service'] self.credentials = instance['credentials'] self.gmail_credential = instance.get('gmail_credential') credential_id = str(self.gmail_credential.id) if self.gmail_credential else "未知" gmail_email = self.gmail_credential.gmail_email if self.gmail_credential else "未知" credential_name = self.gmail_credential.name if self.gmail_credential else "默认" logger.info(f"使用现有的Gmail服务单例 (ID: {credential_id}, Email: {gmail_email}, 名称: {credential_name})") return True # 以下是原有的认证逻辑... # 写入client_secret.json if self.client_secret: client_secret_path = 'client_secret.json' with open(client_secret_path, 'w') as f: if isinstance(self.client_secret, str): try: # 确保是有效的JSON import json # 在本地作用域引入 json_data = json.loads(self.client_secret) # 强制设置redirect_uris为非浏览器模式,避免localhost连接拒绝问题 for key in ['web', 'installed']: if key in json_data and 'redirect_uris' in json_data[key]: json_data[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob'] logger.info("已强制设置redirect_uri为非浏览器模式") json.dump(json_data, f) except json.JSONDecodeError as e: logger.error(f"client_secret不是有效的JSON: {str(e)}") return False else: # 如果是字典,也进行相同处理 import json # 在本地作用域引入 client_secret_dict = dict(self.client_secret) for key in ['web', 'installed']: if key in client_secret_dict and 'redirect_uris' in client_secret_dict[key]: client_secret_dict[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob'] logger.info("已强制设置redirect_uri为非浏览器模式") json.dump(client_secret_dict, f) logger.info(f"已将client_secret写入临时文件: {client_secret_path}") # 使用与quickstart.py相同的流程 logger.info(f"创建或读取token存储: {self.token_storage_path}") # 确保token目录存在 token_dir = os.path.dirname(self.token_storage_path) if token_dir and not os.path.exists(token_dir): logger.info(f"创建token目录: {token_dir}") os.makedirs(token_dir) store = file.Storage(self.token_storage_path) creds = store.get() if not creds or creds.invalid: logger.info("没有有效的凭证,需要重新授权") if not self.client_secret: logger.error("没有提供client_secret_json且找不到有效凭证") return False # 强制使用非浏览器认证模式,避免localhost连接问题 logger.info("使用非浏览器认证模式") redirect_uri = 'urn:ietf:wg:oauth:2.0:oob' flow = client.flow_from_clientsecrets('client_secret.json', self.SCOPES) flow.redirect_uri = redirect_uri # 获取授权URL并抛出异常 auth_url = flow.step1_get_authorize_url() logger.info(f"获取授权URL: {auth_url[:50]}...") raise Exception(f"Please visit this URL to authorize: {auth_url}") # 如果有有效凭证,初始化服务 self.credentials = creds logger.info("使用现有凭证初始化Gmail服务") self.gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http())) logger.info("Gmail服务初始化成功") # 获取Gmail账号信息 gmail_email = None try: # 调用Gmail API获取用户资料 profile = self.gmail_service.users().getProfile(userId='me').execute() gmail_email = profile.get('emailAddress') logger.info(f"获取到Gmail账号: {gmail_email}") except Exception as profile_error: logger.error(f"获取Gmail账号失败: {str(profile_error)}") # 即使获取Gmail账号失败,也要尝试获取消息以提取邮箱 if not gmail_email: try: # 尝试获取一条消息来提取邮箱 messages = self.gmail_service.users().messages().list(userId='me', maxResults=1).execute() if 'messages' in messages and len(messages['messages']) > 0: msg_id = messages['messages'][0]['id'] msg = self.gmail_service.users().messages().get(userId='me', id=msg_id).execute() # 从消息中查找与当前用户匹配的邮箱 if 'payload' in msg and 'headers' in msg['payload']: for header in msg['payload']['headers']: if header['name'] in ['From', 'To', 'Cc', 'Bcc']: if '<' in header['value'] and '>' in header['value']: email = header['value'].split('<')[1].split('>')[0] # 如果找到一个邮箱就使用它 if not gmail_email: gmail_email = email logger.info(f"从消息中提取到Gmail账号: {gmail_email}") except Exception as msg_error: logger.error(f"尝试从消息中提取Gmail账号失败: {str(msg_error)}") # 保存凭证到数据库 if self.user: from django.utils import timezone logger.info("保存凭证到数据库") # 将凭证对象序列化 - 改为JSON序列化 try: # 首先尝试使用JSON序列化 credentials_data = creds.to_json() except Exception as json_error: logger.error(f"JSON序列化凭证失败: {str(json_error)}") # 备选使用pickle序列化 credentials_data = pickle.dumps(creds) logger.info("使用pickle序列化凭证") # 如果提供了具体的gmail_credential_id,更新对应的记录 if self.gmail_credential_id: # 更新指定ID的凭证 try: gmail_credential = GmailCredential.objects.get( id=self.gmail_credential_id, user=self.user ) # 更新凭证信息 gmail_credential.credentials = credentials_data gmail_credential.gmail_email = gmail_email gmail_credential.updated_at = timezone.now() gmail_credential.is_active = True gmail_credential.save() self.gmail_credential = gmail_credential logger.info(f"已更新ID为 {self.gmail_credential_id} 的Gmail凭证,Gmail账号: {gmail_email}") except GmailCredential.DoesNotExist: logger.error(f"未找到ID为 {self.gmail_credential_id} 的Gmail凭证,将创建新凭证") # 如果指定的凭证不存在,创建新凭证 self.gmail_credential_id = None # 如果没有具体的gmail_credential_id,或者指定的不存在,创建或更新默认凭证 if not self.gmail_credential_id: # 检查是否已存在相同gmail_email的凭证 existing_credential = None if gmail_email: existing_credential = GmailCredential.objects.filter( user=self.user, gmail_email=gmail_email, is_active=True ).first() if existing_credential: # 更新现有凭证 existing_credential.credentials = credentials_data existing_credential.token_path = self.token_storage_path existing_credential.updated_at = timezone.now() existing_credential.save() self.gmail_credential = existing_credential logger.info(f"已更新Gmail账号 {gmail_email} 的现有凭证") else: # 获取Gmail账户数量 gmail_count = GmailCredential.objects.filter(user=self.user).count() # 创建新凭证 name = f"Gmail账号 {gmail_count + 1}" if gmail_email: name = gmail_email else: # 确保gmail_email有默认值,避免null错误 gmail_email = "未知邮箱" # 将凭证转换为JSON字符串 if isinstance(credentials_data, dict): # 确保json模块在本地作用域可访问 import json credentials_data = json.dumps(credentials_data) gmail_credential = GmailCredential.objects.create( user=self.user, credentials=credentials_data, token_path=self.token_storage_path, gmail_email=gmail_email, name=name, is_default=(gmail_count == 0), # 第一个账号设为默认 updated_at=timezone.now(), is_active=True ) self.gmail_credential = gmail_credential logger.info(f"已创建新的Gmail凭证,Gmail账号: {gmail_email}, 名称: {name}") # 认证成功后更新单例 if self.user and self.gmail_service and self.credentials: GmailServiceManager.update_instance( self.user, self.credentials, self.gmail_service, self.gmail_credential ) return True except Exception as e: # 保留授权URL异常,视图层会处理 if "Please visit this URL" in str(e): raise e logger.error(f"Gmail认证失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return False finally: # 清理client_secret.json文件 if self.client_secret and os.path.exists('client_secret.json'): logger.info("删除临时client_secret文件") os.unlink('client_secret.json') def create_talent_knowledge_base(self, talent_email): """ 创建或获取与talent_email关联的知识库 Args: talent_email (str): 达人Gmail邮箱地址 Returns: tuple: (knowledge_base, created) - 知识库对象和是否新创建的标志 """ try: # 优先查找现有的关联关系 mapping = GmailTalentMapping.objects.filter( user=self.user, talent_email=talent_email, is_active=True ).first() if mapping and mapping.knowledge_base: logger.info(f"找到现有的Gmail-达人映射: {talent_email} -> {mapping.knowledge_base.name}") return mapping.knowledge_base, False # 查找与该达人邮箱关联的知识库 # 根据达人邮箱生成一个唯一的标识名称 kb_name = f"Gmail-{talent_email.split('@')[0]}" # 检查该名称的知识库是否已存在 existing_kb = KnowledgeBase.objects.filter( name=kb_name, user_id=self.user.id ).first() if existing_kb: logger.info(f"找到现有知识库: {kb_name}") # 创建映射关系 GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=talent_email, defaults={ 'knowledge_base': existing_kb, 'is_active': True } ) return existing_kb, False # 没有找到现有知识库,直接使用基本方法创建 logger.info(f"使用基本方法创建知识库: {kb_name}") return self._create_knowledge_base_basic(talent_email) except Exception as e: logger.error(f"创建或获取Gmail-达人知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) # 创建失败时,尝试使用基本方法创建 logger.info("尝试使用基本方法创建知识库") return self._create_knowledge_base_basic(talent_email) def _create_knowledge_base_basic(self, talent_email): """创建基础知识库,不处理映射关系""" try: # 根据达人邮箱生成一个唯一的标识名称 kb_name = f"Gmail-{talent_email.split('@')[0]}" # 检查该名称的知识库是否已存在 existing_kb = KnowledgeBase.objects.filter( name=kb_name, user_id=self.user.id ).first() if existing_kb: logger.info(f"找到现有知识库: {kb_name}") # 检查external_id是否存在,如果不存在则创建 if not existing_kb.external_id: logger.info(f"知识库 {kb_name} 缺少external_id,尝试创建外部知识库") try: from .views import KnowledgeBaseViewSet kb_viewset = KnowledgeBaseViewSet() external_id = kb_viewset._create_external_dataset(existing_kb) if external_id: existing_kb.external_id = external_id existing_kb.save() logger.info(f"成功为现有知识库创建外部知识库: {external_id}") else: logger.error("创建外部知识库失败:未返回external_id") return {"conversation_id": None, "success": False, "error": "创建外部知识库失败"} except Exception as e: logger.error(f"为现有知识库创建外部知识库失败: {str(e)}") return {"conversation_id": None, "success": False, "error": f"创建外部知识库失败: {str(e)}"} # 创建映射关系 GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=talent_email, defaults={ 'knowledge_base': existing_kb, 'is_active': True } ) return existing_kb, False # 创建新知识库 try: knowledge_base = KnowledgeBase.objects.create( name=kb_name, desc=f"与{talent_email}的Gmail邮件交流记录", type="private", user_id=self.user.id, documents=[] ) except django.db.utils.IntegrityError as e: # 处理名称重复的情况 logger.warning(f"知识库名称'{kb_name}'已存在,尝试获取或创建带随机后缀的名称") # 先尝试查找已存在的知识库(不限制用户) existing_kb = KnowledgeBase.objects.filter(name=kb_name).first() if existing_kb and str(existing_kb.user_id) == str(self.user.id): # 如果存在且属于当前用户,直接使用 logger.info(f"找到属于当前用户的知识库: {kb_name}") # 创建映射关系 GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=talent_email, defaults={ 'knowledge_base': existing_kb, 'is_active': True } ) return existing_kb, False else: # 如果不存在或不属于当前用户,创建带随机后缀的新知识库 import random import string suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=5)) new_kb_name = f"{kb_name}-{suffix}" logger.info(f"创建带随机后缀的知识库: {new_kb_name}") knowledge_base = KnowledgeBase.objects.create( name=new_kb_name, desc=f"与{talent_email}的Gmail邮件交流记录", type="private", user_id=self.user.id, documents=[] ) # 创建外部知识库 try: from .views import KnowledgeBaseViewSet kb_viewset = KnowledgeBaseViewSet() external_id = kb_viewset._create_external_dataset(knowledge_base) if external_id: knowledge_base.external_id = external_id knowledge_base.save() logger.info(f"成功创建外部知识库: {external_id}") except Exception as e: logger.error(f"创建外部知识库失败: {str(e)}") # 继续执行,不影响基本功能 # 创建映射关系 GmailTalentMapping.objects.create( user=self.user, talent_email=talent_email, knowledge_base=knowledge_base, is_active=True ) logger.info(f"成功创建新知识库: {knowledge_base.name}, ID: {knowledge_base.id}") return knowledge_base, True except Exception as e: logger.error(f"创建或获取Gmail-达人知识库基本方法失败: {str(e)}") import traceback logger.error(traceback.format_exc()) # 尝试直接获取已存在的知识库作为最后手段 try: kb_name = f"Gmail-{talent_email.split('@')[0]}" existing_kb = KnowledgeBase.objects.filter(name__startswith=kb_name).first() if existing_kb: logger.info(f"在错误处理中找到可用的知识库: {existing_kb.name}") # 创建映射关系 GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=talent_email, defaults={ 'knowledge_base': existing_kb, 'is_active': True } ) return existing_kb, False except Exception as inner_e: logger.error(f"错误处理中尝试获取知识库失败: {str(inner_e)}") # 如果所有尝试都失败,抛出异常 raise def get_conversations(self, talent_gmail): """获取与特定用户的所有邮件对话""" try: if not self.gmail_service: logger.error("Gmail服务未初始化") return [] # 使用crushwds@gmail.com作为固定邮箱 email1 = "crushwds@gmail.com" # 固定设置为quickstart.py中的邮箱 email2 = talent_gmail # 使用参数传入的目标邮箱 logger.info(f"执行Gmail查询: {email1} 与 {email2}") # 构建搜索查询 query = f"from:({email1} OR {email2}) to:({email1} OR {email2})" # 获取所有匹配的邮件 response = self.gmail_service.users().messages().list(userId='me', q=query).execute() messages = [] if 'messages' in response: messages.extend(response['messages']) message_count = len(messages) logger.info(f"找到 {message_count} 封邮件") # 分页获取所有邮件 while 'nextPageToken' in response: page_token = response['nextPageToken'] response = self.gmail_service.users().messages().list( userId='me', q=query, pageToken=page_token ).execute() if 'messages' in response: new_messages = response['messages'] messages.extend(new_messages) new_count = len(new_messages) message_count += new_count logger.info(f"加载额外 {new_count} 封邮件,当前总数: {message_count}") else: logger.warning(f"未找到匹配邮件") # 处理每封邮件 conversations = [] if messages: logger.info(f"开始获取 {len(messages)} 封邮件详情...") for i, msg in enumerate(messages): try: message = self.gmail_service.users().messages().get(userId='me', id=msg['id']).execute() email_data = self._extract_email_content(message) if email_data: conversations.append(email_data) if (i+1) % 5 == 0 or i+1 == len(messages): # 每5封邮件或最后一封邮件时记录进度 logger.info(f"已处理 {i+1}/{len(messages)} 封邮件") else: logger.warning(f"邮件 {i+1}/{len(messages)} 内容提取失败") except Exception as e: logger.error(f"获取邮件 {msg['id']} 详情失败: {str(e)}") # 按时间排序 conversations.sort(key=lambda x: x['date']) logger.info(f"总共找到并解析了 {len(conversations)} 封邮件") return conversations except Exception as e: logger.error(f"获取Gmail对话失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return [] def _extract_email_content(self, message): """提取邮件内容,完全按照quickstart.py的get_email_content函数实现""" try: message_id = message['id'] # 获取邮件ID payload = message['payload'] headers = payload['headers'] # 获取邮件基本信息 email_data = { 'id': message_id, # 保存邮件ID 'subject': '', 'from': '', 'date': '', 'body': '', 'attachments': [] # 新增附件列表 } # 提取头部信息 for header in headers: if header['name'] == 'Subject': email_data['subject'] = header['value'] elif header['name'] == 'From': email_data['from'] = header['value'] elif header['name'] == 'Date': date_value = header['value'] logger.info(f"原始邮件日期字符串: '{date_value}'") try: # 打印原始日期字符串信息 import dateutil.parser as date_parser import pytz from django.utils import timezone # 先尝试解析日期 date = date_parser.parse(date_value) logger.info(f"解析后的日期对象: {date}, 是否有时区: {date.tzinfo is not None}") # 处理时区问题 if date.tzinfo is not None: # 已有时区的日期,转换为系统时区 if hasattr(settings, 'TIME_ZONE'): system_tz = pytz.timezone(settings.TIME_ZONE) date = date.astimezone(system_tz) logger.info(f"转换到系统时区后: {date}") # 如果需要naive datetime,删除时区信息 if hasattr(settings, 'USE_TZ') and not settings.USE_TZ: date = date.replace(tzinfo=None) logger.info(f"移除时区信息后: {date}") else: # 无时区的日期,如果系统使用时区,添加时区信息 if hasattr(settings, 'USE_TZ') and settings.USE_TZ: try: date = timezone.make_aware(date) logger.info(f"添加时区信息后: {date}") except Exception as tz_error: logger.warning(f"添加时区信息失败: {str(tz_error)}") # 格式化为字符串 email_data['date'] = date.strftime('%Y-%m-%d %H:%M:%S') logger.info(f"最终日期字符串: {email_data['date']}") except Exception as e: logger.error(f"解析日期失败: {str(e)}, 原始值: '{date_value}'") # 保留原始值 email_data['date'] = date_value # 定义一个递归函数来处理所有部分和附件 def process_parts(parts): for part in parts: # 检查是否是附件 if 'filename' in part and part['filename']: attachment = { 'filename': part['filename'], 'mimeType': part['mimeType'], 'size': part['body'].get('size', 0) } # 如果有附件内容数据,可以获取附件ID if 'attachmentId' in part['body']: attachment['attachmentId'] = part['body']['attachmentId'] email_data['attachments'].append(attachment) # 处理文本内容 if part['mimeType'] == 'text/plain' and not email_data['body']: data = part['body'].get('data', '') if data: try: text = base64.urlsafe_b64decode(data).decode('utf-8') email_data['body'] = text except Exception as e: logger.error(f"解码邮件内容失败: {str(e)}") # 递归处理多部分内容 if 'parts' in part: process_parts(part['parts']) # 处理邮件正文和附件 if 'parts' in payload: process_parts(payload['parts']) elif 'body' in payload and 'data' in payload['body']: # 没有parts,直接处理body data = payload['body'].get('data', '') if data: try: text = base64.urlsafe_b64decode(data).decode('utf-8') email_data['body'] = text except Exception as e: logger.error(f"解码邮件内容失败: {str(e)}") return email_data except Exception as e: logger.error(f"处理邮件内容时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) return None def download_attachment(self, message_id, attachment_id, filename): """下载邮件附件""" try: attachment = self.gmail_service.users().messages().attachments().get( userId='me', messageId=message_id, id=attachment_id ).execute() data = attachment['data'] file_data = base64.urlsafe_b64decode(data) # 创建附件目录 attachments_dir = 'gmail_attachments' if not os.path.exists(attachments_dir): os.makedirs(attachments_dir) # 保存附件 filepath = os.path.join(attachments_dir, f"{message_id}_{filename}") with open(filepath, 'wb') as f: f.write(file_data) return filepath except Exception as e: logger.error(f"下载附件失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return None def save_conversations_to_knowledge_base(self, conversations, knowledge_base): """ 将Gmail对话保存到知识库 Args: conversations: Gmail邮件列表 knowledge_base: 保存到的知识库对象 """ try: # 导入所需模型 from .models import GmailAttachment, ChatHistory, KnowledgeBaseDocument, GmailTalentMapping # 检查入参 if not conversations or not knowledge_base: logger.error("参数不完整: conversations或knowledge_base为空") return {"conversation_id": None, "success": False, "error": "参数不完整"} if not conversations: logger.warning("没有邮件对话可保存") return {"conversation_id": None, "success": False, "error": "没有邮件对话可保存"} # 确保knowledge_base有external_id if not knowledge_base.external_id: logger.warning(f"知识库 {knowledge_base.name} 缺少external_id,尝试创建外部知识库") try: from .views import KnowledgeBaseViewSet kb_viewset = KnowledgeBaseViewSet() external_id = kb_viewset._create_external_dataset(knowledge_base) if external_id: knowledge_base.external_id = external_id knowledge_base.save() logger.info(f"成功为知识库创建外部知识库: {external_id}") else: logger.error("创建外部知识库失败:未返回external_id") return {"conversation_id": None, "success": False, "error": "创建外部知识库失败"} except Exception as e: logger.error(f"为知识库创建外部知识库失败: {str(e)}") return {"conversation_id": None, "success": False, "error": f"创建外部知识库失败: {str(e)}"} # 查找现有的对话ID - 优先使用talent_email查找 conversation_id = None # 1. 首先尝试通过talent_email查找现有映射 if self.email: existing_mapping = GmailTalentMapping.objects.filter( user=self.user, talent_email=self.email, knowledge_base=knowledge_base, is_active=True ).first() if existing_mapping and existing_mapping.conversation_id: conversation_id = existing_mapping.conversation_id logger.info(f"通过达人邮箱 {self.email} 找到现有对话ID: {conversation_id}") # 2. 如果上面没找到,尝试通过知识库查找任何相关的对话 if not conversation_id: existing_conversation = ChatHistory.objects.filter( knowledge_base=knowledge_base, user=self.user, is_deleted=False ).values('conversation_id').distinct().first() if existing_conversation: conversation_id = existing_conversation['conversation_id'] logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}") # 如果有talent_email,更新或创建映射关系 if self.email: GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=self.email, defaults={ 'knowledge_base': knowledge_base, 'conversation_id': conversation_id, 'is_active': True } ) logger.info(f"更新Gmail达人映射: {self.email} -> 对话ID {conversation_id}") # 3. 如果仍然没找到,则创建新的对话ID if not conversation_id: # 生成唯一的对话ID conversation_id = str(uuid.uuid4()) logger.info(f"创建新的对话ID: {conversation_id}") # 创建或更新Gmail和达人的映射关系 if self.email: talent_mapping, created = GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=self.email, defaults={ 'knowledge_base': knowledge_base, 'conversation_id': conversation_id, 'is_active': True } ) action = "创建" if created else "更新" logger.info(f"已{action}Gmail达人映射: {self.email} -> {knowledge_base.name}") # 构建知识库文档 logger.info(f"开始构建知识库文档,共{len(conversations)}个邮件对话") document = { "title": f"与{self.email or '联系人'}的Gmail邮件对话", "url": "", "source_type": "gmail", "paragraphs": [] } # 对话按时间顺序排序 conversations.sort(key=lambda x: x.get('date', '')) # 将对话添加到文档 previous_message_dict = {} # 用于存储邮件时间与消息ID的映射,以便找到正确的parent_id # 首先查找现有的聊天记录 existing_messages = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).order_by('created_at') # 如果已有消息,跳过重复导入 existing_gmail_ids = set() if existing_messages.exists(): for msg in existing_messages: if msg.metadata and 'gmail_message_id' in msg.metadata: existing_gmail_ids.add(msg.metadata['gmail_message_id']) logger.info(f"发现已存在 {len(existing_gmail_ids)} 条消息记录,将跳过重复导入") for message in conversations: try: # 跳过已导入的消息 if message.get('id') in existing_gmail_ids: logger.info(f"跳过已导入的消息: {message.get('id')}") continue # 提取邮件内容 sender = message.get('from', '未知发件人') date_str = message.get('date', '未知时间') subject = message.get('subject', '无主题') body = message.get('body', '').strip() logger.info(f"处理邮件: {date_str}, {sender}, {subject[:20]}...") # 判断角色 # 从发件人地址中提取邮箱部分 sender_email = '' if '<' in sender and '>' in sender: # 格式如 "姓名 " sender_email = sender.split('<')[1].split('>')[0] else: # 格式可能直接是邮箱 sender_email = sender # 修改角色判断逻辑:根据Gmail凭证邮箱和达人邮箱判断 # 获取当前Gmail凭证邮箱 gmail_credential_email = None if self.gmail_credential and self.gmail_credential.gmail_email: gmail_credential_email = self.gmail_credential.gmail_email.lower() # 判断是否是凭证邮箱发出的邮件 is_credential_email = gmail_credential_email and gmail_credential_email == sender_email.lower() # 判断是否是系统用户邮箱 is_user_email = self.user.email.lower() == sender_email.lower() # 判断是否是目标达人邮箱 is_talent_email = False if self.email: is_talent_email = self.email.lower() == sender_email.lower() # 如果是Gmail凭证邮箱或用户邮箱,设为user角色;如果是达人邮箱,设为assistant角色 if is_credential_email or is_user_email: role = 'user' elif is_talent_email: role = 'assistant' else: # 尝试检查是否有与发件人邮箱相匹配的人才映射 from .models import GmailTalentMapping talent_mapping = GmailTalentMapping.objects.filter( user=self.user, talent_email=sender_email, is_active=True ).first() # 如果有映射关系则视为达人邮箱,设为assistant角色 role = 'assistant' if talent_mapping else 'user' logger.info(f"设置消息角色: {role},发件人: {sender_email},用户Gmail: {gmail_credential_email},用户邮箱: {self.user.email},目标达人: {self.email},是否有达人映射: {bool(talent_mapping) if 'talent_mapping' in locals() else False}") # 将邮件添加到文档 paragraph = { "id": f"msg_{len(document['paragraphs']) + 1}", "title": f"{date_str} - {sender} - {subject}", "content": body, "meta": { "sender": sender, "date": date_str, "subject": subject, "has_attachments": len(message.get('attachments', [])) > 0, "message_id": message.get('id', '') } } document['paragraphs'].append(paragraph) # 解析邮件日期为datetime对象 try: # 先检查是否是时间戳格式 if isinstance(date_str, str) and date_str.isdigit(): # 如果是时间戳,直接转换 date_obj = datetime.fromtimestamp(int(date_str)) else: # 尝试标准格式解析 try: date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') except ValueError: # 如果标准格式解析失败,使用dateutil更灵活的解析 import dateutil.parser as date_parser date_obj = date_parser.parse(date_str) # 如果解析出的日期有时区信息,转换为不带时区的日期 if date_obj.tzinfo is not None: date_obj = date_obj.replace(tzinfo=None) except (ValueError, TypeError) as e: # 如果解析失败,使用当前时间 logger.warning(f"无法解析邮件日期: {date_str},错误: {str(e)},使用当前时间") date_obj = datetime.now() # 查找parent_id(查找日期早于当前邮件且最接近的消息) parent_id = None closest_date = None for d, mid in previous_message_dict.items(): if d < date_obj and (closest_date is None or d > closest_date): closest_date = d parent_id = mid # 保存消息到聊天历史,使用邮件实际日期 from django.utils import timezone # 确保时区处理正确 try: # 检查date_obj是否已经是aware if timezone.is_aware(date_obj): aware_date = date_obj logger.info(f"日期已经包含时区信息: {aware_date}") else: # 将naive转换为aware aware_date = timezone.make_aware(date_obj) logger.info(f"日期添加时区信息后: {aware_date}") except Exception as tz_error: logger.warning(f"时区转换失败: {str(tz_error)},使用当前时间") aware_date = timezone.now() # 创建消息记录 chat_message = ChatHistory.objects.create( user=self.user, knowledge_base=knowledge_base, conversation_id=conversation_id, parent_id=parent_id, role=role, content=f"{subject}\n\n{body}", metadata={ 'gmail_message_id': message.get('id', ''), 'from': sender, 'date': date_str, 'subject': subject, 'dataset_id_list': [str(knowledge_base.id)], 'dataset_names': [knowledge_base.name] }, created_at=aware_date # 设置正确的创建时间 ) # 更新previous_message_dict previous_message_dict[date_obj] = str(chat_message.id) # 处理附件 attachments = message.get('attachments', []) if attachments: for attachment in attachments: if 'attachmentId' in attachment and 'filename' in attachment: try: # 下载附件 filepath = self.download_attachment( message_id=message['id'], attachment_id=attachment['attachmentId'], filename=attachment['filename'] ) if filepath: # 记录附件信息 GmailAttachment.objects.create( chat_message=chat_message, gmail_message_id=message['id'], filename=attachment['filename'], filepath=filepath, mimetype=attachment.get('mimeType', ''), filesize=attachment.get('size', 0) ) logger.info(f"已保存附件: {attachment['filename']}") except Exception as e: logger.error(f"保存附件失败: {str(e)}") import traceback logger.error(traceback.format_exc()) except Exception as e: logger.error(f"处理邮件时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) # 将文档添加到知识库 logger.info(f"将Gmail对话添加到知识库: {knowledge_base.name}") # 准备文档数据结构 doc_data = { "name": f"Gmail对话与{self.email or '联系人'}.txt", "paragraphs": [] } # 将所有对话转换为段落 for paragraph in document['paragraphs']: doc_data["paragraphs"].append({ "title": paragraph['title'], "content": paragraph['content'], "is_active": True, "problem_list": [] }) # 检查paragraphs是否为空 if not doc_data["paragraphs"]: logger.warning("没有段落内容可上传,添加默认段落") doc_data["paragraphs"].append({ "title": "初始化邮件对话", "content": f"与{self.email or '联系人'}的邮件对话准备就绪,等待新的邮件内容。", "is_active": True, "problem_list": [] }) # 调用知识库的文档上传API from .views import KnowledgeBaseViewSet kb_viewset = KnowledgeBaseViewSet() # 添加随机文档ID doc_data["id"] = str(uuid.uuid4()) logger.info(f"生成随机文档ID: {doc_data['id']}") # 上传文档 upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data) if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): # 上传成功,保存记录到数据库 document_id = upload_response['data']['id'] # 创建文档记录 from .models import KnowledgeBaseDocument doc_record = KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=doc_data["name"], external_id=document_id ) logger.info(f"Gmail对话文档上传成功,ID: {document_id}") # 上传所有附件 self._upload_attachments_to_knowledge_base(knowledge_base, conversations) else: # 上传失败,记录错误信息 error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败' logger.error(f"Gmail对话文档上传失败: {error_msg}") # 更新知识库文档计数 knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).count() knowledge_base.save() logger.info(f"Gmail对话已保存到知识库: {knowledge_base.name}") return { "conversation_id": conversation_id, "success": True, "message_count": len(conversations), "knowledge_base_id": str(knowledge_base.id) } except Exception as e: logger.error(f"保存Gmail对话到知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return {"conversation_id": None, "success": False, "error": str(e)} def send_email(self, to_email, subject, body, conversation_id=None, attachments=None): """发送邮件并保存到聊天记录""" try: # 构建邮件内容 if attachments and len(attachments) > 0: message = self._create_message_with_attachment(to_email, subject, body, attachments) else: message = self._create_message(to_email, subject, body) # 发送邮件 sent_message = self.gmail_service.users().messages().send( userId='me', body=message ).execute() message_id = sent_message['id'] logger.info(f"邮件发送成功, ID: {message_id}") # 获取邮件详情,包括服务器时间戳 try: message_detail = self.gmail_service.users().messages().get( userId='me', id=message_id ).execute() # 从消息详情中提取时间戳 internal_date = message_detail.get('internalDate') if internal_date: try: # 转换毫秒时间戳为datetime email_date = datetime.fromtimestamp(int(internal_date)/1000) logger.info(f"从时间戳解析的日期: {email_date}") # 处理时区 from django.utils import timezone if hasattr(settings, 'USE_TZ') and settings.USE_TZ and not timezone.is_aware(email_date): email_date = timezone.make_aware(email_date) logger.info(f"添加时区信息后: {email_date}") date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') logger.info(f"最终格式化日期: {date_str}") except Exception as tz_error: logger.warning(f"处理邮件时间戳失败: {str(tz_error)},使用当前时间") from django.utils import timezone email_date = timezone.now() date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') else: logger.warning("邮件没有时间戳,使用当前时间") from django.utils import timezone email_date = timezone.now() date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: logger.warning(f"获取邮件时间戳失败: {str(e)}, 使用当前时间") email_date = datetime.now() date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') # 如果有conversation_id,保存到聊天记录 if conversation_id: # 查找现有的聊天记录 from django.db.models import Q existing_messages = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).order_by('created_at') if not existing_messages.exists(): logger.warning(f"找不到对话ID: {conversation_id},无法保存消息") return message_id # 查找关联的知识库 first_message = existing_messages.first() if first_message.knowledge_base: knowledge_base = first_message.knowledge_base else: # 如果第一条消息没有知识库,尝试从metadata获取 if first_message.metadata and 'dataset_id_list' in first_message.metadata: kb_id = first_message.metadata.get('dataset_id_list', [])[0] knowledge_base = KnowledgeBase.objects.get(id=kb_id) else: logger.error(f"找不到关联的知识库,无法保存消息") return message_id # 查找parent_id:时间早于当前邮件且最接近的消息,避免时区问题 parent_id = None try: # 使用简单查询,不依赖时区 previous_messages = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).order_by('-id')[:1] # 使用ID降序排序而非时间 if previous_messages: parent_id = str(previous_messages[0].id) except Exception as e: logger.error(f"查找父消息失败: {str(e)}") logger.error(traceback.format_exc()) # 构建metadata metadata = { 'gmail_message_id': message_id, 'from': self.user.email, 'to': to_email, 'date': date_str, 'subject': subject, 'dataset_id_list': [str(knowledge_base.id)], 'dataset_names': [knowledge_base.name] } # 如果现有消息有dataset_id_list,保留它 if first_message.metadata and 'dataset_id_list' in first_message.metadata: metadata['dataset_id_list'] = first_message.metadata['dataset_id_list'] # 尝试获取对应的dataset_names if 'dataset_names' in first_message.metadata: metadata['dataset_names'] = first_message.metadata['dataset_names'] # 创建聊天记录,使用当前时间而不是邮件的实际时间 chat_message = ChatHistory.objects.create( user=self.user, knowledge_base=knowledge_base, conversation_id=conversation_id, parent_id=parent_id, role='user', # 用户发送的消息,角色固定为'user' content=f"[{subject}] {body}", metadata=metadata # 不设置created_at,使用数据库默认时间 ) # 如果有附件,保存附件信息 if attachments and len(attachments) > 0: for att_path in attachments: file_name = os.path.basename(att_path) file_size = os.path.getsize(att_path) # 获取MIME类型 mime_type, _ = mimetypes.guess_type(att_path) if not mime_type: mime_type = 'application/octet-stream' # 保存附件信息到数据库 gmail_attachment = GmailAttachment.objects.create( chat_message=chat_message, gmail_message_id=message_id, filename=file_name, filepath=att_path, mimetype=mime_type, filesize=file_size ) # 更新知识库文档 self._append_to_knowledge_base_document(knowledge_base, subject, body, to_email) return message_id except Exception as e: logger.error(f"发送邮件失败: {str(e)}") logger.error(traceback.format_exc()) raise def _create_message(self, to, subject, body): """创建邮件消息""" message = MIMEText(body) message['to'] = to message['subject'] = subject # 编码为JSON安全的字符串 raw = base64.urlsafe_b64encode(message.as_bytes()).decode() return {'raw': raw} def _create_message_with_attachment(self, to, subject, body, attachment_files): """创建带附件的邮件消息""" message = MIMEMultipart() message['to'] = to message['subject'] = subject # 添加邮件正文 msg = MIMEText(body) message.attach(msg) # 添加附件 for file in attachment_files: try: with open(file, 'rb') as f: part = MIMEBase('application', 'octet-stream') part.set_payload(f.read()) # 编码附件内容 encoders.encode_base64(part) # 添加头部信息 filename = os.path.basename(file) part.add_header( 'Content-Disposition', f'attachment; filename="{filename}"' ) message.attach(part) except Exception as e: logger.error(f"添加附件 {file} 失败: {str(e)}") # 编码为JSON安全的字符串 raw = base64.urlsafe_b64encode(message.as_bytes()).decode() return {'raw': raw} def _append_to_knowledge_base_document(self, knowledge_base, subject, body, recipient): """将新发送的邮件内容追加到知识库文档中""" try: # 准备文档数据结构 doc_data = { "name": f"Gmail回复给{recipient}_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt", "paragraphs": [ { "title": f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {subject}", "content": f"发件人: {self.user.email}\n收件人: {recipient}\n\n{body}", "is_active": True, "problem_list": [] } ] } # 调用知识库的文档上传API from .views import KnowledgeBaseViewSet kb_viewset = KnowledgeBaseViewSet() # 验证知识库是否有external_id if not knowledge_base.external_id: logger.error(f"知识库没有external_id,无法上传文档: {knowledge_base.name}") return None # 上传文档 upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data) if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): # 上传成功,保存记录到数据库 document_id = upload_response['data']['id'] # 创建文档记录 from .models import KnowledgeBaseDocument doc_record = KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=doc_data["name"], external_id=document_id ) logger.info(f"Gmail回复文档上传成功,ID: {document_id}") # 更新知识库文档计数 knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).count() knowledge_base.save() return upload_response else: # 上传失败,记录错误信息 error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败' logger.error(f"Gmail回复文档上传失败: {error_msg}") return None except Exception as e: logger.error(f"追加邮件内容到知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return None def setup_watch(self, topic_name=None): """设置Gmail监听""" try: # 如果没有指定topic,使用配置的主题名称 if not topic_name: # 从settings获取项目配置的主题名称 topic_name = getattr(settings, 'GMAIL_TOPIC_NAME', 'gmail-watch-topic') # 直接使用settings中配置的项目ID,不再从client_secret.json获取 project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT', 'knowledge-454905') logger.info(f"使用settings中配置的项目ID: {project_id}") # 确保Gmail服务已经初始化 if not hasattr(self, 'gmail_service') or not self.gmail_service: logger.warning("Gmail服务未初始化,尝试重新认证") if not self.authenticate(): logger.error("Gmail服务初始化失败") return None # 注意:不再使用用户邮箱作为判断依据,Gmail API总是使用'me'作为userId # Gmail认证是通过OAuth流程,与系统用户邮箱无关 logger.info(f"系统用户: {self.user.email},Gmail API使用OAuth认证的邮箱 (userId='me')") # 构建完整的webhook URL webhook_url = getattr(settings, 'GMAIL_WEBHOOK_URL', None) if not webhook_url: # 使用默认值,确保这是一个完全限定的URL domain = getattr(settings, 'ALLOWED_HOSTS', ['localhost'])[0] protocol = 'https' if getattr(settings, 'SECURE_SSL_REDIRECT', False) else 'http' if domain == 'localhost' or domain.startswith('127.0.0.1'): # 本地开发环境需要公网可访问的URL,可以用ngrok等工具暴露本地服务 webhook_url = f"{protocol}://{domain}:8000/api/user/gmail/webhook/" logger.warning("使用本地开发环境URL作为webhook回调,Google可能无法访问。建议使用ngrok等工具创建公网地址。") else: webhook_url = f"{protocol}://{domain}/api/user/gmail/webhook/" logger.info(f"使用Gmail webhook URL: {webhook_url}") # 如果想在Google Cloud控制台中手动配置webhook,打印明确的说明 logger.info("如需手动配置Gmail推送通知,请确保在Google Cloud控制台的Pub/Sub主题配置中设置以下URL:") logger.info(f"推送端点: {webhook_url}") logger.info("权限: 为Gmail API服务账号(gmail-api-push@system.gserviceaccount.com)和您的Gmail账号授予发布权限") # 请求监听 request = { 'labelIds': ['INBOX', 'SENT', 'IMPORTANT', 'UNREAD'], # 监听更多标签 'topicName': f"projects/{project_id}/topics/{topic_name}", 'labelFilterAction': 'include' } logger.info(f"设置Gmail监听: {request}") # 设置超时,避免长时间阻塞 original_timeout = socket.getdefaulttimeout() socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT) try: # 执行watch请求 response = self.gmail_service.users().watch(userId='me', body=request).execute() except Exception as e: logger.error(f"执行watch请求失败: {str(e)}") return None finally: # 恢复原始超时设置 socket.setdefaulttimeout(original_timeout) # 获取historyId,用于后续同步 history_id = response.get('historyId') expiration = response.get('expiration') logger.info(f"Gmail监听设置成功: historyId={history_id}, expiration={expiration}") # 保存监听信息到数据库 if self.user: from .models import GmailCredential credential = GmailCredential.objects.filter(user=self.user, is_active=True).first() if credential: # 转换时间戳为datetime expiration_time = None if expiration: try: # 将毫秒时间戳转换为datetime naive_time = datetime.fromtimestamp(int(expiration)/1000) logger.info(f"从时间戳解析的日期: {naive_time}") # 如果系统使用时区,添加时区信息 if hasattr(settings, 'USE_TZ') and settings.USE_TZ: try: expiration_time = timezone.make_aware(naive_time) logger.info(f"添加时区信息后: {expiration_time}") except Exception as tz_error: logger.warning(f"添加时区信息失败: {str(tz_error)}") expiration_time = naive_time else: expiration_time = naive_time except Exception as conv_error: logger.error(f"转换时间戳失败: {str(conv_error)}") expiration_time = None credential.last_history_id = history_id credential.watch_expiration = expiration_time credential.save() logger.info(f"更新Gmail监听信息: {self.user.username}, history_id: {history_id}") return { 'historyId': history_id, 'expiration': expiration } except Exception as e: logger.error(f"设置Gmail监听失败: {str(e)}") logger.error(traceback.format_exc()) return None def get_history(self, start_history_id): """获取历史变更""" try: logger.info(f"获取历史记录,起始ID: {start_history_id}") # 确保Gmail服务已经初始化 if not hasattr(self, 'gmail_service') or not self.gmail_service: logger.warning("Gmail服务未初始化,尝试重新认证") if not self.authenticate(): logger.error("Gmail服务初始化失败") return [] # 设置超时,避免长时间阻塞 original_timeout = socket.getdefaulttimeout() socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT) try: # 执行history请求 response = self.gmail_service.users().history().list( userId='me', startHistoryId=start_history_id ).execute() except Exception as e: logger.error(f"执行history请求失败: {str(e)}") return [] finally: # 恢复原始超时设置 socket.setdefaulttimeout(original_timeout) logger.info(f"历史记录响应: {response}") history_list = [] if 'history' in response: history_list.extend(response['history']) logger.info(f"找到 {len(response['history'])} 个历史记录") # 获取所有页,但每次请求都设置超时 page_count = 0 max_pages = 5 # 限制最大页数,避免无限循环 while 'nextPageToken' in response and page_count < max_pages: page_token = response['nextPageToken'] page_count += 1 # 设置超时 socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT) try: response = self.gmail_service.users().history().list( userId='me', startHistoryId=start_history_id, pageToken=page_token ).execute() except Exception as e: logger.error(f"获取历史记录分页失败: {str(e)}") break finally: # 恢复原始超时设置 socket.setdefaulttimeout(original_timeout) if 'history' in response: history_list.extend(response['history']) logger.info(f"加载额外 {len(response['history'])} 个历史记录 (页 {page_count})") if page_count >= max_pages and 'nextPageToken' in response: logger.warning(f"达到最大页数限制 ({max_pages}),可能有更多历史记录未获取") else: logger.info(f"没有新的历史记录,最新historyId: {response.get('historyId', 'N/A')}") # 提取新消息ID new_message_ids = set() for history in history_list: logger.info(f"处理历史记录: {history}") if 'messagesAdded' in history: for message in history['messagesAdded']: message_id = message['message']['id'] new_message_ids.add(message_id) logger.info(f"新增消息ID: {message_id}") if 'labelsAdded' in history: for label in history['labelsAdded']: message_id = label['message']['id'] if 'INBOX' in label.get('labelIds', []): new_message_ids.add(message_id) logger.info(f"标签变更的消息ID: {message_id}, 添加了INBOX标签") if new_message_ids: logger.info(f"总共找到 {len(new_message_ids)} 个新消息") else: logger.info("没有新增消息") return list(new_message_ids) except Exception as e: logger.error(f"获取Gmail历史记录失败: {str(e)}") logger.error(traceback.format_exc()) return [] def process_notification(self, notification_data): """处理Gmail推送通知""" try: # 提取通知数据 logger.info(f"处理Gmail通知: {notification_data}") # 处理Google Pub/Sub消息格式 if isinstance(notification_data, dict) and 'message' in notification_data and 'data' in notification_data['message']: try: import base64 import json logger.info("检测到Google Pub/Sub消息格式") # Base64解码data字段 encoded_data = notification_data['message']['data'] decoded_data = base64.b64decode(encoded_data).decode('utf-8') logger.info(f"解码后的数据: {decoded_data}") # 解析JSON获取email和historyId json_data = json.loads(decoded_data) email = json_data.get('emailAddress') history_id = json_data.get('historyId') logger.info(f"从Pub/Sub消息中提取: email={email}, historyId={history_id}") except Exception as decode_error: logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}") logger.error(traceback.format_exc()) return False else: # 原始格式处理 email = notification_data.get('emailAddress') history_id = notification_data.get('historyId') if not email or not history_id: logger.error("Gmail通知数据不完整: 找不到emailAddress或historyId") return False # 查找关联用户 from .models import User, GmailCredential user = User.objects.filter(email=email).first() credential = None # 如果找不到用户,尝试使用gmail_email字段查找 if not user: logger.info(f"找不到email={email}的用户,尝试使用gmail_email查找") credential = GmailCredential.objects.filter(gmail_email=email, is_active=True).first() if credential: user = credential.user logger.info(f"通过gmail_email找到用户: {user.email}") if not user: logger.error(f"找不到与 {email} 关联的用户") return False # 确保有相应的Gmail凭证 if not credential: credential = GmailCredential.objects.filter(user=user, is_active=True).first() if not credential: logger.error(f"用户 {user.email} 没有活跃的Gmail凭证") return False # 检查凭证是否需要重新授权 if credential.needs_reauth: logger.warning(f"Gmail凭证 {credential.id} 需要重新授权,无法处理通知") # 记录当前通知到队列或数据库,供用户重新授权后再处理 try: from .models import GmailNotificationQueue # 存储通知到队列 GmailNotificationQueue.objects.create( user=user, gmail_credential=credential, email=email, history_id=history_id, notification_data=json.dumps(notification_data) ) logger.info(f"通知已保存到队列,等待用户重新授权后处理") except Exception as queue_error: logger.error(f"保存通知到队列失败: {str(queue_error)}") return False # 更新历史ID if credential and history_id: try: # 仅当新的历史ID大于当前值时更新 if not credential.last_history_id or int(history_id) > int(credential.last_history_id): credential.last_history_id = history_id credential.save() logger.info(f"更新历史ID: {history_id}") else: logger.info(f"收到的历史ID ({history_id}) 不大于当前值 ({credential.last_history_id}),不更新") except Exception as update_error: logger.error(f"更新历史ID失败: {str(update_error)}") # 初始化Gmail集成 gmail_integration = GmailIntegration(user, gmail_credential_id=credential.id if credential else None) # 记录详细的处理信息 logger.info(f"Gmail通知处理: 用户={user.email}, Gmail邮箱={email}, 历史ID={history_id}, 凭证ID={credential.id}") # 设置超时 original_timeout = socket.getdefaulttimeout() socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT) try: # 认证Gmail服务 if not gmail_integration.authenticate(): logger.error(f"Gmail认证失败: {email}") # 尝试刷新令牌 refresh_result = gmail_integration.refresh_token() if not refresh_result: logger.error("刷新令牌失败,需要用户重新授权") return False logger.info("令牌刷新成功,继续处理通知") # 获取历史变更 message_ids = gmail_integration.get_history(history_id) # 如果没有找到新消息,尝试获取最近的消息 if not message_ids: logger.info("没有找到历史变更,尝试获取最近的邮件") # 查询最近的5封邮件 try: recent_messages = gmail_integration.gmail_service.users().messages().list( userId='me', maxResults=5 ).execute() if 'messages' in recent_messages: message_ids = [msg['id'] for msg in recent_messages['messages']] logger.info(f"获取到 {len(message_ids)} 封最近的邮件: {message_ids}") else: logger.info("没有找到最近的邮件") except Exception as recent_error: logger.error(f"获取最近邮件失败: {str(recent_error)}") except Exception as e: error_msg = str(e) logger.error(f"处理Gmail通知时发生错误: {error_msg}") # 检查是否是令牌过期 if "invalid_grant" in error_msg.lower() or "401" in error_msg: logger.warning("检测到OAuth令牌问题,尝试刷新令牌") # 尝试刷新令牌 refresh_result = gmail_integration.refresh_token() if not refresh_result: logger.error("刷新令牌失败,需要用户重新授权") return False # 再次尝试获取历史记录 try: message_ids = gmail_integration.get_history(history_id) # 如果仍然没有找到新消息,尝试获取最近的消息 if not message_ids: logger.info("刷新令牌后仍未找到历史变更,尝试获取最近邮件") recent_messages = gmail_integration.gmail_service.users().messages().list( userId='me', maxResults=5 ).execute() if 'messages' in recent_messages: message_ids = [msg['id'] for msg in recent_messages['messages']] logger.info(f"获取到 {len(message_ids)} 封最近的邮件: {message_ids}") except Exception as retry_error: logger.error(f"刷新令牌后再次尝试失败: {str(retry_error)}") return False else: return False finally: # 恢复原始超时设置 socket.setdefaulttimeout(original_timeout) if message_ids: logger.info(f"找到 {len(message_ids)} 个需要处理的消息") # 限制处理的消息数量,防止过多消息导致系统负载过高 max_messages = 10 if len(message_ids) > max_messages: logger.warning(f"消息数量 ({len(message_ids)}) 超过限制 ({max_messages}),将只处理前 {max_messages} 条") message_ids = message_ids[:max_messages] # 处理消息 success_count = 0 for message_id in message_ids: try: # 设置超时 socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT) if self._process_new_message(gmail_integration, message_id): success_count += 1 except Exception as msg_error: error_msg = str(msg_error) logger.error(f"处理消息 {message_id} 失败: {error_msg}") # 检查是否是令牌过期 if "invalid_grant" in error_msg.lower() or "401" in error_msg: logger.warning("处理消息时检测到OAuth令牌问题,标记需要重新授权") gmail_integration._mark_credential_needs_reauth() break logger.error(traceback.format_exc()) finally: # 恢复原始超时设置 socket.setdefaulttimeout(original_timeout) logger.info(f"成功处理 {success_count}/{len(message_ids)} 个消息") return success_count > 0 # 如果没有找到新消息但仍需确认处理,验证GooglePubSub连接并保持通知持续 try: # 简单验证连接性测试 logger.info(f"未找到需要处理的消息,执行服务连接测试") profile = gmail_integration.gmail_service.users().getProfile(userId='me').execute() if profile and 'emailAddress' in profile: logger.info(f"Gmail服务连接正常: {profile['emailAddress']}") # 检查并更新监听状态 if credential: # 检查监听是否过期 needs_watch_renew = False if credential.watch_expiration: from django.utils import timezone # 如果监听将在3天内过期,提前更新 three_days_later = timezone.now() + timezone.timedelta(days=3) if credential.watch_expiration < three_days_later: needs_watch_renew = True logger.info(f"监听将在3天内过期,需要更新: {credential.watch_expiration}") else: needs_watch_renew = True logger.info("没有监听过期时间记录,需要更新") if needs_watch_renew: try: # 更新监听 watch_result = gmail_integration.setup_watch() logger.info(f"更新监听成功: {watch_result}") return True except Exception as watch_error: logger.error(f"更新监听失败: {str(watch_error)}") return False return True except Exception as verify_error: logger.error(f"Gmail服务连接测试失败: {str(verify_error)}") return False # 如果没有找到新消息,记录日志并返回成功 logger.info("没有找到新消息,处理完成") return True except Exception as e: logger.error(f"处理Gmail通知失败: {str(e)}") logger.error(traceback.format_exc()) return False def _process_new_message(self, gmail_integration, message_id): """处理新收到的邮件""" try: # 导入所需模型 from .models import GmailTalentMapping, GmailAttachment, KnowledgeBase, ChatHistory, UserProfile # 添加详细日志 logger.info(f"处理新消息ID: {message_id}") # 获取消息内容 message = gmail_integration.gmail_service.users().messages().get(userId='me', id=message_id).execute() # 从邮件中提取相关信息 email_content = gmail_integration._extract_email_content(message) if not email_content: logger.error(f"无法提取消息 {message_id} 的内容") return False # 记录邮件详情 sender = email_content.get('from', '') recipient = email_content.get('to', '') subject = email_content.get('subject', '') body = email_content.get('body', '') logger.info(f"提取的邮件信息: 发件人={sender}, 收件人={recipient}, 主题={subject}") # 提取达人邮箱 - 可能是发件人或收件人 talent_email = None is_talent_sending = False # 标记是否是达人发送的邮件 # 检查收件人是否是当前用户 user_email = None if self.user and hasattr(self.user, 'email'): user_email = self.user.email # 如果发件人不是用户,则认为发件人是达人 if user_email and sender and user_email.lower() not in sender.lower(): talent_email = sender.lower() is_talent_sending = True logger.info(f"检测到达人(发件人)邮箱: {talent_email}") # 如果收件人不是用户,则认为收件人是达人 elif user_email and recipient and user_email.lower() not in recipient.lower(): talent_email = recipient.lower() is_talent_sending = False logger.info(f"检测到达人(收件人)邮箱: {talent_email}") # 从邮箱中提取纯地址(去除名称部分) if talent_email: if '<' in talent_email and '>' in talent_email: talent_email = talent_email.split('<')[1].split('>')[0] # 转换为小写,提高匹配准确性 talent_email = talent_email.lower() # 如果找不到明确的达人邮箱,尝试从映射中查找 if not talent_email: # 尝试从主题或正文中找线索 # 实现取决于具体需求 pass logger.info(f"最终确定的达人邮箱: {talent_email}") # 如果找到了达人邮箱,进行知识库处理 if talent_email: # 创建或获取知识库 knowledge_base, created = self.create_talent_knowledge_base(talent_email) if not knowledge_base: logger.error(f"无法为达人 {talent_email} 创建知识库") return False logger.info(f"使用知识库: {knowledge_base.name} (ID: {knowledge_base.id}), 新创建: {created}") # 获取映射关系 talent_mapping, mapping_created = GmailTalentMapping.objects.get_or_create( user=self.user, talent_email=talent_email, defaults={ 'knowledge_base': knowledge_base, 'conversation_id': f"gmail_{talent_email.replace('@', '_').replace('.', '_')}", 'is_active': True } ) # 获取对话ID conversation_id = talent_mapping.conversation_id # 创建聊天记录 - 确定正确的角色 # 修正角色判断逻辑:达人始终是assistant,用户始终是user # 如果是达人发送的邮件,角色应该是assistant;如果是用户发送的邮件,角色应该是user chat_role = 'assistant' if is_talent_sending else 'user' # 添加更详细的角色日志 logger.info(f"设置聊天角色: 角色={chat_role}, 是否达人发送={is_talent_sending}, 发件人={sender}, 达人邮箱={talent_email}") # 创建或更新聊天记录 chat_entry = ChatHistory.objects.create( user=self.user, knowledge_base=knowledge_base, conversation_id=conversation_id, role=chat_role, content=f"{subject}\n\n{body}", parent_id=message_id ) logger.info(f"已创建聊天记录: ID={chat_entry.id}, 角色={chat_role}, 对话ID={conversation_id}") # 查找适合的parent_id:获取对话中最后一条消息 try: # 获取该对话中非自己的最新消息作为正确的父消息 last_message = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).exclude( id=chat_entry.id # 排除刚刚创建的消息 ).order_by('-created_at').first() if last_message: # 更新parent_id为对话中的上一条消息ID chat_entry.parent_id = str(last_message.id) chat_entry.save(update_fields=['parent_id']) logger.info(f"更新消息 {chat_entry.id} 的parent_id为 {last_message.id}") except Exception as parent_error: logger.error(f"更新父消息ID失败: {str(parent_error)}") # 处理附件 if 'attachments' in email_content and email_content['attachments']: # 下载并处理附件 for attachment in email_content['attachments']: try: logger.info(f"处理附件: {attachment.get('filename')}") # 下载附件 filepath = self.download_attachment( message_id=message_id, attachment_id=attachment.get('attachmentId'), filename=attachment.get('filename') ) if filepath: # 创建附件记录 GmailAttachment.objects.create( chat_message=chat_entry, gmail_message_id=message_id, filename=attachment.get('filename'), filepath=filepath, mimetype=attachment.get('mimeType', ''), filesize=attachment.get('size', 0) ) logger.info(f"已保存附件: {filepath}") except Exception as attachment_error: logger.error(f"处理附件 {attachment.get('filename')} 失败: {str(attachment_error)}") # 检查是否需要自动回复 try: # 只有达人发送的邮件才考虑自动回复 if is_talent_sending and chat_role == 'user': # 检查用户是否启用了自动回复 profile = UserProfile.objects.filter(user=self.user).first() if profile and profile.auto_recommend_reply: logger.info(f"用户 {self.user.email} 已启用自动回复,生成回复建议") # 获取该对话的历史记录 conversation_history = ChatHistory.objects.filter( conversation_id=conversation_id ).order_by('created_at') # 生成回复 recommended_reply = self._get_recommended_reply_from_deepseek(conversation_history) if recommended_reply: logger.info(f"生成了推荐回复,长度: {len(recommended_reply)}") # 保存推荐回复 recommended_reply_entry = ChatHistory.objects.create( user=self.user, knowledge_base=knowledge_base, conversation_id=conversation_id, role='user', content=recommended_reply, parent_id=chat_entry.id ) logger.info(f"已保存推荐回复到历史记录, 角色=user, ID={recommended_reply_entry.id}, 父消息ID={chat_entry.id}") except Exception as auto_reply_error: logger.error(f"生成自动回复失败: {str(auto_reply_error)}") # 记录详细的处理结果日志 logger.info(f"成功处理达人邮件: ID={message_id}, 发件人={sender}, 收件人={recipient}, 主题={subject}") logger.info(f"保存消息信息: 角色={chat_role}, 对话ID={conversation_id}, 是否达人发送={is_talent_sending}") return True else: logger.warning(f"无法确定达人邮箱,跳过知识库处理") return False except Exception as e: logger.error(f"处理新消息 {message_id} 失败: {str(e)}") logger.error(traceback.format_exc()) return False def _get_recommended_reply_from_deepseek(self, conversation_history): """调用DeepSeek API生成回复建议""" try: # 使用有效的API密钥 api_key = "" # 尝试从环境变量获取 import os from dotenv import load_dotenv load_dotenv() env_api_key = os.environ.get('DEEPSEEK_API_KEY') if env_api_key: api_key = env_api_key # 从Django设置中获取密钥 from django.conf import settings if hasattr(settings, 'DEEPSEEK_API_KEY') and settings.DEEPSEEK_API_KEY: api_key = settings.DEEPSEEK_API_KEY # 如果仍然没有有效的API密钥,使用默认值 if not api_key: api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf" logger.warning("使用默认API密钥,请在环境变量或settings.py中设置DEEPSEEK_API_KEY") url = "https://api.siliconflow.cn/v1/chat/completions" # 获取用户总目标 from .models import UserGoal, ConversationSummary user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first() goal_content = user_goal.content if user_goal else None # 尝试获取对话总结 talent_email = None conversation_id = None # 从对话历史中尝试提取达人邮箱 for message in conversation_history: if message.get('role') == 'user' and 'metadata' in message and 'from_email' in message['metadata']: talent_email = message['metadata']['from_email'] break # 从对话历史中尝试提取对话ID for message in conversation_history: if 'metadata' in message and 'conversation_id' in message['metadata']: conversation_id = message['metadata']['conversation_id'] break # 获取对话总结 conversation_summary = None if talent_email and conversation_id: summary_obj = ConversationSummary.objects.filter( user=self.user, talent_email=talent_email, conversation_id=conversation_id, is_active=True ).first() if summary_obj: conversation_summary = summary_obj.summary # 构建系统提示,结合用户总目标和对话总结 system_content = "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。" if goal_content: system_content += f"\n\n【用户总目标】\n{goal_content}\n\n" if conversation_summary: system_content += f"【对话总结】\n{conversation_summary}\n\n" system_content += "请针对达人最后一条消息,结合用户总目标和对话总结生成一个有针对性的专业回复。回复必须对达人当前问题直接响应,同时与用户的总体合作目标保持一致。回复应该有至少100个字符,必须提供有实质内容的回复。" system_message = { "role": "system", "content": system_content } messages = [system_message] # 限制对话历史长度,只保留最近的5条消息,避免超出token限制 recent_messages = conversation_history[-5:] if len(conversation_history) > 5 else conversation_history messages.extend(recent_messages) # 确保最后一条消息是用户消息,如果不是,添加一个提示 if not recent_messages or recent_messages[-1]['role'] != 'user': # 添加一个系统消息作为用户的最后一条消息 messages.append({ "role": "user", "content": "请针对达人最后一条消息生成专业的回复,考虑我们之前的对话历史和我设定的总目标。" }) # 完全按照文档提供的参数格式构建请求 payload = { "model": "deepseek-ai/DeepSeek-V3", "messages": messages, "stream": False, "max_tokens": 1024, # 增加token上限 "temperature": 0.7, # 提高多样性 "top_p": 0.9, "top_k": 50, "frequency_penalty": 0.5, "presence_penalty": 0.2, # 添加新参数 "n": 1, "stop": [], "response_format": { "type": "text" } } headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } logger.info(f"开始调用DeepSeek API生成推荐回复") response = requests.post(url, json=payload, headers=headers) if response.status_code != 200: logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}") return None result = response.json() logger.debug(f"DeepSeek API返回: {result}") # 提取回复内容 if 'choices' in result and len(result['choices']) > 0: reply = result['choices'][0]['message']['content'] # 如果返回的内容为空,直接返回None if not reply or reply.strip() == '': logger.warning("DeepSeek API返回的回复内容为空") return None return reply logger.warning(f"DeepSeek API返回格式异常: {result}") return None except Exception as e: logger.error(f"调用DeepSeek API失败: {str(e)}") logger.error(traceback.format_exc()) return None def get_attachment_by_conversation(self, conversation_id): """获取特定对话的所有附件""" try: # 查找对话记录 records = ChatHistory.objects.filter( conversation_id=conversation_id, user=self.user, is_deleted=False ) if not records: logger.warning(f"找不到对话记录: {conversation_id}") return [] # 使用GmailAttachment数据模型查询附件 attachments = [] records_ids = [record.id for record in records] gmail_attachments = GmailAttachment.objects.filter( chat_message_id__in=records_ids ).select_related('chat_message') for attachment in gmail_attachments: chat_message = attachment.chat_message attachments.append({ 'id': str(attachment.id), 'filename': attachment.filename, 'filepath': attachment.filepath, 'mimetype': attachment.mimetype, 'filesize': attachment.filesize, 'message_id': str(chat_message.id), 'gmail_message_id': attachment.gmail_message_id, 'role': chat_message.role, 'created_at': attachment.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'content_preview': chat_message.content[:100] + '...' if len(chat_message.content) > 100 else chat_message.content }) return attachments except Exception as e: logger.error(f"获取对话附件失败: {str(e)}") return [] def handle_auth_code(self, auth_code): """处理OAuth2回调授权码""" try: flow = self.get_oauth_flow() flow.fetch_token(code=auth_code) self.credentials = flow.credentials # 初始化Gmail服务 self.gmail_service = discovery.build('gmail', 'v1', http=self.credentials.authorize(Http())) # 获取Gmail账号 gmail_email = None try: # 调用API获取用户资料 profile = self.gmail_service.users().getProfile(userId='me').execute() gmail_email = profile.get('emailAddress') logger.info(f"获取到Gmail账号: {gmail_email}") except Exception as e: logger.error(f"获取Gmail账号失败: {str(e)}") # 保存凭证到数据库 if self.user: from django.utils import timezone # 将凭证对象序列化 credentials_data = pickle.dumps(self.credentials) # 如果提供了具体的gmail_credential_id,更新对应的记录 if self.gmail_credential_id: try: gmail_credential = GmailCredential.objects.get( id=self.gmail_credential_id, user=self.user ) # 更新凭证信息 gmail_credential.credentials = credentials_data gmail_credential.gmail_email = gmail_email gmail_credential.token_path = self.token_storage_path gmail_credential.updated_at = timezone.now() gmail_credential.is_active = True gmail_credential.save() self.gmail_credential = gmail_credential logger.info(f"已更新ID为 {self.gmail_credential_id} 的Gmail凭证,Gmail账号: {gmail_email}") except GmailCredential.DoesNotExist: logger.error(f"未找到ID为 {self.gmail_credential_id} 的Gmail凭证,将创建新凭证") self.gmail_credential_id = None # 如果没有具体的gmail_credential_id,或者指定的不存在,创建或更新默认凭证 if not self.gmail_credential_id: # 检查是否已存在相同gmail_email的凭证 existing_credential = None if gmail_email: existing_credential = GmailCredential.objects.filter( user=self.user, gmail_email=gmail_email, is_active=True ).first() if existing_credential: # 更新现有凭证 existing_credential.credentials = credentials_data existing_credential.token_path = self.token_storage_path existing_credential.updated_at = timezone.now() existing_credential.save() self.gmail_credential = existing_credential logger.info(f"已更新Gmail账号 {gmail_email} 的现有凭证") else: # 获取Gmail账户数量 gmail_count = GmailCredential.objects.filter(user=self.user).count() # 创建新凭证 name = f"Gmail账号 {gmail_count + 1}" if gmail_email: name = gmail_email else: # 确保gmail_email有默认值,避免null错误 gmail_email = "未知邮箱" # 将凭证转换为JSON字符串 if isinstance(credentials_data, dict): # 确保json模块在本地作用域可访问 import json credentials_data = json.dumps(credentials_data) gmail_credential = GmailCredential.objects.create( user=self.user, credentials=credentials_data, token_path=self.token_storage_path, gmail_email=gmail_email, name=name, is_default=(gmail_count == 0), # 第一个账号设为默认 updated_at=timezone.now(), is_active=True ) self.gmail_credential = gmail_credential logger.info(f"已创建新的Gmail凭证,Gmail账号: {gmail_email}, 名称: {name}") # 更新单例 GmailServiceManager.update_instance( self.user, self.credentials, self.gmail_service, self.gmail_credential ) return { 'status': 'success', 'gmail_email': gmail_email } except Exception as e: logger.error(f"处理授权码失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return { 'status': 'error', 'message': str(e) } def _upload_attachments_to_knowledge_base(self, knowledge_base, conversations): """上传所有邮件附件到知识库""" try: # 收集所有需要上传的附件 attachments_to_upload = [] # 从conversations中提取所有附件信息 for message in conversations: if 'attachments' in message and message['attachments']: message_id = message.get('id', '') sender = message.get('from', '未知发件人') subject = message.get('subject', '无主题') date_str = message.get('date', '未知时间') for attachment in message['attachments']: if 'attachmentId' in attachment and 'filename' in attachment: # 下载附件(如果尚未下载) filepath = None # 检查数据库中是否已有记录 existing_attachment = GmailAttachment.objects.filter( gmail_message_id=message_id, filename=attachment['filename'] ).first() if existing_attachment and os.path.exists(existing_attachment.filepath): filepath = existing_attachment.filepath else: # 下载附件 filepath = self.download_attachment( message_id=message_id, attachment_id=attachment['attachmentId'], filename=attachment['filename'] ) if filepath: attachments_to_upload.append({ 'filepath': filepath, 'filename': attachment['filename'], 'message_id': message_id, 'sender': sender, 'subject': subject, 'date': date_str }) # 上传收集到的所有附件 if attachments_to_upload: logger.info(f"开始上传 {len(attachments_to_upload)} 个附件到知识库") from .views import KnowledgeBaseViewSet import django.core.files.uploadedfile as uploadedfile # 导入FileUploadParser from rest_framework.parsers import FileUploadParser # 创建视图集实例 kb_viewset = KnowledgeBaseViewSet() # 批量上传附件 for i in range(0, len(attachments_to_upload), 10): # 每批最多10个文件 batch = attachments_to_upload[i:i+10] files = [] for att in batch: filepath = att['filepath'] filename = att['filename'] # 确认文件存在 if not os.path.exists(filepath): logger.warning(f"附件文件不存在: {filepath}") continue # 读取文件并创建UploadedFile对象 with open(filepath, 'rb') as f: file_content = f.read() files.append(uploadedfile.SimpleUploadedFile( name=filename, content=file_content, content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream' )) # 如果这批有文件,调用_call_split_api_multiple上传 if files: # 直接调用文档分割API split_response = kb_viewset._call_split_api_multiple(files) if not split_response or split_response.get('code') != 200: logger.error(f"附件批量分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}") continue # 处理分割后的文档 documents_data = split_response.get('data', []) # 对每个文档调用上传API for doc in documents_data: doc_name = doc.get('name', 'Gmail附件') doc_content = doc.get('content', []) # 准备文档数据 upload_doc_data = { "name": doc_name, "paragraphs": [] } # 将所有段落添加到文档中 for paragraph in doc_content: upload_doc_data["paragraphs"].append({ "content": paragraph.get('content', ''), "title": paragraph.get('title', ''), "is_active": True, "problem_list": [] }) # 调用文档上传API upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data) if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): # 上传成功,保存记录到数据库 document_id = upload_response['data']['id'] # 创建文档记录 from .models import KnowledgeBaseDocument doc_record = KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=doc_name, external_id=document_id ) logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}") # 更新知识库文档计数 knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).count() knowledge_base.save() logger.info(f"完成附件上传,共 {len(attachments_to_upload)} 个文件") except Exception as e: logger.error(f"上传附件到知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) def _upload_message_attachments_to_knowledge_base(self, knowledge_base, attachment_records): """上传单个邮件的附件到知识库""" try: # 检查是否有附件需要上传 if not attachment_records: return logger.info(f"开始上传 {len(attachment_records)} 个附件到知识库") from .views import KnowledgeBaseViewSet import django.core.files.uploadedfile as uploadedfile # 创建视图集实例 kb_viewset = KnowledgeBaseViewSet() # 准备文件列表 files = [] for att in attachment_records: filepath = att.get('filepath') filename = att.get('filename') # 确认文件存在 if not os.path.exists(filepath): logger.warning(f"附件文件不存在: {filepath}") continue # 读取文件并创建UploadedFile对象 with open(filepath, 'rb') as f: file_content = f.read() files.append(uploadedfile.SimpleUploadedFile( name=filename, content=file_content, content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream' )) # 如果有文件,调用_call_split_api_multiple上传 if files: # 直接调用文档分割API split_response = kb_viewset._call_split_api_multiple(files) if not split_response or split_response.get('code') != 200: logger.error(f"附件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}") return # 处理分割后的文档 documents_data = split_response.get('data', []) # 对每个文档调用上传API for doc in documents_data: doc_name = doc.get('name', 'Gmail附件') doc_content = doc.get('content', []) # 准备文档数据 upload_doc_data = { "name": doc_name, "paragraphs": [] } # 将所有段落添加到文档中 for paragraph in doc_content: upload_doc_data["paragraphs"].append({ "content": paragraph.get('content', ''), "title": paragraph.get('title', ''), "is_active": True, "problem_list": [] }) # 调用文档上传API upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data) if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): # 上传成功,保存记录到数据库 document_id = upload_response['data']['id'] # 创建文档记录 from .models import KnowledgeBaseDocument doc_record = KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=doc_name, external_id=document_id ) logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}") # 更新知识库文档计数 knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).count() knowledge_base.save() logger.info(f"完成附件上传,共 {len(files)} 个文件") except Exception as e: logger.error(f"上传附件到知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) def get_recent_emails(self, from_email=None, max_results=10): """获取最近的邮件""" try: service = self.gmail_service if not service: logger.error("Gmail服务未初始化") return [] query = "" if from_email: query = f"from:{from_email}" # 获取收件箱中的邮件列表 results = service.users().messages().list( userId='me', maxResults=max_results, q=query ).execute() messages = results.get('messages', []) emails = [] for message in messages: msg = service.users().messages().get(userId='me', id=message['id']).execute() email_data = self._extract_email_content(msg) emails.append(email_data) return emails except Exception as e: logger.error(f"获取最近邮件失败: {str(e)}") logger.error(traceback.format_exc()) return [] def manage_user_goal(self, goal_content=None): """ 创建或更新用户总目标 Args: goal_content (str): 用户总目标内容,如果为None则返回当前总目标 Returns: dict: 包含操作结果和总目标信息 """ from .models import UserGoal try: # 如果未提供内容,则返回当前总目标 if goal_content is None: user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first() if user_goal: return { 'status': 'success', 'action': 'retrieve', 'goal': { 'id': str(user_goal.id), 'content': user_goal.content, 'created_at': user_goal.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'updated_at': user_goal.updated_at.strftime('%Y-%m-%d %H:%M:%S') } } else: return { 'status': 'success', 'action': 'retrieve', 'goal': None } # 查找当前活跃的用户总目标 user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first() # 如果已存在,则更新 if user_goal: user_goal.content = goal_content user_goal.save() action = 'update' else: # 否则创建新的总目标 user_goal = UserGoal.objects.create( user=self.user, content=goal_content ) action = 'create' return { 'status': 'success', 'action': action, 'goal': { 'id': str(user_goal.id), 'content': user_goal.content, 'created_at': user_goal.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'updated_at': user_goal.updated_at.strftime('%Y-%m-%d %H:%M:%S') } } except Exception as e: logger.error(f"管理用户总目标失败: {str(e)}") logger.error(traceback.format_exc()) return { 'status': 'error', 'message': f"管理用户总目标失败: {str(e)}" } def generate_conversation_summary(self, talent_email): """ 为用户与特定达人的所有对话生成总结 Args: talent_email (str): 达人的邮箱地址 Returns: dict: 包含操作结果和总结信息 """ from .models import ConversationSummary try: # 获取与该达人的所有对话 conversations = self.get_conversations(talent_email) if not conversations: return { 'status': 'error', 'message': f"未找到与{talent_email}的对话记录" } # 准备对话历史记录 conversation_history = [] for conversation in conversations: if 'subject' in conversation and conversation['subject']: conversation_history.append({ 'role': 'system', 'content': f"邮件主题: {conversation['subject']}" }) for message in conversation.get('messages', []): role = 'user' if message.get('from_email') == talent_email else 'assistant' content = message.get('body', '') if content: conversation_history.append({ 'role': role, 'content': content }) # 调用DeepSeek API生成总结 summary = self._generate_summary_from_deepseek(conversation_history) if not summary: return { 'status': 'error', 'message': "生成对话总结失败" } # 保存或更新总结 conversation_id = conversations[0].get('id') if conversations else None if not conversation_id: return { 'status': 'error', 'message': "无法确定对话ID" } # 查找现有总结 conversation_summary = ConversationSummary.objects.filter( user=self.user, talent_email=talent_email, conversation_id=conversation_id, is_active=True ).first() # 如果已存在,则更新 if conversation_summary: conversation_summary.summary = summary conversation_summary.save() action = 'update' else: # 否则创建新的总结 conversation_summary = ConversationSummary.objects.create( user=self.user, talent_email=talent_email, conversation_id=conversation_id, summary=summary ) action = 'create' return { 'status': 'success', 'action': action, 'summary': { 'id': str(conversation_summary.id), 'talent_email': talent_email, 'conversation_id': conversation_id, 'summary': summary, 'created_at': conversation_summary.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'updated_at': conversation_summary.updated_at.strftime('%Y-%m-%d %H:%M:%S') } } except Exception as e: logger.error(f"生成对话总结失败: {str(e)}") logger.error(traceback.format_exc()) return { 'status': 'error', 'message': f"生成对话总结失败: {str(e)}" } def _generate_summary_from_deepseek(self, conversation_history): """调用DeepSeek API生成对话总结""" try: # 使用有效的API密钥 api_key = "" # 尝试从环境变量获取 import os from dotenv import load_dotenv load_dotenv() env_api_key = os.environ.get('DEEPSEEK_API_KEY') if env_api_key: api_key = env_api_key # 从Django设置中获取密钥 from django.conf import settings if hasattr(settings, 'DEEPSEEK_API_KEY') and settings.DEEPSEEK_API_KEY: api_key = settings.DEEPSEEK_API_KEY # 如果仍然没有有效的API密钥,使用默认值 if not api_key: api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf" logger.warning("使用默认API密钥,请在环境变量或settings.py中设置DEEPSEEK_API_KEY") url = "https://api.siliconflow.cn/v1/chat/completions" # 系统消息指定生成总结的任务 system_message = { "role": "system", "content": "你是一位专业的电商客服和达人助手。你的任务是分析用户与达人之间的所有对话历史,并生成一份简明扼要的总结。总结应包括:1. 主要讨论的产品或服务;2. 达人的主要关注点和需求;3. 已经达成的共识或协议;4. 未解决的问题或后续需要跟进的事项。总结应该客观、全面、结构清晰。" } messages = [system_message] # 添加对话历史,但限制消息数量避免超出token限制 # 如果对话历史太长,可能需要进一步处理或分割 if len(conversation_history) > 20: # 选取关键消息:第一条、最后几条以及中间的一些重要消息 selected_messages = ( conversation_history[:2] + # 前两条 conversation_history[len(conversation_history)//2-2:len(conversation_history)//2+2] + # 中间四条 conversation_history[-6:] # 最后六条 ) messages.extend(selected_messages) else: messages.extend(conversation_history) # 添加用户指令 messages.append({ "role": "user", "content": "请根据以上对话历史生成一份全面的总结。" }) # 构建API请求 payload = { "model": "deepseek-ai/DeepSeek-V3", "messages": messages, "stream": False, "max_tokens": 1500, # 增加token上限以容纳完整总结 "temperature": 0.3, # 降低随机性,使总结更加确定性 "top_p": 0.9, "top_k": 50, "frequency_penalty": 0.5, "presence_penalty": 0.2, "n": 1, "stop": [], "response_format": { "type": "text" } } headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } logger.info(f"开始调用DeepSeek API生成对话总结") response = requests.post(url, json=payload, headers=headers) if response.status_code != 200: logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}") return None result = response.json() logger.debug(f"DeepSeek API返回: {result}") # 提取回复内容 if 'choices' in result and len(result['choices']) > 0: summary = result['choices'][0]['message']['content'] # 如果返回的内容为空,直接返回None if not summary or summary.strip() == '': logger.warning("DeepSeek API返回的总结内容为空") return None return summary logger.warning(f"DeepSeek API返回格式异常: {result}") return None except Exception as e: logger.error(f"调用DeepSeek API生成总结失败: {str(e)}") logger.error(traceback.format_exc()) return None def get_oauth_flow(self): """创建OAuth2流程处理器""" try: # 确保client_secret_json已提供 if not self.client_secret: logger.error("未提供client_secret_json,无法创建OAuth流程") raise ValueError("未提供client_secret_json") # 创建临时文件存储client_secret client_secret_path = 'client_secret.json' with open(client_secret_path, 'w') as f: if isinstance(self.client_secret, str): try: # 确保是有效的JSON import json # 在本地作用域引入 json_data = json.loads(self.client_secret) # 强制设置redirect_uris为非浏览器模式,避免localhost连接拒绝问题 for key in ['web', 'installed']: if key in json_data and 'redirect_uris' in json_data[key]: json_data[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob'] logger.info("已强制设置redirect_uri为非浏览器模式") json.dump(json_data, f) except json.JSONDecodeError as e: logger.error(f"client_secret不是有效的JSON: {str(e)}") raise ValueError(f"client_secret不是有效的JSON: {str(e)}") else: # 如果是字典,也进行相同处理 import json # 在本地作用域引入 client_secret_dict = dict(self.client_secret) for key in ['web', 'installed']: if key in client_secret_dict and 'redirect_uris' in client_secret_dict[key]: client_secret_dict[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob'] logger.info("已强制设置redirect_uri为非浏览器模式") json.dump(client_secret_dict, f) # 从client_secret创建flow flow = client.flow_from_clientsecrets( client_secret_path, self.SCOPES, redirect_uri='urn:ietf:wg:oauth:2.0:oob' ) return flow except Exception as e: logger.error(f"创建OAuth流程失败: {str(e)}") logger.error(traceback.format_exc()) raise e finally: # 删除临时文件 if os.path.exists(client_secret_path): try: os.unlink(client_secret_path) except Exception as del_e: logger.error(f"删除临时文件失败: {str(del_e)}") def check_and_renew_watch(self): """检查并更新Gmail监听状态 如果监听即将过期或已经过期,则自动更新监听 """ try: from .models import GmailCredential # 获取用户的Gmail凭证 credential = GmailCredential.objects.filter( user=self.user, is_active=True, gmail_credential_id=self.gmail_credential_id if self.gmail_credential_id else None ).first() if not credential: logger.error(f"找不到用户 {self.user.email} 的Gmail凭证") return False # 检查监听是否需要更新 needs_renewal = False # 如果没有watch_expiration,需要设置监听 if not credential.watch_expiration: logger.info(f"Gmail凭证 {credential.gmail_email} 没有监听过期时间,需要设置监听") needs_renewal = True else: # 如果监听将在24小时内过期,更新监听 now = timezone.now() if hasattr(settings, 'USE_TZ') and settings.USE_TZ else datetime.now() time_until_expiration = credential.watch_expiration - now hours_until_expiration = time_until_expiration.total_seconds() / 3600 if hours_until_expiration < 24: logger.info(f"Gmail监听将在 {hours_until_expiration:.2f} 小时后过期,需要更新") needs_renewal = True if needs_renewal: logger.info(f"为Gmail凭证 {credential.gmail_email} 更新监听") watch_result = self.setup_watch() if watch_result and 'historyId' in watch_result: logger.info(f"Gmail监听更新成功: historyId={watch_result['historyId']}") return True else: logger.error("Gmail监听更新失败") return False else: logger.info(f"Gmail监听状态良好,无需更新。过期时间: {credential.watch_expiration}") return True except Exception as e: logger.error(f"检查和更新Gmail监听失败: {str(e)}") logger.error(traceback.format_exc()) return False def verify_connectivity(self): """验证与Gmail API的连接性 测试Gmail API连接,并返回连接状态 Returns: dict: 包含连接测试结果的信息 """ try: if not hasattr(self, 'gmail_service') or not self.gmail_service: logger.warning("Gmail服务未初始化,尝试认证") if not self.authenticate(): return { 'status': 'error', 'message': 'Gmail认证失败', 'is_connected': False } # 尝试获取用户的个人资料,这是一个轻量级的API调用 profile = self.gmail_service.users().getProfile(userId='me').execute() # 检查是否有必要的监听信息 from .models import GmailCredential credential = GmailCredential.objects.filter( user=self.user, is_active=True, gmail_credential_id=self.gmail_credential_id if self.gmail_credential_id else None ).first() watch_info = {} if credential: watch_info = { 'has_watch': credential.watch_expiration is not None, 'watch_expiration': credential.watch_expiration.strftime('%Y-%m-%d %H:%M:%S') if credential.watch_expiration else None, 'last_history_id': credential.last_history_id } return { 'status': 'success', 'message': 'Gmail连接正常', 'is_connected': True, 'profile': { 'email': profile.get('emailAddress'), 'messages_total': profile.get('messagesTotal'), 'threads_total': profile.get('threadsTotal') }, 'watch_info': watch_info } except Exception as e: logger.error(f"Gmail连接测试失败: {str(e)}") logger.error(traceback.format_exc()) error_message = str(e) suggestions = [] if "invalid_grant" in error_message.lower(): suggestions.append("OAuth令牌已过期,需要重新授权") elif "401" in error_message: suggestions.append("认证失败,请重新登录Gmail账号") elif "EOF occurred in violation of protocol" in error_message: suggestions.append("SSL连接问题,可能是网络或代理配置问题") elif "connect timeout" in error_message.lower() or "connection timeout" in error_message.lower(): suggestions.append("连接超时,请检查网络连接和代理设置") return { 'status': 'error', 'message': f'Gmail连接失败: {error_message}', 'is_connected': False, 'suggestions': suggestions } @classmethod def batch_renew_watches(cls): """ 批量检查和更新所有用户的Gmail监听状态 此方法可以由定时任务调用,确保所有用户的Gmail监听都保持活跃状态 Returns: dict: 包含批处理结果的信息 """ try: from .models import GmailCredential, User import time # 获取所有活跃的Gmail凭证 now = timezone.now() if hasattr(settings, 'USE_TZ') and settings.USE_TZ else datetime.now() # 计算24小时后的时间 expiration_threshold = now + timedelta(hours=24) # 获取即将过期或没有监听的凭证 credentials_to_update = GmailCredential.objects.filter( is_active=True ).filter( Q(watch_expiration__lt=expiration_threshold) | Q(watch_expiration__isnull=True) ) logger.info(f"找到 {credentials_to_update.count()} 个需要更新监听的Gmail凭证") success_count = 0 failure_count = 0 # 依次处理每个凭证 for credential in credentials_to_update: try: user = credential.user if not user or not user.is_active: logger.warning(f"跳过已停用用户的凭证: {credential.id}") continue logger.info(f"处理用户 {user.email} 的Gmail凭证 {credential.gmail_email}") # 创建Gmail集成实例 gmail = cls(user, gmail_credential_id=credential.id) # 认证 if not gmail.authenticate(): logger.error(f"用户 {user.email} 的Gmail认证失败") failure_count += 1 continue # 更新监听 result = gmail.setup_watch() if result and 'historyId' in result: logger.info(f"成功更新用户 {user.email} 的Gmail监听") success_count += 1 else: logger.error(f"更新用户 {user.email} 的Gmail监听失败") failure_count += 1 # 休眠一小段时间,避免请求过于频繁 time.sleep(1) except Exception as e: failure_count += 1 logger.error(f"处理凭证 {credential.id} 时出错: {str(e)}") logger.error(traceback.format_exc()) return { 'status': 'success', 'message': f'批量更新Gmail监听完成', 'total_processed': credentials_to_update.count(), 'success_count': success_count, 'failure_count': failure_count } except Exception as e: logger.error(f"批量更新Gmail监听失败: {str(e)}") logger.error(traceback.format_exc()) return { 'status': 'error', 'message': f'批量更新Gmail监听失败: {str(e)}' } def refresh_token(self): """ 尝试刷新OAuth令牌,如果失败则标记凭证需要重新授权 Returns: bool: 刷新是否成功 """ try: logger.info(f"尝试刷新用户 {self.user.email} 的OAuth令牌") # 清除当前服务实例 GmailServiceManager.clear_instance(self.user, self.gmail_credential_id) # 尝试重新认证 if not self.authenticate(): logger.error("重新认证失败") self._mark_credential_needs_reauth() return False # 验证新的令牌是否有效 try: # 设置超时 original_timeout = socket.getdefaulttimeout() socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT) try: # 尝试一个简单的API调用来验证 profile = self.gmail_service.users().getProfile(userId='me').execute() logger.info(f"令牌刷新成功,已验证: {profile.get('emailAddress')}") return True except Exception as e: error_msg = str(e) if "invalid_grant" in error_msg.lower() or "401" in error_msg: logger.error(f"令牌仍然无效: {error_msg}") self._mark_credential_needs_reauth() return False else: # 其他错误,但令牌可能是有效的 logger.warning(f"令牌可能有效,但API调用失败: {error_msg}") return True finally: # 恢复原始超时设置 socket.setdefaulttimeout(original_timeout) except Exception as e: logger.error(f"验证新令牌时出错: {str(e)}") return False except Exception as e: logger.error(f"刷新令牌失败: {str(e)}") logger.error(traceback.format_exc()) self._mark_credential_needs_reauth() return False def _mark_credential_needs_reauth(self): """标记凭证需要重新授权""" try: from .models import GmailCredential # 查找当前凭证 credential = GmailCredential.objects.filter( user=self.user, id=self.gmail_credential_id if self.gmail_credential_id else None, is_active=True ).first() if credential: # 标记需要重新授权 credential.needs_reauth = True credential.save() logger.info(f"已标记Gmail凭证 {credential.id} 需要重新授权") # 从缓存中移除服务实例 GmailServiceManager.clear_instance(self.user, self.gmail_credential_id) except Exception as e: logger.error(f"标记凭证需要重新授权失败: {str(e)}") @classmethod def process_queued_notifications(cls, user=None): """ 处理队列中的通知,可以在用户重新授权后调用 Args: user: 可选的用户参数,如果提供,只处理该用户的通知 Returns: dict: 处理结果统计 """ try: from .models import GmailNotificationQueue, GmailCredential import json # 构建查询 query = Q(processed=False) if user: query &= Q(user=user) # 获取未处理的通知 queued_notifications = GmailNotificationQueue.objects.filter(query) logger.info(f"找到 {queued_notifications.count()} 条未处理的Gmail通知") processed_count = 0 success_count = 0 # 按用户ID和Gmail凭证ID分组处理 from django.db.models import Count user_creds = queued_notifications.values('user_id', 'gmail_credential_id').annotate( count=Count('id') ).order_by('user_id', 'gmail_credential_id') for user_cred in user_creds: user_id = user_cred['user_id'] cred_id = user_cred['gmail_credential_id'] # 获取凭证信息 credential = GmailCredential.objects.filter(id=cred_id, is_active=True).first() if not credential or credential.needs_reauth: logger.warning(f"凭证 {cred_id} 需要重新授权,跳过相关通知") continue # 获取用户对象 from .models import User user_obj = User.objects.get(id=user_id) # 初始化Gmail集成 gmail_integration = cls(user_obj, gmail_credential_id=cred_id) # 获取用户的队列通知 user_notifications = queued_notifications.filter( user_id=user_id, gmail_credential_id=cred_id ).order_by('created_at') # 按创建时间排序 for notification in user_notifications: try: # 解析通知数据 try: notification_data = json.loads(notification.notification_data) except: notification_data = { 'emailAddress': notification.email, 'historyId': notification.history_id } # 处理通知 result = gmail_integration.process_notification(notification_data) # 更新处理状态 notification.processed = True notification.success = result notification.processed_at = timezone.now() notification.save() processed_count += 1 if result: success_count += 1 except Exception as e: logger.error(f"处理队列通知 {notification.id} 失败: {str(e)}") # 标记处理失败 notification.processed = True notification.success = False notification.error_message = str(e)[:255] # 截断错误消息 notification.processed_at = timezone.now() notification.save() processed_count += 1 return { 'status': 'success', 'total_processed': processed_count, 'success_count': success_count, 'remaining': queued_notifications.count() - processed_count } except Exception as e: logger.error(f"处理队列通知失败: {str(e)}") logger.error(traceback.format_exc()) return { 'status': 'error', 'message': str(e) } def _load_credentials_from_storage(self, credential_data): """从存储中加载凭证,处理可能的格式不匹配问题""" logger.info("尝试加载凭证数据") if not credential_data: logger.error("凭证数据为空") return None # 方法1: 尝试直接JSON解析 try: import json if isinstance(credential_data, str): cred_json = credential_data else: cred_json = credential_data.decode('utf-8') # 验证是否为有效JSON json.loads(cred_json) # 使用OAuth2Credentials从JSON创建凭证 logger.info("成功从JSON创建凭证") return client.OAuth2Credentials.from_json(cred_json) except Exception as json_error: logger.error(f"JSON解析凭证失败: {str(json_error)}") # 方法2: 尝试pickle解析 try: if isinstance(credential_data, str): # 如果是字符串,尝试编码为二进制 pickle_data = credential_data.encode('latin1') else: pickle_data = credential_data logger.info("尝试使用pickle解析凭证") return pickle.loads(pickle_data) except Exception as pickle_error: logger.error(f"Pickle解析凭证失败: {str(pickle_error)}") # 所有方法都失败 logger.error("所有凭证解析方法都失败") return None