2810 lines
131 KiB
Python
2810 lines
131 KiB
Python
import os
|
||
import json
|
||
import uuid
|
||
import base64
|
||
import pickle
|
||
import logging
|
||
import traceback # 确保导入traceback模块
|
||
from pathlib import Path
|
||
import dateutil.parser as parser
|
||
from datetime import datetime
|
||
from bs4 import BeautifulSoup
|
||
from django.utils import timezone
|
||
from django.conf import settings # 添加Django设置导入
|
||
from email.mime.text import MIMEText
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.base import MIMEBase
|
||
from email import encoders
|
||
import warnings
|
||
import mimetypes
|
||
import requests
|
||
import django.db.utils
|
||
|
||
# 忽略oauth2client相关的所有警告
|
||
warnings.filterwarnings('ignore', message='file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth')
|
||
warnings.filterwarnings('ignore', category=DeprecationWarning, module='oauth2client')
|
||
warnings.filterwarnings('ignore', message='.*google-auth.*')
|
||
warnings.filterwarnings('ignore', message='.*oauth2client.*')
|
||
|
||
from apiclient import discovery
|
||
from httplib2 import Http
|
||
from oauth2client import file, client, tools
|
||
|
||
from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment, UserProfile
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Gmail服务单例管理器
|
||
class GmailServiceManager:
    """Process-wide cache of authorised Gmail API clients, keyed by user id.

    Each cache entry is a dict with the keys ``service`` (the Gmail API
    client), ``credentials`` (the unpickled credential object),
    ``timestamp`` (time of last use) and ``user``.

    An entry that has not been used for more than ``_MAX_IDLE_SECONDS`` is
    discarded and rebuilt from the credentials stored in the database.
    Because the timestamp is refreshed on every reuse, this is an idle
    timeout rather than an absolute lifetime.
    """

    _instances = {}  # str(user.id) -> cache entry dict
    _MAX_IDLE_SECONDS = 1800  # 30 minutes

    @classmethod
    def get_instance(cls, user):
        """Return the cache entry for ``user``, creating it when necessary.

        Args:
            user: object exposing ``id`` and ``username``.

        Returns:
            The cache entry dict, or ``None`` when the user has no active
            stored credential, the credential is flagged invalid, or building
            the service raised an exception.
        """
        user_id = str(user.id)

        cached = cls._instances.get(user_id)
        if cached is not None:
            idle_seconds = (timezone.now() - cached['timestamp']).total_seconds()
            if idle_seconds <= cls._MAX_IDLE_SECONDS:
                # Still fresh: mark it as used and hand it back.
                cached['timestamp'] = timezone.now()
                logger.info(f"复用用户 {user.username} 的Gmail服务单例")
                return cached
            # Idle for too long: drop it and rebuild below.
            del cls._instances[user_id]

        try:
            credential = GmailCredential.objects.filter(user=user, is_active=True).first()
            if not (credential and credential.credentials):
                logger.warning(f"用户 {user.username} 没有可用的Gmail凭证")
                return None

            # SECURITY: pickle.loads executes arbitrary code if the stored
            # blob can be tampered with. This is only safe while the
            # credentials column is written exclusively by this application.
            creds = pickle.loads(credential.credentials)
            if creds.invalid:
                logger.warning(f"用户 {user.username} 的Gmail凭证已失效")
                return None

            gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http()))
            entry = {
                'service': gmail_service,
                'credentials': creds,
                'timestamp': timezone.now(),
                'user': user,
            }
            cls._instances[user_id] = entry
            logger.info(f"创建用户 {user.username} 的Gmail服务单例")
            return entry
        except Exception as e:
            # Best effort: callers fall back to the full authentication flow.
            logger.error(f"创建Gmail服务单例失败: {e}")
            logger.error(traceback.format_exc())
            return None

    @classmethod
    def update_instance(cls, user, credentials, service):
        """Store (or overwrite) the cache entry for ``user``."""
        cls._instances[str(user.id)] = {
            'service': service,
            'credentials': credentials,
            'timestamp': timezone.now(),
            'user': user,
        }

    @classmethod
    def clear_instance(cls, user):
        """Remove the cache entry for ``user``; a no-op when none exists."""
        cls._instances.pop(str(user.id), None)
|
||
|
||
class GmailIntegration:
|
||
"""Gmail集成类"""
|
||
|
||
# Gmail API 权限范围
|
||
SCOPES = ['https://mail.google.com/']
|
||
|
||
def __init__(self, user, email=None, client_secret_json=None, use_proxy=True, proxy_url='http://127.0.0.1:7890'):
    """Set up per-user Gmail state and try to restore a stored credential.

    Args:
        user: application user owning the Gmail link; may be falsy, in
            which case a shared default token file is used.
        email: address of the counterpart (talent) mailbox.
        client_secret_json: OAuth client secret as a JSON string or a dict.
        use_proxy: when true, ``HTTP_PROXY``/``HTTPS_PROXY`` are set in the
            process environment (this affects the whole process).
        proxy_url: proxy address exported when ``use_proxy`` is true.
    """
    self.user = user
    self.user_email = None if not user else user.email
    self.email = email  # counterpart mailbox
    self.client_secret = client_secret_json
    self.credentials = None
    self.gmail_service = None
    self.use_proxy = use_proxy
    self.proxy_url = proxy_url

    # Route Gmail API traffic through the proxy.
    if self.use_proxy:
        logger.info(f"设置Gmail API代理: {proxy_url}")
        for env_name in ('HTTP_PROXY', 'HTTPS_PROXY'):
            os.environ[env_name] = proxy_url

    if not user:
        # No user: fall back to one shared token file.
        self.token_storage_path = "gmail_token.json"
        logger.warning("未提供用户,将使用默认Token存储路径")
    else:
        # One token file per user id, inside a dedicated directory.
        token_dir = "gmail_tokens"
        self.token_storage_path = os.path.join(token_dir, f"gmail_token_{user.id}.json")
        os.makedirs(token_dir, exist_ok=True)
        logger.info(f"设置Token存储路径: {self.token_storage_path}")

        # Prefer the credential stored in the database; on any failure the
        # file-based flow in authenticate() is still available.
        try:
            stored = GmailCredential.objects.filter(user=user, is_active=True).first()
            if stored and stored.credentials:
                logger.info(f"从数据库加载用户 {user.username} 的Gmail凭证")
                # NOTE(review): pickle.loads trusts the stored blob completely.
                self.credentials = pickle.loads(stored.credentials)
                authorized_http = self.credentials.authorize(Http())
                self.gmail_service = discovery.build('gmail', 'v1', http=authorized_http)
                logger.info("从数据库凭证初始化Gmail服务成功")
        except Exception as e:
            logger.error(f"从数据库加载凭证失败: {str(e)}")

    # File-backed credential storage handle.
    self.storage = file.Storage(self.token_storage_path)
|
||
|
||
def authenticate(self):
    """Authenticate against the Gmail API and populate ``self.gmail_service``.

    Order of attempts:
      1. reuse the cached service from ``GmailServiceManager``;
      2. load credentials from the token file at ``self.token_storage_path``;
      3. if neither yields valid credentials, start the out-of-band OAuth
         flow and raise an exception carrying the authorisation URL.

    On success the credential is pickled into ``GmailCredential`` and the
    service cache is refreshed.

    Returns:
        bool: ``True`` when a usable service is available, ``False`` on any
        failure other than "authorisation required".

    Raises:
        Exception: with a message starting ``"Please visit this URL to
            authorize: "`` when the user must grant access; callers detect
            this by its message.
    """
    # Local imports keep this method self-contained.
    import copy
    import tempfile

    secret_path = None  # temp file holding the client secret, when written
    try:
        # Fast path: reuse a cached service.
        if self.user:
            instance = GmailServiceManager.get_instance(self.user)
            if instance:
                self.gmail_service = instance['service']
                self.credentials = instance['credentials']
                logger.info("使用现有的Gmail服务单例")
                return True

        # Materialise the client secret for oauth2client, which reads a file.
        if self.client_secret:
            if isinstance(self.client_secret, str):
                try:
                    secret_data = json.loads(self.client_secret)
                except json.JSONDecodeError as e:
                    logger.error(f"client_secret不是有效的JSON: {str(e)}")
                    return False
            else:
                # Deep copy so the caller's dict is not modified below.
                secret_data = copy.deepcopy(dict(self.client_secret))

            # Force the out-of-band redirect so no localhost listener is needed.
            for key in ('web', 'installed'):
                if key in secret_data and 'redirect_uris' in secret_data[key]:
                    secret_data[key]['redirect_uris'] = ['urn:ietf:wg:oauth:2.0:oob']
                    logger.info("已强制设置redirect_uri为非浏览器模式")

            # A unique temp file (created with owner-only permissions) avoids
            # concurrent authentications clobbering one shared
            # 'client_secret.json' in the working directory.
            fd, secret_path = tempfile.mkstemp(prefix='client_secret_', suffix='.json')
            with os.fdopen(fd, 'w') as f:
                json.dump(secret_data, f)
            logger.info(f"已将client_secret写入临时文件: {secret_path}")

        logger.info(f"创建或读取token存储: {self.token_storage_path}")

        # Make sure the token directory exists.
        token_dir = os.path.dirname(self.token_storage_path)
        if token_dir and not os.path.exists(token_dir):
            logger.info(f"创建token目录: {token_dir}")
            os.makedirs(token_dir, exist_ok=True)

        store = file.Storage(self.token_storage_path)
        creds = store.get()

        if not creds or creds.invalid:
            logger.info("没有有效的凭证,需要重新授权")
            if not self.client_secret:
                logger.error("没有提供client_secret_json且找不到有效凭证")
                return False

            logger.info("使用非浏览器认证模式")
            flow = client.flow_from_clientsecrets(secret_path, self.SCOPES)
            flow.redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'

            # The authorisation URL is handed to the caller via an exception.
            auth_url = flow.step1_get_authorize_url()
            logger.info(f"获取授权URL: {auth_url[:50]}...")
            raise Exception(f"Please visit this URL to authorize: {auth_url}")

        # Valid credentials: build the service.
        self.credentials = creds
        logger.info("使用现有凭证初始化Gmail服务")
        self.gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http()))
        logger.info("Gmail服务初始化成功")

        # Determine which Gmail account these credentials belong to.
        gmail_email = None
        try:
            profile = self.gmail_service.users().getProfile(userId='me').execute()
            gmail_email = profile.get('emailAddress')
            logger.info(f"获取到Gmail账号: {gmail_email}")
        except Exception as profile_error:
            logger.error(f"获取Gmail账号失败: {str(profile_error)}")

        if not gmail_email:
            # Fallback: take the first bracketed address found in the most
            # recent message's From/To/Cc/Bcc headers.
            # NOTE(review): that address is not guaranteed to be the account
            # owner's; it may be a correspondent's.
            try:
                listing = self.gmail_service.users().messages().list(userId='me', maxResults=1).execute()
                if 'messages' in listing and len(listing['messages']) > 0:
                    msg_id = listing['messages'][0]['id']
                    msg = self.gmail_service.users().messages().get(userId='me', id=msg_id).execute()
                    if 'payload' in msg and 'headers' in msg['payload']:
                        for header in msg['payload']['headers']:
                            if header['name'] not in ('From', 'To', 'Cc', 'Bcc'):
                                continue
                            value = header['value']
                            if '<' in value and '>' in value:
                                gmail_email = value.split('<')[1].split('>')[0]
                                logger.info(f"从消息中提取到Gmail账号: {gmail_email}")
                                break
            except Exception as msg_error:
                logger.error(f"尝试从消息中提取Gmail账号失败: {str(msg_error)}")

        # Persist the credential for later requests.
        if self.user:
            logger.info("保存凭证到数据库")
            credentials_data = pickle.dumps(creds)
            _, created = GmailCredential.objects.update_or_create(
                user=self.user,
                defaults={
                    'credentials': credentials_data,
                    'token_path': self.token_storage_path,
                    'gmail_email': gmail_email,
                    'updated_at': timezone.now(),
                    'is_active': True,
                },
            )
            action = "创建" if created else "更新"
            logger.info(f"已{action}用户 {self.user.username} 的Gmail凭证记录,实际Gmail账号: {gmail_email}")

        # Refresh the shared cache with the freshly built service.
        if self.user and self.gmail_service and self.credentials:
            GmailServiceManager.update_instance(self.user, self.credentials, self.gmail_service)

        return True

    except Exception as e:
        # The "authorisation required" signal must reach the view layer.
        if "Please visit this URL" in str(e):
            raise
        logger.error(f"Gmail认证失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
    finally:
        # Never leave the client secret on disk.
        if secret_path and os.path.exists(secret_path):
            logger.info("删除临时client_secret文件")
            os.unlink(secret_path)
|
||
|
||
def create_talent_knowledge_base(self, talent_email):
    """Return the knowledge base linked to ``talent_email``, creating one if needed.

    Lookup order: an active ``GmailTalentMapping`` for this user and
    address, then a knowledge base of this user named after the address's
    local part, and finally ``_create_knowledge_base_basic``. If anything
    raises, ``_create_knowledge_base_basic`` is tried (again) and its
    exception, if any, propagates.

    Args:
        talent_email (str): the talent's Gmail address.

    Returns:
        tuple: ``(knowledge_base, created)`` where ``created`` tells whether
        a new knowledge base was made.
    """
    try:
        # 1. An active mapping already points at a knowledge base.
        active_mapping = GmailTalentMapping.objects.filter(
            user=self.user,
            talent_email=talent_email,
            is_active=True,
        ).first()
        if active_mapping and active_mapping.knowledge_base:
            linked_kb = active_mapping.knowledge_base
            logger.info(f"找到现有的Gmail-达人映射: {talent_email} -> {linked_kb.name}")
            return linked_kb, False

        # 2. A knowledge base with the derived name exists for this user.
        # NOTE(review): only the local part of the address is used, so
        # addresses differing only in domain share a name.
        local_part = talent_email.split('@')[0]
        kb_name = f"Gmail-{local_part}"
        same_name_kb = KnowledgeBase.objects.filter(
            name=kb_name,
            user_id=self.user.id,
        ).first()

        if not same_name_kb:
            # 3. Nothing to reuse: create it.
            logger.info(f"使用基本方法创建知识库: {kb_name}")
            return self._create_knowledge_base_basic(talent_email)

        logger.info(f"找到现有知识库: {kb_name}")
        GmailTalentMapping.objects.update_or_create(
            user=self.user,
            talent_email=talent_email,
            defaults={'knowledge_base': same_name_kb, 'is_active': True},
        )
        return same_name_kb, False

    except Exception as e:
        logger.error(f"创建或获取Gmail-达人知识库失败: {str(e)}")
        logger.error(traceback.format_exc())

        # Last attempt through the basic path.
        logger.info("尝试使用基本方法创建知识库")
        return self._create_knowledge_base_basic(talent_email)
|
||
|
||
def _create_knowledge_base_basic(self, talent_email):
    """Find or create the knowledge base for ``talent_email`` and link it.

    The knowledge base is named ``Gmail-<local part of talent_email>``. If
    that name is already taken by a knowledge base the current user does not
    own, a 5-character random suffix is appended. After creation an external
    dataset is requested through ``KnowledgeBaseViewSet``; failure there is
    logged and ignored.

    Args:
        talent_email (str): the talent's Gmail address.

    Returns:
        tuple: ``(knowledge_base, created)``.

    Raises:
        Exception: the original error is re-raised when creation fails and
            no existing knowledge base of this user can be used instead.
    """
    def link_mapping(kb):
        # Upsert the user/talent -> knowledge base mapping.
        GmailTalentMapping.objects.update_or_create(
            user=self.user,
            talent_email=talent_email,
            defaults={'knowledge_base': kb, 'is_active': True},
        )

    def create_kb(name):
        # Insert a new private knowledge base owned by the current user.
        return KnowledgeBase.objects.create(
            name=name,
            desc=f"与{talent_email}的Gmail邮件交流记录",
            type="private",
            user_id=self.user.id,
            documents=[],
        )

    try:
        kb_name = f"Gmail-{talent_email.split('@')[0]}"

        # Reuse this user's knowledge base of the same name when present.
        existing_kb = KnowledgeBase.objects.filter(
            name=kb_name,
            user_id=self.user.id,
        ).first()
        if existing_kb:
            logger.info(f"找到现有知识库: {kb_name}")
            link_mapping(existing_kb)
            return existing_kb, False

        try:
            knowledge_base = create_kb(kb_name)
        except django.db.utils.IntegrityError:
            # The name is taken, possibly by another user.
            logger.warning(f"知识库名称'{kb_name}'已存在,尝试获取或创建带随机后缀的名称")

            existing_kb = KnowledgeBase.objects.filter(name=kb_name).first()
            if existing_kb and str(existing_kb.user_id) == str(self.user.id):
                logger.info(f"找到属于当前用户的知识库: {kb_name}")
                link_mapping(existing_kb)
                return existing_kb, False

            # Owned by someone else (or vanished): pick a distinct name.
            import random
            import string
            suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=5))
            new_kb_name = f"{kb_name}-{suffix}"
            logger.info(f"创建带随机后缀的知识库: {new_kb_name}")
            knowledge_base = create_kb(new_kb_name)

        # Create the external dataset; the local record works without it.
        try:
            from .views import KnowledgeBaseViewSet
            kb_viewset = KnowledgeBaseViewSet()
            external_id = kb_viewset._create_external_dataset(knowledge_base)
            if external_id:
                knowledge_base.external_id = external_id
                knowledge_base.save()
                logger.info(f"成功创建外部知识库: {external_id}")
        except Exception as e:
            logger.error(f"创建外部知识库失败: {str(e)}")

        # Upsert rather than insert: an inactive mapping for this user and
        # talent may already exist, and every other path here upserts too.
        link_mapping(knowledge_base)

        logger.info(f"成功创建新知识库: {knowledge_base.name}, ID: {knowledge_base.id}")
        return knowledge_base, True

    except Exception as e:
        logger.error(f"创建或获取Gmail-达人知识库基本方法失败: {str(e)}")
        logger.error(traceback.format_exc())

        # Last resort: reuse a knowledge base of THIS user whose name starts
        # with the derived name. The user filter matters: without it a
        # private knowledge base of another user could be linked.
        # NOTE(review): a prefix match can still pick a knowledge base made
        # for a different talent of the same user (e.g. "Gmail-bob" vs
        # "Gmail-bobby").
        try:
            kb_name = f"Gmail-{talent_email.split('@')[0]}"
            existing_kb = KnowledgeBase.objects.filter(
                name__startswith=kb_name,
                user_id=self.user.id,
            ).first()
            if existing_kb:
                logger.info(f"在错误处理中找到可用的知识库: {existing_kb.name}")
                link_mapping(existing_kb)
                return existing_kb, False
        except Exception as inner_e:
            logger.error(f"错误处理中尝试获取知识库失败: {str(inner_e)}")

        # Nothing worked: surface the original error.
        raise
|
||
|
||
def get_conversations(self, talent_gmail, own_email=None):
    """Fetch every message exchanged between this account and ``talent_gmail``.

    Args:
        talent_gmail (str): address of the counterpart mailbox.
        own_email (str | None): address of the authenticated Gmail account.
            When omitted it is read from the Gmail profile; if that lookup
            fails, the address that used to be hard-coded here is used.

    Returns:
        list[dict]: one dict per message as produced by
        ``_extract_email_content``, sorted by its ``date`` string. An empty
        list is returned when the service is not initialised or on error.
    """
    try:
        if not self.gmail_service:
            logger.error("Gmail服务未初始化")
            return []

        # Resolve the account's own address instead of assuming one mailbox.
        if not own_email:
            try:
                profile = self.gmail_service.users().getProfile(userId='me').execute()
                own_email = profile.get('emailAddress')
            except Exception as profile_error:
                logger.warning(f"获取Gmail账号失败: {str(profile_error)}")
        if not own_email:
            own_email = "crushwds@gmail.com"  # legacy default

        email1 = own_email
        email2 = talent_gmail
        logger.info(f"执行Gmail查询: {email1} 与 {email2}")

        # Messages sent by either party to either party.
        query = f"from:({email1} OR {email2}) to:({email1} OR {email2})"

        list_messages = self.gmail_service.users().messages().list
        response = list_messages(userId='me', q=query).execute()
        messages = []

        if 'messages' in response:
            messages.extend(response['messages'])
            logger.info(f"找到 {len(messages)} 封邮件")

            # Follow pagination until the last page.
            while 'nextPageToken' in response:
                response = list_messages(
                    userId='me',
                    q=query,
                    pageToken=response['nextPageToken'],
                ).execute()
                if 'messages' in response:
                    page = response['messages']
                    messages.extend(page)
                    logger.info(f"加载额外 {len(page)} 封邮件,当前总数: {len(messages)}")
        else:
            logger.warning(f"未找到匹配邮件")

        # Fetch and parse each message.
        conversations = []
        if messages:
            total = len(messages)
            logger.info(f"开始获取 {total} 封邮件详情...")

            for position, msg in enumerate(messages, start=1):
                try:
                    message = self.gmail_service.users().messages().get(userId='me', id=msg['id']).execute()
                    email_data = self._extract_email_content(message)
                    if email_data:
                        conversations.append(email_data)
                        # Progress log every 5 messages and at the end.
                        if position % 5 == 0 or position == total:
                            logger.info(f"已处理 {position}/{total} 封邮件")
                    else:
                        logger.warning(f"邮件 {position}/{total} 内容提取失败")
                except Exception as e:
                    # One bad message must not abort the whole fetch.
                    logger.error(f"获取邮件 {msg['id']} 详情失败: {str(e)}")

        # Chronological order (dates are 'YYYY-MM-DD HH:MM:SS' strings when
        # parsing succeeded, otherwise the raw header value).
        conversations.sort(key=lambda item: item['date'])

        logger.info(f"总共找到并解析了 {len(conversations)} 封邮件")
        return conversations

    except Exception as e:
        logger.error(f"获取Gmail对话失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []
|
||
|
||
def _extract_email_content(self, message):
    """Flatten a Gmail API message resource into a plain dict.

    Args:
        message (dict): message resource containing ``id`` and ``payload``
            (with ``headers`` and either ``parts`` or ``body``).

    Returns:
        dict | None: keys ``id``, ``subject``, ``from``, ``date``
        (``'%Y-%m-%d %H:%M:%S'`` or the raw header when unparsable),
        ``body`` (first ``text/plain`` part, or the single-part body) and
        ``attachments`` (dicts with ``filename``, ``mimeType``, ``size`` and
        optionally ``attachmentId``). ``None`` when the message cannot be
        processed.
    """
    def decode_text(data):
        # Decode a base64url body; returns '' (and logs) on failure.
        try:
            # Tolerate payloads whose '=' padding was stripped.
            padded = data + '=' * (-len(data) % 4)
            # errors='replace' keeps the readable part of non-UTF-8 bodies
            # instead of dropping the whole text.
            return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')
        except Exception as e:
            logger.error(f"解码邮件内容失败: {str(e)}")
            return ''

    def format_date(date_value):
        # Normalise a Date header to '%Y-%m-%d %H:%M:%S'; raw value on failure.
        logger.info(f"原始邮件日期字符串: '{date_value}'")
        try:
            import dateutil.parser as date_parser
            import pytz

            date = date_parser.parse(date_value)
            logger.info(f"解析后的日期对象: {date}, 是否有时区: {date.tzinfo is not None}")

            use_tz = getattr(settings, 'USE_TZ', False)
            if date.tzinfo is not None:
                # Aware value: express it in the project time zone.
                if hasattr(settings, 'TIME_ZONE'):
                    date = date.astimezone(pytz.timezone(settings.TIME_ZONE))
                    logger.info(f"转换到系统时区后: {date}")
                if hasattr(settings, 'USE_TZ') and not use_tz:
                    date = date.replace(tzinfo=None)
                    logger.info(f"移除时区信息后: {date}")
            elif use_tz:
                # Naive value in a tz-aware project: attach the default zone.
                try:
                    date = timezone.make_aware(date)
                    logger.info(f"添加时区信息后: {date}")
                except Exception as tz_error:
                    logger.warning(f"添加时区信息失败: {str(tz_error)}")

            formatted = date.strftime('%Y-%m-%d %H:%M:%S')
            logger.info(f"最终日期字符串: {formatted}")
            return formatted
        except Exception as e:
            logger.error(f"解析日期失败: {str(e)}, 原始值: '{date_value}'")
            return date_value

    try:
        payload = message['payload']
        email_data = {
            'id': message['id'],
            'subject': '',
            'from': '',
            'date': '',
            'body': '',
            'attachments': [],
        }

        # Header names are case-insensitive, so compare in lower case.
        for header in payload['headers']:
            name = header.get('name', '').lower()
            value = header.get('value', '')
            if name == 'subject':
                email_data['subject'] = value
            elif name == 'from':
                email_data['from'] = value
            elif name == 'date':
                email_data['date'] = format_date(value)

        def process_parts(parts):
            # Walk the MIME tree collecting attachments and the first
            # text/plain body.
            for part in parts:
                part_body = part.get('body') or {}

                if part.get('filename'):
                    attachment = {
                        'filename': part['filename'],
                        'mimeType': part.get('mimeType', ''),
                        'size': part_body.get('size', 0),
                    }
                    if 'attachmentId' in part_body:
                        attachment['attachmentId'] = part_body['attachmentId']
                    email_data['attachments'].append(attachment)

                if part.get('mimeType') == 'text/plain' and not email_data['body']:
                    data = part_body.get('data', '')
                    if data:
                        email_data['body'] = decode_text(data)

                if 'parts' in part:
                    process_parts(part['parts'])

        if 'parts' in payload:
            process_parts(payload['parts'])
        elif 'body' in payload and 'data' in payload['body']:
            # Single-part message: the payload body is the content.
            data = payload['body'].get('data', '')
            if data:
                email_data['body'] = decode_text(data)

        return email_data

    except Exception as e:
        logger.error(f"处理邮件内容时出错: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def download_attachment(self, message_id, attachment_id, filename):
    """Download one attachment and store it under ``gmail_attachments/``.

    Args:
        message_id (str): id of the message owning the attachment.
        attachment_id (str): Gmail attachment id.
        filename (str): name declared in the email; only its final path
            component is used for the stored file.

    Returns:
        str | None: path of the written file
        (``gmail_attachments/<message_id>_<name>``), or ``None`` on failure.
    """
    try:
        attachment = self.gmail_service.users().messages().attachments().get(
            userId='me', messageId=message_id, id=attachment_id
        ).execute()

        data = attachment['data']
        # Tolerate base64url payloads whose '=' padding was stripped.
        file_data = base64.urlsafe_b64decode(data + '=' * (-len(data) % 4))

        attachments_dir = 'gmail_attachments'
        os.makedirs(attachments_dir, exist_ok=True)

        # The name comes from the email and is untrusted: strip directory
        # components (either separator style) and NUL bytes so the file
        # cannot land outside attachments_dir.
        cleaned = str(filename).replace('\x00', '').replace('\\', '/')
        safe_name = os.path.basename(cleaned) or 'attachment'

        filepath = os.path.join(attachments_dir, f"{message_id}_{safe_name}")
        with open(filepath, 'wb') as f:
            f.write(file_data)

        return filepath
    except Exception as e:
        logger.error(f"下载附件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def save_conversations_to_knowledge_base(self, conversations, knowledge_base):
    """Store Gmail messages as chat history and upload them to a knowledge base.

    Steps:
      1. resolve the conversation id (existing mapping, else any chat
         history of this knowledge base, else a new UUID) and record it in
         ``GmailTalentMapping`` when ``self.email`` is set;
      2. create one ``ChatHistory`` row per message not imported before
         (matched on ``metadata['gmail_message_id']``) and download its
         attachments;
      3. upload the newly imported messages as one document and refresh the
         knowledge base's document count.

    Args:
        conversations (list[dict]): messages as returned by
            ``get_conversations``; sorted in place by ``date``.
        knowledge_base: target knowledge base object.

    Returns:
        ``False`` when either argument is empty; otherwise a dict with
        ``conversation_id`` and ``success`` plus ``message_count`` and
        ``knowledge_base_id`` on success, or ``error`` on failure.
    """
    try:
        if not conversations or not knowledge_base:
            logger.error("参数不完整: conversations或knowledge_base为空")
            return False

        # --- 1. Resolve the conversation id -------------------------------
        conversation_id = None
        id_from_mapping = False

        if self.email:
            existing_mapping = GmailTalentMapping.objects.filter(
                user=self.user,
                talent_email=self.email,
                knowledge_base=knowledge_base,
                is_active=True,
            ).first()
            if existing_mapping and existing_mapping.conversation_id:
                conversation_id = existing_mapping.conversation_id
                id_from_mapping = True
                logger.info(f"通过达人邮箱 {self.email} 找到现有对话ID: {conversation_id}")

        if not conversation_id:
            existing_conversation = ChatHistory.objects.filter(
                knowledge_base=knowledge_base,
                user=self.user,
                is_deleted=False,
            ).values('conversation_id').distinct().first()
            if existing_conversation:
                conversation_id = existing_conversation['conversation_id']
                logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}")

        if not conversation_id:
            conversation_id = str(uuid.uuid4())
            logger.info(f"创建新的对话ID: {conversation_id}")

        # Record the id on the mapping unless it was read from there.
        if self.email and not id_from_mapping:
            _, created = GmailTalentMapping.objects.update_or_create(
                user=self.user,
                talent_email=self.email,
                defaults={
                    'knowledge_base': knowledge_base,
                    'conversation_id': conversation_id,
                    'is_active': True,
                },
            )
            action = "创建" if created else "更新"
            logger.info(f"已{action}Gmail达人映射: {self.email} -> {knowledge_base.name}")

        # --- 2. Import messages into chat history -------------------------
        logger.info(f"开始构建知识库文档,共{len(conversations)}个邮件对话")
        document = {
            "title": f"与{self.email or '联系人'}的Gmail邮件对话",
            "url": "",
            "source_type": "gmail",
            "paragraphs": [],
        }

        conversations.sort(key=lambda x: x.get('date', ''))

        # Parsed date -> ChatHistory id of messages imported in this call;
        # used to pick each message's parent.
        previous_message_dict = {}

        # Gmail ids already stored for this conversation.
        existing_gmail_ids = {
            row.metadata['gmail_message_id']
            for row in ChatHistory.objects.filter(
                conversation_id=conversation_id,
                is_deleted=False,
            )
            if row.metadata and 'gmail_message_id' in row.metadata
        }
        if existing_gmail_ids:
            logger.info(f"发现已存在 {len(existing_gmail_ids)} 条消息记录,将跳过重复导入")

        for message in conversations:
            try:
                if message.get('id') in existing_gmail_ids:
                    logger.info(f"跳过已导入的消息: {message.get('id')}")
                    continue

                sender = message.get('from', '未知发件人')
                date_str = message.get('date', '未知时间')
                subject = message.get('subject', '无主题')
                body = message.get('body', '').strip()

                logger.info(f"处理邮件: {date_str}, {sender}, {subject[:20]}...")

                # "Name <addr>" -> addr; otherwise assume a bare address.
                if '<' in sender and '>' in sender:
                    sender_email = sender.split('<')[1].split('>')[0]
                else:
                    sender_email = sender

                # Messages from the user or the talent are 'user', anything
                # else 'assistant'. user.email may be None.
                sender_key = sender_email.lower()
                is_user_email = (self.user.email or '').lower() == sender_key
                is_talent_email = bool(self.email) and sender_key == self.email.lower()
                role = 'user' if is_user_email or is_talent_email else 'assistant'

                logger.info(f"设置消息角色: {role},发件人: {sender_email},用户邮箱: {self.user.email},目标邮箱: {self.email}")

                document['paragraphs'].append({
                    "id": f"msg_{len(document['paragraphs']) + 1}",
                    "title": f"{date_str} - {sender} - {subject}",
                    "content": body,
                    "meta": {
                        "sender": sender,
                        "date": date_str,
                        "subject": subject,
                        "has_attachments": len(message.get('attachments', [])) > 0,
                        "message_id": message.get('id', ''),
                    },
                })

                # Parse the date into a naive datetime.
                try:
                    if isinstance(date_str, str) and date_str.isdigit():
                        date_obj = datetime.fromtimestamp(int(date_str))
                    else:
                        try:
                            date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
                        except ValueError:
                            import dateutil.parser as date_parser
                            date_obj = date_parser.parse(date_str)
                            if date_obj.tzinfo is not None:
                                date_obj = date_obj.replace(tzinfo=None)
                except (ValueError, TypeError, OverflowError, OSError) as e:
                    # Out-of-range values raise OverflowError/OSError.
                    logger.warning(f"无法解析邮件日期: {date_str},错误: {str(e)},使用当前时间")
                    date_obj = datetime.now()

                # Parent = latest message imported in this call that is
                # strictly earlier than this one.
                earlier = [d for d in previous_message_dict if d < date_obj]
                parent_id = previous_message_dict[max(earlier)] if earlier else None

                try:
                    if timezone.is_aware(date_obj):
                        aware_date = date_obj
                        logger.info(f"日期已经包含时区信息: {aware_date}")
                    else:
                        aware_date = timezone.make_aware(date_obj)
                        logger.info(f"日期添加时区信息后: {aware_date}")
                except Exception as tz_error:
                    logger.warning(f"时区转换失败: {str(tz_error)},使用当前时间")
                    aware_date = timezone.now()

                chat_message = ChatHistory.objects.create(
                    user=self.user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation_id,
                    parent_id=parent_id,
                    role=role,
                    content=f"{subject}\n\n{body}",
                    metadata={
                        'gmail_message_id': message.get('id', ''),
                        'from': sender,
                        'date': date_str,
                        'subject': subject,
                        'dataset_id_list': [str(knowledge_base.id)],
                        'dataset_names': [knowledge_base.name],
                    },
                    created_at=aware_date,
                )
                previous_message_dict[date_obj] = str(chat_message.id)

                # Download and register attachments.
                for attachment in message.get('attachments', []):
                    if 'attachmentId' not in attachment or 'filename' not in attachment:
                        continue
                    try:
                        filepath = self.download_attachment(
                            message_id=message['id'],
                            attachment_id=attachment['attachmentId'],
                            filename=attachment['filename'],
                        )
                        if filepath:
                            GmailAttachment.objects.create(
                                chat_message=chat_message,
                                gmail_message_id=message['id'],
                                filename=attachment['filename'],
                                filepath=filepath,
                                mimetype=attachment.get('mimeType', ''),
                                filesize=attachment.get('size', 0),
                            )
                            logger.info(f"已保存附件: {attachment['filename']}")
                    except Exception as e:
                        logger.error(f"保存附件失败: {str(e)}")
                        logger.error(traceback.format_exc())

            except Exception as e:
                # One bad message must not abort the import.
                logger.error(f"处理邮件时出错: {str(e)}")
                logger.error(traceback.format_exc())

        # --- 3. Upload the new messages as a document ---------------------
        doc_data = {
            "name": f"Gmail对话与{self.email or '联系人'}.txt",
            "paragraphs": [
                {
                    "title": paragraph['title'],
                    "content": paragraph['content'],
                    "is_active": True,
                    "problem_list": [],
                }
                for paragraph in document['paragraphs']
            ],
        }

        if doc_data["paragraphs"]:
            logger.info(f"将Gmail对话添加到知识库: {knowledge_base.name}")

            # Imported here, as in the original; presumably to avoid a
            # circular import with the views module.
            from .views import KnowledgeBaseViewSet
            kb_viewset = KnowledgeBaseViewSet()
            upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data)

            if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                document_id = upload_response['data']['id']
                KnowledgeBaseDocument.objects.create(
                    knowledge_base=knowledge_base,
                    document_id=document_id,
                    document_name=doc_data["name"],
                    external_id=document_id,
                )
                logger.info(f"Gmail对话文档上传成功,ID: {document_id}")

                self._upload_attachments_to_knowledge_base(knowledge_base, conversations)
            else:
                error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
                logger.error(f"Gmail对话文档上传失败: {error_msg}")
        else:
            # Everything was already imported: do not upload an empty document.
            logger.info("没有新的邮件需要上传到知识库")

        # KnowledgeBaseDocument is the module-level import. A function-local
        # import of it would make the name local to the whole function and
        # raise UnboundLocalError here whenever the upload branch was skipped.
        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active',
        ).count()
        knowledge_base.save()

        logger.info(f"Gmail对话已保存到知识库: {knowledge_base.name}")

        return {
            "conversation_id": conversation_id,
            "success": True,
            "message_count": len(conversations),
            "knowledge_base_id": str(knowledge_base.id),
        }

    except Exception as e:
        logger.error(f"保存Gmail对话到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {"conversation_id": None, "success": False, "error": str(e)}
|
||
|
||
def send_email(self, to_email, subject, body, conversation_id=None, attachments=None):
    """Send a mail through the Gmail API and optionally record it as chat history.

    Args:
        to_email: Recipient address, written to the ``to`` header unchanged.
        subject: Subject line of the mail.
        body: Plain-text body of the mail.
        conversation_id: When truthy, the sent mail is stored as a
            ``ChatHistory`` row of that conversation, its attachments are
            stored as ``GmailAttachment`` rows and the text is pushed to the
            conversation's knowledge base.
        attachments: Optional list of local file paths to attach.

    Returns:
        The Gmail id of the sent message. The id is also returned (and
        nothing is recorded) when the conversation has no stored messages or
        no knowledge base can be resolved for it.

    Raises:
        Exception: Errors from building/sending the mail or from the
            database writes are logged with a traceback and re-raised.
    """
    try:
        # The multipart builder is only needed when there is something to attach.
        if attachments:
            message = self._create_message_with_attachment(to_email, subject, body, attachments)
        else:
            message = self._create_message(to_email, subject, body)

        sent_message = self.gmail_service.users().messages().send(
            userId='me', body=message
        ).execute()

        message_id = sent_message['id']
        logger.info(f"邮件发送成功, ID: {message_id}")

        # Prefer the server-side timestamp (internalDate, in milliseconds)
        # for the stored metadata; fall back to "now" if it is unavailable.
        try:
            message_detail = self.gmail_service.users().messages().get(
                userId='me', id=message_id
            ).execute()

            internal_date = message_detail.get('internalDate')
            if internal_date:
                try:
                    email_date = datetime.fromtimestamp(int(internal_date) / 1000)
                    logger.info(f"从时间戳解析的日期: {email_date}")

                    if getattr(settings, 'USE_TZ', False) and not timezone.is_aware(email_date):
                        email_date = timezone.make_aware(email_date)
                        logger.info(f"添加时区信息后: {email_date}")

                    date_str = email_date.strftime('%Y-%m-%d %H:%M:%S')
                    logger.info(f"最终格式化日期: {date_str}")
                except Exception as tz_error:
                    logger.warning(f"处理邮件时间戳失败: {str(tz_error)},使用当前时间")
                    date_str = timezone.now().strftime('%Y-%m-%d %H:%M:%S')
            else:
                logger.warning("邮件没有时间戳,使用当前时间")
                date_str = timezone.now().strftime('%Y-%m-%d %H:%M:%S')
        except Exception as e:
            logger.warning(f"获取邮件时间戳失败: {str(e)}, 使用当前时间")
            date_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Without a conversation there is nothing to record.
        if not conversation_id:
            return message_id

        first_message = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            is_deleted=False
        ).order_by('created_at').first()

        if first_message is None:
            logger.warning(f"找不到对话ID: {conversation_id},无法保存消息")
            return message_id

        first_metadata = first_message.metadata or {}

        # Resolve the knowledge base: the FK on the first message wins, then
        # the first id listed in its metadata. The mail has already been
        # sent, so an empty id list or a missing row must not raise here;
        # it is handled like the existing "no knowledge base" case.
        knowledge_base = first_message.knowledge_base
        if not knowledge_base:
            dataset_ids = first_metadata.get('dataset_id_list') or []
            if dataset_ids:
                knowledge_base = KnowledgeBase.objects.filter(id=dataset_ids[0]).first()
            if not knowledge_base:
                logger.error("找不到关联的知识库,无法保存消息")
                return message_id

        # Parent is the row with the highest id in the conversation; the
        # original deliberately avoids time-based ordering here.
        # NOTE(review): this assumes ids grow over time - confirm the primary
        # key of ChatHistory is not a random UUID.
        parent_id = None
        try:
            latest_message = ChatHistory.objects.filter(
                conversation_id=conversation_id,
                is_deleted=False
            ).order_by('-id').first()
            if latest_message is not None:
                parent_id = str(latest_message.id)
        except Exception as e:
            logger.error(f"查找父消息失败: {str(e)}")
            logger.error(traceback.format_exc())

        metadata = {
            'gmail_message_id': message_id,
            'from': self.user.email,
            'to': to_email,
            'date': date_str,
            'subject': subject,
            'dataset_id_list': [str(knowledge_base.id)],
            'dataset_names': [knowledge_base.name]
        }

        # Keep the dataset list of the conversation's first message if it has one.
        if 'dataset_id_list' in first_metadata:
            metadata['dataset_id_list'] = first_metadata['dataset_id_list']
            if 'dataset_names' in first_metadata:
                metadata['dataset_names'] = first_metadata['dataset_names']

        # created_at is intentionally left to the database default.
        chat_message = ChatHistory.objects.create(
            user=self.user,
            knowledge_base=knowledge_base,
            conversation_id=conversation_id,
            parent_id=parent_id,
            role='user',
            content=f"[{subject}] {body}",
            metadata=metadata
        )

        for att_path in attachments or ():
            # _create_message_with_attachment skips files it cannot read, so
            # an unreadable path was not part of the sent mail: do not record
            # it, and do not fail an already-sent mail because of it.
            try:
                file_size = os.path.getsize(att_path)
            except OSError as size_error:
                logger.warning(f"附件不可读,跳过附件记录: {att_path} ({size_error})")
                continue

            mime_type = mimetypes.guess_type(att_path)[0] or 'application/octet-stream'

            GmailAttachment.objects.create(
                chat_message=chat_message,
                gmail_message_id=message_id,
                filename=os.path.basename(att_path),
                filepath=att_path,
                mimetype=mime_type,
                filesize=file_size
            )

        self._append_to_knowledge_base_document(knowledge_base, subject, body, to_email)

        return message_id

    except Exception as e:
        logger.error(f"发送邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise
|
||
|
||
def _create_message(self, to, subject, body):
    """Build a plain-text mail in the shape expected by the Gmail send call.

    Args:
        to: Value of the ``to`` header.
        subject: Value of the ``subject`` header.
        body: Text used as the message payload.

    Returns:
        A dict with a single ``raw`` key holding the URL-safe base64 text of
        the serialized message.
    """
    mime = MIMEText(body)
    for header_name, header_value in (('to', to), ('subject', subject)):
        mime[header_name] = header_value

    # The serialized bytes are base64url-encoded so they can travel as a JSON string.
    encoded = base64.urlsafe_b64encode(mime.as_bytes())
    return {'raw': encoded.decode()}
|
||
|
||
def _create_message_with_attachment(self, to, subject, body, attachment_files):
    """Build a multipart mail with file attachments for the Gmail send call.

    Args:
        to: Value of the ``to`` header.
        subject: Value of the ``subject`` header.
        body: Text used as the first (plain-text) part.
        attachment_files: Iterable of local file paths. A file that cannot
            be read or encoded is logged and left out; the mail is still built.

    Returns:
        A dict with a single ``raw`` key holding the URL-safe base64 text of
        the serialized message.
    """
    message = MIMEMultipart()
    message['to'] = to
    message['subject'] = subject
    message.attach(MIMEText(body))

    for path in attachment_files:
        try:
            with open(path, 'rb') as handle:
                payload = handle.read()

            # Label the part with its real type when it can be guessed from
            # the name. Compressed files (encoding is set) and the structured
            # 'message'/'multipart' types stay generic binary.
            guessed_type, encoding = mimetypes.guess_type(path)
            if guessed_type is None or encoding is not None:
                guessed_type = 'application/octet-stream'
            maintype, subtype = guessed_type.split('/', 1)
            if maintype in ('message', 'multipart'):
                maintype, subtype = 'application', 'octet-stream'

            part = MIMEBase(maintype, subtype)
            part.set_payload(payload)
            encoders.encode_base64(part)

            # Passing the name as a header parameter lets the email package
            # quote it and RFC 2231-encode non-ASCII names, which a
            # hand-formatted 'filename="..."' string does not.
            part.add_header(
                'Content-Disposition',
                'attachment',
                filename=os.path.basename(path)
            )
            message.attach(part)
        except Exception as e:
            # Best effort: one bad attachment must not block the mail.
            logger.error(f"添加附件 {path} 失败: {str(e)}")

    raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
    return {'raw': raw}
|
||
|
||
def _append_to_knowledge_base_document(self, knowledge_base, subject, body, recipient):
    """Upload one mail as a new single-paragraph document of a knowledge base.

    Args:
        knowledge_base: Knowledge base record; its ``external_id`` is the
            target of the upload call.
        subject: Mail subject, used in the paragraph title.
        body: Mail text, used in the paragraph content.
        recipient: Address written into the document name and the
            recipient line of the paragraph content.

    Returns:
        The upload API response on success, otherwise ``None`` (missing
        ``external_id``, rejected upload, or any exception, which is logged).
    """
    try:
        # Nothing can be uploaded without the external id, so check it
        # before building the payload.
        if not knowledge_base.external_id:
            logger.error(f"知识库没有external_id,无法上传文档: {knowledge_base.name}")
            return None

        # One clock reading, so the document name and the paragraph title
        # always carry the same instant.
        now = datetime.now()
        doc_data = {
            "name": f"Gmail回复给{recipient}_{now.strftime('%Y%m%d%H%M%S')}.txt",
            "paragraphs": [
                {
                    "title": f"{now.strftime('%Y-%m-%d %H:%M:%S')} - {subject}",
                    "content": f"发件人: {self.user.email}\n收件人: {recipient}\n\n{body}",
                    "is_active": True,
                    "problem_list": []
                }
            ]
        }

        # Imported here rather than at module level, as in the original.
        # NOTE(review): presumably to avoid a circular import with views - confirm.
        from .views import KnowledgeBaseViewSet
        upload_response = KnowledgeBaseViewSet()._call_upload_api(
            knowledge_base.external_id, doc_data
        )

        upload_ok = (
            upload_response
            and upload_response.get('code') == 200
            and upload_response.get('data')
        )
        if not upload_ok:
            error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
            logger.error(f"Gmail回复文档上传失败: {error_msg}")
            return None

        # Mirror the uploaded document locally and refresh the counter.
        document_id = upload_response['data']['id']
        KnowledgeBaseDocument.objects.create(
            knowledge_base=knowledge_base,
            document_id=document_id,
            document_name=doc_data["name"],
            external_id=document_id
        )
        logger.info(f"Gmail回复文档上传成功,ID: {document_id}")

        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active'
        ).count()
        knowledge_base.save()

        return upload_response

    except Exception as e:
        logger.error(f"追加邮件内容到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def setup_watch(self, topic_name=None):
    """Register a Gmail push-notification watch and store its state.

    Args:
        topic_name: Pub/Sub topic name (without the project prefix). When
            empty, ``settings.GMAIL_TOPIC_NAME`` is used, defaulting to
            ``'gmail-watch-topic'``.

    Returns:
        A dict with the ``historyId`` and ``expiration`` values returned by
        the watch call.

    Raises:
        Exception: Any failure is logged with a traceback and re-raised.
    """
    try:
        if not topic_name:
            topic_name = getattr(settings, 'GMAIL_TOPIC_NAME', 'gmail-watch-topic')

        project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT', 'knowledge-454905')
        logger.info(f"使用settings中配置的项目ID: {project_id}")

        # The Gmail API is always addressed as userId='me'; the system
        # user's address is logged for reference only.
        logger.info(f"系统用户: {self.user.email},Gmail API使用OAuth认证的邮箱 (userId='me')")

        # The webhook URL is only logged as a configuration hint; it is not
        # part of the watch request, so deriving it must never fail the call.
        webhook_url = getattr(settings, 'GMAIL_WEBHOOK_URL', None)
        if not webhook_url:
            # ALLOWED_HOSTS may be empty or hold wildcard entries ('*',
            # '.example.com'); use the first concrete host, else localhost.
            configured_hosts = getattr(settings, 'ALLOWED_HOSTS', None) or []
            domain = next(
                (host.lstrip('.') for host in configured_hosts if host and host != '*'),
                'localhost'
            )
            protocol = 'https' if getattr(settings, 'SECURE_SSL_REDIRECT', False) else 'http'
            if domain == 'localhost' or domain.startswith('127.0.0.1'):
                webhook_url = f"{protocol}://{domain}:8000/api/user/gmail/webhook/"
                logger.warning("使用本地开发环境URL作为webhook回调,Google可能无法访问。建议使用ngrok等工具创建公网地址。")
            else:
                webhook_url = f"{protocol}://{domain}/api/user/gmail/webhook/"

        logger.info(f"使用Gmail webhook URL: {webhook_url}")
        logger.info("如需手动配置Gmail推送通知,请确保在Google Cloud控制台的Pub/Sub主题配置中设置以下URL:")
        logger.info(f"推送端点: {webhook_url}")
        logger.info("权限: 为Gmail API服务账号(gmail-api-push@system.gserviceaccount.com)和您的Gmail账号授予发布权限")

        request = {
            'labelIds': ['INBOX', 'SENT', 'IMPORTANT', 'UNREAD'],
            'topicName': f"projects/{project_id}/topics/{topic_name}",
            'labelFilterAction': 'include'
        }
        logger.info(f"设置Gmail监听: {request}")

        response = self.gmail_service.users().watch(userId='me', body=request).execute()

        history_id = response.get('historyId')
        expiration = response.get('expiration')
        logger.info(f"Gmail监听设置成功: historyId={history_id}, expiration={expiration}")

        # Persist the watch state on the user's active credential, if any.
        if self.user:
            credential = GmailCredential.objects.filter(user=self.user, is_active=True).first()
            if credential:
                expiration_time = None
                if expiration:
                    try:
                        # expiration is a millisecond timestamp.
                        naive_time = datetime.fromtimestamp(int(expiration) / 1000)
                        logger.info(f"从时间戳解析的日期: {naive_time}")

                        if getattr(settings, 'USE_TZ', False):
                            try:
                                expiration_time = timezone.make_aware(naive_time)
                                logger.info(f"添加时区信息后: {expiration_time}")
                            except Exception as tz_error:
                                logger.warning(f"添加时区信息失败: {str(tz_error)}")
                                expiration_time = naive_time
                        else:
                            expiration_time = naive_time
                    except Exception as conv_error:
                        logger.error(f"转换时间戳失败: {str(conv_error)}")
                        expiration_time = None

                credential.last_history_id = history_id
                credential.watch_expiration = expiration_time
                credential.save()

                logger.info(f"更新Gmail监听信息: {self.user.username}, history_id: {history_id}")

        return {
            'historyId': history_id,
            'expiration': expiration
        }

    except Exception as e:
        logger.error(f"设置Gmail监听失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise
|
||
|
||
def get_history(self, start_history_id):
    """List mailbox changes since a history id and return new message ids.

    Args:
        start_history_id: History id passed as ``startHistoryId`` to the
            Gmail history list call.

    Returns:
        A list of unique message ids, in the order they first appear in the
        history: every id under ``messagesAdded`` plus every id under
        ``labelsAdded`` whose added labels include ``INBOX``. An empty list
        is returned when nothing matched or when any error occurred (the
        error is logged).
    """
    try:
        logger.info(f"获取历史记录,起始ID: {start_history_id}")

        history_list = []
        page_token = None
        first_page = True
        # Follow nextPageToken on every page, including pages that carry no
        # 'history' entries themselves.
        while True:
            list_kwargs = {'userId': 'me', 'startHistoryId': start_history_id}
            if page_token:
                list_kwargs['pageToken'] = page_token
            response = self.gmail_service.users().history().list(**list_kwargs).execute()

            records = response.get('history') or []
            if first_page:
                logger.info(f"历史记录响应: {response}")
                if records:
                    logger.info(f"找到 {len(records)} 个历史记录")
                else:
                    logger.info(f"没有新的历史记录,最新historyId: {response.get('historyId', 'N/A')}")
            elif records:
                logger.info(f"加载额外 {len(records)} 个历史记录")
            history_list.extend(records)

            page_token = response.get('nextPageToken')
            if not page_token:
                break
            first_page = False

        # A dict is used as an ordered set so callers get the ids in history
        # order instead of an arbitrary set order.
        new_message_ids = {}
        for history in history_list:
            logger.info(f"处理历史记录: {history}")

            for added in history.get('messagesAdded', []):
                message_id = added['message']['id']
                new_message_ids.setdefault(message_id, None)
                logger.info(f"新增消息ID: {message_id}")

            for label in history.get('labelsAdded', []):
                message_id = label['message']['id']
                if 'INBOX' in label.get('labelIds', []):
                    new_message_ids.setdefault(message_id, None)
                    logger.info(f"标签变更的消息ID: {message_id}, 添加了INBOX标签")

        if new_message_ids:
            logger.info(f"总共找到 {len(new_message_ids)} 个新消息")
        else:
            logger.info("没有新增消息")

        return list(new_message_ids)

    except Exception as e:
        logger.error(f"获取Gmail历史记录失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []
|
||
|
||
def process_notification(self, notification_data):
    """Handle one Gmail push notification.

    Args:
        notification_data: Either a Pub/Sub push envelope
            (``{'message': {'data': <base64 JSON>}}``) or a flat dict with
            ``emailAddress`` and ``historyId`` keys.

    Returns:
        ``True`` when the notification was handled (new messages were
        processed, or the fallback over talent mappings ran). ``False``
        when the payload is incomplete or undecodable, no user matches the
        address, authentication fails, the user has no active talent
        mapping for the fallback, or any exception occurred (it is logged).
    """
    try:
        logger.info(f"处理Gmail通知: {notification_data}")

        if (isinstance(notification_data, dict)
                and 'message' in notification_data
                and 'data' in notification_data['message']):
            try:
                logger.info("检测到Google Pub/Sub消息格式")

                # The data field is base64url text, possibly without
                # padding. urlsafe_b64decode also accepts the standard
                # alphabet, whereas b64decode silently drops '-' and '_'.
                encoded_data = notification_data['message']['data']
                if isinstance(encoded_data, bytes):
                    encoded_data = encoded_data.decode('ascii')
                padding = '=' * (-len(encoded_data) % 4)
                decoded_data = base64.urlsafe_b64decode(encoded_data + padding).decode('utf-8')
                logger.info(f"解码后的数据: {decoded_data}")

                json_data = json.loads(decoded_data)
                email = json_data.get('emailAddress')
                history_id = json_data.get('historyId')
                logger.info(f"从Pub/Sub消息中提取: email={email}, historyId={history_id}")
            except Exception as decode_error:
                logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}")
                logger.error(traceback.format_exc())
                return False
        else:
            # Flat format: the fields are at the top level.
            email = notification_data.get('emailAddress')
            history_id = notification_data.get('historyId')

        if not email or not history_id:
            logger.error("Gmail通知数据不完整: 找不到emailAddress或historyId")
            return False

        # Resolve the system user: by account email first, then by the
        # Gmail address stored on an active credential.
        user = User.objects.filter(email=email).first()
        if not user:
            logger.info(f"找不到email={email}的用户,尝试使用gmail_email查找")
            credential_by_gmail = GmailCredential.objects.filter(gmail_email=email, is_active=True).first()
            if credential_by_gmail:
                user = credential_by_gmail.user
                logger.info(f"通过gmail_email找到用户: {user.email}")

        if not user:
            logger.error(f"找不到与 {email} 关联的用户")
            return False

        gmail_integration = GmailIntegration(user)
        if not gmail_integration.authenticate():
            logger.error(f"Gmail认证失败: {email}")
            return False

        # The notification carries the mailbox's NEW history id. Changes are
        # listed starting from the last id we stored; the notification id is
        # only used when nothing was stored yet.
        credential = GmailCredential.objects.filter(user=user, is_active=True).first()
        stored_history_id = credential.last_history_id if credential else None
        start_history_id = stored_history_id or history_id

        message_ids = gmail_integration.get_history(start_history_id)
        if message_ids:
            logger.info(f"从历史记录找到 {len(message_ids)} 个新消息")
            for message_id in message_ids:
                self._process_new_message(gmail_integration, message_id)

        # Advance the stored id, never backwards, so the next notification
        # starts where this one ended. A single-column update avoids
        # overwriting other credential fields with stale values.
        if credential:
            try:
                is_newer = int(history_id) > int(stored_history_id or 0)
            except (TypeError, ValueError):
                is_newer = True
            if is_newer:
                GmailCredential.objects.filter(pk=credential.pk).update(last_history_id=history_id)

        if message_ids:
            return True

        # Fallback: the history gave nothing, so pull the latest mails of
        # every mapped talent and save them to the talent's knowledge base.
        logger.info("历史记录API没有返回新消息,尝试获取达人对话")

        mappings = GmailTalentMapping.objects.filter(
            user=user,
            is_active=True
        )

        if not mappings.exists():
            logger.info(f"用户 {user.email} 没有达人映射记录")
            return False

        for mapping in mappings:
            talent_email = mapping.talent_email
            logger.info(f"处理达人 {talent_email} 的对话")

            recent_emails = gmail_integration.get_recent_emails(
                from_email=talent_email,
                max_results=5
            )

            if not recent_emails:
                logger.info(f"没有找到来自 {talent_email} 的最近邮件")
                continue

            logger.info(f"找到 {len(recent_emails)} 封来自 {talent_email} 的最近邮件")

            knowledge_base, created = gmail_integration.create_talent_knowledge_base(talent_email)
            kb_action = "创建" if created else "获取"
            logger.info(f"知识库{kb_action}成功: {knowledge_base.name}")

            result = gmail_integration.save_conversations_to_knowledge_base(recent_emails, knowledge_base)
            logger.info(f"保存达人对话结果: {result}")

        return True

    except Exception as e:
        logger.error(f"处理Gmail通知失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
|
||
|
||
def _process_new_message(self, gmail_integration, message_id):
    """Store one Gmail message as a chat record and notify the user.

    The message is fetched, matched to a knowledge base and a conversation,
    saved as a ``ChatHistory`` row (with its attachments downloaded), pushed
    to the knowledge base, and announced over the channel layer. For mails
    from a mapped talent, a recommended reply is generated when the user's
    profile has ``auto_recommend_reply`` enabled.

    Args:
        gmail_integration: Authenticated integration object of the mailbox
            owner; its ``user`` and ``gmail_service`` are used.
        message_id: Gmail id of the message to process.

    Returns:
        ``True`` when the message was stored or had already been stored.
        ``False`` when its content cannot be extracted, no knowledge base
        is associated with the sender, or any exception occurred (it is
        logged). Notification failures do not affect the result.
    """
    try:
        message = gmail_integration.gmail_service.users().messages().get(
            userId='me', id=message_id
        ).execute()

        email_data = gmail_integration._extract_email_content(message)
        if not email_data:
            logger.error(f"提取邮件内容失败: {message_id}")
            return False

        user = gmail_integration.user

        # The From header is either "Name <address>" or a bare address.
        from_email = email_data.get('from', '')
        if '<' in from_email and '>' in from_email:
            sender_email = from_email.split('<')[1].split('>')[0]
        else:
            sender_email = from_email

        is_user_email = user.email.lower() == sender_email.lower()

        talent_mapping = GmailTalentMapping.objects.filter(
            user=user,
            talent_email=sender_email,
            is_active=True
        ).first()

        # Mails from the user or from a mapped talent are stored with the
        # 'user' role, everything else as 'assistant'.
        role = 'user' if is_user_email or talent_mapping else 'assistant'
        logger.info(f"设置消息角色: {role}, 发件人: {sender_email}, 用户邮箱: {user.email}, 是否映射达人: {talent_mapping}")

        # Knowledge base: by naming convention first, then via the mapping.
        kb_name = f"Gmail-{sender_email.split('@')[0]}"
        knowledge_base = KnowledgeBase.objects.filter(name=kb_name).first()
        if not knowledge_base:
            if talent_mapping and talent_mapping.knowledge_base:
                knowledge_base = talent_mapping.knowledge_base
                logger.info(f"使用映射的知识库: {knowledge_base.name}")
            else:
                logger.info(f"收到新邮件,但没有找到关联的达人知识库: {sender_email}")
                return False

        # Conversation id: 1) from the mapping, 2) from any existing chat
        # on this knowledge base, 3) a freshly generated one.
        conversation_id = None
        if talent_mapping and talent_mapping.conversation_id:
            conversation_id = talent_mapping.conversation_id
            logger.info(f"通过达人映射找到对话ID: {conversation_id}")

        if not conversation_id:
            existing_conversation = ChatHistory.objects.filter(
                knowledge_base=knowledge_base,
                user=user,
                is_deleted=False
            ).values('conversation_id').distinct().first()

            if existing_conversation:
                conversation_id = existing_conversation['conversation_id']
                logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}")

                if not talent_mapping:
                    GmailTalentMapping.objects.update_or_create(
                        user=user,
                        talent_email=sender_email,
                        defaults={
                            'knowledge_base': knowledge_base,
                            'conversation_id': conversation_id,
                            'is_active': True
                        }
                    )
                    logger.info(f"更新Gmail达人映射: {sender_email} -> 对话ID {conversation_id}")

        if not conversation_id:
            conversation_id = str(uuid.uuid4())
            logger.info(f"创建新的对话ID: {conversation_id}")

            if not talent_mapping:
                GmailTalentMapping.objects.create(
                    user=user,
                    talent_email=sender_email,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation_id,
                    is_active=True
                )
                logger.info(f"已创建新的Gmail达人映射: {sender_email} -> {knowledge_base.name}")

        # Skip messages that are already stored for this conversation.
        if ChatHistory.objects.filter(
            conversation_id=conversation_id,
            metadata__gmail_message_id=message_id,
            is_deleted=False
        ).exists():
            logger.info(f"邮件已处理过,跳过: {message_id}")
            return True

        # Parse the mail date; fall back to the current time if it cannot
        # be parsed or converted.
        date_str = email_data.get('date', '')
        logger.info(f"开始解析邮件日期: '{date_str}'")

        has_tz_setting = hasattr(settings, 'USE_TZ')
        use_tz = has_tz_setting and settings.USE_TZ
        try:
            if isinstance(date_str, str) and date_str.isdigit():
                date_obj = datetime.fromtimestamp(int(date_str))
                logger.info(f"从时间戳解析的日期: {date_obj}")
            else:
                try:
                    date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
                    logger.info(f"从标准格式解析的日期: {date_obj}")
                except ValueError:
                    date_obj = parser.parse(date_str)
                    logger.info(f"从灵活格式解析的日期: {date_obj}, 是否有时区: {date_obj.tzinfo is not None}")

                    # Without time zone support the stored value must be naive.
                    if date_obj.tzinfo is not None and has_tz_setting and not use_tz:
                        date_obj = date_obj.replace(tzinfo=None)
                        logger.info(f"移除时区信息后: {date_obj}")

            try:
                if timezone.is_aware(date_obj):
                    aware_date = date_obj
                    logger.info(f"日期已经包含时区信息: {aware_date}")
                elif use_tz:
                    aware_date = timezone.make_aware(date_obj)
                    logger.info(f"日期添加时区信息后: {aware_date}")
                else:
                    aware_date = date_obj
                    logger.info(f"保持日期不带时区: {aware_date}")
            except Exception as tz_error:
                logger.warning(f"时区转换失败: {str(tz_error)},使用当前时间")
                aware_date = timezone.now()

        except (ValueError, TypeError) as e:
            logger.warning(f"无法解析邮件日期: '{date_str}',错误: {str(e)},使用当前时间")
            aware_date = timezone.now()

        # Parent is the most recently created message of the conversation.
        parent_id = None
        try:
            latest_message = ChatHistory.objects.filter(
                conversation_id=conversation_id,
                is_deleted=False
            ).order_by('-created_at').first()

            if latest_message:
                parent_id = str(latest_message.id)
                logger.info(f"找到最新消息ID作为parent_id: {parent_id}, 创建时间: {latest_message.created_at}")
            else:
                logger.info(f"对话 {conversation_id} 没有现有消息,不设置parent_id")
        except Exception as e:
            logger.error(f"查找父消息失败: {str(e)}")
            logger.error(traceback.format_exc())
            parent_id = None

        # Download attachments; only successful downloads are recorded.
        attachment_records = []
        for attachment in email_data.get('attachments', []):
            if 'attachmentId' not in attachment:
                continue
            filepath = gmail_integration.download_attachment(
                message_id,
                attachment['attachmentId'],
                attachment['filename']
            )
            if filepath:
                attachment_records.append({
                    'filepath': filepath,
                    'filename': attachment['filename'],
                    'message_id': message_id,
                    'date': date_str
                })

        metadata = {
            'gmail_message_id': message_id,
            'from': email_data.get('from', ''),
            'date': date_str,
            'subject': email_data.get('subject', ''),
            'dataset_id_list': [str(knowledge_base.id)],
            'dataset_names': [knowledge_base.name]
        }
        if attachment_records:
            metadata['message_attachments'] = attachment_records

        subject = email_data['subject']
        body = email_data['body']

        chat_message = ChatHistory.objects.create(
            user=user,
            knowledge_base=knowledge_base,
            conversation_id=conversation_id,
            parent_id=parent_id,
            role=role,
            content=f"[{subject}] {body}",
            metadata=metadata,
            created_at=aware_date
        )

        # NOTE(review): the mailbox owner's address is passed as the
        # recipient, so the uploaded paragraph names the owner as both
        # sender and recipient even for incoming mail - confirm intent.
        gmail_integration._append_to_knowledge_base_document(
            knowledge_base,
            subject,
            body,
            user.email
        )

        if attachment_records:
            gmail_integration._upload_message_attachments_to_knowledge_base(
                knowledge_base,
                attachment_records
            )

        # Notifications are best effort: a failure is logged and does not
        # change the outcome of the message processing.
        try:
            from channels.layers import get_channel_layer
            from asgiref.sync import async_to_sync

            channel_layer = get_channel_layer()
            if channel_layer:
                group_name = f"notification_user_{user.id}"
                preview = f"[{subject}] {body[:100]}{'...' if len(body) > 100 else ''}"

                notification_data = {
                    "type": "notification",
                    "data": {
                        "message_type": "new_gmail",
                        "conversation_id": conversation_id,
                        "message": {
                            "id": str(chat_message.id),
                            "role": role,
                            "content": preview,
                            "sender": sender_email,
                            "subject": subject,
                            "has_attachments": len(attachment_records) > 0
                        }
                    }
                }

                async_to_sync(channel_layer.group_send)(group_name, notification_data)
                logger.info(f"已发送WebSocket通知: 用户 {user.id} 收到新Gmail消息")

                try:
                    from .models import Notification
                    Notification.objects.create(
                        sender=user,
                        receiver=user,
                        title="新Gmail消息",
                        content=f"您收到了来自 {sender_email} 的新邮件: {subject}",
                        type="system_notice",
                        related_resource=conversation_id
                    )
                    logger.info(f"已创建系统通知记录: 用户 {user.id} 的新Gmail消息")
                except Exception as notification_error:
                    logger.error(f"创建系统通知记录失败: {str(notification_error)}")

                # Recommended reply, only for mails from a mapped talent.
                if role == 'user' and talent_mapping:
                    try:
                        user_profile, _ = UserProfile.objects.get_or_create(user=user)

                        if user_profile.auto_recommend_reply:
                            logger.info(f"用户 {user.id} 已启用自动推荐回复功能,生成推荐回复")

                            conversation_messages = ChatHistory.objects.filter(
                                conversation_id=conversation_id,
                                is_deleted=False
                            ).order_by('created_at')

                            conversation_history = [
                                {
                                    'role': 'user' if stored.role == 'user' else 'assistant',
                                    'content': stored.content
                                }
                                for stored in conversation_messages
                            ]

                            # Trimming the history and adding the final
                            # prompt is done by the reply generator; the
                            # former copy of that code here used an
                            # undefined name and aborted this branch.
                            recommended_reply = self._get_recommended_reply_from_deepseek(conversation_history)

                            if recommended_reply:
                                recommend_notification_data = {
                                    "type": "notification",
                                    "data": {
                                        "message_type": "recommended_reply",
                                        "conversation_id": conversation_id,
                                        "message": {
                                            "id": str(chat_message.id),
                                            "role": role,
                                            "content": preview,
                                            "sender": sender_email,
                                            "subject": subject,
                                            "recommended_reply": recommended_reply
                                        }
                                    }
                                }

                                async_to_sync(channel_layer.group_send)(
                                    group_name,
                                    recommend_notification_data
                                )
                                logger.info(f"已发送推荐回复通知: 用户 {user.id}")

                                # Imported again so this branch does not
                                # depend on the earlier best-effort import.
                                from .models import Notification
                                Notification.objects.create(
                                    sender=user,
                                    receiver=user,
                                    title="新推荐回复",
                                    content=f"系统为来自 {sender_email} 的邮件生成了推荐回复",
                                    type="system_notice",
                                    related_resource=conversation_id
                                )
                                logger.info(f"已创建推荐回复系统通知: 用户 {user.id}")
                            else:
                                logger.warning(f"生成推荐回复失败: 用户 {user.id}, 对话 {conversation_id}")
                    except Exception as recommend_error:
                        logger.error(f"处理推荐回复失败: {str(recommend_error)}")
                        logger.error(traceback.format_exc())
        except Exception as ws_error:
            logger.error(f"发送WebSocket通知失败: {str(ws_error)}")
            logger.error(traceback.format_exc())

        logger.info(f"成功处理新邮件: {message_id} 从 {sender_email}")
        return True

    except Exception as e:
        logger.error(f"处理新邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
|
||
|
||
def _get_recommended_reply_from_deepseek(self, conversation_history):
    """Ask the DeepSeek V3 chat-completions endpoint for a suggested reply.

    The system prompt is extended with the user's active goal and, when the
    history entries carry the needed metadata, with the stored summary of
    the conversation.

    Args:
        conversation_history: List of dicts with ``role`` and ``content``
            keys, oldest first. Entries may carry a ``metadata`` dict with
            ``from_email`` / ``conversation_id``, used to look up the
            conversation summary. Only the last five entries are sent.

    Returns:
        The reply text, or ``None`` when no API key is configured, the
        request fails or times out, the response has no usable choice, or
        the reply is empty.
    """
    try:
        # The key is read from configuration only; credentials must not be
        # embedded in source code.
        api_key = getattr(settings, 'DEEPSEEK_API_KEY', None) or os.environ.get('DEEPSEEK_API_KEY')
        if not api_key:
            logger.error("未配置DEEPSEEK_API_KEY,无法生成推荐回复")
            return None

        url = "https://api.siliconflow.cn/v1/chat/completions"
        # Without a timeout a stalled connection would block the caller
        # indefinitely.
        request_timeout = getattr(settings, 'DEEPSEEK_API_TIMEOUT', 30)

        from .models import UserGoal, ConversationSummary
        user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()
        goal_content = user_goal.content if user_goal else None

        # Look for the talent address and conversation id in the metadata
        # of the history entries, if any entry carries them.
        talent_email = None
        conversation_id = None
        for entry in conversation_history:
            entry_metadata = entry.get('metadata') or {}
            if talent_email is None and entry.get('role') == 'user' and 'from_email' in entry_metadata:
                talent_email = entry_metadata['from_email']
            if conversation_id is None and 'conversation_id' in entry_metadata:
                conversation_id = entry_metadata['conversation_id']

        conversation_summary = None
        if talent_email and conversation_id:
            summary_obj = ConversationSummary.objects.filter(
                user=self.user,
                talent_email=talent_email,
                conversation_id=conversation_id,
                is_active=True
            ).first()
            if summary_obj:
                conversation_summary = summary_obj.summary

        system_content = "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。"

        if goal_content:
            system_content += f"\n\n【用户总目标】\n{goal_content}\n\n"

        if conversation_summary:
            system_content += f"【对话总结】\n{conversation_summary}\n\n"

        system_content += "请针对达人最后一条消息,结合用户总目标和对话总结生成一个有针对性的专业回复。回复必须对达人当前问题直接响应,同时与用户的总体合作目标保持一致。回复应该有至少100个字符,必须提供有实质内容的回复。"

        messages = [{"role": "system", "content": system_content}]

        # Only the five latest entries are sent, to stay within the token
        # budget, and only the role/content fields are forwarded.
        recent_messages = [
            {"role": entry.get('role'), "content": entry.get('content')}
            for entry in conversation_history[-5:]
        ]
        messages.extend(recent_messages)

        # The exchange must end with a user turn; add a prompt if it does not.
        if not recent_messages or recent_messages[-1]['role'] != 'user':
            messages.append({
                "role": "user",
                "content": "请针对达人最后一条消息生成专业的回复,考虑我们之前的对话历史和我设定的总目标。"
            })

        payload = {
            "model": "deepseek-ai/DeepSeek-V3",
            "messages": messages,
            "stream": False,
            "max_tokens": 1024,
            "temperature": 0.7,
            "top_p": 0.9,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "presence_penalty": 0.2,
            "n": 1,
            "stop": [],
            "response_format": {
                "type": "text"
            }
        }

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        logger.info("开始调用DeepSeek API生成推荐回复")
        response = requests.post(url, json=payload, headers=headers, timeout=request_timeout)

        if response.status_code != 200:
            logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
            return None

        result = response.json()
        logger.debug(f"DeepSeek API返回: {result}")

        if 'choices' in result and len(result['choices']) > 0:
            reply = result['choices'][0]['message']['content']
            if not reply or reply.strip() == '':
                logger.warning("DeepSeek API返回的回复内容为空")
                return None
            return reply

        logger.warning(f"DeepSeek API返回格式异常: {result}")
        return None

    except Exception as e:
        logger.error(f"调用DeepSeek API失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
||
|
||
def get_attachment_by_conversation(self, conversation_id):
    """Return descriptions of every attachment that belongs to one conversation.

    Looks up the non-deleted ``ChatHistory`` rows of ``self.user`` for
    ``conversation_id`` and then the ``GmailAttachment`` rows linked to them.

    Args:
        conversation_id: Identifier of the conversation to inspect.

    Returns:
        list[dict]: One dict per attachment (id, filename, filepath, mimetype,
        filesize, message_id, gmail_message_id, role, created_at,
        content_preview). An empty list is returned when the conversation has
        no records or when any error occurs (the error is logged).
    """

    def _describe(item):
        # Flatten an attachment and its parent chat message into a plain dict.
        parent = item.chat_message
        return {
            'id': str(item.id),
            'filename': item.filename,
            'filepath': item.filepath,
            'mimetype': item.mimetype,
            'filesize': item.filesize,
            'message_id': str(parent.id),
            'gmail_message_id': item.gmail_message_id,
            'role': parent.role,
            'created_at': item.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            # Preview is capped at 100 characters, with an ellipsis when truncated.
            'content_preview': (
                parent.content
                if not len(parent.content) > 100
                else parent.content[:100] + '...'
            ),
        }

    try:
        history_rows = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            user=self.user,
            is_deleted=False,
        )
        if not history_rows:
            logger.warning(f"找不到对话记录: {conversation_id}")
            return []

        row_ids = [row.id for row in history_rows]
        linked = GmailAttachment.objects.filter(
            chat_message_id__in=row_ids
        ).select_related('chat_message')

        return [_describe(item) for item in linked]

    except Exception as e:
        logger.error(f"获取对话附件失败: {str(e)}")
        return []
|
||
|
||
def handle_auth_code(self, auth_code):
    """
    Exchange a Google OAuth2 authorization code for credentials and persist them.

    The client secret held in ``self.client_secret`` (a JSON string or a
    mapping) is written to a private temporary file with its redirect URIs
    forced to the out-of-band value, an oauth2client flow is built from that
    file, and ``auth_code`` is exchanged for credentials. On success the
    credentials are stored in the token file at ``self.token_storage_path``,
    kept on the instance together with a Gmail service built from them,
    pickled into the ``GmailCredential`` row of ``self.user`` and pushed to
    ``GmailServiceManager``.

    Args:
        auth_code (str): Authorization code obtained from the Google consent page.

    Returns:
        bool: True when the whole flow succeeded, False on any failure
        (failures are logged, never raised).
    """
    import copy
    import tempfile

    oob_redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'
    client_secret_path = None

    try:
        logger.info("开始处理Gmail授权码...")

        # The client secret is mandatory for building the flow.
        if not self.client_secret:
            logger.error("未提供client_secret_json,无法处理授权码")
            return False

        # Parse/copy the secret BEFORE touching the disk so that an invalid
        # secret never leaves a stray file behind.
        if isinstance(self.client_secret, str):
            logger.info("client_secret是字符串,解析为JSON")
            try:
                secret_config = json.loads(self.client_secret)
            except json.JSONDecodeError as e:
                logger.error(f"client_secret不是有效的JSON: {str(e)}")
                return False
        else:
            logger.info("client_secret是字典,直接写入文件")
            # Deep copy: the redirect-URI override below must not leak into
            # the nested dicts of self.client_secret.
            secret_config = copy.deepcopy(dict(self.client_secret))

        # Force the non-browser redirect URI to avoid localhost connection refusals.
        for key in ('web', 'installed'):
            if key in secret_config and 'redirect_uris' in secret_config[key]:
                secret_config[key]['redirect_uris'] = [oob_redirect_uri]
                logger.info("已强制设置redirect_uri为非浏览器模式")

        # A unique, owner-only temp file instead of a fixed name in the working
        # directory: concurrent authorizations can no longer overwrite each
        # other's secret.
        fd, client_secret_path = tempfile.mkstemp(prefix='client_secret_', suffix='.json')
        with os.fdopen(fd, 'w') as secret_file:
            json.dump(secret_config, secret_file)

        logger.info(f"已将client_secret写入临时文件: {client_secret_path}")

        try:
            # Make sure the directory that will hold the token file exists.
            token_dir = os.path.dirname(self.token_storage_path)
            if token_dir and not os.path.exists(token_dir):
                logger.info(f"创建token目录: {token_dir}")
                os.makedirs(token_dir, exist_ok=True)

            logger.info(f"设置token存储: {self.token_storage_path}")
            store = file.Storage(self.token_storage_path)

            logger.info(f"使用非浏览器认证模式: {oob_redirect_uri}")

            logger.info("从client_secret创建授权流程")
            flow = client.flow_from_clientsecrets(
                client_secret_path,
                self.SCOPES,
                redirect_uri=oob_redirect_uri
            )

            logger.info("使用授权码交换访问令牌")
            credentials = flow.step2_exchange(auth_code)
            logger.info("成功获取到访问令牌")

            logger.info(f"保存凭证到文件: {self.token_storage_path}")
            store.put(credentials)

            self.credentials = credentials

            logger.info("初始化Gmail服务")
            self.gmail_service = discovery.build('gmail', 'v1', http=credentials.authorize(Http()))
            logger.info("Gmail服务初始化成功")

            logger.info("保存凭证到数据库")
            # The credentials object is stored in the database in pickled form.
            credentials_data = pickle.dumps(credentials)

            _, created = GmailCredential.objects.update_or_create(
                user=self.user,
                defaults={
                    'credentials': credentials_data,
                    'token_path': self.token_storage_path,
                    'updated_at': timezone.now(),
                    'is_active': True
                }
            )

            action = "创建" if created else "更新"
            logger.info(f"已{action}用户 {self.user.username} 的Gmail凭证记录")

            # Keep the shared service manager in sync with the fresh credentials.
            if self.user and self.gmail_service and self.credentials:
                GmailServiceManager.update_instance(self.user, self.credentials, self.gmail_service)

            return True

        except client.FlowExchangeError as e:
            logger.error(f"授权码交换失败: {str(e)}")
            return False
        except Exception as e:
            logger.error(f"处理授权码时发生错误: {str(e)}")
            logger.error(traceback.format_exc())
            return False

    except Exception as e:
        logger.error(f"处理授权码过程中发生异常: {str(e)}")
        logger.error(traceback.format_exc())
        return False

    finally:
        # Always remove the temporary secret file, whatever the outcome.
        if client_secret_path and os.path.exists(client_secret_path):
            logger.info(f"删除临时文件: {client_secret_path}")
            try:
                os.unlink(client_secret_path)
            except OSError as e:
                logger.warning(f"删除临时文件失败: {client_secret_path}, {str(e)}")
|
||
|
||
def _upload_attachments_to_knowledge_base(self, knowledge_base, conversations):
    """Upload every attachment found in ``conversations`` to a knowledge base.

    Each item of ``conversations`` is read as a mapping with optional
    ``attachments``, ``id``, ``from``, ``subject`` and ``date`` keys. An
    attachment is reused from disk when a ``GmailAttachment`` row with an
    existing file is found, otherwise it is downloaded through
    ``self.download_attachment``. Files are then sent, at most 10 per batch,
    to the split API and every resulting document is uploaded and recorded
    as a ``KnowledgeBaseDocument``. Finally the document count of
    ``knowledge_base`` is refreshed.

    Args:
        knowledge_base: Target knowledge base (its ``external_id`` is used
            for the upload API).
        conversations: Iterable of message mappings as described above.

    Returns:
        None. Errors are logged and swallowed.
    """
    batch_size = 10  # maximum number of files per split request

    try:
        # Phase 1: make every attachment available on disk.
        attachments_to_upload = []

        for message in conversations:
            if not ('attachments' in message and message['attachments']):
                continue

            message_id = message.get('id', '')
            sender = message.get('from', '未知发件人')
            subject = message.get('subject', '无主题')
            date_str = message.get('date', '未知时间')

            for attachment in message['attachments']:
                if 'attachmentId' not in attachment or 'filename' not in attachment:
                    continue

                existing_attachment = GmailAttachment.objects.filter(
                    gmail_message_id=message_id,
                    filename=attachment['filename']
                ).first()

                # Guard against rows without a stored path: os.path.exists(None)
                # raises TypeError and would abort the whole upload.
                if (
                    existing_attachment
                    and existing_attachment.filepath
                    and os.path.exists(existing_attachment.filepath)
                ):
                    filepath = existing_attachment.filepath
                else:
                    filepath = self.download_attachment(
                        message_id=message_id,
                        attachment_id=attachment['attachmentId'],
                        filename=attachment['filename']
                    )

                if filepath:
                    attachments_to_upload.append({
                        'filepath': filepath,
                        'filename': attachment['filename'],
                        'message_id': message_id,
                        'sender': sender,
                        'subject': subject,
                        'date': date_str
                    })

        if not attachments_to_upload:
            return

        # Phase 2: split and upload in batches.
        logger.info(f"开始上传 {len(attachments_to_upload)} 个附件到知识库")

        # Imported here as in the original code (presumably to avoid a
        # circular import with the views module — confirm).
        from .views import KnowledgeBaseViewSet
        import django.core.files.uploadedfile as uploadedfile

        kb_viewset = KnowledgeBaseViewSet()

        for start in range(0, len(attachments_to_upload), batch_size):
            batch = attachments_to_upload[start:start + batch_size]
            files = []

            for att in batch:
                filepath = att['filepath']
                filename = att['filename']

                if not os.path.exists(filepath):
                    logger.warning(f"附件文件不存在: {filepath}")
                    continue

                with open(filepath, 'rb') as f:
                    file_content = f.read()

                files.append(uploadedfile.SimpleUploadedFile(
                    name=filename,
                    content=file_content,
                    content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream'
                ))

            if not files:
                continue

            split_response = kb_viewset._call_split_api_multiple(files)

            if not split_response or split_response.get('code') != 200:
                logger.error(f"附件批量分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}")
                continue

            # ``data`` may be present but null; treat that as "no documents".
            documents_data = split_response.get('data') or []

            for doc in documents_data:
                doc_name = doc.get('name', 'Gmail附件')
                doc_content = doc.get('content') or []

                upload_doc_data = {
                    "name": doc_name,
                    "paragraphs": [
                        {
                            "content": paragraph.get('content', ''),
                            "title": paragraph.get('title', ''),
                            "is_active": True,
                            "problem_list": []
                        }
                        for paragraph in doc_content
                    ]
                }

                upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data)

                if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                    document_id = upload_response['data']['id']

                    # Record the uploaded document locally.
                    KnowledgeBaseDocument.objects.create(
                        knowledge_base=knowledge_base,
                        document_id=document_id,
                        document_name=doc_name,
                        external_id=document_id
                    )

                    logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}")

        # Refresh the cached number of active documents.
        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active'
        ).count()
        knowledge_base.save()

        logger.info(f"完成附件上传,共 {len(attachments_to_upload)} 个文件")

    except Exception as e:
        logger.error(f"上传附件到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
|
||
|
||
def _upload_message_attachments_to_knowledge_base(self, knowledge_base, attachment_records):
    """Upload the attachments of a single email to a knowledge base.

    Every record is read as a mapping with ``filepath`` and ``filename``
    keys. Existing files are sent in one request to the split API; each
    resulting document is uploaded and recorded as a
    ``KnowledgeBaseDocument``, then the document count of ``knowledge_base``
    is refreshed.

    Args:
        knowledge_base: Target knowledge base (its ``external_id`` is used
            for the upload API).
        attachment_records: Sequence of attachment mappings as described above.

    Returns:
        None. Errors are logged and swallowed.
    """
    try:
        if not attachment_records:
            return

        logger.info(f"开始上传 {len(attachment_records)} 个附件到知识库")

        # Imported here as in the original code (presumably to avoid a
        # circular import with the views module — confirm).
        from .views import KnowledgeBaseViewSet
        import django.core.files.uploadedfile as uploadedfile

        kb_viewset = KnowledgeBaseViewSet()

        files = []
        for att in attachment_records:
            filepath = att.get('filepath')
            filename = att.get('filename')

            # A record without a path must be skipped, not passed to
            # os.path.exists (which raises TypeError on None and would abort
            # the remaining attachments).
            if not filepath or not os.path.exists(filepath):
                logger.warning(f"附件文件不存在: {filepath}")
                continue

            with open(filepath, 'rb') as f:
                file_content = f.read()

            files.append(uploadedfile.SimpleUploadedFile(
                name=filename,
                content=file_content,
                content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream'
            ))

        if not files:
            return

        split_response = kb_viewset._call_split_api_multiple(files)

        if not split_response or split_response.get('code') != 200:
            logger.error(f"附件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}")
            return

        # ``data`` may be present but null; treat that as "no documents".
        documents_data = split_response.get('data') or []

        for doc in documents_data:
            doc_name = doc.get('name', 'Gmail附件')
            doc_content = doc.get('content') or []

            upload_doc_data = {
                "name": doc_name,
                "paragraphs": [
                    {
                        "content": paragraph.get('content', ''),
                        "title": paragraph.get('title', ''),
                        "is_active": True,
                        "problem_list": []
                    }
                    for paragraph in doc_content
                ]
            }

            upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data)

            if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                document_id = upload_response['data']['id']

                # Record the uploaded document locally.
                KnowledgeBaseDocument.objects.create(
                    knowledge_base=knowledge_base,
                    document_id=document_id,
                    document_name=doc_name,
                    external_id=document_id
                )

                logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}")

        # Refresh the cached number of active documents.
        knowledge_base.document_count = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            status='active'
        ).count()
        knowledge_base.save()

        logger.info(f"完成附件上传,共 {len(files)} 个文件")

    except Exception as e:
        logger.error(f"上传附件到知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
|
||
|
||
def get_recent_emails(self, from_email=None, max_results=10):
    """Fetch the most recent emails, optionally restricted to one sender.

    Args:
        from_email: When given, only messages matching the Gmail query
            ``from:<from_email>`` are listed.
        max_results: Maximum number of messages requested from the list call.

    Returns:
        list: One entry per message, as produced by
        ``self._extract_email_content``. An empty list is returned when no
        Gmail service is available or when any error occurs (logged).
    """
    try:
        # The service object is assigned as ``gmail_service`` elsewhere in this
        # file (see handle_auth_code); accept either attribute name so a missing
        # ``service`` attribute no longer turns every call into an empty result.
        service = getattr(self, 'service', None) or getattr(self, 'gmail_service', None)
        if not service:
            logger.error("Gmail服务未初始化")
            return []

        query = f"from:{from_email}" if from_email else ""

        # List the matching message ids first ...
        results = service.users().messages().list(
            userId='me',
            maxResults=max_results,
            q=query
        ).execute()

        # ... then fetch and convert each message individually.
        emails = []
        for message in results.get('messages', []):
            msg = service.users().messages().get(userId='me', id=message['id']).execute()
            emails.append(self._extract_email_content(msg))

        return emails

    except Exception as e:
        logger.error(f"获取最近邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []
|
||
|
||
def manage_user_goal(self, goal_content=None):
    """
    Read, create or update the active overall goal of ``self.user``.

    Args:
        goal_content (str): New goal text. When None, nothing is written and
            the current active goal (if any) is returned.

    Returns:
        dict: ``{'status': 'success', 'action': 'retrieve'|'update'|'create',
        'goal': {...} | None}`` on success, or
        ``{'status': 'error', 'message': ...}`` when an exception occurred.
    """
    from .models import UserGoal

    def _as_payload(goal):
        # Serialize a goal row into the dict returned to callers.
        time_format = '%Y-%m-%d %H:%M:%S'
        return {
            'id': str(goal.id),
            'content': goal.content,
            'created_at': goal.created_at.strftime(time_format),
            'updated_at': goal.updated_at.strftime(time_format)
        }

    try:
        # Both the read path and the write path start from the active goal.
        active_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()

        # Read-only mode: report what is stored, possibly nothing.
        if goal_content is None:
            return {
                'status': 'success',
                'action': 'retrieve',
                'goal': _as_payload(active_goal) if active_goal else None
            }

        if not active_goal:
            # No active goal yet: create one.
            active_goal = UserGoal.objects.create(
                user=self.user,
                content=goal_content
            )
            performed = 'create'
        else:
            # Overwrite the text of the existing goal.
            active_goal.content = goal_content
            active_goal.save()
            performed = 'update'

        return {
            'status': 'success',
            'action': performed,
            'goal': _as_payload(active_goal)
        }

    except Exception as e:
        logger.error(f"管理用户总目标失败: {str(e)}")
        logger.error(traceback.format_exc())
        return {
            'status': 'error',
            'message': f"管理用户总目标失败: {str(e)}"
        }
|
||
|
||
def generate_conversation_summary(self, talent_email):
    """
    Summarize all conversations between the user and one talent, and store the result.

    The conversations returned by ``self.get_conversations`` are flattened
    into a chat-style history, summarized through
    ``self._generate_summary_from_deepseek`` and saved in a
    ``ConversationSummary`` row keyed by the id of the first conversation.

    Args:
        talent_email (str): Email address of the talent.

    Returns:
        dict: ``{'status': 'success', 'action': 'update'|'create',
        'summary': {...}}`` on success, or
        ``{'status': 'error', 'message': ...}`` on failure.
    """
    from .models import ConversationSummary

    def _failure(text):
        # Uniform error payload.
        return {
            'status': 'error',
            'message': text
        }

    try:
        threads = self.get_conversations(talent_email)
        if not threads:
            return _failure(f"未找到与{talent_email}的对话记录")

        # Flatten subjects and message bodies into role/content entries.
        history = []
        for thread in threads:
            if 'subject' in thread and thread['subject']:
                history.append({
                    'role': 'system',
                    'content': f"邮件主题: {thread['subject']}"
                })

            for mail in thread.get('messages', []):
                # Messages sent by the talent are 'user', everything else 'assistant'.
                speaker = 'assistant'
                if mail.get('from_email') == talent_email:
                    speaker = 'user'
                text = mail.get('body', '')
                if not text:
                    continue
                history.append({
                    'role': speaker,
                    'content': text
                })

        summary = self._generate_summary_from_deepseek(history)
        if not summary:
            return _failure("生成对话总结失败")

        # The summary is stored under the id of the first conversation.
        conversation_id = threads[0].get('id')
        if not conversation_id:
            return _failure("无法确定对话ID")

        stored = ConversationSummary.objects.filter(
            user=self.user,
            talent_email=talent_email,
            conversation_id=conversation_id,
            is_active=True
        ).first()

        if not stored:
            stored = ConversationSummary.objects.create(
                user=self.user,
                talent_email=talent_email,
                conversation_id=conversation_id,
                summary=summary
            )
            performed = 'create'
        else:
            stored.summary = summary
            stored.save()
            performed = 'update'

        time_format = '%Y-%m-%d %H:%M:%S'
        return {
            'status': 'success',
            'action': performed,
            'summary': {
                'id': str(stored.id),
                'talent_email': talent_email,
                'conversation_id': conversation_id,
                'summary': summary,
                'created_at': stored.created_at.strftime(time_format),
                'updated_at': stored.updated_at.strftime(time_format)
            }
        }

    except Exception as e:
        logger.error(f"生成对话总结失败: {str(e)}")
        logger.error(traceback.format_exc())
        return _failure(f"生成对话总结失败: {str(e)}")
|
||
|
||
def _generate_summary_from_deepseek(self, conversation_history):
    """Ask the DeepSeek chat-completions API for a summary of a conversation.

    Args:
        conversation_history: List of ``{'role': ..., 'content': ...}``
            messages. When it holds more than 20 entries only the first two,
            four from the middle and the last six are sent, to limit the
            prompt size.

    Returns:
        str | None: The summary text, or None when the request fails, times
        out, returns a non-200 status, has an unexpected shape or is empty.
    """
    request_timeout = 60  # seconds; without a timeout requests may block forever

    try:
        from django.conf import settings

        # Key resolution order: Django settings, then the environment.
        api_key = getattr(settings, 'DEEPSEEK_API_KEY', None) or os.environ.get('DEEPSEEK_API_KEY')
        if not api_key:
            # NOTE(security): legacy hard-coded key kept only so existing
            # deployments keep working; it should be rotated and removed once
            # DEEPSEEK_API_KEY is configured everywhere.
            api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf"
            logger.warning("未配置DEEPSEEK_API_KEY,正在使用代码中内置的默认密钥,请尽快改为通过配置提供")

        url = "https://api.siliconflow.cn/v1/chat/completions"

        # The system message defines the summarization task.
        system_message = {
            "role": "system",
            "content": "你是一位专业的电商客服和达人助手。你的任务是分析用户与达人之间的所有对话历史,并生成一份简明扼要的总结。总结应包括:1. 主要讨论的产品或服务;2. 达人的主要关注点和需求;3. 已经达成的共识或协议;4. 未解决的问题或后续需要跟进的事项。总结应该客观、全面、结构清晰。"
        }

        messages = [system_message]

        # Long histories are sampled (head, middle, tail) to stay within the token limit.
        total = len(conversation_history)
        if total > 20:
            middle = total // 2
            messages.extend(
                conversation_history[:2]                        # first two
                + conversation_history[middle - 2:middle + 2]   # four around the middle
                + conversation_history[-6:]                     # last six
            )
        else:
            messages.extend(conversation_history)

        # Final instruction that triggers the summary.
        messages.append({
            "role": "user",
            "content": "请根据以上对话历史生成一份全面的总结。"
        })

        payload = {
            "model": "deepseek-ai/DeepSeek-V3",
            "messages": messages,
            "stream": False,
            "max_tokens": 1500,   # room for a complete summary
            "temperature": 0.3,   # low randomness for a more deterministic summary
            "top_p": 0.9,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "presence_penalty": 0.2,
            "n": 1,
            "stop": [],
            "response_format": {
                "type": "text"
            }
        }

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        logger.info(f"开始调用DeepSeek API生成对话总结")
        response = requests.post(url, json=payload, headers=headers, timeout=request_timeout)

        if response.status_code != 200:
            logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
            return None

        result = response.json()
        logger.debug(f"DeepSeek API返回: {result}")

        # Extract the generated text from the first choice.
        if 'choices' in result and len(result['choices']) > 0:
            summary = result['choices'][0]['message']['content']
            if not summary or summary.strip() == '':
                logger.warning("DeepSeek API返回的总结内容为空")
                return None
            return summary

        logger.warning(f"DeepSeek API返回格式异常: {result}")
        return None

    except Exception as e:
        logger.error(f"调用DeepSeek API生成总结失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|