diff --git a/apps/chat/migrations/0003_alter_chathistory_content.py b/apps/chat/migrations/0003_alter_chathistory_content.py new file mode 100644 index 0000000..74d8014 --- /dev/null +++ b/apps/chat/migrations/0003_alter_chathistory_content.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2 on 2025-05-20 09:21 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('chat', '0002_alter_chathistory_content'), + ] + + operations = [ + migrations.AlterField( + model_name='chathistory', + name='content', + field=models.TextField(), + ), + ] diff --git a/apps/gmail/migrations/0010_processedpushnotification_unmatchedemail.py b/apps/gmail/migrations/0010_processedpushnotification_unmatchedemail.py new file mode 100644 index 0000000..68d587b --- /dev/null +++ b/apps/gmail/migrations/0010_processedpushnotification_unmatchedemail.py @@ -0,0 +1,53 @@ +# Generated by Django 5.2 on 2025-05-20 09:21 + +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('gmail', '0009_gmailconversation_has_sent_greeting_and_more'), + ] + + operations = [ + migrations.CreateModel( + name='ProcessedPushNotification', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('message_id', models.CharField(help_text='Pub/Sub消息ID', max_length=255, unique=True)), + ('email_address', models.EmailField(help_text='通知关联的Gmail邮箱', max_length=254)), + ('history_id', models.CharField(help_text='Gmail历史ID', max_length=100)), + ('processed_at', models.DateTimeField(auto_now_add=True, help_text='处理时间')), + ('is_successful', models.BooleanField(default=True, help_text='处理是否成功')), + ('metadata', models.JSONField(blank=True, default=dict, help_text='额外信息')), + ], + options={ + 'verbose_name': '已处理推送通知', + 'verbose_name_plural': '已处理推送通知', + 'db_table': 'gmail_processed_push_notifications', + 'ordering': ['-processed_at'], + 'indexes': [models.Index(fields=['message_id'], name='gmail_proce_message_912a0c_idx'), models.Index(fields=['email_address', 'history_id'], name='gmail_proce_email_a_2e3770_idx')], + }, + ), + migrations.CreateModel( + name='UnmatchedEmail', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('message_id', models.CharField(help_text='Gmail邮件ID', max_length=255, unique=True)), + ('user_id', models.IntegerField(help_text='用户ID')), + ('user_email', models.EmailField(help_text='用户Gmail邮箱', max_length=254)), + ('from_email', models.EmailField(help_text='发件人邮箱', max_length=254)), + ('to_email', models.EmailField(help_text='收件人邮箱', max_length=254)), + ('subject', models.CharField(blank=True, help_text='邮件主题', max_length=500)), + ('processed_at', models.DateTimeField(auto_now_add=True, help_text='处理时间')), + ], + options={ + 'verbose_name': '未匹配邮件', + 'verbose_name_plural': '未匹配邮件', + 'db_table': 'gmail_unmatched_emails', + 'ordering': ['-processed_at'], + 'indexes': [models.Index(fields=['message_id'], name='gmail_unmat_message_c1924d_idx'), models.Index(fields=['user_id'], name='gmail_unmat_user_id_ee06fe_idx'), models.Index(fields=['user_email', 'from_email'], name='gmail_unmat_user_em_a096af_idx')], + }, + ), + ] diff --git a/apps/gmail/migrations/0011_alter_unmatchedemail_user_id.py b/apps/gmail/migrations/0011_alter_unmatchedemail_user_id.py new file mode 100644 index 0000000..73e4ae2 --- /dev/null +++ b/apps/gmail/migrations/0011_alter_unmatchedemail_user_id.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2 on 2025-05-20 09:33 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('gmail', '0010_processedpushnotification_unmatchedemail'), + ] + + operations = [ + migrations.AlterField( + model_name='unmatchedemail', + name='user_id', + field=models.CharField(help_text='用户ID (UUID字符串形式)', max_length=36), + ), + ] diff --git a/apps/gmail/models.py b/apps/gmail/models.py index fc2b1f8..f587e29 100644 --- a/apps/gmail/models.py +++ b/apps/gmail/models.py @@ -182,4 +182,56 @@ class AutoReplyConfig(models.Model): """增加回复计数并更新时间""" self.current_replies += 1 self.last_reply_time = timezone.now() - self.save(update_fields=['current_replies', 'last_reply_time', 'updated_at']) \ No newline at end of file + self.save(update_fields=['current_replies', 'last_reply_time', 'updated_at']) + +class ProcessedPushNotification(models.Model): + """ + 已处理的推送通知记录,用于去重防止重复处理同一个通知 + """ + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + message_id = models.CharField(max_length=255, unique=True, help_text="Pub/Sub消息ID") + email_address = models.EmailField(help_text="通知关联的Gmail邮箱") + history_id = models.CharField(max_length=100, help_text="Gmail历史ID") + processed_at = models.DateTimeField(auto_now_add=True, help_text="处理时间") + is_successful = models.BooleanField(default=True, help_text="处理是否成功") + metadata = models.JSONField(default=dict, blank=True, help_text="额外信息") + + class Meta: + db_table = 'gmail_processed_push_notifications' + verbose_name = '已处理推送通知' + verbose_name_plural = '已处理推送通知' + ordering = ['-processed_at'] + indexes = [ + models.Index(fields=['message_id']), + models.Index(fields=['email_address', 'history_id']), + ] + + def __str__(self): + return f"{self.email_address} - {self.history_id} ({self.message_id})" + +class UnmatchedEmail(models.Model): + """ + 记录未匹配到对话的邮件,避免重复处理 + """ + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + message_id = models.CharField(max_length=255, unique=True, help_text="Gmail邮件ID") + user_id = models.CharField(max_length=36, help_text="用户ID (UUID字符串形式)") + user_email = models.EmailField(help_text="用户Gmail邮箱") + from_email = models.EmailField(help_text="发件人邮箱") + to_email = models.EmailField(help_text="收件人邮箱") + subject = models.CharField(max_length=500, blank=True, help_text="邮件主题") + processed_at = models.DateTimeField(auto_now_add=True, help_text="处理时间") + + class Meta: + db_table = 'gmail_unmatched_emails' + verbose_name = '未匹配邮件' + verbose_name_plural = '未匹配邮件' + ordering = ['-processed_at'] + indexes = [ + models.Index(fields=['message_id']), + models.Index(fields=['user_id']), + models.Index(fields=['user_email', 'from_email']), + ] + + def __str__(self): + return f"{self.user_email} - {self.from_email} - {self.subject}" \ No newline at end of file diff --git a/apps/gmail/services/gmail_service.py b/apps/gmail/services/gmail_service.py index 18d4eb1..705092b 100644 --- a/apps/gmail/services/gmail_service.py +++ b/apps/gmail/services/gmail_service.py @@ -780,6 +780,23 @@ class GmailService: return True else: logger.info(f"未找到匹配的对话: 用户邮箱={to_email}, 达人邮箱={from_email}") + + # 记录未匹配的邮件,避免重复处理 + from apps.gmail.models import UnmatchedEmail + + # 检查是否已经记录过 + existing = UnmatchedEmail.objects.filter(message_id=msg_id).exists() + if not existing: + # 创建未匹配邮件记录 + UnmatchedEmail.objects.create( + message_id=msg_id, + user_id=str(user.id), # 将UUID转换为字符串 + user_email=to_email, + from_email=from_email, + to_email=to_email, + subject=subject[:500] # 截取前500个字符,避免超出字段长度 + ) + logger.info(f"已记录未匹配的邮件: {msg_id}") else: logger.info(f"邮件不是发送给用户的: 发件人={from_email}, 收件人={to_email}, 用户邮箱={user_email}") diff --git a/apps/gmail/tasks.py b/apps/gmail/tasks.py new file mode 100644 index 0000000..67d3700 --- /dev/null +++ b/apps/gmail/tasks.py @@ -0,0 +1,257 @@ +import logging +import json +import base64 +from celery import shared_task +from django.utils import timezone +from .models import GmailCredential, GmailConversation, UserGoal, ProcessedPushNotification, UnmatchedEmail +from .services.gmail_service import GmailService +from .services.goal_service import get_conversation_summary, get_last_message, generate_recommended_reply +from apps.chat.models import ChatHistory + +logger = logging.getLogger(__name__) + +@shared_task( + bind=True, + max_retries=3, + default_retry_delay=60, # 重试延迟60秒 + acks_late=True, # 任务完成后再确认 + time_limit=600, # 任务超时限制600秒 + soft_time_limit=300 # 软超时限制300秒 +) +def process_push_notification(self, message_data, message_id, subscription): + """ + 处理Gmail推送通知的Celery任务 + + Args: + message_data: Base64编码的消息数据 + message_id: 消息ID + subscription: 订阅信息 + + Returns: + bool: 处理是否成功 + """ + try: + logger.info(f"开始处理推送通知: {message_id}") + print(f"[Gmail Webhook Task] 处理推送通知: {message_id}") + + # 检查是否已处理过该消息 + if ProcessedPushNotification.objects.filter(message_id=message_id).exists(): + logger.info(f"通知 {message_id} 已处理过,跳过") + print(f"[Gmail Webhook Task] 通知 {message_id} 已处理过,跳过") + return True + + # Base64解码消息数据 + try: + decoded_data = json.loads(base64.b64decode(message_data).decode('utf-8')) + print(f"[Gmail Webhook Task] 解码后的数据: {decoded_data}") + + # 获取Gmail通知关键信息 + email_address = decoded_data.get('emailAddress') + history_id = decoded_data.get('historyId') + + if not email_address: + logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址") + # 记录失败的处理,防止重复处理 + ProcessedPushNotification.objects.create( + message_id=message_id, + email_address="unknown", + history_id=history_id or "unknown", + is_successful=False, + metadata=decoded_data + ) + return False + + # 创建处理记录 + notification_record = ProcessedPushNotification.objects.create( + message_id=message_id, + email_address=email_address, + history_id=history_id or "unknown", + metadata=decoded_data + ) + + # 查找对应的Gmail凭证 + credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first() + if not credential: + logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}") + notification_record.is_successful = False + notification_record.save() + return False + + user = credential.user + logger.info(f"[Gmail Webhook Task] 找到有效凭证: 用户ID {user.id}, 邮箱 {email_address}") + + # 处理新邮件 + processed_emails = GmailService.process_new_emails(user, credential, history_id) + logger.info(f"[Gmail Webhook Task] 新邮件处理完成,共 {len(processed_emails)} 封") + + # 处理自动回复 + if processed_emails: + process_auto_replies.delay(user.id, credential.id, processed_emails) + + # 更新凭证的历史ID + if history_id: + credential.last_history_id = history_id + credential.save() + logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}") + + # 更新通知记录 + notification_record.is_successful = True + notification_record.save() + + return True + + except Exception as e: + logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(e)}") + # 记录失败的处理 + ProcessedPushNotification.objects.create( + message_id=message_id, + email_address="error", + history_id="error", + is_successful=False, + metadata={"error": str(e)} + ) + # 抛出异常以便Celery可能进行重试 + raise self.retry(exc=e, countdown=30) + + except Exception as e: + logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + # 尝试重试任务 + raise self.retry(exc=e, countdown=60) + + +@shared_task( + bind=True, + max_retries=2, + default_retry_delay=60, + time_limit=300, + soft_time_limit=240 +) +def process_auto_replies(self, user_id, credential_id, email_ids): + """ + 处理自动回复的任务 + + Args: + user_id: 用户ID + credential_id: Gmail凭证ID + email_ids: 需要处理的邮件ID列表 + + Returns: + bool: 处理是否成功 + """ + try: + from django.contrib.auth import get_user_model + User = get_user_model() + + user = User.objects.get(id=user_id) + credential = GmailCredential.objects.get(id=credential_id) + + logger.info(f"[Auto Reply Task] 开始处理{len(email_ids)}封邮件的自动回复") + + # 遍历每个处理过的邮件ID + for email_id in email_ids: + try: + # 通过邮件ID查找对应的聊天记录 + chat_msg = ChatHistory.objects.filter( + metadata__gmail_message_id=email_id + ).order_by('-created_at').first() + + if not chat_msg: + logger.info(f"[Auto Reply Task] 邮件 {email_id} 未找到对应的聊天记录,跳过自动回复") + continue + + conversation_id = chat_msg.conversation_id + + # 检查对话是否是活跃的 + conversation = GmailConversation.objects.filter( + conversation_id=conversation_id, + is_active=True + ).first() + + if not conversation: + logger.info(f"[Auto Reply Task] 对话 {conversation_id} 不是活跃的,跳过自动回复") + continue + + logger.info(f"[Auto Reply Task] 发现活跃对话 {conversation_id},准备自动回复") + + # 检查邮件角色,只有达人发送的邮件才自动回复 + if chat_msg.role != 'assistant': + logger.info(f"[Auto Reply Task] 邮件 {email_id} 不是达人发送的,跳过自动回复") + continue + + # 查找对话的目标 + goal = UserGoal.objects.filter( + conversation=conversation, + is_active=True + ).first() + + if not goal: + logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有活跃目标,跳过自动回复") + continue + + # 获取对话摘要 + conversation_summary = get_conversation_summary(conversation_id) + if not conversation_summary: + conversation_summary = "无对话摘要" + + # 获取最后一条达人消息 + last_message = get_last_message(conversation_id) + if not last_message: + logger.info(f"[Auto Reply Task] 对话 {conversation_id} 没有达人消息,跳过自动回复") + continue + + # 生成推荐回复 + reply_content, error = generate_recommended_reply( + user=user, + goal_description=goal.description, + conversation_summary=conversation_summary, + last_message=last_message + ) + + if error: + logger.error(f"[Auto Reply Task] 生成推荐回复失败: {error}") + continue + + if not reply_content: + logger.warning(f"[Auto Reply Task] 生成的推荐回复为空") + continue + + # 构建回复的主题 + subject = chat_msg.metadata.get('subject', '') + reply_subject = f"回复: {subject}" if not subject.startswith('回复:') else subject + + # 准备发送自动回复 + logger.info(f"[Auto Reply Task] 准备发送自动回复: 从{conversation.user_email}到{conversation.influencer_email}") + + success, reply_message_id = GmailService.send_email( + user=user, + user_email=conversation.user_email, + to_email=conversation.influencer_email, + subject=reply_subject, + body=reply_content + ) + + if success: + logger.info(f"[Auto Reply Task] 已成功发送自动回复: {reply_message_id}") + + # 更新目标状态 + goal.last_activity_time = timezone.now() + if goal.status == 'pending': + goal.status = 'in_progress' + goal.save(update_fields=['last_activity_time', 'status', 'updated_at']) + else: + logger.error(f"[Auto Reply Task] 发送自动回复失败: {reply_message_id}") + + except Exception as reply_error: + logger.error(f"[Auto Reply Task] 处理自动回复过程中出错: {str(reply_error)}") + import traceback + logger.error(traceback.format_exc()) + + return True + + except Exception as e: + logger.error(f"[Auto Reply Task] 处理自动回复失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + raise self.retry(exc=e, countdown=30) \ No newline at end of file diff --git a/apps/gmail/views.py b/apps/gmail/views.py index 7b61027..a5cf10e 100644 --- a/apps/gmail/views.py +++ b/apps/gmail/views.py @@ -932,7 +932,7 @@ class GmailWebhookView(APIView): def post(self, request): """ 处理POST请求,接收Gmail Pub/Sub推送通知, - 从中获取邮箱地址和历史ID,然后保存最新邮件到数据库。 + 将推送通知交给Celery任务队列异步处理。 Args: request: Django REST Framework请求对象,包含Pub/Sub消息。 @@ -948,8 +948,6 @@ class GmailWebhookView(APIView): # 打印请求时间和基本信息 current_time = timezone.now() print(f"接收时间: {current_time.strftime('%Y-%m-%d %H:%M:%S.%f')}") - print(f"请求头: {dict(request.headers)}") - print(f"原始数据: {request.data}") # 解析推送消息 message = request.data.get('message', {}) @@ -968,180 +966,27 @@ class GmailWebhookView(APIView): 'data': None }, status=status.HTTP_400_BAD_REQUEST) - # Base64解码消息数据 - try: - decoded_data = json.loads(base64.b64decode(data).decode('utf-8')) - print(f"[Gmail Webhook] 解码后的数据: {decoded_data}") - - # 获取Gmail通知关键信息 - email_address = decoded_data.get('emailAddress') - history_id = decoded_data.get('historyId') - - print(f"邮箱地址: {email_address}") - print(f"历史ID: {history_id}") - - if not email_address: - print("[Gmail Webhook] 错误: 推送数据缺少邮箱地址") - return Response({ - 'code': 400, - 'message': '推送数据缺少邮箱地址', - 'data': None - }, status=status.HTTP_400_BAD_REQUEST) - - # 查找对应的Gmail凭证 - credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first() - if credential: - user = credential.user - print(f"[Gmail Webhook] 找到有效凭证: 用户ID {user.id}, 邮箱 {email_address}") - - # 获取并保存最新邮件 - print("[Gmail Webhook] 开始获取最新邮件...") - - try: - # 使用GmailService的静态方法处理新邮件 - processed_emails = GmailService.process_new_emails(user, credential, history_id) - print(f"[Gmail Webhook] 新邮件处理完成,共 {len(processed_emails)} 封") - - # 处理活跃对话的自动回复 - if processed_emails: - print(f"[Gmail Webhook] 开始处理自动回复...") - - # 获取刚处理的邮件的对应的对话信息 - from apps.gmail.models import GmailConversation, UserGoal - from apps.chat.models import ChatHistory - - # 遍历每个处理过的邮件ID - for email_id in processed_emails: - try: - # 通过邮件ID查找对应的聊天记录 - chat_msg = ChatHistory.objects.filter( - metadata__gmail_message_id=email_id - ).order_by('-created_at').first() - - if not chat_msg: - print(f"[Gmail Webhook] 邮件 {email_id} 未找到对应的聊天记录,跳过自动回复") - continue - - conversation_id = chat_msg.conversation_id - - # 检查对话是否是活跃的 - conversation = GmailConversation.objects.filter( - conversation_id=conversation_id, - is_active=True - ).first() - - if not conversation: - print(f"[Gmail Webhook] 对话 {conversation_id} 不是活跃的,跳过自动回复") - continue - - print(f"[Gmail Webhook] 发现活跃对话 {conversation_id},准备自动回复") - - # 检查邮件角色,只有达人发送的邮件才自动回复 - if chat_msg.role != 'assistant': - print(f"[Gmail Webhook] 邮件 {email_id} 不是达人发送的,跳过自动回复") - continue - - # 查找对话的目标 - goal = UserGoal.objects.filter( - conversation=conversation, - is_active=True - ).first() - - if not goal: - print(f"[Gmail Webhook] 对话 {conversation_id} 没有活跃目标,跳过自动回复") - continue - - # 获取对话摘要 - from apps.gmail.services.goal_service import get_conversation_summary, get_last_message, generate_recommended_reply - - conversation_summary = get_conversation_summary(conversation_id) - if not conversation_summary: - conversation_summary = "无对话摘要" - - # 获取最后一条达人消息 - last_message = get_last_message(conversation_id) - if not last_message: - print(f"[Gmail Webhook] 对话 {conversation_id} 没有达人消息,跳过自动回复") - continue - - # 生成推荐回复 - reply_content, error = generate_recommended_reply( - user=user, - goal_description=goal.description, - conversation_summary=conversation_summary, - last_message=last_message - ) - - if error: - print(f"[Gmail Webhook] 生成推荐回复失败: {error}") - continue - - if not reply_content: - print(f"[Gmail Webhook] 生成的推荐回复为空") - continue - - # 构建回复的主题 - subject = chat_msg.metadata.get('subject', '') - reply_subject = f"回复: {subject}" if not subject.startswith('回复:') else subject - - # 准备发送自动回复 - print(f"[Gmail Webhook] 准备发送自动回复: 从{conversation.user_email}到{conversation.influencer_email}") - - success, reply_message_id = GmailService.send_email( - user=user, - user_email=conversation.user_email, - to_email=conversation.influencer_email, - subject=reply_subject, - body=reply_content - ) - - if success: - print(f"[Gmail Webhook] 已成功发送自动回复: {reply_message_id}") - - # 更新目标状态 - goal.last_activity_time = timezone.now() - if goal.status == 'pending': - goal.status = 'in_progress' - goal.save(update_fields=['last_activity_time', 'status', 'updated_at']) - else: - print(f"[Gmail Webhook] 发送自动回复失败: {reply_message_id}") - - except Exception as reply_error: - print(f"[Gmail Webhook] 处理自动回复过程中出错: {str(reply_error)}") - import traceback - print(traceback.format_exc()) - - # 更新凭证的历史ID(移到处理完成后再更新) - if history_id: - credential.last_history_id = history_id - credential.save() - print(f"[Gmail Webhook] 已更新凭证历史ID: {history_id}") - - except Exception as e: - print(f"[Gmail Webhook] 获取最新邮件失败: {str(e)}") - logger.error(f"获取最新邮件失败: {str(e)}") - import traceback - print(traceback.format_exc()) - else: - print(f"[Gmail Webhook] 警告: 未找到对应的Gmail凭证: {email_address}") - logger.warning(f"收到推送通知,但未找到对应的Gmail凭证: {email_address}") - - except Exception as e: - print(f"[Gmail Webhook] 解析推送数据失败: {str(e)}") - logger.error(f"解析推送数据失败: {str(e)}") + # 检查该消息是否已处理过 + from .models import ProcessedPushNotification + if ProcessedPushNotification.objects.filter(message_id=message_id).exists(): + print(f"[Gmail Webhook] 通知 {message_id} 已处理过,跳过") return Response({ - 'code': 400, - 'message': f'解析推送数据失败: {str(e)}', + 'code': 200, + 'message': '通知已处理过,跳过', 'data': None - }, status=status.HTTP_400_BAD_REQUEST) - - print("[Gmail Webhook] 推送处理完成") + }) + + # 将消息提交到Celery任务队列进行异步处理 + from .tasks import process_push_notification + process_push_notification.delay(data, message_id, subscription) + + print("[Gmail Webhook] 已将推送通知提交到任务队列处理") print("="*100 + "\n") # 返回成功响应 return Response({ 'code': 200, - 'message': '推送通知处理成功', + 'message': '推送通知已接收并提交到任务队列', 'data': None }) diff --git a/apps/template/migrations/0002_remove_template_created_by.py b/apps/template/migrations/0002_remove_template_created_by.py new file mode 100644 index 0000000..a3dafce --- /dev/null +++ b/apps/template/migrations/0002_remove_template_created_by.py @@ -0,0 +1,17 @@ +# Generated by Django 5.2 on 2025-05-20 09:21 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('template', '0001_initial'), + ] + + operations = [ + migrations.RemoveField( + model_name='template', + name='created_by', + ), + ] diff --git a/daren_project/__init__.py b/daren_project/__init__.py index e69de29..8ca2c7f 100644 --- a/daren_project/__init__.py +++ b/daren_project/__init__.py @@ -0,0 +1,7 @@ +# 配置Django启动时加载Celery +from __future__ import absolute_import, unicode_literals + +# 确保celery app在Django启动时被加载 +from .celery import app as celery_app + +__all__ = ('celery_app',) diff --git a/daren_project/celery.py b/daren_project/celery.py new file mode 100644 index 0000000..98ea2fd --- /dev/null +++ b/daren_project/celery.py @@ -0,0 +1,19 @@ +import os +from celery import Celery +from django.conf import settings + +# 设置Django默认settings模块 +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'daren_project.settings') + +# 创建Celery应用 +app = Celery('daren_project') + +# 使用Django settings的CELERY_配置值 +app.config_from_object('django.conf:settings', namespace='CELERY') + +# 自动发现并注册tasks模块中的任务 +app.autodiscover_tasks() + +@app.task(bind=True, ignore_result=True) +def debug_task(self): + print(f'Request: {self.request!r}') \ No newline at end of file diff --git a/daren_project/settings.py b/daren_project/settings.py index c0ba028..085a806 100644 --- a/daren_project/settings.py +++ b/daren_project/settings.py @@ -56,6 +56,8 @@ INSTALLED_APPS = [ 'apps.operation', 'apps.discovery', # 新添加的Discovery应用 'apps.template', # 新添加的Template应用 + 'django_celery_beat', # Celery定时任务 + 'django_celery_results', # Celery结果存储 ] MIDDLEWARE = [ @@ -140,6 +142,7 @@ USE_TZ = True # 静态文件和模板 STATIC_URL = '/static/' STATICFILES_DIRS = [BASE_DIR / 'static'] +STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles') # Default primary key field type # https://docs.djangoproject.com/en/5.2/ref/settings/#default-auto-field @@ -192,13 +195,31 @@ PROXY_URL = 'http://127.0.0.1:7890' # Gmail Pub/Sub相关设置 -GOOGLE_CLOUD_PROJECT_ID = 'knowledge-454905' # 替换为您的Google Cloud项目ID +GOOGLE_CLOUD_PROJECT_ID = 'knowledge-454905' # 主题名称 GMAIL_PUBSUB_TOPIC = 'gmail-watch-topic' # 设置允许使用Google Pub/Sub的应用列表 INSTALLED_APPS += ['google.cloud.pubsub'] +# Celery配置 +CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用Redis作为消息代理 +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 使用Redis存储任务结果 +CELERY_ACCEPT_CONTENT = ['json'] # 指定接受的内容类型 +CELERY_TASK_SERIALIZER = 'json' # 任务序列化和反序列化使用JSON +CELERY_RESULT_SERIALIZER = 'json' # 结果序列化使用JSON +CELERY_TIMEZONE = 'Asia/Shanghai' # 使用上海时区 +CELERY_TASK_TRACK_STARTED = True # 追踪任务的开始状态 +CELERY_TASK_TIME_LIMIT = 300 # 任务的hard time limit +CELERY_TASK_SOFT_TIME_LIMIT = 240 # 任务的soft time limit +CELERY_WORKER_MAX_TASKS_PER_CHILD = 500 # 工作进程处理多少个任务后重启 + +# Windows平台Celery特定设置 +CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True # 启动时尝试重新连接 +CELERY_WORKER_CONCURRENCY = 1 # Windows下使用单进程模式,避免权限错误 +CELERY_TASK_ALWAYS_EAGER = False # 不要在Django主进程中执行任务 +CELERY_BROKER_HEARTBEAT = 0 # 禁用心跳检测,解决Windows的问题 + FEISHU_APP_ID = "cli_a5c97daacb9e500d" FEISHU_APP_SECRET = "fdVeOCLXmuIHZVmSV0VbJh9wd0Kq1o5y" FEISHU_DEFAULT_APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg" diff --git a/query b/query new file mode 100644 index 0000000..7800f0f --- /dev/null +++ b/query @@ -0,0 +1 @@ +redis diff --git a/requirements.txt b/requirements.txt index b486196..07849d5 100644 Binary files a/requirements.txt and b/requirements.txt differ