daren_project/user_management/gmail_integration.py
2025-04-29 10:22:57 +08:00

3650 lines
169 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import uuid
import base64
import pickle
import logging
import traceback # 确保导入traceback模块
from pathlib import Path
import dateutil.parser as parser
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from django.utils import timezone
from django.conf import settings # 添加Django设置导入
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import warnings
import mimetypes
import requests
import django.db.utils
import time
from django.db.models import Q
import socket
# 忽略oauth2client相关的所有警告
warnings.filterwarnings('ignore', message='file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth')
warnings.filterwarnings('ignore', category=DeprecationWarning, module='oauth2client')
warnings.filterwarnings('ignore', message='.*google-auth.*')
warnings.filterwarnings('ignore', message='.*oauth2client.*')
from apiclient import discovery
from httplib2 import Http
from oauth2client import file, client, tools
from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment, UserProfile
# Gmail API调用超时设置
GMAIL_REQUEST_TIMEOUT = getattr(settings, 'GMAIL_REQUEST_TIMEOUT', 30)
logger = logging.getLogger(__name__)
# Gmail服务单例管理器
class GmailServiceManager:
_instances = {} # 以用户ID和Gmail凭证ID为键存储Gmail服务实例
@classmethod
def get_instance(cls, user, gmail_credential_id=None):
"""
获取用户的Gmail服务实例如果不存在则创建
参数:
user: 用户对象
gmail_credential_id: 指定Gmail凭证ID如果为None则使用默认凭证
"""
user_id = str(user.id)
# 生成实例键组合用户ID和Gmail凭证ID
if gmail_credential_id:
instance_key = f"{user_id}:{gmail_credential_id}"
else:
instance_key = user_id
if instance_key not in cls._instances:
try:
# 从数据库获取认证信息
if gmail_credential_id:
# 获取指定ID的凭证
credential = GmailCredential.objects.filter(
id=gmail_credential_id,
user=user,
is_active=True
).first()
else:
# 获取默认凭证优先获取is_default=True的凭证
credential = GmailCredential.objects.filter(
user=user,
is_active=True,
is_default=True
).first()
# 如果没有找到默认凭证,则获取最近更新的一个凭证
if not credential:
credential = GmailCredential.objects.filter(
user=user,
is_active=True
).order_by('-updated_at').first()
if credential and credential.credentials:
# 反序列化凭证
creds = pickle.loads(credential.credentials)
# 检查凭证是否有效
if not creds.invalid:
# 初始化服务
gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http()))
# 存储实例
cls._instances[instance_key] = {
'service': gmail_service,
'credentials': creds,
'timestamp': timezone.now(),
'user': user,
'gmail_credential': credential
}
logger.info(f"创建用户 {user.username} 的Gmail服务单例Gmail账号: {credential.gmail_email or '未知'},名称: {credential.name}")
return cls._instances[instance_key]
except Exception as e:
logger.error(f"创建Gmail服务单例失败: {e}")
else:
# 检查实例是否过期(超过30分钟)
instance = cls._instances[instance_key]
time_diff = timezone.now() - instance['timestamp']
if time_diff.total_seconds() > 1800: # 30分钟过期
del cls._instances[instance_key]
return cls.get_instance(user, gmail_credential_id) # 递归调用,重新创建
# 更新时间戳
cls._instances[instance_key]['timestamp'] = timezone.now()
credential = instance.get('gmail_credential')
gmail_email = credential.gmail_email if credential else '未知'
credential_name = credential.name if credential else '默认'
logger.info(f"复用用户 {user.username} 的Gmail服务单例Gmail账号: {gmail_email},名称: {credential_name}")
return cls._instances[instance_key]
return None
@classmethod
def get_credential_instance(cls, credential):
"""通过GmailCredential对象获取服务实例"""
if not credential or not credential.user:
return None
return cls.get_instance(credential.user, str(credential.id))
@classmethod
def update_instance(cls, user, credentials, service, gmail_credential=None):
"""更新用户的Gmail服务实例"""
user_id = str(user.id)
# 确定实例键
if gmail_credential:
instance_key = f"{user_id}:{gmail_credential.id}"
else:
instance_key = user_id
cls._instances[instance_key] = {
'service': service,
'credentials': credentials,
'timestamp': timezone.now(),
'user': user,
'gmail_credential': gmail_credential
}
@classmethod
def clear_instance(cls, user, gmail_credential_id=None):
"""清除用户的Gmail服务实例"""
user_id = str(user.id)
# 清除特定凭证的实例
if gmail_credential_id:
instance_key = f"{user_id}:{gmail_credential_id}"
if instance_key in cls._instances:
del cls._instances[instance_key]
else:
# 清除该用户的所有实例
keys_to_delete = []
for key in cls._instances.keys():
if key == user_id or key.startswith(f"{user_id}:"):
keys_to_delete.append(key)
for key in keys_to_delete:
del cls._instances[key]
@classmethod
def get_all_user_instances(cls, user):
"""获取用户的所有Gmail服务实例"""
user_id = str(user.id)
user_instances = {}
# 收集该用户所有的实例
for key, instance in cls._instances.items():
if key == user_id or key.startswith(f"{user_id}:"):
credential = instance.get('gmail_credential')
if credential:
user_instances[str(credential.id)] = instance
return user_instances
@classmethod
def clear_all_instances_by_email(cls, gmail_email):
"""
清除与特定Gmail邮箱关联的所有服务实例
参数:
gmail_email: Gmail邮箱地址
返回:
int: 清除的实例数量
"""
try:
# 导入GmailCredential模型
from .models import GmailCredential
# 查找与该邮箱关联的所有凭证
credentials = GmailCredential.objects.filter(
gmail_email=gmail_email,
is_active=True
).select_related('user')
# 记录清除的实例数量
cleared_count = 0
# 清除每个凭证关联的实例
for credential in credentials:
user = credential.user
# 清除特定用户和凭证的实例
if cls.clear_instance(user, str(credential.id)):
cleared_count += 1
# 同时清除用户的默认实例
user_id = str(user.id)
if user_id in cls._instances:
del cls._instances[user_id]
cleared_count += 1
return cleared_count
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"清除Gmail邮箱 {gmail_email} 的服务实例时出错: {str(e)}")
return 0
class GmailIntegration:
"""Gmail集成类"""
# Gmail API 权限范围
SCOPES = ['https://mail.google.com/']
def __init__(self, user, email=None, client_secret_json=None, use_proxy=True, proxy_url='http://127.0.0.1:7890', gmail_credential_id=None):
self.user = user
self.user_email = user.email if user else None
self.email = email # 目标邮箱
self.client_secret = client_secret_json
self.credentials = None
self.gmail_service = None
self.use_proxy = use_proxy
self.proxy_url = proxy_url
self.gmail_credential_id = gmail_credential_id
self.gmail_credential = None
# 设置代理
if self.use_proxy:
logger.info(f"设置Gmail API代理: {proxy_url}")
os.environ['HTTP_PROXY'] = proxy_url
os.environ['HTTPS_PROXY'] = proxy_url
# 设置Token文件存储路径
if user:
# 使用用户ID创建唯一的token存储路径
token_file = f"gmail_token_{user.id}.json"
if gmail_credential_id:
token_file = f"gmail_token_{user.id}_{gmail_credential_id}.json"
self.token_storage_path = os.path.join("gmail_tokens", token_file)
# 确保目录存在
os.makedirs("gmail_tokens", exist_ok=True)
logger.info(f"设置Token存储路径: {self.token_storage_path}")
# 尝试从数据库加载凭证
try:
if gmail_credential_id:
# 加载指定ID的凭证
gmail_cred = GmailCredential.objects.filter(
id=gmail_credential_id,
user=user,
is_active=True
).first()
else:
# 加载默认凭证
gmail_cred = GmailCredential.objects.filter(
user=user,
is_active=True,
is_default=True
).first()
# 如果没有默认凭证,加载最新的一个
if not gmail_cred:
gmail_cred = GmailCredential.objects.filter(
user=user,
is_active=True
).order_by('-updated_at').first()
if gmail_cred and gmail_cred.credentials:
self.gmail_credential = gmail_cred
logger.info(f"从数据库加载用户 {user.username} 的Gmail凭证 (ID: {gmail_cred.id}, Email: {gmail_cred.gmail_email or '未知'}, 名称: {gmail_cred.name})")
# 使用新方法加载凭证
creds = self._load_credentials_from_storage(gmail_cred.credentials)
if creds:
self.credentials = creds
# 初始化Gmail服务
self.gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http()))
logger.info("从数据库凭证初始化Gmail服务成功")
else:
logger.error("无法从数据库中加载有效凭证")
# 标记需要重新认证
if hasattr(gmail_cred, 'needs_reauth'):
gmail_cred.needs_reauth = True
gmail_cred.save()
logger.info(f"已将Gmail账号 {gmail_cred.gmail_email} 标记为需要重新认证")
except Exception as e:
logger.error(f"从数据库加载凭证失败: {str(e)}")
# 继续使用文件方式
else:
self.token_storage_path = "gmail_token.json"
logger.warning("未提供用户将使用默认Token存储路径")
# 存储对象
self.storage = file.Storage(self.token_storage_path)
def authenticate(self):
"""获取Gmail API凭证并认证"""
try:
# 优先尝试使用单例服务
if self.user:
instance = GmailServiceManager.get_instance(self.user, self.gmail_credential_id)
if instance:
self.gmail_service = instance['service']
self.credentials = instance['credentials']
self.gmail_credential = instance.get('gmail_credential')
credential_id = str(self.gmail_credential.id) if self.gmail_credential else "未知"
gmail_email = self.gmail_credential.gmail_email if self.gmail_credential else "未知"
credential_name = self.gmail_credential.name if self.gmail_credential else "默认"
logger.info(f"使用现有的Gmail服务单例 (ID: {credential_id}, Email: {gmail_email}, 名称: {credential_name})")
return True
# 以下是原有的认证逻辑...
# 写入client_secret.json
if self.client_secret:
client_secret_path = 'client_secret.json'
with open(client_secret_path, 'w') as f:
if isinstance(self.client_secret, str):
try:
# 确保是有效的JSON
import json # 在本地作用域引入
json_data = json.loads(self.client_secret)
# 强制设置redirect_uris为非浏览器模式避免localhost连接拒绝问题
for key in ['web', 'installed']:
if key in json_data and 'redirect_uris' in json_data[key]:
json_data[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob']
logger.info("已强制设置redirect_uri为非浏览器模式")
json.dump(json_data, f)
except json.JSONDecodeError as e:
logger.error(f"client_secret不是有效的JSON: {str(e)}")
return False
else:
# 如果是字典,也进行相同处理
import json # 在本地作用域引入
client_secret_dict = dict(self.client_secret)
for key in ['web', 'installed']:
if key in client_secret_dict and 'redirect_uris' in client_secret_dict[key]:
client_secret_dict[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob']
logger.info("已强制设置redirect_uri为非浏览器模式")
json.dump(client_secret_dict, f)
logger.info(f"已将client_secret写入临时文件: {client_secret_path}")
# 使用与quickstart.py相同的流程
logger.info(f"创建或读取token存储: {self.token_storage_path}")
# 确保token目录存在
token_dir = os.path.dirname(self.token_storage_path)
if token_dir and not os.path.exists(token_dir):
logger.info(f"创建token目录: {token_dir}")
os.makedirs(token_dir)
store = file.Storage(self.token_storage_path)
creds = store.get()
if not creds or creds.invalid:
logger.info("没有有效的凭证,需要重新授权")
if not self.client_secret:
logger.error("没有提供client_secret_json且找不到有效凭证")
return False
# 强制使用非浏览器认证模式避免localhost连接问题
logger.info("使用非浏览器认证模式")
redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'
flow = client.flow_from_clientsecrets('client_secret.json', self.SCOPES)
flow.redirect_uri = redirect_uri
# 获取授权URL并抛出异常
auth_url = flow.step1_get_authorize_url()
logger.info(f"获取授权URL: {auth_url[:50]}...")
raise Exception(f"Please visit this URL to authorize: {auth_url}")
# 如果有有效凭证,初始化服务
self.credentials = creds
logger.info("使用现有凭证初始化Gmail服务")
self.gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http()))
logger.info("Gmail服务初始化成功")
# 获取Gmail账号信息
gmail_email = None
try:
# 调用Gmail API获取用户资料
profile = self.gmail_service.users().getProfile(userId='me').execute()
gmail_email = profile.get('emailAddress')
logger.info(f"获取到Gmail账号: {gmail_email}")
except Exception as profile_error:
logger.error(f"获取Gmail账号失败: {str(profile_error)}")
# 即使获取Gmail账号失败也要尝试获取消息以提取邮箱
if not gmail_email:
try:
# 尝试获取一条消息来提取邮箱
messages = self.gmail_service.users().messages().list(userId='me', maxResults=1).execute()
if 'messages' in messages and len(messages['messages']) > 0:
msg_id = messages['messages'][0]['id']
msg = self.gmail_service.users().messages().get(userId='me', id=msg_id).execute()
# 从消息中查找与当前用户匹配的邮箱
if 'payload' in msg and 'headers' in msg['payload']:
for header in msg['payload']['headers']:
if header['name'] in ['From', 'To', 'Cc', 'Bcc']:
if '<' in header['value'] and '>' in header['value']:
email = header['value'].split('<')[1].split('>')[0]
# 如果找到一个邮箱就使用它
if not gmail_email:
gmail_email = email
logger.info(f"从消息中提取到Gmail账号: {gmail_email}")
except Exception as msg_error:
logger.error(f"尝试从消息中提取Gmail账号失败: {str(msg_error)}")
# 保存凭证到数据库
if self.user:
from django.utils import timezone
logger.info("保存凭证到数据库")
# 将凭证对象序列化 - 改为JSON序列化
try:
# 首先尝试使用JSON序列化
credentials_data = creds.to_json()
except Exception as json_error:
logger.error(f"JSON序列化凭证失败: {str(json_error)}")
# 备选使用pickle序列化
credentials_data = pickle.dumps(creds)
logger.info("使用pickle序列化凭证")
# 如果提供了具体的gmail_credential_id更新对应的记录
if self.gmail_credential_id:
# 更新指定ID的凭证
try:
gmail_credential = GmailCredential.objects.get(
id=self.gmail_credential_id,
user=self.user
)
# 更新凭证信息
gmail_credential.credentials = credentials_data
gmail_credential.gmail_email = gmail_email
gmail_credential.updated_at = timezone.now()
gmail_credential.is_active = True
gmail_credential.save()
self.gmail_credential = gmail_credential
logger.info(f"已更新ID为 {self.gmail_credential_id} 的Gmail凭证Gmail账号: {gmail_email}")
except GmailCredential.DoesNotExist:
logger.error(f"未找到ID为 {self.gmail_credential_id} 的Gmail凭证将创建新凭证")
# 如果指定的凭证不存在,创建新凭证
self.gmail_credential_id = None
# 如果没有具体的gmail_credential_id或者指定的不存在创建或更新默认凭证
if not self.gmail_credential_id:
# 检查是否已存在相同gmail_email的凭证
existing_credential = None
if gmail_email:
existing_credential = GmailCredential.objects.filter(
user=self.user,
gmail_email=gmail_email,
is_active=True
).first()
if existing_credential:
# 更新现有凭证
existing_credential.credentials = credentials_data
existing_credential.token_path = self.token_storage_path
existing_credential.updated_at = timezone.now()
existing_credential.save()
self.gmail_credential = existing_credential
logger.info(f"已更新Gmail账号 {gmail_email} 的现有凭证")
else:
# 获取Gmail账户数量
gmail_count = GmailCredential.objects.filter(user=self.user).count()
# 创建新凭证
name = f"Gmail账号 {gmail_count + 1}"
if gmail_email:
name = gmail_email
else:
# 确保gmail_email有默认值避免null错误
gmail_email = "未知邮箱"
# 将凭证转换为JSON字符串
if isinstance(credentials_data, dict):
# 确保json模块在本地作用域可访问
import json
credentials_data = json.dumps(credentials_data)
gmail_credential = GmailCredential.objects.create(
user=self.user,
credentials=credentials_data,
token_path=self.token_storage_path,
gmail_email=gmail_email,
name=name,
is_default=(gmail_count == 0), # 第一个账号设为默认
updated_at=timezone.now(),
is_active=True
)
self.gmail_credential = gmail_credential
logger.info(f"已创建新的Gmail凭证Gmail账号: {gmail_email}, 名称: {name}")
# 认证成功后更新单例
if self.user and self.gmail_service and self.credentials:
GmailServiceManager.update_instance(
self.user,
self.credentials,
self.gmail_service,
self.gmail_credential
)
return True
except Exception as e:
# 保留授权URL异常视图层会处理
if "Please visit this URL" in str(e):
raise e
logger.error(f"Gmail认证失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return False
finally:
# 清理client_secret.json文件
if self.client_secret and os.path.exists('client_secret.json'):
logger.info("删除临时client_secret文件")
os.unlink('client_secret.json')
def create_talent_knowledge_base(self, talent_email):
"""
创建或获取与talent_email关联的知识库
Args:
talent_email (str): 达人Gmail邮箱地址
Returns:
tuple: (knowledge_base, created) - 知识库对象和是否新创建的标志
"""
try:
# 优先查找现有的关联关系
mapping = GmailTalentMapping.objects.filter(
user=self.user,
talent_email=talent_email,
is_active=True
).first()
if mapping and mapping.knowledge_base:
logger.info(f"找到现有的Gmail-达人映射: {talent_email} -> {mapping.knowledge_base.name}")
return mapping.knowledge_base, False
# 查找与该达人邮箱关联的知识库
# 根据达人邮箱生成一个唯一的标识名称
kb_name = f"Gmail-{talent_email.split('@')[0]}"
# 检查该名称的知识库是否已存在
existing_kb = KnowledgeBase.objects.filter(
name=kb_name,
user_id=self.user.id
).first()
if existing_kb:
logger.info(f"找到现有知识库: {kb_name}")
# 创建映射关系
GmailTalentMapping.objects.update_or_create(
user=self.user,
talent_email=talent_email,
defaults={
'knowledge_base': existing_kb,
'is_active': True
}
)
return existing_kb, False
# 没有找到现有知识库,直接使用基本方法创建
logger.info(f"使用基本方法创建知识库: {kb_name}")
return self._create_knowledge_base_basic(talent_email)
except Exception as e:
logger.error(f"创建或获取Gmail-达人知识库失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 创建失败时,尝试使用基本方法创建
logger.info("尝试使用基本方法创建知识库")
return self._create_knowledge_base_basic(talent_email)
def _create_knowledge_base_basic(self, talent_email):
"""创建基础知识库,不处理映射关系"""
try:
# 根据达人邮箱生成一个唯一的标识名称
kb_name = f"Gmail-{talent_email.split('@')[0]}"
# 检查该名称的知识库是否已存在
existing_kb = KnowledgeBase.objects.filter(
name=kb_name,
user_id=self.user.id
).first()
if existing_kb:
logger.info(f"找到现有知识库: {kb_name}")
# 检查external_id是否存在如果不存在则创建
if not existing_kb.external_id:
logger.info(f"知识库 {kb_name} 缺少external_id尝试创建外部知识库")
try:
from .views import KnowledgeBaseViewSet
kb_viewset = KnowledgeBaseViewSet()
external_id = kb_viewset._create_external_dataset(existing_kb)
if external_id:
existing_kb.external_id = external_id
existing_kb.save()
logger.info(f"成功为现有知识库创建外部知识库: {external_id}")
else:
logger.error("创建外部知识库失败未返回external_id")
return {"conversation_id": None, "success": False, "error": "创建外部知识库失败"}
except Exception as e:
logger.error(f"为现有知识库创建外部知识库失败: {str(e)}")
return {"conversation_id": None, "success": False, "error": f"创建外部知识库失败: {str(e)}"}
# 创建映射关系
GmailTalentMapping.objects.update_or_create(
user=self.user,
talent_email=talent_email,
defaults={
'knowledge_base': existing_kb,
'is_active': True
}
)
return existing_kb, False
# 创建新知识库
try:
knowledge_base = KnowledgeBase.objects.create(
name=kb_name,
desc=f"{talent_email}的Gmail邮件交流记录",
type="private",
user_id=self.user.id,
documents=[]
)
except django.db.utils.IntegrityError as e:
# 处理名称重复的情况
logger.warning(f"知识库名称'{kb_name}'已存在,尝试获取或创建带随机后缀的名称")
# 先尝试查找已存在的知识库(不限制用户)
existing_kb = KnowledgeBase.objects.filter(name=kb_name).first()
if existing_kb and str(existing_kb.user_id) == str(self.user.id):
# 如果存在且属于当前用户,直接使用
logger.info(f"找到属于当前用户的知识库: {kb_name}")
# 创建映射关系
GmailTalentMapping.objects.update_or_create(
user=self.user,
talent_email=talent_email,
defaults={
'knowledge_base': existing_kb,
'is_active': True
}
)
return existing_kb, False
else:
# 如果不存在或不属于当前用户,创建带随机后缀的新知识库
import random
import string
suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=5))
new_kb_name = f"{kb_name}-{suffix}"
logger.info(f"创建带随机后缀的知识库: {new_kb_name}")
knowledge_base = KnowledgeBase.objects.create(
name=new_kb_name,
desc=f"{talent_email}的Gmail邮件交流记录",
type="private",
user_id=self.user.id,
documents=[]
)
# 创建外部知识库
try:
from .views import KnowledgeBaseViewSet
kb_viewset = KnowledgeBaseViewSet()
external_id = kb_viewset._create_external_dataset(knowledge_base)
if external_id:
knowledge_base.external_id = external_id
knowledge_base.save()
logger.info(f"成功创建外部知识库: {external_id}")
except Exception as e:
logger.error(f"创建外部知识库失败: {str(e)}")
# 继续执行,不影响基本功能
# 创建映射关系
GmailTalentMapping.objects.create(
user=self.user,
talent_email=talent_email,
knowledge_base=knowledge_base,
is_active=True
)
logger.info(f"成功创建新知识库: {knowledge_base.name}, ID: {knowledge_base.id}")
return knowledge_base, True
except Exception as e:
logger.error(f"创建或获取Gmail-达人知识库基本方法失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 尝试直接获取已存在的知识库作为最后手段
try:
kb_name = f"Gmail-{talent_email.split('@')[0]}"
existing_kb = KnowledgeBase.objects.filter(name__startswith=kb_name).first()
if existing_kb:
logger.info(f"在错误处理中找到可用的知识库: {existing_kb.name}")
# 创建映射关系
GmailTalentMapping.objects.update_or_create(
user=self.user,
talent_email=talent_email,
defaults={
'knowledge_base': existing_kb,
'is_active': True
}
)
return existing_kb, False
except Exception as inner_e:
logger.error(f"错误处理中尝试获取知识库失败: {str(inner_e)}")
# 如果所有尝试都失败,抛出异常
raise
def get_conversations(self, talent_gmail):
"""获取与特定用户的所有邮件对话"""
try:
if not self.gmail_service:
logger.error("Gmail服务未初始化")
return []
# 使用crushwds@gmail.com作为固定邮箱
email1 = "crushwds@gmail.com" # 固定设置为quickstart.py中的邮箱
email2 = talent_gmail # 使用参数传入的目标邮箱
logger.info(f"执行Gmail查询: {email1}{email2}")
# 构建搜索查询
query = f"from:({email1} OR {email2}) to:({email1} OR {email2})"
# 获取所有匹配的邮件
response = self.gmail_service.users().messages().list(userId='me', q=query).execute()
messages = []
if 'messages' in response:
messages.extend(response['messages'])
message_count = len(messages)
logger.info(f"找到 {message_count} 封邮件")
# 分页获取所有邮件
while 'nextPageToken' in response:
page_token = response['nextPageToken']
response = self.gmail_service.users().messages().list(
userId='me',
q=query,
pageToken=page_token
).execute()
if 'messages' in response:
new_messages = response['messages']
messages.extend(new_messages)
new_count = len(new_messages)
message_count += new_count
logger.info(f"加载额外 {new_count} 封邮件,当前总数: {message_count}")
else:
logger.warning(f"未找到匹配邮件")
# 处理每封邮件
conversations = []
if messages:
logger.info(f"开始获取 {len(messages)} 封邮件详情...")
for i, msg in enumerate(messages):
try:
message = self.gmail_service.users().messages().get(userId='me', id=msg['id']).execute()
email_data = self._extract_email_content(message)
if email_data:
conversations.append(email_data)
if (i+1) % 5 == 0 or i+1 == len(messages): # 每5封邮件或最后一封邮件时记录进度
logger.info(f"已处理 {i+1}/{len(messages)} 封邮件")
else:
logger.warning(f"邮件 {i+1}/{len(messages)} 内容提取失败")
except Exception as e:
logger.error(f"获取邮件 {msg['id']} 详情失败: {str(e)}")
# 按时间排序
conversations.sort(key=lambda x: x['date'])
logger.info(f"总共找到并解析了 {len(conversations)} 封邮件")
return conversations
except Exception as e:
logger.error(f"获取Gmail对话失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return []
def _extract_email_content(self, message):
"""提取邮件内容完全按照quickstart.py的get_email_content函数实现"""
try:
message_id = message['id'] # 获取邮件ID
payload = message['payload']
headers = payload['headers']
# 获取邮件基本信息
email_data = {
'id': message_id, # 保存邮件ID
'subject': '',
'from': '',
'date': '',
'body': '',
'attachments': [] # 新增附件列表
}
# 提取头部信息
for header in headers:
if header['name'] == 'Subject':
email_data['subject'] = header['value']
elif header['name'] == 'From':
email_data['from'] = header['value']
elif header['name'] == 'Date':
date_value = header['value']
logger.info(f"原始邮件日期字符串: '{date_value}'")
try:
# 打印原始日期字符串信息
import dateutil.parser as date_parser
import pytz
from django.utils import timezone
# 先尝试解析日期
date = date_parser.parse(date_value)
logger.info(f"解析后的日期对象: {date}, 是否有时区: {date.tzinfo is not None}")
# 处理时区问题
if date.tzinfo is not None:
# 已有时区的日期,转换为系统时区
if hasattr(settings, 'TIME_ZONE'):
system_tz = pytz.timezone(settings.TIME_ZONE)
date = date.astimezone(system_tz)
logger.info(f"转换到系统时区后: {date}")
# 如果需要naive datetime删除时区信息
if hasattr(settings, 'USE_TZ') and not settings.USE_TZ:
date = date.replace(tzinfo=None)
logger.info(f"移除时区信息后: {date}")
else:
# 无时区的日期,如果系统使用时区,添加时区信息
if hasattr(settings, 'USE_TZ') and settings.USE_TZ:
try:
date = timezone.make_aware(date)
logger.info(f"添加时区信息后: {date}")
except Exception as tz_error:
logger.warning(f"添加时区信息失败: {str(tz_error)}")
# 格式化为字符串
email_data['date'] = date.strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"最终日期字符串: {email_data['date']}")
except Exception as e:
logger.error(f"解析日期失败: {str(e)}, 原始值: '{date_value}'")
# 保留原始值
email_data['date'] = date_value
# 定义一个递归函数来处理所有部分和附件
def process_parts(parts):
for part in parts:
# 检查是否是附件
if 'filename' in part and part['filename']:
attachment = {
'filename': part['filename'],
'mimeType': part['mimeType'],
'size': part['body'].get('size', 0)
}
# 如果有附件内容数据可以获取附件ID
if 'attachmentId' in part['body']:
attachment['attachmentId'] = part['body']['attachmentId']
email_data['attachments'].append(attachment)
# 处理文本内容
if part['mimeType'] == 'text/plain' and not email_data['body']:
data = part['body'].get('data', '')
if data:
try:
text = base64.urlsafe_b64decode(data).decode('utf-8')
email_data['body'] = text
except Exception as e:
logger.error(f"解码邮件内容失败: {str(e)}")
# 递归处理多部分内容
if 'parts' in part:
process_parts(part['parts'])
# 处理邮件正文和附件
if 'parts' in payload:
process_parts(payload['parts'])
elif 'body' in payload and 'data' in payload['body']:
# 没有parts直接处理body
data = payload['body'].get('data', '')
if data:
try:
text = base64.urlsafe_b64decode(data).decode('utf-8')
email_data['body'] = text
except Exception as e:
logger.error(f"解码邮件内容失败: {str(e)}")
return email_data
except Exception as e:
logger.error(f"处理邮件内容时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return None
def download_attachment(self, message_id, attachment_id, filename):
"""下载邮件附件"""
try:
attachment = self.gmail_service.users().messages().attachments().get(
userId='me', messageId=message_id, id=attachment_id
).execute()
data = attachment['data']
file_data = base64.urlsafe_b64decode(data)
# 创建附件目录
attachments_dir = 'gmail_attachments'
if not os.path.exists(attachments_dir):
os.makedirs(attachments_dir)
# 保存附件
filepath = os.path.join(attachments_dir, f"{message_id}_{filename}")
with open(filepath, 'wb') as f:
f.write(file_data)
return filepath
except Exception as e:
logger.error(f"下载附件失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return None
def save_conversations_to_knowledge_base(self, conversations, knowledge_base):
"""
将Gmail对话保存到知识库
Args:
conversations: Gmail邮件列表
knowledge_base: 保存到的知识库对象
"""
try:
# 导入所需模型
from .models import GmailAttachment, ChatHistory, KnowledgeBaseDocument, GmailTalentMapping
# 检查入参
if not conversations or not knowledge_base:
logger.error("参数不完整: conversations或knowledge_base为空")
return {"conversation_id": None, "success": False, "error": "参数不完整"}
if not conversations:
logger.warning("没有邮件对话可保存")
return {"conversation_id": None, "success": False, "error": "没有邮件对话可保存"}
# 确保knowledge_base有external_id
if not knowledge_base.external_id:
logger.warning(f"知识库 {knowledge_base.name} 缺少external_id尝试创建外部知识库")
try:
from .views import KnowledgeBaseViewSet
kb_viewset = KnowledgeBaseViewSet()
external_id = kb_viewset._create_external_dataset(knowledge_base)
if external_id:
knowledge_base.external_id = external_id
knowledge_base.save()
logger.info(f"成功为知识库创建外部知识库: {external_id}")
else:
logger.error("创建外部知识库失败未返回external_id")
return {"conversation_id": None, "success": False, "error": "创建外部知识库失败"}
except Exception as e:
logger.error(f"为知识库创建外部知识库失败: {str(e)}")
return {"conversation_id": None, "success": False, "error": f"创建外部知识库失败: {str(e)}"}
# 查找现有的对话ID - 优先使用talent_email查找
conversation_id = None
# 1. 首先尝试通过talent_email查找现有映射
if self.email:
existing_mapping = GmailTalentMapping.objects.filter(
user=self.user,
talent_email=self.email,
knowledge_base=knowledge_base,
is_active=True
).first()
if existing_mapping and existing_mapping.conversation_id:
conversation_id = existing_mapping.conversation_id
logger.info(f"通过达人邮箱 {self.email} 找到现有对话ID: {conversation_id}")
# 2. 如果上面没找到,尝试通过知识库查找任何相关的对话
if not conversation_id:
existing_conversation = ChatHistory.objects.filter(
knowledge_base=knowledge_base,
user=self.user,
is_deleted=False
).values('conversation_id').distinct().first()
if existing_conversation:
conversation_id = existing_conversation['conversation_id']
logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}")
# 如果有talent_email更新或创建映射关系
if self.email:
GmailTalentMapping.objects.update_or_create(
user=self.user,
talent_email=self.email,
defaults={
'knowledge_base': knowledge_base,
'conversation_id': conversation_id,
'is_active': True
}
)
logger.info(f"更新Gmail达人映射: {self.email} -> 对话ID {conversation_id}")
# 3. 如果仍然没找到则创建新的对话ID
if not conversation_id:
# 生成唯一的对话ID
conversation_id = str(uuid.uuid4())
logger.info(f"创建新的对话ID: {conversation_id}")
# 创建或更新Gmail和达人的映射关系
if self.email:
talent_mapping, created = GmailTalentMapping.objects.update_or_create(
user=self.user,
talent_email=self.email,
defaults={
'knowledge_base': knowledge_base,
'conversation_id': conversation_id,
'is_active': True
}
)
action = "创建" if created else "更新"
logger.info(f"{action}Gmail达人映射: {self.email} -> {knowledge_base.name}")
# 构建知识库文档
logger.info(f"开始构建知识库文档,共{len(conversations)}个邮件对话")
document = {
"title": f"{self.email or '联系人'}的Gmail邮件对话",
"url": "",
"source_type": "gmail",
"paragraphs": []
}
# 对话按时间顺序排序
conversations.sort(key=lambda x: x.get('date', ''))
# 将对话添加到文档
previous_message_dict = {} # 用于存储邮件时间与消息ID的映射以便找到正确的parent_id
# 首先查找现有的聊天记录
existing_messages = ChatHistory.objects.filter(
conversation_id=conversation_id,
is_deleted=False
).order_by('created_at')
# 如果已有消息,跳过重复导入
existing_gmail_ids = set()
if existing_messages.exists():
for msg in existing_messages:
if msg.metadata and 'gmail_message_id' in msg.metadata:
existing_gmail_ids.add(msg.metadata['gmail_message_id'])
logger.info(f"发现已存在 {len(existing_gmail_ids)} 条消息记录,将跳过重复导入")
for message in conversations:
try:
# 跳过已导入的消息
if message.get('id') in existing_gmail_ids:
logger.info(f"跳过已导入的消息: {message.get('id')}")
continue
# 提取邮件内容
sender = message.get('from', '未知发件人')
date_str = message.get('date', '未知时间')
subject = message.get('subject', '无主题')
body = message.get('body', '').strip()
logger.info(f"处理邮件: {date_str}, {sender}, {subject[:20]}...")
# 判断角色
# 从发件人地址中提取邮箱部分
sender_email = ''
if '<' in sender and '>' in sender:
# 格式如 "姓名 <email@example.com>"
sender_email = sender.split('<')[1].split('>')[0]
else:
# 格式可能直接是邮箱
sender_email = sender
# 修改角色判断逻辑根据Gmail凭证邮箱和达人邮箱判断
# 获取当前Gmail凭证邮箱
gmail_credential_email = None
if self.gmail_credential and self.gmail_credential.gmail_email:
gmail_credential_email = self.gmail_credential.gmail_email.lower()
# 判断是否是凭证邮箱发出的邮件
is_credential_email = gmail_credential_email and gmail_credential_email == sender_email.lower()
# 判断是否是系统用户邮箱
is_user_email = self.user.email.lower() == sender_email.lower()
# 判断是否是目标达人邮箱
is_talent_email = False
if self.email:
is_talent_email = self.email.lower() == sender_email.lower()
# 如果是Gmail凭证邮箱或用户邮箱设为user角色如果是达人邮箱设为assistant角色
if is_credential_email or is_user_email:
role = 'user'
elif is_talent_email:
role = 'assistant'
else:
# 尝试检查是否有与发件人邮箱相匹配的人才映射
from .models import GmailTalentMapping
talent_mapping = GmailTalentMapping.objects.filter(
user=self.user,
talent_email=sender_email,
is_active=True
).first()
# 如果有映射关系则视为达人邮箱设为assistant角色
role = 'assistant' if talent_mapping else 'user'
logger.info(f"设置消息角色: {role},发件人: {sender_email}用户Gmail: {gmail_credential_email},用户邮箱: {self.user.email},目标达人: {self.email},是否有达人映射: {bool(talent_mapping) if 'talent_mapping' in locals() else False}")
# 将邮件添加到文档
paragraph = {
"id": f"msg_{len(document['paragraphs']) + 1}",
"title": f"{date_str} - {sender} - {subject}",
"content": body,
"meta": {
"sender": sender,
"date": date_str,
"subject": subject,
"has_attachments": len(message.get('attachments', [])) > 0,
"message_id": message.get('id', '')
}
}
document['paragraphs'].append(paragraph)
# 解析邮件日期为datetime对象
try:
# 先检查是否是时间戳格式
if isinstance(date_str, str) and date_str.isdigit():
# 如果是时间戳,直接转换
date_obj = datetime.fromtimestamp(int(date_str))
else:
# 尝试标准格式解析
try:
date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果标准格式解析失败使用dateutil更灵活的解析
import dateutil.parser as date_parser
date_obj = date_parser.parse(date_str)
# 如果解析出的日期有时区信息,转换为不带时区的日期
if date_obj.tzinfo is not None:
date_obj = date_obj.replace(tzinfo=None)
except (ValueError, TypeError) as e:
# 如果解析失败,使用当前时间
logger.warning(f"无法解析邮件日期: {date_str},错误: {str(e)},使用当前时间")
date_obj = datetime.now()
# 查找parent_id查找日期早于当前邮件且最接近的消息
parent_id = None
closest_date = None
for d, mid in previous_message_dict.items():
if d < date_obj and (closest_date is None or d > closest_date):
closest_date = d
parent_id = mid
# 保存消息到聊天历史,使用邮件实际日期
from django.utils import timezone
# 确保时区处理正确
try:
# 检查date_obj是否已经是aware
if timezone.is_aware(date_obj):
aware_date = date_obj
logger.info(f"日期已经包含时区信息: {aware_date}")
else:
# 将naive转换为aware
aware_date = timezone.make_aware(date_obj)
logger.info(f"日期添加时区信息后: {aware_date}")
except Exception as tz_error:
logger.warning(f"时区转换失败: {str(tz_error)},使用当前时间")
aware_date = timezone.now()
# 创建消息记录
chat_message = ChatHistory.objects.create(
user=self.user,
knowledge_base=knowledge_base,
conversation_id=conversation_id,
parent_id=parent_id,
role=role,
content=f"{subject}\n\n{body}",
metadata={
'gmail_message_id': message.get('id', ''),
'from': sender,
'date': date_str,
'subject': subject,
'dataset_id_list': [str(knowledge_base.id)],
'dataset_names': [knowledge_base.name]
},
created_at=aware_date # 设置正确的创建时间
)
# 更新previous_message_dict
previous_message_dict[date_obj] = str(chat_message.id)
# 处理附件
attachments = message.get('attachments', [])
if attachments:
for attachment in attachments:
if 'attachmentId' in attachment and 'filename' in attachment:
try:
# 下载附件
filepath = self.download_attachment(
message_id=message['id'],
attachment_id=attachment['attachmentId'],
filename=attachment['filename']
)
if filepath:
# 记录附件信息
GmailAttachment.objects.create(
chat_message=chat_message,
gmail_message_id=message['id'],
filename=attachment['filename'],
filepath=filepath,
mimetype=attachment.get('mimeType', ''),
filesize=attachment.get('size', 0)
)
logger.info(f"已保存附件: {attachment['filename']}")
except Exception as e:
logger.error(f"保存附件失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
except Exception as e:
logger.error(f"处理邮件时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 将文档添加到知识库
logger.info(f"将Gmail对话添加到知识库: {knowledge_base.name}")
# 准备文档数据结构
doc_data = {
"name": f"Gmail对话与{self.email or '联系人'}.txt",
"paragraphs": []
}
# 将所有对话转换为段落
for paragraph in document['paragraphs']:
doc_data["paragraphs"].append({
"title": paragraph['title'],
"content": paragraph['content'],
"is_active": True,
"problem_list": []
})
# 检查paragraphs是否为空
if not doc_data["paragraphs"]:
logger.warning("没有段落内容可上传,添加默认段落")
doc_data["paragraphs"].append({
"title": "初始化邮件对话",
"content": f"{self.email or '联系人'}的邮件对话准备就绪,等待新的邮件内容。",
"is_active": True,
"problem_list": []
})
# 调用知识库的文档上传API
from .views import KnowledgeBaseViewSet
kb_viewset = KnowledgeBaseViewSet()
# 添加随机文档ID
doc_data["id"] = str(uuid.uuid4())
logger.info(f"生成随机文档ID: {doc_data['id']}")
# 上传文档
upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data)
if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
# 上传成功,保存记录到数据库
document_id = upload_response['data']['id']
# 创建文档记录
from .models import KnowledgeBaseDocument
doc_record = KnowledgeBaseDocument.objects.create(
knowledge_base=knowledge_base,
document_id=document_id,
document_name=doc_data["name"],
external_id=document_id
)
logger.info(f"Gmail对话文档上传成功ID: {document_id}")
# 上传所有附件
self._upload_attachments_to_knowledge_base(knowledge_base, conversations)
else:
# 上传失败,记录错误信息
error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
logger.error(f"Gmail对话文档上传失败: {error_msg}")
# 更新知识库文档计数
knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
knowledge_base=knowledge_base,
status='active'
).count()
knowledge_base.save()
logger.info(f"Gmail对话已保存到知识库: {knowledge_base.name}")
return {
"conversation_id": conversation_id,
"success": True,
"message_count": len(conversations),
"knowledge_base_id": str(knowledge_base.id)
}
except Exception as e:
logger.error(f"保存Gmail对话到知识库失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return {"conversation_id": None, "success": False, "error": str(e)}
def send_email(self, to_email, subject, body, conversation_id=None, attachments=None):
"""发送邮件并保存到聊天记录"""
try:
# 构建邮件内容
if attachments and len(attachments) > 0:
message = self._create_message_with_attachment(to_email, subject, body, attachments)
else:
message = self._create_message(to_email, subject, body)
# 发送邮件
sent_message = self.gmail_service.users().messages().send(
userId='me', body=message
).execute()
message_id = sent_message['id']
logger.info(f"邮件发送成功, ID: {message_id}")
# 获取邮件详情,包括服务器时间戳
try:
message_detail = self.gmail_service.users().messages().get(
userId='me', id=message_id
).execute()
# 从消息详情中提取时间戳
internal_date = message_detail.get('internalDate')
if internal_date:
try:
# 转换毫秒时间戳为datetime
email_date = datetime.fromtimestamp(int(internal_date)/1000)
logger.info(f"从时间戳解析的日期: {email_date}")
# 处理时区
from django.utils import timezone
if hasattr(settings, 'USE_TZ') and settings.USE_TZ and not timezone.is_aware(email_date):
email_date = timezone.make_aware(email_date)
logger.info(f"添加时区信息后: {email_date}")
date_str = email_date.strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"最终格式化日期: {date_str}")
except Exception as tz_error:
logger.warning(f"处理邮件时间戳失败: {str(tz_error)},使用当前时间")
from django.utils import timezone
email_date = timezone.now()
date_str = email_date.strftime('%Y-%m-%d %H:%M:%S')
else:
logger.warning("邮件没有时间戳,使用当前时间")
from django.utils import timezone
email_date = timezone.now()
date_str = email_date.strftime('%Y-%m-%d %H:%M:%S')
except Exception as e:
logger.warning(f"获取邮件时间戳失败: {str(e)}, 使用当前时间")
email_date = datetime.now()
date_str = email_date.strftime('%Y-%m-%d %H:%M:%S')
# 如果有conversation_id保存到聊天记录
if conversation_id:
# 查找现有的聊天记录
from django.db.models import Q
existing_messages = ChatHistory.objects.filter(
conversation_id=conversation_id,
is_deleted=False
).order_by('created_at')
if not existing_messages.exists():
logger.warning(f"找不到对话ID: {conversation_id},无法保存消息")
return message_id
# 查找关联的知识库
first_message = existing_messages.first()
if first_message.knowledge_base:
knowledge_base = first_message.knowledge_base
else:
# 如果第一条消息没有知识库尝试从metadata获取
if first_message.metadata and 'dataset_id_list' in first_message.metadata:
kb_id = first_message.metadata.get('dataset_id_list', [])[0]
knowledge_base = KnowledgeBase.objects.get(id=kb_id)
else:
logger.error(f"找不到关联的知识库,无法保存消息")
return message_id
# 查找parent_id时间早于当前邮件且最接近的消息避免时区问题
parent_id = None
try:
# 使用简单查询,不依赖时区
previous_messages = ChatHistory.objects.filter(
conversation_id=conversation_id,
is_deleted=False
).order_by('-id')[:1] # 使用ID降序排序而非时间
if previous_messages:
parent_id = str(previous_messages[0].id)
except Exception as e:
logger.error(f"查找父消息失败: {str(e)}")
logger.error(traceback.format_exc())
# 构建metadata
metadata = {
'gmail_message_id': message_id,
'from': self.user.email,
'to': to_email,
'date': date_str,
'subject': subject,
'dataset_id_list': [str(knowledge_base.id)],
'dataset_names': [knowledge_base.name]
}
# 如果现有消息有dataset_id_list保留它
if first_message.metadata and 'dataset_id_list' in first_message.metadata:
metadata['dataset_id_list'] = first_message.metadata['dataset_id_list']
# 尝试获取对应的dataset_names
if 'dataset_names' in first_message.metadata:
metadata['dataset_names'] = first_message.metadata['dataset_names']
# 创建聊天记录,使用当前时间而不是邮件的实际时间
chat_message = ChatHistory.objects.create(
user=self.user,
knowledge_base=knowledge_base,
conversation_id=conversation_id,
parent_id=parent_id,
role='user', # 用户发送的消息,角色固定为'user'
content=f"[{subject}] {body}",
metadata=metadata
# 不设置created_at使用数据库默认时间
)
# 如果有附件,保存附件信息
if attachments and len(attachments) > 0:
for att_path in attachments:
file_name = os.path.basename(att_path)
file_size = os.path.getsize(att_path)
# 获取MIME类型
mime_type, _ = mimetypes.guess_type(att_path)
if not mime_type:
mime_type = 'application/octet-stream'
# 保存附件信息到数据库
gmail_attachment = GmailAttachment.objects.create(
chat_message=chat_message,
gmail_message_id=message_id,
filename=file_name,
filepath=att_path,
mimetype=mime_type,
filesize=file_size
)
# 更新知识库文档
self._append_to_knowledge_base_document(knowledge_base, subject, body, to_email)
return message_id
except Exception as e:
logger.error(f"发送邮件失败: {str(e)}")
logger.error(traceback.format_exc())
raise
def _create_message(self, to, subject, body):
"""创建邮件消息"""
message = MIMEText(body)
message['to'] = to
message['subject'] = subject
# 编码为JSON安全的字符串
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
return {'raw': raw}
def _create_message_with_attachment(self, to, subject, body, attachment_files):
"""创建带附件的邮件消息"""
message = MIMEMultipart()
message['to'] = to
message['subject'] = subject
# 添加邮件正文
msg = MIMEText(body)
message.attach(msg)
# 添加附件
for file in attachment_files:
try:
with open(file, 'rb') as f:
part = MIMEBase('application', 'octet-stream')
part.set_payload(f.read())
# 编码附件内容
encoders.encode_base64(part)
# 添加头部信息
filename = os.path.basename(file)
part.add_header(
'Content-Disposition',
f'attachment; filename="{filename}"'
)
message.attach(part)
except Exception as e:
logger.error(f"添加附件 {file} 失败: {str(e)}")
# 编码为JSON安全的字符串
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
return {'raw': raw}
def _append_to_knowledge_base_document(self, knowledge_base, subject, body, recipient):
"""将新发送的邮件内容追加到知识库文档中"""
try:
# 准备文档数据结构
doc_data = {
"name": f"Gmail回复给{recipient}_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt",
"paragraphs": [
{
"title": f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {subject}",
"content": f"发件人: {self.user.email}\n收件人: {recipient}\n\n{body}",
"is_active": True,
"problem_list": []
}
]
}
# 调用知识库的文档上传API
from .views import KnowledgeBaseViewSet
kb_viewset = KnowledgeBaseViewSet()
# 验证知识库是否有external_id
if not knowledge_base.external_id:
logger.error(f"知识库没有external_id无法上传文档: {knowledge_base.name}")
return None
# 上传文档
upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data)
if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
# 上传成功,保存记录到数据库
document_id = upload_response['data']['id']
# 创建文档记录
from .models import KnowledgeBaseDocument
doc_record = KnowledgeBaseDocument.objects.create(
knowledge_base=knowledge_base,
document_id=document_id,
document_name=doc_data["name"],
external_id=document_id
)
logger.info(f"Gmail回复文档上传成功ID: {document_id}")
# 更新知识库文档计数
knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
knowledge_base=knowledge_base,
status='active'
).count()
knowledge_base.save()
return upload_response
else:
# 上传失败,记录错误信息
error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
logger.error(f"Gmail回复文档上传失败: {error_msg}")
return None
except Exception as e:
logger.error(f"追加邮件内容到知识库失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return None
def setup_watch(self, topic_name=None):
"""设置Gmail监听"""
try:
# 如果没有指定topic使用配置的主题名称
if not topic_name:
# 从settings获取项目配置的主题名称
topic_name = getattr(settings, 'GMAIL_TOPIC_NAME', 'gmail-watch-topic')
# 直接使用settings中配置的项目ID不再从client_secret.json获取
project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT', 'knowledge-454905')
logger.info(f"使用settings中配置的项目ID: {project_id}")
# 确保Gmail服务已经初始化
if not hasattr(self, 'gmail_service') or not self.gmail_service:
logger.warning("Gmail服务未初始化尝试重新认证")
if not self.authenticate():
logger.error("Gmail服务初始化失败")
return None
# 注意不再使用用户邮箱作为判断依据Gmail API总是使用'me'作为userId
# Gmail认证是通过OAuth流程与系统用户邮箱无关
logger.info(f"系统用户: {self.user.email}Gmail API使用OAuth认证的邮箱 (userId='me')")
# 构建完整的webhook URL
webhook_url = getattr(settings, 'GMAIL_WEBHOOK_URL', None)
if not webhook_url:
# 使用默认值确保这是一个完全限定的URL
domain = getattr(settings, 'ALLOWED_HOSTS', ['localhost'])[0]
protocol = 'https' if getattr(settings, 'SECURE_SSL_REDIRECT', False) else 'http'
if domain == 'localhost' or domain.startswith('127.0.0.1'):
# 本地开发环境需要公网可访问的URL可以用ngrok等工具暴露本地服务
webhook_url = f"{protocol}://{domain}:8000/api/user/gmail/webhook/"
logger.warning("使用本地开发环境URL作为webhook回调Google可能无法访问。建议使用ngrok等工具创建公网地址。")
else:
webhook_url = f"{protocol}://{domain}/api/user/gmail/webhook/"
logger.info(f"使用Gmail webhook URL: {webhook_url}")
# 如果想在Google Cloud控制台中手动配置webhook打印明确的说明
logger.info("如需手动配置Gmail推送通知请确保在Google Cloud控制台的Pub/Sub主题配置中设置以下URL:")
logger.info(f"推送端点: {webhook_url}")
logger.info("权限: 为Gmail API服务账号(gmail-api-push@system.gserviceaccount.com)和您的Gmail账号授予发布权限")
# 请求监听
request = {
'labelIds': ['INBOX', 'SENT', 'IMPORTANT', 'UNREAD'], # 监听更多标签
'topicName': f"projects/{project_id}/topics/{topic_name}",
'labelFilterAction': 'include'
}
logger.info(f"设置Gmail监听: {request}")
# 设置超时,避免长时间阻塞
original_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
try:
# 执行watch请求
response = self.gmail_service.users().watch(userId='me', body=request).execute()
except Exception as e:
logger.error(f"执行watch请求失败: {str(e)}")
return None
finally:
# 恢复原始超时设置
socket.setdefaulttimeout(original_timeout)
# 获取historyId用于后续同步
history_id = response.get('historyId')
expiration = response.get('expiration')
logger.info(f"Gmail监听设置成功: historyId={history_id}, expiration={expiration}")
# 保存监听信息到数据库
if self.user:
from .models import GmailCredential
credential = GmailCredential.objects.filter(user=self.user, is_active=True).first()
if credential:
# 转换时间戳为datetime
expiration_time = None
if expiration:
try:
# 将毫秒时间戳转换为datetime
naive_time = datetime.fromtimestamp(int(expiration)/1000)
logger.info(f"从时间戳解析的日期: {naive_time}")
# 如果系统使用时区,添加时区信息
if hasattr(settings, 'USE_TZ') and settings.USE_TZ:
try:
expiration_time = timezone.make_aware(naive_time)
logger.info(f"添加时区信息后: {expiration_time}")
except Exception as tz_error:
logger.warning(f"添加时区信息失败: {str(tz_error)}")
expiration_time = naive_time
else:
expiration_time = naive_time
except Exception as conv_error:
logger.error(f"转换时间戳失败: {str(conv_error)}")
expiration_time = None
credential.last_history_id = history_id
credential.watch_expiration = expiration_time
credential.save()
logger.info(f"更新Gmail监听信息: {self.user.username}, history_id: {history_id}")
return {
'historyId': history_id,
'expiration': expiration
}
except Exception as e:
logger.error(f"设置Gmail监听失败: {str(e)}")
logger.error(traceback.format_exc())
return None
def get_history(self, start_history_id):
"""获取历史变更"""
try:
logger.info(f"获取历史记录起始ID: {start_history_id}")
# 确保Gmail服务已经初始化
if not hasattr(self, 'gmail_service') or not self.gmail_service:
logger.warning("Gmail服务未初始化尝试重新认证")
if not self.authenticate():
logger.error("Gmail服务初始化失败")
return []
# 设置超时,避免长时间阻塞
original_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
try:
# 执行history请求
response = self.gmail_service.users().history().list(
userId='me', startHistoryId=start_history_id
).execute()
except Exception as e:
logger.error(f"执行history请求失败: {str(e)}")
return []
finally:
# 恢复原始超时设置
socket.setdefaulttimeout(original_timeout)
logger.info(f"历史记录响应: {response}")
history_list = []
if 'history' in response:
history_list.extend(response['history'])
logger.info(f"找到 {len(response['history'])} 个历史记录")
# 获取所有页,但每次请求都设置超时
page_count = 0
max_pages = 5 # 限制最大页数,避免无限循环
while 'nextPageToken' in response and page_count < max_pages:
page_token = response['nextPageToken']
page_count += 1
# 设置超时
socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
try:
response = self.gmail_service.users().history().list(
userId='me', startHistoryId=start_history_id, pageToken=page_token
).execute()
except Exception as e:
logger.error(f"获取历史记录分页失败: {str(e)}")
break
finally:
# 恢复原始超时设置
socket.setdefaulttimeout(original_timeout)
if 'history' in response:
history_list.extend(response['history'])
logger.info(f"加载额外 {len(response['history'])} 个历史记录 (页 {page_count})")
if page_count >= max_pages and 'nextPageToken' in response:
logger.warning(f"达到最大页数限制 ({max_pages}),可能有更多历史记录未获取")
else:
logger.info(f"没有新的历史记录最新historyId: {response.get('historyId', 'N/A')}")
# 提取新消息ID
new_message_ids = set()
for history in history_list:
logger.info(f"处理历史记录: {history}")
if 'messagesAdded' in history:
for message in history['messagesAdded']:
message_id = message['message']['id']
new_message_ids.add(message_id)
logger.info(f"新增消息ID: {message_id}")
if 'labelsAdded' in history:
for label in history['labelsAdded']:
message_id = label['message']['id']
if 'INBOX' in label.get('labelIds', []):
new_message_ids.add(message_id)
logger.info(f"标签变更的消息ID: {message_id}, 添加了INBOX标签")
if new_message_ids:
logger.info(f"总共找到 {len(new_message_ids)} 个新消息")
else:
logger.info("没有新增消息")
return list(new_message_ids)
except Exception as e:
logger.error(f"获取Gmail历史记录失败: {str(e)}")
logger.error(traceback.format_exc())
return []
def process_notification(self, notification_data):
"""处理Gmail推送通知"""
try:
# 提取通知数据
logger.info(f"处理Gmail通知: {notification_data}")
# 处理Google Pub/Sub消息格式
if isinstance(notification_data, dict) and 'message' in notification_data and 'data' in notification_data['message']:
try:
import base64
import json
logger.info("检测到Google Pub/Sub消息格式")
# Base64解码data字段
encoded_data = notification_data['message']['data']
decoded_data = base64.b64decode(encoded_data).decode('utf-8')
logger.info(f"解码后的数据: {decoded_data}")
# 解析JSON获取email和historyId
json_data = json.loads(decoded_data)
email = json_data.get('emailAddress')
history_id = json_data.get('historyId')
logger.info(f"从Pub/Sub消息中提取: email={email}, historyId={history_id}")
except Exception as decode_error:
logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}")
logger.error(traceback.format_exc())
return False
else:
# 原始格式处理
email = notification_data.get('emailAddress')
history_id = notification_data.get('historyId')
if not email or not history_id:
logger.error("Gmail通知数据不完整: 找不到emailAddress或historyId")
return False
# 查找关联用户
from .models import User, GmailCredential
user = User.objects.filter(email=email).first()
credential = None
# 如果找不到用户尝试使用gmail_email字段查找
if not user:
logger.info(f"找不到email={email}的用户尝试使用gmail_email查找")
credential = GmailCredential.objects.filter(gmail_email=email, is_active=True).first()
if credential:
user = credential.user
logger.info(f"通过gmail_email找到用户: {user.email}")
if not user:
logger.error(f"找不到与 {email} 关联的用户")
return False
# 确保有相应的Gmail凭证
if not credential:
credential = GmailCredential.objects.filter(user=user, is_active=True).first()
if not credential:
logger.error(f"用户 {user.email} 没有活跃的Gmail凭证")
return False
# 检查凭证是否需要重新授权
if credential.needs_reauth:
logger.warning(f"Gmail凭证 {credential.id} 需要重新授权,无法处理通知")
# 记录当前通知到队列或数据库,供用户重新授权后再处理
try:
from .models import GmailNotificationQueue
# 存储通知到队列
GmailNotificationQueue.objects.create(
user=user,
gmail_credential=credential,
email=email,
history_id=history_id,
notification_data=json.dumps(notification_data)
)
logger.info(f"通知已保存到队列,等待用户重新授权后处理")
except Exception as queue_error:
logger.error(f"保存通知到队列失败: {str(queue_error)}")
return False
# 更新历史ID
if credential and history_id:
try:
# 仅当新的历史ID大于当前值时更新
if not credential.last_history_id or int(history_id) > int(credential.last_history_id):
credential.last_history_id = history_id
credential.save()
logger.info(f"更新历史ID: {history_id}")
else:
logger.info(f"收到的历史ID ({history_id}) 不大于当前值 ({credential.last_history_id}),不更新")
except Exception as update_error:
logger.error(f"更新历史ID失败: {str(update_error)}")
# 初始化Gmail集成
gmail_integration = GmailIntegration(user, gmail_credential_id=credential.id if credential else None)
# 记录详细的处理信息
logger.info(f"Gmail通知处理: 用户={user.email}, Gmail邮箱={email}, 历史ID={history_id}, 凭证ID={credential.id}")
# 设置超时
original_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
try:
# 认证Gmail服务
if not gmail_integration.authenticate():
logger.error(f"Gmail认证失败: {email}")
# 尝试刷新令牌
refresh_result = gmail_integration.refresh_token()
if not refresh_result:
logger.error("刷新令牌失败,需要用户重新授权")
return False
logger.info("令牌刷新成功,继续处理通知")
# 获取历史变更
message_ids = gmail_integration.get_history(history_id)
# 如果没有找到新消息,尝试获取最近的消息
if not message_ids:
logger.info("没有找到历史变更,尝试获取最近的邮件")
# 查询最近的5封邮件
try:
recent_messages = gmail_integration.gmail_service.users().messages().list(
userId='me',
maxResults=5
).execute()
if 'messages' in recent_messages:
message_ids = [msg['id'] for msg in recent_messages['messages']]
logger.info(f"获取到 {len(message_ids)} 封最近的邮件: {message_ids}")
else:
logger.info("没有找到最近的邮件")
except Exception as recent_error:
logger.error(f"获取最近邮件失败: {str(recent_error)}")
except Exception as e:
error_msg = str(e)
logger.error(f"处理Gmail通知时发生错误: {error_msg}")
# 检查是否是令牌过期
if "invalid_grant" in error_msg.lower() or "401" in error_msg:
logger.warning("检测到OAuth令牌问题尝试刷新令牌")
# 尝试刷新令牌
refresh_result = gmail_integration.refresh_token()
if not refresh_result:
logger.error("刷新令牌失败,需要用户重新授权")
return False
# 再次尝试获取历史记录
try:
message_ids = gmail_integration.get_history(history_id)
# 如果仍然没有找到新消息,尝试获取最近的消息
if not message_ids:
logger.info("刷新令牌后仍未找到历史变更,尝试获取最近邮件")
recent_messages = gmail_integration.gmail_service.users().messages().list(
userId='me',
maxResults=5
).execute()
if 'messages' in recent_messages:
message_ids = [msg['id'] for msg in recent_messages['messages']]
logger.info(f"获取到 {len(message_ids)} 封最近的邮件: {message_ids}")
except Exception as retry_error:
logger.error(f"刷新令牌后再次尝试失败: {str(retry_error)}")
return False
else:
return False
finally:
# 恢复原始超时设置
socket.setdefaulttimeout(original_timeout)
if message_ids:
logger.info(f"找到 {len(message_ids)} 个需要处理的消息")
# 限制处理的消息数量,防止过多消息导致系统负载过高
max_messages = 10
if len(message_ids) > max_messages:
logger.warning(f"消息数量 ({len(message_ids)}) 超过限制 ({max_messages}),将只处理前 {max_messages}")
message_ids = message_ids[:max_messages]
# 处理消息
success_count = 0
for message_id in message_ids:
try:
# 设置超时
socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
if self._process_new_message(gmail_integration, message_id):
success_count += 1
except Exception as msg_error:
error_msg = str(msg_error)
logger.error(f"处理消息 {message_id} 失败: {error_msg}")
# 检查是否是令牌过期
if "invalid_grant" in error_msg.lower() or "401" in error_msg:
logger.warning("处理消息时检测到OAuth令牌问题标记需要重新授权")
gmail_integration._mark_credential_needs_reauth()
break
logger.error(traceback.format_exc())
finally:
# 恢复原始超时设置
socket.setdefaulttimeout(original_timeout)
logger.info(f"成功处理 {success_count}/{len(message_ids)} 个消息")
return success_count > 0
# 如果没有找到新消息但仍需确认处理验证GooglePubSub连接并保持通知持续
try:
# 简单验证连接性测试
logger.info(f"未找到需要处理的消息,执行服务连接测试")
profile = gmail_integration.gmail_service.users().getProfile(userId='me').execute()
if profile and 'emailAddress' in profile:
logger.info(f"Gmail服务连接正常: {profile['emailAddress']}")
# 检查并更新监听状态
if credential:
# 检查监听是否过期
needs_watch_renew = False
if credential.watch_expiration:
from django.utils import timezone
# 如果监听将在3天内过期提前更新
three_days_later = timezone.now() + timezone.timedelta(days=3)
if credential.watch_expiration < three_days_later:
needs_watch_renew = True
logger.info(f"监听将在3天内过期需要更新: {credential.watch_expiration}")
else:
needs_watch_renew = True
logger.info("没有监听过期时间记录,需要更新")
if needs_watch_renew:
try:
# 更新监听
watch_result = gmail_integration.setup_watch()
logger.info(f"更新监听成功: {watch_result}")
return True
except Exception as watch_error:
logger.error(f"更新监听失败: {str(watch_error)}")
return False
return True
except Exception as verify_error:
logger.error(f"Gmail服务连接测试失败: {str(verify_error)}")
return False
# 如果没有找到新消息,记录日志并返回成功
logger.info("没有找到新消息,处理完成")
return True
except Exception as e:
logger.error(f"处理Gmail通知失败: {str(e)}")
logger.error(traceback.format_exc())
return False
def _process_new_message(self, gmail_integration, message_id):
"""处理新收到的邮件"""
try:
# 导入所需模型
from .models import GmailTalentMapping, GmailAttachment, KnowledgeBase, ChatHistory, UserProfile
# 添加详细日志
logger.info(f"处理新消息ID: {message_id}")
# 获取消息内容
message = gmail_integration.gmail_service.users().messages().get(userId='me', id=message_id).execute()
# 从邮件中提取相关信息
email_content = gmail_integration._extract_email_content(message)
if not email_content:
logger.error(f"无法提取消息 {message_id} 的内容")
return False
# 记录邮件详情
sender = email_content.get('from', '')
recipient = email_content.get('to', '')
subject = email_content.get('subject', '')
body = email_content.get('body', '')
logger.info(f"提取的邮件信息: 发件人={sender}, 收件人={recipient}, 主题={subject}")
# 提取达人邮箱 - 可能是发件人或收件人
talent_email = None
is_talent_sending = False # 标记是否是达人发送的邮件
# 检查收件人是否是当前用户
user_email = None
if self.user and hasattr(self.user, 'email'):
user_email = self.user.email
# 如果发件人不是用户,则认为发件人是达人
if user_email and sender and user_email.lower() not in sender.lower():
talent_email = sender.lower()
is_talent_sending = True
logger.info(f"检测到达人(发件人)邮箱: {talent_email}")
# 如果收件人不是用户,则认为收件人是达人
elif user_email and recipient and user_email.lower() not in recipient.lower():
talent_email = recipient.lower()
is_talent_sending = False
logger.info(f"检测到达人(收件人)邮箱: {talent_email}")
# 从邮箱中提取纯地址(去除名称部分)
if talent_email:
if '<' in talent_email and '>' in talent_email:
talent_email = talent_email.split('<')[1].split('>')[0]
# 转换为小写,提高匹配准确性
talent_email = talent_email.lower()
# 如果找不到明确的达人邮箱,尝试从映射中查找
if not talent_email:
# 尝试从主题或正文中找线索
# 实现取决于具体需求
pass
logger.info(f"最终确定的达人邮箱: {talent_email}")
# 如果找到了达人邮箱,进行知识库处理
if talent_email:
# 创建或获取知识库
knowledge_base, created = self.create_talent_knowledge_base(talent_email)
if not knowledge_base:
logger.error(f"无法为达人 {talent_email} 创建知识库")
return False
logger.info(f"使用知识库: {knowledge_base.name} (ID: {knowledge_base.id}), 新创建: {created}")
# 获取映射关系
talent_mapping, mapping_created = GmailTalentMapping.objects.get_or_create(
user=self.user,
talent_email=talent_email,
defaults={
'knowledge_base': knowledge_base,
'conversation_id': f"gmail_{talent_email.replace('@', '_').replace('.', '_')}",
'is_active': True
}
)
# 获取对话ID
conversation_id = talent_mapping.conversation_id
# 创建聊天记录 - 确定正确的角色
# 修正角色判断逻辑达人始终是assistant用户始终是user
# 如果是达人发送的邮件角色应该是assistant如果是用户发送的邮件角色应该是user
chat_role = 'assistant' if is_talent_sending else 'user'
# 添加更详细的角色日志
logger.info(f"设置聊天角色: 角色={chat_role}, 是否达人发送={is_talent_sending}, 发件人={sender}, 达人邮箱={talent_email}")
# 创建或更新聊天记录
chat_entry = ChatHistory.objects.create(
user=self.user,
knowledge_base=knowledge_base,
conversation_id=conversation_id,
role=chat_role,
content=f"{subject}\n\n{body}",
parent_id=message_id
)
logger.info(f"已创建聊天记录: ID={chat_entry.id}, 角色={chat_role}, 对话ID={conversation_id}")
# 查找适合的parent_id获取对话中最后一条消息
try:
# 获取该对话中非自己的最新消息作为正确的父消息
last_message = ChatHistory.objects.filter(
conversation_id=conversation_id,
is_deleted=False
).exclude(
id=chat_entry.id # 排除刚刚创建的消息
).order_by('-created_at').first()
if last_message:
# 更新parent_id为对话中的上一条消息ID
chat_entry.parent_id = str(last_message.id)
chat_entry.save(update_fields=['parent_id'])
logger.info(f"更新消息 {chat_entry.id} 的parent_id为 {last_message.id}")
except Exception as parent_error:
logger.error(f"更新父消息ID失败: {str(parent_error)}")
# 处理附件
if 'attachments' in email_content and email_content['attachments']:
# 下载并处理附件
for attachment in email_content['attachments']:
try:
logger.info(f"处理附件: {attachment.get('filename')}")
# 下载附件
filepath = self.download_attachment(
message_id=message_id,
attachment_id=attachment.get('attachmentId'),
filename=attachment.get('filename')
)
if filepath:
# 创建附件记录
GmailAttachment.objects.create(
chat_message=chat_entry,
gmail_message_id=message_id,
filename=attachment.get('filename'),
filepath=filepath,
mimetype=attachment.get('mimeType', ''),
filesize=attachment.get('size', 0)
)
logger.info(f"已保存附件: {filepath}")
except Exception as attachment_error:
logger.error(f"处理附件 {attachment.get('filename')} 失败: {str(attachment_error)}")
# 检查是否需要自动回复
try:
# 只有达人发送的邮件才考虑自动回复
if is_talent_sending and chat_role == 'user':
# 检查用户是否启用了自动回复
profile = UserProfile.objects.filter(user=self.user).first()
if profile and profile.auto_recommend_reply:
logger.info(f"用户 {self.user.email} 已启用自动回复,生成回复建议")
# 获取该对话的历史记录
conversation_history = ChatHistory.objects.filter(
conversation_id=conversation_id
).order_by('created_at')
# 生成回复
recommended_reply = self._get_recommended_reply_from_deepseek(conversation_history)
if recommended_reply:
logger.info(f"生成了推荐回复,长度: {len(recommended_reply)}")
# 保存推荐回复
recommended_reply_entry = ChatHistory.objects.create(
user=self.user,
knowledge_base=knowledge_base,
conversation_id=conversation_id,
role='user',
content=recommended_reply,
parent_id=chat_entry.id
)
logger.info(f"已保存推荐回复到历史记录, 角色=user, ID={recommended_reply_entry.id}, 父消息ID={chat_entry.id}")
except Exception as auto_reply_error:
logger.error(f"生成自动回复失败: {str(auto_reply_error)}")
# 记录详细的处理结果日志
logger.info(f"成功处理达人邮件: ID={message_id}, 发件人={sender}, 收件人={recipient}, 主题={subject}")
logger.info(f"保存消息信息: 角色={chat_role}, 对话ID={conversation_id}, 是否达人发送={is_talent_sending}")
return True
else:
logger.warning(f"无法确定达人邮箱,跳过知识库处理")
return False
except Exception as e:
logger.error(f"处理新消息 {message_id} 失败: {str(e)}")
logger.error(traceback.format_exc())
return False
def _get_recommended_reply_from_deepseek(self, conversation_history):
"""调用DeepSeek API生成回复建议"""
try:
# 使用有效的API密钥
api_key = ""
# 尝试从环境变量获取
import os
from dotenv import load_dotenv
load_dotenv()
env_api_key = os.environ.get('DEEPSEEK_API_KEY')
if env_api_key:
api_key = env_api_key
# 从Django设置中获取密钥
from django.conf import settings
if hasattr(settings, 'DEEPSEEK_API_KEY') and settings.DEEPSEEK_API_KEY:
api_key = settings.DEEPSEEK_API_KEY
# 如果仍然没有有效的API密钥使用默认值
if not api_key:
api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf"
logger.warning("使用默认API密钥请在环境变量或settings.py中设置DEEPSEEK_API_KEY")
url = "https://api.siliconflow.cn/v1/chat/completions"
# 获取用户总目标
from .models import UserGoal, ConversationSummary
user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()
goal_content = user_goal.content if user_goal else None
# 尝试获取对话总结
talent_email = None
conversation_id = None
# 从对话历史中尝试提取达人邮箱
for message in conversation_history:
if message.get('role') == 'user' and 'metadata' in message and 'from_email' in message['metadata']:
talent_email = message['metadata']['from_email']
break
# 从对话历史中尝试提取对话ID
for message in conversation_history:
if 'metadata' in message and 'conversation_id' in message['metadata']:
conversation_id = message['metadata']['conversation_id']
break
# 获取对话总结
conversation_summary = None
if talent_email and conversation_id:
summary_obj = ConversationSummary.objects.filter(
user=self.user,
talent_email=talent_email,
conversation_id=conversation_id,
is_active=True
).first()
if summary_obj:
conversation_summary = summary_obj.summary
# 构建系统提示,结合用户总目标和对话总结
system_content = "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。"
if goal_content:
system_content += f"\n\n【用户总目标】\n{goal_content}\n\n"
if conversation_summary:
system_content += f"【对话总结】\n{conversation_summary}\n\n"
system_content += "请针对达人最后一条消息结合用户总目标和对话总结生成一个有针对性的专业回复。回复必须对达人当前问题直接响应同时与用户的总体合作目标保持一致。回复应该有至少100个字符必须提供有实质内容的回复。"
system_message = {
"role": "system",
"content": system_content
}
messages = [system_message]
# 限制对话历史长度只保留最近的5条消息避免超出token限制
recent_messages = conversation_history[-5:] if len(conversation_history) > 5 else conversation_history
messages.extend(recent_messages)
# 确保最后一条消息是用户消息,如果不是,添加一个提示
if not recent_messages or recent_messages[-1]['role'] != 'user':
# 添加一个系统消息作为用户的最后一条消息
messages.append({
"role": "user",
"content": "请针对达人最后一条消息生成专业的回复,考虑我们之前的对话历史和我设定的总目标。"
})
# 完全按照文档提供的参数格式构建请求
payload = {
"model": "deepseek-ai/DeepSeek-V3",
"messages": messages,
"stream": False,
"max_tokens": 1024, # 增加token上限
"temperature": 0.7, # 提高多样性
"top_p": 0.9,
"top_k": 50,
"frequency_penalty": 0.5,
"presence_penalty": 0.2, # 添加新参数
"n": 1,
"stop": [],
"response_format": {
"type": "text"
}
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
logger.info(f"开始调用DeepSeek API生成推荐回复")
response = requests.post(url, json=payload, headers=headers)
if response.status_code != 200:
logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
return None
result = response.json()
logger.debug(f"DeepSeek API返回: {result}")
# 提取回复内容
if 'choices' in result and len(result['choices']) > 0:
reply = result['choices'][0]['message']['content']
# 如果返回的内容为空直接返回None
if not reply or reply.strip() == '':
logger.warning("DeepSeek API返回的回复内容为空")
return None
return reply
logger.warning(f"DeepSeek API返回格式异常: {result}")
return None
except Exception as e:
logger.error(f"调用DeepSeek API失败: {str(e)}")
logger.error(traceback.format_exc())
return None
def get_attachment_by_conversation(self, conversation_id):
"""获取特定对话的所有附件"""
try:
# 查找对话记录
records = ChatHistory.objects.filter(
conversation_id=conversation_id,
user=self.user,
is_deleted=False
)
if not records:
logger.warning(f"找不到对话记录: {conversation_id}")
return []
# 使用GmailAttachment数据模型查询附件
attachments = []
records_ids = [record.id for record in records]
gmail_attachments = GmailAttachment.objects.filter(
chat_message_id__in=records_ids
).select_related('chat_message')
for attachment in gmail_attachments:
chat_message = attachment.chat_message
attachments.append({
'id': str(attachment.id),
'filename': attachment.filename,
'filepath': attachment.filepath,
'mimetype': attachment.mimetype,
'filesize': attachment.filesize,
'message_id': str(chat_message.id),
'gmail_message_id': attachment.gmail_message_id,
'role': chat_message.role,
'created_at': attachment.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'content_preview': chat_message.content[:100] + '...' if len(chat_message.content) > 100 else chat_message.content
})
return attachments
except Exception as e:
logger.error(f"获取对话附件失败: {str(e)}")
return []
def handle_auth_code(self, auth_code):
"""处理OAuth2回调授权码"""
try:
flow = self.get_oauth_flow()
flow.fetch_token(code=auth_code)
self.credentials = flow.credentials
# 初始化Gmail服务
self.gmail_service = discovery.build('gmail', 'v1', http=self.credentials.authorize(Http()))
# 获取Gmail账号
gmail_email = None
try:
# 调用API获取用户资料
profile = self.gmail_service.users().getProfile(userId='me').execute()
gmail_email = profile.get('emailAddress')
logger.info(f"获取到Gmail账号: {gmail_email}")
except Exception as e:
logger.error(f"获取Gmail账号失败: {str(e)}")
# 保存凭证到数据库
if self.user:
from django.utils import timezone
# 将凭证对象序列化
credentials_data = pickle.dumps(self.credentials)
# 如果提供了具体的gmail_credential_id更新对应的记录
if self.gmail_credential_id:
try:
gmail_credential = GmailCredential.objects.get(
id=self.gmail_credential_id,
user=self.user
)
# 更新凭证信息
gmail_credential.credentials = credentials_data
gmail_credential.gmail_email = gmail_email
gmail_credential.token_path = self.token_storage_path
gmail_credential.updated_at = timezone.now()
gmail_credential.is_active = True
gmail_credential.save()
self.gmail_credential = gmail_credential
logger.info(f"已更新ID为 {self.gmail_credential_id} 的Gmail凭证Gmail账号: {gmail_email}")
except GmailCredential.DoesNotExist:
logger.error(f"未找到ID为 {self.gmail_credential_id} 的Gmail凭证将创建新凭证")
self.gmail_credential_id = None
# 如果没有具体的gmail_credential_id或者指定的不存在创建或更新默认凭证
if not self.gmail_credential_id:
# 检查是否已存在相同gmail_email的凭证
existing_credential = None
if gmail_email:
existing_credential = GmailCredential.objects.filter(
user=self.user,
gmail_email=gmail_email,
is_active=True
).first()
if existing_credential:
# 更新现有凭证
existing_credential.credentials = credentials_data
existing_credential.token_path = self.token_storage_path
existing_credential.updated_at = timezone.now()
existing_credential.save()
self.gmail_credential = existing_credential
logger.info(f"已更新Gmail账号 {gmail_email} 的现有凭证")
else:
# 获取Gmail账户数量
gmail_count = GmailCredential.objects.filter(user=self.user).count()
# 创建新凭证
name = f"Gmail账号 {gmail_count + 1}"
if gmail_email:
name = gmail_email
else:
# 确保gmail_email有默认值避免null错误
gmail_email = "未知邮箱"
# 将凭证转换为JSON字符串
if isinstance(credentials_data, dict):
# 确保json模块在本地作用域可访问
import json
credentials_data = json.dumps(credentials_data)
gmail_credential = GmailCredential.objects.create(
user=self.user,
credentials=credentials_data,
token_path=self.token_storage_path,
gmail_email=gmail_email,
name=name,
is_default=(gmail_count == 0), # 第一个账号设为默认
updated_at=timezone.now(),
is_active=True
)
self.gmail_credential = gmail_credential
logger.info(f"已创建新的Gmail凭证Gmail账号: {gmail_email}, 名称: {name}")
# 更新单例
GmailServiceManager.update_instance(
self.user,
self.credentials,
self.gmail_service,
self.gmail_credential
)
return {
'status': 'success',
'gmail_email': gmail_email
}
except Exception as e:
logger.error(f"处理授权码失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': str(e)
}
def _upload_attachments_to_knowledge_base(self, knowledge_base, conversations):
"""上传所有邮件附件到知识库"""
try:
# 收集所有需要上传的附件
attachments_to_upload = []
# 从conversations中提取所有附件信息
for message in conversations:
if 'attachments' in message and message['attachments']:
message_id = message.get('id', '')
sender = message.get('from', '未知发件人')
subject = message.get('subject', '无主题')
date_str = message.get('date', '未知时间')
for attachment in message['attachments']:
if 'attachmentId' in attachment and 'filename' in attachment:
# 下载附件(如果尚未下载)
filepath = None
# 检查数据库中是否已有记录
existing_attachment = GmailAttachment.objects.filter(
gmail_message_id=message_id,
filename=attachment['filename']
).first()
if existing_attachment and os.path.exists(existing_attachment.filepath):
filepath = existing_attachment.filepath
else:
# 下载附件
filepath = self.download_attachment(
message_id=message_id,
attachment_id=attachment['attachmentId'],
filename=attachment['filename']
)
if filepath:
attachments_to_upload.append({
'filepath': filepath,
'filename': attachment['filename'],
'message_id': message_id,
'sender': sender,
'subject': subject,
'date': date_str
})
# 上传收集到的所有附件
if attachments_to_upload:
logger.info(f"开始上传 {len(attachments_to_upload)} 个附件到知识库")
from .views import KnowledgeBaseViewSet
import django.core.files.uploadedfile as uploadedfile
# 导入FileUploadParser
from rest_framework.parsers import FileUploadParser
# 创建视图集实例
kb_viewset = KnowledgeBaseViewSet()
# 批量上传附件
for i in range(0, len(attachments_to_upload), 10): # 每批最多10个文件
batch = attachments_to_upload[i:i+10]
files = []
for att in batch:
filepath = att['filepath']
filename = att['filename']
# 确认文件存在
if not os.path.exists(filepath):
logger.warning(f"附件文件不存在: {filepath}")
continue
# 读取文件并创建UploadedFile对象
with open(filepath, 'rb') as f:
file_content = f.read()
files.append(uploadedfile.SimpleUploadedFile(
name=filename,
content=file_content,
content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream'
))
# 如果这批有文件调用_call_split_api_multiple上传
if files:
# 直接调用文档分割API
split_response = kb_viewset._call_split_api_multiple(files)
if not split_response or split_response.get('code') != 200:
logger.error(f"附件批量分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}")
continue
# 处理分割后的文档
documents_data = split_response.get('data', [])
# 对每个文档调用上传API
for doc in documents_data:
doc_name = doc.get('name', 'Gmail附件')
doc_content = doc.get('content', [])
# 准备文档数据
upload_doc_data = {
"name": doc_name,
"paragraphs": []
}
# 将所有段落添加到文档中
for paragraph in doc_content:
upload_doc_data["paragraphs"].append({
"content": paragraph.get('content', ''),
"title": paragraph.get('title', ''),
"is_active": True,
"problem_list": []
})
# 调用文档上传API
upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data)
if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
# 上传成功,保存记录到数据库
document_id = upload_response['data']['id']
# 创建文档记录
from .models import KnowledgeBaseDocument
doc_record = KnowledgeBaseDocument.objects.create(
knowledge_base=knowledge_base,
document_id=document_id,
document_name=doc_name,
external_id=document_id
)
logger.info(f"Gmail附件文档上传成功ID: {document_id}, 文件名: {doc_name}")
# 更新知识库文档计数
knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
knowledge_base=knowledge_base,
status='active'
).count()
knowledge_base.save()
logger.info(f"完成附件上传,共 {len(attachments_to_upload)} 个文件")
except Exception as e:
logger.error(f"上传附件到知识库失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
def _upload_message_attachments_to_knowledge_base(self, knowledge_base, attachment_records):
"""上传单个邮件的附件到知识库"""
try:
# 检查是否有附件需要上传
if not attachment_records:
return
logger.info(f"开始上传 {len(attachment_records)} 个附件到知识库")
from .views import KnowledgeBaseViewSet
import django.core.files.uploadedfile as uploadedfile
# 创建视图集实例
kb_viewset = KnowledgeBaseViewSet()
# 准备文件列表
files = []
for att in attachment_records:
filepath = att.get('filepath')
filename = att.get('filename')
# 确认文件存在
if not os.path.exists(filepath):
logger.warning(f"附件文件不存在: {filepath}")
continue
# 读取文件并创建UploadedFile对象
with open(filepath, 'rb') as f:
file_content = f.read()
files.append(uploadedfile.SimpleUploadedFile(
name=filename,
content=file_content,
content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream'
))
# 如果有文件调用_call_split_api_multiple上传
if files:
# 直接调用文档分割API
split_response = kb_viewset._call_split_api_multiple(files)
if not split_response or split_response.get('code') != 200:
logger.error(f"附件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}")
return
# 处理分割后的文档
documents_data = split_response.get('data', [])
# 对每个文档调用上传API
for doc in documents_data:
doc_name = doc.get('name', 'Gmail附件')
doc_content = doc.get('content', [])
# 准备文档数据
upload_doc_data = {
"name": doc_name,
"paragraphs": []
}
# 将所有段落添加到文档中
for paragraph in doc_content:
upload_doc_data["paragraphs"].append({
"content": paragraph.get('content', ''),
"title": paragraph.get('title', ''),
"is_active": True,
"problem_list": []
})
# 调用文档上传API
upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data)
if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
# 上传成功,保存记录到数据库
document_id = upload_response['data']['id']
# 创建文档记录
from .models import KnowledgeBaseDocument
doc_record = KnowledgeBaseDocument.objects.create(
knowledge_base=knowledge_base,
document_id=document_id,
document_name=doc_name,
external_id=document_id
)
logger.info(f"Gmail附件文档上传成功ID: {document_id}, 文件名: {doc_name}")
# 更新知识库文档计数
knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
knowledge_base=knowledge_base,
status='active'
).count()
knowledge_base.save()
logger.info(f"完成附件上传,共 {len(files)} 个文件")
except Exception as e:
logger.error(f"上传附件到知识库失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
def get_recent_emails(self, from_email=None, max_results=10):
"""获取最近的邮件"""
try:
service = self.gmail_service
if not service:
logger.error("Gmail服务未初始化")
return []
query = ""
if from_email:
query = f"from:{from_email}"
# 获取收件箱中的邮件列表
results = service.users().messages().list(
userId='me',
maxResults=max_results,
q=query
).execute()
messages = results.get('messages', [])
emails = []
for message in messages:
msg = service.users().messages().get(userId='me', id=message['id']).execute()
email_data = self._extract_email_content(msg)
emails.append(email_data)
return emails
except Exception as e:
logger.error(f"获取最近邮件失败: {str(e)}")
logger.error(traceback.format_exc())
return []
def manage_user_goal(self, goal_content=None):
"""
创建或更新用户总目标
Args:
goal_content (str): 用户总目标内容如果为None则返回当前总目标
Returns:
dict: 包含操作结果和总目标信息
"""
from .models import UserGoal
try:
# 如果未提供内容,则返回当前总目标
if goal_content is None:
user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()
if user_goal:
return {
'status': 'success',
'action': 'retrieve',
'goal': {
'id': str(user_goal.id),
'content': user_goal.content,
'created_at': user_goal.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': user_goal.updated_at.strftime('%Y-%m-%d %H:%M:%S')
}
}
else:
return {
'status': 'success',
'action': 'retrieve',
'goal': None
}
# 查找当前活跃的用户总目标
user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()
# 如果已存在,则更新
if user_goal:
user_goal.content = goal_content
user_goal.save()
action = 'update'
else:
# 否则创建新的总目标
user_goal = UserGoal.objects.create(
user=self.user,
content=goal_content
)
action = 'create'
return {
'status': 'success',
'action': action,
'goal': {
'id': str(user_goal.id),
'content': user_goal.content,
'created_at': user_goal.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': user_goal.updated_at.strftime('%Y-%m-%d %H:%M:%S')
}
}
except Exception as e:
logger.error(f"管理用户总目标失败: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': f"管理用户总目标失败: {str(e)}"
}
def generate_conversation_summary(self, talent_email):
"""
为用户与特定达人的所有对话生成总结
Args:
talent_email (str): 达人的邮箱地址
Returns:
dict: 包含操作结果和总结信息
"""
from .models import ConversationSummary
try:
# 获取与该达人的所有对话
conversations = self.get_conversations(talent_email)
if not conversations:
return {
'status': 'error',
'message': f"未找到与{talent_email}的对话记录"
}
# 准备对话历史记录
conversation_history = []
for conversation in conversations:
if 'subject' in conversation and conversation['subject']:
conversation_history.append({
'role': 'system',
'content': f"邮件主题: {conversation['subject']}"
})
for message in conversation.get('messages', []):
role = 'user' if message.get('from_email') == talent_email else 'assistant'
content = message.get('body', '')
if content:
conversation_history.append({
'role': role,
'content': content
})
# 调用DeepSeek API生成总结
summary = self._generate_summary_from_deepseek(conversation_history)
if not summary:
return {
'status': 'error',
'message': "生成对话总结失败"
}
# 保存或更新总结
conversation_id = conversations[0].get('id') if conversations else None
if not conversation_id:
return {
'status': 'error',
'message': "无法确定对话ID"
}
# 查找现有总结
conversation_summary = ConversationSummary.objects.filter(
user=self.user,
talent_email=talent_email,
conversation_id=conversation_id,
is_active=True
).first()
# 如果已存在,则更新
if conversation_summary:
conversation_summary.summary = summary
conversation_summary.save()
action = 'update'
else:
# 否则创建新的总结
conversation_summary = ConversationSummary.objects.create(
user=self.user,
talent_email=talent_email,
conversation_id=conversation_id,
summary=summary
)
action = 'create'
return {
'status': 'success',
'action': action,
'summary': {
'id': str(conversation_summary.id),
'talent_email': talent_email,
'conversation_id': conversation_id,
'summary': summary,
'created_at': conversation_summary.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': conversation_summary.updated_at.strftime('%Y-%m-%d %H:%M:%S')
}
}
except Exception as e:
logger.error(f"生成对话总结失败: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': f"生成对话总结失败: {str(e)}"
}
def _generate_summary_from_deepseek(self, conversation_history):
"""调用DeepSeek API生成对话总结"""
try:
# 使用有效的API密钥
api_key = ""
# 尝试从环境变量获取
import os
from dotenv import load_dotenv
load_dotenv()
env_api_key = os.environ.get('DEEPSEEK_API_KEY')
if env_api_key:
api_key = env_api_key
# 从Django设置中获取密钥
from django.conf import settings
if hasattr(settings, 'DEEPSEEK_API_KEY') and settings.DEEPSEEK_API_KEY:
api_key = settings.DEEPSEEK_API_KEY
# 如果仍然没有有效的API密钥使用默认值
if not api_key:
api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf"
logger.warning("使用默认API密钥请在环境变量或settings.py中设置DEEPSEEK_API_KEY")
url = "https://api.siliconflow.cn/v1/chat/completions"
# 系统消息指定生成总结的任务
system_message = {
"role": "system",
"content": "你是一位专业的电商客服和达人助手。你的任务是分析用户与达人之间的所有对话历史并生成一份简明扼要的总结。总结应包括1. 主要讨论的产品或服务2. 达人的主要关注点和需求3. 已经达成的共识或协议4. 未解决的问题或后续需要跟进的事项。总结应该客观、全面、结构清晰。"
}
messages = [system_message]
# 添加对话历史但限制消息数量避免超出token限制
# 如果对话历史太长,可能需要进一步处理或分割
if len(conversation_history) > 20:
# 选取关键消息:第一条、最后几条以及中间的一些重要消息
selected_messages = (
conversation_history[:2] + # 前两条
conversation_history[len(conversation_history)//2-2:len(conversation_history)//2+2] + # 中间四条
conversation_history[-6:] # 最后六条
)
messages.extend(selected_messages)
else:
messages.extend(conversation_history)
# 添加用户指令
messages.append({
"role": "user",
"content": "请根据以上对话历史生成一份全面的总结。"
})
# 构建API请求
payload = {
"model": "deepseek-ai/DeepSeek-V3",
"messages": messages,
"stream": False,
"max_tokens": 1500, # 增加token上限以容纳完整总结
"temperature": 0.3, # 降低随机性,使总结更加确定性
"top_p": 0.9,
"top_k": 50,
"frequency_penalty": 0.5,
"presence_penalty": 0.2,
"n": 1,
"stop": [],
"response_format": {
"type": "text"
}
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
logger.info(f"开始调用DeepSeek API生成对话总结")
response = requests.post(url, json=payload, headers=headers)
if response.status_code != 200:
logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
return None
result = response.json()
logger.debug(f"DeepSeek API返回: {result}")
# 提取回复内容
if 'choices' in result and len(result['choices']) > 0:
summary = result['choices'][0]['message']['content']
# 如果返回的内容为空直接返回None
if not summary or summary.strip() == '':
logger.warning("DeepSeek API返回的总结内容为空")
return None
return summary
logger.warning(f"DeepSeek API返回格式异常: {result}")
return None
except Exception as e:
logger.error(f"调用DeepSeek API生成总结失败: {str(e)}")
logger.error(traceback.format_exc())
return None
def get_oauth_flow(self):
"""创建OAuth2流程处理器"""
try:
# 确保client_secret_json已提供
if not self.client_secret:
logger.error("未提供client_secret_json无法创建OAuth流程")
raise ValueError("未提供client_secret_json")
# 创建临时文件存储client_secret
client_secret_path = 'client_secret.json'
with open(client_secret_path, 'w') as f:
if isinstance(self.client_secret, str):
try:
# 确保是有效的JSON
import json # 在本地作用域引入
json_data = json.loads(self.client_secret)
# 强制设置redirect_uris为非浏览器模式避免localhost连接拒绝问题
for key in ['web', 'installed']:
if key in json_data and 'redirect_uris' in json_data[key]:
json_data[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob']
logger.info("已强制设置redirect_uri为非浏览器模式")
json.dump(json_data, f)
except json.JSONDecodeError as e:
logger.error(f"client_secret不是有效的JSON: {str(e)}")
raise ValueError(f"client_secret不是有效的JSON: {str(e)}")
else:
# 如果是字典,也进行相同处理
import json # 在本地作用域引入
client_secret_dict = dict(self.client_secret)
for key in ['web', 'installed']:
if key in client_secret_dict and 'redirect_uris' in client_secret_dict[key]:
client_secret_dict[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob']
logger.info("已强制设置redirect_uri为非浏览器模式")
json.dump(client_secret_dict, f)
# 从client_secret创建flow
flow = client.flow_from_clientsecrets(
client_secret_path,
self.SCOPES,
redirect_uri='urn:ietf:wg:oauth:2.0:oob'
)
return flow
except Exception as e:
logger.error(f"创建OAuth流程失败: {str(e)}")
logger.error(traceback.format_exc())
raise e
finally:
# 删除临时文件
if os.path.exists(client_secret_path):
try:
os.unlink(client_secret_path)
except Exception as del_e:
logger.error(f"删除临时文件失败: {str(del_e)}")
def check_and_renew_watch(self):
"""检查并更新Gmail监听状态
如果监听即将过期或已经过期,则自动更新监听
"""
try:
from .models import GmailCredential
# 获取用户的Gmail凭证
credential = GmailCredential.objects.filter(
user=self.user,
is_active=True,
gmail_credential_id=self.gmail_credential_id if self.gmail_credential_id else None
).first()
if not credential:
logger.error(f"找不到用户 {self.user.email} 的Gmail凭证")
return False
# 检查监听是否需要更新
needs_renewal = False
# 如果没有watch_expiration需要设置监听
if not credential.watch_expiration:
logger.info(f"Gmail凭证 {credential.gmail_email} 没有监听过期时间,需要设置监听")
needs_renewal = True
else:
# 如果监听将在24小时内过期更新监听
now = timezone.now() if hasattr(settings, 'USE_TZ') and settings.USE_TZ else datetime.now()
time_until_expiration = credential.watch_expiration - now
hours_until_expiration = time_until_expiration.total_seconds() / 3600
if hours_until_expiration < 24:
logger.info(f"Gmail监听将在 {hours_until_expiration:.2f} 小时后过期,需要更新")
needs_renewal = True
if needs_renewal:
logger.info(f"为Gmail凭证 {credential.gmail_email} 更新监听")
watch_result = self.setup_watch()
if watch_result and 'historyId' in watch_result:
logger.info(f"Gmail监听更新成功: historyId={watch_result['historyId']}")
return True
else:
logger.error("Gmail监听更新失败")
return False
else:
logger.info(f"Gmail监听状态良好无需更新。过期时间: {credential.watch_expiration}")
return True
except Exception as e:
logger.error(f"检查和更新Gmail监听失败: {str(e)}")
logger.error(traceback.format_exc())
return False
def verify_connectivity(self):
"""验证与Gmail API的连接性
测试Gmail API连接并返回连接状态
Returns:
dict: 包含连接测试结果的信息
"""
try:
if not hasattr(self, 'gmail_service') or not self.gmail_service:
logger.warning("Gmail服务未初始化尝试认证")
if not self.authenticate():
return {
'status': 'error',
'message': 'Gmail认证失败',
'is_connected': False
}
# 尝试获取用户的个人资料这是一个轻量级的API调用
profile = self.gmail_service.users().getProfile(userId='me').execute()
# 检查是否有必要的监听信息
from .models import GmailCredential
credential = GmailCredential.objects.filter(
user=self.user,
is_active=True,
gmail_credential_id=self.gmail_credential_id if self.gmail_credential_id else None
).first()
watch_info = {}
if credential:
watch_info = {
'has_watch': credential.watch_expiration is not None,
'watch_expiration': credential.watch_expiration.strftime('%Y-%m-%d %H:%M:%S') if credential.watch_expiration else None,
'last_history_id': credential.last_history_id
}
return {
'status': 'success',
'message': 'Gmail连接正常',
'is_connected': True,
'profile': {
'email': profile.get('emailAddress'),
'messages_total': profile.get('messagesTotal'),
'threads_total': profile.get('threadsTotal')
},
'watch_info': watch_info
}
except Exception as e:
logger.error(f"Gmail连接测试失败: {str(e)}")
logger.error(traceback.format_exc())
error_message = str(e)
suggestions = []
if "invalid_grant" in error_message.lower():
suggestions.append("OAuth令牌已过期需要重新授权")
elif "401" in error_message:
suggestions.append("认证失败请重新登录Gmail账号")
elif "EOF occurred in violation of protocol" in error_message:
suggestions.append("SSL连接问题可能是网络或代理配置问题")
elif "connect timeout" in error_message.lower() or "connection timeout" in error_message.lower():
suggestions.append("连接超时,请检查网络连接和代理设置")
return {
'status': 'error',
'message': f'Gmail连接失败: {error_message}',
'is_connected': False,
'suggestions': suggestions
}
@classmethod
def batch_renew_watches(cls):
"""
批量检查和更新所有用户的Gmail监听状态
此方法可以由定时任务调用确保所有用户的Gmail监听都保持活跃状态
Returns:
dict: 包含批处理结果的信息
"""
try:
from .models import GmailCredential, User
import time
# 获取所有活跃的Gmail凭证
now = timezone.now() if hasattr(settings, 'USE_TZ') and settings.USE_TZ else datetime.now()
# 计算24小时后的时间
expiration_threshold = now + timedelta(hours=24)
# 获取即将过期或没有监听的凭证
credentials_to_update = GmailCredential.objects.filter(
is_active=True
).filter(
Q(watch_expiration__lt=expiration_threshold) |
Q(watch_expiration__isnull=True)
)
logger.info(f"找到 {credentials_to_update.count()} 个需要更新监听的Gmail凭证")
success_count = 0
failure_count = 0
# 依次处理每个凭证
for credential in credentials_to_update:
try:
user = credential.user
if not user or not user.is_active:
logger.warning(f"跳过已停用用户的凭证: {credential.id}")
continue
logger.info(f"处理用户 {user.email} 的Gmail凭证 {credential.gmail_email}")
# 创建Gmail集成实例
gmail = cls(user, gmail_credential_id=credential.id)
# 认证
if not gmail.authenticate():
logger.error(f"用户 {user.email} 的Gmail认证失败")
failure_count += 1
continue
# 更新监听
result = gmail.setup_watch()
if result and 'historyId' in result:
logger.info(f"成功更新用户 {user.email} 的Gmail监听")
success_count += 1
else:
logger.error(f"更新用户 {user.email} 的Gmail监听失败")
failure_count += 1
# 休眠一小段时间,避免请求过于频繁
time.sleep(1)
except Exception as e:
failure_count += 1
logger.error(f"处理凭证 {credential.id} 时出错: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'success',
'message': f'批量更新Gmail监听完成',
'total_processed': credentials_to_update.count(),
'success_count': success_count,
'failure_count': failure_count
}
except Exception as e:
logger.error(f"批量更新Gmail监听失败: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': f'批量更新Gmail监听失败: {str(e)}'
}
def refresh_token(self):
"""
尝试刷新OAuth令牌如果失败则标记凭证需要重新授权
Returns:
bool: 刷新是否成功
"""
try:
logger.info(f"尝试刷新用户 {self.user.email} 的OAuth令牌")
# 清除当前服务实例
GmailServiceManager.clear_instance(self.user, self.gmail_credential_id)
# 尝试重新认证
if not self.authenticate():
logger.error("重新认证失败")
self._mark_credential_needs_reauth()
return False
# 验证新的令牌是否有效
try:
# 设置超时
original_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
try:
# 尝试一个简单的API调用来验证
profile = self.gmail_service.users().getProfile(userId='me').execute()
logger.info(f"令牌刷新成功,已验证: {profile.get('emailAddress')}")
return True
except Exception as e:
error_msg = str(e)
if "invalid_grant" in error_msg.lower() or "401" in error_msg:
logger.error(f"令牌仍然无效: {error_msg}")
self._mark_credential_needs_reauth()
return False
else:
# 其他错误,但令牌可能是有效的
logger.warning(f"令牌可能有效但API调用失败: {error_msg}")
return True
finally:
# 恢复原始超时设置
socket.setdefaulttimeout(original_timeout)
except Exception as e:
logger.error(f"验证新令牌时出错: {str(e)}")
return False
except Exception as e:
logger.error(f"刷新令牌失败: {str(e)}")
logger.error(traceback.format_exc())
self._mark_credential_needs_reauth()
return False
def _mark_credential_needs_reauth(self):
"""标记凭证需要重新授权"""
try:
from .models import GmailCredential
# 查找当前凭证
credential = GmailCredential.objects.filter(
user=self.user,
id=self.gmail_credential_id if self.gmail_credential_id else None,
is_active=True
).first()
if credential:
# 标记需要重新授权
credential.needs_reauth = True
credential.save()
logger.info(f"已标记Gmail凭证 {credential.id} 需要重新授权")
# 从缓存中移除服务实例
GmailServiceManager.clear_instance(self.user, self.gmail_credential_id)
except Exception as e:
logger.error(f"标记凭证需要重新授权失败: {str(e)}")
@classmethod
def process_queued_notifications(cls, user=None):
"""
处理队列中的通知,可以在用户重新授权后调用
Args:
user: 可选的用户参数,如果提供,只处理该用户的通知
Returns:
dict: 处理结果统计
"""
try:
from .models import GmailNotificationQueue, GmailCredential
import json
# 构建查询
query = Q(processed=False)
if user:
query &= Q(user=user)
# 获取未处理的通知
queued_notifications = GmailNotificationQueue.objects.filter(query)
logger.info(f"找到 {queued_notifications.count()} 条未处理的Gmail通知")
processed_count = 0
success_count = 0
# 按用户ID和Gmail凭证ID分组处理
from django.db.models import Count
user_creds = queued_notifications.values('user_id', 'gmail_credential_id').annotate(
count=Count('id')
).order_by('user_id', 'gmail_credential_id')
for user_cred in user_creds:
user_id = user_cred['user_id']
cred_id = user_cred['gmail_credential_id']
# 获取凭证信息
credential = GmailCredential.objects.filter(id=cred_id, is_active=True).first()
if not credential or credential.needs_reauth:
logger.warning(f"凭证 {cred_id} 需要重新授权,跳过相关通知")
continue
# 获取用户对象
from .models import User
user_obj = User.objects.get(id=user_id)
# 初始化Gmail集成
gmail_integration = cls(user_obj, gmail_credential_id=cred_id)
# 获取用户的队列通知
user_notifications = queued_notifications.filter(
user_id=user_id,
gmail_credential_id=cred_id
).order_by('created_at') # 按创建时间排序
for notification in user_notifications:
try:
# 解析通知数据
try:
notification_data = json.loads(notification.notification_data)
except:
notification_data = {
'emailAddress': notification.email,
'historyId': notification.history_id
}
# 处理通知
result = gmail_integration.process_notification(notification_data)
# 更新处理状态
notification.processed = True
notification.success = result
notification.processed_at = timezone.now()
notification.save()
processed_count += 1
if result:
success_count += 1
except Exception as e:
logger.error(f"处理队列通知 {notification.id} 失败: {str(e)}")
# 标记处理失败
notification.processed = True
notification.success = False
notification.error_message = str(e)[:255] # 截断错误消息
notification.processed_at = timezone.now()
notification.save()
processed_count += 1
return {
'status': 'success',
'total_processed': processed_count,
'success_count': success_count,
'remaining': queued_notifications.count() - processed_count
}
except Exception as e:
logger.error(f"处理队列通知失败: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': str(e)
}
def _load_credentials_from_storage(self, credential_data):
"""从存储中加载凭证,处理可能的格式不匹配问题"""
logger.info("尝试加载凭证数据")
if not credential_data:
logger.error("凭证数据为空")
return None
# 方法1: 尝试直接JSON解析
try:
import json
if isinstance(credential_data, str):
cred_json = credential_data
else:
cred_json = credential_data.decode('utf-8')
# 验证是否为有效JSON
json.loads(cred_json)
# 使用OAuth2Credentials从JSON创建凭证
logger.info("成功从JSON创建凭证")
return client.OAuth2Credentials.from_json(cred_json)
except Exception as json_error:
logger.error(f"JSON解析凭证失败: {str(json_error)}")
# 方法2: 尝试pickle解析
try:
if isinstance(credential_data, str):
# 如果是字符串,尝试编码为二进制
pickle_data = credential_data.encode('latin1')
else:
pickle_data = credential_data
logger.info("尝试使用pickle解析凭证")
return pickle.loads(pickle_data)
except Exception as pickle_error:
logger.error(f"Pickle解析凭证失败: {str(pickle_error)}")
# 所有方法都失败
logger.error("所有凭证解析方法都失败")
return None