3650 lines
169 KiB
Python
3650 lines
169 KiB
Python
import os
|
||
import json
|
||
import uuid
|
||
import base64
|
||
import pickle
|
||
import logging
|
||
import traceback # 确保导入traceback模块
|
||
from pathlib import Path
|
||
import dateutil.parser as parser
|
||
from datetime import datetime, timedelta
|
||
from bs4 import BeautifulSoup
|
||
from django.utils import timezone
|
||
from django.conf import settings # 添加Django设置导入
|
||
from email.mime.text import MIMEText
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.base import MIMEBase
|
||
from email import encoders
|
||
import warnings
|
||
import mimetypes
|
||
import requests
|
||
import django.db.utils
|
||
import time
|
||
from django.db.models import Q
|
||
import socket
|
||
|
||
# Suppress oauth2client / google-auth related warnings (deprecation notices and the
# "file_cache is unavailable ..." message). Filters are registered in this order.
for _filter_kwargs in (
    {'message': 'file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth'},
    {'category': DeprecationWarning, 'module': 'oauth2client'},
    {'message': '.*google-auth.*'},
    {'message': '.*oauth2client.*'},
):
    warnings.filterwarnings('ignore', **_filter_kwargs)
del _filter_kwargs
|
||
|
||
from apiclient import discovery
|
||
from httplib2 import Http
|
||
from oauth2client import file, client, tools
|
||
|
||
from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment, UserProfile
|
||
|
||
# Timeout in seconds for Gmail API HTTP calls; override via settings.GMAIL_REQUEST_TIMEOUT.
GMAIL_REQUEST_TIMEOUT = (
    settings.GMAIL_REQUEST_TIMEOUT
    if hasattr(settings, 'GMAIL_REQUEST_TIMEOUT')
    else 30
)

# Module-level logger used throughout this module.
logger = logging.getLogger(name=__name__)
|
||
|
||
# Gmail service singleton manager
class GmailServiceManager:
    """Process-local cache of built Gmail API service objects.

    Cache keys are ``"<user_id>"`` for a user's default credential or
    ``"<user_id>:<gmail_credential_id>"`` for a specific one. Each value is a dict
    with ``service``, ``credentials``, ``timestamp``, ``user`` and
    ``gmail_credential``. An entry that has not been reused for
    ``INSTANCE_TTL_SECONDS`` is discarded and rebuilt on the next request.

    NOTE(review): the cache is a plain class-level dict with no locking; it is
    per-process and not synchronized between threads.
    """

    _instances = {}  # instance_key -> cache entry dict (see class docstring)

    # Seconds an unused cached entry stays valid (30 minutes).
    INSTANCE_TTL_SECONDS = 1800

    @staticmethod
    def _deserialize_credentials(raw):
        """Rebuild oauth2client credentials from a stored GmailCredential.credentials value.

        ``GmailIntegration.authenticate`` stores ``creds.to_json()`` (a JSON string) and
        only falls back to ``pickle.dumps`` when JSON serialization fails, so both
        forms are accepted. ``memoryview`` (as some DB backends return for binary
        columns) and already-decoded dicts are handled as well.

        Raises:
            TypeError: for an unsupported storage type.
            ValueError / pickle.UnpicklingError: for corrupt data.
        """
        if isinstance(raw, memoryview):
            raw = raw.tobytes()
        if isinstance(raw, dict):
            return client.Credentials.new_from_json(json.dumps(raw))
        if isinstance(raw, str):
            return client.Credentials.new_from_json(raw)
        if isinstance(raw, (bytes, bytearray)):
            raw = bytes(raw)
            if raw.lstrip().startswith(b'{'):
                return client.Credentials.new_from_json(raw.decode('utf-8'))
            # SECURITY: pickle.loads can execute arbitrary code; this must only ever
            # see blobs this application wrote to its own database.
            return pickle.loads(raw)
        raise TypeError(f"Unsupported credential storage type: {type(raw).__name__}")

    @classmethod
    def get_instance(cls, user, gmail_credential_id=None):
        """Return the cached service entry for *user*, building it on a miss or expiry.

        Args:
            user: Django user owning the credential.
            gmail_credential_id: ID of a specific GmailCredential. When falsy, the
                user's active default credential is used, else the most recently
                updated active one.

        Returns:
            dict | None: the cache entry, or None when no usable credential exists,
            the stored credentials are invalid, or building the service failed
            (the error is logged).
        """
        user_id = str(user.id)
        instance_key = f"{user_id}:{gmail_credential_id}" if gmail_credential_id else user_id

        instance = cls._instances.get(instance_key)
        if instance is not None:
            age = (timezone.now() - instance['timestamp']).total_seconds()
            if age <= cls.INSTANCE_TTL_SECONDS:
                # Sliding expiry: every reuse refreshes the timestamp.
                instance['timestamp'] = timezone.now()
                credential = instance.get('gmail_credential')
                gmail_email = credential.gmail_email if credential else '未知'
                credential_name = credential.name if credential else '默认'
                logger.info(f"复用用户 {user.username} 的Gmail服务单例,Gmail账号: {gmail_email},名称: {credential_name}")
                return instance
            # Expired: drop it and fall through to rebuild.
            del cls._instances[instance_key]

        try:
            active = GmailCredential.objects.filter(user=user, is_active=True)
            if gmail_credential_id:
                credential = active.filter(id=gmail_credential_id).first()
            else:
                credential = (
                    active.filter(is_default=True).first()
                    or active.order_by('-updated_at').first()
                )

            if not (credential and credential.credentials):
                return None

            creds = cls._deserialize_credentials(credential.credentials)
            if creds.invalid:
                return None

            gmail_service = discovery.build(
                'gmail', 'v1', http=creds.authorize(Http(timeout=GMAIL_REQUEST_TIMEOUT))
            )
            cls._instances[instance_key] = {
                'service': gmail_service,
                'credentials': creds,
                'timestamp': timezone.now(),
                'user': user,
                'gmail_credential': credential,
            }
            logger.info(f"创建用户 {user.username} 的Gmail服务单例,Gmail账号: {credential.gmail_email or '未知'},名称: {credential.name}")
            return cls._instances[instance_key]
        except Exception as e:
            logger.error(f"创建Gmail服务单例失败: {e}")
            return None

    @classmethod
    def get_credential_instance(cls, credential):
        """Return the cache entry for a GmailCredential object, or None if it has no user."""
        if not credential or not credential.user:
            return None
        return cls.get_instance(credential.user, str(credential.id))

    @classmethod
    def update_instance(cls, user, credentials, service, gmail_credential=None):
        """Store (or replace) the cache entry for *user* and optional *gmail_credential*."""
        user_id = str(user.id)
        instance_key = f"{user_id}:{gmail_credential.id}" if gmail_credential else user_id
        cls._instances[instance_key] = {
            'service': service,
            'credentials': credentials,
            'timestamp': timezone.now(),
            'user': user,
            'gmail_credential': gmail_credential,
        }

    @classmethod
    def clear_instance(cls, user, gmail_credential_id=None):
        """Remove cached entries for *user*.

        Args:
            user: the user whose entries are removed.
            gmail_credential_id: when given, only that credential's entry is removed;
                otherwise the default entry and every per-credential entry go.

        Returns:
            int: number of cache entries removed.
        """
        user_id = str(user.id)
        if gmail_credential_id:
            removed = cls._instances.pop(f"{user_id}:{gmail_credential_id}", None)
            return 0 if removed is None else 1

        prefix = f"{user_id}:"
        doomed = [key for key in cls._instances if key == user_id or key.startswith(prefix)]
        for key in doomed:
            del cls._instances[key]
        return len(doomed)

    @classmethod
    def get_all_user_instances(cls, user):
        """Return ``{gmail_credential_id (str): entry}`` for the user's cached entries.

        Entries without an associated GmailCredential are omitted.
        """
        user_id = str(user.id)
        prefix = f"{user_id}:"
        user_instances = {}
        for key, instance in cls._instances.items():
            if key != user_id and not key.startswith(prefix):
                continue
            credential = instance.get('gmail_credential')
            if credential:
                user_instances[str(credential.id)] = instance
        return user_instances

    @classmethod
    def clear_all_instances_by_email(cls, gmail_email):
        """Clear every cached entry tied to active credentials for *gmail_email*.

        For each matching credential, its per-credential entry and its owner's
        default entry are removed.

        Args:
            gmail_email: Gmail address.

        Returns:
            int: number of entries removed (0 on error, which is logged).
        """
        try:
            credentials = GmailCredential.objects.filter(
                gmail_email=gmail_email,
                is_active=True,
            ).select_related('user')

            cleared_count = 0
            for credential in credentials:
                user = credential.user
                cleared_count += cls.clear_instance(user, str(credential.id))
                # The user's default entry may be backed by this account as well.
                if cls._instances.pop(str(user.id), None) is not None:
                    cleared_count += 1
            return cleared_count
        except Exception as e:
            logger.error(f"清除Gmail邮箱 {gmail_email} 的服务实例时出错: {str(e)}")
            return 0
|
||
|
||
class GmailIntegration:
|
||
"""Gmail集成类"""
|
||
|
||
# Gmail API 权限范围
|
||
SCOPES = ['https://mail.google.com/']
|
||
|
||
def __init__(self, user, email=None, client_secret_json=None, use_proxy=True, proxy_url='http://127.0.0.1:7890', gmail_credential_id=None):
    """Prepare a Gmail integration for *user*.

    Args:
        user: Django user owning the Gmail credentials, or None.
        email: target (talent) mailbox address used by later operations.
        client_secret_json: OAuth client secret as a JSON string or mapping; only
            needed when a fresh authorization is required.
        use_proxy: when True, ``proxy_url`` is exported as HTTP_PROXY/HTTPS_PROXY.
        proxy_url: proxy URL exported when ``use_proxy`` is True.
        gmail_credential_id: ID of a specific GmailCredential. When None, the user's
            active default credential is used, else the most recently updated
            active one.

    Side effects:
        May set process-wide proxy environment variables, creates the
        ``gmail_tokens`` directory, and reads (and possibly updates) a
        GmailCredential row. Errors while loading stored credentials are logged,
        not raised.
    """
    self.user = user
    self.user_email = user.email if user else None
    self.email = email  # target mailbox
    self.client_secret = client_secret_json
    self.credentials = None
    self.gmail_service = None
    self.use_proxy = use_proxy
    self.proxy_url = proxy_url
    self.gmail_credential_id = gmail_credential_id
    self.gmail_credential = None

    if self.use_proxy:
        logger.info(f"设置Gmail API代理: {proxy_url}")
        # NOTE(review): os.environ is process-wide, so this proxy applies to every
        # HTTP client in the process, not only Gmail calls.
        os.environ['HTTP_PROXY'] = proxy_url
        os.environ['HTTPS_PROXY'] = proxy_url

    if user:
        # One token file per user, or per (user, credential) pair.
        token_file = f"gmail_token_{user.id}.json"
        if gmail_credential_id:
            token_file = f"gmail_token_{user.id}_{gmail_credential_id}.json"
        self.token_storage_path = os.path.join("gmail_tokens", token_file)
        os.makedirs("gmail_tokens", exist_ok=True)
        logger.info(f"设置Token存储路径: {self.token_storage_path}")

        try:
            active = GmailCredential.objects.filter(user=user, is_active=True)
            if gmail_credential_id:
                gmail_cred = active.filter(id=gmail_credential_id).first()
            else:
                gmail_cred = (
                    active.filter(is_default=True).first()
                    or active.order_by('-updated_at').first()
                )

            if gmail_cred and gmail_cred.credentials:
                self.gmail_credential = gmail_cred
                logger.info(f"从数据库加载用户 {user.username} 的Gmail凭证 (ID: {gmail_cred.id}, Email: {gmail_cred.gmail_email or '未知'}, 名称: {gmail_cred.name})")

                creds = self._load_credentials_from_storage(gmail_cred.credentials)
                if creds:
                    self.credentials = creds
                    self.gmail_service = discovery.build(
                        'gmail', 'v1', http=creds.authorize(Http(timeout=GMAIL_REQUEST_TIMEOUT))
                    )
                    logger.info("从数据库凭证初始化Gmail服务成功")
                else:
                    logger.error("无法从数据库中加载有效凭证")
                    # Flag the account for re-authorization when the model supports it.
                    if hasattr(gmail_cred, 'needs_reauth'):
                        gmail_cred.needs_reauth = True
                        gmail_cred.save()
                        logger.info(f"已将Gmail账号 {gmail_cred.gmail_email} 标记为需要重新认证")
        except Exception as e:
            # Best-effort: authenticate() can still fall back to the token file.
            logger.error(f"从数据库加载凭证失败: {str(e)}")
    else:
        self.token_storage_path = "gmail_token.json"
        logger.warning("未提供用户,将使用默认Token存储路径")

    # oauth2client file storage for the token path chosen above.
    self.storage = file.Storage(self.token_storage_path)
|
||
|
||
def authenticate(self):
    """Obtain Gmail API credentials and initialise ``self.gmail_service``.

    Sources are tried in this order:

    1. A cached service from ``GmailServiceManager`` for ``self.user`` /
       ``self.gmail_credential_id``.
    2. Credentials that ``__init__`` already loaded from the database.
    3. The oauth2client token file at ``self.token_storage_path``. When it holds
       no valid credentials and ``self.client_secret`` is set, an exception whose
       message starts with "Please visit this URL to authorize:" is raised for the
       view layer to handle.

    After a successful token-file login, the credentials are saved to
    GmailCredential (``_save_credentials_to_db``) and the service is registered
    with ``GmailServiceManager``.

    Returns:
        bool: True when a service is ready, False on any handled failure.

    Raises:
        Exception: the authorization-URL exception described above.
    """
    client_secret_path = None
    try:
        if self.user:
            instance = GmailServiceManager.get_instance(self.user, self.gmail_credential_id)
            if instance:
                self.gmail_service = instance['service']
                self.credentials = instance['credentials']
                self.gmail_credential = instance.get('gmail_credential')
                credential_id = str(self.gmail_credential.id) if self.gmail_credential else "未知"
                gmail_email = self.gmail_credential.gmail_email if self.gmail_credential else "未知"
                credential_name = self.gmail_credential.name if self.gmail_credential else "默认"
                logger.info(f"使用现有的Gmail服务单例 (ID: {credential_id}, Email: {gmail_email}, 名称: {credential_name})")
                return True

        # Credentials loaded from the database in __init__ are authoritative; don't
        # demand a local token file (which may not exist on this host) on top of them.
        if (self.gmail_service and self.credentials
                and not getattr(self.credentials, 'invalid', False)):
            logger.info("使用从数据库加载的凭证初始化的Gmail服务")
            if self.user:
                GmailServiceManager.update_instance(
                    self.user, self.credentials, self.gmail_service, self.gmail_credential
                )
            return True

        if self.client_secret:
            secret_data = self._normalized_client_secret()
            if secret_data is None:
                return False
            # Private temp file per call: a shared ./client_secret.json could be
            # overwritten or deleted by a concurrent request.
            import tempfile
            fd, client_secret_path = tempfile.mkstemp(prefix='client_secret_', suffix='.json')
            with os.fdopen(fd, 'w') as f:
                json.dump(secret_data, f)
            logger.info(f"已将client_secret写入临时文件: {client_secret_path}")

        logger.info(f"创建或读取token存储: {self.token_storage_path}")
        token_dir = os.path.dirname(self.token_storage_path)
        if token_dir and not os.path.exists(token_dir):
            logger.info(f"创建token目录: {token_dir}")
            os.makedirs(token_dir, exist_ok=True)

        creds = file.Storage(self.token_storage_path).get()
        if not creds or creds.invalid:
            logger.info("没有有效的凭证,需要重新授权")
            if not self.client_secret:
                logger.error("没有提供client_secret_json且找不到有效凭证")
                return False

            # NOTE(review): Google has deprecated the out-of-band redirect
            # (urn:ietf:wg:oauth:2.0:oob); confirm it still works for this OAuth client.
            logger.info("使用非浏览器认证模式")
            flow = client.flow_from_clientsecrets(client_secret_path, self.SCOPES)
            flow.redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'
            auth_url = flow.step1_get_authorize_url()
            logger.info(f"获取授权URL: {auth_url[:50]}...")
            raise Exception(f"Please visit this URL to authorize: {auth_url}")

        self.credentials = creds
        logger.info("使用现有凭证初始化Gmail服务")
        self.gmail_service = discovery.build(
            'gmail', 'v1', http=creds.authorize(Http(timeout=GMAIL_REQUEST_TIMEOUT))
        )
        logger.info("Gmail服务初始化成功")

        gmail_email = self._detect_gmail_email()

        if self.user:
            self._save_credentials_to_db(creds, gmail_email)
            if self.gmail_service and self.credentials:
                GmailServiceManager.update_instance(
                    self.user, self.credentials, self.gmail_service, self.gmail_credential
                )
        return True

    except Exception as e:
        # The authorization-URL exception is part of the contract with the view layer.
        if "Please visit this URL" in str(e):
            raise
        logger.error(f"Gmail认证失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
    finally:
        if client_secret_path and os.path.exists(client_secret_path):
            logger.info("删除临时client_secret文件")
            os.unlink(client_secret_path)


def _normalized_client_secret(self):
    """Return ``self.client_secret`` as a fresh dict with out-of-band redirect URIs.

    Accepts a JSON string or a mapping. The caller's object is never mutated:
    a mapping is deep-copied via a JSON round trip. Returns None, after logging,
    when a string is not valid JSON. A mapping that is not JSON-serializable
    raises TypeError.
    """
    if isinstance(self.client_secret, str):
        try:
            secret = json.loads(self.client_secret)
        except json.JSONDecodeError as e:
            logger.error(f"client_secret不是有效的JSON: {str(e)}")
            return None
    else:
        secret = json.loads(json.dumps(dict(self.client_secret)))

    # Force the non-browser redirect to avoid refused localhost callbacks.
    for key in ('web', 'installed'):
        if key in secret and 'redirect_uris' in secret[key]:
            secret[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob']
            logger.info("已强制设置redirect_uri为非浏览器模式")
    return secret


def _detect_gmail_email(self):
    """Best-effort detection of the authenticated mailbox address; None if unknown.

    Uses ``users.getProfile`` first. If that fails, falls back to the latest
    message: for a SENT message the owner is the sender, otherwise the owner is
    looked for among the recipients (Delivered-To first). The fallback is a
    heuristic.
    """
    gmail_email = None
    try:
        profile = self.gmail_service.users().getProfile(userId='me').execute()
        gmail_email = profile.get('emailAddress')
        logger.info(f"获取到Gmail账号: {gmail_email}")
    except Exception as profile_error:
        logger.error(f"获取Gmail账号失败: {str(profile_error)}")
    if gmail_email:
        return gmail_email

    try:
        from email.utils import getaddresses

        listing = self.gmail_service.users().messages().list(userId='me', maxResults=1).execute()
        stubs = listing.get('messages') or []
        if not stubs:
            return None
        msg = self.gmail_service.users().messages().get(userId='me', id=stubs[0]['id']).execute()
        headers = {h['name']: h['value'] for h in msg.get('payload', {}).get('headers', [])}

        if 'SENT' in msg.get('labelIds', []):
            candidates = ('From', 'To', 'Cc', 'Bcc')
        else:
            candidates = ('Delivered-To', 'To', 'Cc', 'Bcc', 'From')
        for header_name in candidates:
            for _, address in getaddresses([headers.get(header_name, '')]):
                if '@' in address:
                    logger.info(f"从消息中提取到Gmail账号: {address}")
                    return address
    except Exception as msg_error:
        logger.error(f"尝试从消息中提取Gmail账号失败: {str(msg_error)}")
    return None


def _save_credentials_to_db(self, creds, gmail_email):
    """Persist *creds* to GmailCredential and set ``self.gmail_credential``.

    * With ``self.gmail_credential_id``: that row is updated. If the row no longer
      exists, the id is cleared and the path below is used.
    * Otherwise: the user's active row for *gmail_email* is updated, or a new row
      is created. A new row is default only when the user has no rows at all.

    Credentials are stored as ``creds.to_json()``, or pickled if that fails.
    """
    logger.info("保存凭证到数据库")
    try:
        credentials_data = creds.to_json()
    except Exception as json_error:
        logger.error(f"JSON序列化凭证失败: {str(json_error)}")
        credentials_data = pickle.dumps(creds)
        logger.info("使用pickle序列化凭证")

    if self.gmail_credential_id:
        try:
            gmail_credential = GmailCredential.objects.get(
                id=self.gmail_credential_id,
                user=self.user,
            )
        except GmailCredential.DoesNotExist:
            logger.error(f"未找到ID为 {self.gmail_credential_id} 的Gmail凭证,将创建新凭证")
            self.gmail_credential_id = None
        else:
            gmail_credential.credentials = credentials_data
            if gmail_email:
                # Keep the stored address instead of blanking it when detection failed.
                gmail_credential.gmail_email = gmail_email
            gmail_credential.updated_at = timezone.now()
            gmail_credential.is_active = True
            gmail_credential.save()
            self.gmail_credential = gmail_credential
            logger.info(f"已更新ID为 {self.gmail_credential_id} 的Gmail凭证,Gmail账号: {gmail_email}")
            return

    existing_credential = None
    if gmail_email:
        existing_credential = GmailCredential.objects.filter(
            user=self.user,
            gmail_email=gmail_email,
            is_active=True,
        ).first()

    if existing_credential:
        existing_credential.credentials = credentials_data
        existing_credential.token_path = self.token_storage_path
        existing_credential.updated_at = timezone.now()
        existing_credential.save()
        self.gmail_credential = existing_credential
        logger.info(f"已更新Gmail账号 {gmail_email} 的现有凭证")
        return

    gmail_count = GmailCredential.objects.filter(user=self.user).count()
    name = gmail_email or f"Gmail账号 {gmail_count + 1}"
    # Placeholder avoids a NULL gmail_email when detection failed.
    gmail_email = gmail_email or "未知邮箱"

    gmail_credential = GmailCredential.objects.create(
        user=self.user,
        credentials=credentials_data,
        token_path=self.token_storage_path,
        gmail_email=gmail_email,
        name=name,
        is_default=(gmail_count == 0),  # first account becomes the default
        updated_at=timezone.now(),
        is_active=True,
    )
    self.gmail_credential = gmail_credential
    logger.info(f"已创建新的Gmail凭证,Gmail账号: {gmail_email}, 名称: {name}")
|
||
|
||
def create_talent_knowledge_base(self, talent_email):
    """Return the knowledge base linked to *talent_email*, creating it if necessary.

    Lookup order:
      1. an active GmailTalentMapping for (self.user, talent_email) that has a
         knowledge base;
      2. a knowledge base owned by the user and named ``Gmail-<local part>``,
         in which case the mapping is created or updated;
      3. ``_create_knowledge_base_basic``.
    Any exception during steps 1-3 is logged and ``_create_knowledge_base_basic``
    is tried once more.

    NOTE(review): the name uses only the part before '@', so addresses with the
    same local part on different domains share a knowledge base; confirm intent.

    Args:
        talent_email (str): the talent's Gmail address.

    Returns:
        tuple: (knowledge_base, created).
    """
    try:
        active_mapping = GmailTalentMapping.objects.filter(
            user=self.user,
            talent_email=talent_email,
            is_active=True,
        ).first()
        mapped_kb = active_mapping.knowledge_base if active_mapping else None
        if mapped_kb:
            logger.info(f"找到现有的Gmail-达人映射: {talent_email} -> {mapped_kb.name}")
            return mapped_kb, False

        local_part = talent_email.split('@')[0]
        kb_name = f"Gmail-{local_part}"
        owned_kb = KnowledgeBase.objects.filter(name=kb_name, user_id=self.user.id).first()

        if owned_kb is None:
            logger.info(f"使用基本方法创建知识库: {kb_name}")
            return self._create_knowledge_base_basic(talent_email)

        logger.info(f"找到现有知识库: {kb_name}")
        GmailTalentMapping.objects.update_or_create(
            user=self.user,
            talent_email=talent_email,
            defaults={'knowledge_base': owned_kb, 'is_active': True},
        )
        return owned_kb, False

    except Exception as e:
        import traceback
        logger.error(f"创建或获取Gmail-达人知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        # Last attempt: the basic path handles lookup and creation on its own.
        logger.info("尝试使用基本方法创建知识库")
        return self._create_knowledge_base_basic(talent_email)
|
||
|
||
def _create_knowledge_base_basic(self, talent_email):
    """Find or create the private knowledge base for *talent_email* and map it.

    The knowledge base is named ``Gmail-<local part of talent_email>``.

    * An existing one owned by ``self.user`` is reused. If it lacks
      ``external_id``, an external dataset is created; a failure there is logged,
      not fatal.
    * Otherwise a new one is created. If the name is already taken by another
      user, a random 5-character suffix is added.

    In every successful case, an active GmailTalentMapping
    (user, talent_email) -> knowledge base is created or updated.

    Args:
        talent_email (str): the talent's email address.

    Returns:
        tuple: (knowledge_base, created). ``created`` is True only for a newly
        created knowledge base.

    Raises:
        Exception: the original error, when creation fails and no fallback
        knowledge base owned by the user is found.
    """
    try:
        from django.db import transaction

        kb_name = f"Gmail-{talent_email.split('@')[0]}"
        existing_kb = KnowledgeBase.objects.filter(name=kb_name, user_id=self.user.id).first()

        if existing_kb:
            logger.info(f"找到现有知识库: {kb_name}")
            if not existing_kb.external_id:
                logger.info(f"知识库 {kb_name} 缺少external_id,尝试创建外部知识库")
                try:
                    from .views import KnowledgeBaseViewSet
                    external_id = KnowledgeBaseViewSet()._create_external_dataset(existing_kb)
                    if external_id:
                        existing_kb.external_id = external_id
                        existing_kb.save()
                        logger.info(f"成功为现有知识库创建外部知识库: {external_id}")
                    else:
                        logger.error("创建外部知识库失败:未返回external_id")
                except Exception as e:
                    # Non-fatal: callers expect a (kb, created) tuple, not an error dict.
                    logger.error(f"为现有知识库创建外部知识库失败: {str(e)}")
            self._upsert_talent_mapping(talent_email, existing_kb)
            return existing_kb, False

        try:
            # Savepoint so an IntegrityError leaves any enclosing transaction usable
            # for the recovery queries below.
            with transaction.atomic():
                knowledge_base = KnowledgeBase.objects.create(
                    name=kb_name,
                    desc=f"与{talent_email}的Gmail邮件交流记录",
                    type="private",
                    user_id=self.user.id,
                    documents=[],
                )
        except django.db.utils.IntegrityError:
            logger.warning(f"知识库名称'{kb_name}'已存在,尝试获取或创建带随机后缀的名称")

            # The name may be globally unique; check who owns it.
            existing_kb = KnowledgeBase.objects.filter(name=kb_name).first()
            if existing_kb and str(existing_kb.user_id) == str(self.user.id):
                logger.info(f"找到属于当前用户的知识库: {kb_name}")
                self._upsert_talent_mapping(talent_email, existing_kb)
                return existing_kb, False

            import random
            import string
            suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=5))
            new_kb_name = f"{kb_name}-{suffix}"
            logger.info(f"创建带随机后缀的知识库: {new_kb_name}")
            knowledge_base = KnowledgeBase.objects.create(
                name=new_kb_name,
                desc=f"与{talent_email}的Gmail邮件交流记录",
                type="private",
                user_id=self.user.id,
                documents=[],
            )

        try:
            from .views import KnowledgeBaseViewSet
            external_id = KnowledgeBaseViewSet()._create_external_dataset(knowledge_base)
            if external_id:
                knowledge_base.external_id = external_id
                knowledge_base.save()
                logger.info(f"成功创建外部知识库: {external_id}")
        except Exception as e:
            # Continue: the local knowledge base is still usable.
            logger.error(f"创建外部知识库失败: {str(e)}")

        # update_or_create: an inactive mapping for this talent may already exist.
        self._upsert_talent_mapping(talent_email, knowledge_base)

        logger.info(f"成功创建新知识库: {knowledge_base.name}, ID: {knowledge_base.id}")
        return knowledge_base, True

    except Exception as e:
        logger.error(f"创建或获取Gmail-达人知识库基本方法失败: {str(e)}")
        logger.error(traceback.format_exc())

        # Last resort: any knowledge base of *this user* with the derived name prefix.
        try:
            kb_name = f"Gmail-{talent_email.split('@')[0]}"
            existing_kb = KnowledgeBase.objects.filter(
                name__startswith=kb_name,
                user_id=self.user.id,
            ).first()
            if existing_kb:
                logger.info(f"在错误处理中找到可用的知识库: {existing_kb.name}")
                self._upsert_talent_mapping(talent_email, existing_kb)
                return existing_kb, False
        except Exception as inner_e:
            logger.error(f"错误处理中尝试获取知识库失败: {str(inner_e)}")

        # Re-raise the original error (e).
        raise


def _upsert_talent_mapping(self, talent_email, knowledge_base):
    """Create or update the active (self.user, talent_email) -> knowledge_base mapping."""
    GmailTalentMapping.objects.update_or_create(
        user=self.user,
        talent_email=talent_email,
        defaults={'knowledge_base': knowledge_base, 'is_active': True},
    )
|
||
|
||
def get_conversations(self, talent_gmail, account_email=None):
    """Return every message exchanged between the connected mailbox and *talent_gmail*.

    Args:
        talent_gmail (str): the talent's address.
        account_email (str, optional): the connected mailbox's address. Defaults
            to ``self.gmail_credential.gmail_email`` when that looks like an
            address, otherwise the address reported by ``users.getProfile``.

    Returns:
        list[dict]: messages as produced by ``_extract_email_content``, sorted by
        their ``date`` string. An empty list is returned when the service is not
        initialised or on error (which is logged). A message whose details cannot
        be fetched is logged and skipped.
    """
    try:
        if not self.gmail_service:
            logger.error("Gmail服务未初始化")
            return []

        # Query from the connected account's own address, never a fixed mailbox.
        email1 = account_email or self._resolve_account_email()
        email2 = talent_gmail
        logger.info(f"执行Gmail查询: {email1} 与 {email2}")

        query = f"from:({email1} OR {email2}) to:({email1} OR {email2})"
        messages = self._list_all_message_refs(query)

        conversations = []
        if messages:
            total = len(messages)
            logger.info(f"开始获取 {total} 封邮件详情...")
            for position, msg in enumerate(messages, start=1):
                try:
                    message = self.gmail_service.users().messages().get(userId='me', id=msg['id']).execute()
                    email_data = self._extract_email_content(message)
                    if email_data:
                        conversations.append(email_data)
                        if position % 5 == 0 or position == total:  # progress every 5 and at the end
                            logger.info(f"已处理 {position}/{total} 封邮件")
                    else:
                        logger.warning(f"邮件 {position}/{total} 内容提取失败")
                except Exception as e:
                    logger.error(f"获取邮件 {msg['id']} 详情失败: {str(e)}")

        conversations.sort(key=lambda item: item['date'])
        logger.info(f"总共找到并解析了 {len(conversations)} 封邮件")
        return conversations

    except Exception as e:
        logger.error(f"获取Gmail对话失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []


def _resolve_account_email(self):
    """Return the connected mailbox address: the stored credential first, then the Gmail profile."""
    credential = getattr(self, 'gmail_credential', None)
    stored = getattr(credential, 'gmail_email', None) if credential else None
    if stored and '@' in stored:
        return stored
    profile = self.gmail_service.users().getProfile(userId='me').execute()
    return profile.get('emailAddress') or 'me'


def _list_all_message_refs(self, query):
    """Return every message stub matching *query*, following ``nextPageToken``."""
    response = self.gmail_service.users().messages().list(userId='me', q=query).execute()
    if 'messages' not in response:
        logger.warning("未找到匹配邮件")
        return []

    messages = list(response['messages'])
    logger.info(f"找到 {len(messages)} 封邮件")

    while 'nextPageToken' in response:
        response = self.gmail_service.users().messages().list(
            userId='me',
            q=query,
            pageToken=response['nextPageToken'],
        ).execute()
        if 'messages' in response:
            page = response['messages']
            messages.extend(page)
            logger.info(f"加载额外 {len(page)} 封邮件,当前总数: {len(messages)}")
    return messages
|
||
|
||
def _extract_email_content(self, message):
    """Flatten a Gmail API message resource into a plain dict.

    Args:
        message: dict from ``users().messages().get(...)``. It must contain
            ``id`` and ``payload['headers']``.

    Returns:
        dict | None: keys
            ``id``;
            ``subject`` / ``from`` (raw header values, ``''`` if absent);
            ``date`` (``'%Y-%m-%d %H:%M:%S'`` in the configured time zone, the raw
            header if parsing fails, ``''`` if absent);
            ``body`` (first non-empty text/plain part; otherwise the text of the
            first text/html part; bytes that are not UTF-8 are replaced);
            ``attachments`` (dicts with ``filename``, ``mimeType``, ``size`` and,
            when present, ``attachmentId``).
        None is returned if the message cannot be processed at all (logged).
    """

    def decode_data(data):
        """Decode base64url body data to text, replacing bytes that are not UTF-8."""
        raw = base64.urlsafe_b64decode(data)
        try:
            return raw.decode('utf-8')
        except UnicodeDecodeError:
            logger.warning("邮件正文不是有效的UTF-8,已替换无法解码的字符")
            return raw.decode('utf-8', errors='replace')

    def html_to_text(html):
        """Strip markup from an HTML body using the module's BeautifulSoup import."""
        return BeautifulSoup(html, 'html.parser').get_text(separator='\n', strip=True)

    def format_date(date_value):
        """Normalise a Date header; returns the raw value if it cannot be parsed."""
        logger.debug(f"原始邮件日期字符串: '{date_value}'")
        try:
            import dateutil.parser as date_parser
            import pytz
            from django.utils import timezone

            date = date_parser.parse(date_value)
            logger.debug(f"解析后的日期对象: {date}, 是否有时区: {date.tzinfo is not None}")

            if date.tzinfo is not None:
                # Aware: convert to the project time zone, and drop tzinfo if USE_TZ is off.
                if hasattr(settings, 'TIME_ZONE'):
                    date = date.astimezone(pytz.timezone(settings.TIME_ZONE))
                    logger.debug(f"转换到系统时区后: {date}")
                if hasattr(settings, 'USE_TZ') and not settings.USE_TZ:
                    date = date.replace(tzinfo=None)
                    logger.debug(f"移除时区信息后: {date}")
            elif hasattr(settings, 'USE_TZ') and settings.USE_TZ:
                try:
                    date = timezone.make_aware(date)
                    logger.debug(f"添加时区信息后: {date}")
                except Exception as tz_error:
                    logger.warning(f"添加时区信息失败: {str(tz_error)}")

            formatted = date.strftime('%Y-%m-%d %H:%M:%S')
            logger.debug(f"最终日期字符串: {formatted}")
            return formatted
        except Exception as e:
            logger.error(f"解析日期失败: {str(e)}, 原始值: '{date_value}'")
            return date_value

    try:
        message_id = message['id']
        payload = message['payload']
        headers = payload['headers']

        email_data = {
            'id': message_id,
            'subject': '',
            'from': '',
            'date': '',
            'body': '',
            'attachments': [],
        }

        for header in headers:
            name = header['name']
            if name == 'Subject':
                email_data['subject'] = header['value']
            elif name == 'From':
                email_data['from'] = header['value']
            elif name == 'Date':
                email_data['date'] = format_date(header['value'])

        html_candidates = []  # first text/html body, used only if no text/plain body exists

        def process_parts(parts):
            """Walk MIME parts depth-first, collecting attachments and body text."""
            for part in parts:
                if part.get('filename'):
                    attachment = {
                        'filename': part['filename'],
                        'mimeType': part['mimeType'],
                        'size': part['body'].get('size', 0),
                    }
                    if 'attachmentId' in part['body']:
                        attachment['attachmentId'] = part['body']['attachmentId']
                    email_data['attachments'].append(attachment)

                mime_type = part['mimeType']
                if mime_type == 'text/plain' and not email_data['body']:
                    data = part['body'].get('data', '')
                    if data:
                        try:
                            email_data['body'] = decode_data(data)
                        except Exception as e:
                            logger.error(f"解码邮件内容失败: {str(e)}")
                elif mime_type == 'text/html' and not part.get('filename') and not html_candidates:
                    data = part['body'].get('data', '')
                    if data:
                        html_candidates.append(data)

                if 'parts' in part:
                    process_parts(part['parts'])

        if 'parts' in payload:
            process_parts(payload['parts'])
            if not email_data['body'] and html_candidates:
                try:
                    email_data['body'] = html_to_text(decode_data(html_candidates[0]))
                except Exception as e:
                    logger.error(f"解码邮件内容失败: {str(e)}")
        elif 'body' in payload and 'data' in payload['body']:
            # Single-part message: the body lives directly on the payload.
            data = payload['body'].get('data', '')
            if data:
                try:
                    text = decode_data(data)
                    if payload.get('mimeType') == 'text/html':
                        text = html_to_text(text)
                    email_data['body'] = text
                except Exception as e:
                    logger.error(f"解码邮件内容失败: {str(e)}")

        return email_data

    except Exception as e:
        logger.error(f"处理邮件内容时出错: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def download_attachment(self, message_id, attachment_id, filename, attachments_dir='gmail_attachments'):
    """Download one attachment and save it as ``<attachments_dir>/<message_id>_<filename>``.

    *filename* comes from the sender's MIME headers, so it is treated as untrusted:
    any directory components are stripped so the file cannot be written outside
    *attachments_dir*.

    Args:
        message_id: Gmail message ID.
        attachment_id: Gmail attachment ID within that message.
        filename: original attachment filename.
        attachments_dir: target directory, created if missing.

    Returns:
        str | None: path of the written file, or None on failure (logged).
    """
    try:
        attachment = self.gmail_service.users().messages().attachments().get(
            userId='me', messageId=message_id, id=attachment_id
        ).execute()
        file_data = base64.urlsafe_b64decode(attachment['data'])

        os.makedirs(attachments_dir, exist_ok=True)

        # Reduce both components to a bare name (also handles Windows-style separators).
        safe_name = os.path.basename(str(filename).replace('\\', '/')) or 'attachment'
        safe_id = os.path.basename(str(message_id).replace('\\', '/'))
        filepath = os.path.join(attachments_dir, f"{safe_id}_{safe_name}")

        with open(filepath, 'wb') as f:
            f.write(file_data)
        return filepath
    except Exception as e:
        logger.error(f"下载附件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def save_conversations_to_knowledge_base(self, conversations, knowledge_base):
    """Save Gmail messages to the chat history and upload them to a knowledge base.

    Every message not yet imported into the conversation becomes a ChatHistory
    row. Its role is 'user' for mail sent from the user's own address or Gmail
    credential, and 'assistant' for mail from the talent (``self.email``) or
    from any sender with an active GmailTalentMapping. Its attachments are
    downloaded and recorded. All new messages are then uploaded together as
    one knowledge-base document.

    Args:
        conversations: List of email dicts. The keys read here are 'id', 'from',
            'date', 'subject', 'body' and 'attachments'. The caller's list is
            not modified.
        knowledge_base: Target KnowledgeBase. If it has no external_id, an
            external dataset is created for it first.

    Returns:
        On success, a dict with:
            conversation_id, success=True,
            message_count (messages passed in),
            imported_count (new ChatHistory rows),
            knowledge_base_id.
        On failure: {'conversation_id': None, 'success': False, 'error': <reason>}.
    """
    try:
        from .models import GmailAttachment, ChatHistory, KnowledgeBaseDocument, GmailTalentMapping

        if not conversations or not knowledge_base:
            logger.error("参数不完整: conversations或knowledge_base为空")
            return {"conversation_id": None, "success": False, "error": "参数不完整"}

        # The document upload below needs an external dataset ID.
        if not knowledge_base.external_id:
            logger.warning(f"知识库 {knowledge_base.name} 缺少external_id,尝试创建外部知识库")
            try:
                from .views import KnowledgeBaseViewSet
                external_id = KnowledgeBaseViewSet()._create_external_dataset(knowledge_base)
                if external_id:
                    knowledge_base.external_id = external_id
                    knowledge_base.save()
                    logger.info(f"成功为知识库创建外部知识库: {external_id}")
                else:
                    logger.error("创建外部知识库失败:未返回external_id")
                    return {"conversation_id": None, "success": False, "error": "创建外部知识库失败"}
            except Exception as e:
                logger.error(f"为知识库创建外部知识库失败: {str(e)}")
                return {"conversation_id": None, "success": False, "error": f"创建外部知识库失败: {str(e)}"}

        # --- Resolve the conversation ID -----------------------------------
        conversation_id = None

        # 1. An active mapping for this talent in this knowledge base.
        if self.email:
            existing_mapping = GmailTalentMapping.objects.filter(
                user=self.user,
                talent_email=self.email,
                knowledge_base=knowledge_base,
                is_active=True
            ).first()
            if existing_mapping and existing_mapping.conversation_id:
                conversation_id = existing_mapping.conversation_id
                logger.info(f"通过达人邮箱 {self.email} 找到现有对话ID: {conversation_id}")

        # 2. Otherwise reuse any conversation of this user in this knowledge base.
        if not conversation_id:
            existing_conversation_id = ChatHistory.objects.filter(
                knowledge_base=knowledge_base,
                user=self.user,
                is_deleted=False
            ).values_list('conversation_id', flat=True).first()
            if existing_conversation_id:
                conversation_id = existing_conversation_id
                logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}")

        # 3. Otherwise start a new conversation.
        if not conversation_id:
            conversation_id = str(uuid.uuid4())
            logger.info(f"创建新的对话ID: {conversation_id}")

        # Write the talent mapping once, after the ID is final, so it never
        # holds conversation_id=None.
        if self.email:
            _, created = GmailTalentMapping.objects.update_or_create(
                user=self.user,
                talent_email=self.email,
                defaults={
                    'knowledge_base': knowledge_base,
                    'conversation_id': conversation_id,
                    'is_active': True
                }
            )
            action = "创建" if created else "更新"
            logger.info(f"已{action}Gmail达人映射: {self.email} -> {knowledge_base.name}")

        # --- Existing messages: de-duplication and parent candidates -------
        existing_gmail_ids = set()
        # (created_at, chat message id) for every message in the conversation.
        # Used to pick each new message's parent: the closest earlier message.
        timeline = []
        existing_messages = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            is_deleted=False
        ).order_by('created_at')
        for existing in existing_messages:
            existing_metadata = existing.metadata or {}
            if 'gmail_message_id' in existing_metadata:
                existing_gmail_ids.add(existing_metadata['gmail_message_id'])
            if existing.created_at is not None:
                timeline.append((existing.created_at, str(existing.id)))
        if existing_gmail_ids:
            logger.info(f"发现已存在 {len(existing_gmail_ids)} 条消息记录,将跳过重复导入")

        # --- Order messages chronologically --------------------------------
        # Parse dates first. Sorting the raw strings mis-orders RFC 2822 dates
        # ("Mon, 3 Mar ..." vs "Tue, 1 Apr ...").
        logger.info(f"开始构建知识库文档,共{len(conversations)}个邮件对话")
        dated_messages = []
        for message in conversations:
            raw_date = message.get('date') or '未知时间'
            local_date = self._parse_gmail_date_string(raw_date)
            if local_date is None:
                logger.warning(f"无法解析邮件日期: {raw_date},使用当前时间")
                local_date = datetime.now()
            elif local_date.tzinfo is not None:
                # Convert into the current Django timezone before dropping the
                # offset. replace(tzinfo=None) alone would keep the sender's
                # wall-clock time and mis-order mails from different zones.
                local_date = timezone.localtime(local_date).replace(tzinfo=None)
            dated_messages.append((local_date, message))
        dated_messages.sort(key=lambda pair: pair[0])

        # --- Import each new message ---------------------------------------
        use_tz = getattr(settings, 'USE_TZ', False)
        credential = getattr(self, 'gmail_credential', None)
        gmail_credential_email = None
        if credential and credential.gmail_email:
            gmail_credential_email = credential.gmail_email.lower()
        user_email = (self.user.email or '').lower()
        talent_email = (self.email or '').lower()

        talent_sender_cache = {}  # sender address -> has an active talent mapping
        paragraphs = []
        new_messages = []
        skipped_count = 0

        for local_date, message in dated_messages:
            try:
                gmail_id = message.get('id')
                if gmail_id in existing_gmail_ids:
                    logger.info(f"跳过已导入的消息: {gmail_id}")
                    skipped_count += 1
                    continue

                sender = message.get('from') or '未知发件人'
                date_str = message.get('date') or '未知时间'
                subject = message.get('subject') or '无主题'
                body = (message.get('body') or '').strip()

                logger.info(f"处理邮件: {date_str}, {sender}, {subject[:20]}...")

                # Role: the user's own Gmail account or system email -> 'user';
                # the target talent, or any sender with an active talent mapping,
                # -> 'assistant'. Only a non-empty address can match an account.
                sender_email = self._extract_sender_address(sender)
                sender_key = sender_email.lower()
                is_credential_email = bool(sender_key) and sender_key == gmail_credential_email
                is_user_email = bool(sender_key) and sender_key == user_email
                is_talent_email = bool(sender_key) and sender_key == talent_email

                has_talent_mapping = False
                if is_credential_email or is_user_email:
                    role = 'user'
                elif is_talent_email:
                    role = 'assistant'
                else:
                    if sender_email not in talent_sender_cache:
                        talent_sender_cache[sender_email] = GmailTalentMapping.objects.filter(
                            user=self.user,
                            talent_email=sender_email,
                            is_active=True
                        ).exists()
                    has_talent_mapping = talent_sender_cache[sender_email]
                    role = 'assistant' if has_talent_mapping else 'user'

                logger.info(f"设置消息角色: {role},发件人: {sender_email},用户Gmail: {gmail_credential_email},用户邮箱: {self.user.email},目标达人: {self.email},是否有达人映射: {has_talent_mapping}")

                paragraphs.append({
                    "title": f"{date_str} - {sender} - {subject}",
                    "content": body,
                    "is_active": True,
                    "problem_list": []
                })

                # Store the mail's own date. Only make it aware when Django
                # uses time zones, matching send_email/setup_watch.
                created_at = local_date
                if use_tz:
                    try:
                        created_at = timezone.make_aware(local_date)
                    except Exception as tz_error:
                        logger.warning(f"时区转换失败: {str(tz_error)},使用当前时间")
                        created_at = timezone.now()

                earlier = [entry for entry in timeline if entry[0] < created_at]
                parent_id = max(earlier, key=lambda entry: entry[0])[1] if earlier else None

                chat_message = ChatHistory.objects.create(
                    user=self.user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation_id,
                    parent_id=parent_id,
                    role=role,
                    content=f"{subject}\n\n{body}",
                    metadata={
                        'gmail_message_id': message.get('id', ''),
                        'from': sender,
                        'date': date_str,
                        'subject': subject,
                        'dataset_id_list': [str(knowledge_base.id)],
                        'dataset_names': [knowledge_base.name]
                    },
                    created_at=created_at
                )
                timeline.append((created_at, str(chat_message.id)))
                new_messages.append(message)

                for attachment in message.get('attachments') or []:
                    if 'attachmentId' not in attachment or 'filename' not in attachment:
                        continue
                    try:
                        filepath = self.download_attachment(
                            message_id=message['id'],
                            attachment_id=attachment['attachmentId'],
                            filename=attachment['filename']
                        )
                        if filepath:
                            GmailAttachment.objects.create(
                                chat_message=chat_message,
                                gmail_message_id=message['id'],
                                filename=attachment['filename'],
                                filepath=filepath,
                                mimetype=attachment.get('mimeType', ''),
                                filesize=attachment.get('size', 0)
                            )
                            logger.info(f"已保存附件: {attachment['filename']}")
                    except Exception as e:
                        logger.error(f"保存附件失败: {str(e)}")
                        logger.error(traceback.format_exc())

            except Exception as e:
                logger.error(f"处理邮件时出错: {str(e)}")
                logger.error(traceback.format_exc())

        # --- Upload the knowledge-base document ----------------------------
        logger.info(f"将Gmail对话添加到知识库: {knowledge_base.name}")
        doc_data = {
            "name": f"Gmail对话与{self.email or '联系人'}.txt",
            "paragraphs": paragraphs
        }

        should_upload = True
        if not doc_data["paragraphs"]:
            if skipped_count:
                # Nothing new, and at least one message was already imported.
                # A placeholder would only add a duplicate document per sync.
                logger.info("没有新的邮件内容,跳过知识库文档上传")
                should_upload = False
            else:
                logger.warning("没有段落内容可上传,添加默认段落")
                doc_data["paragraphs"].append({
                    "title": "初始化邮件对话",
                    "content": f"与{self.email or '联系人'}的邮件对话准备就绪,等待新的邮件内容。",
                    "is_active": True,
                    "problem_list": []
                })

        if should_upload:
            from .views import KnowledgeBaseViewSet
            kb_viewset = KnowledgeBaseViewSet()

            doc_data["id"] = str(uuid.uuid4())
            logger.info(f"生成随机文档ID: {doc_data['id']}")

            upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data)

            if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                document_id = upload_response['data']['id']
                KnowledgeBaseDocument.objects.create(
                    knowledge_base=knowledge_base,
                    document_id=document_id,
                    document_name=doc_data["name"],
                    external_id=document_id
                )
                logger.info(f"Gmail对话文档上传成功,ID: {document_id}")

                # Only newly imported messages. Attachments of skipped messages
                # were presumably uploaded when those were first imported.
                self._upload_attachments_to_knowledge_base(knowledge_base, new_messages)
            else:
                error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
                logger.error(f"Gmail对话文档上传失败: {error_msg}")

        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active'
        ).count()
        knowledge_base.save()

        logger.info(f"Gmail对话已保存到知识库: {knowledge_base.name}")

        return {
            "conversation_id": conversation_id,
            "success": True,
            "message_count": len(conversations),
            "imported_count": len(new_messages),
            "knowledge_base_id": str(knowledge_base.id)
        }

    except Exception as e:
        logger.error(f"保存Gmail对话到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {"conversation_id": None, "success": False, "error": str(e)}


def _parse_gmail_date_string(self, date_str):
    """Parse an email date string into a datetime, or return None if unparseable.

    Accepted forms, in order:
    - a Unix timestamp of digits (13+ digits are treated as milliseconds,
      presumably Gmail's internalDate);
    - '%Y-%m-%d %H:%M:%S';
    - an RFC 2822 date (the usual Date header);
    - anything dateutil can parse.

    The result keeps whatever timezone the input carried; timestamps and the
    '%Y-%m-%d' form yield naive local times.
    """
    from email.utils import parsedate_to_datetime

    if not isinstance(date_str, str):
        return None
    text = date_str.strip()
    if not text:
        return None
    try:
        if text.isdigit():
            timestamp = int(text)
            if len(text) >= 13:
                timestamp = timestamp / 1000
            return datetime.fromtimestamp(timestamp)
        try:
            return datetime.strptime(text, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            pass
        try:
            return parsedate_to_datetime(text)
        except (TypeError, ValueError, IndexError):
            # Python < 3.10 raises TypeError on unparseable input, 3.10+ ValueError.
            pass
        import dateutil.parser as date_parser
        return date_parser.parse(text)
    except (ValueError, TypeError, OverflowError, OSError):
        return None


def _extract_sender_address(self, sender):
    """Return the bare address from a From header value.

    'Name <a@b.com>' gives 'a@b.com'. A value that is not a parseable address
    comes back stripped. An empty or None value gives ''.
    """
    from email.utils import parseaddr

    if not sender:
        return ''
    _, address = parseaddr(sender)
    return address or sender.strip()
|
||
|
||
def send_email(self, to_email, subject, body, conversation_id=None, attachments=None):
    """Send an email through the Gmail API and record it in the chat history.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text body.
        conversation_id: When given, the sent mail is appended to this
            conversation's ChatHistory (with GmailAttachment rows) and uploaded
            to the conversation's knowledge base.
        attachments: Optional list of local file paths to attach.

    Returns:
        The Gmail ID of the sent message.

    Raises:
        Any exception raised while building or sending the message is logged
        and re-raised. Failures after Gmail accepted the message are logged
        but not raised; this covers the timestamp lookup, the
        ChatHistory/GmailAttachment rows and the knowledge-base upload. The
        mail has already gone out, and raising would make callers report a
        failure or resend it.
    """
    try:
        if attachments:
            message = self._create_message_with_attachment(to_email, subject, body, attachments)
        else:
            message = self._create_message(to_email, subject, body)

        sent_message = self.gmail_service.users().messages().send(
            userId='me', body=message
        ).execute()
        message_id = sent_message['id']
    except Exception as e:
        logger.error(f"发送邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise

    logger.info(f"邮件发送成功, ID: {message_id}")

    if not conversation_id:
        return message_id

    try:
        # Prefer Gmail's server timestamp (internalDate, milliseconds). Both it
        # and the fallback are naive local times, so 'date' is formatted
        # consistently either way.
        email_date = None
        try:
            message_detail = self.gmail_service.users().messages().get(
                userId='me', id=message_id
            ).execute()
            internal_date = message_detail.get('internalDate')
            if internal_date:
                email_date = datetime.fromtimestamp(int(internal_date) / 1000)
                logger.info(f"从时间戳解析的日期: {email_date}")
            else:
                logger.warning("邮件没有时间戳,使用当前时间")
        except Exception as e:
            logger.warning(f"获取邮件时间戳失败: {str(e)}, 使用当前时间")
        if email_date is None:
            email_date = datetime.now()
        date_str = email_date.strftime('%Y-%m-%d %H:%M:%S')

        first_message = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            is_deleted=False
        ).order_by('created_at').first()
        if first_message is None:
            logger.warning(f"找不到对话ID: {conversation_id},无法保存消息")
            return message_id
        first_metadata = first_message.metadata or {}

        # The conversation's knowledge base: from the first message, or from
        # the first dataset ID in its metadata.
        knowledge_base = first_message.knowledge_base
        if not knowledge_base:
            dataset_ids = first_metadata.get('dataset_id_list') or []
            if dataset_ids:
                knowledge_base = KnowledgeBase.objects.filter(id=dataset_ids[0]).first()
            if not knowledge_base:
                logger.error("找不到关联的知识库,无法保存消息")
                return message_id

        # Parent: the most recently inserted message (by ID, not by time).
        parent_id = None
        try:
            previous_message = ChatHistory.objects.filter(
                conversation_id=conversation_id,
                is_deleted=False
            ).order_by('-id').first()
            if previous_message:
                parent_id = str(previous_message.id)
        except Exception as e:
            logger.error(f"查找父消息失败: {str(e)}")
            logger.error(traceback.format_exc())

        metadata = {
            'gmail_message_id': message_id,
            'from': self.user.email,
            'to': to_email,
            'date': date_str,
            'subject': subject,
            'dataset_id_list': [str(knowledge_base.id)],
            'dataset_names': [knowledge_base.name]
        }
        # Keep the conversation's existing dataset list (and names) if present.
        if 'dataset_id_list' in first_metadata:
            metadata['dataset_id_list'] = first_metadata['dataset_id_list']
            if 'dataset_names' in first_metadata:
                metadata['dataset_names'] = first_metadata['dataset_names']

        # created_at is left to the database default (the time of saving).
        chat_message = ChatHistory.objects.create(
            user=self.user,
            knowledge_base=knowledge_base,
            conversation_id=conversation_id,
            parent_id=parent_id,
            role='user',  # mail sent by the user
            content=f"[{subject}] {body}",
            metadata=metadata
        )

        for att_path in attachments or []:
            # _create_message_with_attachment skips unreadable files, so a
            # missing path must not abort the bookkeeping for the others.
            if not os.path.isfile(att_path):
                logger.warning(f"附件文件不存在,跳过记录: {att_path}")
                continue
            mime_type = mimetypes.guess_type(att_path)[0] or 'application/octet-stream'
            GmailAttachment.objects.create(
                chat_message=chat_message,
                gmail_message_id=message_id,
                filename=os.path.basename(att_path),
                filepath=att_path,
                mimetype=mime_type,
                filesize=os.path.getsize(att_path)
            )

        self._append_to_knowledge_base_document(knowledge_base, subject, body, to_email)
    except Exception as e:
        logger.error(f"邮件已发送,但保存到聊天记录失败: {str(e)}")
        logger.error(traceback.format_exc())

    return message_id
|
||
|
||
def _create_message(self, to, subject, body):
    """Build a plain-text email and wrap it for the Gmail API.

    Returns ``{'raw': <base64url string>}``, the payload expected by
    ``users().messages().send``.
    """
    mime_msg = MIMEText(body)
    for header_name, header_value in (('to', to), ('subject', subject)):
        mime_msg[header_name] = header_value

    # Gmail expects the RFC 822 bytes as URL-safe base64 text.
    encoded = base64.urlsafe_b64encode(mime_msg.as_bytes())
    return {'raw': encoded.decode()}
|
||
|
||
def _create_message_with_attachment(self, to, subject, body, attachment_files):
    """Build a multipart email with file attachments for the Gmail API.

    Args:
        to: Recipient address (``To`` header).
        subject: Subject line.
        body: Plain-text body, sent as the first MIME part.
        attachment_files: Iterable of local file paths. A file that cannot be
            read is logged and skipped; the message is still built.

    Returns:
        ``{'raw': <base64url-encoded message>}`` as expected by
        ``users().messages().send``.
    """
    message = MIMEMultipart()
    message['to'] = to
    message['subject'] = subject
    message.attach(MIMEText(body))

    for file in attachment_files:
        try:
            with open(file, 'rb') as f:
                payload = f.read()

            # Use the real content type so mail clients can preview the file.
            # Unknown or compressed files fall back to a generic binary type.
            content_type, encoding = mimetypes.guess_type(file)
            if content_type is None or encoding is not None:
                content_type = 'application/octet-stream'
            maintype, subtype = content_type.split('/', 1)

            part = MIMEBase(maintype, subtype)
            part.set_payload(payload)
            encoders.encode_base64(part)

            # Passing filename as a parameter lets the email package quote it
            # and RFC 2231-encode non-ASCII names. A hand-built header string
            # breaks on quotes and non-ASCII characters.
            part.add_header(
                'Content-Disposition',
                'attachment',
                filename=os.path.basename(file)
            )
            message.attach(part)
        except Exception as e:
            logger.error(f"添加附件 {file} 失败: {str(e)}")

    raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
    return {'raw': raw}
|
||
|
||
def _append_to_knowledge_base_document(self, knowledge_base, subject, body, recipient):
    """Upload a just-sent email to the knowledge base as a new document.

    The document is named ``Gmail回复给<recipient>_<YYYYmmddHHMMSS>.txt`` and
    holds one paragraph with sender, recipient and body. On success a
    KnowledgeBaseDocument row is created and the knowledge base's
    document_count is refreshed.

    Returns:
        The upload API response on success. None if the knowledge base has no
        external_id, the upload was rejected, or any error occurred (logged).
    """
    try:
        # Without an external dataset there is nothing to upload to.
        if not knowledge_base.external_id:
            logger.error(f"知识库没有external_id,无法上传文档: {knowledge_base.name}")
            return None

        # One timestamp for both the document name and the paragraph title, so
        # they cannot disagree across a second boundary.
        sent_at = datetime.now()
        doc_data = {
            "name": f"Gmail回复给{recipient}_{sent_at.strftime('%Y%m%d%H%M%S')}.txt",
            "paragraphs": [
                {
                    "title": f"{sent_at.strftime('%Y-%m-%d %H:%M:%S')} - {subject}",
                    "content": f"发件人: {self.user.email}\n收件人: {recipient}\n\n{body}",
                    "is_active": True,
                    "problem_list": []
                }
            ]
        }

        from .views import KnowledgeBaseViewSet
        from .models import KnowledgeBaseDocument

        upload_response = KnowledgeBaseViewSet()._call_upload_api(knowledge_base.external_id, doc_data)

        if not (upload_response and upload_response.get('code') == 200 and upload_response.get('data')):
            error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
            logger.error(f"Gmail回复文档上传失败: {error_msg}")
            return None

        document_id = upload_response['data']['id']
        KnowledgeBaseDocument.objects.create(
            knowledge_base=knowledge_base,
            document_id=document_id,
            document_name=doc_data["name"],
            external_id=document_id
        )
        logger.info(f"Gmail回复文档上传成功,ID: {document_id}")

        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active'
        ).count()
        knowledge_base.save()

        return upload_response

    except Exception as e:
        logger.error(f"追加邮件内容到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def setup_watch(self, topic_name=None):
    """Register a Gmail push-notification watch on the authorised mailbox.

    Args:
        topic_name: Pub/Sub topic name. Defaults to settings.GMAIL_TOPIC_NAME,
            or 'gmail-watch-topic'.

    Returns:
        ``{'historyId': ..., 'expiration': ...}`` from the watch response, or
        None if authentication or the watch call failed. On success the
        returned historyId and expiration are also saved on the
        GmailCredential.
    """
    try:
        if not topic_name:
            topic_name = getattr(settings, 'GMAIL_TOPIC_NAME', 'gmail-watch-topic')

        # The project ID comes straight from settings (not from client_secret.json).
        project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT', 'knowledge-454905')
        logger.info(f"使用settings中配置的项目ID: {project_id}")

        if not getattr(self, 'gmail_service', None):
            logger.warning("Gmail服务未初始化,尝试重新认证")
            if not self.authenticate():
                logger.error("Gmail服务初始化失败")
                return None

        # The Gmail API always uses userId='me' (the OAuth-authorised mailbox),
        # which is unrelated to the system user's email.
        logger.info(f"系统用户: {self.user.email},Gmail API使用OAuth认证的邮箱 (userId='me')")

        # The webhook URL is only logged as a configuration hint; it is not
        # part of the watch request below.
        webhook_url = getattr(settings, 'GMAIL_WEBHOOK_URL', None)
        if not webhook_url:
            # ALLOWED_HOSTS may be empty or hold wildcards ('*', '.example.com').
            # Use the first concrete host instead of failing on [0].
            hosts = getattr(settings, 'ALLOWED_HOSTS', None) or []
            domain = next(
                (host for host in hosts if host and host != '*' and not host.startswith('.')),
                'localhost'
            )
            protocol = 'https' if getattr(settings, 'SECURE_SSL_REDIRECT', False) else 'http'
            if domain == 'localhost' or domain.startswith('127.0.0.1'):
                # Local development needs a publicly reachable URL (e.g. via ngrok).
                webhook_url = f"{protocol}://{domain}:8000/api/user/gmail/webhook/"
                logger.warning("使用本地开发环境URL作为webhook回调,Google可能无法访问。建议使用ngrok等工具创建公网地址。")
            else:
                webhook_url = f"{protocol}://{domain}/api/user/gmail/webhook/"

        logger.info(f"使用Gmail webhook URL: {webhook_url}")
        logger.info("如需手动配置Gmail推送通知,请确保在Google Cloud控制台的Pub/Sub主题配置中设置以下URL:")
        logger.info(f"推送端点: {webhook_url}")
        logger.info("权限: 为Gmail API服务账号(gmail-api-push@system.gserviceaccount.com)和您的Gmail账号授予发布权限")

        request = {
            'labelIds': ['INBOX', 'SENT', 'IMPORTANT', 'UNREAD'],
            'topicName': f"projects/{project_id}/topics/{topic_name}",
            'labelFilterAction': 'include'
        }
        logger.info(f"设置Gmail监听: {request}")

        # Bound the call with the process-wide socket timeout, restoring it after.
        original_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
        try:
            response = self.gmail_service.users().watch(userId='me', body=request).execute()
        except Exception as e:
            logger.error(f"执行watch请求失败: {str(e)}")
            return None
        finally:
            socket.setdefaulttimeout(original_timeout)

        history_id = response.get('historyId')
        expiration = response.get('expiration')
        logger.info(f"Gmail监听设置成功: historyId={history_id}, expiration={expiration}")

        if self.user:
            from .models import GmailCredential

            # Prefer the credential this integration was built for. With
            # several Gmail accounts, "the user's first active credential"
            # may be a different mailbox. Re-read it by pk so saving cannot
            # overwrite fields (e.g. tokens) updated through another instance.
            credential = None
            own_credential = getattr(self, 'gmail_credential', None)
            if own_credential is not None and getattr(own_credential, 'pk', None):
                credential = GmailCredential.objects.filter(pk=own_credential.pk).first()
            if credential is None:
                credential = GmailCredential.objects.filter(user=self.user, is_active=True).first()

            if credential:
                expiration_time = None
                if expiration:
                    try:
                        from datetime import timezone as dt_timezone
                        # expiration is epoch milliseconds (UTC). Build an aware
                        # UTC datetime directly rather than treating server-local
                        # time as Django's TIME_ZONE.
                        expires_utc = datetime.fromtimestamp(int(expiration) / 1000, tz=dt_timezone.utc)
                        if getattr(settings, 'USE_TZ', False):
                            expiration_time = expires_utc
                        else:
                            # Naive value in the current Django timezone.
                            expiration_time = timezone.make_naive(expires_utc)
                        logger.info(f"从时间戳解析的日期: {expiration_time}")
                    except Exception as conv_error:
                        logger.error(f"转换时间戳失败: {str(conv_error)}")
                        expiration_time = None

                credential.last_history_id = history_id
                credential.watch_expiration = expiration_time
                credential.save()

                logger.info(f"更新Gmail监听信息: {self.user.username}, history_id: {history_id}")

        return {
            'historyId': history_id,
            'expiration': expiration
        }

    except Exception as e:
        logger.error(f"设置Gmail监听失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def get_history(self, start_history_id, max_pages=5):
    """Return IDs of messages added to the mailbox after ``start_history_id``.

    Calls users.history.list, then follows up to ``max_pages`` further pages.
    IDs are collected from ``messagesAdded`` entries and from ``labelsAdded``
    entries that added INBOX. They are de-duplicated and returned in the order
    the history records were listed, so callers that truncate the list get a
    deterministic prefix.

    Args:
        start_history_id: History ID to read changes after.
        max_pages: Maximum number of additional result pages to fetch.

    Returns:
        List of Gmail message IDs. [] when the service cannot be authenticated,
        the first request fails, or any other error occurs (logged).
    """
    try:
        logger.info(f"获取历史记录,起始ID: {start_history_id}")

        if not getattr(self, 'gmail_service', None):
            logger.warning("Gmail服务未初始化,尝试重新认证")
            if not self.authenticate():
                logger.error("Gmail服务初始化失败")
                return []

        # Bound each call with the process-wide socket timeout, restoring it after.
        original_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
        try:
            response = self.gmail_service.users().history().list(
                userId='me', startHistoryId=start_history_id
            ).execute()
        except Exception as e:
            logger.error(f"执行history请求失败: {str(e)}")
            return []
        finally:
            socket.setdefaulttimeout(original_timeout)

        logger.info(f"历史记录响应: {response}")

        history_list = []
        if 'history' in response:
            history_list.extend(response['history'])
            logger.info(f"找到 {len(response['history'])} 个历史记录")

            page_count = 0
            while 'nextPageToken' in response and page_count < max_pages:
                page_token = response['nextPageToken']
                page_count += 1

                socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
                try:
                    response = self.gmail_service.users().history().list(
                        userId='me', startHistoryId=start_history_id, pageToken=page_token
                    ).execute()
                except Exception as e:
                    logger.error(f"获取历史记录分页失败: {str(e)}")
                    break
                finally:
                    socket.setdefaulttimeout(original_timeout)

                if 'history' in response:
                    history_list.extend(response['history'])
                    logger.info(f"加载额外 {len(response['history'])} 个历史记录 (页 {page_count})")

            if page_count >= max_pages and 'nextPageToken' in response:
                logger.warning(f"达到最大页数限制 ({max_pages}),可能有更多历史记录未获取")
        else:
            logger.info(f"没有新的历史记录,最新historyId: {response.get('historyId', 'N/A')}")

        # A dict keeps insertion order: it de-duplicates while preserving the
        # order of the history records (a set would make it arbitrary).
        new_message_ids = {}
        for history in history_list:
            logger.info(f"处理历史记录: {history}")

            for added in history.get('messagesAdded', []):
                message_id = (added.get('message') or {}).get('id')
                if message_id:
                    new_message_ids[message_id] = None
                    logger.info(f"新增消息ID: {message_id}")

            for label in history.get('labelsAdded', []):
                message_id = (label.get('message') or {}).get('id')
                if message_id and 'INBOX' in label.get('labelIds', []):
                    new_message_ids[message_id] = None
                    logger.info(f"标签变更的消息ID: {message_id}, 添加了INBOX标签")

        if new_message_ids:
            logger.info(f"总共找到 {len(new_message_ids)} 个新消息")
        else:
            logger.info("没有新增消息")

        return list(new_message_ids)

    except Exception as e:
        logger.error(f"获取Gmail历史记录失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []
|
||
|
||
def process_notification(self, notification_data):
    """Handle one Gmail push notification and import the messages it announces.

    ``notification_data`` is either a Google Pub/Sub push envelope
    (``{'message': {'data': <base64 JSON>}}``) or an already-decoded dict with
    ``emailAddress`` and ``historyId``.

    Flow:
    1. Resolve the user and the active GmailCredential for the mailbox.
    2. If the credential needs re-authorisation, queue the notification and
       stop.
    3. Read changes since the previously stored history ID.
    4. Process up to 10 new messages with ``_process_new_message``.
    5. When nothing needs processing, check the Gmail connection and renew
       the watch if it expires within 3 days.

    Returns:
        True if at least one message was processed, or if nothing needed
        processing and the connection check succeeded; False otherwise.
    """
    try:
        logger.info(f"处理Gmail通知: {notification_data}")

        # base64/json are module-level imports. Re-importing them inside this
        # function would make them local names and leave `json` unbound on the
        # non-Pub/Sub path (breaking the re-authorisation queueing below).
        if isinstance(notification_data, dict) and 'message' in notification_data and 'data' in notification_data['message']:
            try:
                logger.info("检测到Google Pub/Sub消息格式")
                encoded_data = notification_data['message']['data']
                decoded_data = base64.b64decode(encoded_data).decode('utf-8')
                logger.info(f"解码后的数据: {decoded_data}")

                json_data = json.loads(decoded_data)
                email = json_data.get('emailAddress')
                history_id = json_data.get('historyId')
                logger.info(f"从Pub/Sub消息中提取: email={email}, historyId={history_id}")
            except Exception as decode_error:
                logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}")
                logger.error(traceback.format_exc())
                return False
        else:
            email = notification_data.get('emailAddress')
            history_id = notification_data.get('historyId')

        if not email or not history_id:
            logger.error("Gmail通知数据不完整: 找不到emailAddress或historyId")
            return False

        from .models import User, GmailCredential

        user = User.objects.filter(email=email).first()
        credential = None

        # Fall back to matching the mailbox against credentials' gmail_email.
        if not user:
            logger.info(f"找不到email={email}的用户,尝试使用gmail_email查找")
            credential = GmailCredential.objects.filter(gmail_email=email, is_active=True).first()
            if credential:
                user = credential.user
                logger.info(f"通过gmail_email找到用户: {user.email}")

        if not user:
            logger.error(f"找不到与 {email} 关联的用户")
            return False

        if not credential:
            credential = GmailCredential.objects.filter(user=user, is_active=True).first()
            if not credential:
                logger.error(f"用户 {user.email} 没有活跃的Gmail凭证")
                return False

        if credential.needs_reauth:
            logger.warning(f"Gmail凭证 {credential.id} 需要重新授权,无法处理通知")
            # Queue the notification so it can be processed after re-authorisation.
            try:
                from .models import GmailNotificationQueue
                GmailNotificationQueue.objects.create(
                    user=user,
                    gmail_credential=credential,
                    email=email,
                    history_id=history_id,
                    notification_data=json.dumps(notification_data)
                )
                logger.info("通知已保存到队列,等待用户重新授权后处理")
            except Exception as queue_error:
                logger.error(f"保存通知到队列失败: {str(queue_error)}")
            return False

        # The notification's historyId is the mailbox state after the change,
        # and history.list returns only records after startHistoryId. Read
        # from the previously stored ID, falling back to the new one when none
        # is stored (or the stored one is not older).
        start_history_id = history_id
        previous_history_id = credential.last_history_id
        try:
            if previous_history_id and int(previous_history_id) < int(history_id):
                start_history_id = previous_history_id
        except (TypeError, ValueError):
            pass

        # Remember the newest history ID (only ever moving forward).
        try:
            if not credential.last_history_id or int(history_id) > int(credential.last_history_id):
                credential.last_history_id = history_id
                credential.save()
                logger.info(f"更新历史ID: {history_id}")
            else:
                logger.info(f"收到的历史ID ({history_id}) 不大于当前值 ({credential.last_history_id}),不更新")
        except Exception as update_error:
            logger.error(f"更新历史ID失败: {str(update_error)}")

        gmail_integration = GmailIntegration(user, gmail_credential_id=credential.id)
        logger.info(f"Gmail通知处理: 用户={user.email}, Gmail邮箱={email}, 历史ID={history_id}, 凭证ID={credential.id}")

        def list_recent_message_ids():
            # Fallback when history yields nothing (get_history also returns []
            # when history.list fails): the 5 messages returned by messages.list.
            recent = gmail_integration.gmail_service.users().messages().list(
                userId='me',
                maxResults=5
            ).execute()
            return [msg['id'] for msg in recent.get('messages', [])]

        message_ids = []
        original_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
        try:
            if not gmail_integration.authenticate():
                logger.error(f"Gmail认证失败: {email}")
                if not gmail_integration.refresh_token():
                    logger.error("刷新令牌失败,需要用户重新授权")
                    return False
                logger.info("令牌刷新成功,继续处理通知")

            message_ids = gmail_integration.get_history(start_history_id)

            if not message_ids:
                logger.info("没有找到历史变更,尝试获取最近的邮件")
                try:
                    message_ids = list_recent_message_ids()
                    if message_ids:
                        logger.info(f"获取到 {len(message_ids)} 封最近的邮件: {message_ids}")
                    else:
                        logger.info("没有找到最近的邮件")
                except Exception as recent_error:
                    logger.error(f"获取最近邮件失败: {str(recent_error)}")

        except Exception as e:
            error_msg = str(e)
            logger.error(f"处理Gmail通知时发生错误: {error_msg}")

            # An expired/revoked token: refresh once and retry.
            if "invalid_grant" in error_msg.lower() or "401" in error_msg:
                logger.warning("检测到OAuth令牌问题,尝试刷新令牌")
                if not gmail_integration.refresh_token():
                    logger.error("刷新令牌失败,需要用户重新授权")
                    return False

                try:
                    message_ids = gmail_integration.get_history(start_history_id)
                    if not message_ids:
                        logger.info("刷新令牌后仍未找到历史变更,尝试获取最近邮件")
                        message_ids = list_recent_message_ids()
                        if message_ids:
                            logger.info(f"获取到 {len(message_ids)} 封最近的邮件: {message_ids}")
                except Exception as retry_error:
                    logger.error(f"刷新令牌后再次尝试失败: {str(retry_error)}")
                    return False
            else:
                return False
        finally:
            socket.setdefaulttimeout(original_timeout)

        if message_ids:
            logger.info(f"找到 {len(message_ids)} 个需要处理的消息")

            # Cap the work per notification to limit load.
            # NOTE(review): messages beyond the cap are not revisited, because
            # last_history_id has already advanced.
            max_messages = 10
            if len(message_ids) > max_messages:
                logger.warning(f"消息数量 ({len(message_ids)}) 超过限制 ({max_messages}),将只处理前 {max_messages} 条")
                message_ids = message_ids[:max_messages]

            success_count = 0
            for message_id in message_ids:
                try:
                    socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
                    if self._process_new_message(gmail_integration, message_id):
                        success_count += 1
                except Exception as msg_error:
                    error_msg = str(msg_error)
                    logger.error(f"处理消息 {message_id} 失败: {error_msg}")

                    if "invalid_grant" in error_msg.lower() or "401" in error_msg:
                        logger.warning("处理消息时检测到OAuth令牌问题,标记需要重新授权")
                        gmail_integration._mark_credential_needs_reauth()
                        break

                    logger.error(traceback.format_exc())
                finally:
                    socket.setdefaulttimeout(original_timeout)

            logger.info(f"成功处理 {success_count}/{len(message_ids)} 个消息")
            return success_count > 0

        # Nothing to process: verify the connection and keep the watch alive.
        try:
            logger.info("未找到需要处理的消息,执行服务连接测试")
            profile = gmail_integration.gmail_service.users().getProfile(userId='me').execute()
            if profile and 'emailAddress' in profile:
                logger.info(f"Gmail服务连接正常: {profile['emailAddress']}")

            needs_watch_renew = True
            if credential.watch_expiration:
                # Renew early when the watch expires within 3 days.
                three_days_later = timezone.now() + timedelta(days=3)
                needs_watch_renew = credential.watch_expiration < three_days_later
                if needs_watch_renew:
                    logger.info(f"监听将在3天内过期,需要更新: {credential.watch_expiration}")
            else:
                logger.info("没有监听过期时间记录,需要更新")

            if needs_watch_renew:
                try:
                    watch_result = gmail_integration.setup_watch()
                except Exception as watch_error:
                    logger.error(f"更新监听失败: {str(watch_error)}")
                    return False
                if watch_result:
                    logger.info(f"更新监听成功: {watch_result}")
                else:
                    # setup_watch may return None on failure instead of raising.
                    logger.warning(f"更新监听失败: setup_watch返回 {watch_result}")

            return True
        except Exception as verify_error:
            logger.error(f"Gmail服务连接测试失败: {str(verify_error)}")
            return False

    except Exception as e:
        logger.error(f"处理Gmail通知失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
|
||
|
||
def _process_new_message(self, gmail_integration, message_id):
    """Store one newly received Gmail message in the matching talent conversation.

    Steps:
      1. Fetch the message through ``gmail_integration.gmail_service`` and parse
         it with ``gmail_integration._extract_email_content``.
      2. Decide which address is the talent's: the sender when it is not one of
         the user's own addresses, otherwise the first recipient that is not.
         Own addresses are ``self.user.email`` plus, when the integration
         exposes it, ``gmail_integration.gmail_credential.gmail_email``.
      3. Get or create the talent's knowledge base and ``GmailTalentMapping``,
         save the message as a ``ChatHistory`` row (role ``assistant`` for
         talent-sent mail, ``user`` otherwise) and chain it to the previous
         message of this user's conversation.
      4. Download each attachment and record it as a ``GmailAttachment``.
      5. When the talent sent the message and the user's profile has
         ``auto_recommend_reply`` enabled, store a suggested reply (role
         ``user``) produced by ``_get_recommended_reply_from_deepseek``.

    Args:
        gmail_integration: object exposing ``gmail_service`` (Gmail API client)
            and ``_extract_email_content(message)``.
        message_id: Gmail message ID to process.

    Returns:
        bool: True if the message was stored; False if it could not be parsed,
        no talent address could be determined, or any error occurred (errors
        are logged, never raised).
    """
    from email.utils import getaddresses, parseaddr

    try:
        from .models import GmailTalentMapping, GmailAttachment, ChatHistory, UserProfile

        logger.info(f"处理新消息ID: {message_id}")

        message = gmail_integration.gmail_service.users().messages().get(userId='me', id=message_id).execute()

        email_content = gmail_integration._extract_email_content(message)
        if not email_content:
            logger.error(f"无法提取消息 {message_id} 的内容")
            return False

        sender = email_content.get('from', '')
        recipient = email_content.get('to', '')
        subject = email_content.get('subject', '')
        body = email_content.get('body', '')

        logger.info(f"提取的邮件信息: 发件人={sender}, 收件人={recipient}, 主题={subject}")

        # Collect the addresses that belong to the current user.
        own_addresses = set()
        if self.user:
            user_email = getattr(self.user, 'email', None)
            if user_email:
                own_addresses.add(user_email.strip().lower())
            # The watched mailbox can differ from the login email, so the
            # connected Gmail address also counts as "ours" when available.
            gmail_credential = getattr(gmail_integration, 'gmail_credential', None)
            gmail_account = getattr(gmail_credential, 'gmail_email', None)
            if gmail_account:
                own_addresses.add(gmail_account.strip().lower())

        talent_email = None
        is_talent_sending = False  # True when the talent authored this message

        if own_addresses:
            # Compare exact bare addresses rather than substrings, so that
            # "bob@x.com" is never mistaken for "jimbob@x.com".
            sender_address = (parseaddr(sender or '')[1] or (sender or '')).strip().lower()
            other_recipients = [
                address.strip().lower()
                for _, address in getaddresses([recipient or ''])
                if address and address.strip().lower() not in own_addresses
            ]

            if sender_address and sender_address not in own_addresses:
                talent_email = sender_address
                is_talent_sending = True
                logger.info(f"检测到达人(发件人)邮箱: {talent_email}")
            elif other_recipients:
                talent_email = other_recipients[0]
                is_talent_sending = False
                logger.info(f"检测到达人(收件人)邮箱: {talent_email}")

        logger.info(f"最终确定的达人邮箱: {talent_email}")

        if not talent_email:
            logger.warning("无法确定达人邮箱,跳过知识库处理")
            return False

        knowledge_base, created = self.create_talent_knowledge_base(talent_email)
        if not knowledge_base:
            logger.error(f"无法为达人 {talent_email} 创建知识库")
            return False

        logger.info(f"使用知识库: {knowledge_base.name} (ID: {knowledge_base.id}), 新创建: {created}")

        talent_mapping, _ = GmailTalentMapping.objects.get_or_create(
            user=self.user,
            talent_email=talent_email,
            defaults={
                'knowledge_base': knowledge_base,
                'conversation_id': f"gmail_{talent_email.replace('@', '_').replace('.', '_')}",
                'is_active': True,
            },
        )
        conversation_id = talent_mapping.conversation_id

        # Talent-authored mail is stored as 'assistant', the user's own mail as 'user'.
        chat_role = 'assistant' if is_talent_sending else 'user'
        logger.info(f"设置聊天角色: 角色={chat_role}, 是否达人发送={is_talent_sending}, 发件人={sender}, 达人邮箱={talent_email}")

        chat_entry = ChatHistory.objects.create(
            user=self.user,
            knowledge_base=knowledge_base,
            conversation_id=conversation_id,
            role=chat_role,
            content=f"{subject}\n\n{body}",
            parent_id=message_id,
        )
        logger.info(f"已创建聊天记录: ID={chat_entry.id}, 角色={chat_role}, 对话ID={conversation_id}")

        # Re-parent onto the previous message of this user's conversation. The
        # default conversation_id is derived from the talent address alone, so
        # the user filter keeps other users' histories out.
        try:
            last_message = ChatHistory.objects.filter(
                user=self.user,
                conversation_id=conversation_id,
                is_deleted=False,
            ).exclude(
                id=chat_entry.id,
            ).order_by('-created_at').first()

            if last_message:
                chat_entry.parent_id = str(last_message.id)
                chat_entry.save(update_fields=['parent_id'])
                logger.info(f"更新消息 {chat_entry.id} 的parent_id为 {last_message.id}")
        except Exception as parent_error:
            logger.error(f"更新父消息ID失败: {str(parent_error)}")

        for attachment in email_content.get('attachments') or []:
            try:
                logger.info(f"处理附件: {attachment.get('filename')}")
                filepath = self.download_attachment(
                    message_id=message_id,
                    attachment_id=attachment.get('attachmentId'),
                    filename=attachment.get('filename'),
                )
                if filepath:
                    GmailAttachment.objects.create(
                        chat_message=chat_entry,
                        gmail_message_id=message_id,
                        filename=attachment.get('filename'),
                        filepath=filepath,
                        mimetype=attachment.get('mimeType', ''),
                        filesize=attachment.get('size', 0),
                    )
                    logger.info(f"已保存附件: {filepath}")
            except Exception as attachment_error:
                logger.error(f"处理附件 {attachment.get('filename')} 失败: {str(attachment_error)}")

        try:
            # Only talent-authored mail gets a suggested reply. Such mail is
            # stored with role 'assistant', so the gate is is_talent_sending alone.
            if is_talent_sending:
                profile = UserProfile.objects.filter(user=self.user).first()

                if profile and profile.auto_recommend_reply:
                    logger.info(f"用户 {self.user.email} 已启用自动回复,生成回复建议")

                    history_rows = ChatHistory.objects.filter(
                        user=self.user,
                        conversation_id=conversation_id,
                        is_deleted=False,
                    ).order_by('created_at')
                    # The reply generator reads dict keys and slices with
                    # negative indices; a QuerySet of model rows supports neither.
                    conversation_history = [
                        {'role': row.role, 'content': row.content}
                        for row in history_rows
                    ]

                    recommended_reply = self._get_recommended_reply_from_deepseek(conversation_history)

                    if recommended_reply:
                        logger.info(f"生成了推荐回复,长度: {len(recommended_reply)}")
                        recommended_reply_entry = ChatHistory.objects.create(
                            user=self.user,
                            knowledge_base=knowledge_base,
                            conversation_id=conversation_id,
                            role='user',
                            content=recommended_reply,
                            parent_id=str(chat_entry.id),
                        )
                        logger.info(f"已保存推荐回复到历史记录, 角色=user, ID={recommended_reply_entry.id}, 父消息ID={chat_entry.id}")
        except Exception as auto_reply_error:
            logger.error(f"生成自动回复失败: {str(auto_reply_error)}")

        logger.info(f"成功处理达人邮件: ID={message_id}, 发件人={sender}, 收件人={recipient}, 主题={subject}")
        logger.info(f"保存消息信息: 角色={chat_role}, 对话ID={conversation_id}, 是否达人发送={is_talent_sending}")

        return True

    except Exception as e:
        logger.error(f"处理新消息 {message_id} 失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
|
||
|
||
def _get_recommended_reply_from_deepseek(self, conversation_history, talent_email=None, conversation_id=None):
    """Ask DeepSeek-V3 (SiliconFlow chat-completions API) for a reply suggestion.

    The system prompt includes the user's active ``UserGoal`` and, when one is
    found, the matching active ``ConversationSummary``. Only the five most
    recent history messages (role/content only) are sent. If the last of them
    is not a ``user`` message, an explicit instruction is appended.

    Args:
        conversation_history: chronological iterable of messages. Each item is
            either a dict with ``role``/``content`` keys (optionally a
            ``metadata`` dict carrying ``from_email``/``conversation_id``) or an
            object with ``role``/``content`` attributes such as a
            ``ChatHistory`` row.
        talent_email: talent address for the summary lookup. When omitted, it
            is taken from the first ``user`` message whose metadata has
            ``from_email``.
        conversation_id: conversation ID for the summary lookup. When omitted,
            it is taken from the first message whose metadata has
            ``conversation_id``.

    Returns:
        str | None: the reply text, or None if no API key is configured, the
        request fails, or the response carries no usable content. Errors are
        logged, not raised.
    """
    try:
        # python-dotenv is optional; without it only real environment
        # variables and Django settings are consulted.
        try:
            from dotenv import load_dotenv
        except ImportError:
            load_dotenv = None
        if load_dotenv is not None:
            load_dotenv()

        # Django settings win over the environment variable. There is
        # deliberately no built-in fallback key: secrets must not live in code.
        api_key = getattr(settings, 'DEEPSEEK_API_KEY', None) or os.environ.get('DEEPSEEK_API_KEY')
        if not api_key:
            logger.error("未配置DEEPSEEK_API_KEY,无法生成推荐回复,请在环境变量或settings.py中设置")
            return None

        url = "https://api.siliconflow.cn/v1/chat/completions"
        request_timeout = getattr(settings, 'DEEPSEEK_REQUEST_TIMEOUT', 60)

        # Normalize every entry to (role, content, metadata-dict).
        normalized = []
        for item in conversation_history or []:
            if isinstance(item, dict):
                role = item.get('role')
                content = item.get('content')
                metadata = item.get('metadata')
            else:
                role = getattr(item, 'role', None)
                content = getattr(item, 'content', None)
                metadata = None
            normalized.append((role, content, metadata if isinstance(metadata, dict) else {}))

        from .models import UserGoal, ConversationSummary

        user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()
        goal_content = user_goal.content if user_goal else None

        if not talent_email:
            talent_email = next(
                (meta['from_email'] for role, _, meta in normalized if role == 'user' and 'from_email' in meta),
                None,
            )
        if not conversation_id:
            conversation_id = next(
                (meta['conversation_id'] for _, _, meta in normalized if 'conversation_id' in meta),
                None,
            )

        conversation_summary = None
        if talent_email and conversation_id:
            summary_obj = ConversationSummary.objects.filter(
                user=self.user,
                talent_email=talent_email,
                conversation_id=conversation_id,
                is_active=True,
            ).first()
            if summary_obj:
                conversation_summary = summary_obj.summary

        system_content = "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。"
        if goal_content:
            system_content += f"\n\n【用户总目标】\n{goal_content}\n\n"
        if conversation_summary:
            system_content += f"【对话总结】\n{conversation_summary}\n\n"
        system_content += "请针对达人最后一条消息,结合用户总目标和对话总结生成一个有针对性的专业回复。回复必须对达人当前问题直接响应,同时与用户的总体合作目标保持一致。回复应该有至少100个字符,必须提供有实质内容的回复。"

        # Keep only role/content (no metadata) of the last five usable
        # messages so the payload is API-clean and within the token budget.
        recent_messages = [
            {"role": role, "content": content}
            for role, content, _ in normalized
            if role and content
        ][-5:]

        messages = [{"role": "system", "content": system_content}, *recent_messages]

        if not recent_messages or recent_messages[-1]['role'] != 'user':
            messages.append({
                "role": "user",
                "content": "请针对达人最后一条消息生成专业的回复,考虑我们之前的对话历史和我设定的总目标。",
            })

        payload = {
            "model": "deepseek-ai/DeepSeek-V3",
            "messages": messages,
            "stream": False,
            "max_tokens": 1024,
            "temperature": 0.7,
            "top_p": 0.9,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "presence_penalty": 0.2,
            "n": 1,
            "stop": [],
            "response_format": {
                "type": "text",
            },
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }

        logger.info("开始调用DeepSeek API生成推荐回复")
        # Bounded timeout so a stalled API call cannot hang the worker forever.
        response = requests.post(url, json=payload, headers=headers, timeout=request_timeout)

        if response.status_code != 200:
            logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
            return None

        result = response.json()
        logger.debug(f"DeepSeek API返回: {result}")

        choices = result.get('choices') if isinstance(result, dict) else None
        if choices:
            reply = (choices[0].get('message') or {}).get('content')
            if not reply or not reply.strip():
                logger.warning("DeepSeek API返回的回复内容为空")
                return None
            return reply

        logger.warning(f"DeepSeek API返回格式异常: {result}")
        return None

    except Exception as e:
        logger.error(f"调用DeepSeek API失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def get_attachment_by_conversation(self, conversation_id):
    """Return metadata for every attachment in one of the current user's conversations.

    Only non-deleted ``ChatHistory`` rows of ``self.user`` with the given
    ``conversation_id`` are considered. Their ``GmailAttachment`` rows are
    loaded together with the owning chat message.

    Args:
        conversation_id: conversation identifier stored on ``ChatHistory``.

    Returns:
        list[dict]: one dict per attachment with keys ``id``, ``filename``,
        ``filepath``, ``mimetype``, ``filesize``, ``message_id``,
        ``gmail_message_id``, ``role``, ``created_at``
        (``'%Y-%m-%d %H:%M:%S'``) and ``content_preview`` (first 100
        characters of the chat message, with ``'...'`` when truncated).
        Returns [] when the conversation has no records or on any error
        (errors are logged).
    """
    try:
        # Only the ids are needed; avoid loading full chat rows and content.
        record_ids = list(
            ChatHistory.objects.filter(
                conversation_id=conversation_id,
                user=self.user,
                is_deleted=False,
            ).values_list('id', flat=True)
        )

        if not record_ids:
            logger.warning(f"找不到对话记录: {conversation_id}")
            return []

        gmail_attachments = GmailAttachment.objects.filter(
            chat_message_id__in=record_ids
        ).select_related('chat_message')

        attachments = []
        for attachment in gmail_attachments:
            chat_message = attachment.chat_message
            content = chat_message.content or ''  # tolerate empty/NULL content
            preview = content[:100] + '...' if len(content) > 100 else content

            attachments.append({
                'id': str(attachment.id),
                'filename': attachment.filename,
                'filepath': attachment.filepath,
                'mimetype': attachment.mimetype,
                'filesize': attachment.filesize,
                'message_id': str(chat_message.id),
                'gmail_message_id': attachment.gmail_message_id,
                'role': chat_message.role,
                'created_at': attachment.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'content_preview': preview,
            })

        return attachments

    except Exception as e:
        logger.error(f"获取对话附件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []
|
||
|
||
def handle_auth_code(self, auth_code):
    """Exchange an OAuth2 authorization code and persist the resulting credentials.

    Builds the Gmail API client from the new credentials and looks up the
    account address via ``users.getProfile``. When ``self.user`` is set, it
    stores the pickled credentials in ``GmailCredential``:

    * if ``self.gmail_credential_id`` names one of the user's credentials, that
      row is updated and re-activated;
    * otherwise an active credential with the same Gmail address is updated,
      or a new one is created (the user's first credential becomes default).

    The ``GmailServiceManager`` instance for the user is then refreshed.

    Args:
        auth_code: authorization code returned by Google's consent screen.

    Returns:
        dict: ``{'status': 'success', 'gmail_email': ...}`` on success, or
        ``{'status': 'error', 'message': ...}``. Exceptions are logged, not
        raised.
    """
    try:
        flow = self.get_oauth_flow()
        # get_oauth_flow() builds an oauth2client flow, whose code exchange is
        # step2_exchange(). fetch_token()/.credentials only exist on
        # google-auth-oauthlib flows and are kept as a fallback.
        if hasattr(flow, 'step2_exchange'):
            self.credentials = flow.step2_exchange(auth_code)
        else:
            flow.fetch_token(code=auth_code)
            self.credentials = flow.credentials

        self.gmail_service = discovery.build('gmail', 'v1', http=self.credentials.authorize(Http()))

        gmail_email = None
        try:
            profile = self.gmail_service.users().getProfile(userId='me').execute()
            gmail_email = profile.get('emailAddress')
            logger.info(f"获取到Gmail账号: {gmail_email}")
        except Exception as e:
            logger.error(f"获取Gmail账号失败: {str(e)}")

        if self.user:
            # NOTE(review): credentials are stored pickled; any reader that
            # calls pickle.loads on this column must only see data written here.
            credentials_data = pickle.dumps(self.credentials)

            if self.gmail_credential_id:
                try:
                    gmail_credential = GmailCredential.objects.get(
                        id=self.gmail_credential_id,
                        user=self.user,
                    )
                    gmail_credential.credentials = credentials_data
                    # Keep the stored address when the profile lookup failed
                    # instead of overwriting it with None.
                    if gmail_email:
                        gmail_credential.gmail_email = gmail_email
                    gmail_credential.token_path = self.token_storage_path
                    gmail_credential.updated_at = timezone.now()
                    gmail_credential.is_active = True
                    gmail_credential.save()

                    self.gmail_credential = gmail_credential
                    logger.info(f"已更新ID为 {self.gmail_credential_id} 的Gmail凭证,Gmail账号: {gmail_email}")
                except GmailCredential.DoesNotExist:
                    logger.error(f"未找到ID为 {self.gmail_credential_id} 的Gmail凭证,将创建新凭证")
                    self.gmail_credential_id = None

            # No specific credential requested (or it did not exist): reuse an
            # active credential for the same address, or create a new one.
            if not self.gmail_credential_id:
                existing_credential = None
                if gmail_email:
                    existing_credential = GmailCredential.objects.filter(
                        user=self.user,
                        gmail_email=gmail_email,
                        is_active=True,
                    ).first()

                if existing_credential:
                    existing_credential.credentials = credentials_data
                    existing_credential.token_path = self.token_storage_path
                    existing_credential.updated_at = timezone.now()
                    existing_credential.save()

                    self.gmail_credential = existing_credential
                    logger.info(f"已更新Gmail账号 {gmail_email} 的现有凭证")
                else:
                    gmail_count = GmailCredential.objects.filter(user=self.user).count()

                    name = f"Gmail账号 {gmail_count + 1}"
                    if gmail_email:
                        name = gmail_email
                    else:
                        # Placeholder so the non-null gmail_email column is filled.
                        gmail_email = "未知邮箱"

                    gmail_credential = GmailCredential.objects.create(
                        user=self.user,
                        credentials=credentials_data,
                        token_path=self.token_storage_path,
                        gmail_email=gmail_email,
                        name=name,
                        is_default=(gmail_count == 0),  # first account becomes default
                        updated_at=timezone.now(),
                        is_active=True,
                    )

                    self.gmail_credential = gmail_credential
                    logger.info(f"已创建新的Gmail凭证,Gmail账号: {gmail_email}, 名称: {name}")

            GmailServiceManager.update_instance(
                self.user,
                self.credentials,
                self.gmail_service,
                self.gmail_credential,
            )

        return {
            'status': 'success',
            'gmail_email': gmail_email,
        }

    except Exception as e:
        logger.error(f"处理授权码失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {
            'status': 'error',
            'message': str(e),
        }
|
||
|
||
def _upload_attachments_to_knowledge_base(self, knowledge_base, conversations, batch_size=10):
    """Upload every attachment found in ``conversations`` to ``knowledge_base``.

    Each message dict in ``conversations`` may carry an ``attachments`` list
    whose items have ``attachmentId`` and ``filename``. If a ``GmailAttachment``
    row for the same message id and filename exists and its file is still on
    disk, that file is reused. Otherwise the attachment is fetched with
    ``self.download_attachment``. The collected files are uploaded in batches
    of ``batch_size`` via ``_upload_message_attachments_to_knowledge_base``,
    which handles splitting, document records and the document count.

    Errors are logged and swallowed; nothing is returned.

    Args:
        knowledge_base: target knowledge base.
        conversations: iterable of message dicts (keys read: ``id``,
            ``attachments``, ``from``, ``subject``, ``date``).
        batch_size: maximum number of files per split/upload call.
    """
    try:
        attachments_to_upload = []

        for message in conversations:
            attachments = message.get('attachments')
            if not attachments:
                continue

            message_id = message.get('id', '')
            sender = message.get('from', '未知发件人')
            subject = message.get('subject', '无主题')
            date_str = message.get('date', '未知时间')

            for attachment in attachments:
                if 'attachmentId' not in attachment or 'filename' not in attachment:
                    continue

                # Reuse a previously downloaded copy when it is still on disk.
                existing_attachment = GmailAttachment.objects.filter(
                    gmail_message_id=message_id,
                    filename=attachment['filename'],
                ).first()

                if (
                    existing_attachment
                    and existing_attachment.filepath
                    and os.path.exists(existing_attachment.filepath)
                ):
                    filepath = existing_attachment.filepath
                else:
                    filepath = self.download_attachment(
                        message_id=message_id,
                        attachment_id=attachment['attachmentId'],
                        filename=attachment['filename'],
                    )

                if filepath:
                    attachments_to_upload.append({
                        'filepath': filepath,
                        'filename': attachment['filename'],
                        'message_id': message_id,
                        'sender': sender,
                        'subject': subject,
                        'date': date_str,
                    })

        if not attachments_to_upload:
            return

        batch_size = max(1, int(batch_size))
        for start in range(0, len(attachments_to_upload), batch_size):
            # The helper logs and contains its own failures, so one failing
            # batch does not stop the remaining batches.
            self._upload_message_attachments_to_knowledge_base(
                knowledge_base,
                attachments_to_upload[start:start + batch_size],
            )

        logger.info(f"完成附件上传,共 {len(attachments_to_upload)} 个文件")

    except Exception as e:
        logger.error(f"上传附件到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
|
||
|
||
def _upload_message_attachments_to_knowledge_base(self, knowledge_base, attachment_records):
    """Split attachment files and upload them as documents of ``knowledge_base``.

    Missing files are skipped with a warning. The remaining files are sent in
    one call to ``KnowledgeBaseViewSet._call_split_api_multiple``. Each
    resulting document is uploaded with ``_call_upload_api`` to
    ``knowledge_base.external_id`` and recorded as a ``KnowledgeBaseDocument``.
    The knowledge base's ``document_count`` is then refreshed from its active
    documents.

    Errors are logged and swallowed; nothing is returned.

    Args:
        knowledge_base: target knowledge base (``external_id`` is used).
        attachment_records: iterable of attachments, each either a dict with
            ``filepath``/``filename`` keys or an object with those attributes
            (e.g. a ``GmailAttachment`` row).
    """
    try:
        if not attachment_records:
            return

        logger.info(f"开始上传 {len(attachment_records)} 个附件到知识库")

        from .views import KnowledgeBaseViewSet
        import django.core.files.uploadedfile as uploadedfile

        kb_viewset = KnowledgeBaseViewSet()

        files = []
        for att in attachment_records:
            if isinstance(att, dict):
                filepath = att.get('filepath')
                filename = att.get('filename')
            else:
                filepath = getattr(att, 'filepath', None)
                filename = getattr(att, 'filename', None)

            # A None path would make os.path.exists raise and abort the batch.
            if not filepath or not os.path.exists(filepath):
                logger.warning(f"附件文件不存在: {filepath}")
                continue

            with open(filepath, 'rb') as f:
                file_content = f.read()

            files.append(uploadedfile.SimpleUploadedFile(
                name=filename or os.path.basename(filepath),
                content=file_content,
                content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream',
            ))

        if not files:
            return

        split_response = kb_viewset._call_split_api_multiple(files)
        if not split_response or split_response.get('code') != 200:
            logger.error(f"附件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}")
            return

        uploaded_count = 0
        for doc in split_response.get('data') or []:
            doc_name = doc.get('name', 'Gmail附件')
            upload_doc_data = {
                "name": doc_name,
                "paragraphs": [
                    {
                        "content": paragraph.get('content', ''),
                        "title": paragraph.get('title', ''),
                        "is_active": True,
                        "problem_list": [],
                    }
                    for paragraph in doc.get('content') or []
                ],
            }

            upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data)
            if not (upload_response and upload_response.get('code') == 200 and upload_response.get('data')):
                logger.warning(f"Gmail附件文档上传失败: {doc_name}")
                continue

            document_id = upload_response['data'].get('id')
            if not document_id:
                # Skip this document instead of aborting the remaining ones.
                logger.warning(f"Gmail附件文档上传返回缺少ID: {doc_name}")
                continue

            KnowledgeBaseDocument.objects.create(
                knowledge_base=knowledge_base,
                document_id=document_id,
                document_name=doc_name,
                external_id=document_id,
            )
            uploaded_count += 1
            logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}")

        # Recount once after all uploads rather than after every document.
        if uploaded_count:
            knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
                knowledge_base=knowledge_base,
                status='active',
            ).count()
            knowledge_base.save()

        logger.info(f"完成附件上传,共 {len(files)} 个文件")

    except Exception as e:
        logger.error(f"上传附件到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
|
||
|
||
def get_recent_emails(self, from_email=None, max_results=10):
    """Fetch and parse recent messages from the connected mailbox.

    Args:
        from_email: optional sender address. When given, only messages
            matching the Gmail search ``from:<from_email>`` are listed.
        max_results: maximum number of messages to list.

    Returns:
        list: parsed messages from ``self._extract_email_content``, in the
        order the API lists them. Messages that fail to download or that parse
        to an empty result are skipped. Returns [] when the Gmail service is
        not initialized or listing fails (errors are logged).
    """
    try:
        service = self.gmail_service
        if not service:
            logger.error("Gmail服务未初始化")
            return []

        query = f"from:{from_email}" if from_email else ""

        results = service.users().messages().list(
            userId='me',
            maxResults=max_results,
            q=query,
        ).execute()

        emails = []
        for message in results.get('messages', []):
            try:
                msg = service.users().messages().get(userId='me', id=message['id']).execute()
                email_data = self._extract_email_content(msg)
            except Exception as msg_error:
                # One unreadable message should not discard the whole batch.
                logger.warning(f"获取邮件 {message.get('id')} 失败: {str(msg_error)}")
                continue

            # _extract_email_content may return nothing for unparseable mail.
            if email_data:
                emails.append(email_data)

        return emails

    except Exception as e:
        logger.error(f"获取最近邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []
|
||
|
||
def manage_user_goal(self, goal_content=None):
    """Read, create or update the current user's active goal.

    Args:
        goal_content (str | None): new goal text. With None nothing is written
            and the current active goal (if any) is returned.

    Returns:
        dict: ``{'status': 'success', 'action': 'retrieve' | 'update' | 'create',
        'goal': {...} or None}`` on success, where the goal dict has ``id``,
        ``content``, ``created_at`` and ``updated_at``
        (``'%Y-%m-%d %H:%M:%S'``). On any exception, returns
        ``{'status': 'error', 'message': ...}`` after logging.
    """
    from .models import UserGoal

    def _goal_payload(goal):
        # Serialize a UserGoal row into the response shape.
        return {
            'id': str(goal.id),
            'content': goal.content,
            'created_at': goal.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            'updated_at': goal.updated_at.strftime('%Y-%m-%d %H:%M:%S'),
        }

    try:
        active_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()

        # Read-only request: report the current goal (or None).
        if goal_content is None:
            return {
                'status': 'success',
                'action': 'retrieve',
                'goal': _goal_payload(active_goal) if active_goal else None,
            }

        if active_goal:
            active_goal.content = goal_content
            active_goal.save()
            action = 'update'
        else:
            active_goal = UserGoal.objects.create(user=self.user, content=goal_content)
            action = 'create'

        return {
            'status': 'success',
            'action': action,
            'goal': _goal_payload(active_goal),
        }

    except Exception as e:
        logger.error(f"管理用户总目标失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {
            'status': 'error',
            'message': f"管理用户总目标失败: {str(e)}",
        }
|
||
|
||
def generate_conversation_summary(self, talent_email):
    """Summarize the user's conversations with one talent and store the summary.

    Builds a chat history from ``self.get_conversations(talent_email)``:

    * each conversation subject becomes a ``system`` line;
    * each message body becomes a ``user`` entry when its ``from_email`` is
      the talent's address, otherwise an ``assistant`` entry.

    Addresses are compared as bare, lower-cased addresses, so
    ``"Name <X@Y.com>"`` matches ``x@y.com``. The history is summarized by
    ``_generate_summary_from_deepseek`` and upserted as the active
    ``ConversationSummary`` for (user, talent_email, id of the first
    conversation).

    NOTE(review): this expects each conversation to be a dict with
    ``subject``, ``id`` and a ``messages`` list of dicts carrying
    ``from_email``/``body``. Confirm against ``get_conversations``.

    Args:
        talent_email (str): the talent's email address.

    Returns:
        dict: ``{'status': 'success', 'action': 'create' | 'update',
        'summary': {...}}`` or ``{'status': 'error', 'message': ...}``.
    """
    from email.utils import parseaddr
    from .models import ConversationSummary

    def _bare_address(value):
        # Lower-cased bare address from either "x@y" or "Name <x@y>".
        value = value or ''
        return (parseaddr(value)[1] or value).strip().lower()

    try:
        conversations = self.get_conversations(talent_email)
        if not conversations:
            return {
                'status': 'error',
                'message': f"未找到与{talent_email}的对话记录",
            }

        talent_address = _bare_address(talent_email)

        conversation_history = []
        for conversation in conversations:
            if conversation.get('subject'):
                conversation_history.append({
                    'role': 'system',
                    'content': f"邮件主题: {conversation['subject']}",
                })

            for message in conversation.get('messages', []):
                is_talent = bool(talent_address) and _bare_address(message.get('from_email')) == talent_address
                content = message.get('body', '')
                if content:
                    conversation_history.append({
                        'role': 'user' if is_talent else 'assistant',
                        'content': content,
                    })

        summary = self._generate_summary_from_deepseek(conversation_history)
        if not summary:
            return {
                'status': 'error',
                'message': "生成对话总结失败",
            }

        conversation_id = conversations[0].get('id')
        if not conversation_id:
            return {
                'status': 'error',
                'message': "无法确定对话ID",
            }

        conversation_summary = ConversationSummary.objects.filter(
            user=self.user,
            talent_email=talent_email,
            conversation_id=conversation_id,
            is_active=True,
        ).first()

        if conversation_summary:
            conversation_summary.summary = summary
            conversation_summary.save()
            action = 'update'
        else:
            conversation_summary = ConversationSummary.objects.create(
                user=self.user,
                talent_email=talent_email,
                conversation_id=conversation_id,
                summary=summary,
            )
            action = 'create'

        return {
            'status': 'success',
            'action': action,
            'summary': {
                'id': str(conversation_summary.id),
                'talent_email': talent_email,
                'conversation_id': conversation_id,
                'summary': summary,
                'created_at': conversation_summary.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'updated_at': conversation_summary.updated_at.strftime('%Y-%m-%d %H:%M:%S'),
            },
        }

    except Exception as e:
        logger.error(f"生成对话总结失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {
            'status': 'error',
            'message': f"生成对话总结失败: {str(e)}",
        }
|
||
|
||
def _generate_summary_from_deepseek(self, conversation_history):
    """Ask DeepSeek-V3 (SiliconFlow chat-completions API) to summarize a conversation.

    A fixed system prompt asks for:
      1. the products or services discussed;
      2. the talent's concerns;
      3. agreements reached;
      4. open follow-ups.

    When there are more than 20 history entries, only the first 2, the 4
    around the middle and the last 6 are sent.

    Args:
        conversation_history: list of dicts with ``role`` and ``content``.
            Entries missing either are dropped, and any extra keys are not
            sent to the API.

    Returns:
        str | None: summary text, or None if no API key is configured, the
        request fails, or the response carries no usable content. Errors are
        logged, not raised.
    """
    try:
        # python-dotenv is optional; without it only real environment
        # variables and Django settings are consulted.
        try:
            from dotenv import load_dotenv
        except ImportError:
            load_dotenv = None
        if load_dotenv is not None:
            load_dotenv()

        # Django settings win over the environment variable. There is
        # deliberately no built-in fallback key: secrets must not live in code.
        api_key = getattr(settings, 'DEEPSEEK_API_KEY', None) or os.environ.get('DEEPSEEK_API_KEY')
        if not api_key:
            logger.error("未配置DEEPSEEK_API_KEY,无法生成对话总结,请在环境变量或settings.py中设置")
            return None

        url = "https://api.siliconflow.cn/v1/chat/completions"
        request_timeout = getattr(settings, 'DEEPSEEK_REQUEST_TIMEOUT', 60)

        system_message = {
            "role": "system",
            "content": "你是一位专业的电商客服和达人助手。你的任务是分析用户与达人之间的所有对话历史,并生成一份简明扼要的总结。总结应包括:1. 主要讨论的产品或服务;2. 达人的主要关注点和需求;3. 已经达成的共识或协议;4. 未解决的问题或后续需要跟进的事项。总结应该客观、全面、结构清晰。",
        }

        # Send only role/content so no stray keys reach the API.
        history = [
            {"role": entry.get('role'), "content": entry.get('content')}
            for entry in conversation_history or []
            if entry.get('role') and entry.get('content')
        ]

        # Long histories are trimmed to stay within the token budget.
        if len(history) > 20:
            middle = len(history) // 2
            history = history[:2] + history[middle - 2:middle + 2] + history[-6:]

        messages = [
            system_message,
            *history,
            {
                "role": "user",
                "content": "请根据以上对话历史生成一份全面的总结。",
            },
        ]

        payload = {
            "model": "deepseek-ai/DeepSeek-V3",
            "messages": messages,
            "stream": False,
            "max_tokens": 1500,
            "temperature": 0.3,  # low randomness for a more deterministic summary
            "top_p": 0.9,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "presence_penalty": 0.2,
            "n": 1,
            "stop": [],
            "response_format": {
                "type": "text",
            },
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }

        logger.info("开始调用DeepSeek API生成对话总结")
        # Bounded timeout so a stalled API call cannot hang the worker forever.
        response = requests.post(url, json=payload, headers=headers, timeout=request_timeout)

        if response.status_code != 200:
            logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
            return None

        result = response.json()
        logger.debug(f"DeepSeek API返回: {result}")

        choices = result.get('choices') if isinstance(result, dict) else None
        if choices:
            summary = (choices[0].get('message') or {}).get('content')
            if not summary or not summary.strip():
                logger.warning("DeepSeek API返回的总结内容为空")
                return None
            return summary

        logger.warning(f"DeepSeek API返回格式异常: {result}")
        return None

    except Exception as e:
        logger.error(f"调用DeepSeek API生成总结失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def get_oauth_flow(self):
    """Create an OAuth2 flow from ``self.client_secret``.

    ``client.flow_from_clientsecrets`` only accepts a file path, so the client
    secret (a JSON string or a mapping) is written to a private temporary file
    that is removed again before returning. Any ``redirect_uris`` found under the
    ``web`` / ``installed`` sections are forced to the out-of-band URI
    ``urn:ietf:wg:oauth:2.0:oob`` to avoid localhost connection-refused problems.

    Returns:
        The flow object produced by ``client.flow_from_clientsecrets``.

    Raises:
        ValueError: if ``self.client_secret`` is empty, or is a string that is
            not valid JSON.
        Exception: any other error is logged and re-raised unchanged.
    """
    import copy
    import tempfile

    oob_uri = 'urn:ietf:wg:oauth:2.0:oob'
    # Bound before ``try`` so the ``finally`` cleanup can never raise NameError
    # and mask the original exception (e.g. the "missing client_secret" error).
    client_secret_path = None
    try:
        if not self.client_secret:
            logger.error("未提供client_secret_json,无法创建OAuth流程")
            raise ValueError("未提供client_secret_json")

        if isinstance(self.client_secret, str):
            try:
                secret_data = json.loads(self.client_secret)
            except json.JSONDecodeError as e:
                logger.error(f"client_secret不是有效的JSON: {str(e)}")
                raise ValueError(f"client_secret不是有效的JSON: {str(e)}") from e
        else:
            # Deep copy: overriding redirect_uris must not mutate the nested
            # dicts that still belong to self.client_secret.
            secret_data = copy.deepcopy(dict(self.client_secret))

        for key in ('web', 'installed'):
            if key in secret_data and 'redirect_uris' in secret_data[key]:
                secret_data[key]['redirect_uris'] = [oob_uri]
                logger.info("已强制设置redirect_uri为非浏览器模式")

        # Unique, owner-only (0600) temp file instead of a fixed name in the
        # working directory, so concurrent calls cannot overwrite each other.
        fd, client_secret_path = tempfile.mkstemp(suffix='.json')
        with os.fdopen(fd, 'w') as f:
            json.dump(secret_data, f)

        flow = client.flow_from_clientsecrets(
            client_secret_path,
            self.SCOPES,
            redirect_uri=oob_uri,
        )
        return flow

    except Exception as e:
        logger.error(f"创建OAuth流程失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise
    finally:
        # Remove the temporary secret file if it was created
        if client_secret_path and os.path.exists(client_secret_path):
            try:
                os.unlink(client_secret_path)
            except Exception as del_e:
                logger.error(f"删除临时文件失败: {str(del_e)}")
|
||
|
||
def check_and_renew_watch(self):
    """Keep the Gmail push-notification watch for this credential alive.

    Looks up the active ``GmailCredential`` for ``self.user`` and calls
    ``self.setup_watch()`` when the credential has no ``watch_expiration`` or
    when that expiration is less than 24 hours away.

    Returns:
        bool: True when the watch was already healthy or ``setup_watch``
        returned a result containing ``'historyId'``; False when no credential
        was found, renewal failed, or an exception occurred (it is logged).
    """
    try:
        from .models import GmailCredential

        wanted_id = self.gmail_credential_id if self.gmail_credential_id else None
        gmail_cred = GmailCredential.objects.filter(
            user=self.user,
            is_active=True,
            gmail_credential_id=wanted_id,
        ).first()
        if not gmail_cred:
            logger.error(f"找不到用户 {self.user.email} 的Gmail凭证")
            return False

        expires_at = gmail_cred.watch_expiration
        if expires_at:
            tz_enabled = hasattr(settings, 'USE_TZ') and settings.USE_TZ
            current = timezone.now() if tz_enabled else datetime.now()
            hours_left = (expires_at - current).total_seconds() / 3600
            should_renew = hours_left < 24
            if should_renew:
                logger.info(f"Gmail监听将在 {hours_left:.2f} 小时后过期,需要更新")
        else:
            # No expiration recorded means a watch was never set up
            logger.info(f"Gmail凭证 {gmail_cred.gmail_email} 没有监听过期时间,需要设置监听")
            should_renew = True

        if not should_renew:
            logger.info(f"Gmail监听状态良好,无需更新。过期时间: {gmail_cred.watch_expiration}")
            return True

        logger.info(f"为Gmail凭证 {gmail_cred.gmail_email} 更新监听")
        renewal = self.setup_watch()
        if renewal and 'historyId' in renewal:
            logger.info(f"Gmail监听更新成功: historyId={renewal['historyId']}")
            return True
        logger.error("Gmail监听更新失败")
        return False

    except Exception as e:
        logger.error(f"检查和更新Gmail监听失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
|
||
|
||
def verify_connectivity(self):
    """Probe the Gmail API and report connection plus watch status.

    Authenticates first when ``self.gmail_service`` is missing or falsy, then
    issues a lightweight ``users().getProfile(userId='me')`` request and reads
    the user's active ``GmailCredential`` to describe its push-watch state.

    Returns:
        dict: On success ``{'status': 'success', 'message', 'is_connected': True,
        'profile': {...}, 'watch_info': {...}}``; ``watch_info`` is empty when no
        credential matched. If authentication fails,
        ``{'status': 'error', 'message', 'is_connected': False}``. If an
        exception occurs, the same error shape plus a ``suggestions`` list with
        at most one troubleshooting hint.
    """
    try:
        if not getattr(self, 'gmail_service', None):
            logger.warning("Gmail服务未初始化,尝试认证")
            if not self.authenticate():
                return {'status': 'error', 'message': 'Gmail认证失败', 'is_connected': False}

        # getProfile is a cheap call, suitable as a connectivity probe
        profile_data = self.gmail_service.users().getProfile(userId='me').execute()

        from .models import GmailCredential
        matched_cred = GmailCredential.objects.filter(
            user=self.user,
            is_active=True,
            gmail_credential_id=self.gmail_credential_id if self.gmail_credential_id else None,
        ).first()

        watch_info = {}
        if matched_cred:
            expires_at = matched_cred.watch_expiration
            watch_info = {
                'has_watch': expires_at is not None,
                'watch_expiration': expires_at.strftime('%Y-%m-%d %H:%M:%S') if expires_at else None,
                'last_history_id': matched_cred.last_history_id,
            }

        profile_summary = {
            'email': profile_data.get('emailAddress'),
            'messages_total': profile_data.get('messagesTotal'),
            'threads_total': profile_data.get('threadsTotal'),
        }
        return {
            'status': 'success',
            'message': 'Gmail连接正常',
            'is_connected': True,
            'profile': profile_summary,
            'watch_info': watch_info,
        }

    except Exception as e:
        logger.error(f"Gmail连接测试失败: {str(e)}")
        logger.error(traceback.format_exc())

        error_message = str(e)
        lowered = error_message.lower()
        # Ordered rules; only the first matching hint is reported
        hint_rules = (
            ("invalid_grant" in lowered, "OAuth令牌已过期,需要重新授权"),
            ("401" in error_message, "认证失败,请重新登录Gmail账号"),
            ("EOF occurred in violation of protocol" in error_message, "SSL连接问题,可能是网络或代理配置问题"),
            ("connect timeout" in lowered or "connection timeout" in lowered, "连接超时,请检查网络连接和代理设置"),
        )
        first_hint = next((hint for matched, hint in hint_rules if matched), None)
        suggestions = [first_hint] if first_hint is not None else []

        return {
            'status': 'error',
            'message': f'Gmail连接失败: {error_message}',
            'is_connected': False,
            'suggestions': suggestions,
        }
|
||
|
||
@classmethod
def batch_renew_watches(cls):
    """Renew the Gmail push watch for every active credential that needs it.

    Intended to be called periodically (e.g. from a scheduled task). Selects
    active ``GmailCredential`` rows whose ``watch_expiration`` is unset or falls
    within the next 24 hours. For each one whose user exists and is active, the
    method builds an instance, authenticates, and calls ``setup_watch``. It
    sleeps one second after each renewal attempt to avoid bursts of requests.

    Returns:
        dict: ``{'status': 'success', 'message', 'total_processed',
        'success_count', 'failure_count', 'skipped_count'}``, where
        ``total_processed`` is the number of credentials selected and
        ``skipped_count`` counts those skipped for a missing or inactive user.
        If the batch itself fails: ``{'status': 'error', 'message'}``.
    """
    try:
        from .models import GmailCredential

        now = timezone.now() if hasattr(settings, 'USE_TZ') and settings.USE_TZ else datetime.now()
        # Watches expiring within the next 24 hours are renewed early
        expiration_threshold = now + timedelta(hours=24)

        # Evaluate the queryset exactly once: a single query, so the logged
        # count and the reported total match the rows actually iterated.
        # select_related avoids one extra user query per credential.
        credentials_to_update = list(
            GmailCredential.objects.filter(is_active=True)
            .filter(Q(watch_expiration__lt=expiration_threshold) | Q(watch_expiration__isnull=True))
            .select_related('user')
        )
        logger.info(f"找到 {len(credentials_to_update)} 个需要更新监听的Gmail凭证")

        success_count = 0
        failure_count = 0
        skipped_count = 0

        for credential in credentials_to_update:
            try:
                user = credential.user
                if not user or not user.is_active:
                    logger.warning(f"跳过已停用用户的凭证: {credential.id}")
                    skipped_count += 1
                    continue

                logger.info(f"处理用户 {user.email} 的Gmail凭证 {credential.gmail_email}")

                gmail = cls(user, gmail_credential_id=credential.id)
                if not gmail.authenticate():
                    logger.error(f"用户 {user.email} 的Gmail认证失败")
                    failure_count += 1
                    continue

                result = gmail.setup_watch()
                if result and 'historyId' in result:
                    logger.info(f"成功更新用户 {user.email} 的Gmail监听")
                    success_count += 1
                else:
                    logger.error(f"更新用户 {user.email} 的Gmail监听失败")
                    failure_count += 1

                # Throttle successive renewals
                time.sleep(1)

            except Exception as e:
                failure_count += 1
                logger.error(f"处理凭证 {credential.id} 时出错: {str(e)}")
                logger.error(traceback.format_exc())

        return {
            'status': 'success',
            'message': '批量更新Gmail监听完成',
            'total_processed': len(credentials_to_update),
            'success_count': success_count,
            'failure_count': failure_count,
            'skipped_count': skipped_count,
        }

    except Exception as e:
        logger.error(f"批量更新Gmail监听失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {
            'status': 'error',
            'message': f'批量更新Gmail监听失败: {str(e)}'
        }
|
||
|
||
def refresh_token(self):
    """Rebuild the Gmail service and verify that the OAuth token still works.

    Evicts the cached service for this user/credential and re-runs
    ``authenticate()``. It then probes the API with ``getProfile`` while the
    process-wide default socket timeout is temporarily set to
    ``GMAIL_REQUEST_TIMEOUT``.

    The credential is flagged through ``_mark_credential_needs_reauth`` when:
    authentication fails; the probe error mentions ``invalid_grant`` or ``401``;
    or an unexpected exception escapes the refresh itself.

    Returns:
        bool: True if the probe succeeded, or failed for a reason other than an
        auth rejection (the token is then assumed usable). False otherwise.
    """
    try:
        logger.info(f"尝试刷新用户 {self.user.email} 的OAuth令牌")

        # Drop the cached service so authenticate() starts from scratch
        GmailServiceManager.clear_instance(self.user, self.gmail_credential_id)

        reauthenticated = self.authenticate()
        if not reauthenticated:
            logger.error("重新认证失败")
            self._mark_credential_needs_reauth()
            return False

        try:
            saved_timeout = socket.getdefaulttimeout()
            # NOTE: setdefaulttimeout is process-global; other threads observe
            # this value until it is restored in the finally clause below.
            socket.setdefaulttimeout(GMAIL_REQUEST_TIMEOUT)
            try:
                probe = self.gmail_service.users().getProfile(userId='me').execute()
                logger.info(f"令牌刷新成功,已验证: {probe.get('emailAddress')}")
                return True
            except Exception as probe_error:
                probe_text = str(probe_error)
                auth_rejected = "invalid_grant" in probe_text.lower() or "401" in probe_text
                if not auth_rejected:
                    # Any other failure: the token itself may still be valid
                    logger.warning(f"令牌可能有效,但API调用失败: {probe_text}")
                    return True
                logger.error(f"令牌仍然无效: {probe_text}")
                self._mark_credential_needs_reauth()
                return False
            finally:
                socket.setdefaulttimeout(saved_timeout)
        except Exception as verify_error:
            logger.error(f"验证新令牌时出错: {str(verify_error)}")
            return False

    except Exception as e:
        logger.error(f"刷新令牌失败: {str(e)}")
        logger.error(traceback.format_exc())
        self._mark_credential_needs_reauth()
        return False
|
||
|
||
def _mark_credential_needs_reauth(self):
    """Flag the current Gmail credential as requiring re-authorization.

    Finds the active ``GmailCredential`` belonging to ``self.user`` whose primary
    key is ``self.gmail_credential_id``. It sets ``needs_reauth = True``, saves
    the row, and evicts the cached Gmail service instance. A lookup miss is
    logged as a warning instead of being ignored silently.

    Errors are logged and never raised, because this method is called from other
    methods' error-handling paths.
    """
    try:
        from .models import GmailCredential

        credential = GmailCredential.objects.filter(
            user=self.user,
            id=self.gmail_credential_id if self.gmail_credential_id else None,
            is_active=True
        ).first()

        if not credential:
            # NOTE(review): when gmail_credential_id is unset this filters on
            # id=None, which never matches, so no credential gets flagged.
            logger.warning(
                f"未找到用户 {self.user.email} 的活跃Gmail凭证 (id={self.gmail_credential_id}),无法标记重新授权"
            )
            return

        credential.needs_reauth = True
        credential.save()
        logger.info(f"已标记Gmail凭证 {credential.id} 需要重新授权")

        # The cached service holds the rejected token; drop it
        GmailServiceManager.clear_instance(self.user, self.gmail_credential_id)
    except Exception as e:
        logger.error(f"标记凭证需要重新授权失败: {str(e)}")
        logger.error(traceback.format_exc())
|
||
|
||
@classmethod
def process_queued_notifications(cls, user=None):
    """Replay queued Gmail push notifications that have not been processed yet.

    Can be called after a user re-authorizes. Unprocessed
    ``GmailNotificationQueue`` rows are grouped by (user, Gmail credential).

    - Groups whose credential is missing, inactive, or flagged ``needs_reauth``
      stay in the queue.
    - Groups whose user no longer exists, or whose integration cannot be
      constructed, also stay queued and are skipped without aborting the batch.
    - Every other notification is passed to ``process_notification`` in
      ``created_at`` order and then marked processed, recording the result or
      the truncated error message.

    Args:
        user: Optional user; when given, only that user's notifications are handled.

    Returns:
        dict: ``{'status': 'success', 'total_processed', 'success_count',
        'remaining'}``, where ``remaining`` is the number of notifications still
        unprocessed after this run. On failure: ``{'status': 'error', 'message'}``.
    """
    try:
        from django.db.models import Count
        from .models import GmailNotificationQueue, GmailCredential, User

        query = Q(processed=False)
        if user:
            query &= Q(user=user)

        queued_notifications = GmailNotificationQueue.objects.filter(query)
        logger.info(f"找到 {queued_notifications.count()} 条未处理的Gmail通知")

        processed_count = 0
        success_count = 0

        # One row per (user, credential) pair that has pending notifications
        user_creds = (
            queued_notifications
            .values('user_id', 'gmail_credential_id')
            .annotate(count=Count('id'))
            .order_by('user_id', 'gmail_credential_id')
        )

        for user_cred in user_creds:
            user_id = user_cred['user_id']
            cred_id = user_cred['gmail_credential_id']

            credential = GmailCredential.objects.filter(id=cred_id, is_active=True).first()
            if not credential or credential.needs_reauth:
                logger.warning(f"凭证 {cred_id} 需要重新授权,跳过相关通知")
                continue

            # Problems with a single group must not abort the whole batch
            user_obj = User.objects.filter(id=user_id).first()
            if user_obj is None:
                logger.warning(f"用户 {user_id} 不存在,跳过相关通知")
                continue
            try:
                gmail_integration = cls(user_obj, gmail_credential_id=cred_id)
            except Exception as e:
                logger.error(f"初始化用户 {user_id} 的Gmail集成失败: {str(e)}")
                logger.error(traceback.format_exc())
                continue

            user_notifications = queued_notifications.filter(
                user_id=user_id,
                gmail_credential_id=cred_id
            ).order_by('created_at')

            for notification in user_notifications:
                try:
                    try:
                        notification_data = json.loads(notification.notification_data)
                    except (TypeError, ValueError):
                        # Not a JSON string: rebuild a minimal payload from the row
                        notification_data = {
                            'emailAddress': notification.email,
                            'historyId': notification.history_id
                        }

                    result = gmail_integration.process_notification(notification_data)

                    notification.processed = True
                    notification.success = result
                    notification.processed_at = timezone.now()
                    notification.save()

                    processed_count += 1
                    if result:
                        success_count += 1

                except Exception as e:
                    logger.error(f"处理队列通知 {notification.id} 失败: {str(e)}")

                    # Mark as processed-with-failure so it is not retried forever
                    notification.processed = True
                    notification.success = False
                    notification.error_message = str(e)[:255]  # truncate to fit the column
                    notification.processed_at = timezone.now()
                    notification.save()

                    processed_count += 1

        return {
            'status': 'success',
            'total_processed': processed_count,
            'success_count': success_count,
            # Fresh count: rows handled above are already marked processed and
            # therefore excluded, so processed_count must not be subtracted again.
            'remaining': queued_notifications.count()
        }

    except Exception as e:
        logger.error(f"处理队列通知失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {
            'status': 'error',
            'message': str(e)
        }
|
||
|
||
def _load_credentials_from_storage(self, credential_data):
    """Deserialize stored OAuth credentials, tolerating legacy formats.

    Decoding is attempted in this order:
      1. JSON, via ``client.OAuth2Credentials.from_json``;
      2. ``pickle`` (legacy format).

    Args:
        credential_data: ``str``, ``bytes``, ``bytearray`` or ``memoryview``
            holding the serialized credentials.

    Returns:
        The deserialized credentials object, or None when the data is empty or
        neither format could be decoded (each failure is logged).
    """
    logger.info("尝试加载凭证数据")

    if not credential_data:
        logger.error("凭证数据为空")
        return None

    # Binary DB columns may come back as memoryview/bytearray; memoryview has no
    # .decode(), which made the JSON path fail unconditionally for such values.
    if isinstance(credential_data, (memoryview, bytearray)):
        credential_data = bytes(credential_data)

    # Method 1: JSON
    try:
        if isinstance(credential_data, str):
            cred_json = credential_data
        else:
            cred_json = credential_data.decode('utf-8')

        # Validate the JSON before handing it to oauth2client
        json.loads(cred_json)

        credentials = client.OAuth2Credentials.from_json(cred_json)
        # Log success only after construction actually succeeded
        logger.info("成功从JSON创建凭证")
        return credentials
    except Exception as json_error:
        logger.error(f"JSON解析凭证失败: {str(json_error)}")

    # Method 2: pickle (legacy)
    # SECURITY: pickle.loads can execute arbitrary code embedded in the payload.
    # This assumes credential_data only ever comes from this application's own
    # credential storage (confirm); never route externally supplied data here.
    # TODO: migrate legacy pickled credentials to JSON and drop this fallback.
    try:
        if isinstance(credential_data, str):
            # A str here is assumed to carry the raw pickle bytes one-to-one
            pickle_data = credential_data.encode('latin1')
        else:
            pickle_data = credential_data

        logger.info("尝试使用pickle解析凭证")
        return pickle.loads(pickle_data)
    except Exception as pickle_error:
        logger.error(f"Pickle解析凭证失败: {str(pickle_error)}")

    logger.error("所有凭证解析方法都失败")
    return None
|