daren_project/user_management/gmail_integration.py

2355 lines
109 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import uuid
import base64
import pickle
import logging
import traceback # 确保导入traceback模块
from pathlib import Path
import dateutil.parser as parser
from datetime import datetime
from bs4 import BeautifulSoup
from django.utils import timezone
from django.conf import settings # 添加Django设置导入
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import warnings
import mimetypes
import requests
# 忽略oauth2client相关的所有警告
warnings.filterwarnings('ignore', message='file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth')
warnings.filterwarnings('ignore', category=DeprecationWarning, module='oauth2client')
warnings.filterwarnings('ignore', message='.*google-auth.*')
warnings.filterwarnings('ignore', message='.*oauth2client.*')
from apiclient import discovery
from httplib2 import Http
from oauth2client import file, client, tools
from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment, UserProfile
logger = logging.getLogger(__name__)
# Per-user cache ("singleton" manager) of Gmail API service objects
class GmailServiceManager:
    """Class-level cache of Gmail service objects, keyed by ``str(user.id)``.

    Every cache entry is a dict with the keys ``service``, ``credentials``,
    ``timestamp`` and ``user``.
    """

    _instances = {}  # str(user.id) -> cache entry dict

    @classmethod
    def get_instance(cls, user):
        """Return the cache entry for ``user``, creating it when necessary.

        A cached entry is reused (and its timestamp refreshed) while it has
        been idle for at most 30 minutes; an older entry is dropped and
        rebuilt from the active ``GmailCredential`` row.

        Returns:
            dict | None: the cache entry, or ``None`` when no usable
            credential exists or building the service failed (the failure
            is logged, not raised).
        """
        cache_key = str(user.id)

        if cache_key in cls._instances:
            entry = cls._instances[cache_key]
            idle = timezone.now() - entry['timestamp']
            if idle.total_seconds() <= 1800:  # still within the 30-minute window
                entry['timestamp'] = timezone.now()
                logger.info(f"复用用户 {user.username} 的Gmail服务单例")
                return entry
            # Stale entry: discard it and fall through to a rebuild.
            del cls._instances[cache_key]

        try:
            record = GmailCredential.objects.filter(user=user, is_active=True).first()
            if record and record.credentials:
                # NOTE(review): pickle.loads on a database column -- only safe
                # while that column can never hold attacker-controlled bytes.
                creds = pickle.loads(record.credentials)
                if not creds.invalid:
                    service = discovery.build('gmail', 'v1', http=creds.authorize(Http()))
                    cls._instances[cache_key] = {
                        'service': service,
                        'credentials': creds,
                        'timestamp': timezone.now(),
                        'user': user,
                    }
                    logger.info(f"创建用户 {user.username} 的Gmail服务单例")
                    return cls._instances[cache_key]
        except Exception as e:
            logger.error(f"创建Gmail服务单例失败: {e}")
        return None

    @classmethod
    def update_instance(cls, user, credentials, service):
        """Store (or overwrite) the cache entry for ``user``."""
        cls._instances[str(user.id)] = {
            'service': service,
            'credentials': credentials,
            'timestamp': timezone.now(),
            'user': user,
        }

    @classmethod
    def clear_instance(cls, user):
        """Remove the cache entry for ``user``; a missing entry is ignored."""
        cls._instances.pop(str(user.id), None)
class GmailIntegration:
    """Gmail integration helper bound to one application user."""
    # OAuth scope list passed to the oauth2client flow in authenticate()
    SCOPES = ['https://mail.google.com/']
def __init__(self, user, email=None, client_secret_json=None, use_proxy=True, proxy_url='http://127.0.0.1:7890'):
    """Set up per-user state and try to restore stored credentials.

    Args:
        user: application user object (``id``, ``email`` and ``username``
            are read); may be falsy, in which case a shared token file is
            used.
        email: target (talent) mailbox address.
        client_secret_json: OAuth client secret, as a dict or JSON string.
        use_proxy: when true, export ``proxy_url`` through the process-wide
            ``HTTP_PROXY`` / ``HTTPS_PROXY`` environment variables.
        proxy_url: proxy address used when ``use_proxy`` is true.
    """
    self.user = user
    self.user_email = None if not user else user.email
    self.email = email  # target mailbox
    self.client_secret = client_secret_json
    self.credentials = None
    self.gmail_service = None
    self.use_proxy = use_proxy
    self.proxy_url = proxy_url

    # The proxy is configured through environment variables, so it affects
    # the whole process, not only this object.
    if self.use_proxy:
        logger.info(f"设置Gmail API代理: {proxy_url}")
        os.environ.update({'HTTP_PROXY': proxy_url, 'HTTPS_PROXY': proxy_url})

    if not user:
        self.token_storage_path = "gmail_token.json"
        logger.warning("未提供用户将使用默认Token存储路径")
    else:
        # One token file per user id inside the (relative) tokens directory.
        tokens_dir = "gmail_tokens"
        self.token_storage_path = os.path.join(tokens_dir, f"gmail_token_{user.id}.json")
        os.makedirs(tokens_dir, exist_ok=True)
        logger.info(f"设置Token存储路径: {self.token_storage_path}")

        # Best effort: restore credentials from the database; on failure the
        # file-based flow in authenticate() is still available.
        try:
            stored = GmailCredential.objects.filter(user=user, is_active=True).first()
            if stored and stored.credentials:
                logger.info(f"从数据库加载用户 {user.username} 的Gmail凭证")
                self.credentials = pickle.loads(stored.credentials)
                authorized_http = self.credentials.authorize(Http())
                self.gmail_service = discovery.build('gmail', 'v1', http=authorized_http)
                logger.info("从数据库凭证初始化Gmail服务成功")
        except Exception as e:
            logger.error(f"从数据库加载凭证失败: {str(e)}")

    # oauth2client storage object for the token file
    self.storage = file.Storage(self.token_storage_path)
def authenticate(self):
    """Authenticate against the Gmail API and initialise ``self.gmail_service``.

    Steps, in order:
      1. Reuse the cached ``GmailServiceManager`` entry for ``self.user``.
      2. Otherwise load the stored token from ``self.token_storage_path``.
      3. If there is no valid token, build an OAuth flow from the client
         secret and raise an exception carrying the authorization URL.

    On success the Gmail address is looked up, the pickled credentials are
    saved to ``GmailCredential`` and the service cache is updated.

    Returns:
        bool: ``True`` on success, ``False`` on any handled failure.

    Raises:
        Exception: with a message starting with
            ``"Please visit this URL to authorize: "`` when user consent is
            required (the view layer is expected to handle it).
    """
    # Local import: only this method needs it.
    import tempfile

    oob_redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'
    secret_path = None  # private temp file holding the client secret, if any
    try:
        # Prefer the cached service instance.
        if self.user:
            instance = GmailServiceManager.get_instance(self.user)
            if instance:
                self.gmail_service = instance['service']
                self.credentials = instance['credentials']
                logger.info("使用现有的Gmail服务单例")
                return True

        # Normalise the client secret and write it to a private temp file.
        # A unique file (instead of a shared ./client_secret.json) keeps
        # concurrent authentications from clobbering each other.
        secret_config = None
        if self.client_secret:
            if isinstance(self.client_secret, str):
                try:
                    secret_config = json.loads(self.client_secret)
                except json.JSONDecodeError as e:
                    logger.error(f"client_secret不是有效的JSON: {str(e)}")
                    return False
            else:
                secret_config = self.client_secret
            with tempfile.NamedTemporaryFile(
                mode='w', prefix='client_secret_', suffix='.json', delete=False
            ) as handle:
                secret_path = handle.name  # recorded first so `finally` can clean up
                json.dump(secret_config, handle)
            logger.info(f"已将client_secret写入临时文件: {secret_path}")

        logger.info(f"创建或读取token存储: {self.token_storage_path}")
        token_dir = os.path.dirname(self.token_storage_path)
        if token_dir and not os.path.exists(token_dir):
            logger.info(f"创建token目录: {token_dir}")
            os.makedirs(token_dir)

        store = file.Storage(self.token_storage_path)
        creds = store.get()

        if not creds or creds.invalid:
            logger.info("没有有效的凭证,需要重新授权")
            if not self.client_secret:
                logger.error("没有提供client_secret_json且找不到有效凭证")
                return False

            redirect_uri = self._extract_redirect_uri(secret_config) or oob_redirect_uri
            if redirect_uri == oob_redirect_uri:
                logger.info("使用非浏览器认证模式")
            else:
                logger.info(f"使用重定向URI: {redirect_uri}")
            flow = client.flow_from_clientsecrets(secret_path, self.SCOPES)
            flow.redirect_uri = redirect_uri

            # Hand the authorization URL to the caller through an exception.
            auth_url = flow.step1_get_authorize_url()
            logger.info(f"获取授权URL: {auth_url[:50]}...")
            raise Exception(f"Please visit this URL to authorize: {auth_url}")

        # Valid stored credentials: build the service.
        self.credentials = creds
        logger.info("使用现有凭证初始化Gmail服务")
        self.gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http()))
        logger.info("Gmail服务初始化成功")

        # Look up the address of the authenticated mailbox.
        gmail_email = None
        try:
            profile = self.gmail_service.users().getProfile(userId='me').execute()
            gmail_email = profile.get('emailAddress')
            logger.info(f"获取到Gmail账号: {gmail_email}")
        except Exception as profile_error:
            logger.error(f"获取Gmail账号失败: {str(profile_error)}")

        # Fallback: take the first bracketed address found in the address
        # headers of one message.
        # NOTE(review): that address may belong to the other party.
        if not gmail_email:
            try:
                listing = self.gmail_service.users().messages().list(userId='me', maxResults=1).execute()
                if listing.get('messages'):
                    msg_id = listing['messages'][0]['id']
                    msg = self.gmail_service.users().messages().get(userId='me', id=msg_id).execute()
                    for header in msg.get('payload', {}).get('headers', []):
                        value = header['value']
                        if header['name'] in ['From', 'To', 'Cc', 'Bcc'] and '<' in value and '>' in value:
                            gmail_email = value.split('<')[1].split('>')[0]
                            logger.info(f"从消息中提取到Gmail账号: {gmail_email}")
                            break
            except Exception as msg_error:
                logger.error(f"尝试从消息中提取Gmail账号失败: {str(msg_error)}")

        # Persist the credentials for this user.
        if self.user:
            logger.info("保存凭证到数据库")
            credentials_data = pickle.dumps(creds)
            gmail_credential, created = GmailCredential.objects.update_or_create(
                user=self.user,
                defaults={
                    'credentials': credentials_data,
                    'token_path': self.token_storage_path,
                    'gmail_email': gmail_email,
                    'updated_at': timezone.now(),
                    'is_active': True
                }
            )
            action = "创建" if created else "更新"
            logger.info(f"{action}用户 {self.user.username} 的Gmail凭证记录实际Gmail账号: {gmail_email}")

        # Refresh the service cache after a successful authentication.
        if self.user and self.gmail_service and self.credentials:
            GmailServiceManager.update_instance(self.user, self.credentials, self.gmail_service)
        return True
    except Exception as e:
        # The authorization-URL exception must reach the caller untouched.
        if "Please visit this URL" in str(e):
            raise
        logger.error(f"Gmail认证失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
    finally:
        # Never leave the client secret on disk.
        if secret_path and os.path.exists(secret_path):
            logger.info("删除临时client_secret文件")
            os.unlink(secret_path)

def _extract_redirect_uri(self, secret_config):
    """Return the first redirect URI of the 'web' or 'installed' section, or None."""
    if not isinstance(secret_config, dict):
        return None
    for section in ('web', 'installed'):
        section_config = secret_config.get(section)
        if not isinstance(section_config, dict):
            continue
        uris = section_config.get('redirect_uris')
        if isinstance(uris, (list, tuple)) and uris:
            return uris[0]
    return None
def create_talent_knowledge_base(self, talent_email):
    """
    Return the knowledge base linked to ``talent_email``, creating it if needed.

    Lookup order: an active ``GmailTalentMapping`` for this user and address,
    then a ``KnowledgeBase`` named ``Gmail-<local part of the address>`` owned
    by this user, and finally ``_create_knowledge_base_basic``. If anything
    raises, ``_create_knowledge_base_basic`` is tried (again) as a fallback.

    Args:
        talent_email (str): Gmail address of the talent.
    Returns:
        tuple: (knowledge_base, created) - the knowledge base and whether it
        was newly created.
    """
    try:
        # 1. An active mapping wins.
        active_mapping = GmailTalentMapping.objects.filter(
            user=self.user,
            talent_email=talent_email,
            is_active=True,
        ).first()
        if active_mapping and active_mapping.knowledge_base:
            logger.info(f"找到现有的Gmail-达人映射: {talent_email} -> {active_mapping.knowledge_base.name}")
            return active_mapping.knowledge_base, False

        # 2. Look for a knowledge base with the derived name.
        local_part = talent_email.split('@')[0]
        kb_name = f"Gmail-{local_part}"
        matching_kb = KnowledgeBase.objects.filter(name=kb_name, user_id=self.user.id).first()

        if not matching_kb:
            # 3. Nothing found: create one.
            logger.info(f"使用基本方法创建知识库: {kb_name}")
            return self._create_knowledge_base_basic(talent_email)

        logger.info(f"找到现有知识库: {kb_name}")
        # Link the address to the knowledge base that was found.
        GmailTalentMapping.objects.update_or_create(
            user=self.user,
            talent_email=talent_email,
            defaults={'knowledge_base': matching_kb, 'is_active': True},
        )
        return matching_kb, False
    except Exception as e:
        logger.error(f"创建或获取Gmail-达人知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        # Fall back to the basic creation path.
        logger.info("尝试使用基本方法创建知识库")
        return self._create_knowledge_base_basic(talent_email)
def _create_knowledge_base_basic(self, talent_email):
    """
    Find or create the knowledge base for ``talent_email`` and map it.

    The knowledge base is named ``Gmail-<local part of the address>``. When
    a new one is created, an external dataset is also requested through
    ``KnowledgeBaseViewSet._create_external_dataset``; a failure there is
    logged and does not abort the creation.

    Args:
        talent_email (str): Gmail address of the talent.
    Returns:
        tuple: (knowledge_base, created) - the knowledge base and whether it
        was newly created.
    Raises:
        Exception: any error of the local create/lookup is logged and
        re-raised.
    """
    try:
        kb_name = f"Gmail-{talent_email.split('@')[0]}"

        existing_kb = KnowledgeBase.objects.filter(
            name=kb_name,
            user_id=self.user.id
        ).first()
        if existing_kb:
            logger.info(f"找到现有知识库: {kb_name}")
            GmailTalentMapping.objects.update_or_create(
                user=self.user,
                talent_email=talent_email,
                defaults={
                    'knowledge_base': existing_kb,
                    'is_active': True
                }
            )
            return existing_kb, False

        knowledge_base = KnowledgeBase.objects.create(
            name=kb_name,
            desc=f"{talent_email}的Gmail邮件交流记录",
            type="private",
            user_id=self.user.id,
            documents=[]
        )

        # Best effort: create the matching external dataset.
        try:
            # Imported here, not at module level, as in the original code.
            from .views import KnowledgeBaseViewSet
            kb_viewset = KnowledgeBaseViewSet()
            external_id = kb_viewset._create_external_dataset(knowledge_base)
            if external_id:
                knowledge_base.external_id = external_id
                knowledge_base.save()
                logger.info(f"成功创建外部知识库: {external_id}")
        except Exception as e:
            logger.error(f"创建外部知识库失败: {str(e)}")

        # update_or_create (not create) so that an existing *inactive*
        # mapping for this user/address is re-used and re-activated instead
        # of being duplicated -- same approach as the branch above.
        GmailTalentMapping.objects.update_or_create(
            user=self.user,
            talent_email=talent_email,
            defaults={
                'knowledge_base': knowledge_base,
                'is_active': True
            }
        )
        logger.info(f"成功创建新知识库: {kb_name}, ID: {knowledge_base.id}")
        return knowledge_base, True
    except Exception as e:
        logger.error(f"创建或获取Gmail-达人知识库基本方法失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise
def get_conversations(self, talent_gmail):
    """Fetch every message exchanged between this mailbox and ``talent_gmail``.

    Args:
        talent_gmail (str): address of the other party.
    Returns:
        list: dicts produced by ``_extract_email_content``, sorted by their
        ``date`` value; an empty list when the service is not initialised
        or on any error (errors are logged, not raised).
    """
    try:
        if not self.gmail_service:
            logger.error("Gmail服务未初始化")
            return []

        # Address of the authenticated mailbox. It used to be hard-coded;
        # the legacy value is kept only as a fallback when the profile
        # lookup fails.
        email1 = "crushwds@gmail.com"
        try:
            profile = self.gmail_service.users().getProfile(userId='me').execute()
            email1 = profile.get('emailAddress') or email1
        except Exception as profile_error:
            logger.error(f"获取Gmail账号失败: {str(profile_error)}")
        email2 = talent_gmail
        logger.info(f"执行Gmail查询: {email1}{email2}")

        query = f"from:({email1} OR {email2}) to:({email1} OR {email2})"

        # Follow nextPageToken until exhausted, whether or not a given page
        # actually carries messages.
        messages = []
        page_token = None
        while True:
            list_kwargs = {'userId': 'me', 'q': query}
            if page_token:
                list_kwargs['pageToken'] = page_token
            response = self.gmail_service.users().messages().list(**list_kwargs).execute()
            batch = response.get('messages', [])
            if batch:
                messages.extend(batch)
                if page_token is None:
                    logger.info(f"找到 {len(batch)} 封邮件")
                else:
                    logger.info(f"加载额外 {len(batch)} 封邮件,当前总数: {len(messages)}")
            page_token = response.get('nextPageToken')
            if not page_token:
                break
        if not messages:
            logger.warning("未找到匹配邮件")

        # Fetch and parse each message; one failure does not stop the rest.
        conversations = []
        total = len(messages)
        if messages:
            logger.info(f"开始获取 {total} 封邮件详情...")
        for position, msg in enumerate(messages, start=1):
            try:
                message = self.gmail_service.users().messages().get(userId='me', id=msg['id']).execute()
                email_data = self._extract_email_content(message)
                if email_data:
                    conversations.append(email_data)
                    # Progress log every 5 messages and on the last one.
                    if position % 5 == 0 or position == total:
                        logger.info(f"已处理 {position}/{total} 封邮件")
                else:
                    logger.warning(f"邮件 {position}/{total} 内容提取失败")
            except Exception as e:
                logger.error(f"获取邮件 {msg['id']} 详情失败: {str(e)}")

        conversations.sort(key=lambda x: x['date'])
        logger.info(f"总共找到并解析了 {len(conversations)} 封邮件")
        return conversations
    except Exception as e:
        logger.error(f"获取Gmail对话失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []
def _extract_email_content(self, message):
"""提取邮件内容完全按照quickstart.py的get_email_content函数实现"""
try:
message_id = message['id'] # 获取邮件ID
payload = message['payload']
headers = payload['headers']
# 获取邮件基本信息
email_data = {
'id': message_id, # 保存邮件ID
'subject': '',
'from': '',
'date': '',
'body': '',
'attachments': [] # 新增附件列表
}
# 提取头部信息
for header in headers:
if header['name'] == 'Subject':
email_data['subject'] = header['value']
elif header['name'] == 'From':
email_data['from'] = header['value']
elif header['name'] == 'Date':
try:
date = parser.parse(header['value'])
# 转换为与Django时区一致的格式
if hasattr(settings, 'USE_TZ') and settings.USE_TZ:
date = timezone.make_aware(date)
email_data['date'] = date.strftime('%Y-%m-%d %H:%M:%S')
except Exception as e:
logger.error(f"解析日期失败: {str(e)}")
email_data['date'] = header['value']
# 定义一个递归函数来处理所有部分和附件
def process_parts(parts):
for part in parts:
# 检查是否是附件
if 'filename' in part and part['filename']:
attachment = {
'filename': part['filename'],
'mimeType': part['mimeType'],
'size': part['body'].get('size', 0)
}
# 如果有附件内容数据可以获取附件ID
if 'attachmentId' in part['body']:
attachment['attachmentId'] = part['body']['attachmentId']
email_data['attachments'].append(attachment)
# 处理文本内容
if part['mimeType'] == 'text/plain' and not email_data['body']:
data = part['body'].get('data', '')
if data:
try:
text = base64.urlsafe_b64decode(data).decode('utf-8')
email_data['body'] = text
except Exception as e:
logger.error(f"解码邮件内容失败: {str(e)}")
# 递归处理多部分内容
if 'parts' in part:
process_parts(part['parts'])
# 处理邮件正文和附件
if 'parts' in payload:
process_parts(payload['parts'])
elif 'body' in payload and 'data' in payload['body']:
# 没有parts直接处理body
data = payload['body'].get('data', '')
if data:
try:
text = base64.urlsafe_b64decode(data).decode('utf-8')
email_data['body'] = text
except Exception as e:
logger.error(f"解码邮件内容失败: {str(e)}")
return email_data
except Exception as e:
logger.error(f"处理邮件内容时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return None
def download_attachment(self, message_id, attachment_id, filename):
    """Download one attachment into the ``gmail_attachments`` directory.

    Args:
        message_id: id of the message that owns the attachment.
        attachment_id: attachment id as reported by the Gmail API.
        filename: attachment file name taken from the email. It is
            sender-controlled, so only its final path component is used.
    Returns:
        str | None: path of the written file
        (``gmail_attachments/<message_id>_<name>``), or ``None`` on failure
        (the error is logged).
    """
    try:
        attachment = self.gmail_service.users().messages().attachments().get(
            userId='me', messageId=message_id, id=attachment_id
        ).execute()
        file_data = base64.urlsafe_b64decode(attachment['data'])

        attachments_dir = 'gmail_attachments'
        os.makedirs(attachments_dir, exist_ok=True)

        # Drop any directory components (either separator style) so a
        # crafted name such as "../../x" cannot escape attachments_dir.
        safe_name = os.path.basename(str(filename).replace('\\', '/')) or 'attachment'
        filepath = os.path.join(attachments_dir, f"{message_id}_{safe_name}")
        with open(filepath, 'wb') as f:
            f.write(file_data)
        return filepath
    except Exception as e:
        logger.error(f"下载附件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
def save_conversations_to_knowledge_base(self, conversations, knowledge_base):
    """
    Import Gmail messages into the chat history and the knowledge base.

    For every message not imported before (matched on the
    ``gmail_message_id`` stored in ``ChatHistory.metadata``) a
    ``ChatHistory`` row is created, its attachments are downloaded and
    recorded, and a paragraph is added to one document that is uploaded to
    the external knowledge base.

    Args:
        conversations: list of message dicts as produced by
            ``_extract_email_content``; it is sorted in place by ``date``.
        knowledge_base: target knowledge base object.
    Returns:
        ``False`` when ``conversations`` or ``knowledge_base`` is empty;
        otherwise a dict -- ``{'conversation_id', 'success': True,
        'message_count', 'knowledge_base_id'}`` on completion, or
        ``{'conversation_id': None, 'success': False, 'error'}`` when an
        unexpected error occurred. A failed document upload is logged but
        does not change ``success``.
    """
    try:
        if not conversations or not knowledge_base:
            logger.error("参数不完整: conversations或knowledge_base为空")
            return False

        conversation_id = self._resolve_conversation_id(knowledge_base)

        logger.info(f"开始构建知识库文档,共{len(conversations)}个邮件对话")
        conversations.sort(key=lambda x: x.get('date', ''))

        # Gmail ids already stored for this conversation (skipped below).
        existing_messages = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            is_deleted=False
        ).order_by('created_at')
        existing_gmail_ids = set()
        if existing_messages.exists():
            existing_gmail_ids = {
                msg.metadata['gmail_message_id']
                for msg in existing_messages
                if msg.metadata and 'gmail_message_id' in msg.metadata
            }
            logger.info(f"发现已存在 {len(existing_gmail_ids)} 条消息记录,将跳过重复导入")

        paragraphs = []  # paragraphs of the document to upload
        previous_message_dict = {}  # message datetime -> ChatHistory id (parent lookup)
        # None-safe: a user without an email must not break every message.
        user_email = (self.user.email or '').lower()
        talent_email = (self.email or '').lower()

        for message in conversations:
            try:
                if message.get('id') in existing_gmail_ids:
                    logger.info(f"跳过已导入的消息: {message.get('id')}")
                    continue

                sender = message.get('from', '未知发件人')
                date_str = message.get('date', '未知时间')
                subject = message.get('subject', '无主题')
                body = message.get('body', '').strip()
                logger.info(f"处理邮件: {date_str}, {sender}, {subject[:20]}...")

                # "Name <addr>" -> addr; otherwise the value is the address.
                if '<' in sender and '>' in sender:
                    sender_email = sender.split('<')[1].split('>')[0]
                else:
                    sender_email = sender
                sender_lower = sender_email.lower()

                # Both the user's own and the target address map to 'user'.
                is_user_email = sender_lower == user_email
                is_talent_email = bool(talent_email) and sender_lower == talent_email
                role = 'user' if is_user_email or is_talent_email else 'assistant'
                logger.info(f"设置消息角色: {role},发件人: {sender_email},用户邮箱: {self.user.email},目标邮箱: {self.email}")

                paragraphs.append({
                    "title": f"{date_str} - {sender} - {subject}",
                    "content": body,
                    "is_active": True,
                    "problem_list": []
                })

                try:
                    date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
                except ValueError:
                    logger.warning(f"无法解析邮件日期: {date_str},使用当前时间")
                    date_obj = datetime.now()

                # Parent = the latest message of this run strictly earlier
                # than the current one.
                earlier_dates = [d for d in previous_message_dict if d < date_obj]
                parent_id = previous_message_dict[max(earlier_dates)] if earlier_dates else None

                aware_date = date_obj if timezone.is_aware(date_obj) else timezone.make_aware(date_obj)
                chat_message = ChatHistory.objects.create(
                    user=self.user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation_id,
                    parent_id=parent_id,
                    role=role,
                    content=f"{subject}\n\n{body}",
                    metadata={
                        'gmail_message_id': message.get('id', ''),
                        'from': sender,
                        'date': date_str,
                        'subject': subject,
                        'dataset_id_list': [str(knowledge_base.id)],
                        'dataset_names': [knowledge_base.name]
                    },
                    created_at=aware_date  # use the mail's own timestamp
                )
                previous_message_dict[date_obj] = str(chat_message.id)

                for attachment in message.get('attachments', []):
                    if 'attachmentId' not in attachment or 'filename' not in attachment:
                        continue
                    try:
                        filepath = self.download_attachment(
                            message_id=message['id'],
                            attachment_id=attachment['attachmentId'],
                            filename=attachment['filename']
                        )
                        if filepath:
                            GmailAttachment.objects.create(
                                chat_message=chat_message,
                                gmail_message_id=message['id'],
                                filename=attachment['filename'],
                                filepath=filepath,
                                mimetype=attachment.get('mimeType', ''),
                                filesize=attachment.get('size', 0)
                            )
                            logger.info(f"已保存附件: {attachment['filename']}")
                    except Exception as e:
                        logger.error(f"保存附件失败: {str(e)}")
                        logger.error(traceback.format_exc())
            except Exception as e:
                logger.error(f"处理邮件时出错: {str(e)}")
                logger.error(traceback.format_exc())

        logger.info(f"将Gmail对话添加到知识库: {knowledge_base.name}")
        if not paragraphs:
            # Everything was imported earlier: do not upload an empty document.
            logger.info("没有新的邮件需要上传到知识库")
        elif not knowledge_base.external_id:
            # Same guard as _append_to_knowledge_base_document.
            logger.error(f"知识库没有external_id无法上传文档: {knowledge_base.name}")
        else:
            doc_data = {
                "name": f"Gmail对话与{self.email or '联系人'}.txt",
                "paragraphs": paragraphs
            }
            # Imported here, not at module level, as in the original code.
            from .views import KnowledgeBaseViewSet
            kb_viewset = KnowledgeBaseViewSet()
            upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data)
            if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                document_id = upload_response['data']['id']
                KnowledgeBaseDocument.objects.create(
                    knowledge_base=knowledge_base,
                    document_id=document_id,
                    document_name=doc_data["name"],
                    external_id=document_id
                )
                logger.info(f"Gmail对话文档上传成功ID: {document_id}")
                self._upload_attachments_to_knowledge_base(knowledge_base, conversations)
            else:
                error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
                logger.error(f"Gmail对话文档上传失败: {error_msg}")

        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active'
        ).count()
        knowledge_base.save()
        logger.info(f"Gmail对话已保存到知识库: {knowledge_base.name}")
        return {
            "conversation_id": conversation_id,
            "success": True,
            "message_count": len(conversations),
            "knowledge_base_id": str(knowledge_base.id)
        }
    except Exception as e:
        logger.error(f"保存Gmail对话到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {"conversation_id": None, "success": False, "error": str(e)}

def _resolve_conversation_id(self, knowledge_base):
    """Find or create the conversation id for ``knowledge_base`` and keep the talent mapping in sync."""
    conversation_id = None

    # 1. An active mapping for the target address.
    if self.email:
        existing_mapping = GmailTalentMapping.objects.filter(
            user=self.user,
            talent_email=self.email,
            knowledge_base=knowledge_base,
            is_active=True
        ).first()
        if existing_mapping and existing_mapping.conversation_id:
            conversation_id = existing_mapping.conversation_id
            logger.info(f"通过达人邮箱 {self.email} 找到现有对话ID: {conversation_id}")

    # 2. Any existing chat history of this user in the knowledge base.
    if not conversation_id:
        existing_conversation = ChatHistory.objects.filter(
            knowledge_base=knowledge_base,
            user=self.user,
            is_deleted=False
        ).values('conversation_id').distinct().first()
        if existing_conversation:
            conversation_id = existing_conversation['conversation_id']
            logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}")
            if self.email:
                GmailTalentMapping.objects.update_or_create(
                    user=self.user,
                    talent_email=self.email,
                    defaults={
                        'knowledge_base': knowledge_base,
                        'conversation_id': conversation_id,
                        'is_active': True
                    }
                )
                logger.info(f"更新Gmail达人映射: {self.email} -> 对话ID {conversation_id}")

    # 3. Nothing found: start a new conversation.
    if not conversation_id:
        conversation_id = str(uuid.uuid4())
        logger.info(f"创建新的对话ID: {conversation_id}")
        if self.email:
            talent_mapping, created = GmailTalentMapping.objects.update_or_create(
                user=self.user,
                talent_email=self.email,
                defaults={
                    'knowledge_base': knowledge_base,
                    'conversation_id': conversation_id,
                    'is_active': True
                }
            )
            action = "创建" if created else "更新"
            logger.info(f"{action}Gmail达人映射: {self.email} -> {knowledge_base.name}")
    return conversation_id
def send_email(self, to_email, subject, body, conversation_id=None, attachments=None):
    """Send an email and, when ``conversation_id`` is given, record it.

    Args:
        to_email: recipient address.
        subject: subject line.
        body: plain-text body.
        conversation_id: optional existing conversation to append the
            message to (chat history, attachment rows, knowledge base).
        attachments: optional list of local file paths to attach.
    Returns:
        The Gmail id of the sent message.
    Raises:
        Exception: only when building or sending the message fails.
            Failures while *recording* an already-sent message are logged
            and swallowed, so callers never see a "failed" send that in
            fact went out (which would invite a duplicate re-send).
    """
    try:
        if attachments and len(attachments) > 0:
            message = self._create_message_with_attachment(to_email, subject, body, attachments)
        else:
            message = self._create_message(to_email, subject, body)
        sent_message = self.gmail_service.users().messages().send(
            userId='me', body=message
        ).execute()
        message_id = sent_message['id']
        logger.info(f"邮件发送成功, ID: {message_id}")
    except Exception as e:
        logger.error(f"发送邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise

    if conversation_id:
        try:
            self._record_sent_message(message_id, to_email, subject, body, conversation_id, attachments)
        except Exception as e:
            logger.error(f"保存已发送邮件到聊天记录失败: {str(e)}")
            logger.error(traceback.format_exc())
    return message_id

def _record_sent_message(self, message_id, to_email, subject, body, conversation_id, attachments=None):
    """Store a sent message in the chat history, its attachment rows and the knowledge base."""
    # Local import under an alias: the module-level name `timezone` is
    # django.utils.timezone.
    from datetime import timezone as dt_timezone

    existing_messages = ChatHistory.objects.filter(
        conversation_id=conversation_id,
        is_deleted=False
    ).order_by('created_at')
    if not existing_messages.exists():
        logger.warning(f"找不到对话ID: {conversation_id},无法保存消息")
        return

    # The knowledge base comes from the first message of the conversation,
    # either directly or through the dataset id kept in its metadata.
    first_message = existing_messages.first()
    first_metadata = first_message.metadata or {}
    if first_message.knowledge_base:
        knowledge_base = first_message.knowledge_base
    elif first_metadata.get('dataset_id_list'):
        knowledge_base = KnowledgeBase.objects.get(id=first_metadata['dataset_id_list'][0])
    else:
        logger.error("找不到关联的知识库,无法保存消息")
        return

    # Timestamp: Gmail's internalDate (milliseconds since the epoch),
    # rendered in Django's current time zone; "now" when unavailable.
    try:
        message_detail = self.gmail_service.users().messages().get(
            userId='me', id=message_id
        ).execute()
        internal_date = message_detail.get('internalDate')
        if internal_date:
            sent_at = datetime.fromtimestamp(int(internal_date) / 1000, tz=dt_timezone.utc)
            date_str = timezone.localtime(sent_at).strftime('%Y-%m-%d %H:%M:%S')
        else:
            date_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    except Exception as e:
        logger.warning(f"获取邮件时间戳失败: {str(e)}, 使用当前时间")
        date_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Parent = the row with the highest id in this conversation (ordering
    # by id rather than by time, as the original code did).
    parent_id = None
    try:
        latest = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            is_deleted=False
        ).order_by('-id').first()
        if latest:
            parent_id = str(latest.id)
    except Exception as e:
        logger.error(f"查找父消息失败: {str(e)}")
        logger.error(traceback.format_exc())

    metadata = {
        'gmail_message_id': message_id,
        'from': self.user.email,
        'to': to_email,
        'date': date_str,
        'subject': subject,
        'dataset_id_list': [str(knowledge_base.id)],
        'dataset_names': [knowledge_base.name]
    }
    # Keep the dataset lists of the conversation's first message, if any.
    if 'dataset_id_list' in first_metadata:
        metadata['dataset_id_list'] = first_metadata['dataset_id_list']
        if 'dataset_names' in first_metadata:
            metadata['dataset_names'] = first_metadata['dataset_names']

    # created_at is left to the model default on purpose.
    chat_message = ChatHistory.objects.create(
        user=self.user,
        knowledge_base=knowledge_base,
        conversation_id=conversation_id,
        parent_id=parent_id,
        role='user',  # messages sent by the user always get role 'user'
        content=f"[{subject}] {body}",
        metadata=metadata
    )

    for att_path in attachments or []:
        mime_type, _ = mimetypes.guess_type(att_path)
        GmailAttachment.objects.create(
            chat_message=chat_message,
            gmail_message_id=message_id,
            filename=os.path.basename(att_path),
            filepath=att_path,
            mimetype=mime_type or 'application/octet-stream',
            filesize=os.path.getsize(att_path)
        )

    self._append_to_knowledge_base_document(knowledge_base, subject, body, to_email)
def _create_message(self, to, subject, body):
"""创建邮件消息"""
message = MIMEText(body)
message['to'] = to
message['subject'] = subject
# 编码为JSON安全的字符串
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
return {'raw': raw}
def _create_message_with_attachment(self, to, subject, body, attachment_files):
"""创建带附件的邮件消息"""
message = MIMEMultipart()
message['to'] = to
message['subject'] = subject
# 添加邮件正文
msg = MIMEText(body)
message.attach(msg)
# 添加附件
for file in attachment_files:
try:
with open(file, 'rb') as f:
part = MIMEBase('application', 'octet-stream')
part.set_payload(f.read())
# 编码附件内容
encoders.encode_base64(part)
# 添加头部信息
filename = os.path.basename(file)
part.add_header(
'Content-Disposition',
f'attachment; filename="{filename}"'
)
message.attach(part)
except Exception as e:
logger.error(f"添加附件 {file} 失败: {str(e)}")
# 编码为JSON安全的字符串
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
return {'raw': raw}
def _append_to_knowledge_base_document(self, knowledge_base, subject, body, recipient):
    """Upload a just-sent email to the knowledge base as a one-paragraph document.

    Returns:
        The upload API response on success; ``None`` when the knowledge
        base has no ``external_id``, the upload is rejected, or an error
        occurs (errors are logged, not raised).
    """
    try:
        # Document payload: one paragraph holding the sent mail.
        document_name = f"Gmail回复给{recipient}_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt"
        paragraph = {
            "title": f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {subject}",
            "content": f"发件人: {self.user.email}\n收件人: {recipient}\n\n{body}",
            "is_active": True,
            "problem_list": [],
        }
        doc_data = {"name": document_name, "paragraphs": [paragraph]}

        # Imported here, not at module level, as in the original code.
        from .views import KnowledgeBaseViewSet
        uploader = KnowledgeBaseViewSet()

        # Without an external id there is nothing to upload to.
        if not knowledge_base.external_id:
            logger.error(f"知识库没有external_id无法上传文档: {knowledge_base.name}")
            return None

        upload_response = uploader._call_upload_api(knowledge_base.external_id, doc_data)
        accepted = upload_response and upload_response.get('code') == 200 and upload_response.get('data')
        if not accepted:
            if upload_response:
                error_msg = upload_response.get('message', '未知错误')
            else:
                error_msg = '上传API调用失败'
            logger.error(f"Gmail回复文档上传失败: {error_msg}")
            return None

        # Record the uploaded document locally.
        document_id = upload_response['data']['id']
        KnowledgeBaseDocument.objects.create(
            knowledge_base=knowledge_base,
            document_id=document_id,
            document_name=doc_data["name"],
            external_id=document_id,
        )
        logger.info(f"Gmail回复文档上传成功ID: {document_id}")

        # Refresh the count of active documents.
        active_documents = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active',
        )
        knowledge_base.document_count = active_documents.count()
        knowledge_base.save()
        return upload_response
    except Exception as e:
        logger.error(f"追加邮件内容到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
def setup_watch(self, topic_name=None):
    """Register a Gmail push-notification watch on the authenticated mailbox.

    Args:
        topic_name: Pub/Sub topic name; defaults to
            ``settings.GMAIL_TOPIC_NAME`` (or ``'gmail-watch-topic'``).
    Returns:
        dict: ``{'historyId': ..., 'expiration': ...}`` from the watch
        response.
    Raises:
        Exception: any failure is logged and re-raised.
    """
    # Local import under an alias: the module-level name `timezone` is
    # django.utils.timezone.
    from datetime import timezone as dt_timezone

    try:
        if not topic_name:
            topic_name = getattr(settings, 'GMAIL_TOPIC_NAME', 'gmail-watch-topic')

        # The project id comes from settings, not from the client secret.
        project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT', 'knowledge-454905')
        logger.info(f"使用settings中配置的项目ID: {project_id}")

        # The Gmail API always addresses the mailbox as 'me'.
        logger.info(f"系统用户: {getattr(self.user, 'email', None)}Gmail API使用OAuth认证的邮箱 (userId='me')")

        # The webhook URL is only logged here, as a hint for manually
        # configuring the Pub/Sub push subscription.
        webhook_url = getattr(settings, 'GMAIL_WEBHOOK_URL', None)
        if not webhook_url:
            # An empty ALLOWED_HOSTS must not raise IndexError.
            hosts = getattr(settings, 'ALLOWED_HOSTS', None) or ['localhost']
            domain = hosts[0]
            protocol = 'https' if getattr(settings, 'SECURE_SSL_REDIRECT', False) else 'http'
            if domain == 'localhost' or domain.startswith('127.0.0.1'):
                webhook_url = f"{protocol}://{domain}:8000/api/user/gmail/webhook/"
                logger.warning("使用本地开发环境URL作为webhook回调Google可能无法访问。建议使用ngrok等工具创建公网地址。")
            else:
                webhook_url = f"{protocol}://{domain}/api/user/gmail/webhook/"
        logger.info(f"使用Gmail webhook URL: {webhook_url}")
        logger.info("如需手动配置Gmail推送通知请确保在Google Cloud控制台的Pub/Sub主题配置中设置以下URL:")
        logger.info(f"推送端点: {webhook_url}")
        logger.info("权限: 为Gmail API服务账号(gmail-api-push@system.gserviceaccount.com)和您的Gmail账号授予发布权限")

        request = {
            'labelIds': ['INBOX', 'SENT', 'IMPORTANT', 'UNREAD'],
            'topicName': f"projects/{project_id}/topics/{topic_name}",
            'labelFilterAction': 'include'
        }
        logger.info(f"设置Gmail监听: {request}")

        response = self.gmail_service.users().watch(userId='me', body=request).execute()
        history_id = response.get('historyId')  # starting point for later syncs
        expiration = response.get('expiration')
        logger.info(f"Gmail监听设置成功: historyId={history_id}, expiration={expiration}")

        # Persist the watch state on the user's active credential row.
        if self.user:
            credential = GmailCredential.objects.filter(user=self.user, is_active=True).first()
            if credential:
                expiration_time = None
                if expiration:
                    # `expiration` is treated as epoch milliseconds (as the
                    # original did). Converting from UTC avoids going
                    # through the server's local zone and then labelling
                    # the result with Django's zone.
                    expiration_time = datetime.fromtimestamp(
                        int(expiration) / 1000, tz=dt_timezone.utc
                    )
                credential.last_history_id = history_id
                credential.watch_expiration = expiration_time
                credential.save()
                logger.info(f"更新Gmail监听信息: {self.user.username}, history_id: {history_id}")

        return {
            'historyId': history_id,
            'expiration': expiration
        }
    except Exception as e:
        logger.error(f"设置Gmail监听失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise
def get_history(self, start_history_id):
    """Return the IDs of messages that arrived since ``start_history_id``.

    Pages through ``users.history.list`` and collects the ID of every
    message listed in a ``messagesAdded`` record, plus every message that
    had the ``INBOX`` label added in a ``labelsAdded`` record.

    Args:
        start_history_id: History ID to list from; passed through unchanged
            as ``startHistoryId``.

    Returns:
        list: De-duplicated message IDs in no particular order. An empty
        list is returned when nothing changed or when the request fails;
        failures are logged (with traceback) and not raised.
    """
    try:
        logger.info(f"获取历史记录起始ID: {start_history_id}")

        history_list = []
        page_token = None
        is_first_page = True
        # Single pagination loop: every page is requested the same way and
        # nextPageToken is followed even if a page carries no records.
        while True:
            list_kwargs = {'userId': 'me', 'startHistoryId': start_history_id}
            if page_token:
                list_kwargs['pageToken'] = page_token
            response = self.gmail_service.users().history().list(**list_kwargs).execute()

            if is_first_page:
                logger.info(f"历史记录响应: {response}")

            if 'history' in response:
                history_list.extend(response['history'])
                if is_first_page:
                    logger.info(f"找到 {len(response['history'])} 个历史记录")
                else:
                    logger.info(f"加载额外 {len(response['history'])} 个历史记录")
            elif is_first_page:
                logger.info(f"没有新的历史记录最新historyId: {response.get('historyId', 'N/A')}")

            page_token = response.get('nextPageToken')
            if not page_token:
                break
            is_first_page = False

        # A set de-duplicates messages that show up in several records.
        new_message_ids = set()
        for history in history_list:
            logger.info(f"处理历史记录: {history}")
            for added in history.get('messagesAdded', []):
                message_id = added['message']['id']
                new_message_ids.add(message_id)
                logger.info(f"新增消息ID: {message_id}")
            for label in history.get('labelsAdded', []):
                message_id = label['message']['id']
                # Only label changes that put the message into the inbox count.
                if 'INBOX' in label.get('labelIds', []):
                    new_message_ids.add(message_id)
                    logger.info(f"标签变更的消息ID: {message_id}, 添加了INBOX标签")

        if new_message_ids:
            logger.info(f"总共找到 {len(new_message_ids)} 个新消息")
        else:
            logger.info("没有新增消息")
        return list(new_message_ids)
    except Exception as e:
        logger.error(f"获取Gmail历史记录失败: {str(e)}")
        # Keep the traceback, as the sibling methods do, so API errors
        # (e.g. an expired startHistoryId) can be diagnosed from the logs.
        logger.error(traceback.format_exc())
        return []
def process_notification(self, notification_data):
    """Handle one Gmail push notification.

    Two payload shapes are accepted:

    * a Google Pub/Sub push envelope, ``{"message": {"data": <base64>}}``,
      whose decoded data is JSON with ``emailAddress`` and ``historyId``;
    * a plain mapping that already has ``emailAddress`` and ``historyId``.

    The mailbox owner is looked up by ``User.email`` and, failing that, by
    an active ``GmailCredential.gmail_email``. New messages reported by the
    history API are processed one by one. If the history API reports
    nothing, the latest mails of every active talent mapping of that user
    are saved to the talent's knowledge base instead.

    Args:
        notification_data: Parsed webhook body (see the shapes above).

    Returns:
        bool: True when the notification was handled; False when the payload
        is incomplete or undecodable, no user matches, authentication fails,
        the user has no talent mappings, or an unexpected error occurs.
    """
    try:
        logger.info(f"处理Gmail通知: {notification_data}")

        is_pubsub_envelope = (
            isinstance(notification_data, dict)
            and 'message' in notification_data
            and 'data' in notification_data['message']
        )
        if is_pubsub_envelope:
            try:
                logger.info("检测到Google Pub/Sub消息格式")
                encoded_data = notification_data['message']['data']
                if isinstance(encoded_data, bytes):
                    encoded_data = encoded_data.decode('ascii')
                # Gmail documents message.data as base64url. b64decode()
                # silently drops '-' and '_', which would corrupt the JSON,
                # so map the URL-safe alphabet onto the standard one and
                # restore any stripped padding before decoding.
                normalized = ''.join(encoded_data.split())
                normalized = normalized.replace('-', '+').replace('_', '/')
                normalized += '=' * (-len(normalized) % 4)
                decoded_data = base64.b64decode(normalized).decode('utf-8')
                logger.info(f"解码后的数据: {decoded_data}")

                json_data = json.loads(decoded_data)
                email = json_data.get('emailAddress')
                history_id = json_data.get('historyId')
                logger.info(f"从Pub/Sub消息中提取: email={email}, historyId={history_id}")
            except Exception as decode_error:
                logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}")
                logger.error(traceback.format_exc())
                return False
        else:
            # Plain payload that already carries the two fields.
            email = notification_data.get('emailAddress')
            history_id = notification_data.get('historyId')

        if not email or not history_id:
            logger.error("Gmail通知数据不完整: 找不到emailAddress或historyId")
            return False

        # Resolve the mailbox owner: account e-mail first, then the address
        # stored on an active Gmail credential.
        user = User.objects.filter(email=email).first()
        if not user:
            logger.info(f"找不到email={email}的用户尝试使用gmail_email查找")
            credential = GmailCredential.objects.filter(gmail_email=email, is_active=True).first()
            if credential:
                user = credential.user
                logger.info(f"通过gmail_email找到用户: {user.email}")
        if not user:
            logger.error(f"找不到与 {email} 关联的用户")
            return False

        gmail_integration = GmailIntegration(user)
        if not gmail_integration.authenticate():
            logger.error(f"Gmail认证失败: {email}")
            return False

        # Preferred path: process exactly the messages the history API reports.
        message_ids = gmail_integration.get_history(history_id)
        if message_ids:
            logger.info(f"从历史记录找到 {len(message_ids)} 个新消息")
            for message_id in message_ids:
                self._process_new_message(gmail_integration, message_id)
            return True

        # Fallback: re-sync the latest mails of every mapped talent.
        logger.info("历史记录API没有返回新消息尝试获取达人对话")
        mappings = GmailTalentMapping.objects.filter(
            user=user,
            is_active=True
        )
        if not mappings.exists():
            logger.info(f"用户 {user.email} 没有达人映射记录")
            return False

        for mapping in mappings:
            talent_email = mapping.talent_email
            logger.info(f"处理达人 {talent_email} 的对话")

            recent_emails = gmail_integration.get_recent_emails(
                from_email=talent_email,
                max_results=5  # only the five most recent mails
            )
            if not recent_emails:
                logger.info(f"没有找到来自 {talent_email} 的最近邮件")
                continue
            logger.info(f"找到 {len(recent_emails)} 封来自 {talent_email} 的最近邮件")

            knowledge_base, created = gmail_integration.create_talent_knowledge_base(talent_email)
            kb_action = "创建" if created else "获取"
            logger.info(f"知识库{kb_action}成功: {knowledge_base.name}")

            result = gmail_integration.save_conversations_to_knowledge_base(recent_emails, knowledge_base)
            logger.info(f"保存达人对话结果: {result}")
        return True
    except Exception as e:
        logger.error(f"处理Gmail通知失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
def _process_new_message(self, gmail_integration, message_id):
    """Ingest one Gmail message into the matching talent conversation.

    Fetches the message, resolves the knowledge base and conversation for
    its sender, stores it as a ``ChatHistory`` row, appends it to the
    knowledge-base document, uploads any attachments and finally sends the
    WebSocket / system notifications (plus an optional recommended reply).

    Args:
        gmail_integration: Authenticated integration object of the mailbox
            owner; its ``user`` and ``gmail_service`` attributes and several
            of its helper methods are used.
        message_id: Gmail ID of the message to process.

    Returns:
        bool: True when the message was stored or had already been stored;
        False when its content cannot be extracted, no knowledge base is
        associated with the sender, or an unexpected error occurs.
        Notification failures are logged and do not change the result.
    """
    try:
        message = gmail_integration.gmail_service.users().messages().get(
            userId='me', id=message_id
        ).execute()

        email_data = gmail_integration._extract_email_content(message)
        if not email_data:
            logger.error(f"提取邮件内容失败: {message_id}")
            return False

        # The From header is either "Name <addr@example.com>" or a bare address.
        from_email = email_data.get('from', '')
        if '<' in from_email and '>' in from_email:
            sender_email = from_email.split('<')[1].split('>')[0]
        else:
            sender_email = from_email

        is_user_email = gmail_integration.user.email.lower() == sender_email.lower()
        talent_mapping = GmailTalentMapping.objects.filter(
            user=gmail_integration.user,
            talent_email=sender_email,
            is_active=True
        ).first()

        # Mail from the user's own address or from a mapped talent is stored
        # with role 'user'; everything else with role 'assistant'.
        role = 'user' if is_user_email or talent_mapping else 'assistant'
        logger.info(f"设置消息角色: {role}, 发件人: {sender_email}, 用户邮箱: {gmail_integration.user.email}, 是否映射达人: {talent_mapping}")

        # Knowledge base: first by the conventional name, then via the mapping.
        kb_name = f"Gmail-{sender_email.split('@')[0]}"
        knowledge_base = KnowledgeBase.objects.filter(name=kb_name).first()
        if not knowledge_base:
            if talent_mapping and talent_mapping.knowledge_base:
                knowledge_base = talent_mapping.knowledge_base
                logger.info(f"使用映射的知识库: {knowledge_base.name}")
            else:
                logger.info(f"收到新邮件,但没有找到关联的达人知识库: {sender_email}")
                return False

        # Conversation ID: 1) from the mapping, 2) from any existing chat on
        # this knowledge base, 3) freshly generated.
        conversation_id = None
        if talent_mapping and talent_mapping.conversation_id:
            conversation_id = talent_mapping.conversation_id
            logger.info(f"通过达人映射找到对话ID: {conversation_id}")

        if not conversation_id:
            existing_conversation = ChatHistory.objects.filter(
                knowledge_base=knowledge_base,
                user=gmail_integration.user,
                is_deleted=False
            ).values('conversation_id').distinct().first()
            if existing_conversation:
                conversation_id = existing_conversation['conversation_id']
                logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}")
                if not talent_mapping:
                    GmailTalentMapping.objects.update_or_create(
                        user=gmail_integration.user,
                        talent_email=sender_email,
                        defaults={
                            'knowledge_base': knowledge_base,
                            'conversation_id': conversation_id,
                            'is_active': True
                        }
                    )
                    logger.info(f"更新Gmail达人映射: {sender_email} -> 对话ID {conversation_id}")

        if not conversation_id:
            conversation_id = str(uuid.uuid4())
            logger.info(f"创建新的对话ID: {conversation_id}")
            if not talent_mapping:
                # update_or_create (not create): talent_mapping only looks at
                # active rows, so an inactive row for the same user/talent may
                # already exist and must be reused rather than duplicated.
                GmailTalentMapping.objects.update_or_create(
                    user=gmail_integration.user,
                    talent_email=sender_email,
                    defaults={
                        'knowledge_base': knowledge_base,
                        'conversation_id': conversation_id,
                        'is_active': True
                    }
                )
                logger.info(f"已创建新的Gmail达人映射: {sender_email} -> {knowledge_base.name}")

        # Idempotency: skip messages already stored in this conversation.
        if ChatHistory.objects.filter(
            conversation_id=conversation_id,
            metadata__gmail_message_id=message_id,
            is_deleted=False
        ).exists():
            logger.info(f"邮件已处理过,跳过: {message_id}")
            return True

        # Parse the mail date. The module-level ``timezone`` import is used on
        # purpose: importing it inside this function would make the name
        # local and leave it unbound in the fallback branch below.
        date_str = email_data.get('date', '')
        try:
            date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            aware_date = timezone.make_aware(date_obj) if not timezone.is_aware(date_obj) else date_obj
        except (ValueError, TypeError):
            logger.warning(f"无法解析邮件日期: {date_str},使用当前时间")
            aware_date = timezone.now()

        # The newest message of the conversation becomes the parent.
        parent_id = None
        try:
            latest_message = ChatHistory.objects.filter(
                conversation_id=conversation_id,
                is_deleted=False
            ).order_by('-created_at').first()
            if latest_message:
                parent_id = str(latest_message.id)
                logger.info(f"找到最新消息ID作为parent_id: {parent_id}, 创建时间: {latest_message.created_at}")
            else:
                logger.info(f"对话 {conversation_id} 没有现有消息不设置parent_id")
        except Exception as e:
            logger.error(f"查找父消息失败: {str(e)}")
            logger.error(traceback.format_exc())
            parent_id = None

        # Download attachments; only successful downloads are recorded.
        attachment_records = []
        for attachment in email_data.get('attachments', []):
            if 'attachmentId' not in attachment:
                continue
            filepath = gmail_integration.download_attachment(
                message_id,
                attachment['attachmentId'],
                attachment['filename']
            )
            if filepath:
                attachment_records.append({
                    'filepath': filepath,
                    'filename': attachment['filename'],
                    'message_id': message_id,
                    'date': date_str
                })

        metadata = {
            'gmail_message_id': message_id,
            'from': email_data.get('from', ''),
            'date': date_str,
            'subject': email_data.get('subject', ''),
            'dataset_id_list': [str(knowledge_base.id)],
            'dataset_names': [knowledge_base.name]
        }
        if attachment_records:
            metadata['message_attachments'] = attachment_records

        chat_message = ChatHistory.objects.create(
            user=gmail_integration.user,
            knowledge_base=knowledge_base,
            conversation_id=conversation_id,
            parent_id=parent_id,
            role=role,
            content=f"[{email_data['subject']}] {email_data['body']}",
            metadata=metadata,
            created_at=aware_date
        )

        gmail_integration._append_to_knowledge_base_document(
            knowledge_base,
            email_data['subject'],
            email_data['body'],
            gmail_integration.user.email
        )

        if attachment_records:
            gmail_integration._upload_message_attachments_to_knowledge_base(
                knowledge_base,
                attachment_records
            )

        # Notifications are best effort: a failure here must not undo or fail
        # the message processing above.
        try:
            from channels.layers import get_channel_layer
            from asgiref.sync import async_to_sync

            channel_layer = get_channel_layer()
            if channel_layer:
                group_name = f"notification_user_{gmail_integration.user.id}"
                # Subject plus the first 100 characters of the body.
                content_preview = f"[{email_data['subject']}] {email_data['body'][:100]}{'...' if len(email_data['body']) > 100 else ''}"

                notification_data = {
                    "type": "notification",
                    "data": {
                        "message_type": "new_gmail",
                        "conversation_id": conversation_id,
                        "message": {
                            "id": str(chat_message.id),
                            "role": role,
                            "content": content_preview,
                            "sender": sender_email,
                            "subject": email_data['subject'],
                            "has_attachments": len(attachment_records) > 0
                        }
                    }
                }
                async_to_sync(channel_layer.group_send)(group_name, notification_data)
                logger.info(f"已发送WebSocket通知: 用户 {gmail_integration.user.id} 收到新Gmail消息")

                try:
                    from .models import Notification
                    Notification.objects.create(
                        sender=gmail_integration.user,
                        receiver=gmail_integration.user,
                        title="新Gmail消息",
                        content=f"您收到了来自 {sender_email} 的新邮件: {email_data['subject']}",
                        type="system_notice",
                        related_resource=conversation_id
                    )
                    logger.info(f"已创建系统通知记录: 用户 {gmail_integration.user.id} 的新Gmail消息")
                except Exception as notification_error:
                    logger.error(f"创建系统通知记录失败: {str(notification_error)}")

                # Recommended reply: only for mail from a mapped talent and
                # only when the user switched the feature on.
                if role == 'user' and talent_mapping:
                    try:
                        # Imported here as well so this step does not depend
                        # on the notification-record step above succeeding.
                        from .models import Notification

                        user_profile, _ = UserProfile.objects.get_or_create(user=gmail_integration.user)
                        if user_profile.auto_recommend_reply:
                            logger.info(f"用户 {gmail_integration.user.id} 已启用自动推荐回复功能,生成推荐回复")

                            conversation_messages = ChatHistory.objects.filter(
                                conversation_id=conversation_id,
                                is_deleted=False
                            ).order_by('created_at')
                            conversation_history = [
                                {
                                    'role': 'user' if history_entry.role == 'user' else 'assistant',
                                    'content': history_entry.content
                                }
                                for history_entry in conversation_messages
                            ]

                            # Truncation to the latest messages and the
                            # trailing user prompt are done by the helper.
                            recommended_reply = self._get_recommended_reply_from_deepseek(conversation_history)
                            if recommended_reply:
                                recommend_notification_data = {
                                    "type": "notification",
                                    "data": {
                                        "message_type": "recommended_reply",
                                        "conversation_id": conversation_id,
                                        "message": {
                                            "id": str(chat_message.id),
                                            "role": role,
                                            "content": content_preview,
                                            "sender": sender_email,
                                            "subject": email_data['subject'],
                                            "recommended_reply": recommended_reply
                                        }
                                    }
                                }
                                async_to_sync(channel_layer.group_send)(group_name, recommend_notification_data)
                                logger.info(f"已发送推荐回复通知: 用户 {gmail_integration.user.id}")

                                Notification.objects.create(
                                    sender=gmail_integration.user,
                                    receiver=gmail_integration.user,
                                    title="新推荐回复",
                                    content=f"系统为来自 {sender_email} 的邮件生成了推荐回复",
                                    type="system_notice",
                                    related_resource=conversation_id
                                )
                                logger.info(f"已创建推荐回复系统通知: 用户 {gmail_integration.user.id}")
                            else:
                                logger.warning(f"生成推荐回复失败: 用户 {gmail_integration.user.id}, 对话 {conversation_id}")
                    except Exception as recommend_error:
                        logger.error(f"处理推荐回复失败: {str(recommend_error)}")
                        logger.error(traceback.format_exc())
        except Exception as ws_error:
            logger.error(f"发送WebSocket通知失败: {str(ws_error)}")
            logger.error(traceback.format_exc())

        logger.info(f"成功处理新邮件: {message_id}{sender_email}")
        return True
    except Exception as e:
        logger.error(f"处理新邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
def _get_recommended_reply_from_deepseek(self, conversation_history):
    """Ask the DeepSeek V3 chat-completions API for a recommended reply.

    Args:
        conversation_history: List of ``{'role': ..., 'content': ...}``
            dicts, oldest first. Only the last five entries are sent; if the
            last one is not a user message, a generic user prompt is
            appended.

    Returns:
        str | None: The reply text, or None when the HTTP status is not 200,
        the response has no choices, the reply is blank, or any error
        (including a timeout) occurs. Errors are logged, never raised.
    """
    try:
        # NOTE(security): a credential hard-coded in source ends up in version
        # control. It is kept only as a backward-compatible fallback; set
        # settings.DEEPSEEK_API_KEY, then rotate and remove this literal.
        api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf"
        configured_key = getattr(settings, 'DEEPSEEK_API_KEY', None)
        if configured_key:
            api_key = configured_key

        url = "https://api.siliconflow.cn/v1/chat/completions"

        system_message = {
            "role": "system",
            "content": "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。即使用户消息很短或不明确也必须提供有实质内容的回复。禁止返回空白内容。回复应该有至少100个字符。"
        }
        messages = [system_message]

        # Keep only the five most recent messages to limit the token count.
        recent_messages = conversation_history[-5:]
        messages.extend(recent_messages)

        # The conversation sent to the model must end with a user turn.
        if not recent_messages or recent_messages[-1]['role'] != 'user':
            messages.append({
                "role": "user",
                "content": "请针对我之前的消息提供详细的回复建议。"
            })

        payload = {
            "model": "deepseek-ai/DeepSeek-V3",
            "messages": messages,
            "stream": False,
            "max_tokens": 1024,
            "temperature": 0.7,
            "top_p": 0.9,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "presence_penalty": 0.2,
            "n": 1,
            "stop": [],
            "response_format": {
                "type": "text"
            }
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        logger.info("开始调用DeepSeek API生成推荐回复")
        # requests has no default timeout; without one a stalled connection
        # would block the caller indefinitely. (connect, read) in seconds;
        # the read limit can be overridden via settings.DEEPSEEK_API_TIMEOUT.
        read_timeout = getattr(settings, 'DEEPSEEK_API_TIMEOUT', 60)
        response = requests.post(url, json=payload, headers=headers, timeout=(10, read_timeout))

        if response.status_code != 200:
            logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
            return None

        result = response.json()
        logger.debug(f"DeepSeek API返回: {result}")

        if 'choices' in result and len(result['choices']) > 0:
            reply = result['choices'][0]['message']['content']
            # A blank reply is treated as a failure.
            if not reply or reply.strip() == '':
                logger.warning("DeepSeek API返回的回复内容为空")
                return None
            return reply

        logger.warning(f"DeepSeek API返回格式异常: {result}")
        return None
    except Exception as e:
        logger.error(f"调用DeepSeek API失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
def get_attachment_by_conversation(self, conversation_id):
    """List every stored Gmail attachment of one conversation.

    Args:
        conversation_id: Conversation whose non-deleted chat messages
            (owned by ``self.user``) are searched for attachments.

    Returns:
        list: One dict per attachment with the keys ``id``, ``filename``,
        ``filepath``, ``mimetype``, ``filesize``, ``message_id``,
        ``gmail_message_id``, ``role``, ``created_at`` and
        ``content_preview``. Empty when the conversation has no messages or
        when an error occurs (the error is logged).
    """

    def describe(item):
        # Flatten one attachment row plus its chat message into a plain dict.
        owner_message = item.chat_message
        body_text = owner_message.content
        if len(body_text) > 100:
            preview = body_text[:100] + '...'
        else:
            preview = body_text
        return {
            'id': str(item.id),
            'filename': item.filename,
            'filepath': item.filepath,
            'mimetype': item.mimetype,
            'filesize': item.filesize,
            'message_id': str(owner_message.id),
            'gmail_message_id': item.gmail_message_id,
            'role': owner_message.role,
            'created_at': item.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            'content_preview': preview
        }

    try:
        conversation_records = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            user=self.user,
            is_deleted=False
        )
        if not conversation_records:
            logger.warning(f"找不到对话记录: {conversation_id}")
            return []

        # Attachments are linked to chat messages by primary key.
        message_pks = [row.id for row in conversation_records]
        stored_attachments = GmailAttachment.objects.filter(
            chat_message_id__in=message_pks
        ).select_related('chat_message')

        return [describe(item) for item in stored_attachments]
    except Exception as e:
        logger.error(f"获取对话附件失败: {str(e)}")
        return []
def handle_auth_code(self, auth_code):
    """Exchange a Google OAuth2 authorisation code for Gmail credentials.

    The client secret is written to a private, uniquely named temporary
    file (oauth2client's ``flow_from_clientsecrets`` needs a path), the code
    is exchanged for credentials, and the credentials are stored in the
    token file, on this instance, in ``GmailCredential`` and in
    ``GmailServiceManager``. The temporary file is always removed.

    Args:
        auth_code (str): Authorisation code obtained from Google's consent
            page.

    Returns:
        bool: True when authorisation succeeded; False when no client secret
        is configured, it is not valid JSON, the code exchange fails, or any
        other error occurs. Errors are logged, never raised.
    """
    import tempfile

    client_secret_path = None
    try:
        logger.info("开始处理Gmail授权码...")

        if not self.client_secret:
            logger.error("未提供client_secret_json无法处理授权码")
            return False

        # Parse the secret BEFORE touching the file system, so an invalid
        # secret cannot leave a stray file behind.
        if isinstance(self.client_secret, str):
            logger.info("client_secret是字符串解析为JSON")
            try:
                secret_data = json.loads(self.client_secret)
            except json.JSONDecodeError as e:
                logger.error(f"client_secret不是有效的JSON: {str(e)}")
                return False
        else:
            logger.info("client_secret是字典直接写入文件")
            secret_data = self.client_secret

        # mkstemp gives a unique, owner-only file, so concurrent
        # authorisations cannot overwrite or read each other's secret.
        fd, client_secret_path = tempfile.mkstemp(prefix='client_secret_', suffix='.json')
        with os.fdopen(fd, 'w') as f:
            json.dump(secret_data, f)
        logger.info(f"已将client_secret写入临时文件: {client_secret_path}")

        # Make sure the directory of the token file exists.
        token_dir = os.path.dirname(self.token_storage_path)
        if token_dir and not os.path.exists(token_dir):
            logger.info(f"创建token目录: {token_dir}")
            os.makedirs(token_dir, exist_ok=True)

        logger.info(f"设置token存储: {self.token_storage_path}")
        store = file.Storage(self.token_storage_path)

        # Use the first redirect URI from the 'web' or 'installed' section;
        # fall back to the out-of-band URI when none is configured.
        redirect_uri = None
        if isinstance(secret_data, dict):
            for key in ['web', 'installed']:
                section = secret_data.get(key)
                if isinstance(section, dict) and section.get('redirect_uris'):
                    redirect_uri = section['redirect_uris'][0]
                    break
        if not redirect_uri:
            redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'
        logger.info(f"使用重定向URI: {redirect_uri}")

        logger.info("从client_secret创建授权流程")
        flow = client.flow_from_clientsecrets(
            client_secret_path,
            self.SCOPES,
            redirect_uri=redirect_uri
        )

        logger.info("使用授权码交换访问令牌")
        credentials = flow.step2_exchange(auth_code)
        logger.info("成功获取到访问令牌")

        logger.info(f"保存凭证到文件: {self.token_storage_path}")
        store.put(credentials)
        self.credentials = credentials

        logger.info("初始化Gmail服务")
        self.gmail_service = discovery.build('gmail', 'v1', http=credentials.authorize(Http()))
        logger.info("Gmail服务初始化成功")

        logger.info("保存凭证到数据库")
        # The credentials object is stored pickled.
        credentials_data = pickle.dumps(credentials)
        gmail_credential, created = GmailCredential.objects.update_or_create(
            user=self.user,
            defaults={
                'credentials': credentials_data,
                'token_path': self.token_storage_path,
                'updated_at': timezone.now(),
                'is_active': True
            }
        )
        action = "创建" if created else "更新"
        logger.info(f"{action}用户 {self.user.username} 的Gmail凭证记录")

        # Refresh the cached service so later calls use the new credentials.
        if self.user and self.gmail_service and self.credentials:
            GmailServiceManager.update_instance(self.user, self.credentials, self.gmail_service)

        return True
    except client.FlowExchangeError as e:
        logger.error(f"授权码交换失败: {str(e)}")
        return False
    except Exception as e:
        logger.error(f"处理授权码时发生错误: {str(e)}")
        logger.error(traceback.format_exc())
        return False
    finally:
        # The secret must never outlive this call, whatever happened above.
        if client_secret_path and os.path.exists(client_secret_path):
            logger.info(f"删除临时文件: {client_secret_path}")
            try:
                os.unlink(client_secret_path)
            except OSError as cleanup_error:
                logger.error(f"处理授权码过程中发生异常: {str(cleanup_error)}")
def _upload_attachments_to_knowledge_base(self, knowledge_base, conversations):
    """Upload the attachments of many mails to a knowledge base.

    For every attachment that has both ``attachmentId`` and ``filename``
    the local copy recorded in ``GmailAttachment`` is reused when its file
    still exists, otherwise the attachment is downloaded. Files are then
    sent in batches of at most ten to the split API and each resulting
    document is uploaded and recorded as ``KnowledgeBaseDocument``. Finally
    the knowledge base's ``document_count`` is refreshed.

    Args:
        knowledge_base: Target knowledge base; ``external_id`` is used for
            the upload call.
        conversations: Iterable of mail dicts; the keys ``attachments``,
            ``id``, ``from``, ``subject`` and ``date`` are read.

    Returns:
        None. Errors are logged and swallowed.
    """
    try:
        # Step 1: collect a local file path for every attachment.
        attachments_to_upload = []
        for message in conversations:
            if not message.get('attachments'):
                continue
            message_id = message.get('id', '')
            sender = message.get('from', '未知发件人')
            subject = message.get('subject', '无主题')
            date_str = message.get('date', '未知时间')

            for attachment in message['attachments']:
                if 'attachmentId' not in attachment or 'filename' not in attachment:
                    continue

                existing_attachment = GmailAttachment.objects.filter(
                    gmail_message_id=message_id,
                    filename=attachment['filename']
                ).first()
                if existing_attachment and os.path.exists(existing_attachment.filepath):
                    filepath = existing_attachment.filepath
                else:
                    filepath = self.download_attachment(
                        message_id=message_id,
                        attachment_id=attachment['attachmentId'],
                        filename=attachment['filename']
                    )

                if filepath:
                    attachments_to_upload.append({
                        'filepath': filepath,
                        'filename': attachment['filename'],
                        'message_id': message_id,
                        'sender': sender,
                        'subject': subject,
                        'date': date_str
                    })

        if not attachments_to_upload:
            return

        logger.info(f"开始上传 {len(attachments_to_upload)} 个附件到知识库")
        # Imported here rather than at module level; presumably to avoid a
        # circular import with the views module.
        from .views import KnowledgeBaseViewSet
        import django.core.files.uploadedfile as uploadedfile

        kb_viewset = KnowledgeBaseViewSet()

        # Step 2: split and upload in batches of at most ten files.
        batch_size = 10
        for start in range(0, len(attachments_to_upload), batch_size):
            files = []
            for att in attachments_to_upload[start:start + batch_size]:
                filepath = att['filepath']
                if not os.path.exists(filepath):
                    logger.warning(f"附件文件不存在: {filepath}")
                    continue
                with open(filepath, 'rb') as f:
                    file_content = f.read()
                files.append(uploadedfile.SimpleUploadedFile(
                    name=att['filename'],
                    content=file_content,
                    content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream'
                ))

            if not files:
                continue

            split_response = kb_viewset._call_split_api_multiple(files)
            if not split_response or split_response.get('code') != 200:
                logger.error(f"附件批量分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}")
                continue

            for doc in split_response.get('data', []):
                doc_name = doc.get('name', 'Gmail附件')
                upload_doc_data = {
                    "name": doc_name,
                    "paragraphs": [
                        {
                            "content": paragraph.get('content', ''),
                            "title": paragraph.get('title', ''),
                            "is_active": True,
                            "problem_list": []
                        }
                        for paragraph in doc.get('content', [])
                    ]
                }

                upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data)
                if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                    document_id = upload_response['data']['id']
                    # The module-level KnowledgeBaseDocument import is used on
                    # purpose: importing it inside this branch made the name
                    # function-local, so the count query below failed with
                    # UnboundLocalError whenever no document was uploaded.
                    KnowledgeBaseDocument.objects.create(
                        knowledge_base=knowledge_base,
                        document_id=document_id,
                        document_name=doc_name,
                        external_id=document_id
                    )
                    logger.info(f"Gmail附件文档上传成功ID: {document_id}, 文件名: {doc_name}")

        # Step 3: refresh the cached document counter.
        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active'
        ).count()
        knowledge_base.save()

        logger.info(f"完成附件上传,共 {len(attachments_to_upload)} 个文件")
    except Exception as e:
        logger.error(f"上传附件到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
def _upload_message_attachments_to_knowledge_base(self, knowledge_base, attachment_records):
    """Upload the already-downloaded attachments of one mail to a knowledge base.

    The files are sent to the split API in a single call; each resulting
    document is uploaded and recorded as ``KnowledgeBaseDocument``, then the
    knowledge base's ``document_count`` is refreshed.

    Args:
        knowledge_base: Target knowledge base; ``external_id`` is used for
            the upload call.
        attachment_records: List of dicts with at least ``filepath`` and
            ``filename``. Records whose file is missing are skipped.

    Returns:
        None. Errors are logged and swallowed.
    """
    try:
        if not attachment_records:
            return

        logger.info(f"开始上传 {len(attachment_records)} 个附件到知识库")
        # Imported here rather than at module level; presumably to avoid a
        # circular import with the views module.
        from .views import KnowledgeBaseViewSet
        import django.core.files.uploadedfile as uploadedfile

        kb_viewset = KnowledgeBaseViewSet()

        files = []
        for att in attachment_records:
            filepath = att.get('filepath')
            # A missing path is treated like a missing file; passing None to
            # os.path.exists() would raise and abort the whole upload.
            if not filepath or not os.path.exists(filepath):
                logger.warning(f"附件文件不存在: {filepath}")
                continue
            with open(filepath, 'rb') as f:
                file_content = f.read()
            files.append(uploadedfile.SimpleUploadedFile(
                name=att.get('filename'),
                content=file_content,
                content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream'
            ))

        if not files:
            return

        split_response = kb_viewset._call_split_api_multiple(files)
        if not split_response or split_response.get('code') != 200:
            logger.error(f"附件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}")
            return

        for doc in split_response.get('data', []):
            doc_name = doc.get('name', 'Gmail附件')
            upload_doc_data = {
                "name": doc_name,
                "paragraphs": [
                    {
                        "content": paragraph.get('content', ''),
                        "title": paragraph.get('title', ''),
                        "is_active": True,
                        "problem_list": []
                    }
                    for paragraph in doc.get('content', [])
                ]
            }

            upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data)
            if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                document_id = upload_response['data']['id']
                # The module-level KnowledgeBaseDocument import is used on
                # purpose: importing it inside this branch made the name
                # function-local, so the count query below failed with
                # UnboundLocalError whenever no document was uploaded.
                KnowledgeBaseDocument.objects.create(
                    knowledge_base=knowledge_base,
                    document_id=document_id,
                    document_name=doc_name,
                    external_id=document_id
                )
                logger.info(f"Gmail附件文档上传成功ID: {document_id}, 文件名: {doc_name}")

        # Refresh the cached document counter.
        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active'
        ).count()
        knowledge_base.save()

        logger.info(f"完成附件上传,共 {len(files)} 个文件")
    except Exception as e:
        logger.error(f"上传附件到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
def get_recent_emails(self, from_email=None, max_results=10):
    """
    Fetch the most recent mails without relying on a history ID.

    Args:
        from_email (str, optional): Restrict the search to this sender.
        max_results (int, optional): Maximum number of mails to list
            (default 10).

    Returns:
        list: Extracted mail dicts, one per message whose content could be
        extracted. Empty when nothing matches or the listing fails; a
        failure on a single message is logged and that message is skipped.
    """
    try:
        # Without a sender filter no search expression is sent at all.
        if from_email:
            query = f"from:{from_email}"
        else:
            query = None
        logger.info(f"查询最近邮件: query={query}, max_results={max_results}")

        listing = self.gmail_service.users().messages().list(
            userId='me',
            q=query,
            maxResults=max_results
        ).execute()

        if 'messages' not in listing:
            logger.info("未找到匹配邮件")
            return []

        # The listing only carries IDs; fetch and extract each message.
        messages = []
        for entry in listing['messages']:
            try:
                raw_message = self.gmail_service.users().messages().get(
                    userId='me', id=entry['id']
                ).execute()
                extracted = self._extract_email_content(raw_message)
                if extracted:
                    messages.append(extracted)
            except Exception as msg_error:
                logger.error(f"处理邮件 {entry['id']} 失败: {str(msg_error)}")

        logger.info(f"获取到 {len(messages)} 封邮件")
        return messages
    except Exception as e:
        logger.error(f"获取最近邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []