daren_project/temp/gmail_webhook_updated.py

463 lines
22 KiB
Python
Raw Permalink Normal View History

2025-04-29 10:22:57 +08:00
@api_view(['POST'])
@permission_classes([])
def gmail_webhook(request):
"""Gmail推送通知webhook"""
try:
# 导入需要的模块
import logging
import traceback
from django.utils import timezone
from django.contrib.auth import get_user_model
from rest_framework import status
from rest_framework.response import Response
# 获取用户模型
User = get_user_model()
# 导入Gmail集成相关的模块
from .models import GmailCredential
from .gmail_integration import GmailIntegration, GmailServiceManager
logger = logging.getLogger(__name__)
# 添加更详细的日志
logger.info(f"接收到Gmail webhook请求: 路径={request.path}, 方法={request.method}")
logger.info(f"请求头: {dict(request.headers)}")
logger.info(f"请求数据: {request.data}")
# 验证请求来源(可以添加额外的安全校验)
data = request.data
if not data:
return Response({
'code': 400,
'message': '无效的请求数据',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 处理数据
email_address = None
history_id = None
# 处理Google Pub/Sub消息格式
if isinstance(data, dict) and 'message' in data and 'data' in data['message']:
try:
import base64
import json
logger.info("检测到Google Pub/Sub消息格式")
# Base64解码data字段
encoded_data = data['message']['data']
decoded_data = base64.b64decode(encoded_data).decode('utf-8')
logger.info(f"解码后的数据: {decoded_data}")
# 解析JSON获取email和historyId
json_data = json.loads(decoded_data)
email_address = json_data.get('emailAddress')
history_id = json_data.get('historyId')
logger.info(f"从Pub/Sub消息中提取: email={email_address}, historyId={history_id}")
except Exception as decode_error:
logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}")
logger.error(traceback.format_exc())
# 处理其他格式的数据
elif isinstance(data, dict):
# 直接使用JSON格式数据
logger.info("接收到JSON格式数据")
email_address = data.get('emailAddress')
history_id = data.get('historyId')
elif hasattr(data, 'decode'):
# 尝试解析原始数据
logger.info("接收到原始数据格式,尝试解析")
try:
import json
json_data = json.loads(data.decode('utf-8'))
email_address = json_data.get('emailAddress')
history_id = json_data.get('historyId')
except Exception as parse_error:
logger.error(f"解析请求数据失败: {str(parse_error)}")
email_address = None
history_id = None
else:
# 尝试从请求参数获取
logger.info("尝试从请求参数获取数据")
email_address = request.GET.get('emailAddress') or request.POST.get('emailAddress')
history_id = request.GET.get('historyId') or request.POST.get('historyId')
logger.info(f"提取的邮箱: {email_address}, 历史ID: {history_id}")
if not email_address or not history_id:
return Response({
'code': 400,
'message': '缺少必要的参数',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 查找用户和认证信息 - 优化的查找逻辑
user = None
credential = None
# 1. 首先尝试直接通过Gmail凭证表查找
credential = GmailCredential.objects.filter(
gmail_email=email_address,
is_active=True
).select_related('user').order_by('-is_default', '-updated_at').first()
if credential:
user = credential.user
logger.info(f"通过gmail_email直接找到用户和凭证: 用户={user.email}, 凭证ID={credential.id}")
else:
# 2. 如果没找到,尝试通过用户邮箱查找
user = User.objects.filter(email=email_address).first()
if user:
logger.info(f"通过用户邮箱找到用户: {user.email}")
# 为该用户查找任何有效的Gmail凭证
credential = GmailCredential.objects.filter(
user=user,
is_active=True
).order_by('-is_default', '-updated_at').first()
if credential:
logger.info(f"为用户 {user.email} 找到有效的Gmail凭证: {credential.id}")
else:
logger.error(f"无法找到与{email_address}关联的用户或凭证")
return Response({
'code': 404,
'message': f'找不到与 {email_address} 关联的用户',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
if not credential:
logger.error(f"用户 {user.email} 没有有效的Gmail凭证")
return Response({
'code': 404,
'message': f'找不到用户 {user.email} 的Gmail凭证',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
# 更新history_id无论如何都记录这次历史ID
credential.last_history_id = history_id
credential.save()
# 清除可能存在的缓存实例,确保使用最新凭证
GmailServiceManager.clear_instance(user, str(credential.id))
# 检查凭证是否需要重新授权
notification_queued = False
if credential.needs_reauth or not credential.credentials:
logger.warning(f"Gmail凭证需要重新授权将通知保存到队列: {email_address}")
# 保存到通知队列
from .models import GmailNotificationQueue
import json
# 将通知数据序列化
try:
notification_json = json.dumps(data)
except:
notification_json = f'{{"emailAddress": "{email_address}", "historyId": "{history_id}"}}'
# 创建队列记录
GmailNotificationQueue.objects.create(
user=user,
gmail_credential=credential,
email=email_address,
history_id=str(history_id),
notification_data=notification_json,
processed=False
)
logger.info(f"Gmail通知已保存到队列等待用户重新授权: {email_address}")
notification_queued = True
# 直接返回成功,但记录需要用户重新授权
return Response({
'code': 202, # Accepted
'message': '通知已保存到队列,等待用户重新授权',
'data': {
'user_id': str(user.id),
'history_id': history_id,
'needs_reauth': True
}
})
# 如果请求中包含达人邮箱,直接处理特定达人的邮件
talent_email = data.get('talent_email') or request.GET.get('talent_email')
if talent_email and user:
logger.info(f"检测到特定达人邮箱: {talent_email},将直接处理其最近邮件")
try:
# 创建Gmail集成实例 - 使用明确的凭证ID
integration = GmailIntegration(user=user, gmail_credential_id=str(credential.id))
if integration.authenticate():
# 获取达人最近的邮件
recent_emails = integration.get_recent_emails(
from_email=talent_email,
max_results=5 # 限制获取最近5封
)
if recent_emails:
logger.info(f"找到 {len(recent_emails)} 封来自 {talent_email} 的最近邮件")
# 创建或获取知识库
knowledge_base, created = integration.create_talent_knowledge_base(talent_email)
# 保存对话
result = integration.save_conversations_to_knowledge_base(recent_emails, knowledge_base)
logger.info(f"已处理达人 {talent_email} 的最近邮件: {result}")
else:
logger.info(f"没有找到来自 {talent_email} 的最近邮件")
else:
logger.error("Gmail认证失败无法处理特定达人邮件")
# 如果还没有保存到队列,保存通知数据
if not notification_queued:
# 保存到通知队列
from .models import GmailNotificationQueue
import json
try:
notification_json = json.dumps(data)
except:
notification_json = f'{{"emailAddress": "{email_address}", "historyId": "{history_id}", "talent_email": "{talent_email}"}}'
GmailNotificationQueue.objects.create(
user=user,
gmail_credential=credential,
email=email_address,
history_id=str(history_id),
notification_data=notification_json,
processed=False
)
logger.info(f"Gmail通知(含达人邮箱)已保存到队列: {email_address}, 达人: {talent_email}")
except Exception as talent_error:
logger.error(f"处理达人邮件失败: {str(talent_error)}")
logger.error(traceback.format_exc())
# 处理普通通知
try:
# 创建Gmail集成实例 - 明确使用找到的凭证ID
integration = GmailIntegration(user=user, gmail_credential_id=str(credential.id))
# 记录详细的凭证信息,帮助排查问题
logger.info(f"处理普通通知: 用户ID={user.id}, 凭证ID={credential.id}, Gmail邮箱={credential.gmail_email}")
auth_success = integration.authenticate()
if auth_success:
logger.info(f"Gmail认证成功开始处理通知: {email_address}")
# 强制设置最小历史ID差值确保能获取新消息
try:
# 从凭证中获取历史ID并确保作为整数比较
last_history_id = int(credential.last_history_id or 0)
current_history_id = int(history_id)
# 如果历史ID没有变化设置一个小的偏移量确保获取最近消息
if current_history_id <= last_history_id:
# 设置较小的历史ID以确保获取最近的消息
adjusted_history_id = max(1, last_history_id - 10)
logger.info(f"调整历史ID: {last_history_id} -> {adjusted_history_id},以确保能获取最近的消息")
# 修改请求中的历史ID
if isinstance(data, dict) and 'message' in data and 'data' in data['message']:
# 对于Pub/Sub格式修改解码后的JSON
try:
import base64
import json
decoded_data = base64.b64decode(data['message']['data']).decode('utf-8')
json_data = json.loads(decoded_data)
json_data['historyId'] = str(adjusted_history_id)
data['message']['data'] = base64.b64encode(json.dumps(json_data).encode('utf-8')).decode('utf-8')
logger.info(f"已调整Pub/Sub消息中的历史ID为: {adjusted_history_id}")
except Exception as adjust_error:
logger.error(f"调整历史ID失败: {str(adjust_error)}")
else:
# 直接修改data中的historyId
if isinstance(data, dict) and 'historyId' in data:
data['historyId'] = str(adjusted_history_id)
logger.info(f"已调整请求中的历史ID为: {adjusted_history_id}")
except Exception as history_adjust_error:
logger.error(f"历史ID调整失败: {str(history_adjust_error)}")
result = integration.process_notification(data)
# 日志记录处理结果
if result:
logger.info(f"Gmail通知处理成功检测到新消息: {email_address}")
else:
logger.warning(f"Gmail通知处理完成但未检测到新消息: {email_address}")
# 如果处理成功尝试通过WebSocket发送通知
if result:
try:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# 获取Channel Layer
channel_layer = get_channel_layer()
if channel_layer:
# 发送WebSocket消息
async_to_sync(channel_layer.group_send)(
f"notification_user_{user.id}",
{
"type": "notification",
"data": {
"message_type": "gmail_update",
"message": "您的Gmail有新消息已自动处理",
"history_id": history_id,
"timestamp": timezone.now().isoformat()
}
}
)
logger.info(f"发送WebSocket通知成功: user_id={user.id}")
except Exception as ws_error:
logger.error(f"发送WebSocket通知失败: {str(ws_error)}")
logger.info(f"Gmail通知处理成功: {email_address}")
return Response({
'code': 200,
'message': '通知已处理',
'data': {
'user_id': str(user.id),
'history_id': history_id,
'success': True,
'new_messages': result
}
})
else:
# 认证失败,保存通知到队列
logger.error(f"Gmail认证失败: {email_address}, 用户ID={user.id}, 凭证ID={credential.id}")
# 尝试获取详细的认证失败原因
try:
# 尝试刷新令牌
refresh_result = integration.refresh_token()
if refresh_result:
logger.info(f"令牌刷新成功,将重新尝试处理")
result = integration.process_notification(data)
if result:
logger.info(f"刷新令牌后处理成功!")
return Response({
'code': 200,
'message': '通知已处理(令牌刷新后)',
'data': {
'user_id': str(user.id),
'history_id': history_id,
'success': True
}
})
except Exception as refresh_error:
logger.error(f"尝试刷新令牌失败: {str(refresh_error)}")
# 如果还没有保存到队列,保存通知数据
if not notification_queued:
# 保存到通知队列
from .models import GmailNotificationQueue
import json
try:
notification_json = json.dumps(data)
except:
notification_json = f'{{"emailAddress": "{email_address}", "historyId": "{history_id}"}}'
# 标记凭证需要重新授权
credential.needs_reauth = True
credential.save()
logger.info(f"已标记凭证 {credential.id} 需要重新授权")
GmailNotificationQueue.objects.create(
user=user,
gmail_credential=credential,
email=email_address,
history_id=str(history_id),
notification_data=notification_json,
processed=False
)
logger.info(f"Gmail通知已保存到队列: {email_address}")
# 返回处理成功,但告知需要重新授权
return Response({
'code': 202, # Accepted
'message': '通知已保存到队列,等待重新获取授权',
'data': {
'user_id': str(user.id),
'history_id': history_id,
'needs_reauth': True
}
})
except Exception as process_error:
logger.error(f"处理Gmail通知失败: {str(process_error)}")
logger.error(traceback.format_exc())
# 保存到通知队列
if not notification_queued:
try:
from .models import GmailNotificationQueue
import json
try:
notification_json = json.dumps(data)
except:
notification_json = f'{{"emailAddress": "{email_address}", "historyId": "{history_id}"}}'
# 标记凭证需要重新授权 - 可能是令牌问题导致的错误
error_msg = str(process_error).lower()
if "invalid_grant" in error_msg or "token" in error_msg or "auth" in error_msg or "认证" in error_msg:
credential.needs_reauth = True
credential.save()
logger.info(f"根据错误信息标记凭证 {credential.id} 需要重新授权")
GmailNotificationQueue.objects.create(
user=user,
gmail_credential=credential,
email=email_address,
history_id=str(history_id),
notification_data=notification_json,
processed=False,
error_message=str(process_error)[:255]
)
logger.info(f"由于处理错误Gmail通知已保存到队列: {email_address}")
except Exception as queue_error:
logger.error(f"保存通知到队列失败: {str(queue_error)}")
# 仍然返回成功防止Google重试导致重复通知
return Response({
'code': 202,
'message': '通知已保存,稍后处理',
'data': {
'user_id': str(user.id),
'history_id': history_id,
'error': str(process_error)[:100] # 截断错误信息
}
})
except Exception as e:
logger.error(f"处理Gmail webhook失败: {str(e)}")
logger.error(traceback.format_exc())
# 尝试更安全的响应,尽可能提供有用信息
try:
# 如果已经提取了邮箱和历史ID等信息记录在响应中
response_data = {
'error': str(e)[:200]
}
if 'email_address' in locals() and email_address:
response_data['email_address'] = email_address
if 'history_id' in locals() and history_id:
response_data['history_id'] = history_id
if 'user' in locals() and user:
response_data['user_id'] = str(user.id)
return Response({
'code': 500,
'message': '处理通知失败',
'data': response_data
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except:
# 最后的备用方案
return Response({
'code': 500,
'message': '处理通知失败',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)