import logging import json import base64 from celery import shared_task from django.utils import timezone from .models import GmailCredential, GmailConversation, UserGoal, ProcessedPushNotification, UnmatchedEmail from .services.gmail_service import GmailService from .services.goal_service import get_conversation_summary, get_last_message, generate_recommended_reply from apps.chat.models import ChatHistory logger = logging.getLogger(__name__) @shared_task( bind=True, max_retries=3, default_retry_delay=60, # 重试延迟60秒 acks_late=True, # 任务完成后再确认 time_limit=600, # 任务超时限制600秒 soft_time_limit=300 # 软超时限制300秒 ) def process_push_notification(self, message_data, message_id, subscription): """ 处理Gmail推送通知的Celery任务 Args: message_data: Base64编码的消息数据 message_id: 消息ID subscription: 订阅信息 Returns: bool: 处理是否成功 """ try: logger.info(f"开始处理推送通知: {message_id}") print(f"[Gmail Webhook Task] 处理推送通知: {message_id}") # 检查是否已处理过该消息 if ProcessedPushNotification.objects.filter(message_id=message_id).exists(): logger.info(f"通知 {message_id} 已处理过,跳过") print(f"[Gmail Webhook Task] 通知 {message_id} 已处理过,跳过") return True # Base64解码消息数据 try: decoded_data = json.loads(base64.b64decode(message_data).decode('utf-8')) print(f"[Gmail Webhook Task] 解码后的数据: {decoded_data}") # 获取Gmail通知关键信息 email_address = decoded_data.get('emailAddress') history_id = decoded_data.get('historyId') if not email_address: logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址") # 记录失败的处理,防止重复处理 ProcessedPushNotification.objects.create( message_id=message_id, email_address="unknown", history_id=history_id or "unknown", is_successful=False, metadata=decoded_data ) return False # 创建处理记录 notification_record = ProcessedPushNotification.objects.create( message_id=message_id, email_address=email_address, history_id=history_id or "unknown", metadata=decoded_data ) # 查找对应的Gmail凭证 credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first() if not credential: logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}") notification_record.is_successful = False notification_record.save() return False user = credential.user logger.info(f"[Gmail Webhook Task] 找到有效凭证: 用户ID {user.id}, 邮箱 {email_address}") # 处理新邮件 processed_emails = GmailService.process_new_emails(user, credential, history_id) logger.info(f"[Gmail Webhook Task] 新邮件处理完成,共 {len(processed_emails)} 封") # 处理自动回复 if processed_emails: process_auto_replies.delay(user.id, credential.id, processed_emails) # 更新凭证的历史ID if history_id: credential.last_history_id = history_id credential.save() logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}") # 更新通知记录 notification_record.is_successful = True notification_record.save() return True except Exception as e: logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(e)}") # 记录失败的处理 ProcessedPushNotification.objects.create( message_id=message_id, email_address="error", history_id="error", is_successful=False, metadata={"error": str(e)} ) # 抛出异常以便Celery可能进行重试 raise self.retry(exc=e, countdown=30) except Exception as e: logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}") import traceback logger.error(traceback.format_exc()) # 尝试重试任务 raise self.retry(exc=e, countdown=60) @shared_task( bind=True, max_retries=2, default_retry_delay=60, time_limit=300, soft_time_limit=240 ) def process_auto_replies(self, user_id, credential_id, email_ids): """ 处理自动回复的任务 Args: user_id: 用户ID credential_id: Gmail凭证ID email_ids: 需要处理的邮件ID列表 Returns: bool: 处理是否成功 """ try: from django.contrib.auth import get_user_model User = get_user_model() user = User.objects.get(id=user_id) credential = GmailCredential.objects.get(id=credential_id) logger.info(f"[Auto Reply Task] 开始处理{len(email_ids)}封邮件的自动回复") # 遍历每个处理过的邮件ID for email_id in email_ids: try: # 通过邮件ID查找对应的聊天记录 chat_msg = ChatHistory.objects.filter( metadata__gmail_message_id=email_id ).order_by('-created_at').first() if not chat_msg: logger.info(f"[Auto Reply Task] 邮件 {email_id} 未找到对应的聊天记录,跳过自动回复") continue conversation_id = chat_msg.conversation_id # 检查对话是否是活跃的 conversation = GmailConversation.objects.filter( conversation_id=conversation_id, is_active=True ).first() if not conversation: logger.info(f"[Auto Reply Task] 对话 {conversation_id} 不是活跃的,跳过自动回复") continue logger.info(f"[Auto Reply Task] 发现活跃对话 {conversation_id},准备自动回复") # 检查邮件角色,只有达人发送的邮件才自动回复 if chat_msg.role != 'assistant': logger.info(f"[Auto Reply Task] 邮件 {email_id} 不是达人发送的,跳过自动回复") continue # 查找对话的目标 goal = UserGoal.objects.filter( conversation=conversation, is_active=True ).first() if not goal: logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有活跃目标,跳过自动回复") continue # 获取对话摘要 conversation_summary = get_conversation_summary(conversation_id) if not conversation_summary: conversation_summary = "无对话摘要" # 获取最后一条达人消息 last_message = get_last_message(conversation_id) if not last_message: logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有达人消息,跳过自动回复") continue # 生成推荐回复 reply_content, error = generate_recommended_reply( user=user, goal_description=goal.description, conversation_summary=conversation_summary, last_message=last_message ) if error: logger.error(f"[Auto Reply Task] 生成推荐回复失败: {error}") continue if not reply_content: logger.warning(f"[Auto Reply Task] 生成的推荐回复为空") continue # 构建回复的主题 subject = chat_msg.metadata.get('subject', '') reply_subject = f"回复: {subject}" if not subject.startswith('回复:') else subject # 准备发送自动回复 logger.info(f"[Auto Reply Task] 准备发送自动回复: 从{conversation.user_email}到{conversation.influencer_email}") success, reply_message_id = GmailService.send_email( user=user, user_email=conversation.user_email, to_email=conversation.influencer_email, subject=reply_subject, body=reply_content ) if success: logger.info(f"[Auto Reply Task] 已成功发送自动回复: {reply_message_id}") # 更新目标状态 goal.last_activity_time = timezone.now() if goal.status == 'pending': goal.status = 'in_progress' goal.save(update_fields=['last_activity_time', 'status', 'updated_at']) else: logger.error(f"[Auto Reply Task] 发送自动回复失败: {reply_message_id}") except Exception as reply_error: logger.error(f"[Auto Reply Task] 处理自动回复过程中出错: {str(reply_error)}") import traceback logger.error(traceback.format_exc()) return True except Exception as e: logger.error(f"[Auto Reply Task] 处理自动回复失败: {str(e)}") import traceback logger.error(traceback.format_exc()) raise self.retry(exc=e, countdown=30)