# operations_project/apps/gmail/services/gmail_service.py
# Revision date: 2025-05-20 15:57:10 +08:00 (1458 lines, 61 KiB, Python).
#
# NOTE: the source viewer flagged this file as containing ambiguous Unicode
# characters, i.e. characters that can be confused with similar-looking ASCII
# characters. The CJK comments and log messages are the likely cause; review
# them before assuming the file is corrupted.

import os
import json
import logging
import base64
import email
from email.utils import parseaddr
import datetime
import shutil
import uuid
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from django.conf import settings
from django.utils import timezone
from django.db import transaction
from ..models import GmailCredential, GmailConversation, GmailAttachment, ConversationSummary
from apps.chat.models import ChatHistory
from apps.knowledge_base.models import KnowledgeBase
import requests
from apps.common.services.notification_service import NotificationService
import threading
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.header import Header
import mimetypes
from apps.accounts.models import User
from apps.common.services.ai_service import AIService
import traceback
# Module-wide logger shared by every GmailService method.
logger = logging.getLogger(__name__)

# Optional outbound proxy. When settings.PROXY_URL is set, it is exported as both
# HTTP_PROXY and HTTPS_PROXY for the whole process, so any HTTP client that
# honours these variables goes through the proxy. This runs once, at import time.
proxy_url = getattr(settings, 'PROXY_URL', None)
if proxy_url:
    os.environ.update({'HTTP_PROXY': proxy_url, 'HTTPS_PROXY': proxy_url})
    logger.info(f"Gmail服务已设置全局代理环境变量: {proxy_url}")
class GmailService:
"""
Gmail integration service.

Covers OAuth 2.0 authorization, reading and sending mail through the Gmail API,
mirroring conversations into ChatHistory, Pub/Sub push-notification setup and
conversation summaries. Every method visible in this class is a @staticmethod.
"""
# OAuth 2.0 scopes requested from Google: read/modify access to Gmail plus
# Cloud Pub/Sub (presumably for the push-notification watch; verify).
SCOPES = ['https://www.googleapis.com/auth/gmail.modify', 'https://www.googleapis.com/auth/pubsub']
# Directory reserved for temporary OAuth client-secret files.
TOKEN_DIR = os.path.join(settings.BASE_DIR, 'gmail_tokens')
# Local storage directory for Gmail attachments (BASE_DIR/media/gmail_attachments).
ATTACHMENT_DIR = os.path.join(settings.BASE_DIR, 'media', 'gmail_attachments')
# Pub/Sub topic and subscription names for Gmail watch notifications,
# overridable via settings.GMAIL_PUBSUB_TOPIC / settings.GMAIL_PUBSUB_SUBSCRIPTION.
# Per the original note these are name suffixes only, not full
# "projects/<id>/..." paths.
PUBSUB_TOPIC = getattr(settings, 'GMAIL_PUBSUB_TOPIC', 'gmail-notifications')
PUBSUB_SUBSCRIPTION = getattr(settings, 'GMAIL_PUBSUB_SUBSCRIPTION', 'gmail-notifications-sub')
@staticmethod
def initiate_authentication(user, client_secret_json):
    """
    Start the Gmail OAuth 2.0 flow and return the authorization URL.

    The client configuration is handed to the flow straight from memory, so the
    client secret is never written to disk. A per-user temp file would race
    between concurrent requests of the same user. It also turned an early
    failure into an UnboundLocalError inside ``finally``.

    Args:
        user: Django user being authorized; ``user.id`` is used for logging.
        client_secret_json: Client-secret JSON, already parsed into a dict, as
            downloaded from the Google Cloud Console.

    Returns:
        str: Authorization URL the user must open to obtain an authorization code.

    Raises:
        Exception: Any error while building the flow or the URL is logged and
            re-raised.
    """
    try:
        # Kept so TOKEN_DIR still exists for any code that stores files there.
        os.makedirs(GmailService.TOKEN_DIR, exist_ok=True)
        # from_client_config takes the same dict that from_client_secrets_file
        # would have loaded back from a temp file.
        flow = InstalledAppFlow.from_client_config(
            client_secret_json,
            scopes=GmailService.SCOPES,
            # Out-of-band (copy/paste code) redirect for non-web clients.
            # NOTE(review): Google has deprecated the OOB flow; newer OAuth
            # clients may reject this redirect URI.
            redirect_uri='urn:ietf:wg:oauth:2.0:oob',
        )
        # Always show the consent screen.
        auth_url, _ = flow.authorization_url(prompt='consent')
        logger.info(f"Generated auth URL for user {user.id}: {auth_url}")
        return auth_url
    except Exception as e:
        logger.error(f"Error initiating Gmail authentication for user {user.id}: {str(e)}")
        raise
@staticmethod
def complete_authentication(user, auth_code, client_secret_json):
    """
    Finish the Gmail OAuth 2.0 flow: exchange the code for tokens and store them.

    The credential row is keyed by (user, Gmail address). A user's first
    credential becomes the default. Whenever the saved credential is the default,
    every other credential of the user is un-defaulted.

    Args:
        user: Django user that owns the credential.
        auth_code: Authorization code the user copied from the consent page.
        client_secret_json: Client-secret JSON (parsed dict).

    Returns:
        GmailCredential: The created or updated credential.

    Raises:
        HttpError: If a Gmail API call (e.g. the profile lookup) fails.
        Exception: Any other failure (token exchange, database) is logged and
            re-raised.
    """
    try:
        # Build the flow from the in-memory config; no client-secret file on disk.
        flow = InstalledAppFlow.from_client_config(
            client_secret_json,
            scopes=GmailService.SCOPES,
            redirect_uri='urn:ietf:wg:oauth:2.0:oob',
        )
        # Exchange the authorization code for access/refresh tokens.
        flow.fetch_token(code=auth_code)
        credentials = flow.credentials

        # Ask Gmail which address these tokens belong to.
        service = build('gmail', 'v1', credentials=credentials)
        profile = service.users().getProfile(userId='me').execute()
        # Not named `email`, so the module-level `email` package is not shadowed.
        email_address = profile['emailAddress']

        # Commit the credential and the default-flag update together.
        with transaction.atomic():
            credential, _created = GmailCredential.objects.get_or_create(
                user=user,
                email=email_address,
                defaults={'is_default': not GmailCredential.objects.filter(user=user).exists()},
            )
            credential.set_credentials(credentials)
            # A successful re-authorization must revive a credential that was
            # flagged invalid after a failed token refresh.
            credential.is_valid = True
            credential.save()
            if credential.is_default:
                GmailCredential.objects.filter(user=user).exclude(id=credential.id).update(is_default=False)

        logger.info(f"Gmail credential saved for user {user.id}, email: {email_address}")
        return credential
    except HttpError as e:
        logger.error(f"Gmail API error for user {user.id}: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Error completing Gmail authentication for user {user.id}: {str(e)}")
        raise
@staticmethod
def get_service(credential):
    """
    Build a Gmail API client from a stored GmailCredential.

    Expired OAuth credentials that carry a refresh token are refreshed here. The
    refreshed tokens are written back to ``credential`` so later calls do not
    need to refresh again.

    Args:
        credential: GmailCredential holding the user's OAuth 2.0 credentials.

    Returns:
        googleapiclient.discovery.Resource: Gmail API client.

    Raises:
        Exception: Re-raised after logging. If the error text looks like a
            revoked or expired grant ('invalid_grant', '401', 'token has been
            expired or revoked'), ``credential.is_valid`` is set to False and
            saved first.
    """
    try:
        logger.info(f"获取Gmail服务实例用户: {credential.user.username}, 邮箱: {credential.email}")
        credentials = credential.get_credentials()
        if credentials.expired:
            logger.info(f"OAuth凭证已过期尝试刷新...")
            if credentials.refresh_token:
                # google-auth is a required dependency of google-auth-oauthlib and
                # googleapiclient; import its requests transport locally.
                from google.auth.transport.requests import Request
                credentials.refresh(Request())
                # Persist the new access token and expiry; otherwise every call
                # would start from the stale token again.
                credential.set_credentials(credentials)
                credential.save()
        # The proxy, if any, is picked up from the environment variables.
        service = build('gmail', 'v1', credentials=credentials)
        logger.info(f"成功创建Gmail服务实例")
        return service
    except Exception as e:
        logger.error(f"创建Gmail服务失败: {str(e)}")
        error_text = str(e).lower()
        # A failed refresh means the grant is gone; mark the credential unusable.
        if ('invalid_grant' in error_text or '401' in error_text
                or 'token has been expired or revoked' in error_text):
            logger.error(f"OAuth凭证刷新失败标记凭证为无效")
            credential.is_valid = False
            credential.save()
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise
@staticmethod
def get_conversations(user, user_email, influencer_email):
    """
    Fetch every email exchanged between the user and an influencer.

    Args:
        user: Current Django user.
        user_email: The user's authorized Gmail address.
        influencer_email: The influencer's email address.

    Returns:
        tuple: ``(emails, None)`` with the parsed emails (see
        ``_parse_email_content``) sorted oldest first, or
        ``(None, error_message)``.
    """
    try:
        # Attachments of these emails may be downloaded afterwards.
        os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
        credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not credential:
            return None, f"未找到{user_email}的授权信息"
        service = GmailService.get_service(credential)

        # Every mail between the two addresses, in either direction.
        query = f"from:({user_email} OR {influencer_email}) to:({user_email} OR {influencer_email})"
        logger.info(f"Gmail搜索查询: {query}")

        # Walk all result pages. A page can omit 'messages', so default to [].
        messages = []
        page_token = None
        while True:
            list_kwargs = {'userId': 'me', 'q': query}
            if page_token:
                list_kwargs['pageToken'] = page_token
            response = service.users().messages().list(**list_kwargs).execute()
            messages.extend(response.get('messages', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                break
        logger.info(f"找到 {len(messages)} 封邮件")

        dated_emails = []
        for msg in messages:
            try:
                message = service.users().messages().get(userId='me', id=msg['id']).execute()
                email_data = GmailService._parse_email_content(message)
                if email_data:
                    # internalDate is Gmail's timestamp in epoch milliseconds.
                    dated_emails.append((int(message.get('internalDate') or 0), email_data))
            except Exception as e:
                logger.error(f"处理邮件 {msg['id']} 时出错: {str(e)}")

        # Sort by internalDate. The formatted 'date' string drops the sender's UTC
        # offset, so string order is wrong across time zones; it is only a tie-breaker.
        dated_emails.sort(key=lambda item: (item[0], item[1]['date']))
        return [email_data for _, email_data in dated_emails], None
    except Exception as e:
        logger.error(f"获取Gmail对话失败: {str(e)}")
        return None, f"获取Gmail对话失败: {str(e)}"
@staticmethod
def download_attachment(user, gmail_credential, message_id, attachment_id, filename):
    """
    Download one Gmail attachment into ATTACHMENT_DIR.

    The stored file name is ``<user.id>_<YYYYmmddHHMMSS>_<random8>_<safe name>``.
    The random part prevents attachments with the same name, saved within the
    same second, from overwriting each other.

    Args:
        user: Current Django user (its id prefixes the file name).
        gmail_credential: GmailCredential used to call the API.
        message_id: Gmail message id.
        attachment_id: Gmail attachment id.
        filename: Original attachment name; None or empty becomes 'attachment'.

    Returns:
        str | None: Path of the saved file, or None on any failure (logged).
    """
    try:
        service = GmailService.get_service(gmail_credential)
        attachment = service.users().messages().attachments().get(
            userId='me',
            messageId=message_id,
            id=attachment_id
        ).execute()
        file_data = base64.urlsafe_b64decode(attachment['data'])

        # Callers on the push-notification path never create this directory.
        os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
        safe_filename = GmailService._safe_filename(filename or 'attachment')
        timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        unique_filename = f"{user.id}_{timestamp}_{uuid.uuid4().hex[:8]}_{safe_filename}"
        filepath = os.path.join(GmailService.ATTACHMENT_DIR, unique_filename)

        with open(filepath, 'wb') as f:
            f.write(file_data)
        logger.info(f"附件已保存: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"下载附件失败: {str(e)}")
        return None
@staticmethod
@transaction.atomic
def save_conversations_to_chat(user, user_email, influencer_email, kb_id=None):
    """
    Import the Gmail conversation between a user and an influencer into ChatHistory.

    Emails sent from ``user_email`` become ``role='user'`` messages; everything
    else becomes ``role='assistant'``. Emails already imported into the same
    conversation (matched by Gmail message id) are skipped, so re-syncing does
    not create duplicates. Attachments are downloaded and linked to their chat
    message.

    Runs in one database transaction. On an unexpected error the transaction is
    rolled back and an error message is returned.

    Args:
        user: Current Django user.
        user_email: The user's authorized Gmail address.
        influencer_email: The influencer's email address.
        kb_id: Optional knowledge base id; defaults to the user's first private
            knowledge base.

    Returns:
        tuple: ``(conversation_id, None)`` on success, ``(None, error_message)``
        otherwise.
    """
    try:
        gmail_credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not gmail_credential:
            return None, f"未找到{user_email}的授权信息"

        conversations, error = GmailService.get_conversations(user, user_email, influencer_email)
        if error:
            return None, error
        if not conversations:
            return None, "未找到与该达人的对话记录"

        if not kb_id:
            knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
            if not knowledge_base:
                return None, "未找到默认知识库,请先创建一个知识库"
        else:
            knowledge_base = KnowledgeBase.objects.filter(id=kb_id).first()
            if not knowledge_base:
                return None, f"知识库ID {kb_id} 不存在"

        # New conversations get a fresh id; existing ones keep theirs.
        conversation_id = f"gmail_{user.id}_{str(uuid.uuid4())[:8]}"
        gmail_conversation, created = GmailConversation.objects.get_or_create(
            user=user,
            user_email=user_email,
            influencer_email=influencer_email,
            defaults={
                'conversation_id': conversation_id,
                'title': f"{influencer_email} 的Gmail对话",
                'last_sync_time': timezone.now()
            }
        )
        if not created:
            conversation_id = gmail_conversation.conversation_id
            gmail_conversation.last_sync_time = timezone.now()
            gmail_conversation.save()

        # Gmail ids already stored for this conversation. Without this check,
        # every re-sync would append the whole thread again.
        imported_ids = {
            metadata.get('gmail_message_id')
            for metadata in ChatHistory.objects.filter(
                conversation_id=conversation_id
            ).values_list('metadata', flat=True)
            if isinstance(metadata, dict)
        }

        for email_data in conversations:
            if email_data['id'] in imported_ids:
                continue

            is_from_user = email_data['from_email'].lower() == user_email.lower()
            role = 'user' if is_from_user else 'assistant'
            logger.info(f"邮件角色判断: 发件人={email_data['from_email']}, 用户邮箱={user_email}, 判定为{'用户' if is_from_user else '达人'}")

            # The raw body is stored unchanged; the subject goes into metadata.
            chat_message = ChatHistory.objects.create(
                user=user,
                knowledge_base=knowledge_base,
                conversation_id=conversation_id,
                title=gmail_conversation.title,
                role=role,
                content=email_data['body'],
                metadata={
                    'gmail_message_id': email_data['id'],
                    'from': email_data['from'],
                    'to': email_data['to'],
                    'date': email_data['date'],
                    'subject': email_data['subject'],
                    'source': 'gmail'
                }
            )
            imported_ids.add(email_data['id'])

            for attachment in email_data.get('attachments') or []:
                if 'attachmentId' not in attachment:
                    continue
                file_path = GmailService.download_attachment(
                    user,
                    gmail_credential,
                    email_data['id'],
                    attachment['attachmentId'],
                    attachment['filename']
                )
                if not file_path:
                    continue
                gmail_attachment = GmailAttachment.objects.create(
                    conversation=gmail_conversation,
                    email_message_id=email_data['id'],
                    attachment_id=attachment['attachmentId'],
                    filename=attachment['filename'],
                    file_path=file_path,
                    content_type=attachment['mimeType'],
                    size=attachment['size'],
                    sender_email=email_data['from_email'],
                    chat_message_id=str(chat_message.id)
                )
                # Expose the attachment on the chat message.
                metadata = chat_message.metadata or {}
                metadata.setdefault('attachments', []).append({
                    'id': str(gmail_attachment.id),
                    'filename': attachment['filename'],
                    'size': attachment['size'],
                    'mime_type': attachment['mimeType'],
                    'url': gmail_attachment.get_absolute_url()
                })
                chat_message.metadata = metadata
                chat_message.save()

        return conversation_id, None
    except Exception as e:
        # The exception is swallowed here, so without this the surrounding
        # @transaction.atomic would COMMIT whatever was written before the failure.
        transaction.set_rollback(True)
        logger.error(f"保存Gmail对话到聊天记录失败: {str(e)}")
        return None, f"保存Gmail对话到聊天记录失败: {str(e)}"
@staticmethod
def setup_gmail_push_notification(user, user_email, topic_name=None, subscription_name=None):
    """
    Register a Gmail ``users.watch`` on INBOX that publishes to a Pub/Sub topic.

    The mailbox's current historyId is stored on the credential first, and again
    from the watch response when present. Any existing watch is stopped before
    the new one is created.

    Args:
        user: Current Django user.
        user_email: The user's authorized Gmail address.
        topic_name: Optional Pub/Sub topic. A bare name is expanded to
            ``projects/<GOOGLE_CLOUD_PROJECT_ID>/topics/<topic_name>``; a value
            that already starts with ``projects/`` is used as-is. Defaults to
            ``'gmail-watch-topic'``.
        subscription_name: Accepted for API compatibility but unused, because
            ``users.watch`` only needs the topic.

    Returns:
        tuple: ``(True, None)`` on success, ``(False, error_message)`` otherwise.
    """
    try:
        credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not credential:
            return False, f"未找到{user_email}的授权信息"
        service = GmailService.get_service(credential)

        # Best effort: remember the current history id as the processing baseline.
        try:
            profile = service.users().getProfile(userId='me').execute()
            latest_history_id = profile.get('historyId')
            if latest_history_id:
                credential.last_history_id = latest_history_id
                credential.save()
                logger.info(f"已更新{user_email}的历史ID: {latest_history_id}")
        except Exception as history_error:
            logger.error(f"获取历史ID失败: {str(history_error)}")

        # Gmail requires the full "projects/{project_id}/topics/{topic}" form.
        topic = topic_name or 'gmail-watch-topic'
        if topic.startswith('projects/'):
            full_topic_name = topic
        else:
            project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT_ID', '')
            if not project_id:
                return False, "未配置Google Cloud项目ID"
            full_topic_name = f'projects/{project_id}/topics/{topic}'
        logger.info(f"使用主题名称: {full_topic_name}")

        request = {
            'labelIds': ['INBOX'],
            'topicName': full_topic_name
        }

        # Stop any previous watch first; failure usually means none existed.
        try:
            service.users().stop(userId='me').execute()
            logger.info(f"已停止现有的监听: {user_email}")
        except Exception as stop_error:
            logger.warning(f"停止现有监听失败(可能无监听): {str(stop_error)}")

        watch_response = service.users().watch(userId='me', body=request).execute()
        logger.info(f"已为 {user_email} 设置Gmail推送通知主题: {full_topic_name}, 响应: {watch_response}")

        if 'historyId' in watch_response:
            credential.last_history_id = watch_response['historyId']
            credential.save()
            logger.info(f"已从watch响应更新{user_email}的历史ID: {watch_response['historyId']}")

        return True, None
    except Exception as e:
        logger.error(f"设置Gmail推送通知失败: {str(e)}")
        return False, f"设置Gmail推送通知失败: {str(e)}"
@staticmethod
def process_new_emails(user, credential, history_id=None):
    """
    Fetch new Gmail messages for ``credential`` and import the relevant ones.

    Message discovery:
      * With a history id (the argument, else ``credential.last_history_id``),
        messages added since that id are listed via ``users.history.list``,
        across all result pages.
      * The newest 10 INBOX messages are fetched instead in three cases: there is
        no history id, the history call fails, or it yields nothing.

    Messages already stored in this user's ChatHistory (matched on
    ``metadata['gmail_message_id']``) are skipped. The rest go through
    ``GmailService._process_single_email``.

    Args:
        user: Django user.
        credential: GmailCredential of the mailbox.
        history_id: Optional Gmail history id to start from.

    Returns:
        list: Ids of the messages processed successfully ([] on failure).
    """
    try:
        service = GmailService.get_service(credential)
        user_email = credential.email
        if not service:
            logger.error(f"获取Gmail服务失败: {user_email}")
            return []
        logger.info(f"开始处理新邮件: 用户 {user.username}, 邮箱 {user_email}")

        if not history_id and credential.last_history_id:
            history_id = credential.last_history_id
            logger.info(f"使用凭证中的历史ID: {history_id}")

        new_messages = []
        if history_id:
            try:
                new_messages = GmailService._fetch_messages_since_history(service, history_id)
                if not new_messages:
                    logger.info("通过历史ID没有找到新邮件直接获取最新邮件...")
            except Exception as he:
                logger.error(f"获取历史变更失败: {str(he)}")
                logger.info("尝试直接获取最新邮件...")
        else:
            logger.info("没有历史ID直接获取最新邮件...")
        if not new_messages:
            new_messages = GmailService._fetch_latest_inbox_messages(service)

        # Scoped to this user, so each notification does not scan every
        # Gmail-sourced chat row in the system.
        existing_message_ids = set()
        for metadata in ChatHistory.objects.filter(
            user=user,
            metadata__has_key='gmail_message_id'
        ).values_list('metadata', flat=True):
            if metadata and 'gmail_message_id' in metadata:
                existing_message_ids.add(metadata['gmail_message_id'])
        logger.info(f"数据库中已存在 {len(existing_message_ids)} 封邮件记录")
        logger.info(f"获取到 {len(new_messages)} 封新邮件,开始处理")

        processed_ids = []
        for message in new_messages:
            try:
                msg_id = message.get('id')
                if msg_id in existing_message_ids:
                    logger.info(f"邮件 {msg_id} 已存在于数据库中,跳过处理")
                    continue
                # Guard against the same message appearing twice in one batch.
                existing_message_ids.add(msg_id)

                success = GmailService._process_single_email(user, credential, message, msg_id, user_email)
                if success:
                    processed_ids.append(msg_id)
                    logger.info(f"成功处理邮件: {msg_id}")
                else:
                    logger.warning(f"邮件处理失败或未满足处理条件: {msg_id}")
            except Exception as msg_error:
                logger.error(f"处理邮件失败: {str(msg_error)}")
                logger.error(traceback.format_exc())

        logger.info(f"成功处理 {len(processed_ids)} 封新邮件")
        return processed_ids
    except Exception as e:
        logger.error(f"处理新邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return []

@staticmethod
def _fetch_messages_since_history(service, history_id):
    """
    Return full message resources added to the mailbox after ``history_id``.

    Follows ``nextPageToken`` through every page of ``users.history.list`` and
    de-duplicates message ids while keeping their order. Listing errors
    propagate; per-message fetch errors are logged and skipped.
    """
    message_ids = []
    page_token = None
    while True:
        list_kwargs = {
            'userId': 'me',
            'startHistoryId': history_id,
            'historyTypes': ['messageAdded'],
        }
        if page_token:
            list_kwargs['pageToken'] = page_token
        history_list = service.users().history().list(**list_kwargs).execute()
        for history in history_list.get('history', []):
            for message_added in history.get('messagesAdded', []):
                msg_id = message_added.get('message', {}).get('id')
                if msg_id:
                    message_ids.append(msg_id)
        page_token = history_list.get('nextPageToken')
        if not page_token:
            break

    message_ids = list(dict.fromkeys(message_ids))
    logger.info(f"从历史变更中获取到 {len(message_ids)} 封新邮件")

    messages = []
    for msg_id in message_ids:
        try:
            messages.append(service.users().messages().get(userId='me', id=msg_id, format='full').execute())
        except Exception as e:
            logger.error(f"获取邮件 {msg_id} 详情失败: {str(e)}")
    return messages

@staticmethod
def _fetch_latest_inbox_messages(service, max_results=10):
    """
    Return full message resources for the newest ``max_results`` INBOX messages.

    Listing errors propagate; per-message fetch errors are logged and skipped.
    """
    results = service.users().messages().list(userId='me', labelIds=['INBOX'], maxResults=max_results).execute()
    messages = []
    for message in results.get('messages', []):
        try:
            messages.append(service.users().messages().get(userId='me', id=message['id'], format='full').execute())
        except Exception as e:
            logger.error(f"获取邮件 {message['id']} 详情失败: {str(e)}")
    return messages
@staticmethod
def _process_single_email(user, credential, message, msg_id, user_email):
    """
    Store one incoming email in its existing Gmail conversation, if there is one.

    An email is saved only if two conditions hold. ``user_email`` must be one of
    its To recipients (compared case-insensitively). The sender must already
    have a GmailConversation with this user (case-insensitive match). Everything
    else is ignored.

    Args:
        user: Django user.
        credential: GmailCredential of ``user_email``.
        message: Full Gmail API message resource.
        msg_id: Gmail message id of ``message``.
        user_email: The user's Gmail address.

    Returns:
        bool: True if the email was saved to chat history; False otherwise,
        including when saving failed.
    """
    try:
        from email.utils import getaddresses

        headers = message.get('payload', {}).get('headers', [])
        subject = ''
        from_email = ''
        to_email = ''
        to_header = ''
        date_str = ''
        for header in headers:
            name = header.get('name')
            value = header.get('value', '')
            if name == 'Subject':
                subject = value
            elif name == 'From':
                from_email = GmailService.extract_email(value)
            elif name == 'To':
                to_header = value
                to_email = GmailService.extract_email(value)
            elif name == 'Date':
                date_str = value
        logger.info(f"处理邮件详情: ID={msg_id}, 主题='{subject}', 发件人={from_email}, 收件人={to_email}, 日期={date_str}")

        # internalDate is Gmail's receive time in epoch milliseconds. Read it as
        # UTC to match timezone.now(), the fallback below (assumes USE_TZ=True).
        received_at = None
        internal_date = message.get('internalDate')
        if internal_date:
            received_at = datetime.datetime.fromtimestamp(int(internal_date) / 1000.0, tz=datetime.timezone.utc)
            logger.info(f"邮件内部时间戳: {received_at.strftime('%Y-%m-%d %H:%M:%S')}")

        body = GmailService.get_email_body(message)
        logger.info(f"邮件内容长度: {len(body)} 字符")

        # A To header can list several recipients, and the stored address may
        # differ in case, so check every parsed recipient case-insensitively.
        owner = (user_email or '').lower()
        recipients = {address.lower() for _, address in getaddresses([to_header]) if address}
        if not owner or owner not in recipients:
            logger.info(f"邮件不是发送给用户的: 发件人={from_email}, 收件人={to_email}, 用户邮箱={user_email}")
            return False
        to_email = owner

        conversation = GmailConversation.objects.filter(
            user=user,
            user_email__iexact=user_email,
            influencer_email__iexact=from_email
        ).first()
        if conversation is None:
            logger.info(f"未找到匹配的对话: 用户邮箱={to_email}, 达人邮箱={from_email}")
            return False

        logger.info(f"找到匹配的对话: ID={conversation.conversation_id}, 用户邮箱={to_email}, 达人邮箱={from_email}")
        saved = GmailService._save_email_to_chat(user, credential, conversation, {
            'id': msg_id,
            'from': from_email,
            'from_email': from_email,
            'to': to_email,
            'to_email': to_email,
            'subject': subject,
            'body': body,
            'date': (received_at or timezone.now()).strftime('%Y-%m-%d %H:%M:%S'),
            # Attachments are not downloaded on this path yet.
            'attachments': []
        })
        if saved:
            logger.info(f"已将邮件保存到对话历史: {msg_id}")
        return bool(saved)
    except Exception as e:
        logger.error(f"处理单个邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
@staticmethod
def send_email(user, user_email, to_email, subject, body, attachments=None):
    """
    Send a plain-text email, optionally with attachments, from an authorized Gmail account.

    After a successful send, the email is also recorded in two places: the
    GmailConversation with ``to_email`` (created if missing) and ChatHistory, as
    a ``role='user'`` message. Failures in that bookkeeping are logged and do
    not fail the send.

    Args:
        user: Django user.
        user_email: Authorized sender Gmail address.
        to_email: Recipient address.
        subject: Subject line (sent as a UTF-8 encoded header).
        body: Plain-text body.
        attachments: Optional list of
            ``{'path': <local file path>, 'filename': <optional display name>}``.
            Entries without ``path``, or whose file is missing, are skipped.

    Returns:
        tuple: ``(True, gmail_message_id)`` on success,
        ``(False, error_message)`` otherwise.
    """
    try:
        credential = GmailCredential.objects.filter(user=user, email=user_email).first()
        if not credential:
            return False, f"未找到{user_email}的授权信息"
        service = GmailService.get_service(credential)

        message = GmailService._build_outgoing_message(user_email, to_email, subject, body, attachments)
        # messages.send takes the complete message, base64url-encoded, as 'raw'.
        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
        result = service.users().messages().send(
            userId='me',
            body={'raw': raw_message}
        ).execute()
        message_id = result.get('id')

        try:
            GmailService._record_sent_email(user, user_email, to_email, subject, body, attachments, message_id)
        except Exception as e:
            logger.error(f"保存发送的邮件到聊天记录失败: {str(e)}")

        logger.info(f"成功发送邮件到 {to_email}")
        return True, message_id
    except Exception as e:
        logger.error(f"发送Gmail邮件失败: {str(e)}")
        return False, f"发送Gmail邮件失败: {str(e)}"

@staticmethod
def _build_outgoing_message(user_email, to_email, subject, body, attachments):
    """
    Build the MIME message for send_email: a UTF-8 plain-text body plus attachments.

    Each attachment uses the MIME type guessed from its path, or
    ``application/octet-stream`` when unknown. Missing files are skipped with a
    warning; unreadable ones are skipped with an error log.
    """
    from email import encoders
    from email.mime.base import MIMEBase

    message = MIMEMultipart()
    message['to'] = to_email
    message['from'] = user_email
    message['subject'] = Header(subject, 'utf-8').encode()
    message.attach(MIMEText(body, 'plain', 'utf-8'))

    if not (attachments and isinstance(attachments, list)):
        return message

    for attachment in attachments:
        if 'path' not in attachment:
            continue
        filepath = attachment['path']
        filename = attachment.get('filename') or os.path.basename(filepath)
        if not os.path.exists(filepath):
            logger.warning(f"附件文件不存在: {filepath}")
            continue

        content_type, _encoding = mimetypes.guess_type(filepath)
        main_type, sub_type = (content_type or 'application/octet-stream').split('/', 1)
        try:
            with open(filepath, 'rb') as f:
                file_data = f.read()
            file_part = MIMEBase(main_type, sub_type, name=filename)
            file_part.set_payload(file_data)
            encoders.encode_base64(file_part)
            # add_header quotes the filename and RFC 2231-encodes non-ASCII
            # names (e.g. Chinese); a hand-built f-string header does neither.
            file_part.add_header('Content-Disposition', 'attachment', filename=filename)
            message.attach(file_part)
            logger.info(f"已添加附件: {filename}")
        except Exception as e:
            logger.error(f"处理附件时出错: {str(e)}")
    return message

@staticmethod
def _record_sent_email(user, user_email, to_email, subject, body, attachments, message_id):
    """
    Record a sent email in GmailConversation, ChatHistory and GmailAttachment.

    Finds the conversation with ``to_email`` (recipient compared
    case-insensitively) or creates it, then stores the email as a
    ``role='user'`` chat message. Attachment files are copied into
    ATTACHMENT_DIR. If the user has no private knowledge base, only the
    conversation is touched. Database errors propagate to the caller.
    """
    conversation = GmailConversation.objects.filter(
        user=user,
        user_email=user_email,
        influencer_email__iexact=to_email
    ).first()
    if not conversation:
        conversation = GmailConversation.objects.create(
            user=user,
            user_email=user_email,
            influencer_email=to_email,
            conversation_id=f"gmail_{user.id}_{str(uuid.uuid4())[:8]}",
            title=f"{to_email} 的Gmail对话",
            last_sync_time=timezone.now()
        )
    else:
        conversation.last_sync_time = timezone.now()
        conversation.save()

    knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
    if not knowledge_base:
        logger.warning(f"未找到默认知识库,邮件发送成功但未保存到聊天记录")
        return

    # The body is stored as-is; the subject goes into metadata.
    chat_message = ChatHistory.objects.create(
        user=user,
        knowledge_base=knowledge_base,
        conversation_id=conversation.conversation_id,
        title=conversation.title,
        role='user',
        content=body,
        metadata={
            'gmail_message_id': message_id,
            'from': user_email,
            'to': to_email,
            'date': timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
            'subject': subject,
            'source': 'gmail'
        }
    )
    if not (attachments and isinstance(attachments, list)):
        return

    metadata = chat_message.metadata or {}
    metadata.setdefault('attachments', [])
    for attachment in attachments:
        if 'path' not in attachment:
            continue
        filepath = attachment['path']
        filename = attachment.get('filename') or os.path.basename(filepath)
        if not os.path.exists(filepath):
            continue
        try:
            os.makedirs(GmailService.ATTACHMENT_DIR, exist_ok=True)
            # The caller-supplied name may contain path separators; sanitize it
            # before using it in a path.
            unique_filename = f"{uuid.uuid4()}_{GmailService._safe_filename(filename)}"
            target_path = os.path.join(GmailService.ATTACHMENT_DIR, unique_filename)
            shutil.copy2(filepath, target_path)

            filesize = os.path.getsize(filepath)
            content_type, _ = mimetypes.guess_type(filepath)
            content_type = content_type or 'application/octet-stream'

            gmail_attachment = GmailAttachment.objects.create(
                conversation=conversation,
                email_message_id=message_id,
                attachment_id=f"outgoing_{uuid.uuid4()}",
                filename=filename,
                file_path=target_path,
                content_type=content_type,
                size=filesize,
                sender_email=user_email,
                chat_message_id=str(chat_message.id)
            )
            metadata['attachments'].append({
                'id': str(gmail_attachment.id),
                'filename': filename,
                'size': filesize,
                'mime_type': content_type,
                'url': gmail_attachment.get_absolute_url()
            })
        except Exception as e:
            logger.error(f"处理发送邮件附件时出错: {str(e)}")

    if metadata['attachments']:
        chat_message.metadata = metadata
        chat_message.save()
@staticmethod
def _save_email_to_chat(user, credential, conversation, email_data):
    """
    Persist one parsed email as a ChatHistory entry of ``conversation``.

    The knowledge base is taken from the conversation's existing chat message
    returned by ``.first()``. If there is none, it falls back to the user's first
    private knowledge base. The role is ``'user'`` when the sender equals
    ``credential.email`` (case-insensitive), otherwise ``'assistant'``.
    Attachments carrying an ``attachmentId`` are downloaded, stored as
    GmailAttachment rows and listed in the chat message's
    ``metadata['attachments']``.

    Args:
        user: Django user.
        credential: GmailCredential of the mailbox.
        conversation: GmailConversation the email belongs to.
        email_data: Dict with keys id, from, from_email, to, subject, body, date,
            attachments.

    Returns:
        bool: True on success; False when no knowledge base is found or any step
        raises (the error is logged).
    """
    try:
        earliest = ChatHistory.objects.filter(
            conversation_id=conversation.conversation_id
        ).first()
        if earliest:
            knowledge_base = earliest.knowledge_base
        else:
            knowledge_base = KnowledgeBase.objects.filter(user_id=user.id, type='private').first()
            if not knowledge_base:
                logger.error("未找到默认知识库")
                return False

        sender = email_data['from_email']
        from_owner = sender.lower() == credential.email.lower()
        role = 'user' if from_owner else 'assistant'
        logger.info(f"邮件角色判断: 发件人={sender}, 用户邮箱={credential.email}, 判定为{'用户' if from_owner else '达人'}, 角色={role}")
        logger.info(f"邮件详情: 主题='{email_data['subject']}', 发送时间={email_data['date']}")

        # Body is stored verbatim; the subject lives in metadata.
        chat_message = ChatHistory.objects.create(
            user=user,
            knowledge_base=knowledge_base,
            conversation_id=conversation.conversation_id,
            title=conversation.title,
            role=role,
            content=email_data['body'],
            metadata={
                'gmail_message_id': email_data['id'],
                'from': email_data['from'],
                'to': email_data['to'],
                'date': email_data['date'],
                'subject': email_data['subject'],
                'source': 'gmail'
            }
        )
        logger.info(f"已保存邮件到聊天历史: ID={chat_message.id}, 角色={role}, conversation_id={conversation.conversation_id}")

        conversation.last_sync_time = timezone.now()
        conversation.save()

        for item in email_data['attachments'] or []:
            if 'attachmentId' not in item:
                continue
            stored_path = GmailService.download_attachment(
                user,
                credential,
                email_data['id'],
                item['attachmentId'],
                item['filename']
            )
            if not stored_path:
                continue
            record = GmailAttachment.objects.create(
                conversation=conversation,
                email_message_id=email_data['id'],
                attachment_id=item['attachmentId'],
                filename=item['filename'],
                file_path=stored_path,
                content_type=item['mimeType'],
                size=item['size'],
                sender_email=email_data['from_email'],
                chat_message_id=str(chat_message.id)
            )
            meta = chat_message.metadata or {}
            meta.setdefault('attachments', []).append({
                'id': str(record.id),
                'filename': item['filename'],
                'size': item['size'],
                'mime_type': item['mimeType'],
                'url': record.get_absolute_url()
            })
            chat_message.metadata = meta
            chat_message.save()
        return True
    except Exception as e:
        logger.error(f"保存Gmail新邮件到聊天记录失败: {str(e)}")
        return False
@staticmethod
def get_conversation_summary(user, conversation_id):
    """
    Return an AI-generated summary of a Gmail conversation, cached per conversation.

    The cached ConversationSummary is reused while its ``last_message_id`` still
    equals the newest ChatHistory message (by ``created_at``). Otherwise the
    whole history is sent to ``AIService.generate_conversation_summary`` and the
    cache is refreshed.

    Args:
        user (User): Requesting user; must own the conversation.
        conversation_id (str): GmailConversation.conversation_id.

    Returns:
        tuple: ``(summary_text, None)`` on success, ``(None, error_message)`` otherwise.
    """
    try:
        try:
            conversation = GmailConversation.objects.get(conversation_id=conversation_id)
        except GmailConversation.DoesNotExist:
            return None, "对话不存在"
        if str(conversation.user.id) != str(user.id):
            return None, "无权访问此对话"

        latest_chat = ChatHistory.objects.filter(
            conversation_id=conversation_id
        ).order_by('-created_at').first()
        if not latest_chat:
            return None, "对话中没有消息记录"

        # Serve the cached summary while no newer message has arrived.
        try:
            summary = ConversationSummary.objects.get(conversation=conversation)
        except ConversationSummary.DoesNotExist:
            summary = None
        if summary is not None and summary.last_message_id == str(latest_chat.id):
            return summary.content, None

        conversation_history = [
            {
                'content': chat.content,
                'is_from_user': chat.role == 'user',
                'timestamp': chat.created_at.isoformat()
            }
            for chat in ChatHistory.objects.filter(conversation_id=conversation_id).order_by('created_at')
        ]
        if not conversation_history:
            return None, "对话中没有消息记录"

        summary_content, error = AIService.generate_conversation_summary(conversation_history)
        if error:
            return None, f"生成摘要失败: {error}"

        # One update_or_create instead of a separate get + create. Concurrent cache
        # misses then update the same row rather than racing to insert one; full
        # protection presumes a unique constraint on `conversation` (verify).
        ConversationSummary.objects.update_or_create(
            conversation=conversation,
            defaults={
                'content': summary_content,
                'last_message_id': str(latest_chat.id),
            }
        )
        return summary_content, None
    except Exception as e:
        error_msg = f"获取对话摘要失败: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        return None, error_msg
@staticmethod
def _safe_filename(filename):
    """
    Return a filesystem-safe version of ``filename``.

    Path separators, the characters Windows forbids in names (``: * ? " < > |``)
    and ASCII control characters are replaced with ``_``. The result is capped at
    200 characters, keeping the extension when it fits. Empty or None input, or
    a name made only of dots and spaces such as ``'..'``, yields ``'attachment'``.

    Args:
        filename: Original file name (may be None).

    Returns:
        str: Sanitized file name, at most 200 characters.
    """
    max_length = 200
    forbidden = set('/\\:*?"<>|')
    cleaned = ''.join(
        '_' if (ch in forbidden or ord(ch) < 32 or ch == '\x7f') else ch
        for ch in (filename or '')
    )
    # '.', '..' and blank names are not usable as file names on their own.
    if not cleaned.strip(' .'):
        return 'attachment'
    if len(cleaned) > max_length:
        stem, ext = os.path.splitext(cleaned)
        if ext and len(ext) < max_length:
            # Trim the stem so stem + extension is exactly max_length.
            cleaned = stem[:max_length - len(ext)] + ext
        else:
            cleaned = cleaned[:max_length]
    return cleaned
@staticmethod
def _parse_email_content(message):
    """
    Flatten a Gmail API message resource into the dict used by the sync code.

    Args:
        message: Gmail API message resource.

    Returns:
        dict | None: A dict with these keys, or None if parsing fails (the error
        is logged):

        * ``id``, ``thread_id``
        * ``from``, ``from_email``, ``to``, ``to_email``
        * ``subject``: ``'(无主题)'`` when absent
        * ``date``: ``'%Y-%m-%d %H:%M:%S'`` in the sender's own offset, or the
          current time when the Date header cannot be parsed
        * ``body``: plain text, else text extracted from HTML
        * ``body_html``, ``attachments``
    """
    try:
        from email.utils import getaddresses, parsedate_to_datetime

        message_id = message['id']
        thread_id = message['threadId']

        # Header names are case-insensitive; the last occurrence wins.
        payload = message.get('payload') or {}
        headers = {
            header['name'].lower(): header['value']
            for header in payload.get('headers', [])
        }

        from_header = headers.get('from', '')
        to_header = headers.get('to', '')
        subject = headers.get('subject', '(无主题)')

        _from_name, from_email = parseaddr(from_header)
        # parseaddr cannot handle several comma-separated recipients (newer
        # Python versions return ('', '') for them), so take the first address.
        to_addresses = [address for _, address in getaddresses([to_header]) if address]
        to_email = to_addresses[0] if to_addresses else ''

        date_str = headers.get('date', '')
        try:
            date = parsedate_to_datetime(date_str).strftime('%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError, IndexError, OverflowError):
            # Missing or malformed Date header.
            date = timezone.now().strftime('%Y-%m-%d %H:%M:%S')

        body_text, body_html, attachments = GmailService._parse_message_parts(message)
        if body_text:
            body = body_text
        elif body_html:
            body = GmailService._html_to_text(body_html)
        else:
            body = ""

        return {
            'id': message_id,
            'thread_id': thread_id,
            'from': from_header,
            'from_email': from_email,
            'to': to_header,
            'to_email': to_email,
            'subject': subject,
            'date': date,
            'body': body,
            'body_html': body_html,
            'attachments': attachments
        }
    except Exception as e:
        logger.error(f"解析邮件内容失败: {str(e)}")
        logger.error(traceback.format_exc())
        return None
@staticmethod
def _parse_message_parts(message):
    """
    Extract plain text, HTML and attachment descriptors from a Gmail API message.

    The top-level payload is inspected directly. Its ``parts``, if any, are
    handed to ``GmailService._process_message_part`` and the results are
    concatenated. Text bodies are decoded with the charset declared in the
    part's Content-Type header (GB2312/GBK are read as GB18030), falling back to
    UTF-8. Undecodable bytes are replaced.

    Args:
        message: Gmail API message resource.

    Returns:
        tuple: ``(plain_text, html, attachments)``. Each attachment is a dict
        with ``filename``, ``mimeType``, ``size`` and ``attachmentId``.
    """
    from email.message import Message

    def decode_text(part, data):
        # Part bodies keep the charset the sender declared, so do not assume UTF-8.
        raw = base64.urlsafe_b64decode(data)
        charset = None
        for header in part.get('headers') or []:
            if str(header.get('name', '')).lower() == 'content-type':
                probe = Message()
                probe['Content-Type'] = header.get('value', '')
                charset = probe.get_content_charset()
                break
        if charset in ('gb2312', 'gbk', 'x-gbk'):
            # GB18030 is a superset of both; GBK text is often labelled GB2312.
            charset = 'gb18030'
        try:
            return raw.decode(charset or 'utf-8', errors='replace')
        except LookupError:
            # Unknown charset name in the header.
            return raw.decode('utf-8', errors='replace')

    body_text = ""
    body_html = ""
    attachments = []
    if 'payload' not in message:
        return body_text, body_html, attachments
    payload = message['payload']

    # Single-part message: the content sits directly in payload.body.
    if 'body' in payload:
        body = payload['body']
        data = body.get('data', '')
        mime_type = payload.get('mimeType', '')
        if data and mime_type in ('text/plain', 'text/html'):
            decoded = decode_text(payload, data)
            if mime_type == 'text/plain':
                body_text = decoded
            else:
                body_html = decoded
        if payload.get('filename') and body.get('attachmentId'):
            attachments.append({
                'filename': payload['filename'],
                'mimeType': payload.get('mimeType', 'application/octet-stream'),
                'size': body.get('size', 0),
                'attachmentId': body['attachmentId']
            })

    # Multipart message: walk every sub-part.
    for part in payload.get('parts') or []:
        part_text, part_html, part_attachments = GmailService._process_message_part(part)
        body_text += part_text
        body_html += part_html
        attachments.extend(part_attachments)

    return body_text, body_html, attachments
@staticmethod
def _process_message_part(part):
    """
    Extract text, HTML and attachment descriptors from one MIME part.

    ``multipart/*`` parts are walked recursively. text/plain and text/html
    bodies are decoded with the charset declared in the part's Content-Type
    header (GB2312/GBK are read as GB18030), falling back to UTF-8. Undecodable
    bytes are replaced.

    Args:
        part: One entry of a Gmail API ``payload.parts`` list.

    Returns:
        tuple: ``(plain_text, html, attachments)`` for this part and its
        sub-parts.
    """
    from email.message import Message

    def decode_text(node, data):
        # Part bodies keep the charset the sender declared, so do not assume UTF-8.
        raw = base64.urlsafe_b64decode(data)
        charset = None
        for header in node.get('headers') or []:
            if str(header.get('name', '')).lower() == 'content-type':
                probe = Message()
                probe['Content-Type'] = header.get('value', '')
                charset = probe.get_content_charset()
                break
        if charset in ('gb2312', 'gbk', 'x-gbk'):
            # GB18030 is a superset of both; GBK text is often labelled GB2312.
            charset = 'gb18030'
        try:
            return raw.decode(charset or 'utf-8', errors='replace')
        except LookupError:
            # Unknown charset name in the header.
            return raw.decode('utf-8', errors='replace')

    def walk(node):
        text = ""
        html_body = ""
        found = []
        mime_type = node.get('mimeType', '')
        body = node.get('body') or {}

        data = body.get('data')
        if data and mime_type in ('text/plain', 'text/html'):
            decoded = decode_text(node, data)
            if mime_type == 'text/plain':
                text = decoded
            else:
                html_body = decoded

        if node.get('filename') and 'attachmentId' in body:
            found.append({
                'filename': node['filename'],
                'mimeType': mime_type,
                'size': body.get('size', 0),
                'attachmentId': body['attachmentId']
            })

        if mime_type.startswith('multipart/') and 'parts' in node:
            for child in node['parts']:
                child_text, child_html, child_found = walk(child)
                text += child_text
                html_body += child_html
                found.extend(child_found)
        return text, html_body, found

    return walk(part)
@staticmethod
def _html_to_text(html):
    """
    Convert an HTML fragment to a single line of plain text.

    ``<script>``/``<style>`` blocks and HTML comments are dropped, and the
    remaining tags become spaces. Character references (``&amp;``, ``&nbsp;``,
    ``&#39;``, ...) are unescaped, and whitespace runs collapse to one space.

    Args:
        html: HTML content (may be empty or None).

    Returns:
        str: Extracted text, stripped; ``""`` for empty input.
    """
    if not html:
        return ""
    import html as html_lib
    import re

    # Script/style contents and comments are not visible text.
    text = re.sub(r'<(script|style)\b[^>]*>.*?</\1\s*>', ' ', html, flags=re.IGNORECASE | re.DOTALL)
    text = re.sub(r'<!--.*?-->', ' ', text, flags=re.DOTALL)
    text = re.sub(r'<[^>]+>', ' ', text)
    # Unescape before collapsing so &nbsp; also becomes ordinary whitespace.
    text = html_lib.unescape(text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
@staticmethod
def extract_email(header_value):
    """
    Return the lower-cased address part of an address header value.

    Accepts ``"Name <user@example.com>"`` or a bare ``"user@example.com"``.
    Empty or None input returns ``""``. If parsing raises, the error is logged
    and ``""`` is returned.

    Args:
        header_value: Raw header value such as From or To.

    Returns:
        str: Lower-cased email address, or ``""``.
    """
    try:
        if not header_value:
            return ""
        address = parseaddr(header_value)[1]
        return address.lower()
    except Exception as e:
        logger.error(f"提取邮件地址失败: {str(e)}")
        return ""
@staticmethod
def get_email_body(message):
    """
    Return the best plain-text body of a Gmail API message.

    Preference order:
      1. decoded text/plain content;
      2. text extracted from text/html content;
      3. the raw top-level ``payload.body.data``, decoded as UTF-8.

    Returns ``""`` when nothing is found, or when extraction raises (the error
    is logged).

    Args:
        message: Gmail API message resource.

    Returns:
        str: Email body text.
    """
    try:
        plain, markup, _unused = GmailService._parse_message_parts(message)
        if plain:
            return plain
        if markup:
            return GmailService._html_to_text(markup)

        has_raw_body = (
            'payload' in message
            and 'body' in message['payload']
            and 'data' in message['payload']['body']
        )
        if not has_raw_body:
            return ""
        raw = message['payload']['body']['data']
        return base64.urlsafe_b64decode(raw).decode('utf-8', errors='replace')
    except Exception as e:
        logger.error(f"提取邮件正文失败: {str(e)}")
        return ""