import base64
import json
import logging
import re
import traceback

from celery import shared_task
from django.utils import timezone

from .models import (
    GmailCredential,
    GmailConversation,
    UserGoal,
    ProcessedPushNotification,
    UnmatchedEmail,
)
from .services.gmail_service import GmailService
from .services.goal_service import (
    get_conversation_summary,
    get_last_message,
    generate_recommended_reply,
)
from apps.chat.models import ChatHistory

logger = logging.getLogger(__name__)

# Pricing vocabulary used by check_goal_achievement. Word boundaries keep
# everyday words such as "feel", "generate" or "costume" from matching the
# "fee" / "rate" / "cost" stems; "usd" and "$" are matched anywhere so that
# forms like "500usd" and "$500" still count.
_PRICE_PATTERN = re.compile(
    r"\b(?:rate|price|cost|charge|fee)s?\b|usd|\$",
    re.IGNORECASE,
)


def _mentions_price(text):
    """Return True when *text* contains pricing vocabulary; None counts as empty."""
    return bool(_PRICE_PATTERN.search(text or ""))


def _record_failure(notification_record, details):
    """Best-effort: merge *details* into the record's metadata and save it.

    Failures while saving are logged and swallowed so that they never mask
    the error that is being recorded.
    """
    try:
        notification_record.metadata.update(details)
        notification_record.save()
    except Exception as record_error:
        logger.error(f"[Gmail Webhook Task] 更新通知记录失败: {str(record_error)}")


def _claim_notification(message_id, subscription, retries):
    """Return the tracking record to work on, or None if the message must be skipped.

    A message is skipped when a record for it already exists and either that
    record is marked successful or this is a first delivery (``retries == 0``),
    i.e. a duplicate push. On a Celery retry the unfinished record created by
    the earlier attempt is reused; without this the retry would see its own
    record and exit immediately as "already processed".
    """
    existing = ProcessedPushNotification.objects.filter(message_id=message_id).first()
    if existing is None:
        # Placeholder values are overwritten once the payload is decoded.
        return ProcessedPushNotification.objects.create(
            message_id=message_id,
            email_address="processing",
            history_id="unknown",
            is_successful=False,
            metadata={
                "subscription": subscription,
                "started_at": timezone.now().isoformat(),
            },
        )

    if existing.is_successful or retries == 0:
        return None

    if existing.metadata is None:
        existing.metadata = {}
    # Not saved here; persisted by the next save() on this record.
    existing.metadata.update({
        "retries": retries,
        "retried_at": timezone.now().isoformat(),
    })
    return existing


def _handle_notification(notification_record, message_data):
    """Decode the push payload and process the new emails it announces.

    Returns True on success and False when the payload has no usable email
    address or no valid credential matches it. Decoding and processing errors
    propagate to the caller.
    """
    decoded_data = json.loads(base64.b64decode(message_data).decode('utf-8'))
    logger.debug(f"[Gmail Webhook Task] 解码后的数据: {decoded_data}")

    # A JSON payload that is not an object cannot carry the expected keys.
    payload = decoded_data if isinstance(decoded_data, dict) else {}
    email_address = payload.get('emailAddress')
    history_id = payload.get('historyId')

    if not email_address:
        logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址")
        notification_record.metadata.update({
            "error": "推送数据缺少邮箱地址",
            "decoded_data": decoded_data,
        })
        notification_record.save()
        return False

    notification_record.email_address = email_address
    notification_record.history_id = history_id or "unknown"
    notification_record.metadata.update(payload)
    notification_record.save()

    credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first()
    if not credential:
        logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}")
        notification_record.metadata.update({
            "error": f"未找到对应的Gmail凭证: {email_address}"
        })
        notification_record.save()
        return False

    user = credential.user
    logger.info(f"[Gmail Webhook Task] 找到有效凭证: 用户ID {user.id}, 邮箱 {email_address}")

    processed_emails = GmailService.process_new_emails(user, credential, history_id)
    logger.info(f"[Gmail Webhook Task] 新邮件处理完成,共 {len(processed_emails)} 封")

    if processed_emails:
        update_creator_conversation_status(user, processed_emails)

        # Auto replies run in their own task; failing to enqueue it is
        # recorded but does not fail this notification.
        try:
            process_auto_replies.delay(user.id, credential.id, processed_emails)
            logger.info(f"[Gmail Webhook Task] 已提交自动回复任务处理 {len(processed_emails)} 封邮件")
        except Exception as reply_task_error:
            logger.error(f"[Gmail Webhook Task] 提交自动回复任务失败: {str(reply_task_error)}")
            notification_record.metadata.update({
                "auto_reply_error": str(reply_task_error),
                "processed_emails": processed_emails,
            })
            notification_record.save()

    if history_id:
        credential.last_history_id = history_id
        credential.save()
        logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}")

    notification_record.is_successful = True
    notification_record.metadata.update({
        "completed_at": timezone.now().isoformat(),
        "processed_emails_count": len(processed_emails),
    })
    notification_record.save()
    return True


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # default delay between retries, in seconds
    acks_late=True,  # acknowledge the message only after the task finishes
    time_limit=600,  # hard time limit, in seconds
    soft_time_limit=300,  # soft time limit, in seconds
)
def process_push_notification(self, message_data, message_id, subscription):
    """
    Celery task that handles one Gmail push notification.

    The work is split in two stages so that ``self.retry`` is called at most
    once per failure (its ``Retry`` exception is itself an ``Exception`` and
    must not be caught by an enclosing handler):

    1. claim the notification (de-duplication / tracking record);
    2. decode the payload and process the announced emails.

    Args:
        message_data: Base64-encoded JSON payload of the push message
        message_id: ID of the push message, used for de-duplication
        subscription: Subscription info, stored in the record metadata

    Returns:
        bool: True when processed or skipped as a duplicate, False when the
        payload lacks an email address or no valid credential matches it

    Raises:
        Whatever ``self.retry`` raises: any other failure schedules a retry
        (60s for stage 1, 30s for stage 2).
    """
    logger.info(f"开始处理推送通知: {message_id}")

    try:
        notification_record = _claim_notification(
            message_id, subscription, self.request.retries
        )
    except Exception as e:
        logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise self.retry(exc=e, countdown=60)

    if notification_record is None:
        logger.info(f"通知 {message_id} 已处理过,跳过")
        return True

    try:
        return _handle_notification(notification_record, message_data)
    except json.JSONDecodeError as e:
        logger.error(f"[Gmail Webhook Task] JSON解码失败: {str(e)}")
        _record_failure(notification_record, {
            "error": f"JSON解码失败: {str(e)}",
            # Keep only the first 100 characters to bound the stored size.
            "raw_data": message_data[:100] if message_data else "None",
        })
        raise self.retry(exc=e, countdown=30)
    except Exception as decode_error:
        logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(decode_error)}")
        _record_failure(notification_record, {
            "error": f"解析推送数据失败: {str(decode_error)}",
            "traceback": traceback.format_exc(),
            "failed_at": timezone.now().isoformat(),
        })
        raise self.retry(exc=decode_error, countdown=30)


def update_creator_conversation_status(user, email_ids):
    """
    Update creator conversation trackers for newly processed emails.

    For each email whose chat record has role 'assistant' and whose
    conversation and tracker both exist, mark the tracker as replied and,
    when check_goal_achievement() reports it, as goal-achieved.

    Args:
        user: User object (not read by this function)
        email_ids: List of processed Gmail message IDs

    Returns:
        bool: True when the loop completed, False when the tracker model is
        unavailable or any error occurred
    """
    try:
        # The tracker model lives in an app that may not be available.
        try:
            from apps.feishu.models import CreatorConversationTracker
        except ImportError:
            logger.warning("CreatorConversationTracker模型不存在,跳过更新达人对话状态")
            return False

        for email_id in email_ids:
            chat_msg = ChatHistory.objects.filter(
                metadata__gmail_message_id=email_id
            ).order_by('-created_at').first()

            if not chat_msg:
                logger.info(f"邮件 {email_id} 未找到对应的聊天记录,跳过状态更新")
                continue

            # Only messages with the 'assistant' role update the tracker.
            if chat_msg.role != 'assistant':
                continue

            conversation_id = chat_msg.conversation_id

            # Only the conversation's existence matters here.
            if not GmailConversation.objects.filter(
                conversation_id=conversation_id
            ).exists():
                logger.info(f"未找到对话 {conversation_id},跳过状态更新")
                continue

            tracker = CreatorConversationTracker.objects.filter(
                conversation_id=conversation_id
            ).first()

            if not tracker:
                logger.info(f"未找到对话 {conversation_id} 的跟踪记录,跳过状态更新")
                continue

            tracker.has_replied = True

            goal_achieved = check_goal_achievement(conversation_id)
            if goal_achieved:
                tracker.goal_achieved = True

            tracker.updated_at = timezone.now()
            tracker.save()

            logger.info(f"更新了对话 {conversation_id} 的跟踪状态: has_replied=True, goal_achieved={goal_achieved}")

        return True

    except Exception as e:
        logger.error(f"更新达人对话跟踪状态失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False


def check_goal_achievement(conversation_id):
    """
    Check whether a conversation reached its goal (pricing was mentioned).

    Heuristic: the goal counts as achieved when any message with role
    'assistant' contains pricing vocabulary (see ``_PRICE_PATTERN``).
    Messages without content are treated as empty.

    Args:
        conversation_id: Conversation ID

    Returns:
        bool: True when pricing vocabulary was found, False otherwise or on error
    """
    try:
        messages = ChatHistory.objects.filter(
            conversation_id=conversation_id
        ).order_by('created_at')

        for message in messages:
            if message.role == 'assistant' and _mentions_price(message.content):
                logger.info(f"对话 {conversation_id} 已达成目标,找到价格信息")
                return True

        return False

    except Exception as e:
        logger.error(f"检查目标达成失败: {str(e)}")
        return False


def _auto_reply_to_email(user, email_id):
    """Generate and send an automatic reply for one processed email, if eligible.

    The email is skipped (with a log line) when it has no chat record, its
    conversation is not active, it does not have the 'assistant' role, the
    conversation has no active goal or last message, or no reply could be
    generated.
    """
    chat_msg = ChatHistory.objects.filter(
        metadata__gmail_message_id=email_id
    ).order_by('-created_at').first()

    if not chat_msg:
        logger.info(f"[Auto Reply Task] 邮件 {email_id} 未找到对应的聊天记录,跳过自动回复")
        return

    conversation_id = chat_msg.conversation_id

    conversation = GmailConversation.objects.filter(
        conversation_id=conversation_id,
        is_active=True
    ).first()

    if not conversation:
        logger.info(f"[Auto Reply Task] 对话 {conversation_id} 不是活跃的,跳过自动回复")
        return

    logger.info(f"[Auto Reply Task] 发现活跃对话 {conversation_id},准备自动回复")

    # Only emails with the 'assistant' role get an automatic reply.
    if chat_msg.role != 'assistant':
        logger.info(f"[Auto Reply Task] 邮件 {email_id} 不是达人发送的,跳过自动回复")
        return

    goal = UserGoal.objects.filter(
        conversation=conversation,
        is_active=True
    ).first()

    if not goal:
        logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有活跃目标,跳过自动回复")
        return

    conversation_summary = get_conversation_summary(conversation_id) or "无对话摘要"

    last_message = get_last_message(conversation_id)
    if not last_message:
        logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有达人消息,跳过自动回复")
        return

    reply_content, error = generate_recommended_reply(
        user=user,
        goal_description=goal.description,
        conversation_summary=conversation_summary,
        last_message=last_message
    )

    if error:
        logger.error(f"[Auto Reply Task] 生成推荐回复失败: {error}")
        return

    if not reply_content:
        logger.warning(f"[Auto Reply Task] 生成的推荐回复为空")
        return

    # Metadata or its subject may be missing/None; fall back to an empty subject.
    subject = (chat_msg.metadata or {}).get('subject') or ''
    reply_subject = subject if subject.startswith('回复:') else f"回复: {subject}"

    logger.info(f"[Auto Reply Task] 准备发送自动回复: 从{conversation.user_email}到{conversation.influencer_email}")

    success, reply_message_id = GmailService.send_email(
        user=user,
        user_email=conversation.user_email,
        to_email=conversation.influencer_email,
        subject=reply_subject,
        body=reply_content
    )

    if not success:
        # On failure the second element is logged as-is.
        logger.error(f"[Auto Reply Task] 发送自动回复失败: {reply_message_id}")
        return

    logger.info(f"[Auto Reply Task] 已成功发送自动回复: {reply_message_id}")

    goal.last_activity_time = timezone.now()
    if goal.status == 'pending':
        goal.status = 'in_progress'
    goal.save(update_fields=['last_activity_time', 'status', 'updated_at'])


@shared_task(
    bind=True,
    max_retries=2,
    default_retry_delay=60,
    time_limit=300,
    soft_time_limit=240
)
def process_auto_replies(self, user_id, credential_id, email_ids):
    """
    Celery task that sends automatic replies for processed emails.

    Each email is handled independently: an error on one email is logged and
    does not stop the others or trigger a retry. Only failures before the
    loop (e.g. user or credential lookup) retry the task.

    Args:
        user_id: User ID
        credential_id: Gmail credential ID
        email_ids: List of Gmail message IDs to consider

    Returns:
        bool: True once every email has been attempted

    Raises:
        Whatever ``self.retry`` raises when setup fails.
    """
    try:
        from django.contrib.auth import get_user_model
        User = get_user_model()

        user = User.objects.get(id=user_id)
        # The credential itself is not used below; the lookup is kept so a
        # missing credential still fails (and retries) the task.
        GmailCredential.objects.get(id=credential_id)

        logger.info(f"[Auto Reply Task] 开始处理{len(email_ids)}封邮件的自动回复")

        for email_id in email_ids:
            try:
                _auto_reply_to_email(user, email_id)
            except Exception as reply_error:
                logger.error(f"[Auto Reply Task] 处理自动回复过程中出错: {str(reply_error)}")
                logger.error(traceback.format_exc())

        return True

    except Exception as e:
        logger.error(f"[Auto Reply Task] 处理自动回复失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise self.retry(exc=e, countdown=30)