# operations_project/apps/gmail/services/gmail_service.py
#
# Repository viewer metadata (not part of the program):
#   1355 lines, 59 KiB, Python
#   Raw / Normal View / History
#   blame: 2025-05-13 11:58:17 +08:00
import os
import json
import logging
import base64
import email
from email.utils import parseaddr
import datetime
import shutil
import uuid
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from django.conf import settings
from django.utils import timezone
from django.db import transaction
# blame: 2025-05-13 18:36:06 +08:00
from ..models import GmailCredential, GmailConversation, GmailAttachment, ConversationSummary
# blame: 2025-05-13 11:58:17 +08:00
from apps.chat.models import ChatHistory
from apps.knowledge_base.models import KnowledgeBase
import requests
from apps.common.services.notification_service import NotificationService
import threading
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.header import Header
import mimetypes
# blame: 2025-05-13 18:36:06 +08:00
from apps.accounts.models import User
from apps.common.services.ai_service import AIService
import traceback
# blame: 2025-05-13 11:58:17 +08:00
# Module-level logger for this service.
logger = logging.getLogger(__name__)
# Export the configured proxy through the process environment so that HTTP and
# HTTPS requests issued from this process can pick it up. This runs at import
# time and only when settings.PROXY_URL is set to a truthy value.
proxy_url = getattr(settings, 'PROXY_URL', None)
if proxy_url:
    os.environ['HTTP_PROXY'] = proxy_url
    os.environ['HTTPS_PROXY'] = proxy_url
    logger.info(f"Gmail服务已设置全局代理环境变量: {proxy_url}")
class GmailService:
    """Static helpers for Gmail OAuth, message sync, sending and push notifications."""

    # OAuth 2.0 scopes requested when authorising against the Gmail API.
    SCOPES = ['https://www.googleapis.com/auth/gmail.modify', 'https://www.googleapis.com/auth/pubsub']
    # Directory that holds the temporary client-secret files.
    TOKEN_DIR = os.path.join(settings.BASE_DIR, 'gmail_tokens')
    # Directory where downloaded and outgoing attachments are stored.
    ATTACHMENT_DIR = os.path.join(settings.BASE_DIR, 'media', 'gmail_attachments')
    # Pub/Sub topic and subscription used for Gmail watch (suffix only),
    # overridable through Django settings.
    PUBSUB_TOPIC = getattr(settings, 'GMAIL_PUBSUB_TOPIC', 'gmail-notifications')
    PUBSUB_SUBSCRIPTION = getattr(settings, 'GMAIL_PUBSUB_SUBSCRIPTION', 'gmail-notifications-sub')
# blame: 2025-05-13 11:58:17 +08:00
@staticmethod
def initiate_authentication(user, client_secret_json):
    """
    Start the Gmail OAuth 2.0 flow and return the authorization URL.

    The client secret is written to a per-user temporary file, used to build
    the OAuth flow, and the file is removed again before this method returns.

    Args:
        user: Django user the authentication is started for; ``user.id`` is
            used to name the temporary file.
        client_secret_json: Client secret as a JSON-serialisable dict.

    Returns:
        str: Authorization URL the user must visit to obtain an auth code.

    Raises:
        Exception: Any error raised while writing the file or building the
            flow is logged and re-raised unchanged.
    """
    # Resolve the path before entering ``try`` so the ``finally`` clause can
    # always reference it; previously a failure in ``os.makedirs`` caused an
    # UnboundLocalError in ``finally`` that hid the real error.
    temp_client_secret_path = os.path.join(
        GmailService.TOKEN_DIR, f'client_secret_{user.id}.json'
    )
    try:
        os.makedirs(GmailService.TOKEN_DIR, exist_ok=True)
        with open(temp_client_secret_path, 'w') as f:
            json.dump(client_secret_json, f)
        # Out-of-band redirect: the user copies the code back manually.
        flow = InstalledAppFlow.from_client_secrets_file(
            temp_client_secret_path,
            scopes=GmailService.SCOPES,
            redirect_uri='urn:ietf:wg:oauth:2.0:oob',
        )
        # prompt='consent' forces the consent screen on every authorization.
        auth_url, _ = flow.authorization_url(prompt='consent')
        logger.info(f"Generated auth URL for user {user.id}: {auth_url}")
        return auth_url
    except Exception as e:
        logger.error(f"Error initiating Gmail authentication for user {user.id}: {str(e)}")
        raise
    finally:
        # Never leave the client secret on disk.
        if os.path.exists(temp_client_secret_path):
            os.remove(temp_client_secret_path)
@staticmethod
def complete_authentication(user, auth_code, client_secret_json):
    """
    Finish the Gmail OAuth 2.0 flow and persist the resulting credentials.

    Exchanges ``auth_code`` for tokens, reads the mailbox address from the
    Gmail profile and stores the credentials in ``GmailCredential``. The first
    credential of a user becomes the default one.

    Args:
        user: Django user the credential belongs to.
        auth_code: Authorization code obtained from the authorization URL.
        client_secret_json: Client secret as a JSON-serialisable dict.

    Returns:
        GmailCredential: The created or updated credential row.

    Raises:
        HttpError: When a Gmail API request fails.
        Exception: Any other failure; logged and re-raised unchanged.
    """
    # Resolved outside ``try`` so ``finally`` can always reference it.
    temp_client_secret_path = os.path.join(
        GmailService.TOKEN_DIR, f'client_secret_{user.id}.json'
    )
    try:
        # The directory may not exist yet when this step runs on its own.
        os.makedirs(GmailService.TOKEN_DIR, exist_ok=True)
        with open(temp_client_secret_path, 'w') as f:
            json.dump(client_secret_json, f)
        flow = InstalledAppFlow.from_client_secrets_file(
            temp_client_secret_path,
            scopes=GmailService.SCOPES,
            redirect_uri='urn:ietf:wg:oauth:2.0:oob',
        )
        # Exchange the authorization code for access and refresh tokens.
        flow.fetch_token(code=auth_code)
        credentials = flow.credentials
        # Ask Gmail which mailbox these credentials belong to.
        service = build('gmail', 'v1', credentials=credentials)
        profile = service.users().getProfile(userId='me').execute()
        # Named ``email_address`` so the imported ``email`` module is not shadowed.
        email_address = profile['emailAddress']
        credential, _ = GmailCredential.objects.get_or_create(
            user=user,
            email=email_address,
            defaults={'is_default': not GmailCredential.objects.filter(user=user).exists()},
        )
        credential.set_credentials(credentials)
        credential.save()
        # Keep at most one default credential per user.
        if credential.is_default:
            GmailCredential.objects.filter(user=user).exclude(id=credential.id).update(is_default=False)
        logger.info(f"Gmail credential saved for user {user.id}, email: {email_address}")
        return credential
    except HttpError as e:
        logger.error(f"Gmail API error for user {user.id}: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Error completing Gmail authentication for user {user.id}: {str(e)}")
        raise
    finally:
        # Never leave the client secret on disk.
        if os.path.exists(temp_client_secret_path):
            os.remove(temp_client_secret_path)
@staticmethod
def get_service(credential):
    """
    Build a Gmail API client from a stored credential.

    Args:
        credential: ``GmailCredential`` row holding the user's OAuth data.

    Returns:
        The Gmail API resource returned by ``build('gmail', 'v1', ...)``.

    Raises:
        Exception: Whatever was raised while loading the credentials or
            building the client. When the error text looks like a revoked or
            expired grant, the credential row is flagged invalid first.
    """
    try:
        logger.info(f"获取Gmail服务实例用户: {credential.user.username}, 邮箱: {credential.email}")
        google_credentials = credential.get_credentials()
        # NOTE(review): an expired credential is only logged here; this method
        # makes no explicit refresh call.
        if google_credentials.expired:
            logger.info(f"OAuth凭证已过期尝试刷新...")
        gmail_client = build('gmail', 'v1', credentials=google_credentials)
        logger.info(f"成功创建Gmail服务实例")
        return gmail_client
    except Exception as e:
        import traceback

        logger.error(f"创建Gmail服务失败: {str(e)}")
        error_text = str(e)
        lowered = error_text.lower()
        grant_rejected = (
            'invalid_grant' in lowered
            or '401' in error_text
            or 'token has been expired or revoked' in lowered
        )
        if grant_rejected:
            # Flag the row so callers can see the credential is unusable.
            logger.error(f"OAuth凭证刷新失败标记凭证为无效")
            credential.is_valid = False
            credential.save()
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise
@staticmethod
def get_conversations(user, user_email, influencer_email):
    """
    Fetch every Gmail message exchanged between the user and an influencer.

    Args:
        user: Current user object.
        user_email: The user's authorised Gmail address.
        influencer_email: The influencer's email address.

    Returns:
        tuple: ``(conversations, error)``. ``conversations`` is a list of
        parsed email dicts in chronological order, or ``None`` on failure;
        ``error`` is ``None`` on success, otherwise a message string.
    """
    try:
        os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
        credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not credential:
            return None, f"未找到{user_email}的授权信息"
        service = GmailService.get_service(credential)
        # Match mail in either direction between the two addresses.
        query = f"from:({user_email} OR {influencer_email}) to:({user_email} OR {influencer_email})"
        logger.info(f"Gmail搜索查询: {query}")

        # Walk all result pages. A page may legitimately carry no 'messages'
        # key, so never index it directly.
        messages = []
        page_token = None
        while True:
            list_kwargs = {'userId': 'me', 'q': query}
            if page_token:
                list_kwargs['pageToken'] = page_token
            response = service.users().messages().list(**list_kwargs).execute()
            messages.extend(response.get('messages') or [])
            page_token = response.get('nextPageToken')
            if not page_token:
                break
        logger.info(f"找到 {len(messages)} 封邮件")

        # Pair each parsed mail with Gmail's internalDate (epoch milliseconds).
        # The formatted 'date' string is in the sender's local time, so it does
        # not sort chronologically across time zones.
        dated = []
        for msg in messages:
            try:
                message = service.users().messages().get(userId='me', id=msg['id']).execute()
                email_data = GmailService._parse_email_content(message)
                if not email_data:
                    continue
                try:
                    received_ms = int(message.get('internalDate') or 0)
                except (TypeError, ValueError):
                    received_ms = 0
                dated.append((received_ms, email_data))
            except Exception as e:
                logger.error(f"处理邮件 {msg['id']} 时出错: {str(e)}")

        # Fall back to the date string when internalDate is missing or equal.
        dated.sort(key=lambda pair: (pair[0], pair[1]['date']))
        conversations = [email_data for _, email_data in dated]
        return conversations, None
    except Exception as e:
        logger.error(f"获取Gmail对话失败: {str(e)}")
        return None, f"获取Gmail对话失败: {str(e)}"
@staticmethod
def _parse_email_content(message):
    """
    Convert a Gmail API message resource into a flat dict.

    Args:
        message: Message object as returned by ``users().messages().get``.

    Returns:
        dict | None: Dict with ``id``, ``subject``, ``from``, ``from_email``,
        ``to``, ``to_email``, ``date``, ``body`` and ``attachments``; ``None``
        when parsing fails (the error is logged).
    """
    try:
        message_id = message['id']
        payload = message['payload']
        headers = payload['headers']
        parsed = dict(
            [
                ('id', message_id),
                ('subject', ''),
                ('from', ''),
                ('from_email', ''),
                ('to', ''),
                ('to_email', ''),
                ('date', ''),
                ('body', ''),
                ('attachments', []),
            ]
        )
        for header in headers:
            field = header['name'].lower()
            if field == 'date':
                try:
                    raw_date = header['value']
                    # Normalise the RFC 2822 date to 'YYYY-MM-DD HH:MM:SS'.
                    parsed_date = email.utils.parsedate_to_datetime(raw_date)
                    parsed['date'] = parsed_date.strftime('%Y-%m-%d %H:%M:%S')
                except Exception as e:
                    # Keep the raw header when it cannot be parsed.
                    logger.error(f"解析日期失败: {str(e)}")
                    parsed['date'] = header['value']
            elif field in ('from', 'to'):
                parsed[field] = header['value']
                # parseaddr returns (display name, address); keep the address.
                parsed[field + '_email'] = parseaddr(header['value'])[1]
            elif field == 'subject':
                parsed['subject'] = header['value']
        # Fill in body text and attachment descriptors from the MIME tree.
        GmailService._process_email_parts(payload, parsed)
        return parsed
    except Exception as e:
        logger.error(f"解析邮件内容失败: {str(e)}")
        return None
@staticmethod
def _process_email_parts(part, email_data, is_root=True):
    """
    Walk a message part tree, collecting the plain-text body and attachments.

    Results are written into ``email_data`` in place: ``email_data['body']``
    receives decoded ``text/plain`` content and ``email_data['attachments']``
    receives one descriptor per non-root part that has a filename.

    Args:
        part: A message part dict (the payload or one of its sub-parts).
        email_data: Dict being filled; must contain ``body`` and ``attachments``.
        is_root: True for the top-level payload, False for nested parts.
    """
    for sub_part in part.get('parts') or []:
        GmailService._process_email_parts(sub_part, email_data, False)

    body = part.get('body') or {}
    filename = part.get('filename')

    # The filename lives on the part itself. The previous test also required a
    # 'filename' key inside the body object, so no attachment was ever recorded.
    if not is_root and filename:
        attachment = {
            'filename': filename,
            'mimeType': part.get('mimeType', ''),
            'size': body.get('size', 0),
        }
        if 'attachmentId' in body:
            attachment['attachmentId'] = body['attachmentId']
        email_data['attachments'].append(attachment)
        # A named part is an attachment; do not let it overwrite the body text.
        return

    if part.get('mimeType', '') == 'text/plain':
        data = body.get('data', '')
        if data:
            try:
                # Restore any stripped base64 padding before decoding.
                padded = data + '=' * (-len(data) % 4)
                email_data['body'] = base64.urlsafe_b64decode(padded).decode('utf-8')
            except Exception as e:
                logger.error(f"解码邮件正文失败: {str(e)}")
@staticmethod
def download_attachment(user, gmail_credential, message_id, attachment_id, filename):
    """
    Download one Gmail attachment and store it under ``ATTACHMENT_DIR``.

    Args:
        user: Current user; ``user.id`` prefixes the stored file name.
        gmail_credential: Credential used to build the Gmail client.
        message_id: ID of the message that owns the attachment.
        attachment_id: Gmail attachment ID.
        filename: Original file name; sanitised before use.

    Returns:
        str | None: Path of the saved file, or ``None`` when anything fails
        (the error is logged, not raised).
    """
    try:
        service = GmailService.get_service(gmail_credential)
        attachment = service.users().messages().attachments().get(
            userId='me',
            messageId=message_id,
            id=attachment_id,
        ).execute()
        data = attachment['data']
        # Restore any stripped base64 padding before decoding.
        file_data = base64.urlsafe_b64decode(data + '=' * (-len(data) % 4))

        safe_filename = GmailService._safe_filename(filename)
        timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        # The timestamp has one-second resolution, so add a random token;
        # otherwise two same-named attachments saved within the same second
        # overwrite each other.
        unique_filename = f"{user.id}_{timestamp}_{uuid.uuid4().hex[:8]}_{safe_filename}"

        # This method is also reached from paths that never created the
        # directory, so make sure it exists.
        os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
        filepath = os.path.join(GmailService.ATTACHMENT_DIR, unique_filename)
        with open(filepath, 'wb') as f:
            f.write(file_data)
        logger.info(f"附件已保存: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"下载附件失败: {str(e)}")
        return None
@staticmethod
def _safe_filename(filename):
    """
    Return a file name that is safe to use as a single path component.

    Path separators and characters that are invalid on common file systems
    are replaced with ``_``, control characters (including NUL) are removed,
    and the result is capped at 100 characters while keeping the extension.

    Args:
        filename: Original file name; may be empty or ``None``.

    Returns:
        str: Sanitised, non-empty file name of at most 100 characters.
    """
    max_length = 100
    # One translate pass: unsafe characters -> '_', control characters -> dropped.
    table = {ord(char): '_' for char in '/\\:*?"<>|'}
    table.update({code: None for code in range(32)})
    cleaned = (filename or '').translate(table).strip()
    if not cleaned:
        # Never hand back an empty name.
        cleaned = 'attachment'
    if len(cleaned) > max_length:
        base, ext = os.path.splitext(cleaned)
        # Keep the extension, but make the total length honour the limit.
        ext = ext[:max_length - 1]
        cleaned = base[:max_length - len(ext)] + ext
    return cleaned
@staticmethod
def save_conversations_to_chat(user, user_email, influencer_email, kb_id=None):
    """
    Import the Gmail thread with an influencer into the chat history.

    Database writes run inside one transaction that is rolled back when any
    of them fails. Messages whose Gmail ID is already stored for the
    conversation are skipped, so repeated syncs do not duplicate rows.

    Args:
        user: Current user.
        user_email: The user's authorised Gmail address.
        influencer_email: The influencer's email address.
        kb_id: Knowledge base ID (optional). When omitted, the user's first
            knowledge base of type ``'private'`` is used.

    Returns:
        tuple: ``(conversation_id, error)``; exactly one of them is ``None``.
    """
    try:
        gmail_credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not gmail_credential:
            return None, f"未找到{user_email}的授权信息"

        # Fetch from Gmail before opening the transaction.
        conversations, error = GmailService.get_conversations(user, user_email, influencer_email)
        if error:
            return None, error
        if not conversations:
            return None, "未找到与该达人的对话记录"

        if not kb_id:
            knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
            if not knowledge_base:
                return None, "未找到默认知识库,请先创建一个知识库"
        else:
            # NOTE(review): the knowledge base is looked up by ID only; confirm
            # that callers have already checked ownership.
            knowledge_base = KnowledgeBase.objects.filter(id=kb_id).first()
            if not knowledge_base:
                return None, f"知识库ID {kb_id} 不存在"

        # The context manager (not a decorator around the try/except) lets an
        # exception leave the atomic block, so partial writes are rolled back
        # before the error is turned into a return value.
        with transaction.atomic():
            conversation_id = f"gmail_{user.id}_{str(uuid.uuid4())[:8]}"
            gmail_conversation, created = GmailConversation.objects.get_or_create(
                user=user,
                user_email=user_email,
                influencer_email=influencer_email,
                defaults={
                    'conversation_id': conversation_id,
                    'title': f"{influencer_email} 的Gmail对话",
                    'last_sync_time': timezone.now(),
                },
            )
            if not created:
                # Reuse the ID of the existing conversation.
                conversation_id = gmail_conversation.conversation_id
                gmail_conversation.last_sync_time = timezone.now()
                gmail_conversation.save()

            # Gmail message IDs already stored for this conversation.
            already_saved = {
                meta.get('gmail_message_id')
                for meta in ChatHistory.objects.filter(
                    conversation_id=conversation_id
                ).values_list('metadata', flat=True)
                if isinstance(meta, dict)
            }

            for email_data in conversations:
                if email_data['id'] in already_saved:
                    continue
                # Mail sent by the user is 'user'; anything else is 'assistant'.
                is_from_user = email_data['from_email'].lower() == user_email.lower()
                role = 'user' if is_from_user else 'assistant'
                content = f"主题: {email_data['subject']}\n\n{email_data['body']}"
                chat_message = ChatHistory.objects.create(
                    user=user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation_id,
                    title=gmail_conversation.title,
                    role=role,
                    content=content,
                    metadata={
                        'gmail_message_id': email_data['id'],
                        'from': email_data['from'],
                        'to': email_data['to'],
                        'date': email_data['date'],
                        'source': 'gmail',
                    },
                )
                already_saved.add(email_data['id'])

                attachment_entries = []
                for attachment in email_data['attachments'] or []:
                    if 'attachmentId' not in attachment:
                        continue
                    file_path = GmailService.download_attachment(
                        user,
                        gmail_credential,
                        email_data['id'],
                        attachment['attachmentId'],
                        attachment['filename'],
                    )
                    if not file_path:
                        # Download failures are logged by download_attachment.
                        continue
                    gmail_attachment = GmailAttachment.objects.create(
                        conversation=gmail_conversation,
                        email_message_id=email_data['id'],
                        attachment_id=attachment['attachmentId'],
                        filename=attachment['filename'],
                        file_path=file_path,
                        content_type=attachment['mimeType'],
                        size=attachment['size'],
                        sender_email=email_data['from_email'],
                        chat_message_id=str(chat_message.id),
                    )
                    attachment_entries.append({
                        'id': str(gmail_attachment.id),
                        'filename': attachment['filename'],
                        'size': attachment['size'],
                        'mime_type': attachment['mimeType'],
                        'url': gmail_attachment.get_absolute_url(),
                    })

                if attachment_entries:
                    # Record the stored attachments on the chat message.
                    metadata = chat_message.metadata or {}
                    metadata.setdefault('attachments', []).extend(attachment_entries)
                    chat_message.metadata = metadata
                    chat_message.save()

        return conversation_id, None
    except Exception as e:
        logger.error(f"保存Gmail对话到聊天记录失败: {str(e)}")
        return None, f"保存Gmail对话到聊天记录失败: {str(e)}"
@staticmethod
def setup_gmail_push_notification(user, user_email, topic_name=None, subscription_name=None):
    """
    Enable Gmail push notifications (Pub/Sub watch) for one mailbox.

    Args:
        user: Current user.
        user_email: The user's authorised Gmail address.
        topic_name: Custom topic name (optional).
        subscription_name: Custom subscription name (optional).

    Returns:
        tuple: ``(success, error)``; ``error`` is ``None`` on success.

    NOTE(review): ``topic_name`` and ``subscription_name`` are not read by
    this method; the topic suffix is fixed to ``gmail-watch-topic``.
    """
    try:
        stored_credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not stored_credential:
            return False, f"未找到{user_email}的授权信息"
        gmail = GmailService.get_service(stored_credential)

        # Best effort: remember the mailbox's current history ID.
        try:
            mailbox_profile = gmail.users().getProfile(userId='me').execute()
            current_history_id = mailbox_profile.get('historyId')
            if current_history_id:
                stored_credential.last_history_id = current_history_id
                stored_credential.save()
                logger.info(f"已更新{user_email}的历史ID: {current_history_id}")
        except Exception as history_error:
            logger.error(f"获取历史ID失败: {str(history_error)}")

        project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT_ID', '')
        if not project_id:
            return False, "未配置Google Cloud项目ID"

        # Fully qualified topic: projects/{project_id}/topics/{topic}.
        full_topic_name = f'projects/{project_id}/topics/gmail-watch-topic'
        logger.info(f"使用主题名称: {full_topic_name}")
        watch_body = {'labelIds': ['INBOX'], 'topicName': full_topic_name}

        # Stop any existing watch first; a failure here is only a warning.
        try:
            gmail.users().stop(userId='me').execute()
            logger.info(f"已停止现有的监听: {user_email}")
        except Exception as stop_error:
            logger.warning(f"停止现有监听失败(可能无监听): {str(stop_error)}")

        watch_response = gmail.users().watch(userId='me', body=watch_body).execute()
        logger.info(f"已为 {user_email} 设置Gmail推送通知主题: {full_topic_name}, 响应: {watch_response}")

        # The watch response's history ID, when present, replaces the stored one.
        if 'historyId' in watch_response:
            watch_history_id = watch_response['historyId']
            stored_credential.last_history_id = watch_history_id
            stored_credential.save()
            logger.info(f"已从watch响应更新{user_email}的历史ID: {watch_history_id}")
        return True, None
    except Exception as e:
        logger.error(f"设置Gmail推送通知失败: {str(e)}")
        return False, f"设置Gmail推送通知失败: {str(e)}"
@staticmethod
def process_new_emails(user, credential, history_id=None):
    """
    Process newly arrived Gmail messages for one credential.

    Mailbox history is read starting at ``history_id`` (all result pages) and
    every referenced message is handed to ``_process_single_message``. When
    the history yields no message IDs, or the history ID is rejected with
    HTTP 404, the most recent inbox messages are scanned instead.

    Args:
        user: User object.
        credential: Gmail credential object.
        history_id: Gmail history ID (optional). Falls back to
            ``credential.last_history_id`` when omitted.

    Returns:
        None. Errors are logged, not raised.
    """
    try:
        if not history_id and credential.last_history_id:
            history_id = credential.last_history_id
            logger.info(f"使用凭证中保存的历史ID: {history_id}")
        if not history_id:
            logger.error(f"缺少历史ID无法处理新邮件")
            return
        logger.info(f"开始处理Gmail新邮件用户: {user.username}, 邮箱: {credential.email}, 历史ID: {history_id}")

        service = GmailService.get_service(credential)
        active_conversations = GmailConversation.objects.filter(
            user=user,
            user_email=credential.email,
            is_active=True,
        )
        influencer_emails = [conv.influencer_email for conv in active_conversations]
        if not influencer_emails:
            logger.info(f"用户 {user.username} 没有活跃的Gmail对话")
            return
        logger.info(f"找到 {len(influencer_emails)} 个活跃的Gmail对话")

        def scan_recent_inbox():
            """Fallback: process the 10 most recent inbox messages not seen before."""
            results = service.users().messages().list(
                userId='me',
                maxResults=10,
                labelIds=['INBOX'],
            ).execute()
            messages = results.get('messages', [])
            if not messages:
                logger.info(f"未找到任何收件箱邮件")
                return
            logger.info(f"找到 {len(messages)} 封收件箱邮件,检查最近的邮件")

            # Message IDs remembered on the conversations from earlier scans.
            already_processed_ids = set()
            for conversation in active_conversations:
                if conversation.metadata and 'last_processed_messages' in conversation.metadata:
                    already_processed_ids.update(conversation.metadata.get('last_processed_messages', []))

            processed_message_ids = []
            saved_count = 0
            for msg in messages:
                message_id = msg['id']
                if message_id in already_processed_ids:
                    logger.info(f"邮件ID: {message_id} 已处理过,跳过")
                    continue
                processed_message_ids.append(message_id)
                logger.info(f"处理新发现的邮件ID: {message_id}")
                if GmailService._process_single_message(
                    service, user, credential, message_id, active_conversations, influencer_emails
                ):
                    saved_count += 1

            if processed_message_ids:
                for conversation in active_conversations:
                    metadata = conversation.metadata or {}
                    old_ids = metadata.get('last_processed_messages', [])
                    # Keep only the 20 most recent IDs so the list stays short.
                    metadata['last_processed_messages'] = (processed_message_ids + old_ids)[:20]
                    conversation.metadata = metadata
                    conversation.save()
                logger.info(f"更新了 {len(processed_message_ids)} 个新处理的邮件ID保存了 {saved_count} 封邮件")

        recovered_history_id = None
        try:
            logger.info(f"通过历史记录获取变更...")
            # Follow nextPageToken so changes beyond the first page are not lost.
            history_records = []
            latest_history_id = None
            page_token = None
            while True:
                list_kwargs = {
                    'userId': 'me',
                    'startHistoryId': history_id,
                    'historyTypes': ['messageAdded', 'messageDeleted', 'labelAdded', 'labelRemoved'],
                }
                if page_token:
                    list_kwargs['pageToken'] = page_token
                page = service.users().history().list(**list_kwargs).execute()
                history_records.extend(page.get('history') or [])
                if 'historyId' in page:
                    latest_history_id = page['historyId']
                page_token = page.get('nextPageToken')
                if not page_token:
                    break

            if latest_history_id is not None:
                credential.last_history_id = latest_history_id
                credential.save()
                logger.info(f"已更新最新历史ID: {latest_history_id}")

            # An insertion-ordered dict keeps first-seen order while
            # de-duplicating, so messages are processed in history order.
            message_ids = {}
            if history_records:
                logger.info(f"找到 {len(history_records)} 条历史变更记录")
                for record in history_records:
                    # NOTE(review): the Gmail history resource names its label
                    # fields 'labelsAdded'/'labelsRemoved'; the singular keys
                    # are kept as they were — confirm whether label changes
                    # are meant to be processed.
                    for messages_key in ('messagesAdded', 'labelAdded', 'labelRemoved'):
                        for message_item in record.get(messages_key, []):
                            if 'message' in message_item and 'id' in message_item['message']:
                                message_ids.setdefault(message_item['message']['id'], None)
                if message_ids:
                    logger.info(f"从历史记录中找到 {len(message_ids)} 个消息ID")
                    for message_id in message_ids:
                        GmailService._process_single_message(
                            service, user, credential, message_id, active_conversations, influencer_emails
                        )
            else:
                logger.info(f"未找到历史变更记录")

            if not message_ids:
                logger.info(f"未通过历史记录找到变更,尝试直接查询最近邮件...")
                scan_recent_inbox()
        except HttpError as e:
            if e.resp.status == 404:
                # The history ID was rejected; replace it with the current one.
                logger.warning(f"历史ID {history_id} 无效尝试获取当前ID")
                try:
                    profile = service.users().getProfile(userId='me').execute()
                    recovered_history_id = profile.get('historyId')
                    if recovered_history_id:
                        credential.last_history_id = recovered_history_id
                        credential.save()
                        logger.info(f"已更新为新的历史ID: {recovered_history_id}")
                except Exception as profile_error:
                    logger.error(f"获取新历史ID失败: {str(profile_error)}")
            else:
                logger.error(f"获取历史变更失败: {str(e)}")

        if recovered_history_id:
            # Scan the inbox directly instead of calling this method again,
            # which could recurse without bound if the 404 kept recurring.
            logger.info(f"尝试直接获取最近邮件...")
            scan_recent_inbox()
    except Exception as e:
        logger.error(f"处理Gmail新消息失败: {str(e)}")
        # Log the stack trace to make diagnosis easier.
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
# blame: 2025-05-13 11:58:17 +08:00
@staticmethod
def _process_single_message(service, user, credential, message_id, active_conversations, influencer_emails):
    """
    Fetch one Gmail message and store it if it belongs to a tracked influencer.

    A message is relevant when its sender (inbound) or its first recipient
    (outbound) matches one of ``influencer_emails``. Addresses are compared
    case-insensitively. A notification is sent only for inbound mail saved
    into an existing conversation.

    Args:
        service: Gmail API client.
        user: User object.
        credential: Gmail credential object.
        message_id: Gmail message ID to process.
        active_conversations: QuerySet of the user's active ``GmailConversation`` rows.
        influencer_emails: Iterable of tracked influencer addresses.

    Returns:
        bool: True when the message was saved to the chat history.
    """
    try:
        message = service.users().messages().get(userId='me', id=message_id).execute()
        email_data = GmailService._parse_email_content(message)
        if not email_data:
            logger.warning(f"无法解析邮件内容: {message_id}")
            return False
        logger.info(f"邮件信息: 发件人={email_data['from_email']}, 收件人={email_data['to_email']}, 主题={email_data['subject']}")

        # Case-insensitive matching: header addresses often differ in case
        # from the stored influencer address.
        tracked = {address.lower() for address in influencer_emails if address}
        from_email = email_data['from_email']
        to_email = email_data['to_email']
        if from_email and from_email.lower() in tracked:
            # Scenario 1: mail received from an influencer.
            counterpart, inbound = from_email, True
            logger.info(f"找到来自达人 {from_email} 的邮件")
        elif to_email and to_email.lower() in tracked:
            # Scenario 2: mail sent to an influencer.
            counterpart, inbound = to_email, False
            logger.info(f"这是发送给达人 {to_email} 的邮件")
        else:
            # Scenario 3: unrelated mail is not stored.
            logger.info(f"邮件 {email_data['from_email']}{email_data['to_email']} 与跟踪的达人对话无关,不保存")
            return False

        conversation = active_conversations.filter(influencer_email__iexact=counterpart).first()
        if conversation:
            logger.info(f"找到匹配的对话记录: ID={conversation.id}, 会话ID={conversation.conversation_id}")
            if not GmailService._save_email_to_chat(user, credential, conversation, email_data):
                logger.error(f"保存邮件到聊天历史失败")
                return False
            logger.info(f"成功保存邮件到聊天历史")
            if inbound:
                # A notification failure must not undo the save.
                try:
                    NotificationService().send_notification(
                        user=user,
                        title="收到新邮件",
                        content=f"您收到来自 {email_data['from_email']} 的新邮件: {email_data['subject']}",
                        notification_type="gmail",
                        related_object_id=conversation.conversation_id,
                    )
                except Exception as notif_error:
                    logger.error(f"发送通知失败: {str(notif_error)}")
            return True

        # No active conversation row matched: create one, then save into it.
        logger.info(f"未找到与 {counterpart} 的对话记录,创建新对话")
        try:
            conversation_id = f"gmail_{user.id}_{str(uuid.uuid4())[:8]}"
            conversation = GmailConversation.objects.create(
                user=user,
                user_email=credential.email,
                influencer_email=counterpart,
                conversation_id=conversation_id,
                title=f"{counterpart} 的Gmail对话",
                is_active=True,
            )
            if GmailService._save_email_to_chat(user, credential, conversation, email_data):
                logger.info(f"成功保存邮件到新创建的对话")
                return True
        except Exception as create_error:
            logger.error(f"创建新对话失败: {str(create_error)}")
        return False
    except Exception as e:
        logger.error(f"处理邮件 {message_id} 时出错: {str(e)}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        return False
@staticmethod
def send_email(user, user_email, to_email, subject, body, attachments=None):
    """
    Send an email through Gmail, optionally with attachments, and record it.

    After a successful send the message is stored in the chat history of the
    matching conversation (created when missing) and each attachment is
    copied into ``ATTACHMENT_DIR``. Recording is best effort: a failure there
    is logged and does not change the returned result.

    Args:
        user: User object.
        user_email: Authorised sender Gmail address.
        to_email: Recipient address.
        subject: Mail subject.
        body: Plain-text mail body.
        attachments: Optional list of dicts
            ``{'path': local file path, 'filename': display name (optional)}``.

    Returns:
        tuple: ``(True, message_id)`` on success, ``(False, error message)``
        on failure.
    """
    try:
        credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not credential:
            return False, f"未找到{user_email}的授权信息"
        service = GmailService.get_service(credential)

        message = MIMEMultipart()
        message['to'] = to_email
        message['from'] = user_email
        message['subject'] = Header(subject, 'utf-8').encode()
        message.attach(MIMEText(body, 'plain', 'utf-8'))

        # Resolve the usable attachments once; the same list drives both the
        # MIME parts and the stored attachment records.
        usable_attachments = []
        if attachments and isinstance(attachments, list):
            for attachment in attachments:
                if 'path' not in attachment:
                    continue
                filepath = attachment['path']
                filename = attachment.get('filename', os.path.basename(filepath))
                if not os.path.exists(filepath):
                    logger.warning(f"附件文件不存在: {filepath}")
                    continue
                usable_attachments.append((filepath, filename))

        for filepath, filename in usable_attachments:
            try:
                with open(filepath, 'rb') as f:
                    file_data = f.read()
                file_part = MIMEApplication(file_data, Name=filename)
                # add_header quotes the parameter and RFC 2231-encodes
                # non-ASCII names; a hand-built header breaks on both.
                file_part.add_header('Content-Disposition', 'attachment', filename=filename)
                message.attach(file_part)
                logger.info(f"已添加附件: {filename}")
            except Exception as e:
                logger.error(f"处理附件时出错: {str(e)}")

        # The Gmail API expects the raw message base64url-encoded.
        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
        result = service.users().messages().send(
            userId='me',
            body={'raw': raw_message},
        ).execute()
        message_id = result.get('id')

        # Best effort: record the sent mail in the chat history.
        try:
            conversation = GmailConversation.objects.filter(
                user=user,
                user_email=user_email,
                influencer_email=to_email,
            ).first()
            if not conversation:
                conversation_id = f"gmail_{user.id}_{str(uuid.uuid4())[:8]}"
                conversation = GmailConversation.objects.create(
                    user=user,
                    user_email=user_email,
                    influencer_email=to_email,
                    conversation_id=conversation_id,
                    title=f"{to_email} 的Gmail对话",
                    last_sync_time=timezone.now(),
                )
            else:
                conversation.last_sync_time = timezone.now()
                conversation.save()

            knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
            if not knowledge_base:
                logger.warning(f"未找到默认知识库,邮件发送成功但未保存到聊天记录")
            else:
                chat_message = ChatHistory.objects.create(
                    user=user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation.conversation_id,
                    title=conversation.title,
                    role='user',
                    content=f"主题: {subject}\n\n{body}",
                    metadata={
                        'gmail_message_id': message_id,
                        'from': user_email,
                        'to': to_email,
                        'date': timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
                        'source': 'gmail',
                    },
                )

                attachment_entries = []
                for filepath, filename in usable_attachments:
                    try:
                        os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
                        # Sanitise the caller-supplied name before using it
                        # as a path component.
                        unique_filename = f"{uuid.uuid4()}_{GmailService._safe_filename(filename)}"
                        target_path = os.path.join(GmailService.ATTACHMENT_DIR, unique_filename)
                        shutil.copy2(filepath, target_path)

                        filesize = os.path.getsize(filepath)
                        content_type, _ = mimetypes.guess_type(filepath)
                        if content_type is None:
                            content_type = 'application/octet-stream'

                        gmail_attachment = GmailAttachment.objects.create(
                            conversation=conversation,
                            email_message_id=message_id,
                            attachment_id=f"outgoing_{uuid.uuid4()}",
                            filename=filename,
                            file_path=target_path,
                            content_type=content_type,
                            size=filesize,
                            sender_email=user_email,
                            chat_message_id=str(chat_message.id),
                        )
                        attachment_entries.append({
                            'id': str(gmail_attachment.id),
                            'filename': filename,
                            'size': filesize,
                            'mime_type': content_type,
                            'url': gmail_attachment.get_absolute_url(),
                        })
                    except Exception as e:
                        logger.error(f"处理发送邮件附件时出错: {str(e)}")

                if attachment_entries:
                    metadata = chat_message.metadata or {}
                    metadata.setdefault('attachments', []).extend(attachment_entries)
                    chat_message.metadata = metadata
                    chat_message.save()
        except Exception as e:
            logger.error(f"保存发送的邮件到聊天记录失败: {str(e)}")

        logger.info(f"成功发送邮件到 {to_email}")
        return True, message_id
    except Exception as e:
        logger.error(f"发送Gmail邮件失败: {str(e)}")
        return False, f"发送Gmail邮件失败: {str(e)}"
# blame: 2025-05-13 18:36:06 +08:00
@staticmethod
def _save_email_to_chat(user, credential, conversation, email_data):
    """
    Store one parsed email as a chat-history message of a conversation.

    A message whose Gmail ID is already stored for the conversation is
    skipped, so processing the same mail twice does not create duplicates.

    Args:
        user: User object.
        credential: Gmail credential object (used for role detection and
            attachment downloads).
        conversation: ``GmailConversation`` the mail belongs to.
        email_data: Parsed email dict from ``_parse_email_content``.

    Returns:
        bool: True only when a new chat message was created. False when the
        mail was already stored or when saving failed (failures are logged).
    """
    try:
        # Skip mail that is already in this conversation.
        stored_metadata = ChatHistory.objects.filter(
            conversation_id=conversation.conversation_id
        ).values_list('metadata', flat=True)
        if any(
            isinstance(meta, dict) and meta.get('gmail_message_id') == email_data['id']
            for meta in stored_metadata
        ):
            logger.info(f"Gmail message {email_data['id']} is already stored; skipping")
            return False

        # Reuse the knowledge base of the conversation's existing messages,
        # otherwise fall back to the user's private knowledge base.
        first_message = ChatHistory.objects.filter(
            conversation_id=conversation.conversation_id
        ).first()
        if not first_message:
            knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
            if not knowledge_base:
                logger.error("未找到默认知识库")
                return False
        else:
            knowledge_base = first_message.knowledge_base

        # Mail sent from the authorised mailbox is 'user'; anything else 'assistant'.
        is_from_user = email_data['from_email'].lower() == credential.email.lower()
        role = 'user' if is_from_user else 'assistant'
        content = f"主题: {email_data['subject']}\n\n{email_data['body']}"
        chat_message = ChatHistory.objects.create(
            user=user,
            knowledge_base=knowledge_base,
            conversation_id=conversation.conversation_id,
            title=conversation.title,
            role=role,
            content=content,
            metadata={
                'gmail_message_id': email_data['id'],
                'from': email_data['from'],
                'to': email_data['to'],
                'date': email_data['date'],
                'source': 'gmail',
            },
        )
        conversation.last_sync_time = timezone.now()
        conversation.save()

        attachment_entries = []
        for attachment in email_data['attachments'] or []:
            if 'attachmentId' not in attachment:
                continue
            file_path = GmailService.download_attachment(
                user,
                credential,
                email_data['id'],
                attachment['attachmentId'],
                attachment['filename'],
            )
            if not file_path:
                # Download failures are logged by download_attachment.
                continue
            gmail_attachment = GmailAttachment.objects.create(
                conversation=conversation,
                email_message_id=email_data['id'],
                attachment_id=attachment['attachmentId'],
                filename=attachment['filename'],
                file_path=file_path,
                content_type=attachment['mimeType'],
                size=attachment['size'],
                sender_email=email_data['from_email'],
                chat_message_id=str(chat_message.id),
            )
            attachment_entries.append({
                'id': str(gmail_attachment.id),
                'filename': attachment['filename'],
                'size': attachment['size'],
                'mime_type': attachment['mimeType'],
                'url': gmail_attachment.get_absolute_url(),
            })

        if attachment_entries:
            # Record the stored attachments on the chat message.
            metadata = chat_message.metadata or {}
            metadata.setdefault('attachments', []).extend(attachment_entries)
            chat_message.metadata = metadata
            chat_message.save()
        return True
    except Exception as e:
        logger.error(f"保存Gmail新邮件到聊天记录失败: {str(e)}")
        return False
@staticmethod
def get_conversation_summary(user, conversation_id):
    """
    Return the AI summary of a Gmail conversation, regenerating it if stale.

    A stored summary is reused when its ``last_message_id`` matches the most
    recent chat message; otherwise a new summary is generated through
    ``AIService`` and persisted.

    Args:
        user (User): User requesting the summary.
        conversation_id (str): Conversation ID.

    Returns:
        tuple: ``(summary content, error message)``; one of them is ``None``.
    """
    try:
        try:
            conversation = GmailConversation.objects.get(conversation_id=conversation_id)
        except GmailConversation.DoesNotExist:
            return None, "对话不存在"

        # Only the owner of the conversation may read it.
        if str(conversation.user.id) != str(user.id):
            return None, "无权访问此对话"

        messages_qs = ChatHistory.objects.filter(conversation_id=conversation_id)
        newest_message = messages_qs.order_by('-created_at').first()
        if not newest_message:
            return None, "对话中没有消息记录"
        newest_id = str(newest_message.id)

        try:
            existing_summary = ConversationSummary.objects.get(conversation=conversation)
        except ConversationSummary.DoesNotExist:
            existing_summary = None
        else:
            # The stored summary already covers the newest message.
            if existing_summary.last_message_id == newest_id:
                return existing_summary.content, None

        ordered_messages = list(
            ChatHistory.objects.filter(conversation_id=conversation_id).order_by('created_at')
        )
        if not ordered_messages:
            return None, "对话中没有消息记录"

        conversation_history = [
            {
                'content': entry.content,
                'is_from_user': entry.role == 'user',
                'timestamp': entry.created_at.isoformat(),
            }
            for entry in ordered_messages
        ]

        summary_content, error = AIService.generate_conversation_summary(conversation_history)
        if error:
            return None, f"生成摘要失败: {error}"

        if not existing_summary:
            ConversationSummary.objects.create(
                conversation=conversation,
                content=summary_content,
                last_message_id=newest_id,
            )
        else:
            existing_summary.content = summary_content
            existing_summary.last_message_id = newest_id
            existing_summary.save()
        return summary_content, None
    except Exception as e:
        error_msg = f"获取对话摘要失败: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        return None, error_msg
# blame: 2025-05-13 11:58:17 +08:00