daren_project/user_management/tasks.py

143 lines
5.0 KiB
Python
Raw Permalink Normal View History

2025-04-29 10:22:57 +08:00
import os
import logging
import requests
from celery import shared_task
from django.utils import timezone
from django.conf import settings
logger = logging.getLogger(__name__)
@shared_task
def publish_scheduled_video(video_id):
"""定时发布视频的任务"""
from .models import Video
try:
# 获取视频记录
video = Video.objects.get(id=video_id)
# 检查视频状态是否为已排期
if video.status != 'scheduled':
logger.warning(f"视频 {video_id} 状态不是'已排期',当前状态: {video.status},跳过发布")
return
# 检查视频文件是否存在
if not video.local_path or not os.path.exists(video.local_path):
logger.error(f"视频 {video_id} 的本地文件不存在: {video.local_path}")
video.status = 'failed'
video.save()
return
# 模拟上传到平台的过程
# 在实际应用中这里需要根据不同平台调用不同的API
platform_account = video.platform_account
platform_name = platform_account.platform_name
# 模拟成功上传并获取视频URL
video_url = f"https://example.com/{platform_name}/{video.id}"
video_id = f"VID_{video.id}"
# 在实际应用中这里应该调用各平台的API
logger.info(f"模拟上传视频 {video.id}{platform_name} 平台")
# 更新视频状态
video.status = 'published'
video.publish_time = timezone.now()
video.video_url = video_url
video.video_id = video_id
video.save()
logger.info(f"视频 {video.id} 已成功发布到 {platform_name} 平台")
# 记录到知识库
_update_knowledge_base(video)
return {
"success": True,
"video_id": video.id,
"platform": platform_name,
"publish_time": video.publish_time.strftime("%Y-%m-%d %H:%M:%S"),
"video_url": video_url
}
except Video.DoesNotExist:
logger.error(f"未找到ID为 {video_id} 的视频记录")
return {"success": False, "error": f"未找到ID为 {video_id} 的视频记录"}
except Exception as e:
logger.error(f"发布视频 {video_id} 失败: {str(e)}")
# 尝试更新视频状态为失败
try:
video = Video.objects.get(id=video_id)
video.status = 'failed'
video.save()
except:
pass
return {"success": False, "error": str(e)}
def _update_knowledge_base(video):
"""更新知识库中的视频信息"""
from .models import KnowledgeBase, KnowledgeBaseDocument
try:
# 获取关联的平台账号和运营账号
platform_account = video.platform_account
operator = platform_account.operator
# 查找对应的知识库
knowledge_base = KnowledgeBase.objects.filter(
name__contains=operator.real_name,
type='private'
).first()
if not knowledge_base:
logger.warning(f"未找到与运营账号 {operator.real_name} 关联的知识库")
return
# 查找相关的文档
document = KnowledgeBaseDocument.objects.filter(
knowledge_base=knowledge_base,
document_name__contains=video.title,
status='active'
).first()
if not document:
logger.warning(f"未找到与视频 {video.title} 关联的知识库文档")
return
# 在实际应用中这里应该调用外部API更新文档内容
logger.info(f"更新知识库文档 {document.document_id} 的视频发布状态")
# 模拟更新文档内容
# 这里只记录日志实际应用中需要调用外部API
except Exception as e:
logger.error(f"更新知识库失败: {str(e)}")
@shared_task
def check_scheduled_videos():
"""定期检查计划发布的视频,处理未被正确调度的视频"""
from .models import Video
from datetime import timedelta
try:
# 查找所有已经过了计划发布时间但仍处于scheduled状态的视频
now = timezone.now()
threshold = now - timedelta(minutes=30) # 30分钟容差
videos = Video.objects.filter(
status='scheduled',
scheduled_time__lt=threshold
)
for video in videos:
logger.warning(f"发现未按计划发布的视频: {video.id}, 计划发布时间: {video.scheduled_time}")
# 手动触发发布任务
publish_scheduled_video.delay(video.id)
return f"检查了 {videos.count()} 个未按计划发布的视频"
except Exception as e:
logger.error(f"检查未发布视频失败: {str(e)}")
return f"检查未发布视频失败: {str(e)}"