daren_project/user_management/gmail_account_views.py
2025-04-29 10:22:57 +08:00

753 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import traceback
from datetime import datetime, timedelta
from django.utils import timezone
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from rest_framework.response import Response
from django.conf import settings
from .models import GmailCredential
from .gmail_integration import GmailIntegration, GmailServiceManager
logger = logging.getLogger(__name__)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def list_gmail_accounts(request):
    """List every Gmail account bound to the requesting user.

    Each ``GmailCredential`` owned by ``request.user`` is reported together
    with the outcome of a live ``GmailIntegration.authenticate()`` probe and
    a flag telling whether its stored watch expiration lies in the past.
    Rows are ordered default-first, then by most recent update.

    Returns:
        Response: body ``code`` 200 with the list of account dicts under
        ``data``; body ``code`` 500 (HTTP 500) if building the list fails.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    try:
        credentials = GmailCredential.objects.filter(
            user=request.user
        ).order_by('-is_default', '-updated_at')

        accounts = []
        for cred in credentials:
            # The probe is best-effort: a failure marks the account invalid
            # instead of failing the whole listing, but it is logged so the
            # reason is not lost.
            is_valid = False
            try:
                gmail_integration = GmailIntegration(
                    user=request.user,
                    gmail_credential_id=str(cred.id)
                )
                is_valid = gmail_integration.authenticate()
            except Exception as auth_error:
                logger.warning(
                    "Gmail账户 %s 认证检查失败: %s", cred.id, auth_error
                )

            # A credential without a recorded expiration counts as expired.
            watch_expired = True
            if cred.watch_expiration:
                watch_expired = cred.watch_expiration < timezone.now()

            accounts.append({
                'id': str(cred.id),
                'name': cred.name,
                'gmail_email': cred.gmail_email or "未知",
                'is_default': cred.is_default,
                'is_active': cred.is_active,
                'is_valid': is_valid,
                'watch_expired': watch_expired,
                'watch_expiration': (
                    cred.watch_expiration.strftime(timestamp_format)
                    if cred.watch_expiration else None
                ),
                'created_at': cred.created_at.strftime(timestamp_format),
                'updated_at': cred.updated_at.strftime(timestamp_format)
            })

        return Response({
            'code': 200,
            'message': '获取Gmail账户列表成功',
            'data': accounts
        })
    except Exception as e:
        logger.error(f"获取Gmail账户列表失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取Gmail账户列表失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def set_default_gmail_account(request):
    """Mark one of the requesting user's Gmail accounts as the default.

    Expects ``account_id`` in the request body.  Replies with body ``code``
    400 when it is missing, 404 when no credential with that id belongs to
    the user, 200 with the account summary on success, and 500 on any
    unexpected error.
    """
    def _failure(code, message, http_status):
        # Every error reply uses the same envelope with an empty payload.
        return Response(
            {'code': code, 'message': message, 'data': None},
            status=http_status
        )

    try:
        requested_id = request.data.get('account_id')
        if not requested_id:
            return _failure(400, '缺少账户ID参数', status.HTTP_400_BAD_REQUEST)

        try:
            account = GmailCredential.objects.get(
                id=requested_id,
                user=request.user
            )
        except GmailCredential.DoesNotExist:
            return _failure(
                404, '找不到指定的Gmail账户', status.HTTP_404_NOT_FOUND
            )

        # NOTE(review): the original author states that the model's save()
        # demotes the user's other accounts; that logic is not visible here.
        account.is_default = True
        account.save()

        # Drop the cached service so the next use picks up the new default.
        GmailServiceManager.clear_instance(request.user)

        label = account.gmail_email or account.name
        summary = {
            'id': str(account.id),
            'name': account.name,
            'gmail_email': account.gmail_email,
            'is_default': True
        }
        return Response({
            'code': 200,
            'message': f'已将{label}设为默认Gmail账户',
            'data': summary
        })
    except Exception as e:
        reason = str(e)
        logger.error(f"设置默认Gmail账户失败: {reason}")
        logger.error(traceback.format_exc())
        return _failure(
            500,
            f'设置默认Gmail账户失败: {reason}',
            status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def update_gmail_account(request):
    """Update the display name and/or active flag of a Gmail account.

    Reads ``account_id`` (required) plus optional ``name`` and ``is_active``
    from the request body; only the optional fields that are present (not
    ``None``) are written.  Replies with body ``code`` 400 for a missing id,
    404 when the credential is not owned by the user, 200 with the updated
    summary on success, and 500 on any unexpected error.
    """
    def _failure(code, message, http_status):
        # Shared error envelope: code, message and an empty payload.
        return Response(
            {'code': code, 'message': message, 'data': None},
            status=http_status
        )

    try:
        payload = request.data
        requested_id = payload.get('account_id')
        changes = {
            'name': payload.get('name'),
            'is_active': payload.get('is_active'),
        }

        if not requested_id:
            return _failure(400, '缺少账户ID参数', status.HTTP_400_BAD_REQUEST)

        try:
            account = GmailCredential.objects.get(
                id=requested_id,
                user=request.user
            )
        except GmailCredential.DoesNotExist:
            return _failure(
                404, '找不到指定的Gmail账户', status.HTTP_404_NOT_FOUND
            )

        # Apply only the fields the caller actually supplied.
        for field_name, new_value in changes.items():
            if new_value is not None:
                setattr(account, field_name, new_value)
        account.save()

        # Invalidate the cached service instance for this credential.
        GmailServiceManager.clear_instance(request.user, str(account.id))

        summary = {
            'id': str(account.id),
            'name': account.name,
            'gmail_email': account.gmail_email,
            'is_active': account.is_active,
            'is_default': account.is_default
        }
        return Response({
            'code': 200,
            'message': '更新Gmail账户成功',
            'data': summary
        })
    except Exception as e:
        reason = str(e)
        logger.error(f"更新Gmail账户失败: {reason}")
        logger.error(traceback.format_exc())
        return _failure(
            500,
            f'更新Gmail账户失败: {reason}',
            status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@api_view(['DELETE'])
@permission_classes([IsAuthenticated])
def delete_gmail_account(request, account_id):
    """Delete one of the requesting user's Gmail accounts.

    The cached service instance for the credential is cleared before the
    row is deleted.  If the user is left without a default account
    afterwards, the most recently updated remaining credential is promoted
    to default.

    Args:
        request: the authenticated DRF request.
        account_id: id of the credential to remove (taken from the URL).

    Returns:
        Response: body ``code`` 200 with the deleted account's summary,
        404 when the credential is not owned by the user, or 500 on any
        unexpected error.
    """
    def _failure(code, message, http_status):
        # Shared error envelope: code, message and an empty payload.
        return Response(
            {'code': code, 'message': message, 'data': None},
            status=http_status
        )

    try:
        owned_accounts = GmailCredential.objects.filter(user=request.user)

        try:
            doomed = GmailCredential.objects.get(
                id=account_id,
                user=request.user
            )
        except GmailCredential.DoesNotExist:
            return _failure(
                404, '找不到指定的Gmail账户', status.HTTP_404_NOT_FOUND
            )

        # Snapshot the identifying fields before the row disappears.
        doomed_id = str(doomed.id)
        account_info = {
            'id': doomed_id,
            'name': doomed.name,
            'gmail_email': doomed.gmail_email
        }

        GmailServiceManager.clear_instance(request.user, doomed_id)
        doomed.delete()

        # Make sure the user still has a default account if any remain.
        if not owned_accounts.filter(is_default=True).exists():
            successor = owned_accounts.order_by('-updated_at').first()
            if successor:
                successor.is_default = True
                successor.save()

        return Response({
            'code': 200,
            'message': f'成功删除Gmail账户: {account_info["name"]}',
            'data': account_info
        })
    except Exception as e:
        reason = str(e)
        logger.error(f"删除Gmail账户失败: {reason}")
        logger.error(traceback.format_exc())
        return _failure(
            500,
            f'删除Gmail账户失败: {reason}',
            status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def add_gmail_account(request):
    """Start binding a new Gmail account for the requesting user.

    Request fields:
        client_secret_file: uploaded JSON file with the OAuth client secret,
            or
        client_secret_json: the same secret supplied in the request body.
        name: optional display name (defaults to ``'新Gmail账户'``).
        use_proxy / proxy_url: optional proxy settings.

    The view calls ``GmailIntegration.authenticate()``.  When that raises an
    error whose text contains ``"Please visit this URL"``, the authorization
    URL is extracted from the message, the pending settings are stored in
    the session under ``gmail_auth_pending`` and a body ``code`` of 202 is
    returned (no explicit HTTP status is set on that response).

    Returns:
        Response: body ``code`` 202 with ``auth_url`` when authorization is
        required, 200 when authentication succeeded outright, 400 for a
        missing/invalid secret or when the per-user account limit is
        reached, 401 for an expired/revoked token, and 500 otherwise.
    """
    import json

    def _failure(code, message, http_status, data=None):
        # Shared error envelope used by every early exit below.
        return Response(
            {'code': code, 'message': message, 'data': data},
            status=http_status
        )

    try:
        use_proxy = request.data.get('use_proxy', True)
        # Multipart/form submissions (needed for the file upload) deliver
        # every value as text, so "false" would otherwise be truthy.
        if isinstance(use_proxy, str):
            use_proxy = use_proxy.strip().lower() not in (
                '', '0', 'false', 'no', 'off'
            )
        proxy_url = request.data.get('proxy_url', 'http://127.0.0.1:7890')
        name = request.data.get('name', '新Gmail账户')

        # The client secret comes either from an uploaded file or from the
        # request body.
        if 'client_secret_file' in request.FILES:
            try:
                raw_secret = request.FILES['client_secret_file'].read()
                client_secret_json = json.loads(raw_secret.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Undecodable bytes are a client error just like bad JSON.
                return _failure(
                    400, '上传的JSON文件格式无效', status.HTTP_400_BAD_REQUEST
                )
            logger.info("成功从上传的JSON文件解析客户端密钥")
        else:
            client_secret_json = request.data.get('client_secret_json')

        if not client_secret_json:
            return _failure(
                400,
                '缺少客户端密钥参数请提供client_secret_json或上传client_secret_file',
                status.HTTP_400_BAD_REQUEST
            )

        # Enforce the per-user limit on active Gmail accounts.
        max_accounts_per_user = getattr(
            settings, 'MAX_GMAIL_ACCOUNTS_PER_USER', 5
        )
        current_accounts_count = GmailCredential.objects.filter(
            user=request.user, is_active=True
        ).count()
        if current_accounts_count >= max_accounts_per_user:
            return _failure(
                400,
                f'您最多只能绑定{max_accounts_per_user}个Gmail账户',
                status.HTTP_400_BAD_REQUEST,
                data={
                    'current_count': current_accounts_count,
                    'max_allowed': max_accounts_per_user
                }
            )

        gmail_integration = GmailIntegration(
            user=request.user,
            client_secret_json=client_secret_json,
            use_proxy=use_proxy,
            proxy_url=proxy_url
        )

        try:
            gmail_integration.authenticate()
        except Exception as e:
            error_message = str(e)

            # The normal OAuth flow surfaces the authorization URL inside
            # the exception text; pull it out without assuming the
            # "URL: " marker is always present.
            auth_url = ''
            if "Please visit this URL" in error_message:
                pieces = error_message.split("URL: ")
                if len(pieces) > 1:
                    auth_url = pieces[1].split(" ")[0]

            if auth_url:
                # Remember the settings for handle_gmail_auth_code; the
                # client secret itself is deliberately not stored here.
                request.session['gmail_auth_pending'] = {
                    'name': name,
                    'use_proxy': use_proxy,
                    'proxy_url': proxy_url,
                }
                return Response({
                    'code': 202,
                    'message': '需要Gmail授权',
                    'data': {
                        'auth_url': auth_url,
                        'name': name
                    }
                })

            if "Token has been expired or revoked" in error_message:
                return _failure(
                    401,
                    'OAuth令牌已过期或被撤销请重新授权',
                    status.HTTP_401_UNAUTHORIZED
                )

            logger.error(f"Gmail认证过程中出现错误: {error_message}")
            return _failure(
                500,
                f'Gmail认证失败: {error_message}',
                status.HTTP_500_INTERNAL_SERVER_ERROR
            )

        return Response({
            'code': 200,
            'message': '添加Gmail账户成功',
            'data': None
        })
    except Exception as e:
        logger.error(f"添加Gmail账户失败: {str(e)}")
        logger.error(traceback.format_exc())
        return _failure(
            500,
            f'添加Gmail账户失败: {str(e)}',
            status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def handle_gmail_auth_code(request):
    """Complete the Gmail OAuth flow with the authorization code.

    Reads ``auth_code`` (required) from the request body.  ``name``,
    ``use_proxy`` and ``proxy_url`` are taken from the body when present,
    otherwise from the ``gmail_auth_pending`` session entry (which is
    consumed by this call), otherwise from built-in defaults.

    After the code is exchanged the credential is renamed and a watch is
    set up; a watch failure is logged but does not fail the request.

    Returns:
        Response: body ``code`` 200 with the credential summary and the
        watch result (``None`` if the watch could not be set up), 400 for a
        missing or ``invalid_grant`` code, and 500 for any other error.
    """
    try:
        auth_code = request.data.get('auth_code')
        if not auth_code:
            return Response({
                'code': 400,
                'message': '缺少授权码参数',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        # Explicit request values win over what add_gmail_account stashed
        # in the session.
        auth_pending = request.session.pop('gmail_auth_pending', {})
        name = request.data.get(
            'name', auth_pending.get('name', '新Gmail账户')
        )
        use_proxy = request.data.get(
            'use_proxy', auth_pending.get('use_proxy', True)
        )
        proxy_url = request.data.get(
            'proxy_url',
            auth_pending.get('proxy_url', 'http://127.0.0.1:7890')
        )

        gmail_integration = GmailIntegration(
            user=request.user,
            use_proxy=use_proxy,
            proxy_url=proxy_url
        )

        try:
            gmail_integration.handle_auth_code(auth_code)

            if gmail_integration.gmail_credential:
                gmail_integration.gmail_credential.name = name
                gmail_integration.gmail_credential.save()

            # Initialised up front so the response can report "no watch"
            # without inspecting locals().
            watch_result = None
            try:
                watch_result = gmail_integration.setup_watch()
                logger.info(f"初始化Gmail监听成功: {watch_result}")
            except Exception as watch_error:
                logger.error(f"初始化Gmail监听失败: {str(watch_error)}")

            # Re-read the attribute: setup_watch() is opaque from here and
            # the response should reflect the integration's final state.
            credential = gmail_integration.gmail_credential
            return Response({
                'code': 200,
                'message': 'Gmail账户授权成功',
                'data': {
                    'id': str(credential.id) if credential else None,
                    'name': name,
                    'gmail_email': credential.gmail_email if credential else None,
                    'watch_result': watch_result
                }
            })
        except Exception as auth_error:
            error_message = str(auth_error)
            logger.error(f"处理Gmail授权码失败: {error_message}")
            if "invalid_grant" in error_message.lower():
                return Response({
                    'code': 400,
                    'message': '授权码无效或已过期,请重新授权',
                    'data': None
                }, status=status.HTTP_400_BAD_REQUEST)
            return Response({
                'code': 500,
                'message': f'处理Gmail授权码失败: {error_message}',
                'data': None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Exception as e:
        logger.error(f"处理Gmail授权码失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'处理Gmail授权码失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def refresh_gmail_account_watch(request):
    """Renew the Gmail watch for one active account of the requesting user.

    Reads ``account_id`` (required) and optional ``use_proxy`` /
    ``proxy_url`` from the request body, authenticates the credential,
    calls ``setup_watch()`` and stores the returned ``expiration`` and
    ``historyId`` on the credential.

    Returns:
        Response: body ``code`` 200 with the refreshed watch details, 400
        for a missing id or failed authentication, 404 when no active
        credential with that id belongs to the user, and 500 on any
        unexpected error.
    """
    # Imported under an alias because the module-level name ``timezone``
    # refers to ``django.utils.timezone``.
    from datetime import timezone as dt_timezone

    try:
        account_id = request.data.get('account_id')
        if not account_id:
            return Response({
                'code': 400,
                'message': '缺少账户ID参数',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            credential = GmailCredential.objects.get(
                id=account_id,
                user=request.user,
                is_active=True
            )
        except GmailCredential.DoesNotExist:
            return Response({
                'code': 404,
                'message': '找不到指定的Gmail账户',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        use_proxy = request.data.get('use_proxy', True)
        proxy_url = request.data.get('proxy_url', 'http://127.0.0.1:7890')

        gmail_integration = GmailIntegration(
            user=request.user,
            gmail_credential_id=str(credential.id),
            use_proxy=use_proxy,
            proxy_url=proxy_url
        )

        if not gmail_integration.authenticate():
            return Response({
                'code': 400,
                'message': 'Gmail认证失败',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        watch_result = gmail_integration.setup_watch()

        expiration = watch_result.get('expiration')
        history_id = watch_result.get('historyId')
        if expiration:
            # ``expiration`` is treated as epoch milliseconds (it is divided
            # by 1000 before conversion).  Convert from UTC explicitly so the
            # stored value does not depend on the server's local time zone
            # and stays comparable with timezone.now() used elsewhere in this
            # file.
            expiration_time = datetime.fromtimestamp(
                int(expiration) / 1000, tz=dt_timezone.utc
            )
            if not settings.USE_TZ:
                # Without time-zone support Django expects naive datetimes.
                expiration_time = timezone.make_naive(expiration_time)
            credential.watch_expiration = expiration_time
        if history_id:
            credential.last_history_id = history_id
        credential.save()

        return Response({
            'code': 200,
            'message': '刷新Gmail监听成功',
            'data': {
                'account_id': str(credential.id),
                'name': credential.name,
                'gmail_email': credential.gmail_email,
                'expiration': (
                    credential.watch_expiration.strftime('%Y-%m-%d %H:%M:%S')
                    if credential.watch_expiration else None
                ),
                'history_id': credential.last_history_id
            }
        })
    except Exception as e:
        logger.error(f"刷新Gmail监听失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'刷新Gmail监听失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def check_specific_gmail_auth(request):
    """Report the authentication and watch status of a Gmail account.

    Query parameters:
        account_id: optional credential id.  When omitted, the user's
            active default credential is checked, falling back to the most
            recently updated active one.
        use_proxy: optional, only the text ``true`` (any case) enables it;
            defaults to ``true``.
        proxy_url: optional proxy address.

    Returns:
        Response: body ``code`` 200 with the status fields; body ``code``
        404 (sent without an explicit HTTP status) when no credential is
        found; body ``code`` 500 (HTTP 500) on any unexpected error.
    """
    try:
        params = request.query_params
        owned = GmailCredential.objects.filter(user=request.user)

        requested_id = params.get('account_id')
        if requested_id:
            credential = owned.filter(id=requested_id).first()
        else:
            # Prefer the default account, then the latest active one.
            active = owned.filter(is_active=True)
            credential = (
                active.filter(is_default=True).first()
                or active.order_by('-updated_at').first()
            )

        if not credential:
            return Response({
                'code': 404,
                'message': '未找到Gmail认证信息',
                'data': {
                    'authenticated': False,
                    'needs_setup': True
                }
            })

        proxy_enabled = params.get('use_proxy', 'true').lower() == 'true'
        proxy_address = params.get('proxy_url', 'http://127.0.0.1:7890')

        integration = GmailIntegration(
            user=request.user,
            gmail_credential_id=str(credential.id),
            use_proxy=proxy_enabled,
            proxy_url=proxy_address
        )
        auth_valid = integration.authenticate()

        # No stored expiration is reported as an expired watch.
        expires_at = credential.watch_expiration
        if expires_at:
            watch_expired = expires_at < timezone.now()
            expires_text = expires_at.strftime('%Y-%m-%d %H:%M:%S')
        else:
            watch_expired = True
            expires_text = None

        report = {
            'authenticated': auth_valid,
            'needs_setup': not auth_valid,
            'account_id': str(credential.id),
            'name': credential.name,
            'gmail_email': credential.gmail_email,
            'is_default': credential.is_default,
            'watch_expired': watch_expired,
            'last_history_id': credential.last_history_id,
            'watch_expiration': expires_text
        }
        return Response({
            'code': 200,
            'message': '认证信息获取成功',
            'data': report
        })
    except Exception as e:
        reason = str(e)
        logger.error(f"检查Gmail认证状态失败: {reason}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'检查Gmail认证状态失败: {reason}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def clear_gmail_cache(request):
    """Clear cached Gmail service instances and re-establish watches.

    Request fields (at least one is required):
        account_id: credential owned by the requesting user; its cached
            service instance is cleared.
        gmail_email: mailbox address; used only when ``account_id`` is
            absent, in which case every cached instance for that address is
            cleared.

    After clearing, each matching active credential is re-authenticated and
    its watch is set up again; queued notifications are then processed for
    the owners of the credentials that refreshed successfully.

    Returns:
        Response: body ``code`` 200 with ``success_refreshed``,
        ``failed_refreshed`` and ``queue_processed`` (the result of the last
        queue run, or ``None``); 400 when both identifiers are missing; 404
        when ``account_id`` is not owned by the user; 500 on any unexpected
        error.
    """
    # NOTE: ``datetime`` and ``GmailIntegration`` must NOT be re-imported
    # inside this function.  A function-scope import makes the name local to
    # the whole function, so any use before the import line raises
    # UnboundLocalError.
    from datetime import timezone as dt_timezone

    def _describe(cred, error=None):
        # Summarise a credential for the response, tolerating a missing owner.
        owner = cred.user
        entry = {
            'id': str(cred.id),
            'gmail_email': cred.gmail_email,
            'user_id': str(owner.id) if owner else None,
            'user_email': owner.email if owner else None
        }
        if error is not None:
            entry['error'] = error
        return entry

    try:
        gmail_email = request.data.get('gmail_email')
        account_id = request.data.get('account_id')
        if not gmail_email and not account_id:
            return Response({
                'code': 400,
                'message': '需要提供gmail_email或account_id参数',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        selected = None
        if account_id:
            try:
                selected = GmailCredential.objects.get(
                    id=account_id,
                    user=request.user
                )
            except GmailCredential.DoesNotExist:
                return Response({
                    'code': 404,
                    'message': '找不到指定的Gmail账号',
                    'data': None
                }, status=status.HTTP_404_NOT_FOUND)
            gmail_email = selected.gmail_email
            # Use the canonical id string, as the other views in this file do.
            GmailServiceManager.clear_instance(request.user, str(selected.id))
            logger.info(
                f"已清除用户 {request.user.email} 的Gmail账号 {account_id} 缓存"
            )
        else:
            cleared_count = GmailServiceManager.clear_all_instances_by_email(
                gmail_email
            )
            logger.info(
                f"已清除Gmail邮箱 {gmail_email} 的 {cleared_count} 个缓存实例"
            )

        # NOTE(review): this lookup is not restricted to request.user, so any
        # authenticated caller can refresh (and see owner details of) other
        # users' credentials for the same address.  Left as-is because it may
        # be intentional — confirm, or restrict to admins / the caller.
        if selected is not None and not gmail_email:
            # An account without a recorded address must not widen the
            # refresh to every credential whose address is NULL.
            credentials = GmailCredential.objects.filter(
                id=selected.id,
                is_active=True
            ).select_related('user')
        else:
            credentials = GmailCredential.objects.filter(
                gmail_email=gmail_email,
                is_active=True
            ).select_related('user')

        success_refreshed = []
        failed_refreshed = []
        refreshed_owners = {}
        for credential in credentials:
            try:
                integration = GmailIntegration(
                    user=credential.user,
                    gmail_credential_id=str(credential.id)
                )
                if not integration.authenticate():
                    failed_refreshed.append(_describe(credential, '认证失败'))
                    continue

                try:
                    watch_result = integration.setup_watch()
                    if 'expiration' in watch_result:
                        # Epoch milliseconds -> UTC datetime; see the
                        # USE_TZ handling below for naive-datetime projects.
                        expiration_time = datetime.fromtimestamp(
                            int(watch_result['expiration']) / 1000,
                            tz=dt_timezone.utc
                        )
                        if not settings.USE_TZ:
                            expiration_time = timezone.make_naive(
                                expiration_time
                            )
                        credential.watch_expiration = expiration_time
                        credential.save()
                    if credential.user is not None:
                        refreshed_owners[credential.user.pk] = credential.user
                    success_refreshed.append(_describe(credential))
                except Exception as watch_error:
                    failed_refreshed.append(
                        _describe(credential, str(watch_error))
                    )
            except Exception as e:
                failed_refreshed.append(_describe(credential, str(e)))

        # Process queued notifications once per owner of a refreshed
        # credential; the owners were already loaded via select_related.
        queue_result = None
        if refreshed_owners:
            try:
                for owner in refreshed_owners.values():
                    queue_result = GmailIntegration.process_queued_notifications(
                        user=owner
                    )
            except Exception as queue_error:
                logger.error(f"处理队列通知失败: {str(queue_error)}")

        return Response({
            'code': 200,
            'message': '清除Gmail缓存成功',
            'data': {
                'gmail_email': gmail_email,
                'success_refreshed': success_refreshed,
                'failed_refreshed': failed_refreshed,
                'queue_processed': queue_result
            }
        })
    except Exception as e:
        logger.error(f"清除Gmail缓存失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'清除Gmail缓存失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)