From 94b82723facd69a81f90596dc95df8efedf212b7 Mon Sep 17 00:00:00 2001 From: wanjia Date: Tue, 3 Jun 2025 18:50:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=B9=E9=87=8F=E5=9B=9E=E5=A4=8D=E8=BE=BE?= =?UTF-8?q?=E4=BA=BA=E5=92=8C=E5=B0=86=E5=9B=9E=E5=A4=8D=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E4=BF=9D=E5=AD=98=E5=88=B0=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/feishu/models.py | 27 ++- apps/feishu/urls.py | 4 +- apps/feishu/views.py | 371 +++++++++++++++++++++++++++++++++++++++++- apps/gmail/tasks.py | 110 +++++++++++++ 4 files changed, 508 insertions(+), 4 deletions(-) diff --git a/apps/feishu/models.py b/apps/feishu/models.py index 6a84285..7dbf6e8 100644 --- a/apps/feishu/models.py +++ b/apps/feishu/models.py @@ -72,4 +72,29 @@ class FeishuTableMapping(models.Model): unique_together = ('app_token', 'table_id') def __str__(self): - return f"{self.feishu_table_name or self.table_name} ({self.table_id})" \ No newline at end of file + return f"{self.feishu_table_name or self.table_name} ({self.table_id})" + +class CreatorConversationTracker(models.Model): + """ + 跟踪创作者回复和目标完成情况的模型 + """ + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + creator_profile = models.ForeignKey('daren_detail.CreatorProfile', on_delete=models.CASCADE, verbose_name='创作者') + conversation_id = models.CharField(max_length=100, verbose_name='对话ID') + has_replied = models.BooleanField(default=False, verbose_name='是否已回复') + goal_achieved = models.BooleanField(default=False, verbose_name='是否达成目标') + user = models.ForeignKey('user.User', on_delete=models.CASCADE, verbose_name='用户') + user_email = models.EmailField(verbose_name='用户邮箱') + influencer_email = models.EmailField(verbose_name='达人邮箱') + + created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间') + updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间') + + class Meta: + verbose_name = '创作者对话跟踪' + verbose_name_plural = '创作者对话跟踪' + db_table = 'creator_conversation_tracker' + unique_together = ('creator_profile', 'conversation_id') + + def __str__(self): + return f"{self.creator_profile} - {self.conversation_id}" \ No newline at end of file diff --git a/apps/feishu/urls.py b/apps/feishu/urls.py index d1a3570..54303dd 100644 --- a/apps/feishu/urls.py +++ b/apps/feishu/urls.py @@ -6,7 +6,7 @@ from .views import ( FeishuTableMappingDetailView, GmailExtractionView, AutoGmailConversationView, - + BatchGmailConversationView ) urlpatterns = [ @@ -16,5 +16,5 @@ urlpatterns = [ path('mappings//', FeishuTableMappingDetailView.as_view(), name='feishu-table-mapping-detail'), path('extract-gmail/', GmailExtractionView.as_view(), name='gmail-extraction'), path('auto-conversations/', AutoGmailConversationView.as_view(), name='auto-conversations'), - + path('batch-conversations/', BatchGmailConversationView.as_view(), name='batch-conversations'), ] diff --git a/apps/feishu/views.py b/apps/feishu/views.py index b0e262f..7d6e180 100644 --- a/apps/feishu/views.py +++ b/apps/feishu/views.py @@ -8,7 +8,7 @@ from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from django.http import Http404 -from .models import FeishuTableMapping +from .models import FeishuTableMapping, CreatorConversationTracker from .services.bitable_service import BitableService from .services.data_sync_service import DataSyncService from .services.gmail_extraction_service import GmailExtractionService @@ -524,3 +524,372 @@ OOIN Media""" } }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + +class BatchGmailConversationView(APIView): + """ + 批量Gmail自动对话API,支持根据达人库批量发送邮件并跟踪回复状态 + """ + permission_classes = [IsAuthenticated] + authentication_classes = [CustomTokenAuthentication] + + def post(self, request, *args, **kwargs): + """ + 批量创建自动对话,根据达人库中的邮箱发送打招呼消息 + + 请求参数: + - user_email: 用户的Gmail邮箱(已授权) + - creator_ids: 创作者ID列表,可选,不提供则使用creator_pool_id + - creator_pool_id: 创作者库ID,可选,不提供则使用creator_ids + - goal_description: 对话目标描述 + - batch_size: 批量处理大小,默认10 + """ + try: + # 获取请求数据 + data = request.data + user_email = data.get('user_email') + creator_ids = data.get('creator_ids', []) + creator_pool_id = data.get('creator_pool_id') + goal_description = data.get('goal_description') + batch_size = int(data.get('batch_size', 10)) + + # 验证必填参数 + if not user_email or not goal_description: + return Response({ + 'code': 400, + 'message': '缺少必要参数: user_email, goal_description', + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + if not creator_ids and not creator_pool_id: + return Response({ + 'code': 400, + 'message': '请提供creator_ids或creator_pool_id其中之一', + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + # 验证Gmail凭证 + credential = GmailCredential.objects.filter(user=request.user, email=user_email).first() + if not credential: + return Response({ + 'code': 400, + 'message': f"未找到{user_email}的Gmail授权", + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + # 获取创作者列表 + from apps.daren_detail.models import CreatorProfile, PrivateCreatorPool, PrivateCreatorRelation + + creators = [] + if creator_ids: + creators = CreatorProfile.objects.filter(id__in=creator_ids) + elif creator_pool_id: + # 从私有池获取创作者 + relations = PrivateCreatorRelation.objects.filter( + private_pool_id=creator_pool_id, + status='active' + ).select_related('creator') + creators = [relation.creator for relation in relations] + + if not creators: + return Response({ + 'code': 400, + 'message': '未找到有效的创作者', + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + # 筛选有邮箱的创作者 + valid_creators = [creator for creator in creators if creator.email] + + if not valid_creators: + return Response({ + 'code': 400, + 'message': '所选创作者中没有有效的邮箱地址', + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + logger.info(f"开始批量创建Gmail自动对话: 用户={request.user.name}, 邮箱={user_email}, 创作者数量={len(valid_creators)}") + + # 创建结果统计 + result_stats = { + 'total': len(valid_creators), + 'success': 0, + 'failed': 0, + 'skipped': 0, + 'details': [] + } + + # 准备固定的打招呼消息 + greeting_message = """Paid Collaboration Opportunity with TikTok's #1 Fragrance Brand 🌸 +Hi, +I'm Vira from OOIN Media, and I'm reaching out on behalf of a top-performing fragrance brand Sttes on TikTok Shop—currently ranked #1 in the perfume category. +This brand has already launched several viral products and is now looking to partner with select creators like you through paid collaborations to continue driving awareness and sales. +We'd love to explore a partnership and would appreciate it if you could share: +Your rate for a single TikTok video +Whether you offer bundle pricing for multiple videos +Any additional details or formats you offer (e.g. story integration, livestream add-ons, etc.) +The product has strong market traction, proven conversions, and a competitive commission structure if you're also open to affiliate partnerships. +Looking forward to the opportunity to work together and hearing your rates! +Warm regards, +Vira +OOIN Media""" + + # 按批次处理 + for i in range(0, len(valid_creators), batch_size): + batch_creators = valid_creators[i:i+batch_size] + for creator in batch_creators: + # 跳过没有邮箱的创作者 + if not creator.email: + result_stats['skipped'] += 1 + result_stats['details'].append({ + 'creator_id': str(creator.id), + 'creator_name': creator.name, + 'status': 'skipped', + 'reason': '缺少邮箱地址' + }) + continue + + try: + # 检查是否已存在对话 + existing_tracker = CreatorConversationTracker.objects.filter( + creator_profile=creator, + user=request.user, + user_email=user_email + ).first() + + if existing_tracker: + # 已存在追踪记录,跳过 + result_stats['skipped'] += 1 + result_stats['details'].append({ + 'creator_id': str(creator.id), + 'creator_name': creator.name, + 'status': 'skipped', + 'reason': f'已存在对话 {existing_tracker.conversation_id}' + }) + continue + + # 查找现有对话 + existing_conversation = GmailConversation.objects.filter( + user=request.user, + user_email=user_email, + influencer_email=creator.email + ).first() + + conversation_id = None + is_new_conversation = False + + if existing_conversation: + # 使用现有对话 + conversation = existing_conversation + conversation_id = conversation.conversation_id + # 激活对话 + conversation.is_active = True + conversation.save() + logger.info(f"找到并激活现有对话: {conversation_id}, 创作者={creator.name}") + else: + # 创建新对话 + conversation_id = f"gmail_{request.user.id}_{str(uuid.uuid4())[:8]}" + conversation = GmailConversation.objects.create( + user=request.user, + user_email=user_email, + influencer_email=creator.email, + conversation_id=conversation_id, + title=f"与 {creator.name} ({creator.email}) 的Gmail对话", + is_active=True, + has_sent_greeting=False, + metadata={ + 'auto_conversation': True, + 'batch_process': True, + 'creator_id': str(creator.id), + 'created_at': timezone.now().isoformat() + } + ) + is_new_conversation = True + logger.info(f"创建新的自动对话: {conversation_id}, 创作者={creator.name}") + + # 使用goal_service创建或更新目标 + goal, is_new_goal = get_or_create_goal( + user=request.user, + conversation_id=conversation_id, + goal_description=goal_description + ) + + # 创建跟踪记录 + tracker = CreatorConversationTracker.objects.create( + creator_profile=creator, + conversation_id=conversation_id, + user=request.user, + user_email=user_email, + influencer_email=creator.email + ) + + # 检查是否需要发送打招呼消息 + if not conversation.has_sent_greeting: + # 发送打招呼消息 + subject = "Paid Collaboration Opportunity with TikTok's #1 Fragrance Brand" + logger.info(f"开始向 {creator.email} 发送打招呼消息") + + # 使用GmailService发送邮件 + success, message_id = GmailService.send_email( + user=request.user, + user_email=user_email, + to_email=creator.email, + subject=subject, + body=greeting_message + ) + + if success: + # 将打招呼消息保存到聊天历史 + try: + # 查找或创建默认知识库 + knowledge_base = KnowledgeBase.objects.filter(user_id=request.user.id, type='private').first() + if knowledge_base: + # 创建聊天消息 + ChatHistory.objects.create( + user=request.user, + knowledge_base=knowledge_base, + conversation_id=conversation_id, + message_id=f"greeting_{message_id}", + title=conversation.title, + role="user", # 用户发出的消息 + content=greeting_message, + metadata={ + 'gmail_message_id': message_id, + 'from': user_email, + 'to': creator.email, + 'date': timezone.now().isoformat(), + 'subject': subject, + 'greeting': True, + 'source': 'gmail', + 'creator_id': str(creator.id) + } + ) + logger.info(f"打招呼消息已保存到聊天历史: {message_id}") + else: + logger.warning("未找到默认知识库,打招呼消息未保存到聊天历史") + except Exception as chat_error: + logger.error(f"保存打招呼消息到聊天历史失败: {str(chat_error)}") + + # 更新对话的has_sent_greeting字段 + conversation.has_sent_greeting = True + conversation.save(update_fields=['has_sent_greeting', 'updated_at']) + + result_stats['success'] += 1 + result_stats['details'].append({ + 'creator_id': str(creator.id), + 'creator_name': creator.name, + 'status': 'success', + 'conversation_id': conversation_id, + 'email': creator.email + }) + else: + # 发送失败 + logger.error(f"发送打招呼消息失败: {message_id}, 创作者={creator.name}") + result_stats['failed'] += 1 + result_stats['details'].append({ + 'creator_id': str(creator.id), + 'creator_name': creator.name, + 'status': 'failed', + 'reason': f"发送邮件失败: {message_id}" + }) + else: + # 已发送过打招呼消息 + logger.info(f"对话 {conversation_id} 已经发送过打招呼消息,不再重复发送") + result_stats['skipped'] += 1 + result_stats['details'].append({ + 'creator_id': str(creator.id), + 'creator_name': creator.name, + 'status': 'skipped', + 'reason': '已发送过打招呼消息', + 'conversation_id': conversation_id + }) + + except Exception as creator_error: + logger.error(f"处理创作者 {creator.name} 时出错: {str(creator_error)}") + result_stats['failed'] += 1 + result_stats['details'].append({ + 'creator_id': str(creator.id), + 'creator_name': creator.name, + 'status': 'error', + 'reason': str(creator_error) + }) + + # 设置Gmail推送通知 + notification_result, notification_error = GmailService.setup_gmail_push_notification( + user=request.user, + user_email=user_email + ) + + if not notification_result and notification_error: + logger.warning(f"设置Gmail推送通知失败: {notification_error},但批量对话创建成功") + + # 返回结果 + return Response({ + 'code': 200, + 'message': f"批量自动对话处理完成: 成功 {result_stats['success']}, 失败 {result_stats['failed']}, 跳过 {result_stats['skipped']}", + 'data': { + 'stats': result_stats, + 'push_notification': notification_result + } + }, status=status.HTTP_200_OK) + + except Exception as e: + logger.error(f"批量创建自动对话失败: {str(e)}") + error_details = traceback.format_exc() + return Response({ + 'code': 500, + 'message': f'批量创建自动对话失败: {str(e)}', + 'data': { + 'details': error_details[:500] + } + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + def get(self, request, *args, **kwargs): + """ + 获取所有批量创建的对话跟踪状态 + """ + try: + # 获取用户所有跟踪记录 + trackers = CreatorConversationTracker.objects.filter(user=request.user).select_related('creator_profile') + + # 统计数据 + stats = { + 'total': trackers.count(), + 'replied': trackers.filter(has_replied=True).count(), + 'goal_achieved': trackers.filter(goal_achieved=True).count() + } + + # 整理返回数据 + tracker_data = [] + for tracker in trackers: + tracker_data.append({ + 'id': str(tracker.id), + 'creator_id': str(tracker.creator_profile.id), + 'creator_name': tracker.creator_profile.name, + 'creator_email': tracker.influencer_email, + 'conversation_id': tracker.conversation_id, + 'has_replied': tracker.has_replied, + 'goal_achieved': tracker.goal_achieved, + 'created_at': tracker.created_at.strftime('%Y-%m-%d %H:%M:%S'), + 'updated_at': tracker.updated_at.strftime('%Y-%m-%d %H:%M:%S') + }) + + return Response({ + 'code': 200, + 'message': '获取跟踪状态成功', + 'data': { + 'stats': stats, + 'trackers': tracker_data + } + }) + + except Exception as e: + logger.error(f"获取跟踪状态失败: {str(e)}") + error_details = traceback.format_exc() + return Response({ + 'code': 500, + 'message': f'获取跟踪状态失败: {str(e)}', + 'data': { + 'details': error_details[:500] + } + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + diff --git a/apps/gmail/tasks.py b/apps/gmail/tasks.py index 51f8fc4..a0eb60b 100644 --- a/apps/gmail/tasks.py +++ b/apps/gmail/tasks.py @@ -95,6 +95,10 @@ def process_push_notification(self, message_data, message_id, subscription): processed_emails = GmailService.process_new_emails(user, credential, history_id) logger.info(f"[Gmail Webhook Task] 新邮件处理完成,共 {len(processed_emails)} 封") + # 处理达人回复状态更新 + if processed_emails: + update_creator_conversation_status(user, processed_emails) + # 处理自动回复 if processed_emails: try: @@ -162,6 +166,112 @@ def process_push_notification(self, message_data, message_id, subscription): # 尝试重试任务 raise self.retry(exc=e, countdown=60) +def update_creator_conversation_status(user, email_ids): + """ + 根据新处理的邮件更新达人对话跟踪状态 + + Args: + user: 用户对象 + email_ids: 处理的邮件ID列表 + + Returns: + bool: 处理是否成功 + """ + try: + # 尝试导入CreatorConversationTracker模型 + try: + from apps.feishu.models import CreatorConversationTracker + except ImportError: + logger.warning("CreatorConversationTracker模型不存在,跳过更新达人对话状态") + return False + + # 遍历每个邮件ID + for email_id in email_ids: + # 通过邮件ID查找对应的聊天记录 + chat_msg = ChatHistory.objects.filter( + metadata__gmail_message_id=email_id + ).order_by('-created_at').first() + + if not chat_msg: + logger.info(f"邮件 {email_id} 未找到对应的聊天记录,跳过状态更新") + continue + + # 只处理达人的消息 + if chat_msg.role != 'assistant': + continue + + conversation_id = chat_msg.conversation_id + + # 查找关联的对话 + conversation = GmailConversation.objects.filter( + conversation_id=conversation_id + ).first() + + if not conversation: + logger.info(f"未找到对话 {conversation_id},跳过状态更新") + continue + + # 查找对应的跟踪记录 + tracker = CreatorConversationTracker.objects.filter( + conversation_id=conversation_id + ).first() + + if not tracker: + logger.info(f"未找到对话 {conversation_id} 的跟踪记录,跳过状态更新") + continue + + # 更新回复状态 + tracker.has_replied = True + + # 检查是否达成目标 + goal_achieved = check_goal_achievement(conversation_id) + if goal_achieved: + tracker.goal_achieved = True + + # 保存更新 + tracker.updated_at = timezone.now() + tracker.save() + + logger.info(f"更新了对话 {conversation_id} 的跟踪状态: has_replied=True, goal_achieved={goal_achieved}") + + return True + + except Exception as e: + logger.error(f"更新达人对话跟踪状态失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return False + +def check_goal_achievement(conversation_id): + """ + 检查对话是否达成目标(找到价格信息) + + Args: + conversation_id: 对话ID + + Returns: + bool: 是否达成目标 + """ + try: + # 获取对话中的所有消息 + messages = ChatHistory.objects.filter(conversation_id=conversation_id).order_by('created_at') + + # 简单判断:检查是否有回复中包含价格信息 + keywords = ['rate', 'price', 'cost', '$', 'usd', 'charge', 'fee'] + + for message in messages: + # 只检查达人的回复 + if message.role == 'assistant': + content_lower = message.content.lower() + if any(keyword in content_lower for keyword in keywords): + logger.info(f"对话 {conversation_id} 已达成目标,找到价格信息") + return True + + return False + + except Exception as e: + logger.error(f"检查目标达成失败: {str(e)}") + return False @shared_task( bind=True,