daren_project/user_management/tasks.py
2025-04-29 10:22:57 +08:00

143 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import logging
import requests
from celery import shared_task
from django.utils import timezone
from django.conf import settings
logger = logging.getLogger(__name__)
@shared_task
def publish_scheduled_video(video_id):
"""定时发布视频的任务"""
from .models import Video
try:
# 获取视频记录
video = Video.objects.get(id=video_id)
# 检查视频状态是否为已排期
if video.status != 'scheduled':
logger.warning(f"视频 {video_id} 状态不是'已排期',当前状态: {video.status},跳过发布")
return
# 检查视频文件是否存在
if not video.local_path or not os.path.exists(video.local_path):
logger.error(f"视频 {video_id} 的本地文件不存在: {video.local_path}")
video.status = 'failed'
video.save()
return
# 模拟上传到平台的过程
# 在实际应用中这里需要根据不同平台调用不同的API
platform_account = video.platform_account
platform_name = platform_account.platform_name
# 模拟成功上传并获取视频URL
video_url = f"https://example.com/{platform_name}/{video.id}"
video_id = f"VID_{video.id}"
# 在实际应用中这里应该调用各平台的API
logger.info(f"模拟上传视频 {video.id}{platform_name} 平台")
# 更新视频状态
video.status = 'published'
video.publish_time = timezone.now()
video.video_url = video_url
video.video_id = video_id
video.save()
logger.info(f"视频 {video.id} 已成功发布到 {platform_name} 平台")
# 记录到知识库
_update_knowledge_base(video)
return {
"success": True,
"video_id": video.id,
"platform": platform_name,
"publish_time": video.publish_time.strftime("%Y-%m-%d %H:%M:%S"),
"video_url": video_url
}
except Video.DoesNotExist:
logger.error(f"未找到ID为 {video_id} 的视频记录")
return {"success": False, "error": f"未找到ID为 {video_id} 的视频记录"}
except Exception as e:
logger.error(f"发布视频 {video_id} 失败: {str(e)}")
# 尝试更新视频状态为失败
try:
video = Video.objects.get(id=video_id)
video.status = 'failed'
video.save()
except:
pass
return {"success": False, "error": str(e)}
def _update_knowledge_base(video):
"""更新知识库中的视频信息"""
from .models import KnowledgeBase, KnowledgeBaseDocument
try:
# 获取关联的平台账号和运营账号
platform_account = video.platform_account
operator = platform_account.operator
# 查找对应的知识库
knowledge_base = KnowledgeBase.objects.filter(
name__contains=operator.real_name,
type='private'
).first()
if not knowledge_base:
logger.warning(f"未找到与运营账号 {operator.real_name} 关联的知识库")
return
# 查找相关的文档
document = KnowledgeBaseDocument.objects.filter(
knowledge_base=knowledge_base,
document_name__contains=video.title,
status='active'
).first()
if not document:
logger.warning(f"未找到与视频 {video.title} 关联的知识库文档")
return
# 在实际应用中这里应该调用外部API更新文档内容
logger.info(f"更新知识库文档 {document.document_id} 的视频发布状态")
# 模拟更新文档内容
# 这里只记录日志实际应用中需要调用外部API
except Exception as e:
logger.error(f"更新知识库失败: {str(e)}")
@shared_task
def check_scheduled_videos():
"""定期检查计划发布的视频,处理未被正确调度的视频"""
from .models import Video
from datetime import timedelta
try:
# 查找所有已经过了计划发布时间但仍处于scheduled状态的视频
now = timezone.now()
threshold = now - timedelta(minutes=30) # 30分钟容差
videos = Video.objects.filter(
status='scheduled',
scheduled_time__lt=threshold
)
for video in videos:
logger.warning(f"发现未按计划发布的视频: {video.id}, 计划发布时间: {video.scheduled_time}")
# 手动触发发布任务
publish_scheduled_video.delay(video.id)
return f"检查了 {videos.count()} 个未按计划发布的视频"
except Exception as e:
logger.error(f"检查未发布视频失败: {str(e)}")
return f"检查未发布视频失败: {str(e)}"