import os import json import uuid import base64 import pickle import logging import traceback # 确保导入traceback模块 from pathlib import Path import dateutil.parser as parser from datetime import datetime from bs4 import BeautifulSoup from django.utils import timezone from django.conf import settings # 添加Django设置导入 from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders import warnings import mimetypes # 忽略oauth2client相关的所有警告 warnings.filterwarnings('ignore', message='file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth') warnings.filterwarnings('ignore', category=DeprecationWarning, module='oauth2client') warnings.filterwarnings('ignore', message='.*google-auth.*') warnings.filterwarnings('ignore', message='.*oauth2client.*') from apiclient import discovery from httplib2 import Http from oauth2client import file, client, tools from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment logger = logging.getLogger(__name__) # Gmail服务单例管理器 class GmailServiceManager: _instances = {} # 以用户ID为键存储Gmail服务实例 @classmethod def get_instance(cls, user): """获取用户的Gmail服务实例,如果不存在则创建""" user_id = str(user.id) if user_id not in cls._instances: try: # 从数据库获取认证信息 credential = GmailCredential.objects.filter(user=user, is_active=True).first() if credential and credential.credentials: # 反序列化凭证 creds = pickle.loads(credential.credentials) # 检查凭证是否有效 if not creds.invalid: # 初始化服务 gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http())) # 存储实例 cls._instances[user_id] = { 'service': gmail_service, 'credentials': creds, 'timestamp': timezone.now(), 'user': user } logger.info(f"创建用户 {user.username} 的Gmail服务单例") return cls._instances[user_id] except Exception as e: logger.error(f"创建Gmail服务单例失败: {e}") else: # 检查实例是否过期(超过30分钟) instance = cls._instances[user_id] time_diff = timezone.now() - instance['timestamp'] if time_diff.total_seconds() > 1800: # 30分钟过期 del cls._instances[user_id] return cls.get_instance(user) # 递归调用,重新创建 # 更新时间戳 cls._instances[user_id]['timestamp'] = timezone.now() logger.info(f"复用用户 {user.username} 的Gmail服务单例") return cls._instances[user_id] return None @classmethod def update_instance(cls, user, credentials, service): """更新用户的Gmail服务实例""" user_id = str(user.id) cls._instances[user_id] = { 'service': service, 'credentials': credentials, 'timestamp': timezone.now(), 'user': user } @classmethod def clear_instance(cls, user): """清除用户的Gmail服务实例""" user_id = str(user.id) if user_id in cls._instances: del cls._instances[user_id] class GmailIntegration: """Gmail集成类""" # Gmail API 权限范围 SCOPES = ['https://mail.google.com/'] def __init__(self, user, email=None, client_secret_json=None, use_proxy=True, proxy_url='http://127.0.0.1:7890'): self.user = user self.user_email = user.email if user else None self.email = email # 目标邮箱 self.client_secret = client_secret_json self.credentials = None self.gmail_service = None self.use_proxy = use_proxy self.proxy_url = proxy_url # 设置代理 if self.use_proxy: logger.info(f"设置Gmail API代理: {proxy_url}") os.environ['HTTP_PROXY'] = proxy_url os.environ['HTTPS_PROXY'] = proxy_url # 设置Token文件存储路径 if user: # 使用用户ID创建唯一的token存储路径 token_file = f"gmail_token_{user.id}.json" self.token_storage_path = os.path.join("gmail_tokens", token_file) # 确保目录存在 os.makedirs("gmail_tokens", exist_ok=True) logger.info(f"设置Token存储路径: {self.token_storage_path}") # 尝试从数据库加载凭证 try: gmail_cred = GmailCredential.objects.filter(user=user, is_active=True).first() if gmail_cred and gmail_cred.credentials: logger.info(f"从数据库加载用户 {user.username} 的Gmail凭证") self.credentials = pickle.loads(gmail_cred.credentials) # 初始化Gmail服务 self.gmail_service = discovery.build('gmail', 'v1', http=self.credentials.authorize(Http())) logger.info("从数据库凭证初始化Gmail服务成功") except Exception as e: logger.error(f"从数据库加载凭证失败: {str(e)}") # 继续使用文件方式 else: self.token_storage_path = "gmail_token.json" logger.warning("未提供用户,将使用默认Token存储路径") # 存储对象 self.storage = file.Storage(self.token_storage_path) def authenticate(self): """获取Gmail API凭证并认证""" try: # 优先尝试使用单例服务 if self.user: instance = GmailServiceManager.get_instance(self.user) if instance: self.gmail_service = instance['service'] self.credentials = instance['credentials'] logger.info("使用现有的Gmail服务单例") return True # 以下是原有的认证逻辑... # 写入client_secret.json if self.client_secret: client_secret_path = 'client_secret.json' with open(client_secret_path, 'w') as f: if isinstance(self.client_secret, str): try: # 确保是有效的JSON json_data = json.loads(self.client_secret) json.dump(json_data, f) except json.JSONDecodeError as e: logger.error(f"client_secret不是有效的JSON: {str(e)}") return False else: json.dump(self.client_secret, f) logger.info(f"已将client_secret写入临时文件: {client_secret_path}") # 使用与quickstart.py相同的流程 logger.info(f"创建或读取token存储: {self.token_storage_path}") # 确保token目录存在 token_dir = os.path.dirname(self.token_storage_path) if token_dir and not os.path.exists(token_dir): logger.info(f"创建token目录: {token_dir}") os.makedirs(token_dir) store = file.Storage(self.token_storage_path) creds = store.get() if not creds or creds.invalid: logger.info("没有有效的凭证,需要重新授权") if not self.client_secret: logger.error("没有提供client_secret_json且找不到有效凭证") return False # 提取重定向URI redirect_uri = None if isinstance(self.client_secret, dict): for key in ['web', 'installed']: if key in self.client_secret and 'redirect_uris' in self.client_secret[key]: redirect_uri = self.client_secret[key]['redirect_uris'][0] break elif isinstance(self.client_secret, str): try: json_data = json.loads(self.client_secret) for key in ['web', 'installed']: if key in json_data and 'redirect_uris' in json_data[key]: redirect_uri = json_data[key]['redirect_uris'][0] break except: pass # 如果找不到重定向URI,使用默认值 if not redirect_uri or redirect_uri == 'urn:ietf:wg:oauth:2.0:oob': logger.info("使用非浏览器认证模式") redirect_uri = 'urn:ietf:wg:oauth:2.0:oob' flow = client.flow_from_clientsecrets('client_secret.json', self.SCOPES) flow.redirect_uri = redirect_uri else: logger.info(f"使用重定向URI: {redirect_uri}") flow = client.flow_from_clientsecrets('client_secret.json', self.SCOPES) flow.redirect_uri = redirect_uri # 获取授权URL并抛出异常 auth_url = flow.step1_get_authorize_url() logger.info(f"获取授权URL: {auth_url[:50]}...") raise Exception(f"Please visit this URL to authorize: {auth_url}") # 如果有有效凭证,初始化服务 self.credentials = creds logger.info("使用现有凭证初始化Gmail服务") self.gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http())) logger.info("Gmail服务初始化成功") # 获取Gmail账号信息 gmail_email = None try: # 调用Gmail API获取用户资料 profile = self.gmail_service.users().getProfile(userId='me').execute() gmail_email = profile.get('emailAddress') logger.info(f"获取到Gmail账号: {gmail_email}") except Exception as profile_error: logger.error(f"获取Gmail账号失败: {str(profile_error)}") # 即使获取Gmail账号失败,也要尝试获取消息以提取邮箱 if not gmail_email: try: # 尝试获取一条消息来提取邮箱 messages = self.gmail_service.users().messages().list(userId='me', maxResults=1).execute() if 'messages' in messages and len(messages['messages']) > 0: msg_id = messages['messages'][0]['id'] msg = self.gmail_service.users().messages().get(userId='me', id=msg_id).execute() # 从消息中查找与当前用户匹配的邮箱 if 'payload' in msg and 'headers' in msg['payload']: for header in msg['payload']['headers']: if header['name'] in ['From', 'To', 'Cc', 'Bcc']: if '<' in header['value'] and '>' in header['value']: email = header['value'].split('<')[1].split('>')[0] # 如果找到一个邮箱就使用它 if not gmail_email: gmail_email = email logger.info(f"从消息中提取到Gmail账号: {gmail_email}") except Exception as msg_error: logger.error(f"尝试从消息中提取Gmail账号失败: {str(msg_error)}") # 保存凭证到数据库 if self.user: from django.utils import timezone logger.info("保存凭证到数据库") # 将凭证对象序列化 credentials_data = pickle.dumps(creds) # 更新或创建凭证记录,添加gmail_email字段 gmail_credential, created = GmailCredential.objects.update_or_create( user=self.user, defaults={ 'credentials': credentials_data, 'token_path': self.token_storage_path, 'gmail_email': gmail_email, # 添加实际Gmail账号 'updated_at': timezone.now(), 'is_active': True } ) action = "创建" if created else "更新" logger.info(f"已{action}用户 {self.user.username} 的Gmail凭证记录,实际Gmail账号: {gmail_email}") # 认证成功后更新单例 if self.user and self.gmail_service and self.credentials: GmailServiceManager.update_instance(self.user, self.credentials, self.gmail_service) return True except Exception as e: # 保留授权URL异常,视图层会处理 if "Please visit this URL" in str(e): raise e logger.error(f"Gmail认证失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return False finally: # 清理client_secret.json文件 if self.client_secret and os.path.exists('client_secret.json'): logger.info("删除临时client_secret文件") os.unlink('client_secret.json') def create_talent_knowledge_base(self, talent_email): """ 创建或获取与talent_email关联的知识库 Args: talent_email (str): 达人Gmail邮箱地址 Returns: tuple: (knowledge_base, created) - 知识库对象和是否新创建的标志 """ try: # 优先查找现有的关联关系 mapping = GmailTalentMapping.objects.filter( user=self.user, talent_email=talent_email, is_active=True ).first() if mapping and mapping.knowledge_base: logger.info(f"找到现有的Gmail-达人映射: {talent_email} -> {mapping.knowledge_base.name}") return mapping.knowledge_base, False # 查找与该达人邮箱关联的知识库 # 根据达人邮箱生成一个唯一的标识名称 kb_name = f"Gmail-{talent_email.split('@')[0]}" # 检查该名称的知识库是否已存在 existing_kb = KnowledgeBase.objects.filter( name=kb_name, user_id=self.user.id ).first() if existing_kb: logger.info(f"找到现有知识库: {kb_name}") # 创建映射关系 GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=talent_email, defaults={ 'knowledge_base': existing_kb, 'is_active': True } ) return existing_kb, False # 没有找到现有知识库,直接使用基本方法创建 logger.info(f"使用基本方法创建知识库: {kb_name}") return self._create_knowledge_base_basic(talent_email) except Exception as e: logger.error(f"创建或获取Gmail-达人知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) # 创建失败时,尝试使用基本方法创建 logger.info("尝试使用基本方法创建知识库") return self._create_knowledge_base_basic(talent_email) def _create_knowledge_base_basic(self, talent_email): """ 使用基本方法创建知识库(当KnowledgeBaseViewSet创建失败时的后备方案) Args: talent_email (str): 达人Gmail邮箱地址 Returns: tuple: (knowledge_base, created) - 知识库对象和是否新创建的标志 """ try: # 根据达人邮箱生成一个唯一的标识名称 kb_name = f"Gmail-{talent_email.split('@')[0]}" # 检查该名称的知识库是否已存在 existing_kb = KnowledgeBase.objects.filter( name=kb_name, user_id=self.user.id ).first() if existing_kb: logger.info(f"找到现有知识库: {kb_name}") # 创建映射关系 GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=talent_email, defaults={ 'knowledge_base': existing_kb, 'is_active': True } ) return existing_kb, False # 创建新知识库 knowledge_base = KnowledgeBase.objects.create( name=kb_name, desc=f"与{talent_email}的Gmail邮件交流记录", type="private", user_id=self.user.id, documents=[] ) # 创建外部知识库 try: from .views import KnowledgeBaseViewSet kb_viewset = KnowledgeBaseViewSet() external_id = kb_viewset._create_external_dataset(knowledge_base) if external_id: knowledge_base.external_id = external_id knowledge_base.save() logger.info(f"成功创建外部知识库: {external_id}") except Exception as e: logger.error(f"创建外部知识库失败: {str(e)}") # 继续执行,不影响基本功能 # 创建映射关系 GmailTalentMapping.objects.create( user=self.user, talent_email=talent_email, knowledge_base=knowledge_base, is_active=True ) logger.info(f"成功创建新知识库: {kb_name}, ID: {knowledge_base.id}") return knowledge_base, True except Exception as e: logger.error(f"创建或获取Gmail-达人知识库基本方法失败: {str(e)}") import traceback logger.error(traceback.format_exc()) raise def get_conversations(self, talent_gmail): """获取与特定用户的所有邮件对话""" try: if not self.gmail_service: logger.error("Gmail服务未初始化") return [] # 使用crushwds@gmail.com作为固定邮箱 email1 = "crushwds@gmail.com" # 固定设置为quickstart.py中的邮箱 email2 = talent_gmail # 使用参数传入的目标邮箱 logger.info(f"执行Gmail查询: {email1} 与 {email2}") # 构建搜索查询 query = f"from:({email1} OR {email2}) to:({email1} OR {email2})" # 获取所有匹配的邮件 response = self.gmail_service.users().messages().list(userId='me', q=query).execute() messages = [] if 'messages' in response: messages.extend(response['messages']) message_count = len(messages) logger.info(f"找到 {message_count} 封邮件") # 分页获取所有邮件 while 'nextPageToken' in response: page_token = response['nextPageToken'] response = self.gmail_service.users().messages().list( userId='me', q=query, pageToken=page_token ).execute() if 'messages' in response: new_messages = response['messages'] messages.extend(new_messages) new_count = len(new_messages) message_count += new_count logger.info(f"加载额外 {new_count} 封邮件,当前总数: {message_count}") else: logger.warning(f"未找到匹配邮件") # 处理每封邮件 conversations = [] if messages: logger.info(f"开始获取 {len(messages)} 封邮件详情...") for i, msg in enumerate(messages): try: message = self.gmail_service.users().messages().get(userId='me', id=msg['id']).execute() email_data = self._extract_email_content(message) if email_data: conversations.append(email_data) if (i+1) % 5 == 0 or i+1 == len(messages): # 每5封邮件或最后一封邮件时记录进度 logger.info(f"已处理 {i+1}/{len(messages)} 封邮件") else: logger.warning(f"邮件 {i+1}/{len(messages)} 内容提取失败") except Exception as e: logger.error(f"获取邮件 {msg['id']} 详情失败: {str(e)}") # 按时间排序 conversations.sort(key=lambda x: x['date']) logger.info(f"总共找到并解析了 {len(conversations)} 封邮件") return conversations except Exception as e: logger.error(f"获取Gmail对话失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return [] def _extract_email_content(self, message): """提取邮件内容,完全按照quickstart.py的get_email_content函数实现""" try: message_id = message['id'] # 获取邮件ID payload = message['payload'] headers = payload['headers'] # 获取邮件基本信息 email_data = { 'id': message_id, # 保存邮件ID 'subject': '', 'from': '', 'date': '', 'body': '', 'attachments': [] # 新增附件列表 } # 提取头部信息 for header in headers: if header['name'] == 'Subject': email_data['subject'] = header['value'] elif header['name'] == 'From': email_data['from'] = header['value'] elif header['name'] == 'Date': try: date = parser.parse(header['value']) # 转换为与Django时区一致的格式 if hasattr(settings, 'USE_TZ') and settings.USE_TZ: date = timezone.make_aware(date) email_data['date'] = date.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: logger.error(f"解析日期失败: {str(e)}") email_data['date'] = header['value'] # 定义一个递归函数来处理所有部分和附件 def process_parts(parts): for part in parts: # 检查是否是附件 if 'filename' in part and part['filename']: attachment = { 'filename': part['filename'], 'mimeType': part['mimeType'], 'size': part['body'].get('size', 0) } # 如果有附件内容数据,可以获取附件ID if 'attachmentId' in part['body']: attachment['attachmentId'] = part['body']['attachmentId'] email_data['attachments'].append(attachment) # 处理文本内容 if part['mimeType'] == 'text/plain' and not email_data['body']: data = part['body'].get('data', '') if data: try: text = base64.urlsafe_b64decode(data).decode('utf-8') email_data['body'] = text except Exception as e: logger.error(f"解码邮件内容失败: {str(e)}") # 递归处理多部分内容 if 'parts' in part: process_parts(part['parts']) # 处理邮件正文和附件 if 'parts' in payload: process_parts(payload['parts']) elif 'body' in payload and 'data' in payload['body']: # 没有parts,直接处理body data = payload['body'].get('data', '') if data: try: text = base64.urlsafe_b64decode(data).decode('utf-8') email_data['body'] = text except Exception as e: logger.error(f"解码邮件内容失败: {str(e)}") return email_data except Exception as e: logger.error(f"处理邮件内容时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) return None def download_attachment(self, message_id, attachment_id, filename): """下载邮件附件""" try: attachment = self.gmail_service.users().messages().attachments().get( userId='me', messageId=message_id, id=attachment_id ).execute() data = attachment['data'] file_data = base64.urlsafe_b64decode(data) # 创建附件目录 attachments_dir = 'gmail_attachments' if not os.path.exists(attachments_dir): os.makedirs(attachments_dir) # 保存附件 filepath = os.path.join(attachments_dir, f"{message_id}_{filename}") with open(filepath, 'wb') as f: f.write(file_data) return filepath except Exception as e: logger.error(f"下载附件失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return None def save_conversations_to_knowledge_base(self, conversations, knowledge_base): """ 将Gmail对话保存到知识库 Args: conversations: Gmail邮件列表 knowledge_base: 保存到的知识库对象 """ try: # 导入所需模型 from .models import GmailAttachment, ChatHistory # 检查入参 if not conversations or not knowledge_base: logger.error("参数不完整: conversations或knowledge_base为空") return False if not conversations: logger.warning("没有邮件对话可保存") return {"conversation_id": None, "success": False, "error": "没有邮件对话可保存"} # 查找现有的对话ID - 优先使用talent_email查找 conversation_id = None # 1. 首先尝试通过talent_email查找现有映射 if self.email: existing_mapping = GmailTalentMapping.objects.filter( user=self.user, talent_email=self.email, knowledge_base=knowledge_base, is_active=True ).first() if existing_mapping and existing_mapping.conversation_id: conversation_id = existing_mapping.conversation_id logger.info(f"通过达人邮箱 {self.email} 找到现有对话ID: {conversation_id}") # 2. 如果上面没找到,尝试通过知识库查找任何相关的对话 if not conversation_id: existing_conversation = ChatHistory.objects.filter( knowledge_base=knowledge_base, user=self.user, is_deleted=False ).values('conversation_id').distinct().first() if existing_conversation: conversation_id = existing_conversation['conversation_id'] logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}") # 如果有talent_email,更新或创建映射关系 if self.email: GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=self.email, defaults={ 'knowledge_base': knowledge_base, 'conversation_id': conversation_id, 'is_active': True } ) logger.info(f"更新Gmail达人映射: {self.email} -> 对话ID {conversation_id}") # 3. 如果仍然没找到,则创建新的对话ID if not conversation_id: # 生成唯一的对话ID conversation_id = str(uuid.uuid4()) logger.info(f"创建新的对话ID: {conversation_id}") # 创建或更新Gmail和达人的映射关系 if self.email: talent_mapping, created = GmailTalentMapping.objects.update_or_create( user=self.user, talent_email=self.email, defaults={ 'knowledge_base': knowledge_base, 'conversation_id': conversation_id, 'is_active': True } ) action = "创建" if created else "更新" logger.info(f"已{action}Gmail达人映射: {self.email} -> {knowledge_base.name}") # 构建知识库文档 logger.info(f"开始构建知识库文档,共{len(conversations)}个邮件对话") document = { "title": f"与{self.email or '联系人'}的Gmail邮件对话", "url": "", "source_type": "gmail", "paragraphs": [] } # 对话按时间顺序排序 conversations.sort(key=lambda x: x.get('date', '')) # 将对话添加到文档 previous_message_dict = {} # 用于存储邮件时间与消息ID的映射,以便找到正确的parent_id # 首先查找现有的聊天记录 existing_messages = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).order_by('created_at') # 如果已有消息,跳过重复导入 existing_gmail_ids = set() if existing_messages.exists(): for msg in existing_messages: if msg.metadata and 'gmail_message_id' in msg.metadata: existing_gmail_ids.add(msg.metadata['gmail_message_id']) logger.info(f"发现已存在 {len(existing_gmail_ids)} 条消息记录,将跳过重复导入") for message in conversations: try: # 跳过已导入的消息 if message.get('id') in existing_gmail_ids: logger.info(f"跳过已导入的消息: {message.get('id')}") continue # 提取邮件内容 sender = message.get('from', '未知发件人') date_str = message.get('date', '未知时间') subject = message.get('subject', '无主题') body = message.get('body', '').strip() logger.info(f"处理邮件: {date_str}, {sender}, {subject[:20]}...") # 判断角色 # 从发件人地址中提取邮箱部分 sender_email = '' if '<' in sender and '>' in sender: # 格式如 "姓名 " sender_email = sender.split('<')[1].split('>')[0] else: # 格式可能直接是邮箱 sender_email = sender # 根据邮箱判断角色:检查发件人与用户邮箱或者目标邮箱是否匹配 is_user_email = self.user.email.lower() == sender_email.lower() # 检查是否是目标邮箱(talent_email) is_talent_email = False if self.email and sender_email.lower() == self.email.lower(): is_talent_email = True # 如果是用户邮箱或目标邮箱,则为user角色,否则为assistant role = 'user' if is_user_email or is_talent_email else 'assistant' logger.info(f"设置消息角色: {role},发件人: {sender_email},用户邮箱: {self.user.email},目标邮箱: {self.email}") # 将邮件添加到文档 paragraph = { "id": f"msg_{len(document['paragraphs']) + 1}", "title": f"{date_str} - {sender} - {subject}", "content": body, "meta": { "sender": sender, "date": date_str, "subject": subject, "has_attachments": len(message.get('attachments', [])) > 0, "message_id": message.get('id', '') } } document['paragraphs'].append(paragraph) # 解析邮件日期为datetime对象 try: date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') except ValueError: # 如果解析失败,使用当前时间 logger.warning(f"无法解析邮件日期: {date_str},使用当前时间") date_obj = datetime.now() # 查找parent_id(查找日期早于当前邮件且最接近的消息) parent_id = None closest_date = None for d, mid in previous_message_dict.items(): if d < date_obj and (closest_date is None or d > closest_date): closest_date = d parent_id = mid # 保存消息到聊天历史,使用邮件实际日期 from django.utils import timezone aware_date = timezone.make_aware(date_obj) if not timezone.is_aware(date_obj) else date_obj # 创建消息记录 chat_message = ChatHistory.objects.create( user=self.user, knowledge_base=knowledge_base, conversation_id=conversation_id, parent_id=parent_id, role=role, content=f"{subject}\n\n{body}", metadata={ 'gmail_message_id': message.get('id', ''), 'from': sender, 'date': date_str, 'subject': subject, 'dataset_id_list': [str(knowledge_base.id)], 'dataset_names': [knowledge_base.name] }, created_at=aware_date # 设置正确的创建时间 ) # 更新previous_message_dict previous_message_dict[date_obj] = str(chat_message.id) # 处理附件 attachments = message.get('attachments', []) if attachments: for attachment in attachments: if 'attachmentId' in attachment and 'filename' in attachment: try: # 下载附件 filepath = self.download_attachment( message_id=message['id'], attachment_id=attachment['attachmentId'], filename=attachment['filename'] ) if filepath: # 记录附件信息 GmailAttachment.objects.create( chat_message=chat_message, gmail_message_id=message['id'], filename=attachment['filename'], filepath=filepath, mimetype=attachment.get('mimeType', ''), filesize=attachment.get('size', 0) ) logger.info(f"已保存附件: {attachment['filename']}") except Exception as e: logger.error(f"保存附件失败: {str(e)}") import traceback logger.error(traceback.format_exc()) except Exception as e: logger.error(f"处理邮件时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) # 将文档添加到知识库 logger.info(f"将Gmail对话添加到知识库: {knowledge_base.name}") # 准备文档数据结构 doc_data = { "name": f"Gmail对话与{self.email or '联系人'}.txt", "paragraphs": [] } # 将所有对话转换为段落 for paragraph in document['paragraphs']: doc_data["paragraphs"].append({ "title": paragraph['title'], "content": paragraph['content'], "is_active": True, "problem_list": [] }) # 调用知识库的文档上传API from .views import KnowledgeBaseViewSet kb_viewset = KnowledgeBaseViewSet() # 上传文档 upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data) if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): # 上传成功,保存记录到数据库 document_id = upload_response['data']['id'] # 创建文档记录 from .models import KnowledgeBaseDocument doc_record = KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=doc_data["name"], external_id=document_id ) logger.info(f"Gmail对话文档上传成功,ID: {document_id}") # 上传所有附件 self._upload_attachments_to_knowledge_base(knowledge_base, conversations) else: # 上传失败,记录错误信息 error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败' logger.error(f"Gmail对话文档上传失败: {error_msg}") # 更新知识库文档计数 knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).count() knowledge_base.save() logger.info(f"Gmail对话已保存到知识库: {knowledge_base.name}") return { "conversation_id": conversation_id, "success": True, "message_count": len(conversations), "knowledge_base_id": str(knowledge_base.id) } except Exception as e: logger.error(f"保存Gmail对话到知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return {"conversation_id": None, "success": False, "error": str(e)} def send_email(self, to_email, subject, body, conversation_id=None, attachments=None): """发送邮件并保存到聊天记录""" try: # 构建邮件内容 if attachments and len(attachments) > 0: message = self._create_message_with_attachment(to_email, subject, body, attachments) else: message = self._create_message(to_email, subject, body) # 发送邮件 sent_message = self.gmail_service.users().messages().send( userId='me', body=message ).execute() message_id = sent_message['id'] logger.info(f"邮件发送成功, ID: {message_id}") # 获取邮件详情,包括服务器时间戳 try: message_detail = self.gmail_service.users().messages().get( userId='me', id=message_id ).execute() # 从消息详情中提取时间戳 internal_date = message_detail.get('internalDate') if internal_date: # 转换毫秒时间戳为datetime,不使用timezone-aware email_date = datetime.fromtimestamp(int(internal_date)/1000) # 如果系统设置了USE_TZ,再转换为timezone-aware if hasattr(timezone, 'is_aware') and not timezone.is_aware(email_date): email_date = timezone.make_aware(email_date) date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') else: email_date = datetime.now() date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: logger.warning(f"获取邮件时间戳失败: {str(e)}, 使用当前时间") email_date = datetime.now() date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') # 如果有conversation_id,保存到聊天记录 if conversation_id: # 查找现有的聊天记录 from django.db.models import Q existing_messages = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).order_by('created_at') if not existing_messages.exists(): logger.warning(f"找不到对话ID: {conversation_id},无法保存消息") return message_id # 查找关联的知识库 first_message = existing_messages.first() if first_message.knowledge_base: knowledge_base = first_message.knowledge_base else: # 如果第一条消息没有知识库,尝试从metadata获取 if first_message.metadata and 'dataset_id_list' in first_message.metadata: kb_id = first_message.metadata.get('dataset_id_list', [])[0] knowledge_base = KnowledgeBase.objects.get(id=kb_id) else: logger.error(f"找不到关联的知识库,无法保存消息") return message_id # 查找parent_id:时间早于当前邮件且最接近的消息,避免时区问题 parent_id = None try: # 使用简单查询,不依赖时区 previous_messages = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).order_by('-id')[:1] # 使用ID降序排序而非时间 if previous_messages: parent_id = str(previous_messages[0].id) except Exception as e: logger.error(f"查找父消息失败: {str(e)}") logger.error(traceback.format_exc()) # 构建metadata metadata = { 'gmail_message_id': message_id, 'from': self.user.email, 'to': to_email, 'date': date_str, 'subject': subject, 'dataset_id_list': [str(knowledge_base.id)], 'dataset_names': [knowledge_base.name] } # 如果现有消息有dataset_id_list,保留它 if first_message.metadata and 'dataset_id_list' in first_message.metadata: metadata['dataset_id_list'] = first_message.metadata['dataset_id_list'] # 尝试获取对应的dataset_names if 'dataset_names' in first_message.metadata: metadata['dataset_names'] = first_message.metadata['dataset_names'] # 创建聊天记录,使用当前时间而不是邮件的实际时间 chat_message = ChatHistory.objects.create( user=self.user, knowledge_base=knowledge_base, conversation_id=conversation_id, parent_id=parent_id, role='user', # 用户发送的消息,角色固定为'user' content=f"[{subject}] {body}", metadata=metadata # 不设置created_at,使用数据库默认时间 ) # 如果有附件,保存附件信息 if attachments and len(attachments) > 0: for att_path in attachments: file_name = os.path.basename(att_path) file_size = os.path.getsize(att_path) # 获取MIME类型 mime_type, _ = mimetypes.guess_type(att_path) if not mime_type: mime_type = 'application/octet-stream' # 保存附件信息到数据库 gmail_attachment = GmailAttachment.objects.create( chat_message=chat_message, gmail_message_id=message_id, filename=file_name, filepath=att_path, mimetype=mime_type, filesize=file_size ) # 更新知识库文档 self._append_to_knowledge_base_document(knowledge_base, subject, body, to_email) return message_id except Exception as e: logger.error(f"发送邮件失败: {str(e)}") logger.error(traceback.format_exc()) raise def _create_message(self, to, subject, body): """创建邮件消息""" message = MIMEText(body) message['to'] = to message['subject'] = subject # 编码为JSON安全的字符串 raw = base64.urlsafe_b64encode(message.as_bytes()).decode() return {'raw': raw} def _create_message_with_attachment(self, to, subject, body, attachment_files): """创建带附件的邮件消息""" message = MIMEMultipart() message['to'] = to message['subject'] = subject # 添加邮件正文 msg = MIMEText(body) message.attach(msg) # 添加附件 for file in attachment_files: try: with open(file, 'rb') as f: part = MIMEBase('application', 'octet-stream') part.set_payload(f.read()) # 编码附件内容 encoders.encode_base64(part) # 添加头部信息 filename = os.path.basename(file) part.add_header( 'Content-Disposition', f'attachment; filename="{filename}"' ) message.attach(part) except Exception as e: logger.error(f"添加附件 {file} 失败: {str(e)}") # 编码为JSON安全的字符串 raw = base64.urlsafe_b64encode(message.as_bytes()).decode() return {'raw': raw} def _append_to_knowledge_base_document(self, knowledge_base, subject, body, recipient): """将新发送的邮件内容追加到知识库文档中""" try: # 准备文档数据结构 doc_data = { "name": f"Gmail回复给{recipient}_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt", "paragraphs": [ { "title": f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {subject}", "content": f"发件人: {self.user.email}\n收件人: {recipient}\n\n{body}", "is_active": True, "problem_list": [] } ] } # 调用知识库的文档上传API from .views import KnowledgeBaseViewSet kb_viewset = KnowledgeBaseViewSet() # 验证知识库是否有external_id if not knowledge_base.external_id: logger.error(f"知识库没有external_id,无法上传文档: {knowledge_base.name}") return None # 上传文档 upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data) if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): # 上传成功,保存记录到数据库 document_id = upload_response['data']['id'] # 创建文档记录 from .models import KnowledgeBaseDocument doc_record = KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=doc_data["name"], external_id=document_id ) logger.info(f"Gmail回复文档上传成功,ID: {document_id}") # 更新知识库文档计数 knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).count() knowledge_base.save() return upload_response else: # 上传失败,记录错误信息 error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败' logger.error(f"Gmail回复文档上传失败: {error_msg}") return None except Exception as e: logger.error(f"追加邮件内容到知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return None def setup_watch(self, topic_name=None): """设置Gmail监听""" try: # 如果没有指定topic,使用配置的主题名称 if not topic_name: # 从settings获取项目配置的主题名称 topic_name = getattr(settings, 'GMAIL_TOPIC_NAME', 'gmail-watch-topic') # 直接使用settings中配置的项目ID,不再从client_secret.json获取 project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT', 'knowledge-454905') logger.info(f"使用settings中配置的项目ID: {project_id}") # 注意:不再使用用户邮箱作为判断依据,Gmail API总是使用'me'作为userId # Gmail认证是通过OAuth流程,与系统用户邮箱无关 logger.info(f"系统用户: {self.user.email},Gmail API使用OAuth认证的邮箱 (userId='me')") # 构建完整的webhook URL webhook_url = getattr(settings, 'GMAIL_WEBHOOK_URL', None) if not webhook_url: # 使用默认值,确保这是一个完全限定的URL domain = getattr(settings, 'ALLOWED_HOSTS', ['localhost'])[0] protocol = 'https' if getattr(settings, 'SECURE_SSL_REDIRECT', False) else 'http' if domain == 'localhost' or domain.startswith('127.0.0.1'): # 本地开发环境需要公网可访问的URL,可以用ngrok等工具暴露本地服务 webhook_url = f"{protocol}://{domain}:8000/api/user/gmail/webhook/" logger.warning("使用本地开发环境URL作为webhook回调,Google可能无法访问。建议使用ngrok等工具创建公网地址。") else: webhook_url = f"{protocol}://{domain}/api/user/gmail/webhook/" logger.info(f"使用Gmail webhook URL: {webhook_url}") # 如果想在Google Cloud控制台中手动配置webhook,打印明确的说明 logger.info("如需手动配置Gmail推送通知,请确保在Google Cloud控制台的Pub/Sub主题配置中设置以下URL:") logger.info(f"推送端点: {webhook_url}") logger.info("权限: 为Gmail API服务账号(gmail-api-push@system.gserviceaccount.com)和您的Gmail账号授予发布权限") # 请求监听 request = { 'labelIds': ['INBOX', 'SENT', 'IMPORTANT', 'UNREAD'], # 监听更多标签 'topicName': f"projects/{project_id}/topics/{topic_name}", 'labelFilterAction': 'include' } logger.info(f"设置Gmail监听: {request}") # 执行watch请求 response = self.gmail_service.users().watch(userId='me', body=request).execute() # 获取historyId,用于后续同步 history_id = response.get('historyId') expiration = response.get('expiration') logger.info(f"Gmail监听设置成功: historyId={history_id}, expiration={expiration}") # 保存监听信息到数据库 if self.user: credential = GmailCredential.objects.filter(user=self.user, is_active=True).first() if credential: # 转换时间戳为datetime expiration_time = None if expiration: # 将毫秒时间戳转换为timezone-aware的datetime naive_time = datetime.fromtimestamp(int(expiration)/1000) expiration_time = timezone.make_aware(naive_time) credential.last_history_id = history_id credential.watch_expiration = expiration_time credential.save() logger.info(f"更新Gmail监听信息: {self.user.username}, history_id: {history_id}") return { 'historyId': history_id, 'expiration': expiration } except Exception as e: logger.error(f"设置Gmail监听失败: {str(e)}") logger.error(traceback.format_exc()) raise def get_history(self, start_history_id): """获取历史变更""" try: logger.info(f"获取历史记录,起始ID: {start_history_id}") response = self.gmail_service.users().history().list( userId='me', startHistoryId=start_history_id ).execute() logger.info(f"历史记录响应: {response}") history_list = [] if 'history' in response: history_list.extend(response['history']) logger.info(f"找到 {len(response['history'])} 个历史记录") # 获取所有页 while 'nextPageToken' in response: page_token = response['nextPageToken'] response = self.gmail_service.users().history().list( userId='me', startHistoryId=start_history_id, pageToken=page_token ).execute() if 'history' in response: history_list.extend(response['history']) logger.info(f"加载额外 {len(response['history'])} 个历史记录") else: logger.info(f"没有新的历史记录,最新historyId: {response.get('historyId', 'N/A')}") # 提取新消息ID new_message_ids = set() for history in history_list: logger.info(f"处理历史记录: {history}") if 'messagesAdded' in history: for message in history['messagesAdded']: message_id = message['message']['id'] new_message_ids.add(message_id) logger.info(f"新增消息ID: {message_id}") if 'labelsAdded' in history: for label in history['labelsAdded']: message_id = label['message']['id'] if 'INBOX' in label.get('labelIds', []): new_message_ids.add(message_id) logger.info(f"标签变更的消息ID: {message_id}, 添加了INBOX标签") if new_message_ids: logger.info(f"总共找到 {len(new_message_ids)} 个新消息") else: logger.info("没有新增消息") return list(new_message_ids) except Exception as e: logger.error(f"获取Gmail历史记录失败: {str(e)}") return [] def process_notification(self, notification_data): """处理Gmail推送通知""" try: # 提取通知数据 logger.info(f"处理Gmail通知: {notification_data}") # 处理Google Pub/Sub消息格式 if isinstance(notification_data, dict) and 'message' in notification_data and 'data' in notification_data['message']: try: import base64 import json logger.info("检测到Google Pub/Sub消息格式") # Base64解码data字段 encoded_data = notification_data['message']['data'] decoded_data = base64.b64decode(encoded_data).decode('utf-8') logger.info(f"解码后的数据: {decoded_data}") # 解析JSON获取email和historyId json_data = json.loads(decoded_data) email = json_data.get('emailAddress') history_id = json_data.get('historyId') logger.info(f"从Pub/Sub消息中提取: email={email}, historyId={history_id}") except Exception as decode_error: logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}") logger.error(traceback.format_exc()) return False else: # 原始格式处理 email = notification_data.get('emailAddress') history_id = notification_data.get('historyId') if not email or not history_id: logger.error("Gmail通知数据不完整: 找不到emailAddress或historyId") return False # 查找关联用户 from .models import User user = User.objects.filter(email=email).first() # 如果找不到用户,尝试使用gmail_email字段查找 if not user: logger.info(f"找不到email={email}的用户,尝试使用gmail_email查找") from .models import GmailCredential credential = GmailCredential.objects.filter(gmail_email=email, is_active=True).first() if credential: user = credential.user logger.info(f"通过gmail_email找到用户: {user.email}") if not user: logger.error(f"找不到与 {email} 关联的用户") return False # 初始化Gmail集成 gmail_integration = GmailIntegration(user) if not gmail_integration.authenticate(): logger.error(f"Gmail认证失败: {email}") return False # 首先尝试使用历史记录API获取新消息 message_ids = gmail_integration.get_history(history_id) if message_ids: logger.info(f"从历史记录找到 {len(message_ids)} 个新消息") # 处理每个新消息 for message_id in message_ids: self._process_new_message(gmail_integration, message_id) return True # 如果历史记录API没有返回新消息,尝试获取最近的对话 logger.info("历史记录API没有返回新消息,尝试获取达人对话") # 查找所有与该用户关联的达人映射 from .models import GmailTalentMapping mappings = GmailTalentMapping.objects.filter( user=user, is_active=True ) if not mappings.exists(): logger.info(f"用户 {user.email} 没有达人映射记录") return False # 处理每个达人映射 for mapping in mappings: talent_email = mapping.talent_email logger.info(f"处理达人 {talent_email} 的对话") # 获取达人最近的邮件 recent_emails = gmail_integration.get_recent_emails( from_email=talent_email, max_results=5 # 限制获取最近5封 ) if not recent_emails: logger.info(f"没有找到来自 {talent_email} 的最近邮件") continue logger.info(f"找到 {len(recent_emails)} 封来自 {talent_email} 的最近邮件") # 创建或获取知识库 knowledge_base, created = gmail_integration.create_talent_knowledge_base(talent_email) kb_action = "创建" if created else "获取" logger.info(f"知识库{kb_action}成功: {knowledge_base.name}") # 保存对话 result = gmail_integration.save_conversations_to_knowledge_base(recent_emails, knowledge_base) logger.info(f"保存达人对话结果: {result}") return True except Exception as e: logger.error(f"处理Gmail通知失败: {str(e)}") logger.error(traceback.format_exc()) return False def _process_new_message(self, gmail_integration, message_id): """处理新收到的邮件""" try: # 导入所需模型 from .models import GmailTalentMapping, GmailAttachment, KnowledgeBase, ChatHistory # 获取邮件详情 message = gmail_integration.gmail_service.users().messages().get( userId='me', id=message_id ).execute() # 提取邮件内容 email_data = gmail_integration._extract_email_content(message) if not email_data: logger.error(f"提取邮件内容失败: {message_id}") return False # 获取发件人邮箱 from_email = email_data.get('from', '') sender_email = '' if '<' in from_email and '>' in from_email: # 格式如 "姓名 " sender_email = from_email.split('<')[1].split('>')[0] else: # 格式可能直接是邮箱 sender_email = from_email # 根据邮箱判断角色:检查发件人与用户邮箱或者映射的talent邮箱是否匹配 is_user_email = gmail_integration.user.email.lower() == sender_email.lower() # 检查是否有与当前用户关联的talent邮箱映射 is_mapped_talent = False talent_mapping = GmailTalentMapping.objects.filter( user=gmail_integration.user, talent_email=sender_email, is_active=True ).first() # 修改为first()以获取实际的对象而不是布尔值 # 如果是用户邮箱或映射的talent邮箱,则为user角色,否则为assistant role = 'user' if is_user_email or talent_mapping else 'assistant' logger.info(f"设置消息角色: {role}, 发件人: {sender_email}, 用户邮箱: {gmail_integration.user.email}, 是否映射达人: {talent_mapping}") # 查找是否有关联的达人知识库 kb_name = f"Gmail-{sender_email.split('@')[0]}" knowledge_base = KnowledgeBase.objects.filter(name=kb_name).first() # 如果没有以发件人邮箱命名的知识库,尝试查找自定义知识库 if not knowledge_base: # 查找映射关系 mapping = GmailTalentMapping.objects.filter( talent_email=sender_email, user=gmail_integration.user, is_active=True ).first() if mapping and mapping.knowledge_base: knowledge_base = mapping.knowledge_base logger.info(f"使用映射的知识库: {knowledge_base.name}") else: logger.info(f"收到新邮件,但没有找到关联的达人知识库: {sender_email}") return False # 查找关联的对话ID conversation_id = None # 1. 首先通过talent_mapping查找 if talent_mapping and talent_mapping.conversation_id: conversation_id = talent_mapping.conversation_id logger.info(f"通过达人映射找到对话ID: {conversation_id}") # 2. 如果没有找到,尝试通过知识库查找任何相关对话 if not conversation_id: existing_conversation = ChatHistory.objects.filter( knowledge_base=knowledge_base, user=gmail_integration.user, is_deleted=False ).values('conversation_id').distinct().first() if existing_conversation: conversation_id = existing_conversation['conversation_id'] logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}") # 更新或创建映射关系 if not talent_mapping: GmailTalentMapping.objects.update_or_create( user=gmail_integration.user, talent_email=sender_email, defaults={ 'knowledge_base': knowledge_base, 'conversation_id': conversation_id, 'is_active': True } ) logger.info(f"更新Gmail达人映射: {sender_email} -> 对话ID {conversation_id}") # 3. 如果仍没找到,创建新的对话ID if not conversation_id: conversation_id = str(uuid.uuid4()) logger.info(f"创建新的对话ID: {conversation_id}") # 保存映射关系 if not talent_mapping: GmailTalentMapping.objects.create( user=gmail_integration.user, talent_email=sender_email, knowledge_base=knowledge_base, conversation_id=conversation_id, is_active=True ) logger.info(f"已创建新的Gmail达人映射: {sender_email} -> {knowledge_base.name}") # 检查消息是否已经处理过 if ChatHistory.objects.filter( conversation_id=conversation_id, metadata__gmail_message_id=message_id, is_deleted=False ).exists(): logger.info(f"邮件已处理过,跳过: {message_id}") return True # 解析邮件日期 - 使用普通datetime而非timezone-aware date_str = email_data.get('date', '') try: date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') # 不应用时区转换 aware_date = date_obj except (ValueError, TypeError): logger.warning(f"无法解析邮件日期: {date_str},使用当前时间") aware_date = datetime.now() # 查找适合的parent_id: 使用ID排序而非时间 try: previous_messages = ChatHistory.objects.filter( conversation_id=conversation_id, is_deleted=False ).order_by('-id')[:1] parent_id = None if previous_messages: parent_id = str(previous_messages[0].id) logger.info(f"找到父消息ID: {parent_id}") except Exception as e: logger.error(f"查找父消息失败: {str(e)}") logger.error(traceback.format_exc()) parent_id = None # 下载附件 attachment_records = [] for attachment in email_data.get('attachments', []): if 'attachmentId' in attachment: filepath = gmail_integration.download_attachment( message_id, attachment['attachmentId'], attachment['filename'] ) if filepath: attachment_records.append({ 'filepath': filepath, 'filename': attachment['filename'], 'message_id': message_id, 'date': date_str }) # 构建metadata metadata = { 'gmail_message_id': message_id, 'from': email_data.get('from', ''), 'date': date_str, 'subject': email_data.get('subject', ''), 'dataset_id_list': [str(knowledge_base.id)], 'dataset_names': [knowledge_base.name] } if attachment_records: metadata['message_attachments'] = attachment_records # 创建聊天记录,不指定时间 chat_message = ChatHistory.objects.create( user=gmail_integration.user, knowledge_base=knowledge_base, conversation_id=conversation_id, parent_id=parent_id, role=role, # 使用上面确定的role变量,而不是硬编码为'assistant' content=f"[{email_data['subject']}] {email_data['body']}", metadata=metadata # 不设置created_at,使用数据库默认时间 ) # 更新知识库文档 gmail_integration._append_to_knowledge_base_document( knowledge_base, email_data['subject'], email_data['body'], gmail_integration.user.email ) # 如果有附件,上传到知识库 if attachment_records: gmail_integration._upload_message_attachments_to_knowledge_base( knowledge_base, attachment_records ) # 添加WebSocket通知功能 try: # 导入必要的模块 from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from django.conf import settings # 检查是否有WebSocket通道层配置 channel_layer = get_channel_layer() if channel_layer: # 创建通知数据 notification_data = { "type": "notification", "data": { "message_type": "new_gmail", "conversation_id": conversation_id, "message": { "id": str(chat_message.id), "role": role, "content": f"[{email_data['subject']}] {email_data['body'][:100]}{'...' if len(email_data['body']) > 100 else ''}", "sender": sender_email, "subject": email_data['subject'], "has_attachments": len(attachment_records) > 0 } } } # 发送WebSocket消息 async_to_sync(channel_layer.group_send)( f"notification_user_{gmail_integration.user.id}", notification_data ) logger.info(f"已发送WebSocket通知: 用户 {gmail_integration.user.id} 收到新Gmail消息") # 创建系统通知记录 try: from .models import Notification Notification.objects.create( sender=gmail_integration.user, receiver=gmail_integration.user, title="新Gmail消息", content=f"您收到了来自 {sender_email} 的新邮件: {email_data['subject']}", type="system_notice", related_resource=conversation_id ) logger.info(f"已创建系统通知记录: 用户 {gmail_integration.user.id} 的新Gmail消息") except Exception as notification_error: logger.error(f"创建系统通知记录失败: {str(notification_error)}") except Exception as ws_error: logger.error(f"发送WebSocket通知失败: {str(ws_error)}") logger.error(traceback.format_exc()) # 通知失败不影响消息处理流程,继续执行 logger.info(f"成功处理新邮件: {message_id} 从 {sender_email}") return True except Exception as e: logger.error(f"处理新邮件失败: {str(e)}") logger.error(traceback.format_exc()) return False def get_attachment_by_conversation(self, conversation_id): """获取特定对话的所有附件""" try: # 查找对话记录 records = ChatHistory.objects.filter( conversation_id=conversation_id, user=self.user, is_deleted=False ) if not records: logger.warning(f"找不到对话记录: {conversation_id}") return [] # 使用GmailAttachment数据模型查询附件 attachments = [] records_ids = [record.id for record in records] gmail_attachments = GmailAttachment.objects.filter( chat_message_id__in=records_ids ).select_related('chat_message') for attachment in gmail_attachments: chat_message = attachment.chat_message attachments.append({ 'id': str(attachment.id), 'filename': attachment.filename, 'filepath': attachment.filepath, 'mimetype': attachment.mimetype, 'filesize': attachment.filesize, 'message_id': str(chat_message.id), 'gmail_message_id': attachment.gmail_message_id, 'role': chat_message.role, 'created_at': attachment.created_at.strftime('%Y-%m-%d %H:%M:%S'), 'content_preview': chat_message.content[:100] + '...' if len(chat_message.content) > 100 else chat_message.content }) return attachments except Exception as e: logger.error(f"获取对话附件失败: {str(e)}") return [] def handle_auth_code(self, auth_code): """ 处理授权码并完成OAuth2授权流程,使用与quickstart.py相同的简化流程 Args: auth_code (str): 从Google授权页面获取的授权码 Returns: bool: 授权是否成功 """ try: logger.info("开始处理Gmail授权码...") # 确保client_secret_json已提供 if not self.client_secret: logger.error("未提供client_secret_json,无法处理授权码") return False # 创建临时文件存储client_secret client_secret_path = 'client_secret.json' with open(client_secret_path, 'w') as f: if isinstance(self.client_secret, str): logger.info("client_secret是字符串,解析为JSON") try: # 确保是有效的JSON json_data = json.loads(self.client_secret) json.dump(json_data, f) except json.JSONDecodeError as e: logger.error(f"client_secret不是有效的JSON: {str(e)}") return False else: logger.info("client_secret是字典,直接写入文件") json.dump(self.client_secret, f) logger.info(f"已将client_secret写入临时文件: {client_secret_path}") try: # 确认token目录存在 token_dir = os.path.dirname(self.token_storage_path) if token_dir and not os.path.exists(token_dir): logger.info(f"创建token目录: {token_dir}") os.makedirs(token_dir) # 设置token存储 logger.info(f"设置token存储: {self.token_storage_path}") store = file.Storage(self.token_storage_path) # 提取重定向URI redirect_uri = None if isinstance(self.client_secret, dict): for key in ['web', 'installed']: if key in self.client_secret and 'redirect_uris' in self.client_secret[key]: redirect_uri = self.client_secret[key]['redirect_uris'][0] break elif isinstance(self.client_secret, str): try: json_data = json.loads(self.client_secret) for key in ['web', 'installed']: if key in json_data and 'redirect_uris' in json_data[key]: redirect_uri = json_data[key]['redirect_uris'][0] break except: pass # 如果找不到重定向URI,使用默认值 if not redirect_uri: redirect_uri = 'urn:ietf:wg:oauth:2.0:oob' logger.info(f"使用重定向URI: {redirect_uri}") # 从client_secret创建flow logger.info("从client_secret创建授权流程") flow = client.flow_from_clientsecrets( client_secret_path, self.SCOPES, redirect_uri=redirect_uri ) # 使用授权码交换token logger.info("使用授权码交换访问令牌") credentials = flow.step2_exchange(auth_code) logger.info("成功获取到访问令牌") # 保存到文件 logger.info(f"保存凭证到文件: {self.token_storage_path}") store.put(credentials) # 保存到实例变量 self.credentials = credentials # 初始化Gmail服务 logger.info("初始化Gmail服务") self.gmail_service = discovery.build('gmail', 'v1', http=credentials.authorize(Http())) logger.info("Gmail服务初始化成功") # 保存到数据库 from django.utils import timezone logger.info("保存凭证到数据库") # 将凭证对象序列化 credentials_data = pickle.dumps(credentials) gmail_credential, created = GmailCredential.objects.update_or_create( user=self.user, defaults={ 'credentials': credentials_data, 'token_path': self.token_storage_path, 'updated_at': timezone.now(), 'is_active': True } ) action = "创建" if created else "更新" logger.info(f"已{action}用户 {self.user.username} 的Gmail凭证记录") # 成功获取凭证后更新单例 if self.user and self.gmail_service and self.credentials: GmailServiceManager.update_instance(self.user, self.credentials, self.gmail_service) return True except client.FlowExchangeError as e: logger.error(f"授权码交换失败: {str(e)}") return False except Exception as e: logger.error(f"处理授权码时发生错误: {str(e)}") import traceback logger.error(traceback.format_exc()) return False finally: # 删除临时文件 if os.path.exists(client_secret_path): logger.info(f"删除临时文件: {client_secret_path}") os.unlink(client_secret_path) except Exception as e: logger.error(f"处理授权码过程中发生异常: {str(e)}") import traceback logger.error(traceback.format_exc()) return False def _upload_attachments_to_knowledge_base(self, knowledge_base, conversations): """上传所有邮件附件到知识库""" try: # 收集所有需要上传的附件 attachments_to_upload = [] # 从conversations中提取所有附件信息 for message in conversations: if 'attachments' in message and message['attachments']: message_id = message.get('id', '') sender = message.get('from', '未知发件人') subject = message.get('subject', '无主题') date_str = message.get('date', '未知时间') for attachment in message['attachments']: if 'attachmentId' in attachment and 'filename' in attachment: # 下载附件(如果尚未下载) filepath = None # 检查数据库中是否已有记录 existing_attachment = GmailAttachment.objects.filter( gmail_message_id=message_id, filename=attachment['filename'] ).first() if existing_attachment and os.path.exists(existing_attachment.filepath): filepath = existing_attachment.filepath else: # 下载附件 filepath = self.download_attachment( message_id=message_id, attachment_id=attachment['attachmentId'], filename=attachment['filename'] ) if filepath: attachments_to_upload.append({ 'filepath': filepath, 'filename': attachment['filename'], 'message_id': message_id, 'sender': sender, 'subject': subject, 'date': date_str }) # 上传收集到的所有附件 if attachments_to_upload: logger.info(f"开始上传 {len(attachments_to_upload)} 个附件到知识库") from .views import KnowledgeBaseViewSet import django.core.files.uploadedfile as uploadedfile # 导入FileUploadParser from rest_framework.parsers import FileUploadParser # 创建视图集实例 kb_viewset = KnowledgeBaseViewSet() # 批量上传附件 for i in range(0, len(attachments_to_upload), 10): # 每批最多10个文件 batch = attachments_to_upload[i:i+10] files = [] for att in batch: filepath = att['filepath'] filename = att['filename'] # 确认文件存在 if not os.path.exists(filepath): logger.warning(f"附件文件不存在: {filepath}") continue # 读取文件并创建UploadedFile对象 with open(filepath, 'rb') as f: file_content = f.read() files.append(uploadedfile.SimpleUploadedFile( name=filename, content=file_content, content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream' )) # 如果这批有文件,调用_call_split_api_multiple上传 if files: # 直接调用文档分割API split_response = kb_viewset._call_split_api_multiple(files) if not split_response or split_response.get('code') != 200: logger.error(f"附件批量分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}") continue # 处理分割后的文档 documents_data = split_response.get('data', []) # 对每个文档调用上传API for doc in documents_data: doc_name = doc.get('name', 'Gmail附件') doc_content = doc.get('content', []) # 准备文档数据 upload_doc_data = { "name": doc_name, "paragraphs": [] } # 将所有段落添加到文档中 for paragraph in doc_content: upload_doc_data["paragraphs"].append({ "content": paragraph.get('content', ''), "title": paragraph.get('title', ''), "is_active": True, "problem_list": [] }) # 调用文档上传API upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data) if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): # 上传成功,保存记录到数据库 document_id = upload_response['data']['id'] # 创建文档记录 from .models import KnowledgeBaseDocument doc_record = KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=doc_name, external_id=document_id ) logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}") # 更新知识库文档计数 knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).count() knowledge_base.save() logger.info(f"完成附件上传,共 {len(attachments_to_upload)} 个文件") except Exception as e: logger.error(f"上传附件到知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) def _upload_message_attachments_to_knowledge_base(self, knowledge_base, attachment_records): """上传单个邮件的附件到知识库""" try: # 检查是否有附件需要上传 if not attachment_records: return logger.info(f"开始上传 {len(attachment_records)} 个附件到知识库") from .views import KnowledgeBaseViewSet import django.core.files.uploadedfile as uploadedfile # 创建视图集实例 kb_viewset = KnowledgeBaseViewSet() # 准备文件列表 files = [] for att in attachment_records: filepath = att.get('filepath') filename = att.get('filename') # 确认文件存在 if not os.path.exists(filepath): logger.warning(f"附件文件不存在: {filepath}") continue # 读取文件并创建UploadedFile对象 with open(filepath, 'rb') as f: file_content = f.read() files.append(uploadedfile.SimpleUploadedFile( name=filename, content=file_content, content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream' )) # 如果有文件,调用_call_split_api_multiple上传 if files: # 直接调用文档分割API split_response = kb_viewset._call_split_api_multiple(files) if not split_response or split_response.get('code') != 200: logger.error(f"附件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}") return # 处理分割后的文档 documents_data = split_response.get('data', []) # 对每个文档调用上传API for doc in documents_data: doc_name = doc.get('name', 'Gmail附件') doc_content = doc.get('content', []) # 准备文档数据 upload_doc_data = { "name": doc_name, "paragraphs": [] } # 将所有段落添加到文档中 for paragraph in doc_content: upload_doc_data["paragraphs"].append({ "content": paragraph.get('content', ''), "title": paragraph.get('title', ''), "is_active": True, "problem_list": [] }) # 调用文档上传API upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data) if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): # 上传成功,保存记录到数据库 document_id = upload_response['data']['id'] # 创建文档记录 from .models import KnowledgeBaseDocument doc_record = KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=doc_name, external_id=document_id ) logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}") # 更新知识库文档计数 knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).count() knowledge_base.save() logger.info(f"完成附件上传,共 {len(files)} 个文件") except Exception as e: logger.error(f"上传附件到知识库失败: {str(e)}") import traceback logger.error(traceback.format_exc()) def get_recent_emails(self, from_email=None, max_results=10): """ 获取最近的邮件(不依赖history_id) Args: from_email (str, optional): 发件人邮箱过滤 max_results (int, optional): 最大结果数,默认10封 Returns: list: 邮件列表 """ try: # 构建查询 query = f"from:{from_email}" if from_email else None logger.info(f"查询最近邮件: query={query}, max_results={max_results}") # 查询邮件列表 response = self.gmail_service.users().messages().list( userId='me', q=query, maxResults=max_results ).execute() if 'messages' not in response: logger.info("未找到匹配邮件") return [] # 获取邮件详情 messages = [] for msg in response['messages']: try: message = self.gmail_service.users().messages().get( userId='me', id=msg['id'] ).execute() email_data = self._extract_email_content(message) if email_data: messages.append(email_data) except Exception as msg_error: logger.error(f"处理邮件 {msg['id']} 失败: {str(msg_error)}") logger.info(f"获取到 {len(messages)} 封邮件") return messages except Exception as e: logger.error(f"获取最近邮件失败: {str(e)}") logger.error(traceback.format_exc()) return []