daren/apps/gmail/tasks.py

409 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
import logging
import re
import traceback

from celery import shared_task
from django.utils import timezone

from .models import GmailCredential, GmailConversation, UserGoal, ProcessedPushNotification, UnmatchedEmail
from .services.gmail_service import GmailService
from .services.goal_service import get_conversation_summary, get_last_message, generate_recommended_reply
from apps.chat.models import ChatHistory
# One logger for this module, named after the module itself (``__name__``).
logger = logging.getLogger(name=__name__)
@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # retry after 60 seconds
    acks_late=True,  # acknowledge the message only after the task has finished
    time_limit=600,  # hard time limit: 600 seconds
    soft_time_limit=300  # soft time limit: 300 seconds
)
def process_push_notification(self, message_data, message_id, subscription):
    """Handle one Gmail Pub/Sub push notification.

    Steps:
      1. De-duplicate on ``message_id`` using ``ProcessedPushNotification``.
         An existing row is skipped when it succeeded, or when this is the
         task's first attempt (a duplicate delivery). On a Celery retry, the
         unsuccessful row left by the failed attempt is reused rather than
         short-circuiting the retry.
      2. Base64-decode and JSON-parse ``message_data``, then read
         ``emailAddress`` and ``historyId``.
      3. Find the valid ``GmailCredential`` for that address and sync new mail
         via ``GmailService.process_new_emails``. For the synced messages,
         update creator trackers and enqueue ``process_auto_replies``.
      4. Store ``historyId`` on the credential and mark the row successful.

    Args:
        message_data: Base64-encoded notification payload.
        message_id: Pub/Sub message id, used as the de-duplication key.
        subscription: Subscription info, stored in the record's metadata.

    Returns:
        bool: True on success or when the notification was already handled.
        False when the payload cannot be decoded, carries no email address,
        or matches no valid credential. These cases are not retried because
        the identical payload would fail the same way again.

    Raises:
        celery.exceptions.Retry: for any other error, up to ``max_retries`` times.
    """
    # Local import keeps this task independent of the module-level import block.
    from django.db import IntegrityError

    try:
        logger.info(f"开始处理推送通知: {message_id}")

        notification_record = ProcessedPushNotification.objects.filter(message_id=message_id).first()
        if notification_record is not None:
            # A failed earlier attempt of this same task leaves an unsuccessful row behind.
            # Only skip when that row succeeded or this is a fresh (duplicate) delivery;
            # otherwise every Celery retry would silently turn into a no-op.
            if notification_record.is_successful or self.request.retries == 0:
                logger.info(f"通知 {message_id} 已处理过,跳过")
                return True
            logger.info(f"[Gmail Webhook Task] 重试处理推送通知: {message_id} (retry {self.request.retries})")
        else:
            try:
                # Create a record marking the notification as "being processed".
                notification_record = ProcessedPushNotification.objects.create(
                    message_id=message_id,
                    email_address="processing",
                    history_id="unknown",
                    is_successful=False,
                    metadata={
                        "subscription": subscription,
                        "started_at": timezone.now().isoformat()
                    }
                )
            except IntegrityError:
                # NOTE(review): assumes message_id is unique on ProcessedPushNotification.
                # In that case another delivery inserted the same id between lookup and insert.
                logger.info(f"通知 {message_id} 已处理过,跳过")
                return True

        # Decode the payload. Decoding is deterministic, so failures are recorded and not retried.
        try:
            decoded_data = json.loads(base64.b64decode(message_data).decode('utf-8'))
        except json.JSONDecodeError as e:
            logger.error(f"[Gmail Webhook Task] JSON解码失败: {str(e)}")
            notification_record.metadata.update({
                "error": f"JSON解码失败: {str(e)}",
                "raw_data": message_data[:100] if message_data else "None"  # keep only the first 100 chars
            })
            notification_record.save()
            return False
        except (ValueError, TypeError) as decode_error:
            # binascii.Error (bad base64) and UnicodeDecodeError are ValueError subclasses;
            # TypeError covers a payload that is neither str nor bytes.
            logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(decode_error)}")
            notification_record.metadata.update({
                "error": f"解析推送数据失败: {str(decode_error)}",
                "traceback": traceback.format_exc()
            })
            notification_record.save()
            return False
        logger.debug(f"[Gmail Webhook Task] 解码后的数据: {decoded_data}")

        # A JSON payload that is not an object cannot carry the expected fields.
        payload = decoded_data if isinstance(decoded_data, dict) else {}
        email_address = payload.get('emailAddress')
        history_id = payload.get('historyId')
        if not email_address:
            logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址")
            notification_record.metadata.update({
                "error": "推送数据缺少邮箱地址",
                "decoded_data": decoded_data
            })
            notification_record.save()
            return False

        notification_record.email_address = email_address
        notification_record.history_id = history_id or "unknown"
        notification_record.metadata.update(payload)
        notification_record.save()

        credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first()
        if not credential:
            logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}")
            notification_record.metadata.update({
                "error": f"未找到对应的Gmail凭证: {email_address}"
            })
            notification_record.save()
            return False

        user = credential.user
        logger.info(f"[Gmail Webhook Task] 找到有效凭证: 用户ID {user.id}, 邮箱 {email_address}")

        # Treat a missing result as "nothing new" so len() below cannot fail.
        processed_emails = GmailService.process_new_emails(user, credential, history_id) or []
        logger.info(f"[Gmail Webhook Task] 新邮件处理完成,共 {len(processed_emails)}")

        if processed_emails:
            # Update creator reply status, then hand auto-replies off to a separate task.
            update_creator_conversation_status(user, processed_emails)
            try:
                process_auto_replies.delay(user.id, credential.id, processed_emails)
                logger.info(f"[Gmail Webhook Task] 已提交自动回复任务处理 {len(processed_emails)} 封邮件")
            except Exception as reply_task_error:
                logger.error(f"[Gmail Webhook Task] 提交自动回复任务失败: {str(reply_task_error)}")
                notification_record.metadata.update({
                    "auto_reply_error": str(reply_task_error),
                    "processed_emails": processed_emails
                })
                notification_record.save()

        if history_id:
            credential.last_history_id = history_id
            credential.save()
            logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}")

        notification_record.is_successful = True
        notification_record.metadata.update({
            "completed_at": timezone.now().isoformat(),
            "processed_emails_count": len(processed_emails)
        })
        notification_record.save()
        return True
    except Exception as e:
        # This handler is the only place that calls self.retry(). Nothing inside the
        # try raises celery's Retry, so no attempt is ever scheduled twice.
        logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}")
        logger.error(traceback.format_exc())
        try:
            failed_record = ProcessedPushNotification.objects.filter(message_id=message_id).first()
            if failed_record:
                failed_record.metadata.update({
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "failed_at": timezone.now().isoformat()
                })
                failed_record.save()
        except Exception as record_error:
            logger.error(f"[Gmail Webhook Task] 更新通知记录失败: {str(record_error)}")
        raise self.retry(exc=e, countdown=60)
def _sync_tracker_for_email(tracker_model, email_id, synced_conversations):
    """Update the creator tracker of the conversation that one Gmail message belongs to.

    Returns without changes in any of these cases: the message has no
    ``ChatHistory`` row; the row is not an ``'assistant'`` (creator) message;
    its conversation was already synced in this batch; or no
    ``GmailConversation`` or tracker row exists. Exceptions propagate to the
    caller.
    """
    chat_msg = ChatHistory.objects.filter(
        metadata__gmail_message_id=email_id
    ).order_by('-created_at').first()
    if not chat_msg:
        logger.info(f"邮件 {email_id} 未找到对应的聊天记录,跳过状态更新")
        return
    # Only creator messages count as a reply.
    if chat_msg.role != 'assistant':
        return
    conversation_id = chat_msg.conversation_id
    if conversation_id in synced_conversations:
        # check_goal_achievement already scanned every message of this conversation.
        return
    conversation = GmailConversation.objects.filter(
        conversation_id=conversation_id
    ).first()
    if not conversation:
        logger.info(f"未找到对话 {conversation_id},跳过状态更新")
        return
    tracker = tracker_model.objects.filter(
        conversation_id=conversation_id
    ).first()
    if not tracker:
        logger.info(f"未找到对话 {conversation_id} 的跟踪记录,跳过状态更新")
        return
    tracker.has_replied = True
    goal_achieved = check_goal_achievement(conversation_id)
    if goal_achieved:
        # Sticky flag: an achieved goal is never reset to False here.
        tracker.goal_achieved = True
    tracker.updated_at = timezone.now()
    tracker.save()
    synced_conversations.add(conversation_id)
    logger.info(f"更新了对话 {conversation_id} 的跟踪状态: has_replied=True, goal_achieved={goal_achieved}")


def update_creator_conversation_status(user, email_ids):
    """Update creator-conversation trackers for newly processed emails.

    For each Gmail message id, the newest ``ChatHistory`` row tagged with it
    is loaded. Only ``'assistant'`` rows (treated as the creator's messages)
    are considered. When both a ``GmailConversation`` and a
    ``CreatorConversationTracker`` exist for the row's conversation, the
    tracker gets ``has_replied=True``. It also gets ``goal_achieved=True``
    when ``check_goal_achievement`` finds pricing information.

    Each conversation is updated at most once per call. A failure on one
    email is logged and does not stop the remaining emails from being handled.

    Args:
        user: Mailbox owner. Not used by the current implementation.
        email_ids: Iterable of Gmail message ids that were just processed.

    Returns:
        bool: True when every email was handled without error. False when the
        tracker model cannot be imported or at least one email raised (errors
        are logged, never re-raised).
    """
    try:
        # The tracker model lives in another app that may not be installed.
        try:
            from apps.feishu.models import CreatorConversationTracker
        except ImportError:
            logger.warning("CreatorConversationTracker模型不存在跳过更新达人对话状态")
            return False

        all_succeeded = True
        synced_conversations = set()
        for email_id in email_ids:
            try:
                _sync_tracker_for_email(CreatorConversationTracker, email_id, synced_conversations)
            except Exception as e:
                # Keep going: one bad email must not block tracker updates for the others.
                all_succeeded = False
                logger.error(f"更新达人对话跟踪状态失败: {str(e)}")
                logger.error(traceback.format_exc())
        return all_succeeded
    except Exception as e:
        # E.g. email_ids is not iterable.
        logger.error(f"更新达人对话跟踪状态失败: {str(e)}")
        logger.error(traceback.format_exc())
        return False
# Pricing vocabulary that marks a creator reply as containing rate information.
# Terms match as whole words (case-insensitive), so ordinary words that merely
# contain a keyword are not mistaken for a price quote. Examples:
# "collaborate"/"grateful" (rate), "feel"/"feedback"/"coffee" (fee),
# "costume" (cost). "$" is matched anywhere in the text.
PRICE_KEYWORD_PATTERN = re.compile(
    r"\$|\b(?:rates?|prices?|costs?|usd|charge[sd]?|fees?)\b",
    re.IGNORECASE,
)


def check_goal_achievement(conversation_id):
    """Return True when a creator reply in the conversation mentions pricing.

    All ``ChatHistory`` rows of ``conversation_id`` are scanned in
    ``created_at`` order. Only rows with ``role == 'assistant'`` (treated as
    the creator's replies) are inspected, and the first whose content matches
    ``PRICE_KEYWORD_PATTERN`` ends the search.

    Args:
        conversation_id: Conversation identifier shared by the ``ChatHistory`` rows.

    Returns:
        bool: True if pricing information was found. False otherwise,
        including when the lookup raises (the error is logged).
    """
    try:
        messages = ChatHistory.objects.filter(conversation_id=conversation_id).order_by('created_at')
        for message in messages:
            if message.role != 'assistant':
                continue
            # Empty/NULL content cannot contain a price and must not abort the scan.
            if message.content and PRICE_KEYWORD_PATTERN.search(message.content):
                logger.info(f"对话 {conversation_id} 已达成目标,找到价格信息")
                return True
        return False
    except Exception as e:
        logger.error(f"检查目标达成失败: {str(e)}")
        return False
@shared_task(
    bind=True,
    max_retries=2,
    default_retry_delay=60,
    time_limit=300,
    soft_time_limit=240
)
def process_auto_replies(self, user_id, credential_id, email_ids):
    """Send generated auto-replies for newly received creator emails.

    For each Gmail message id, the newest ``ChatHistory`` row tagged with it
    is loaded. A reply is sent only when all of these hold:
      - the row's conversation is an active ``GmailConversation``;
      - the row's ``role`` is ``'assistant'`` (treated as a creator message);
      - the conversation has an active ``UserGoal``;
      - ``get_last_message`` returns a message;
      - ``generate_recommended_reply`` yields non-empty content without error.

    The reply inputs depend only on the conversation, so at most one reply is
    sent per conversation per run, even when several ids share a conversation.

    Args:
        user_id: Primary key of the mailbox owner.
        credential_id: Primary key of the ``GmailCredential``. It is only
            loaded to verify that it still exists.
        email_ids: List of Gmail message ids processed by the push task.

    Returns:
        bool: True after all ids were examined. Per-email failures are logged
        and skipped.

    Raises:
        celery.exceptions.Retry: when setup (loading user/credential) fails.
        celery.exceptions.SoftTimeLimitExceeded: when the soft time limit is
            hit. This is not retried, because replies already sent in this run
            would be sent again.
    """
    from celery.exceptions import SoftTimeLimitExceeded
    from django.contrib.auth import get_user_model

    try:
        User = get_user_model()
        user = User.objects.get(id=user_id)
        # Only checked for existence; the credential object is not used afterwards.
        GmailCredential.objects.get(id=credential_id)
        logger.info(f"[Auto Reply Task] 开始处理{len(email_ids)}封邮件的自动回复")

        # Conversations that already got an auto-reply in this run.
        replied_conversations = set()

        for email_id in email_ids:
            try:
                chat_msg = ChatHistory.objects.filter(
                    metadata__gmail_message_id=email_id
                ).order_by('-created_at').first()
                if not chat_msg:
                    logger.info(f"[Auto Reply Task] 邮件 {email_id} 未找到对应的聊天记录,跳过自动回复")
                    continue

                conversation_id = chat_msg.conversation_id
                if conversation_id in replied_conversations:
                    logger.info(f"[Auto Reply Task] 对话 {conversation_id} 本次已发送自动回复,跳过邮件 {email_id}")
                    continue

                conversation = GmailConversation.objects.filter(
                    conversation_id=conversation_id,
                    is_active=True
                ).first()
                if not conversation:
                    logger.info(f"[Auto Reply Task] 对话 {conversation_id} 不是活跃的,跳过自动回复")
                    continue
                logger.info(f"[Auto Reply Task] 发现活跃对话 {conversation_id},准备自动回复")

                # Only emails sent by the creator are auto-replied to.
                if chat_msg.role != 'assistant':
                    logger.info(f"[Auto Reply Task] 邮件 {email_id} 不是达人发送的,跳过自动回复")
                    continue

                goal = UserGoal.objects.filter(
                    conversation=conversation,
                    is_active=True
                ).first()
                if not goal:
                    logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有活跃目标,跳过自动回复")
                    continue

                conversation_summary = get_conversation_summary(conversation_id) or "无对话摘要"

                last_message = get_last_message(conversation_id)
                if not last_message:
                    logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有达人消息,跳过自动回复")
                    continue

                reply_content, error = generate_recommended_reply(
                    user=user,
                    goal_description=goal.description,
                    conversation_summary=conversation_summary,
                    last_message=last_message
                )
                if error:
                    logger.error(f"[Auto Reply Task] 生成推荐回复失败: {error}")
                    continue
                if not reply_content:
                    logger.warning(f"[Auto Reply Task] 生成的推荐回复为空")
                    continue

                # A stored subject of None must not break the prefix check.
                subject = chat_msg.metadata.get('subject') or ''
                reply_subject = subject if subject.startswith('回复:') else f"回复: {subject}"

                logger.info(f"[Auto Reply Task] 准备发送自动回复: 从{conversation.user_email}{conversation.influencer_email}")
                success, reply_message_id = GmailService.send_email(
                    user=user,
                    user_email=conversation.user_email,
                    to_email=conversation.influencer_email,
                    subject=reply_subject,
                    body=reply_content
                )
                if success:
                    replied_conversations.add(conversation_id)
                    logger.info(f"[Auto Reply Task] 已成功发送自动回复: {reply_message_id}")
                    goal.last_activity_time = timezone.now()
                    if goal.status == 'pending':
                        goal.status = 'in_progress'
                    goal.save(update_fields=['last_activity_time', 'status', 'updated_at'])
                else:
                    logger.error(f"[Auto Reply Task] 发送自动回复失败: {reply_message_id}")
            except SoftTimeLimitExceeded:
                # Without this, the per-email handler below would swallow the soft limit
                # and keep looping until the hard time_limit terminates the task.
                raise
            except Exception as reply_error:
                logger.error(f"[Auto Reply Task] 处理自动回复过程中出错: {str(reply_error)}")
                logger.error(traceback.format_exc())
        return True
    except SoftTimeLimitExceeded:
        # Not retried: replies already sent in this run would be sent a second time.
        logger.error("[Auto Reply Task] 处理自动回复超时,停止处理")
        raise
    except Exception as e:
        logger.error(f"[Auto Reply Task] 处理自动回复失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise self.retry(exc=e, countdown=30)