daren/apps/gmail/tasks.py

409 lines
16 KiB
Python
Raw Normal View History

2025-05-29 10:08:06 +08:00
import logging
import json
import base64
2025-05-29 18:45:06 +08:00
import traceback
2025-05-29 10:08:06 +08:00
from celery import shared_task
from django.utils import timezone
from .models import GmailCredential, GmailConversation, UserGoal, ProcessedPushNotification, UnmatchedEmail
from .services.gmail_service import GmailService
from .services.goal_service import get_conversation_summary, get_last_message, generate_recommended_reply
from apps.chat.models import ChatHistory
# Module-level logger shared by every task and helper defined in this file.
logger = logging.getLogger(__name__)
def _merge_push_record_metadata(notification_record, extra):
    """Best-effort: merge ``extra`` into ``notification_record.metadata`` and save it.

    Used only on failure paths. A bookkeeping error is logged rather than
    raised, so it cannot mask the failure being reported. Passing ``None``
    (no record created yet) is a no-op.
    """
    if notification_record is None:
        return
    try:
        notification_record.metadata.update(extra)
        notification_record.save()
    except Exception as record_error:
        logger.error(f"[Gmail Webhook Task] 更新通知记录失败: {str(record_error)}")


def _decode_push_payload(message_data):
    """Decode a push payload: base64 -> UTF-8 text -> JSON object (dict).

    Raises:
        json.JSONDecodeError: the decoded text is not valid JSON.
        ValueError: invalid base64 (``binascii.Error``), invalid UTF-8
            (``UnicodeDecodeError``), or the JSON value is not an object.
        TypeError: ``message_data`` is not str/bytes (e.g. ``None``).
    """
    decoded = json.loads(base64.b64decode(message_data).decode('utf-8'))
    if not isinstance(decoded, dict):
        raise ValueError(f"expected a JSON object, got {type(decoded).__name__}")
    return decoded


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # retry after 60 seconds
    acks_late=True,  # acknowledge the message only after the task finishes
    time_limit=600,  # hard time limit: 600 seconds
    soft_time_limit=300  # soft time limit: 300 seconds
)
def process_push_notification(self, message_data, message_id, subscription):
    """Celery task that handles one Gmail push notification.

    Flow:
    1. Skip duplicates using a ``ProcessedPushNotification`` row keyed by
       ``message_id``.
    2. Decode the payload and look up the valid ``GmailCredential`` for the
       notified address.
    3. Let ``GmailService.process_new_emails`` ingest new mail.
    4. Update creator trackers and enqueue ``process_auto_replies``.
    5. Store the notification's ``historyId`` on the credential.

    Args:
        message_data: Base64-encoded JSON payload. ``emailAddress`` and
            ``historyId`` are read from it.
        message_id: Push message ID, used as the deduplication key.
        subscription: Subscription info, stored in the record's metadata.

    Returns:
        bool: True when the notification was processed or was a duplicate.
        False when the payload is malformed, lacks ``emailAddress``, or no
        valid credential matches.

    Raises:
        celery.exceptions.Retry: on unexpected errors, via ``self.retry``.
            After ``max_retries`` the original exception propagates.
            Malformed payloads are not retried, because decoding the same
            bytes would fail again.
    """
    from django.db import IntegrityError

    notification_record = None
    try:
        logger.info(f"开始处理推送通知: {message_id}")
        print(f"[Gmail Webhook Task] 处理推送通知: {message_id}")

        existing = ProcessedPushNotification.objects.filter(message_id=message_id).first()
        # A fresh delivery (retries == 0) that already has a record is a
        # duplicate push. On a Celery retry, the record was created by the
        # failed attempt, so it is reused instead of short-circuiting the
        # retry, unless that attempt had in fact succeeded.
        if existing is not None and (existing.is_successful or self.request.retries == 0):
            logger.info(f"通知 {message_id} 已处理过,跳过")
            print(f"[Gmail Webhook Task] 通知 {message_id} 已处理过,跳过")
            return True

        if existing is not None:
            notification_record = existing
            notification_record.metadata.update({
                "retries": self.request.retries,
                "retried_at": timezone.now().isoformat()
            })
            notification_record.save()
        else:
            try:
                # Mark the notification as "in progress" before doing any work.
                notification_record = ProcessedPushNotification.objects.create(
                    message_id=message_id,
                    email_address="processing",
                    history_id="unknown",
                    is_successful=False,
                    metadata={
                        "subscription": subscription,
                        "started_at": timezone.now().isoformat()
                    }
                )
            except IntegrityError:
                # NOTE(review): assumes message_id is unique on the model.
                # If so, a concurrent delivery of the same push created the
                # record first, and this one is a duplicate.
                logger.info(f"通知 {message_id} 已处理过,跳过")
                print(f"[Gmail Webhook Task] 通知 {message_id} 已处理过,跳过")
                return True

        try:
            decoded_data = _decode_push_payload(message_data)
        except json.JSONDecodeError as e:
            logger.error(f"[Gmail Webhook Task] JSON解码失败: {str(e)}")
            _merge_push_record_metadata(notification_record, {
                "error": f"JSON解码失败: {str(e)}",
                # Keep only the first 100 characters to bound the stored size.
                "raw_data": str(message_data)[:100] if message_data else "None"
            })
            return False
        except (ValueError, TypeError) as decode_error:
            logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(decode_error)}")
            _merge_push_record_metadata(notification_record, {
                "error": f"解析推送数据失败: {str(decode_error)}",
                "traceback": traceback.format_exc()
            })
            return False

        print(f"[Gmail Webhook Task] 解码后的数据: {decoded_data}")
        email_address = decoded_data.get('emailAddress')
        history_id = decoded_data.get('historyId')

        if not email_address:
            logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址")
            notification_record.metadata.update({
                "error": "推送数据缺少邮箱地址",
                "decoded_data": decoded_data
            })
            notification_record.save()
            return False

        notification_record.email_address = email_address
        notification_record.history_id = history_id or "unknown"
        notification_record.metadata.update(decoded_data)
        notification_record.save()

        credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first()
        if not credential:
            logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}")
            notification_record.metadata.update({
                "error": f"未找到对应的Gmail凭证: {email_address}"
            })
            notification_record.save()
            return False

        user = credential.user
        logger.info(f"[Gmail Webhook Task] 找到有效凭证: 用户ID {user.id}, 邮箱 {email_address}")

        # Guard against a None return so the len() calls below cannot crash.
        processed_emails = GmailService.process_new_emails(user, credential, history_id) or []
        logger.info(f"[Gmail Webhook Task] 新邮件处理完成,共 {len(processed_emails)}")

        if processed_emails:
            # Update the creator trackers first, then hand auto replies to their own task.
            update_creator_conversation_status(user, processed_emails)
            try:
                process_auto_replies.delay(user.id, credential.id, processed_emails)
                logger.info(f"[Gmail Webhook Task] 已提交自动回复任务处理 {len(processed_emails)} 封邮件")
            except Exception as reply_task_error:
                # Enqueue failure is recorded but does not fail the notification.
                logger.error(f"[Gmail Webhook Task] 提交自动回复任务失败: {str(reply_task_error)}")
                notification_record.metadata.update({
                    "auto_reply_error": str(reply_task_error),
                    "processed_emails": processed_emails
                })
                notification_record.save()

        if history_id:
            credential.last_history_id = history_id
            credential.save()
            logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}")

        notification_record.is_successful = True
        notification_record.metadata.update({
            "completed_at": timezone.now().isoformat(),
            "processed_emails_count": len(processed_emails)
        })
        notification_record.save()
        return True

    except Exception as e:
        error_trace = traceback.format_exc()
        logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}")
        logger.error(error_trace)
        # Only touch this task's own record (None if it was never created).
        _merge_push_record_metadata(notification_record, {
            "error": str(e),
            "traceback": error_trace,
            "failed_at": timezone.now().isoformat()
        })
        # Single retry call: nothing inside the try raises Retry, so it is
        # never scheduled twice.
        raise self.retry(exc=e, countdown=60)
def update_creator_conversation_status(user, email_ids):
    """Flag creator-conversation trackers as replied for freshly processed emails.

    For every Gmail message ID, the newest ``ChatHistory`` row whose
    ``metadata__gmail_message_id`` matches is inspected. Only rows with role
    ``'assistant'`` (the creator side in this file's convention) are
    considered.

    If the row's conversation exists in ``GmailConversation`` and has a
    ``CreatorConversationTracker``, the tracker gets ``has_replied = True``.
    When ``check_goal_achievement`` reports success, it also gets
    ``goal_achieved = True``.

    Args:
        user: The owning user. Not used by the current implementation.
        email_ids: Gmail message IDs that were just processed.

    Returns:
        bool: False when the tracker model cannot be imported or any error
        occurs (the error is logged). True otherwise, even if no tracker
        changed.
    """
    try:
        try:
            from apps.feishu.models import CreatorConversationTracker
        except ImportError:
            logger.warning("CreatorConversationTracker模型不存在跳过更新达人对话状态")
            return False

        for gmail_id in email_ids:
            latest_msg = (
                ChatHistory.objects
                .filter(metadata__gmail_message_id=gmail_id)
                .order_by('-created_at')
                .first()
            )
            if latest_msg is None:
                logger.info(f"邮件 {gmail_id} 未找到对应的聊天记录,跳过状态更新")
                continue

            # Only creator ('assistant') messages count as a reply.
            if latest_msg.role != 'assistant':
                continue

            conv_id = latest_msg.conversation_id

            if not GmailConversation.objects.filter(conversation_id=conv_id).exists():
                logger.info(f"未找到对话 {conv_id},跳过状态更新")
                continue

            tracker = CreatorConversationTracker.objects.filter(conversation_id=conv_id).first()
            if tracker is None:
                logger.info(f"未找到对话 {conv_id} 的跟踪记录,跳过状态更新")
                continue

            achieved = check_goal_achievement(conv_id)
            tracker.has_replied = True
            # goal_achieved is only ever raised here, never reset.
            if achieved:
                tracker.goal_achieved = True
            tracker.updated_at = timezone.now()
            tracker.save()
            logger.info(f"更新了对话 {conv_id} 的跟踪状态: has_replied=True, goal_achieved={achieved}")

        return True
    except Exception as exc:
        logger.error(f"更新达人对话跟踪状态失败: {str(exc)}")
        logger.error(traceback.format_exc())
        return False
# '$' anywhere, or a price keyword (optionally plural) that is not embedded
# in a longer word. Plain substring matching previously made "grateful",
# "separate", "feel", "coffee", "costume", ... count as price mentions.
_PRICE_KEYWORD_PATTERN = r"\$|(?<![a-z])(?:rates?|prices?|pricing|costs?|charges?|fees?|usd)(?![a-z])"


def mentions_price_info(text):
    """Return True if ``text`` appears to contain pricing information.

    Matching is case-insensitive. It accepts a ``$`` anywhere, or one of
    rate(s), price(s), pricing, cost(s), charge(s), fee(s), usd as a
    standalone word. Digits may touch the keyword, so "500usd" matches;
    letters may not, so "grateful" does not. ``None`` or empty text
    returns False.
    """
    import re

    if not text:
        return False
    return re.search(_PRICE_KEYWORD_PATTERN, text.lower()) is not None


def check_goal_achievement(conversation_id):
    """Check whether a conversation has reached its goal (pricing info found).

    Heuristic: scan the creator's (role ``'assistant'``) messages in
    chronological order and report success as soon as one mentions a price
    (see ``mentions_price_info``).

    Args:
        conversation_id: Conversation ID used to filter ``ChatHistory``.

    Returns:
        bool: True if a creator message mentions pricing. False otherwise,
        or on any error (which is logged).
    """
    try:
        # Filter by role in the database instead of in Python.
        contents = ChatHistory.objects.filter(
            conversation_id=conversation_id,
            role='assistant'
        ).order_by('created_at').values_list('content', flat=True)
        for content in contents:
            # mentions_price_info tolerates None/empty content, so one blank
            # message no longer aborts the whole check.
            if mentions_price_info(content):
                logger.info(f"对话 {conversation_id} 已达成目标,找到价格信息")
                return True
        return False
    except Exception as e:
        logger.error(f"检查目标达成失败: {str(e)}")
        return False
def _send_auto_reply_for_email(user, email_id, replied_conversation_ids):
    """Generate and send one auto reply for the conversation that ``email_id`` belongs to.

    Skips, with a log line, when any of these hold:
    - the email has no chat record;
    - its conversation is inactive;
    - the message is not from the creator (role ``'assistant'``);
    - the conversation was already answered in this batch;
    - there is no active goal or no creator message;
    - reply generation fails or returns nothing.

    Args:
        user: Sender of the reply.
        email_id: Gmail message ID that was just processed.
        replied_conversation_ids: Set of conversation IDs already answered
            in this batch. Updated in place after a successful send.

    Returns:
        bool: True only if a reply was sent.
    """
    chat_msg = ChatHistory.objects.filter(
        metadata__gmail_message_id=email_id
    ).order_by('-created_at').first()
    if not chat_msg:
        logger.info(f"[Auto Reply Task] 邮件 {email_id} 未找到对应的聊天记录,跳过自动回复")
        return False

    conversation_id = chat_msg.conversation_id
    conversation = GmailConversation.objects.filter(
        conversation_id=conversation_id,
        is_active=True
    ).first()
    if not conversation:
        logger.info(f"[Auto Reply Task] 对话 {conversation_id} 不是活跃的,跳过自动回复")
        return False
    logger.info(f"[Auto Reply Task] 发现活跃对话 {conversation_id},准备自动回复")

    # Only creator-sent emails get an automatic reply.
    if chat_msg.role != 'assistant':
        logger.info(f"[Auto Reply Task] 邮件 {email_id} 不是达人发送的,跳过自动回复")
        return False

    # The reply is built from the conversation's latest creator message, so
    # several new emails in one conversation would otherwise produce
    # identical duplicate replies.
    if conversation_id in replied_conversation_ids:
        logger.info(f"[Auto Reply Task] 对话 {conversation_id} 本批次已发送过自动回复,跳过")
        return False

    goal = UserGoal.objects.filter(
        conversation=conversation,
        is_active=True
    ).first()
    if not goal:
        logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有活跃目标,跳过自动回复")
        return False

    conversation_summary = get_conversation_summary(conversation_id) or "无对话摘要"

    last_message = get_last_message(conversation_id)
    if not last_message:
        logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有达人消息,跳过自动回复")
        return False

    reply_content, error = generate_recommended_reply(
        user=user,
        goal_description=goal.description,
        conversation_summary=conversation_summary,
        last_message=last_message
    )
    if error:
        logger.error(f"[Auto Reply Task] 生成推荐回复失败: {error}")
        return False
    if not reply_content:
        logger.warning("[Auto Reply Task] 生成的推荐回复为空")
        return False

    # A stored subject of None is treated like a missing subject.
    subject = chat_msg.metadata.get('subject') or ''
    reply_subject = subject if subject.startswith('回复:') else f"回复: {subject}"

    logger.info(f"[Auto Reply Task] 准备发送自动回复: 从{conversation.user_email}{conversation.influencer_email}")
    success, reply_message_id = GmailService.send_email(
        user=user,
        user_email=conversation.user_email,
        to_email=conversation.influencer_email,
        subject=reply_subject,
        body=reply_content
    )
    if not success:
        logger.error(f"[Auto Reply Task] 发送自动回复失败: {reply_message_id}")
        return False

    logger.info(f"[Auto Reply Task] 已成功发送自动回复: {reply_message_id}")
    # Record the send before touching the goal, so a failure while saving
    # the goal cannot lead to a second reply in this batch.
    replied_conversation_ids.add(conversation_id)

    goal.last_activity_time = timezone.now()
    if goal.status == 'pending':
        goal.status = 'in_progress'
    goal.save(update_fields=['last_activity_time', 'status', 'updated_at'])
    return True


@shared_task(
    bind=True,
    max_retries=2,
    default_retry_delay=60,
    time_limit=300,
    soft_time_limit=240
)
def process_auto_replies(self, user_id, credential_id, email_ids):
    """Celery task that sends auto replies for newly processed emails.

    Each email is handled independently by ``_send_auto_reply_for_email``.
    A failure for one email is logged and does not stop the others. At most
    one reply is sent per conversation per run.

    Args:
        user_id: ID of the user sending the replies.
        credential_id: ``GmailCredential`` ID. It is loaded only so that a
            missing credential fails the task before any email is sent.
        email_ids: Gmail message IDs to consider.

    Returns:
        bool: True when all emails were visited. False when the soft time
        limit interrupted the run; the remaining emails are then skipped and
        no retry is scheduled, to avoid resending replies already sent.

    Raises:
        celery.exceptions.Retry: when setup fails (e.g. user or credential
            lookup), via ``self.retry``.
    """
    from celery.exceptions import SoftTimeLimitExceeded
    from django.contrib.auth import get_user_model

    try:
        User = get_user_model()
        user = User.objects.get(id=user_id)
        # Existence check only: raises DoesNotExist (-> retry) if the credential is gone.
        GmailCredential.objects.get(id=credential_id)

        logger.info(f"[Auto Reply Task] 开始处理{len(email_ids)}封邮件的自动回复")

        replied_conversation_ids = set()
        for email_id in email_ids:
            try:
                _send_auto_reply_for_email(user, email_id, replied_conversation_ids)
            except SoftTimeLimitExceeded:
                # Previously swallowed by the generic handler below, which
                # made the soft limit ineffective until the hard kill.
                logger.warning("[Auto Reply Task] 达到软超时限制,停止处理剩余邮件")
                return False
            except Exception as reply_error:
                logger.error(f"[Auto Reply Task] 处理自动回复过程中出错: {str(reply_error)}")
                logger.error(traceback.format_exc())
        return True
    except Exception as e:
        logger.error(f"[Auto Reply Task] 处理自动回复失败: {str(e)}")
        logger.error(traceback.format_exc())
        raise self.retry(exc=e, countdown=30)