This commit is contained in:
wanjia 2025-05-29 18:45:06 +08:00
parent 8f92d70af9
commit 0e62ef70ef
4 changed files with 144 additions and 33 deletions

View File

@ -1,6 +1,7 @@
import logging
import json
import base64
import traceback
from celery import shared_task
from django.utils import timezone
from .models import GmailCredential, GmailConversation, UserGoal, ProcessedPushNotification, UnmatchedEmail
@ -40,6 +41,18 @@ def process_push_notification(self, message_data, message_id, subscription):
print(f"[Gmail Webhook Task] 通知 {message_id} 已处理过,跳过")
return True
# 尝试创建一个处理记录,标记为开始处理
notification_record = ProcessedPushNotification.objects.create(
message_id=message_id,
email_address="processing",
history_id="unknown",
is_successful=False,
metadata={
"subscription": subscription,
"started_at": timezone.now().isoformat()
}
)
# Base64解码消息数据
try:
decoded_data = json.loads(base64.b64decode(message_data).decode('utf-8'))
@ -51,29 +64,27 @@ def process_push_notification(self, message_data, message_id, subscription):
if not email_address:
logger.error("[Gmail Webhook Task] 错误: 推送数据缺少邮箱地址")
# 记录失败的处理,防止重复处理
ProcessedPushNotification.objects.create(
message_id=message_id,
email_address="unknown",
history_id=history_id or "unknown",
is_successful=False,
metadata=decoded_data
)
# 更新处理记录
notification_record.metadata.update({
"error": "推送数据缺少邮箱地址",
"decoded_data": decoded_data
})
notification_record.save()
return False
# 创建处理记录
notification_record = ProcessedPushNotification.objects.create(
message_id=message_id,
email_address=email_address,
history_id=history_id or "unknown",
metadata=decoded_data
)
# 更新处理记录
notification_record.email_address = email_address
notification_record.history_id = history_id or "unknown"
notification_record.metadata.update(decoded_data)
notification_record.save()
# 查找对应的Gmail凭证
credential = GmailCredential.objects.filter(email=email_address, is_valid=True).first()
if not credential:
logger.warning(f"[Gmail Webhook Task] 警告: 未找到对应的Gmail凭证: {email_address}")
notification_record.is_successful = False
notification_record.metadata.update({
"error": f"未找到对应的Gmail凭证: {email_address}"
})
notification_record.save()
return False
@ -86,7 +97,16 @@ def process_push_notification(self, message_data, message_id, subscription):
# 处理自动回复
if processed_emails:
process_auto_replies.delay(user.id, credential.id, processed_emails)
try:
process_auto_replies.delay(user.id, credential.id, processed_emails)
logger.info(f"[Gmail Webhook Task] 已提交自动回复任务处理 {len(processed_emails)} 封邮件")
except Exception as reply_task_error:
logger.error(f"[Gmail Webhook Task] 提交自动回复任务失败: {str(reply_task_error)}")
notification_record.metadata.update({
"auto_reply_error": str(reply_task_error),
"processed_emails": processed_emails
})
notification_record.save()
# 更新凭证的历史ID
if history_id:
@ -94,29 +114,51 @@ def process_push_notification(self, message_data, message_id, subscription):
credential.save()
logger.info(f"[Gmail Webhook Task] 已更新凭证历史ID: {history_id}")
# 更新通知记录
# 更新通知记录为成功
notification_record.is_successful = True
notification_record.metadata.update({
"completed_at": timezone.now().isoformat(),
"processed_emails_count": len(processed_emails)
})
notification_record.save()
return True
except Exception as e:
logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(e)}")
# 记录失败的处理
ProcessedPushNotification.objects.create(
message_id=message_id,
email_address="error",
history_id="error",
is_successful=False,
metadata={"error": str(e)}
)
except json.JSONDecodeError as e:
logger.error(f"[Gmail Webhook Task] JSON解码失败: {str(e)}")
notification_record.metadata.update({
"error": f"JSON解码失败: {str(e)}",
"raw_data": message_data[:100] if message_data else "None" # 只保存前100个字符避免数据过大
})
notification_record.save()
# 抛出异常以便Celery可能进行重试
raise self.retry(exc=e, countdown=30)
except Exception as decode_error:
logger.error(f"[Gmail Webhook Task] 解析推送数据失败: {str(decode_error)}")
notification_record.metadata.update({
"error": f"解析推送数据失败: {str(decode_error)}",
"traceback": traceback.format_exc()
})
notification_record.save()
# 抛出异常以便Celery可能进行重试
raise self.retry(exc=decode_error, countdown=30)
except Exception as e:
logger.error(f"[Gmail Webhook Task] 处理推送通知失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 尝试更新通知记录
try:
notification_record = ProcessedPushNotification.objects.filter(message_id=message_id).first()
if notification_record:
notification_record.metadata.update({
"error": str(e),
"traceback": traceback.format_exc(),
"failed_at": timezone.now().isoformat()
})
notification_record.save()
except Exception as record_error:
logger.error(f"[Gmail Webhook Task] 更新通知记录失败: {str(record_error)}")
# 尝试重试任务
raise self.retry(exc=e, countdown=60)

63
daren/celery.py Normal file
View File

@ -0,0 +1,63 @@
import os
from celery import Celery
from django.conf import settings
import logging
# Point Django at the project's settings module before any Django import needs it.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'daren.settings')
# Module-level logger, reused by the startup Redis connectivity check below.
logger = logging.getLogger(__name__)
# 在应用启动时检查Redis连接
def check_redis_connection():
    """Verify that the Redis broker configured in CELERY_BROKER_URL is reachable.

    Returns:
        tuple: ``(True, None)`` when a PING to the broker succeeds,
        otherwise ``(False, error_message)`` with the failure reason.
    """
    try:
        import redis
        from urllib.parse import urlparse
        from django.conf import settings

        broker_url = settings.CELERY_BROKER_URL
        # Defaults mirror a local Redis instance.
        host = '127.0.0.1'
        port = 6379
        db = 0
        # urlparse handles optional auth (redis://user:pass@host:port/db) and
        # missing components, unlike the previous manual string slicing which
        # mis-parsed URLs containing credentials.
        if broker_url.startswith('redis://'):
            parsed = urlparse(broker_url)
            if parsed.hostname:
                host = parsed.hostname
            if parsed.port:
                port = parsed.port
            db_path = parsed.path.lstrip('/')
            if db_path:
                db = int(db_path)
        # A PING proves both TCP connectivity and server responsiveness.
        r = redis.Redis(host=host, port=port, db=db, socket_timeout=10)
        r.ping()
        logger.info(f"成功连接到Redis服务器: {host}:{port}")
        return True, None
    except Exception as e:
        # Broad catch is deliberate: any startup failure should be reported
        # as (False, message) rather than escaping as a traceback.
        error_msg = f"无法连接到Redis服务器: {str(e)}"
        logger.error(error_msg)
        return False, error_msg
# Run the connectivity check once at import time so a missing broker is
# reported immediately instead of surfacing as opaque worker errors later.
redis_ok, redis_error = check_redis_connection()
if not redis_ok:
    # Log and print the failure but do not raise: the operator may still want
    # the process to start and retry once Redis comes up.
    logger.critical(f"无法启动CeleryRedis连接失败: {redis_error}")
    print(f"\n\n[Celery错误] Redis连接失败: {redis_error}\n")
    print("请确保Redis服务正在运行并且配置正确。如果问题仍然存在请尝试修改settings.py中的CELERY_BROKER_URL。\n\n")
# Create the Celery application for the "daren" project.
app = Celery('daren')
# Load every CELERY_-prefixed value from Django's settings module.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks.py modules in each installed Django app.
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
    """Print the bound task's request context; handy for verifying a worker."""
    request_repr = repr(self.request)
    print('Request: ' + request_repr)

View File

@ -53,7 +53,9 @@ INSTALLED_APPS = [
"apps.common.apps.CommonConfig",
"apps.knowledge_base.apps.KnowledgeBaseConfig",
"apps.gmail.apps.GmailConfig",
"apps.feishu.apps.FeishuConfig"
"apps.feishu.apps.FeishuConfig",
'django_celery_beat', # Celery定时任务
'django_celery_results', # Celery结果存储
]
MIDDLEWARE = [
@ -242,7 +244,7 @@ SIMPLE_JWT = {
API_BASE_URL = 'http://81.69.223.133:48329'
SILICON_CLOUD_API_KEY = 'sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf'
GMAIL_WEBHOOK_URL = 'https://325a-180-159-100-165.ngrok-free.app/api/gmail/webhook/'
GMAIL_WEBHOOK_URL = 'https://ccf7-180-159-100-165.ngrok-free.app/api/gmail/webhook/'
APPLICATION_ID = 'd5d11efa-ea9a-11ef-9933-0242ac120006'
@ -261,8 +263,8 @@ GMAIL_PUBSUB_TOPIC = 'gmail-watch-topic'
# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 使用Redis存储任务结果
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # 使用具体IP地址而不是localhost
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 使用具体IP地址而不是localhost
CELERY_ACCEPT_CONTENT = ['json'] # 指定接受的内容类型
CELERY_TASK_SERIALIZER = 'json' # 任务序列化和反序列化使用JSON
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化使用JSON
@ -274,9 +276,13 @@ CELERY_WORKER_MAX_TASKS_PER_CHILD = 500 # 工作进程处理多少个任务后
# Windows平台Celery特定设置
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True # 启动时尝试重新连接
CELERY_BROKER_CONNECTION_MAX_RETRIES = 10 # 最大重试次数
CELERY_WORKER_CONCURRENCY = 1 # Windows下使用单进程模式避免权限错误
CELERY_TASK_ALWAYS_EAGER = False # 不要在Django主进程中执行任务
CELERY_BROKER_HEARTBEAT = 0 # 禁用心跳检测解决Windows的问题
CELERY_BROKER_CONNECTION_TIMEOUT = 30 # 设置连接超时时间
CELERY_REDIS_SOCKET_TIMEOUT = 30 # Redis套接字超时
CELERY_REDIS_SOCKET_CONNECT_TIMEOUT = 30 # Redis连接超时
FEISHU_APP_ID = "cli_a5c97daacb9e500d"
FEISHU_APP_SECRET = "fdVeOCLXmuIHZVmSV0VbJh9wd0Kq1o5y"

Binary file not shown.