operations_project/apps/gmail/services/gmail_service.py

1458 lines
61 KiB
Python
Raw Normal View History

2025-05-13 11:58:17 +08:00
import os
import json
import logging
import base64
import email
from email.utils import parseaddr
import datetime
import shutil
import uuid
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from django.conf import settings
from django.utils import timezone
from django.db import transaction
2025-05-13 18:36:06 +08:00
from ..models import GmailCredential, GmailConversation, GmailAttachment, ConversationSummary
2025-05-13 11:58:17 +08:00
from apps.chat.models import ChatHistory
from apps.knowledge_base.models import KnowledgeBase
import requests
from apps.common.services.notification_service import NotificationService
import threading
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.header import Header
import mimetypes
2025-05-13 18:36:06 +08:00
from apps.accounts.models import User
from apps.common.services.ai_service import AIService
import traceback
2025-05-13 11:58:17 +08:00
# Module-level logger for this service.
logger = logging.getLogger(__name__)
# If a proxy is configured in settings, export it through the process
# environment for both HTTP and HTTPS requests.
proxy_url = getattr(settings, 'PROXY_URL', None)
if proxy_url:
    os.environ.update(HTTP_PROXY=proxy_url, HTTPS_PROXY=proxy_url)
    logger.info(f"Gmail服务已设置全局代理环境变量: {proxy_url}")
class GmailService:
    """Collection of static helpers around the Gmail API (see the methods below)."""

    # OAuth 2.0 scopes requested for the Gmail API.
    SCOPES = [
        'https://www.googleapis.com/auth/gmail.modify',
        'https://www.googleapis.com/auth/pubsub',
    ]
    # Directory that holds the temporary client-secret files.
    TOKEN_DIR = os.path.join(settings.BASE_DIR, 'gmail_tokens')
    # Directory where attachments are stored.
    ATTACHMENT_DIR = os.path.join(settings.BASE_DIR, 'media', 'gmail_attachments')
    # Pub/Sub topic and subscription used for Gmail watch (suffix only).
    PUBSUB_TOPIC = getattr(settings, 'GMAIL_PUBSUB_TOPIC', 'gmail-notifications')
    PUBSUB_SUBSCRIPTION = getattr(settings, 'GMAIL_PUBSUB_SUBSCRIPTION', 'gmail-notifications-sub')
2025-05-13 11:58:17 +08:00
@staticmethod
def initiate_authentication(user, client_secret_json):
    """
    Start the Gmail API OAuth 2.0 flow and return the authorization URL.

    The client secret is written to a per-user temporary file only while the
    flow object is being built; the file is removed before this method exits,
    whether it succeeds or fails.

    Args:
        user: Django user the authentication belongs to; ``user.id`` is used
            to name the temporary file.
        client_secret_json: Client-secret dictionary to serialise as JSON.
    Returns:
        str: Authorization URL the user must visit to obtain an auth code.
    Raises:
        Exception: Any error raised while writing the file or building the
            URL is logged and re-raised unchanged.
    """
    # Bind the path before entering ``try`` so the cleanup in ``finally``
    # never hits an unbound name (which would mask the original error).
    temp_client_secret_path = os.path.join(
        GmailService.TOKEN_DIR, f'client_secret_{user.id}.json'
    )
    try:
        # Make sure the directory for temporary files exists.
        os.makedirs(GmailService.TOKEN_DIR, exist_ok=True)
        # Write the client secret to the temporary file.
        with open(temp_client_secret_path, 'w') as f:
            json.dump(client_secret_json, f)
        # Build the OAuth flow from the temporary file with the out-of-band
        # redirect URI.
        flow = InstalledAppFlow.from_client_secrets_file(
            temp_client_secret_path,
            scopes=GmailService.SCOPES,
            redirect_uri='urn:ietf:wg:oauth:2.0:oob'
        )
        # prompt='consent' forces the consent screen to be shown.
        auth_url, _ = flow.authorization_url(prompt='consent')
        logger.info(f"Generated auth URL for user {user.id}: {auth_url}")
        return auth_url
    except Exception as e:
        logger.error(f"Error initiating Gmail authentication for user {user.id}: {str(e)}")
        raise
    finally:
        # Remove the temporary file so the secret does not stay on disk.
        if os.path.exists(temp_client_secret_path):
            os.remove(temp_client_secret_path)
@staticmethod
def complete_authentication(user, auth_code, client_secret_json):
    """
    Finish the Gmail API OAuth 2.0 flow and persist the resulting credential.

    Exchanges the authorization code for tokens, reads the mailbox address
    from the Gmail profile, and stores the credential for ``user``.

    Args:
        user: Django user the credential is attached to.
        auth_code: Authorization code obtained from the authorization URL.
        client_secret_json: Client-secret dictionary to serialise as JSON.
    Returns:
        GmailCredential: The saved credential record.
    Raises:
        HttpError: Gmail API request failed (logged and re-raised).
        Exception: Any other failure (logged and re-raised).
    """
    # Bind the path before ``try`` so ``finally`` can always reference it.
    temp_client_secret_path = os.path.join(
        GmailService.TOKEN_DIR, f'client_secret_{user.id}.json'
    )
    try:
        # The directory may not exist yet if this is the first call in the
        # process; create it like initiate_authentication does.
        os.makedirs(GmailService.TOKEN_DIR, exist_ok=True)
        with open(temp_client_secret_path, 'w') as f:
            json.dump(client_secret_json, f)
        flow = InstalledAppFlow.from_client_secrets_file(
            temp_client_secret_path,
            scopes=GmailService.SCOPES,
            redirect_uri='urn:ietf:wg:oauth:2.0:oob'
        )
        # Exchange the authorization code for tokens.
        flow.fetch_token(code=auth_code)
        credentials = flow.credentials
        # Ask Gmail which mailbox these credentials belong to.
        service = build('gmail', 'v1', credentials=credentials)
        profile = service.users().getProfile(userId='me').execute()
        # Named ``email_address`` to avoid shadowing the imported ``email`` module.
        email_address = profile['emailAddress']
        # The first credential stored for a user becomes the default one.
        credential, created = GmailCredential.objects.get_or_create(
            user=user,
            email=email_address,
            defaults={'is_default': not GmailCredential.objects.filter(user=user).exists()}
        )
        credential.set_credentials(credentials)
        credential.save()
        # Keep at most one default credential per user.
        if credential.is_default:
            GmailCredential.objects.filter(user=user).exclude(id=credential.id).update(is_default=False)
        logger.info(f"Gmail credential saved for user {user.id}, email: {email_address}")
        return credential
    except HttpError as e:
        logger.error(f"Gmail API error for user {user.id}: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Error completing Gmail authentication for user {user.id}: {str(e)}")
        raise
    finally:
        # Remove the temporary client-secret file.
        if os.path.exists(temp_client_secret_path):
            os.remove(temp_client_secret_path)
@staticmethod
def get_service(credential):
    """
    Build a Gmail API service object from a stored credential.

    Args:
        credential: GmailCredential record holding the user's OAuth 2.0 data.
    Returns:
        The object returned by ``build('gmail', 'v1', ...)``.
    Raises:
        Exception: Any failure is logged and re-raised. When the error text
            looks like a revoked or invalid token, the credential is first
            marked invalid (``is_valid = False``) and saved.
    """
    try:
        logger.info(f"获取Gmail服务实例用户: {credential.user.username}, 邮箱: {credential.email}")
        oauth_credentials = credential.get_credentials()
        # Only logged here; no explicit refresh call is made in this method.
        if oauth_credentials.expired:
            logger.info(f"OAuth凭证已过期尝试刷新...")
        gmail = build('gmail', 'v1', credentials=oauth_credentials)
        logger.info(f"成功创建Gmail服务实例")
        return gmail
    except Exception as exc:
        error_text = str(exc)
        logger.error(f"创建Gmail服务失败: {error_text}")
        lowered = error_text.lower()
        # Heuristic match on the error text for a rejected or revoked token.
        token_rejected = (
            'invalid_grant' in lowered
            or '401' in error_text
            or 'token has been expired or revoked' in lowered
        )
        if token_rejected:
            logger.error(f"OAuth凭证刷新失败标记凭证为无效")
            credential.is_valid = False
            credential.save()
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise
@staticmethod
def get_conversations(user, user_email, influencer_email):
    """
    Fetch every Gmail message exchanged between the user and an influencer.

    Args:
        user: Current user object.
        user_email: The user's authorised Gmail address.
        influencer_email: The influencer's email address.
    Returns:
        tuple: ``(conversations, error)``. On success ``conversations`` is a
        list of parsed message dicts sorted by their ``'date'`` string and
        ``error`` is None; on failure ``conversations`` is None and ``error``
        is a message string.
    """
    try:
        # Make sure the attachment directory exists.
        os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
        credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not credential:
            return None, f"未找到{user_email}的授权信息"
        service = GmailService.get_service(credential)
        # Match mail going in either direction between the two addresses.
        query = f"from:({user_email} OR {influencer_email}) to:({user_email} OR {influencer_email})"
        logger.info(f"Gmail搜索查询: {query}")
        # Walk every result page. A page may have no 'messages' key, so it is
        # read with .get() instead of indexing (which would raise KeyError
        # and abort the whole fetch).
        messages = []
        page_token = None
        while True:
            list_kwargs = {'userId': 'me', 'q': query}
            if page_token:
                list_kwargs['pageToken'] = page_token
            response = service.users().messages().list(**list_kwargs).execute()
            messages.extend(response.get('messages', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                break
        logger.info(f"找到 {len(messages)} 封邮件")
        # Fetch and parse each message; one bad message does not stop the rest.
        conversations = []
        for msg in messages:
            try:
                message = service.users().messages().get(userId='me', id=msg['id']).execute()
                email_data = GmailService._parse_email_content(message)
                if email_data:
                    conversations.append(email_data)
            except Exception as e:
                logger.error(f"处理邮件 {msg['id']} 时出错: {str(e)}")
        # Order by the formatted date string.
        conversations.sort(key=lambda x: x['date'])
        return conversations, None
    except Exception as e:
        logger.error(f"获取Gmail对话失败: {str(e)}")
        return None, f"获取Gmail对话失败: {str(e)}"
2025-05-20 15:57:10 +08:00
2025-05-13 11:58:17 +08:00
@staticmethod
def download_attachment(user, gmail_credential, message_id, attachment_id, filename):
    """
    Download one message attachment and store it in the attachment directory.

    Args:
        user: Current user; ``user.id`` prefixes the stored file name.
        gmail_credential: Gmail credential used to build the API service.
        message_id: ID of the message that owns the attachment.
        attachment_id: ID of the attachment.
        filename: Original file name (sanitised before use).
    Returns:
        str | None: Path of the saved file, or None when anything fails
        (the error is logged).
    """
    try:
        service = GmailService.get_service(gmail_credential)
        attachment = service.users().messages().attachments().get(
            userId='me',
            messageId=message_id,
            id=attachment_id
        ).execute()
        data = attachment['data']
        # Restore any stripped '=' padding so the decode accepts the payload;
        # this is a no-op when the data is already padded.
        file_data = base64.urlsafe_b64decode(data + '=' * (-len(data) % 4))
        safe_filename = GmailService._safe_filename(filename)
        timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        # The timestamp has one-second resolution, so a random token is added
        # to keep two same-named attachments saved in the same second apart.
        unique_filename = f"{user.id}_{timestamp}_{uuid.uuid4().hex[:8]}_{safe_filename}"
        # Callers are not guaranteed to have created the directory already.
        os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
        filepath = os.path.join(GmailService.ATTACHMENT_DIR, unique_filename)
        with open(filepath, 'wb') as f:
            f.write(file_data)
        logger.info(f"附件已保存: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"下载附件失败: {str(e)}")
        return None
@staticmethod
def save_conversations_to_chat(user, user_email, influencer_email, kb_id=None):
    """
    Import the Gmail conversation with an influencer into the chat history.

    Args:
        user: Current user.
        user_email: The user's authorised Gmail address.
        influencer_email: The influencer's email address.
        kb_id: Optional knowledge-base ID; when omitted the user's first
            ``type='private'`` knowledge base is used.
    Returns:
        tuple: ``(conversation_id, error)``; exactly one of them is None.
    """
    try:
        gmail_credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not gmail_credential:
            return None, f"未找到{user_email}的授权信息"
        conversations, error = GmailService.get_conversations(user, user_email, influencer_email)
        if error:
            return None, error
        if not conversations:
            return None, "未找到与该达人的对话记录"
        # Resolve the knowledge base the messages are attached to.
        if not kb_id:
            knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
            if not knowledge_base:
                return None, "未找到默认知识库,请先创建一个知识库"
        else:
            knowledge_base = KnowledgeBase.objects.filter(id=kb_id).first()
            if not knowledge_base:
                return None, f"知识库ID {kb_id} 不存在"
        # All database writes run in one atomic block INSIDE the try. An
        # exception has to leave the atomic block for Django to roll back;
        # decorating the whole method did not achieve that because the
        # method catches its own exceptions and returns normally.
        # NOTE: attachment files already written to disk are not removed
        # when the database changes are rolled back.
        with transaction.atomic():
            conversation_id = f"gmail_{user.id}_{str(uuid.uuid4())[:8]}"
            gmail_conversation, created = GmailConversation.objects.get_or_create(
                user=user,
                user_email=user_email,
                influencer_email=influencer_email,
                defaults={
                    'conversation_id': conversation_id,
                    'title': f"{influencer_email} 的Gmail对话",
                    'last_sync_time': timezone.now()
                }
            )
            if not created:
                # Reuse the ID of the existing conversation.
                conversation_id = gmail_conversation.conversation_id
                gmail_conversation.last_sync_time = timezone.now()
                gmail_conversation.save()
            for email_data in conversations:
                # Mail sent from the user's address is 'user', all else 'assistant'.
                is_from_user = email_data['from_email'].lower() == user_email.lower()
                role = 'user' if is_from_user else 'assistant'
                logger.info(f"邮件角色判断: 发件人={email_data['from_email']}, 用户邮箱={user_email}, 判定为{'用户' if is_from_user else '达人'}")
                # The body is stored as parsed, without further cleaning.
                chat_message = ChatHistory.objects.create(
                    user=user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation_id,
                    title=gmail_conversation.title,
                    role=role,
                    content=email_data['body'],
                    metadata={
                        'gmail_message_id': email_data['id'],
                        'from': email_data['from'],
                        'to': email_data['to'],
                        'date': email_data['date'],
                        'subject': email_data['subject'],
                        'source': 'gmail'
                    }
                )
                for attachment in (email_data['attachments'] or []):
                    if 'attachmentId' not in attachment:
                        continue
                    file_path = GmailService.download_attachment(
                        user,
                        gmail_credential,
                        email_data['id'],
                        attachment['attachmentId'],
                        attachment['filename']
                    )
                    if not file_path:
                        # Download failed (already logged); skip this attachment.
                        continue
                    gmail_attachment = GmailAttachment.objects.create(
                        conversation=gmail_conversation,
                        email_message_id=email_data['id'],
                        attachment_id=attachment['attachmentId'],
                        filename=attachment['filename'],
                        file_path=file_path,
                        content_type=attachment['mimeType'],
                        size=attachment['size'],
                        sender_email=email_data['from_email'],
                        chat_message_id=str(chat_message.id)
                    )
                    # Mirror the attachment in the chat message metadata.
                    metadata = chat_message.metadata or {}
                    metadata.setdefault('attachments', []).append({
                        'id': str(gmail_attachment.id),
                        'filename': attachment['filename'],
                        'size': attachment['size'],
                        'mime_type': attachment['mimeType'],
                        'url': gmail_attachment.get_absolute_url()
                    })
                    chat_message.metadata = metadata
                    chat_message.save()
        return conversation_id, None
    except Exception as e:
        logger.error(f"保存Gmail对话到聊天记录失败: {str(e)}")
        return None, f"保存Gmail对话到聊天记录失败: {str(e)}"
@staticmethod
def setup_gmail_push_notification(user, user_email, topic_name=None, subscription_name=None):
    """
    Enable Gmail push notifications (watch) for a mailbox via Pub/Sub.

    Args:
        user: Current user.
        user_email: The user's authorised Gmail address.
        topic_name: Optional topic to publish to. Either a bare topic name or
            a full ``projects/<project>/topics/<name>`` path. When omitted,
            the pre-created ``gmail-watch-topic`` topic is used.
        subscription_name: Accepted for interface compatibility; not used by
            this method.
    Returns:
        tuple: ``(success, error)`` where ``error`` is None on success.
    """
    try:
        credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not credential:
            return False, f"未找到{user_email}的授权信息"
        service = GmailService.get_service(credential)
        # Record the mailbox's current history ID; failure here is logged
        # and does not stop the watch from being set up.
        try:
            profile = service.users().getProfile(userId='me').execute()
            latest_history_id = profile.get('historyId')
            if latest_history_id:
                credential.last_history_id = latest_history_id
                credential.save()
                logger.info(f"已更新{user_email}的历史ID: {latest_history_id}")
        except Exception as history_error:
            logger.error(f"获取历史ID失败: {str(history_error)}")
        project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT_ID', '')
        if not project_id:
            return False, "未配置Google Cloud项目ID"
        # The watch request needs the full projects/{project}/topics/{topic}
        # form. Honour a caller-supplied topic; otherwise keep the topic that
        # was created in the Google Cloud console.
        if topic_name and topic_name.startswith('projects/'):
            full_topic_name = topic_name
        else:
            full_topic_name = f"projects/{project_id}/topics/{topic_name or 'gmail-watch-topic'}"
        logger.info(f"使用主题名称: {full_topic_name}")
        request = {
            'labelIds': ['INBOX'],
            'topicName': full_topic_name
        }
        try:
            # Stop any existing watch first; failure is only a warning.
            service.users().stop(userId='me').execute()
            logger.info(f"已停止现有的监听: {user_email}")
        except Exception as stop_error:
            logger.warning(f"停止现有监听失败(可能无监听): {str(stop_error)}")
        # Start the new watch.
        watch_response = service.users().watch(userId='me', body=request).execute()
        logger.info(f"已为 {user_email} 设置Gmail推送通知主题: {full_topic_name}, 响应: {watch_response}")
        # Prefer the history ID returned by the watch call when present.
        if 'historyId' in watch_response:
            credential.last_history_id = watch_response['historyId']
            credential.save()
            logger.info(f"已从watch响应更新{user_email}的历史ID: {watch_response['historyId']}")
        return True, None
    except Exception as e:
        logger.error(f"设置Gmail推送通知失败: {str(e)}")
        return False, f"设置Gmail推送通知失败: {str(e)}"
@staticmethod
def _fetch_latest_inbox_messages(service, max_results=10):
    """Return the full payloads of the most recent INBOX messages (failed fetches are logged and skipped)."""
    results = service.users().messages().list(
        userId='me', labelIds=['INBOX'], maxResults=max_results
    ).execute()
    fetched = []
    for message in results.get('messages', []):
        try:
            fetched.append(
                service.users().messages().get(userId='me', id=message['id'], format='full').execute()
            )
        except Exception as e:
            logger.error(f"获取邮件 {message['id']} 详情失败: {str(e)}")
    return fetched

@staticmethod
def process_new_emails(user, credential, history_id=None):
    """
    Fetch new mail for a credential and hand each message to the processor.

    New messages are taken from the mailbox history after ``history_id``
    (or ``credential.last_history_id``). When no history ID is available,
    the history call fails, or it yields no messages, the 10 most recent
    INBOX messages are used instead. Messages whose Gmail ID is already
    stored in chat history are skipped.

    Args:
        user: Django user object.
        credential: Gmail credential object.
        history_id: Optional history ID; only changes after it are fetched.
    Returns:
        list: IDs of the messages that were processed successfully. An empty
        list is returned when anything fails at the top level.
    """
    try:
        service = GmailService.get_service(credential)
        user_email = credential.email
        if not service:
            logger.error(f"获取Gmail服务失败: {user_email}")
            return []
        logger.info(f"开始处理新邮件: 用户 {user.username}, 邮箱 {user_email}")
        if not history_id and credential.last_history_id:
            history_id = credential.last_history_id
            logger.info(f"使用凭证中的历史ID: {history_id}")
        new_messages = []
        if history_id:
            try:
                # Collect added-message IDs from every history page, not just
                # the first one.
                message_ids = []
                page_token = None
                while True:
                    list_kwargs = {
                        'userId': 'me',
                        'startHistoryId': history_id,
                        'historyTypes': ['messageAdded'],
                    }
                    if page_token:
                        list_kwargs['pageToken'] = page_token
                    history_list = service.users().history().list(**list_kwargs).execute()
                    for history in history_list.get('history', []):
                        for message_added in history.get('messagesAdded', []):
                            msg_id = message_added.get('message', {}).get('id')
                            if msg_id:
                                message_ids.append(msg_id)
                    page_token = history_list.get('nextPageToken')
                    if not page_token:
                        break
                # Drop repeated IDs while keeping the original order.
                message_ids = list(dict.fromkeys(message_ids))
                logger.info(f"从历史变更中获取到 {len(message_ids)} 封新邮件")
                for msg_id in message_ids:
                    try:
                        message = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
                        new_messages.append(message)
                    except Exception as e:
                        logger.error(f"获取邮件 {msg_id} 详情失败: {str(e)}")
                if not new_messages:
                    # History produced nothing usable; fall back to the inbox.
                    logger.info("通过历史ID没有找到新邮件直接获取最新邮件...")
                    new_messages.extend(GmailService._fetch_latest_inbox_messages(service))
            except Exception as he:
                logger.error(f"获取历史变更失败: {str(he)}")
                logger.info("尝试直接获取最新邮件...")
                new_messages.extend(GmailService._fetch_latest_inbox_messages(service))
        else:
            logger.info("没有历史ID直接获取最新邮件...")
            new_messages.extend(GmailService._fetch_latest_inbox_messages(service))
        # Gmail message IDs that are already stored in chat history.
        existing_message_ids = set()
        for chat in ChatHistory.objects.filter(
            metadata__has_key='gmail_message_id'
        ).values_list('metadata', flat=True):
            if chat and 'gmail_message_id' in chat:
                existing_message_ids.add(chat['gmail_message_id'])
        logger.info(f"数据库中已存在 {len(existing_message_ids)} 封邮件记录")
        logger.info(f"获取到 {len(new_messages)} 封新邮件,开始处理")
        processed_ids = []
        # The fallback after a partial history fetch can put the same message
        # in the batch twice; track what this run has already handled so it
        # is not saved a second time.
        seen_in_batch = set()
        for message in new_messages:
            try:
                msg_id = message.get('id')
                if msg_id in existing_message_ids:
                    logger.info(f"邮件 {msg_id} 已存在于数据库中,跳过处理")
                    continue
                if msg_id in seen_in_batch:
                    continue
                seen_in_batch.add(msg_id)
                success = GmailService._process_single_email(user, credential, message, msg_id, user_email)
                if success:
                    processed_ids.append(msg_id)
                    logger.info(f"成功处理邮件: {msg_id}")
                else:
                    logger.warning(f"邮件处理失败或未满足处理条件: {msg_id}")
            except Exception as msg_error:
                logger.error(f"处理邮件失败: {str(msg_error)}")
                logger.error(traceback.format_exc())
        logger.info(f"成功处理 {len(processed_ids)} 封新邮件")
        return processed_ids
    except Exception as e:
        logger.error(f"处理新邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []
@staticmethod
def _process_single_email(user, credential, message, msg_id, user_email):
    """
    Store one incoming message in the matching conversation's chat history.

    The message is saved only when it is addressed to ``user_email`` and a
    GmailConversation exists for that user and sender.

    Args:
        user: User object.
        credential: Gmail credential.
        message: Gmail API message resource (``format='full'``).
        msg_id: Gmail message ID.
        user_email: The user's mailbox address.
    Returns:
        bool: True only when the message was actually saved.
    """
    try:
        headers = message.get('payload', {}).get('headers', [])
        subject = ''
        from_email = ''
        to_email = ''
        date_str = ''
        for header in headers:
            # Header names are case-insensitive, so normalise before comparing.
            name = (header.get('name') or '').lower()
            value = header.get('value', '')
            if name == 'subject':
                subject = value
            elif name == 'from':
                from_email = GmailService.extract_email(value)
            elif name == 'to':
                to_email = GmailService.extract_email(value)
            elif name == 'date':
                date_str = value
        logger.info(f"处理邮件详情: ID={msg_id}, 主题='{subject}', 发件人={from_email}, 收件人={to_email}, 日期={date_str}")
        # Use the message's own internal timestamp (epoch milliseconds) as its
        # date; fall back to the current time only when it is missing.
        internal_date = message.get('internalDate')
        if internal_date:
            dt = datetime.datetime.fromtimestamp(int(internal_date) / 1000.0, tz=datetime.timezone.utc)
            logger.info(f"邮件内部时间戳: {dt.strftime('%Y-%m-%d %H:%M:%S')}")
        else:
            dt = timezone.now()
        body = GmailService.get_email_body(message)
        logger.info(f"邮件内容长度: {len(body)} 字符")
        # Only mail addressed to the user (influencer -> user) is handled.
        # Addresses are compared case-insensitively.
        if (to_email or '').lower() != (user_email or '').lower():
            logger.info(f"邮件不是发送给用户的: 发件人={from_email}, 收件人={to_email}, 用户邮箱={user_email}")
            return False
        conversation = GmailConversation.objects.filter(
            user=user,
            user_email__iexact=user_email,
            influencer_email__iexact=from_email
        ).first()
        if not conversation:
            logger.info(f"未找到匹配的对话: 用户邮箱={to_email}, 达人邮箱={from_email}")
            return False
        logger.info(f"找到匹配的对话: ID={conversation.conversation_id}, 用户邮箱={to_email}, 达人邮箱={from_email}")
        saved = GmailService._save_email_to_chat(user, credential, conversation, {
            'id': msg_id,
            'from': f"{from_email}",
            'from_email': from_email,
            'to': f"{to_email}",
            'to_email': to_email,
            'subject': subject,
            'body': body,
            'date': dt.strftime('%Y-%m-%d %H:%M:%S'),
            'attachments': []  # attachments are not extracted on this path
        })
        # Report success only if the save really happened; the helper returns
        # False (after logging) when it fails.
        if saved:
            logger.info(f"已将邮件保存到对话历史: {msg_id}")
        return bool(saved)
    except Exception as e:
        logger.error(f"处理单个邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
@staticmethod
def send_email(user, user_email, to_email, subject, body, attachments=None):
    """
    Send an email through Gmail (optionally with attachments) and record it.

    After a successful send, the message is also stored in the chat history
    of the conversation with ``to_email``. A failure in that bookkeeping is
    logged and does not change the return value.

    Args:
        user: User object.
        user_email: Sender's authorised Gmail address.
        to_email: Recipient address.
        subject: Mail subject.
        body: Plain-text mail body.
        attachments: Optional list of dicts ``{'path': <local file path>,
            'filename': <optional display name>}``.
    Returns:
        tuple: ``(True, message_id)`` on success, ``(False, error_message)``
        on failure.
    """
    try:
        credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not credential:
            return False, f"未找到{user_email}的授权信息"
        service = GmailService.get_service(credential)
        # Anything that is not a list is treated as "no attachments".
        attachment_items = attachments if isinstance(attachments, list) else []
        # Build the MIME message.
        message = MIMEMultipart()
        message['to'] = to_email
        message['from'] = user_email
        message['subject'] = Header(subject, 'utf-8').encode()
        message.attach(MIMEText(body, 'plain', 'utf-8'))
        for attachment in attachment_items:
            if 'path' not in attachment:
                continue
            filepath = attachment['path']
            filename = attachment.get('filename', os.path.basename(filepath))
            if not os.path.exists(filepath):
                logger.warning(f"附件文件不存在: {filepath}")
                continue
            try:
                with open(filepath, 'rb') as f:
                    file_data = f.read()
                file_part = MIMEApplication(file_data, Name=filename)
                # add_header quotes the parameter and encodes non-ASCII names
                # (RFC 2231); string interpolation produced a broken header
                # for names containing quotes or non-ASCII characters.
                file_part.add_header('Content-Disposition', 'attachment', filename=filename)
                message.attach(file_part)
                logger.info(f"已添加附件: {filename}")
            except Exception as e:
                logger.error(f"处理附件时出错: {str(e)}")
        # Gmail expects the raw message as URL-safe base64.
        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
        result = service.users().messages().send(
            userId='me',
            body={'raw': raw_message}
        ).execute()
        message_id = result.get('id')
        # Bookkeeping: the mail is already sent, so errors below are only logged.
        try:
            conversation = GmailConversation.objects.filter(
                user=user,
                user_email=user_email,
                influencer_email=to_email
            ).first()
            if not conversation:
                conversation_id = f"gmail_{user.id}_{str(uuid.uuid4())[:8]}"
                conversation = GmailConversation.objects.create(
                    user=user,
                    user_email=user_email,
                    influencer_email=to_email,
                    conversation_id=conversation_id,
                    title=f"{to_email} 的Gmail对话",
                    last_sync_time=timezone.now()
                )
            else:
                conversation.last_sync_time = timezone.now()
                conversation.save()
            knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
            if not knowledge_base:
                logger.warning(f"未找到默认知识库,邮件发送成功但未保存到聊天记录")
            else:
                # The body is stored as-is; the subject lives in the metadata.
                chat_message = ChatHistory.objects.create(
                    user=user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation.conversation_id,
                    title=conversation.title,
                    role='user',
                    content=body,
                    metadata={
                        'gmail_message_id': message_id,
                        'from': user_email,
                        'to': to_email,
                        'date': timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
                        'subject': subject,
                        'source': 'gmail'
                    }
                )
                if attachment_items:
                    metadata = chat_message.metadata or {}
                    recorded = metadata.setdefault('attachments', [])
                    for attachment in attachment_items:
                        if 'path' not in attachment:
                            continue
                        filepath = attachment['path']
                        filename = attachment.get('filename', os.path.basename(filepath))
                        if not os.path.exists(filepath):
                            continue
                        try:
                            # Keep a copy of the file in the attachment directory.
                            os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
                            unique_filename = f"{uuid.uuid4()}_{filename}"
                            target_path = os.path.join(GmailService.ATTACHMENT_DIR, unique_filename)
                            shutil.copy2(filepath, target_path)
                            filesize = os.path.getsize(filepath)
                            content_type, _ = mimetypes.guess_type(filepath)
                            if content_type is None:
                                content_type = 'application/octet-stream'
                            gmail_attachment = GmailAttachment.objects.create(
                                conversation=conversation,
                                email_message_id=message_id,
                                attachment_id=f"outgoing_{uuid.uuid4()}",
                                filename=filename,
                                file_path=target_path,
                                content_type=content_type,
                                size=filesize,
                                sender_email=user_email,
                                chat_message_id=str(chat_message.id)
                            )
                            recorded.append({
                                'id': str(gmail_attachment.id),
                                'filename': filename,
                                'size': filesize,
                                'mime_type': content_type,
                                'url': gmail_attachment.get_absolute_url()
                            })
                        except Exception as e:
                            logger.error(f"处理发送邮件附件时出错: {str(e)}")
                    # Persist the metadata only when it lists attachments.
                    if recorded:
                        chat_message.metadata = metadata
                        chat_message.save()
        except Exception as e:
            logger.error(f"保存发送的邮件到聊天记录失败: {str(e)}")
        logger.info(f"成功发送邮件到 {to_email}")
        return True, message_id
    except Exception as e:
        logger.error(f"发送Gmail邮件失败: {str(e)}")
        return False, f"发送Gmail邮件失败: {str(e)}"
2025-05-13 18:36:06 +08:00
@staticmethod
def _save_email_to_chat(user, credential, conversation, email_data):
    """
    Store a single email as a chat-history entry of a conversation.

    Args:
        user: User object.
        credential: Gmail credential object (its ``email`` decides the role).
        conversation: Gmail conversation object.
        email_data: Dict with the keys read below (``id``, ``from``,
            ``from_email``, ``to``, ``date``, ``subject``, ``body``,
            ``attachments``).
    Returns:
        bool: True on success, False when no knowledge base is found or an
        error occurs (the error is logged).
    """
    try:
        # Reuse the knowledge base of an existing message of this
        # conversation; otherwise take the user's private one.
        existing_entry = ChatHistory.objects.filter(
            conversation_id=conversation.conversation_id
        ).first()
        if existing_entry:
            knowledge_base = existing_entry.knowledge_base
        else:
            knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
            if not knowledge_base:
                logger.error("未找到默认知识库")
                return False
        # Mail sent from the credential's own address is 'user', else 'assistant'.
        sender = email_data['from_email']
        is_from_user = sender.lower() == credential.email.lower()
        if is_from_user:
            role = 'user'
        else:
            role = 'assistant'
        logger.info(f"邮件角色判断: 发件人={email_data['from_email']}, 用户邮箱={credential.email}, 判定为{'用户' if is_from_user else '达人'}, 角色={role}")
        logger.info(f"邮件详情: 主题='{email_data['subject']}', 发送时间={email_data['date']}")
        # The body is stored unchanged.
        entry_metadata = {
            'gmail_message_id': email_data['id'],
            'from': email_data['from'],
            'to': email_data['to'],
            'date': email_data['date'],
            'subject': email_data['subject'],
            'source': 'gmail'
        }
        chat_message = ChatHistory.objects.create(
            user=user,
            knowledge_base=knowledge_base,
            conversation_id=conversation.conversation_id,
            title=conversation.title,
            role=role,
            content=email_data['body'],
            metadata=entry_metadata
        )
        logger.info(f"已保存邮件到聊天历史: ID={chat_message.id}, 角色={role}, conversation_id={conversation.conversation_id}")
        # Touch the conversation's sync time.
        conversation.last_sync_time = timezone.now()
        conversation.save()
        # Download and register every attachment that carries an attachmentId.
        for item in (email_data['attachments'] or ()):
            if 'attachmentId' not in item:
                continue
            stored_path = GmailService.download_attachment(
                user,
                credential,
                email_data['id'],
                item['attachmentId'],
                item['filename']
            )
            if not stored_path:
                continue
            record = GmailAttachment.objects.create(
                conversation=conversation,
                email_message_id=email_data['id'],
                attachment_id=item['attachmentId'],
                filename=item['filename'],
                file_path=stored_path,
                content_type=item['mimeType'],
                size=item['size'],
                sender_email=email_data['from_email'],
                chat_message_id=str(chat_message.id)
            )
            # Mirror the attachment in the chat message metadata and persist it.
            message_meta = chat_message.metadata or {}
            message_meta.setdefault('attachments', []).append({
                'id': str(record.id),
                'filename': item['filename'],
                'size': item['size'],
                'mime_type': item['mimeType'],
                'url': record.get_absolute_url()
            })
            chat_message.metadata = message_meta
            chat_message.save()
        return True
    except Exception as exc:
        logger.error(f"保存Gmail新邮件到聊天记录失败: {str(exc)}")
        return False
@staticmethod
def get_conversation_summary(user, conversation_id):
    """
    Return the summary of a Gmail conversation, generating it when needed.

    A stored summary is reused when it already covers the newest chat
    message; otherwise a new one is generated through AIService and saved.

    Args:
        user (User): User requesting the summary.
        conversation_id (str): Conversation ID.
    Returns:
        tuple: ``(summary_text, error)``; exactly one of them is None.
    """
    try:
        try:
            conversation = GmailConversation.objects.get(conversation_id=conversation_id)
        except GmailConversation.DoesNotExist:
            return None, "对话不存在"
        # Only the owner of the conversation may read it.
        if str(conversation.user.id) != str(user.id):
            return None, "无权访问此对话"
        conversation_messages = ChatHistory.objects.filter(conversation_id=conversation_id)
        newest_message = conversation_messages.order_by('-created_at').first()
        if not newest_message:
            return None, "对话中没有消息记录"
        newest_id = str(newest_message.id)
        # Look for a stored summary and reuse it if it is up to date.
        try:
            summary = ConversationSummary.objects.get(conversation=conversation)
        except ConversationSummary.DoesNotExist:
            summary = None
        else:
            if summary.last_message_id == newest_id:
                return summary.content, None
        # Build the history in chronological order for the AI service.
        ordered_messages = list(conversation_messages.order_by('created_at'))
        if not ordered_messages:
            return None, "对话中没有消息记录"
        conversation_history = [
            {
                'content': entry.content,
                'is_from_user': entry.role == 'user',
                'timestamp': entry.created_at.isoformat()
            }
            for entry in ordered_messages
        ]
        summary_content, error = AIService.generate_conversation_summary(conversation_history)
        if error:
            return None, f"生成摘要失败: {error}"
        # Persist: update the stored summary or create a new one.
        if summary:
            summary.content = summary_content
            summary.last_message_id = newest_id
            summary.save()
        else:
            summary = ConversationSummary.objects.create(
                conversation=conversation,
                content=summary_content,
                last_message_id=newest_id
            )
        return summary_content, None
    except Exception as exc:
        error_msg = f"获取对话摘要失败: {str(exc)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        return None, error_msg
2025-05-20 15:57:10 +08:00
@staticmethod
def _safe_filename(filename):
"""
确保文件名安全移除不安全字符
Args:
filename: 原始文件名
Returns:
str: 安全的文件名
"""
# 替换不安全字符
unsafe_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
safe_name = filename
for char in unsafe_chars:
safe_name = safe_name.replace(char, '_')
# 限制文件名长度
if len(safe_name) > 200:
name, ext = os.path.splitext(safe_name)
safe_name = name[:196] + ext if ext else name[:200]
return safe_name
@staticmethod
def _parse_email_content(message):
    """
    Flatten a Gmail API message object into a plain dictionary.

    Args:
        message: Message object returned by the Gmail API. The code reads
            ``message['id']``, ``message['threadId']`` and
            ``message['payload']['headers']`` from it.

    Returns:
        dict | None: A dictionary with the keys ``id``, ``thread_id``,
        ``from``, ``from_email``, ``to``, ``to_email``, ``subject``,
        ``date`` (formatted ``%Y-%m-%d %H:%M:%S``), ``body``,
        ``body_html`` and ``attachments``. ``None`` is returned, after
        logging the error, when any step of the parsing fails.
    """
    try:
        # Message identifiers
        message_id = message['id']
        thread_id = message['threadId']

        # Header names are lower-cased so lookups are case-insensitive;
        # when a header is repeated the last occurrence wins.
        headers = {
            header['name'].lower(): header['value']
            for header in message['payload']['headers']
        }

        from_header = headers.get('from', '')
        to_header = headers.get('to', '')
        subject = headers.get('subject', '(无主题)')

        # Only the address part is needed; the display name is discarded.
        _, from_email = parseaddr(from_header)
        _, to_email = parseaddr(to_header)

        # The Date header comes from the sender and may be missing or
        # malformed. Falling back to "now" is a deliberate best effort, so
        # every ordinary exception is tolerated here, but unlike a bare
        # ``except`` this no longer swallows KeyboardInterrupt/SystemExit.
        date_str = headers.get('date', '')
        try:
            parsed_date = email.utils.parsedate_to_datetime(date_str)
            date = parsed_date.strftime('%Y-%m-%d %H:%M:%S')
        except Exception:
            date = timezone.now().strftime('%Y-%m-%d %H:%M:%S')

        body_text, body_html, attachments = GmailService._parse_message_parts(message)

        # Prefer the plain-text body; otherwise derive text from the HTML.
        if body_text:
            body = body_text
        elif body_html:
            body = GmailService._html_to_text(body_html)
        else:
            body = ""

        return {
            'id': message_id,
            'thread_id': thread_id,
            'from': from_header,
            'from_email': from_email,
            'to': to_header,
            'to_email': to_email,
            'subject': subject,
            'date': date,
            'body': body,
            'body_html': body_html,
            'attachments': attachments
        }
    except Exception as e:
        logger.error(f"解析邮件内容失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
@staticmethod
def _parse_message_parts(message):
"""
递归解析邮件部分提取文本HTML和附件
Args:
message: Gmail API返回的邮件对象
Returns:
tuple: (纯文本内容, HTML内容, 附件列表)
"""
body_text = ""
body_html = ""
attachments = []
if 'payload' not in message:
return body_text, body_html, attachments
# 检查是否有单个部分
if 'body' in message['payload']:
data = message['payload']['body'].get('data', '')
if data:
decoded_data = base64.urlsafe_b64decode(data).decode('utf-8', errors='replace')
mime_type = message['payload'].get('mimeType', '')
if mime_type == 'text/plain':
body_text = decoded_data
elif mime_type == 'text/html':
body_html = decoded_data
# 检查是否是附件
if message['payload'].get('filename') and message['payload']['body'].get('attachmentId'):
attachments.append({
'filename': message['payload']['filename'],
'mimeType': message['payload'].get('mimeType', 'application/octet-stream'),
'size': message['payload']['body'].get('size', 0),
'attachmentId': message['payload']['body']['attachmentId']
})
# 处理多部分邮件
if 'parts' in message['payload']:
for part in message['payload']['parts']:
# 递归处理每个部分
part_text, part_html, part_attachments = GmailService._process_message_part(part)
body_text += part_text
body_html += part_html
attachments.extend(part_attachments)
return body_text, body_html, attachments
@staticmethod
def _process_message_part(part):
"""
处理邮件的单个部分
Args:
part: 邮件部分
Returns:
tuple: (纯文本内容, HTML内容, 附件列表)
"""
body_text = ""
body_html = ""
attachments = []
# 获取MIME类型
mime_type = part.get('mimeType', '')
# 处理文本内容
if 'body' in part and 'data' in part['body']:
data = part['body']['data']
if data:
decoded_data = base64.urlsafe_b64decode(data).decode('utf-8', errors='replace')
if mime_type == 'text/plain':
body_text = decoded_data
elif mime_type == 'text/html':
body_html = decoded_data
# 处理附件
if part.get('filename') and 'body' in part and 'attachmentId' in part['body']:
attachments.append({
'filename': part['filename'],
'mimeType': mime_type,
'size': part['body'].get('size', 0),
'attachmentId': part['body']['attachmentId']
})
# 处理嵌套的multipart部分
if mime_type.startswith('multipart/') and 'parts' in part:
for subpart in part['parts']:
sub_text, sub_html, sub_attachments = GmailService._process_message_part(subpart)
body_text += sub_text
body_html += sub_html
attachments.extend(sub_attachments)
return body_text, body_html, attachments
@staticmethod
def _html_to_text(html):
"""
从HTML内容中提取纯文本
Args:
html: HTML内容
Returns:
str: 提取的纯文本
"""
if not html:
return ""
# 简单地移除HTML标签
import re
text = re.sub(r'<[^>]+>', ' ', html)
text = re.sub(r'\s+', ' ', text)
return text.strip()
@staticmethod
def extract_email(header_value):
"""
从邮件头值中提取电子邮件地址
Args:
header_value: 邮件头值格式可能是"Name <email@domain.com>"或纯"email@domain.com"
Returns:
str: 提取出的电子邮件地址
"""
try:
if not header_value:
return ""
# 使用email.utils提供的解析函数
name, email = parseaddr(header_value)
return email.lower()
except Exception as e:
logger.error(f"提取邮件地址失败: {str(e)}")
return ""
@staticmethod
def get_email_body(message):
    """
    Return the body text of a Gmail message.

    Plain text found by ``GmailService._parse_message_parts`` is preferred,
    then text extracted from the HTML body, and finally the raw decoded
    ``payload.body.data`` if neither of those is available.

    Args:
        message: Message object returned by the Gmail API.

    Returns:
        str: The body text, or ``""`` when nothing can be extracted or an
        error occurs (the error is logged).
    """
    try:
        plain, markup, _ = GmailService._parse_message_parts(message)
        if plain:
            return plain
        if markup:
            return GmailService._html_to_text(markup)

        # Last resort: decode whatever sits directly on the payload body.
        if 'payload' not in message:
            return ""
        payload = message['payload']
        if 'body' not in payload:
            return ""
        raw_body = payload['body']
        if 'data' not in raw_body:
            return ""
        return base64.urlsafe_b64decode(raw_body['data']).decode('utf-8', errors='replace')
    except Exception as exc:
        logger.error(f"提取邮件正文失败: {str(exc)}")
        return ""
2025-05-13 11:58:17 +08:00