operations_project/apps/gmail/tasks.py

257 lines
10 KiB
Python
Raw Normal View History

2025-05-20 18:01:02 +08:00
import logging
import json
import base64
from celery import shared_task
from django.utils import timezone
from .models import GmailCredential, GmailConversation, UserGoal, ProcessedPushNotification, UnmatchedEmail
from .services.gmail_service import GmailService
from .services.goal_service import get_conversation_summary, get_last_message, generate_recommended_reply
from apps.chat.models import ChatHistory
logger = logging.getLogger(__name__)
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60, # 重试延迟60秒
acks_late=True, # 任务完成后再确认
time_limit=600, # 任务超时限制600秒
soft_time_limit=300 # 软超时限制300秒
)
def process_push_notification(self, message_data, message_id, subscription):
"""
处理Gmail推送通知的Celery任务
Args:
message_data: Base64编码的消息数据
message_id: 消息ID
subscription: 订阅信息
Returns:
bool: 处理是否成功
"""
try:
logger.info(f"开始处理推送通知: {message_id}")
print(f"[Gmail Webhook Task] 处理推送通知: {message_id}")
# 检查是否已处理过该消息
if ProcessedPushNotification.objects.filter(message_id=message_id).exists():
logger.info(f"通知 {message_id} 已处理过,跳过")
print(f"[Gmail Webhook Task] 通知 {message_id} 已处理过,跳过")
return True
# Base64解码消息数据
try:
decoded_data = json.loads(base64.b64decode(message_data).decode('utf-8'))
print(f"[Gmail Webhook Task] 解码后的数据: {decoded_data}")
# 获取Gmail通知关键信息
email_address = decoded_data.get('emailAddress')
history_id = decoded_data.get('historyId')
if not email_address:
logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址")
# 记录失败的处理,防止重复处理
ProcessedPushNotification.objects.create(
message_id=message_id,
email_address="unknown",
history_id=history_id or "unknown",
is_successful=False,
metadata=decoded_data
)
return False
# 创建处理记录
notification_record = ProcessedPushNotification.objects.create(
message_id=message_id,
email_address=email_address,
history_id=history_id or "unknown",
metadata=decoded_data
)
# 查找对应的Gmail凭证
credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first()
if not credential:
logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}")
notification_record.is_successful = False
notification_record.save()
return False
user = credential.user
logger.info(f"[Gmail Webhook Task] 找到有效凭证: 用户ID {user.id}, 邮箱 {email_address}")
# 处理新邮件
processed_emails = GmailService.process_new_emails(user, credential, history_id)
logger.info(f"[Gmail Webhook Task] 新邮件处理完成,共 {len(processed_emails)}")
# 处理自动回复
if processed_emails:
process_auto_replies.delay(user.id, credential.id, processed_emails)
# 更新凭证的历史ID
if history_id:
credential.last_history_id = history_id
credential.save()
logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}")
# 更新通知记录
notification_record.is_successful = True
notification_record.save()
return True
except Exception as e:
logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(e)}")
# 记录失败的处理
ProcessedPushNotification.objects.create(
message_id=message_id,
email_address="error",
history_id="error",
is_successful=False,
metadata={"error": str(e)}
)
# 抛出异常以便Celery可能进行重试
raise self.retry(exc=e, countdown=30)
except Exception as e:
logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 尝试重试任务
raise self.retry(exc=e, countdown=60)
@shared_task(
bind=True,
max_retries=2,
default_retry_delay=60,
time_limit=300,
soft_time_limit=240
)
def process_auto_replies(self, user_id, credential_id, email_ids):
"""
处理自动回复的任务
Args:
user_id: 用户ID
credential_id: Gmail凭证ID
email_ids: 需要处理的邮件ID列表
Returns:
bool: 处理是否成功
"""
try:
from django.contrib.auth import get_user_model
User = get_user_model()
user = User.objects.get(id=user_id)
credential = GmailCredential.objects.get(id=credential_id)
logger.info(f"[Auto Reply Task] 开始处理{len(email_ids)}封邮件的自动回复")
# 遍历每个处理过的邮件ID
for email_id in email_ids:
try:
# 通过邮件ID查找对应的聊天记录
chat_msg = ChatHistory.objects.filter(
metadata__gmail_message_id=email_id
).order_by('-created_at').first()
if not chat_msg:
logger.info(f"[Auto Reply Task] 邮件 {email_id} 未找到对应的聊天记录,跳过自动回复")
continue
conversation_id = chat_msg.conversation_id
# 检查对话是否是活跃的
conversation = GmailConversation.objects.filter(
conversation_id=conversation_id,
is_active=True
).first()
if not conversation:
logger.info(f"[Auto Reply Task] 对话 {conversation_id} 不是活跃的,跳过自动回复")
continue
logger.info(f"[Auto Reply Task] 发现活跃对话 {conversation_id},准备自动回复")
# 检查邮件角色,只有达人发送的邮件才自动回复
if chat_msg.role != 'assistant':
logger.info(f"[Auto Reply Task] 邮件 {email_id} 不是达人发送的,跳过自动回复")
continue
# 查找对话的目标
goal = UserGoal.objects.filter(
conversation=conversation,
is_active=True
).first()
if not goal:
logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有活跃目标,跳过自动回复")
continue
# 获取对话摘要
conversation_summary = get_conversation_summary(conversation_id)
if not conversation_summary:
conversation_summary = "无对话摘要"
# 获取最后一条达人消息
last_message = get_last_message(conversation_id)
if not last_message:
logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有达人消息,跳过自动回复")
continue
# 生成推荐回复
reply_content, error = generate_recommended_reply(
user=user,
goal_description=goal.description,
conversation_summary=conversation_summary,
last_message=last_message
)
if error:
logger.error(f"[Auto Reply Task] 生成推荐回复失败: {error}")
continue
if not reply_content:
logger.warning(f"[Auto Reply Task] 生成的推荐回复为空")
continue
# 构建回复的主题
subject = chat_msg.metadata.get('subject', '')
reply_subject = f"回复: {subject}" if not subject.startswith('回复:') else subject
# 准备发送自动回复
logger.info(f"[Auto Reply Task] 准备发送自动回复: 从{conversation.user_email}{conversation.influencer_email}")
success, reply_message_id = GmailService.send_email(
user=user,
user_email=conversation.user_email,
to_email=conversation.influencer_email,
subject=reply_subject,
body=reply_content
)
if success:
logger.info(f"[Auto Reply Task] 已成功发送自动回复: {reply_message_id}")
# 更新目标状态
goal.last_activity_time = timezone.now()
if goal.status == 'pending':
goal.status = 'in_progress'
goal.save(update_fields=['last_activity_time', 'status', 'updated_at'])
else:
logger.error(f"[Auto Reply Task] 发送自动回复失败: {reply_message_id}")
except Exception as reply_error:
logger.error(f"[Auto Reply Task] 处理自动回复过程中出错: {str(reply_error)}")
import traceback
logger.error(traceback.format_exc())
return True
except Exception as e:
logger.error(f"[Auto Reply Task] 处理自动回复失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
raise self.retry(exc=e, countdown=30)