"""REST endpoints for managing a user's Gmail accounts.

Every view answers with the same JSON envelope, ``{'code', 'message',
'data'}``. Unexpected exceptions are logged together with their traceback
and reported to the client as an HTTP 500 envelope.
"""

import json
import logging
import traceback
from datetime import datetime, timedelta

from django.conf import settings
from django.utils import timezone
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from rest_framework.response import Response

from .models import GmailCredential
from .gmail_integration import GmailIntegration, GmailServiceManager

logger = logging.getLogger(__name__)

# Proxy used when the caller does not supply ``proxy_url``.
_DEFAULT_PROXY_URL = 'http://127.0.0.1:7890'
# Format of every timestamp string returned to the client.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

_TRUE_TOKENS = frozenset({'true', '1', 'yes', 'on'})
_FALSE_TOKENS = frozenset({'false', '0', 'no', 'off'})


def _api_response(code, message, data=None, http_status=None):
    """Build the JSON envelope shared by every endpoint in this module.

    ``http_status=None`` leaves the HTTP status at the framework default,
    which is what the success paths rely on.
    """
    return Response(
        {'code': code, 'message': message, 'data': data},
        status=http_status,
    )


def _server_error(prefix, exc):
    """Log ``exc`` with its traceback and return the HTTP 500 envelope.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` still sees the active exception.
    """
    detail = f'{prefix}: {str(exc)}'
    logger.error(detail)
    logger.error(traceback.format_exc())
    return _api_response(
        500, detail, http_status=status.HTTP_500_INTERNAL_SERVER_ERROR
    )


def _as_bool(value):
    """Coerce common textual booleans to ``bool``.

    Form-encoded and multipart requests deliver flags as strings, where
    ``"false"`` would otherwise be truthy. Non-string values and strings
    that are not recognised are returned unchanged, so downstream handling
    of unexpected input is the same as before.
    """
    if isinstance(value, str):
        token = value.strip().lower()
        if token in _TRUE_TOKENS:
            return True
        if token in _FALSE_TOKENS:
            return False
    return value


def _format_timestamp(moment):
    """Return ``moment`` formatted for the API, or ``None`` when it is unset."""
    return moment.strftime(_TIMESTAMP_FORMAT) if moment else None


def _watch_expiration_from_ms(expiration_ms):
    """Convert a millisecond epoch value into a datetime for the model.

    With ``USE_TZ`` enabled the result is timezone-aware, so it can be stored
    and compared against ``timezone.now()`` without naive/aware mismatches.
    With ``USE_TZ`` disabled the naive local datetime is returned, matching
    what ``timezone.now()`` produces in that configuration.
    """
    seconds = int(expiration_ms) / 1000
    if settings.USE_TZ:
        return datetime.fromtimestamp(
            seconds, tz=timezone.get_default_timezone()
        )
    return datetime.fromtimestamp(seconds)


def _is_watch_expired(credential):
    """Return True when the credential has no watch or the watch has lapsed."""
    if not credential.watch_expiration:
        return True
    return credential.watch_expiration < timezone.now()


def _find_user_credential(user, account_id, **extra_filters):
    """Return the credential ``account_id`` owned by ``user``, or ``None``."""
    return GmailCredential.objects.filter(
        id=account_id, user=user, **extra_filters
    ).first()


def _refresh_entry(credential, error=None):
    """Describe one credential in the refresh report of ``clear_gmail_cache``."""
    owner = credential.user
    entry = {
        'id': str(credential.id),
        'gmail_email': credential.gmail_email,
        'user_id': str(owner.id) if owner else None,
        'user_email': owner.email if owner else None,
    }
    if error is not None:
        entry['error'] = error
    return entry


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def list_gmail_accounts(request):
    """List all Gmail accounts of the requesting user.

    Accounts are ordered with the default account first, then by most recent
    update. For each account an authentication attempt is made to fill
    ``is_valid``; a failing attempt marks the account invalid instead of
    failing the whole request.
    """
    try:
        credentials = GmailCredential.objects.filter(
            user=request.user
        ).order_by('-is_default', '-updated_at')

        accounts = []
        for cred in credentials:
            try:
                integration = GmailIntegration(
                    user=request.user,
                    gmail_credential_id=str(cred.id),
                )
                is_valid = integration.authenticate()
            except Exception as auth_error:
                # Best effort: one broken account must not hide the others,
                # but the failure is logged rather than silently dropped.
                logger.warning(
                    "检查Gmail账户 %s 认证状态失败: %s", cred.id, auth_error
                )
                is_valid = False

            accounts.append({
                'id': str(cred.id),
                'name': cred.name,
                'gmail_email': cred.gmail_email or "未知",
                'is_default': cred.is_default,
                'is_active': cred.is_active,
                'is_valid': is_valid,
                'watch_expired': _is_watch_expired(cred),
                'watch_expiration': _format_timestamp(cred.watch_expiration),
                'created_at': _format_timestamp(cred.created_at),
                'updated_at': _format_timestamp(cred.updated_at),
            })

        return _api_response(200, '获取Gmail账户列表成功', accounts)
    except Exception as e:
        return _server_error('获取Gmail账户列表失败', e)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def set_default_gmail_account(request):
    """Mark one of the user's Gmail accounts as the default.

    Expects ``account_id`` in the request body. Responds 400 when it is
    missing and 404 when the account does not belong to the user.
    """
    try:
        account_id = request.data.get('account_id')
        if not account_id:
            return _api_response(
                400, '缺少账户ID参数', http_status=status.HTTP_400_BAD_REQUEST
            )

        credential = _find_user_credential(request.user, account_id)
        if credential is None:
            return _api_response(
                404, '找不到指定的Gmail账户',
                http_status=status.HTTP_404_NOT_FOUND,
            )

        # NOTE(review): the original comment states that the model's save()
        # clears the default flag on the user's other accounts; that code is
        # not visible here, so confirm against the model.
        credential.is_default = True
        credential.save()

        # Drop cached service instances so the next use sees the new default.
        GmailServiceManager.clear_instance(request.user)

        return _api_response(
            200,
            f'已将{credential.gmail_email or credential.name}设为默认Gmail账户',
            {
                'id': str(credential.id),
                'name': credential.name,
                'gmail_email': credential.gmail_email,
                'is_default': True,
            },
        )
    except Exception as e:
        return _server_error('设置默认Gmail账户失败', e)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def update_gmail_account(request):
    """Update the name and/or active flag of one of the user's accounts.

    Expects ``account_id``; ``name`` and ``is_active`` are optional and only
    applied when present. Textual booleans for ``is_active`` are coerced.
    """
    try:
        account_id = request.data.get('account_id')
        name = request.data.get('name')
        is_active = _as_bool(request.data.get('is_active'))

        if not account_id:
            return _api_response(
                400, '缺少账户ID参数', http_status=status.HTTP_400_BAD_REQUEST
            )

        credential = _find_user_credential(request.user, account_id)
        if credential is None:
            return _api_response(
                404, '找不到指定的Gmail账户',
                http_status=status.HTTP_404_NOT_FOUND,
            )

        if name is not None:
            credential.name = name
        if is_active is not None:
            credential.is_active = is_active
        credential.save()

        # Drop the cached service instance for this account.
        GmailServiceManager.clear_instance(request.user, str(credential.id))

        return _api_response(200, '更新Gmail账户成功', {
            'id': str(credential.id),
            'name': credential.name,
            'gmail_email': credential.gmail_email,
            'is_active': credential.is_active,
            'is_default': credential.is_default,
        })
    except Exception as e:
        return _server_error('更新Gmail账户失败', e)


@api_view(['DELETE'])
@permission_classes([IsAuthenticated])
def delete_gmail_account(request, account_id):
    """Delete one of the user's Gmail accounts.

    If no default account remains afterwards, the most recently updated
    remaining account (if any) is promoted to default.
    """
    try:
        credential = _find_user_credential(request.user, account_id)
        if credential is None:
            return _api_response(
                404, '找不到指定的Gmail账户',
                http_status=status.HTTP_404_NOT_FOUND,
            )

        # Capture the details before the row is gone.
        account_info = {
            'id': str(credential.id),
            'name': credential.name,
            'gmail_email': credential.gmail_email,
        }

        GmailServiceManager.clear_instance(request.user, str(credential.id))
        credential.delete()

        remaining = GmailCredential.objects.filter(user=request.user)
        if not remaining.filter(is_default=True).exists():
            replacement = remaining.order_by('-updated_at').first()
            if replacement:
                replacement.is_default = True
                replacement.save()

        return _api_response(
            200,
            f'成功删除Gmail账户: {account_info["name"]}',
            account_info,
        )
    except Exception as e:
        return _server_error('删除Gmail账户失败', e)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def add_gmail_account(request):
    """Start adding a Gmail account for the requesting user.

    The client secret comes either from an uploaded ``client_secret_file``
    (JSON) or from the ``client_secret_json`` field. When the integration
    signals that user consent is required, the authorization URL is returned
    in a ``code: 202`` envelope and the pending parameters are kept in the
    session for ``handle_gmail_auth_code``.
    """
    try:
        # Multipart uploads deliver flags as strings, hence the coercion.
        use_proxy = _as_bool(request.data.get('use_proxy', True))
        proxy_url = request.data.get('proxy_url', _DEFAULT_PROXY_URL)
        name = request.data.get('name', '新Gmail账户')

        if 'client_secret_file' in request.FILES:
            try:
                raw_secret = request.FILES['client_secret_file'].read()
                client_secret_json = json.loads(raw_secret.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # A file that is not UTF-8 is just as invalid as bad JSON.
                return _api_response(
                    400, '上传的JSON文件格式无效',
                    http_status=status.HTTP_400_BAD_REQUEST,
                )
            logger.info("成功从上传的JSON文件解析客户端密钥")
        else:
            client_secret_json = request.data.get('client_secret_json')

        if not client_secret_json:
            return _api_response(
                400,
                '缺少客户端密钥参数,请提供client_secret_json或上传client_secret_file',
                http_status=status.HTTP_400_BAD_REQUEST,
            )

        # Enforce the per-user limit; only active accounts are counted.
        max_accounts_per_user = getattr(
            settings, 'MAX_GMAIL_ACCOUNTS_PER_USER', 5
        )
        current_accounts_count = GmailCredential.objects.filter(
            user=request.user, is_active=True
        ).count()
        if current_accounts_count >= max_accounts_per_user:
            return _api_response(
                400,
                f'您最多只能绑定{max_accounts_per_user}个Gmail账户',
                {
                    'current_count': current_accounts_count,
                    'max_allowed': max_accounts_per_user,
                },
                http_status=status.HTTP_400_BAD_REQUEST,
            )

        gmail_integration = GmailIntegration(
            user=request.user,
            client_secret_json=client_secret_json,
            use_proxy=use_proxy,
            proxy_url=proxy_url,
        )

        try:
            # In the normal OAuth flow this raises an exception whose message
            # carries the authorization URL.
            gmail_integration.authenticate()
        except Exception as e:
            error_message = str(e)

            if "Please visit this URL" in error_message:
                # The URL is the first space-delimited token after "URL: ".
                auth_url = error_message.split("URL: ")[1].split(" ")[0]

                # Remember the request parameters for the callback step. The
                # client secret is deliberately not stored in the session.
                request.session['gmail_auth_pending'] = {
                    'name': name,
                    'use_proxy': use_proxy,
                    'proxy_url': proxy_url,
                }
                return _api_response(
                    202, '需要Gmail授权', {'auth_url': auth_url, 'name': name}
                )

            if "Token has been expired or revoked" in error_message:
                return _api_response(
                    401, 'OAuth令牌已过期或被撤销,请重新授权',
                    http_status=status.HTTP_401_UNAUTHORIZED,
                )

            logger.error(f"Gmail认证过程中出现错误: {error_message}")
            return _api_response(
                500, f'Gmail认证失败: {error_message}',
                http_status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

        return _api_response(200, '添加Gmail账户成功')
    except Exception as e:
        return _server_error('添加Gmail账户失败', e)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def handle_gmail_auth_code(request):
    """Complete the OAuth flow with the authorization code from Google.

    Expects ``auth_code``. ``name``, ``use_proxy`` and ``proxy_url`` are
    taken from the request body, falling back to the values stored in the
    session by ``add_gmail_account`` and then to the defaults. A failure to
    set up the mailbox watch is logged but does not fail the request.
    """
    try:
        auth_code = request.data.get('auth_code')
        if not auth_code:
            return _api_response(
                400, '缺少授权码参数', http_status=status.HTTP_400_BAD_REQUEST
            )

        # The pending entry is consumed (popped) whether or not it is used.
        auth_pending = request.session.pop('gmail_auth_pending', {})
        name = request.data.get(
            'name', auth_pending.get('name', '新Gmail账户')
        )
        use_proxy = _as_bool(request.data.get(
            'use_proxy', auth_pending.get('use_proxy', True)
        ))
        proxy_url = request.data.get(
            'proxy_url', auth_pending.get('proxy_url', _DEFAULT_PROXY_URL)
        )

        gmail_integration = GmailIntegration(
            user=request.user,
            use_proxy=use_proxy,
            proxy_url=proxy_url,
        )

        try:
            gmail_integration.handle_auth_code(auth_code)

            credential = gmail_integration.gmail_credential
            if credential:
                credential.name = name
                credential.save()

            # Watch setup is best effort; report None when it fails.
            watch_result = None
            try:
                watch_result = gmail_integration.setup_watch()
                logger.info(f"初始化Gmail监听成功: {watch_result}")
            except Exception as watch_error:
                logger.error(f"初始化Gmail监听失败: {str(watch_error)}")

            return _api_response(200, 'Gmail账户授权成功', {
                'id': str(credential.id) if credential else None,
                'name': name,
                'gmail_email': credential.gmail_email if credential else None,
                'watch_result': watch_result,
            })
        except Exception as auth_error:
            error_message = str(auth_error)
            logger.error(f"处理Gmail授权码失败: {error_message}")

            if "invalid_grant" in error_message.lower():
                return _api_response(
                    400, '授权码无效或已过期,请重新授权',
                    http_status=status.HTTP_400_BAD_REQUEST,
                )

            return _api_response(
                500, f'处理Gmail授权码失败: {error_message}',
                http_status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
    except Exception as e:
        return _server_error('处理Gmail授权码失败', e)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def refresh_gmail_account_watch(request):
    """Renew the mailbox watch of one active Gmail account of the user.

    Expects ``account_id``; ``use_proxy`` and ``proxy_url`` are optional.
    On success the credential's ``watch_expiration`` and ``last_history_id``
    are updated from the watch response.
    """
    try:
        account_id = request.data.get('account_id')
        if not account_id:
            return _api_response(
                400, '缺少账户ID参数', http_status=status.HTTP_400_BAD_REQUEST
            )

        credential = _find_user_credential(
            request.user, account_id, is_active=True
        )
        if credential is None:
            return _api_response(
                404, '找不到指定的Gmail账户',
                http_status=status.HTTP_404_NOT_FOUND,
            )

        use_proxy = _as_bool(request.data.get('use_proxy', True))
        proxy_url = request.data.get('proxy_url', _DEFAULT_PROXY_URL)

        gmail_integration = GmailIntegration(
            user=request.user,
            gmail_credential_id=str(credential.id),
            use_proxy=use_proxy,
            proxy_url=proxy_url,
        )

        if not gmail_integration.authenticate():
            return _api_response(
                400, 'Gmail认证失败', http_status=status.HTTP_400_BAD_REQUEST
            )

        watch_result = gmail_integration.setup_watch()

        expiration = watch_result.get('expiration')
        history_id = watch_result.get('historyId')
        if expiration:
            # The value is treated as milliseconds since the epoch.
            credential.watch_expiration = _watch_expiration_from_ms(expiration)
        if history_id:
            credential.last_history_id = history_id
        credential.save()

        return _api_response(200, '刷新Gmail监听成功', {
            'account_id': str(credential.id),
            'name': credential.name,
            'gmail_email': credential.gmail_email,
            'expiration': _format_timestamp(credential.watch_expiration),
            'history_id': credential.last_history_id,
        })
    except Exception as e:
        return _server_error('刷新Gmail监听失败', e)


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def check_specific_gmail_auth(request):
    """Report the authentication state of one Gmail account.

    With ``account_id`` in the query string that account is checked.
    Otherwise the user's active default account is used, falling back to the
    most recently updated active account. When nothing is found the envelope
    carries ``code: 404`` while the HTTP status stays at the default.
    """
    try:
        gmail_credential_id = request.query_params.get('account_id')

        if gmail_credential_id:
            credential = _find_user_credential(
                request.user, gmail_credential_id
            )
        else:
            active = GmailCredential.objects.filter(
                user=request.user, is_active=True
            )
            credential = (
                active.filter(is_default=True).first()
                or active.order_by('-updated_at').first()
            )

        if not credential:
            return _api_response(
                404, '未找到Gmail认证信息',
                {'authenticated': False, 'needs_setup': True},
            )

        use_proxy = (
            request.query_params.get('use_proxy', 'true').lower() == 'true'
        )
        proxy_url = request.query_params.get('proxy_url', _DEFAULT_PROXY_URL)

        gmail_integration = GmailIntegration(
            user=request.user,
            gmail_credential_id=str(credential.id),
            use_proxy=use_proxy,
            proxy_url=proxy_url,
        )
        auth_valid = gmail_integration.authenticate()

        return _api_response(200, '认证信息获取成功', {
            'authenticated': auth_valid,
            'needs_setup': not auth_valid,
            'account_id': str(credential.id),
            'name': credential.name,
            'gmail_email': credential.gmail_email,
            'is_default': credential.is_default,
            'watch_expired': _is_watch_expired(credential),
            'last_history_id': credential.last_history_id,
            'watch_expiration': _format_timestamp(credential.watch_expiration),
        })
    except Exception as e:
        return _server_error('检查Gmail认证状态失败', e)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def clear_gmail_cache(request):
    """Clear cached Gmail service instances, then re-authenticate and re-watch.

    Expects ``account_id`` or ``gmail_email``. With ``account_id`` the cache
    of that account (which must belong to the user) is cleared; with only
    ``gmail_email`` every cached instance for that address is cleared. After
    that, each active credential with the address is re-authenticated and
    its watch renewed; the outcome is reported per credential. Queued
    notifications are then processed for the owners of refreshed credentials.
    """
    try:
        gmail_email = request.data.get('gmail_email')
        account_id = request.data.get('account_id')

        if not gmail_email and not account_id:
            return _api_response(
                400, '需要提供gmail_email或account_id参数',
                http_status=status.HTTP_400_BAD_REQUEST,
            )

        if account_id:
            owned = _find_user_credential(request.user, account_id)
            if owned is None:
                return _api_response(
                    404, '找不到指定的Gmail账号',
                    http_status=status.HTTP_404_NOT_FOUND,
                )
            gmail_email = owned.gmail_email
            GmailServiceManager.clear_instance(request.user, account_id)
            logger.info(
                f"已清除用户 {request.user.email} 的Gmail账号 {account_id} 缓存"
            )
        else:
            cleared_count = GmailServiceManager.clear_all_instances_by_email(
                gmail_email
            )
            logger.info(
                f"已清除Gmail邮箱 {gmail_email} 的 {cleared_count} 个缓存实例"
            )

        # NOTE(review): this lookup is not restricted to request.user, so any
        # authenticated caller can trigger a refresh for, and receive the
        # user id/email of, other users sharing the address. Confirm whether
        # that is intended or whether it should be limited to staff.
        credentials = GmailCredential.objects.filter(
            gmail_email=gmail_email, is_active=True
        ).select_related('user')

        success_refreshed = []
        failed_refreshed = []
        refreshed_owners = []

        for credential in credentials:
            try:
                integration = GmailIntegration(
                    user=credential.user,
                    gmail_credential_id=str(credential.id),
                )

                if not integration.authenticate():
                    failed_refreshed.append(
                        _refresh_entry(credential, error='认证失败')
                    )
                    continue

                try:
                    watch_result = integration.setup_watch()
                    if 'expiration' in watch_result:
                        credential.watch_expiration = (
                            _watch_expiration_from_ms(
                                watch_result['expiration']
                            )
                        )
                        credential.save()
                    success_refreshed.append(_refresh_entry(credential))
                    refreshed_owners.append(credential.user)
                except Exception as watch_error:
                    failed_refreshed.append(
                        _refresh_entry(credential, error=str(watch_error))
                    )
            except Exception as e:
                failed_refreshed.append(
                    _refresh_entry(credential, error=str(e))
                )

        # Process notifications queued for the owners of refreshed accounts.
        # The module-level GmailIntegration import is used here: importing it
        # again inside this function would make the name function-local and
        # break the constructor call in the loop above.
        queue_result = None
        if refreshed_owners:
            try:
                for owner in refreshed_owners:
                    queue_result = (
                        GmailIntegration.process_queued_notifications(
                            user=owner
                        )
                    )
            except Exception as queue_error:
                logger.error(f"处理队列通知失败: {str(queue_error)}")

        return _api_response(200, '清除Gmail缓存成功', {
            'gmail_email': gmail_email,
            'success_refreshed': success_refreshed,
            'failed_refreshed': failed_refreshed,
            'queue_processed': queue_result,
        })
    except Exception as e:
        return _server_error('清除Gmail缓存失败', e)