From 0e62ef70ef117b77eda355929a9b55e405ff1128 Mon Sep 17 00:00:00 2001 From: wanjia Date: Thu, 29 May 2025 18:45:06 +0800 Subject: [PATCH] demo2 --- apps/gmail/tasks.py | 100 +++++++++++++++++++++++++++++++------------- daren/celery.py | 63 ++++++++++++++++++++++++++++ daren/settings.py | 14 +++++-- requirements.txt | Bin 2790 -> 3698 bytes 4 files changed, 144 insertions(+), 33 deletions(-) create mode 100644 daren/celery.py diff --git a/apps/gmail/tasks.py b/apps/gmail/tasks.py index 67d3700..51f8fc4 100644 --- a/apps/gmail/tasks.py +++ b/apps/gmail/tasks.py @@ -1,6 +1,7 @@ import logging import json import base64 +import traceback from celery import shared_task from django.utils import timezone from .models import GmailCredential, GmailConversation, UserGoal, ProcessedPushNotification, UnmatchedEmail @@ -40,6 +41,18 @@ def process_push_notification(self, message_data, message_id, subscription): print(f"[Gmail Webhook Task] 通知 {message_id} 已处理过,跳过") return True + # 尝试创建一个处理记录,标记为开始处理 + notification_record = ProcessedPushNotification.objects.create( + message_id=message_id, + email_address="processing", + history_id="unknown", + is_successful=False, + metadata={ + "subscription": subscription, + "started_at": timezone.now().isoformat() + } + ) + # Base64解码消息数据 try: decoded_data = json.loads(base64.b64decode(message_data).decode('utf-8')) @@ -51,29 +64,27 @@ def process_push_notification(self, message_data, message_id, subscription): if not email_address: logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址") - # 记录失败的处理,防止重复处理 - ProcessedPushNotification.objects.create( - message_id=message_id, - email_address="unknown", - history_id=history_id or "unknown", - is_successful=False, - metadata=decoded_data - ) + # 更新处理记录 + notification_record.metadata.update({ + "error": "推送数据缺少邮箱地址", + "decoded_data": decoded_data + }) + notification_record.save() return False - # 创建处理记录 - notification_record = ProcessedPushNotification.objects.create( - message_id=message_id, - email_address=email_address, - history_id=history_id or "unknown", - metadata=decoded_data - ) + # 更新处理记录 + notification_record.email_address = email_address + notification_record.history_id = history_id or "unknown" + notification_record.metadata.update(decoded_data) + notification_record.save() # 查找对应的Gmail凭证 credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first() if not credential: logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}") - notification_record.is_successful = False + notification_record.metadata.update({ + "error": f"未找到对应的Gmail凭证: {email_address}" + }) notification_record.save() return False @@ -86,7 +97,16 @@ def process_push_notification(self, message_data, message_id, subscription): # 处理自动回复 if processed_emails: - process_auto_replies.delay(user.id, credential.id, processed_emails) + try: + process_auto_replies.delay(user.id, credential.id, processed_emails) + logger.info(f"[Gmail Webhook Task] 已提交自动回复任务处理 {len(processed_emails)} 封邮件") + except Exception as reply_task_error: + logger.error(f"[Gmail Webhook Task] 提交自动回复任务失败: {str(reply_task_error)}") + notification_record.metadata.update({ + "auto_reply_error": str(reply_task_error), + "processed_emails": processed_emails + }) + notification_record.save() # 更新凭证的历史ID if history_id: @@ -94,29 +114,51 @@ def process_push_notification(self, message_data, message_id, subscription): credential.save() logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}") - # 更新通知记录 + # 更新通知记录为成功 notification_record.is_successful = True + notification_record.metadata.update({ + "completed_at": timezone.now().isoformat(), + "processed_emails_count": len(processed_emails) + }) notification_record.save() return True - except Exception as e: - logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(e)}") - # 记录失败的处理 - ProcessedPushNotification.objects.create( - message_id=message_id, - email_address="error", - history_id="error", - is_successful=False, - metadata={"error": str(e)} - ) + except json.JSONDecodeError as e: + logger.error(f"[Gmail Webhook Task] JSON解码失败: {str(e)}") + notification_record.metadata.update({ + "error": f"JSON解码失败: {str(e)}", + "raw_data": message_data[:100] if message_data else "None" # 只保存前100个字符,避免数据过大 + }) + notification_record.save() # 抛出异常以便Celery可能进行重试 raise self.retry(exc=e, countdown=30) + except Exception as decode_error: + logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(decode_error)}") + notification_record.metadata.update({ + "error": f"解析推送数据失败: {str(decode_error)}", + "traceback": traceback.format_exc() + }) + notification_record.save() + # 抛出异常以便Celery可能进行重试 + raise self.retry(exc=decode_error, countdown=30) except Exception as e: logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}") - import traceback logger.error(traceback.format_exc()) + # 尝试更新通知记录 + try: + notification_record = ProcessedPushNotification.objects.filter(message_id=message_id).first() + if notification_record: + notification_record.metadata.update({ + "error": str(e), + "traceback": traceback.format_exc(), + "failed_at": timezone.now().isoformat() + }) + notification_record.save() + except Exception as record_error: + logger.error(f"[Gmail Webhook Task] 更新通知记录失败: {str(record_error)}") + # 尝试重试任务 raise self.retry(exc=e, countdown=60) diff --git a/daren/celery.py b/daren/celery.py new file mode 100644 index 0000000..c68644d --- /dev/null +++ b/daren/celery.py @@ -0,0 +1,63 @@ +import os +from celery import Celery +from django.conf import settings +import logging + +# 设置Django默认settings模块 +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'daren.settings') + +# 创建日志记录器 +logger = logging.getLogger(__name__) + +# 在应用启动时检查Redis连接 +def check_redis_connection(): + try: + import redis + from django.conf import settings + + # 从CELERY_BROKER_URL解析Redis连接信息 + broker_url = settings.CELERY_BROKER_URL + host = '127.0.0.1' # 默认 + port = 6379 # 默认 + db = 0 # 默认 + + # 简单解析broker_url (格式如: redis://127.0.0.1:6379/0) + if broker_url.startswith('redis://'): + parts = broker_url[8:].split('/') + if len(parts) > 1: + db = int(parts[1]) + + host_port = parts[0].split(':') + host = host_port[0] + if len(host_port) > 1: + port = int(host_port[1]) + + # 尝试连接 + r = redis.Redis(host=host, port=port, db=db, socket_timeout=10) + r.ping() + logger.info(f"成功连接到Redis服务器: {host}:{port}") + return True, None + except Exception as e: + error_msg = f"无法连接到Redis服务器: {str(e)}" + logger.error(error_msg) + return False, error_msg + +# 检查Redis连接 +redis_ok, redis_error = check_redis_connection() +if not redis_ok: + logger.critical(f"无法启动Celery,Redis连接失败: {redis_error}") + print(f"\n\n[Celery错误] Redis连接失败: {redis_error}\n") + print("请确保Redis服务正在运行,并且配置正确。如果问题仍然存在,请尝试修改settings.py中的CELERY_BROKER_URL。\n\n") + +# 创建Celery应用 +app = Celery('daren') + +# 使用Django settings的CELERY_配置值 +app.config_from_object('django.conf:settings', namespace='CELERY') + +# 自动发现并注册tasks模块中的任务 +app.autodiscover_tasks() + +@app.task(bind=True, ignore_result=True) +def debug_task(self): + print(f'Request: {self.request!r}') \ No newline at end of file diff --git a/daren/settings.py b/daren/settings.py index 0646d4a..c45a0ec 100644 --- a/daren/settings.py +++ b/daren/settings.py @@ -53,7 +53,9 @@ INSTALLED_APPS = [ "apps.common.apps.CommonConfig", "apps.knowledge_base.apps.KnowledgeBaseConfig", "apps.gmail.apps.GmailConfig", - "apps.feishu.apps.FeishuConfig" + "apps.feishu.apps.FeishuConfig", + 'django_celery_beat', # Celery定时任务 + 'django_celery_results', # Celery结果存储 ] MIDDLEWARE = [ @@ -242,7 +244,7 @@ SIMPLE_JWT = { API_BASE_URL = 'http://81.69.223.133:48329' SILICON_CLOUD_API_KEY = 'sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf' -GMAIL_WEBHOOK_URL = 'https://325a-180-159-100-165.ngrok-free.app/api/gmail/webhook/' +GMAIL_WEBHOOK_URL = 'https://ccf7-180-159-100-165.ngrok-free.app/api/gmail/webhook/' APPLICATION_ID = 'd5d11efa-ea9a-11ef-9933-0242ac120006' @@ -261,8 +263,8 @@ GMAIL_PUBSUB_TOPIC = 'gmail-watch-topic' # Celery配置 -CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用Redis作为消息代理 -CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 使用Redis存储任务结果 +CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # 使用具体IP地址而不是localhost +CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 使用具体IP地址而不是localhost CELERY_ACCEPT_CONTENT = ['json'] # 指定接受的内容类型 CELERY_TASK_SERIALIZER = 'json' # 任务序列化和反序列化使用JSON CELERY_RESULT_SERIALIZER = 'json' # 结果序列化使用JSON @@ -274,9 +276,13 @@ CELERY_WORKER_MAX_TASKS_PER_CHILD = 500 # 工作进程处理多少个任务后 # Windows平台Celery特定设置 CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True # 启动时尝试重新连接 +CELERY_BROKER_CONNECTION_MAX_RETRIES = 10 # 最大重试次数 CELERY_WORKER_CONCURRENCY = 1 # Windows下使用单进程模式,避免权限错误 CELERY_TASK_ALWAYS_EAGER = False # 不要在Django主进程中执行任务 CELERY_BROKER_HEARTBEAT = 0 # 禁用心跳检测,解决Windows的问题 +CELERY_BROKER_CONNECTION_TIMEOUT = 30 # 设置连接超时时间 +CELERY_REDIS_SOCKET_TIMEOUT = 30 # Redis套接字超时 +CELERY_REDIS_SOCKET_CONNECT_TIMEOUT = 30 # Redis连接超时 FEISHU_APP_ID = "cli_a5c97daacb9e500d" FEISHU_APP_SECRET = "fdVeOCLXmuIHZVmSV0VbJh9wd0Kq1o5y" diff --git a/requirements.txt b/requirements.txt index 8db237aa3521e8ca544b22386cbcaa1b31b16f9f..28416f1606677e572005b3598b32a455d78a1965 100644 GIT binary patch delta 798 zcmZWny-vbl6uoT%BE&clV=x*+95gX#3l?xNP7cPA!NDluPpteDDTwIcBPjVM<5L(O z!-*$w^8r|0JolCY#xzaZ@1A?^Ip==gE93RotJOADs7yAwbcDyGJxbv<2f5v7gYKv$ z$JX??=f+*?Qb0i zRYCiBW?YGCg39_RTB0yc{aQj~mNA^i@z?@nFt33Q5&f^Dh3+m@!QPxR$R^Es><&gJ zAf^Jl^*@xx1m@t}p|*+#qgq-;^whQkN|sasaS6RPdOM7fejdSgEoJFXn0VwOM^?S*$CtA-AHmWDCPCL=!3lzI=J{Q}UqkKO

*a0e&AOf0`d;=_#*aLEtfCMPB90fE2lW+&h hlU4|dvk(cs0h2fjB9od645NYz0kfP8Q~{GL4n|Lu8h`)*