2025-05-29 10:08:06 +08:00
|
|
|
|
import logging
|
|
|
|
|
import json
|
|
|
|
|
import base64
|
2025-05-29 18:45:06 +08:00
|
|
|
|
import traceback
|
2025-05-29 10:08:06 +08:00
|
|
|
|
from celery import shared_task
|
|
|
|
|
from django.utils import timezone
|
|
|
|
|
from .models import GmailCredential, GmailConversation, UserGoal, ProcessedPushNotification, UnmatchedEmail
|
|
|
|
|
from .services.gmail_service import GmailService
|
|
|
|
|
from .services.goal_service import get_conversation_summary, get_last_message, generate_recommended_reply
|
|
|
|
|
from apps.chat.models import ChatHistory
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
@shared_task(
|
|
|
|
|
bind=True,
|
|
|
|
|
max_retries=3,
|
|
|
|
|
default_retry_delay=60, # 重试延迟60秒
|
|
|
|
|
acks_late=True, # 任务完成后再确认
|
|
|
|
|
time_limit=600, # 任务超时限制600秒
|
|
|
|
|
soft_time_limit=300 # 软超时限制300秒
|
|
|
|
|
)
|
|
|
|
|
def process_push_notification(self, message_data, message_id, subscription):
|
|
|
|
|
"""
|
|
|
|
|
处理Gmail推送通知的Celery任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
message_data: Base64编码的消息数据
|
|
|
|
|
message_id: 消息ID
|
|
|
|
|
subscription: 订阅信息
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 处理是否成功
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
logger.info(f"开始处理推送通知: {message_id}")
|
|
|
|
|
print(f"[Gmail Webhook Task] 处理推送通知: {message_id}")
|
|
|
|
|
|
|
|
|
|
# 检查是否已处理过该消息
|
|
|
|
|
if ProcessedPushNotification.objects.filter(message_id=message_id).exists():
|
|
|
|
|
logger.info(f"通知 {message_id} 已处理过,跳过")
|
|
|
|
|
print(f"[Gmail Webhook Task] 通知 {message_id} 已处理过,跳过")
|
|
|
|
|
return True
|
|
|
|
|
|
2025-05-29 18:45:06 +08:00
|
|
|
|
# 尝试创建一个处理记录,标记为开始处理
|
|
|
|
|
notification_record = ProcessedPushNotification.objects.create(
|
|
|
|
|
message_id=message_id,
|
|
|
|
|
email_address="processing",
|
|
|
|
|
history_id="unknown",
|
|
|
|
|
is_successful=False,
|
|
|
|
|
metadata={
|
|
|
|
|
"subscription": subscription,
|
|
|
|
|
"started_at": timezone.now().isoformat()
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
|
2025-05-29 10:08:06 +08:00
|
|
|
|
# Base64解码消息数据
|
|
|
|
|
try:
|
|
|
|
|
decoded_data = json.loads(base64.b64decode(message_data).decode('utf-8'))
|
|
|
|
|
print(f"[Gmail Webhook Task] 解码后的数据: {decoded_data}")
|
|
|
|
|
|
|
|
|
|
# 获取Gmail通知关键信息
|
|
|
|
|
email_address = decoded_data.get('emailAddress')
|
|
|
|
|
history_id = decoded_data.get('historyId')
|
|
|
|
|
|
|
|
|
|
if not email_address:
|
|
|
|
|
logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址")
|
2025-05-29 18:45:06 +08:00
|
|
|
|
# 更新处理记录
|
|
|
|
|
notification_record.metadata.update({
|
|
|
|
|
"error": "推送数据缺少邮箱地址",
|
|
|
|
|
"decoded_data": decoded_data
|
|
|
|
|
})
|
|
|
|
|
notification_record.save()
|
2025-05-29 10:08:06 +08:00
|
|
|
|
return False
|
|
|
|
|
|
2025-05-29 18:45:06 +08:00
|
|
|
|
# 更新处理记录
|
|
|
|
|
notification_record.email_address = email_address
|
|
|
|
|
notification_record.history_id = history_id or "unknown"
|
|
|
|
|
notification_record.metadata.update(decoded_data)
|
|
|
|
|
notification_record.save()
|
2025-05-29 10:08:06 +08:00
|
|
|
|
|
|
|
|
|
# 查找对应的Gmail凭证
|
|
|
|
|
credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first()
|
|
|
|
|
if not credential:
|
|
|
|
|
logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}")
|
2025-05-29 18:45:06 +08:00
|
|
|
|
notification_record.metadata.update({
|
|
|
|
|
"error": f"未找到对应的Gmail凭证: {email_address}"
|
|
|
|
|
})
|
2025-05-29 10:08:06 +08:00
|
|
|
|
notification_record.save()
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
user = credential.user
|
|
|
|
|
logger.info(f"[Gmail Webhook Task] 找到有效凭证: 用户ID {user.id}, 邮箱 {email_address}")
|
|
|
|
|
|
|
|
|
|
# 处理新邮件
|
|
|
|
|
processed_emails = GmailService.process_new_emails(user, credential, history_id)
|
|
|
|
|
logger.info(f"[Gmail Webhook Task] 新邮件处理完成,共 {len(processed_emails)} 封")
|
|
|
|
|
|
2025-06-03 18:50:29 +08:00
|
|
|
|
# 处理达人回复状态更新
|
|
|
|
|
if processed_emails:
|
|
|
|
|
update_creator_conversation_status(user, processed_emails)
|
|
|
|
|
|
2025-05-29 10:08:06 +08:00
|
|
|
|
# 处理自动回复
|
|
|
|
|
if processed_emails:
|
2025-05-29 18:45:06 +08:00
|
|
|
|
try:
|
|
|
|
|
process_auto_replies.delay(user.id, credential.id, processed_emails)
|
|
|
|
|
logger.info(f"[Gmail Webhook Task] 已提交自动回复任务处理 {len(processed_emails)} 封邮件")
|
|
|
|
|
except Exception as reply_task_error:
|
|
|
|
|
logger.error(f"[Gmail Webhook Task] 提交自动回复任务失败: {str(reply_task_error)}")
|
|
|
|
|
notification_record.metadata.update({
|
|
|
|
|
"auto_reply_error": str(reply_task_error),
|
|
|
|
|
"processed_emails": processed_emails
|
|
|
|
|
})
|
|
|
|
|
notification_record.save()
|
2025-05-29 10:08:06 +08:00
|
|
|
|
|
|
|
|
|
# 更新凭证的历史ID
|
|
|
|
|
if history_id:
|
|
|
|
|
credential.last_history_id = history_id
|
|
|
|
|
credential.save()
|
|
|
|
|
logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}")
|
|
|
|
|
|
2025-05-29 18:45:06 +08:00
|
|
|
|
# 更新通知记录为成功
|
2025-05-29 10:08:06 +08:00
|
|
|
|
notification_record.is_successful = True
|
2025-05-29 18:45:06 +08:00
|
|
|
|
notification_record.metadata.update({
|
|
|
|
|
"completed_at": timezone.now().isoformat(),
|
|
|
|
|
"processed_emails_count": len(processed_emails)
|
|
|
|
|
})
|
2025-05-29 10:08:06 +08:00
|
|
|
|
notification_record.save()
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
2025-05-29 18:45:06 +08:00
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
|
|
logger.error(f"[Gmail Webhook Task] JSON解码失败: {str(e)}")
|
|
|
|
|
notification_record.metadata.update({
|
|
|
|
|
"error": f"JSON解码失败: {str(e)}",
|
|
|
|
|
"raw_data": message_data[:100] if message_data else "None" # 只保存前100个字符,避免数据过大
|
|
|
|
|
})
|
|
|
|
|
notification_record.save()
|
2025-05-29 10:08:06 +08:00
|
|
|
|
# 抛出异常以便Celery可能进行重试
|
|
|
|
|
raise self.retry(exc=e, countdown=30)
|
2025-05-29 18:45:06 +08:00
|
|
|
|
except Exception as decode_error:
|
|
|
|
|
logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(decode_error)}")
|
|
|
|
|
notification_record.metadata.update({
|
|
|
|
|
"error": f"解析推送数据失败: {str(decode_error)}",
|
|
|
|
|
"traceback": traceback.format_exc()
|
|
|
|
|
})
|
|
|
|
|
notification_record.save()
|
|
|
|
|
# 抛出异常以便Celery可能进行重试
|
|
|
|
|
raise self.retry(exc=decode_error, countdown=30)
|
2025-05-29 10:08:06 +08:00
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}")
|
|
|
|
|
logger.error(traceback.format_exc())
|
2025-05-29 18:45:06 +08:00
|
|
|
|
# 尝试更新通知记录
|
|
|
|
|
try:
|
|
|
|
|
notification_record = ProcessedPushNotification.objects.filter(message_id=message_id).first()
|
|
|
|
|
if notification_record:
|
|
|
|
|
notification_record.metadata.update({
|
|
|
|
|
"error": str(e),
|
|
|
|
|
"traceback": traceback.format_exc(),
|
|
|
|
|
"failed_at": timezone.now().isoformat()
|
|
|
|
|
})
|
|
|
|
|
notification_record.save()
|
|
|
|
|
except Exception as record_error:
|
|
|
|
|
logger.error(f"[Gmail Webhook Task] 更新通知记录失败: {str(record_error)}")
|
|
|
|
|
|
2025-05-29 10:08:06 +08:00
|
|
|
|
# 尝试重试任务
|
|
|
|
|
raise self.retry(exc=e, countdown=60)
|
|
|
|
|
|
2025-06-03 18:50:29 +08:00
|
|
|
|
def update_creator_conversation_status(user, email_ids):
|
|
|
|
|
"""
|
|
|
|
|
根据新处理的邮件更新达人对话跟踪状态
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
user: 用户对象
|
|
|
|
|
email_ids: 处理的邮件ID列表
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 处理是否成功
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# 尝试导入CreatorConversationTracker模型
|
|
|
|
|
try:
|
|
|
|
|
from apps.feishu.models import CreatorConversationTracker
|
|
|
|
|
except ImportError:
|
|
|
|
|
logger.warning("CreatorConversationTracker模型不存在,跳过更新达人对话状态")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# 遍历每个邮件ID
|
|
|
|
|
for email_id in email_ids:
|
|
|
|
|
# 通过邮件ID查找对应的聊天记录
|
|
|
|
|
chat_msg = ChatHistory.objects.filter(
|
|
|
|
|
metadata__gmail_message_id=email_id
|
|
|
|
|
).order_by('-created_at').first()
|
|
|
|
|
|
|
|
|
|
if not chat_msg:
|
|
|
|
|
logger.info(f"邮件 {email_id} 未找到对应的聊天记录,跳过状态更新")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# 只处理达人的消息
|
|
|
|
|
if chat_msg.role != 'assistant':
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
conversation_id = chat_msg.conversation_id
|
|
|
|
|
|
|
|
|
|
# 查找关联的对话
|
|
|
|
|
conversation = GmailConversation.objects.filter(
|
|
|
|
|
conversation_id=conversation_id
|
|
|
|
|
).first()
|
|
|
|
|
|
|
|
|
|
if not conversation:
|
|
|
|
|
logger.info(f"未找到对话 {conversation_id},跳过状态更新")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# 查找对应的跟踪记录
|
|
|
|
|
tracker = CreatorConversationTracker.objects.filter(
|
|
|
|
|
conversation_id=conversation_id
|
|
|
|
|
).first()
|
|
|
|
|
|
|
|
|
|
if not tracker:
|
|
|
|
|
logger.info(f"未找到对话 {conversation_id} 的跟踪记录,跳过状态更新")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# 更新回复状态
|
|
|
|
|
tracker.has_replied = True
|
|
|
|
|
|
|
|
|
|
# 检查是否达成目标
|
|
|
|
|
goal_achieved = check_goal_achievement(conversation_id)
|
|
|
|
|
if goal_achieved:
|
|
|
|
|
tracker.goal_achieved = True
|
|
|
|
|
|
|
|
|
|
# 保存更新
|
|
|
|
|
tracker.updated_at = timezone.now()
|
|
|
|
|
tracker.save()
|
|
|
|
|
|
|
|
|
|
logger.info(f"更新了对话 {conversation_id} 的跟踪状态: has_replied=True, goal_achieved={goal_achieved}")
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"更新达人对话跟踪状态失败: {str(e)}")
|
|
|
|
|
import traceback
|
|
|
|
|
logger.error(traceback.format_exc())
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def check_goal_achievement(conversation_id):
|
|
|
|
|
"""
|
|
|
|
|
检查对话是否达成目标(找到价格信息)
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
conversation_id: 对话ID
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 是否达成目标
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# 获取对话中的所有消息
|
|
|
|
|
messages = ChatHistory.objects.filter(conversation_id=conversation_id).order_by('created_at')
|
|
|
|
|
|
|
|
|
|
# 简单判断:检查是否有回复中包含价格信息
|
|
|
|
|
keywords = ['rate', 'price', 'cost', '$', 'usd', 'charge', 'fee']
|
|
|
|
|
|
|
|
|
|
for message in messages:
|
|
|
|
|
# 只检查达人的回复
|
|
|
|
|
if message.role == 'assistant':
|
|
|
|
|
content_lower = message.content.lower()
|
|
|
|
|
if any(keyword in content_lower for keyword in keywords):
|
|
|
|
|
logger.info(f"对话 {conversation_id} 已达成目标,找到价格信息")
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"检查目标达成失败: {str(e)}")
|
|
|
|
|
return False
|
2025-05-29 10:08:06 +08:00
|
|
|
|
|
|
|
|
|
@shared_task(
|
|
|
|
|
bind=True,
|
|
|
|
|
max_retries=2,
|
|
|
|
|
default_retry_delay=60,
|
|
|
|
|
time_limit=300,
|
|
|
|
|
soft_time_limit=240
|
|
|
|
|
)
|
|
|
|
|
def process_auto_replies(self, user_id, credential_id, email_ids):
|
|
|
|
|
"""
|
|
|
|
|
处理自动回复的任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
user_id: 用户ID
|
|
|
|
|
credential_id: Gmail凭证ID
|
|
|
|
|
email_ids: 需要处理的邮件ID列表
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 处理是否成功
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
from django.contrib.auth import get_user_model
|
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
|
|
|
|
user = User.objects.get(id=user_id)
|
|
|
|
|
credential = GmailCredential.objects.get(id=credential_id)
|
|
|
|
|
|
|
|
|
|
logger.info(f"[Auto Reply Task] 开始处理{len(email_ids)}封邮件的自动回复")
|
|
|
|
|
|
|
|
|
|
# 遍历每个处理过的邮件ID
|
|
|
|
|
for email_id in email_ids:
|
|
|
|
|
try:
|
|
|
|
|
# 通过邮件ID查找对应的聊天记录
|
|
|
|
|
chat_msg = ChatHistory.objects.filter(
|
|
|
|
|
metadata__gmail_message_id=email_id
|
|
|
|
|
).order_by('-created_at').first()
|
|
|
|
|
|
|
|
|
|
if not chat_msg:
|
|
|
|
|
logger.info(f"[Auto Reply Task] 邮件 {email_id} 未找到对应的聊天记录,跳过自动回复")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
conversation_id = chat_msg.conversation_id
|
|
|
|
|
|
|
|
|
|
# 检查对话是否是活跃的
|
|
|
|
|
conversation = GmailConversation.objects.filter(
|
|
|
|
|
conversation_id=conversation_id,
|
|
|
|
|
is_active=True
|
|
|
|
|
).first()
|
|
|
|
|
|
|
|
|
|
if not conversation:
|
|
|
|
|
logger.info(f"[Auto Reply Task] 对话 {conversation_id} 不是活跃的,跳过自动回复")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
logger.info(f"[Auto Reply Task] 发现活跃对话 {conversation_id},准备自动回复")
|
|
|
|
|
|
|
|
|
|
# 检查邮件角色,只有达人发送的邮件才自动回复
|
|
|
|
|
if chat_msg.role != 'assistant':
|
|
|
|
|
logger.info(f"[Auto Reply Task] 邮件 {email_id} 不是达人发送的,跳过自动回复")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# 查找对话的目标
|
|
|
|
|
goal = UserGoal.objects.filter(
|
|
|
|
|
conversation=conversation,
|
|
|
|
|
is_active=True
|
|
|
|
|
).first()
|
|
|
|
|
|
|
|
|
|
if not goal:
|
|
|
|
|
logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有活跃目标,跳过自动回复")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# 获取对话摘要
|
|
|
|
|
conversation_summary = get_conversation_summary(conversation_id)
|
|
|
|
|
if not conversation_summary:
|
|
|
|
|
conversation_summary = "无对话摘要"
|
|
|
|
|
|
|
|
|
|
# 获取最后一条达人消息
|
|
|
|
|
last_message = get_last_message(conversation_id)
|
|
|
|
|
if not last_message:
|
|
|
|
|
logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有达人消息,跳过自动回复")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# 生成推荐回复
|
|
|
|
|
reply_content, error = generate_recommended_reply(
|
|
|
|
|
user=user,
|
|
|
|
|
goal_description=goal.description,
|
|
|
|
|
conversation_summary=conversation_summary,
|
|
|
|
|
last_message=last_message
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if error:
|
|
|
|
|
logger.error(f"[Auto Reply Task] 生成推荐回复失败: {error}")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if not reply_content:
|
|
|
|
|
logger.warning(f"[Auto Reply Task] 生成的推荐回复为空")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# 构建回复的主题
|
|
|
|
|
subject = chat_msg.metadata.get('subject', '')
|
|
|
|
|
reply_subject = f"回复: {subject}" if not subject.startswith('回复:') else subject
|
|
|
|
|
|
|
|
|
|
# 准备发送自动回复
|
|
|
|
|
logger.info(f"[Auto Reply Task] 准备发送自动回复: 从{conversation.user_email}到{conversation.influencer_email}")
|
|
|
|
|
|
|
|
|
|
success, reply_message_id = GmailService.send_email(
|
|
|
|
|
user=user,
|
|
|
|
|
user_email=conversation.user_email,
|
|
|
|
|
to_email=conversation.influencer_email,
|
|
|
|
|
subject=reply_subject,
|
|
|
|
|
body=reply_content
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if success:
|
|
|
|
|
logger.info(f"[Auto Reply Task] 已成功发送自动回复: {reply_message_id}")
|
|
|
|
|
|
|
|
|
|
# 更新目标状态
|
|
|
|
|
goal.last_activity_time = timezone.now()
|
|
|
|
|
if goal.status == 'pending':
|
|
|
|
|
goal.status = 'in_progress'
|
|
|
|
|
goal.save(update_fields=['last_activity_time', 'status', 'updated_at'])
|
|
|
|
|
else:
|
|
|
|
|
logger.error(f"[Auto Reply Task] 发送自动回复失败: {reply_message_id}")
|
|
|
|
|
|
|
|
|
|
except Exception as reply_error:
|
|
|
|
|
logger.error(f"[Auto Reply Task] 处理自动回复过程中出错: {str(reply_error)}")
|
|
|
|
|
import traceback
|
|
|
|
|
logger.error(traceback.format_exc())
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"[Auto Reply Task] 处理自动回复失败: {str(e)}")
|
|
|
|
|
import traceback
|
|
|
|
|
logger.error(traceback.format_exc())
|
|
|
|
|
raise self.retry(exc=e, countdown=30)
|