143 lines
5.0 KiB
Python
143 lines
5.0 KiB
Python
import os
|
||
import logging
|
||
import requests
|
||
from celery import shared_task
|
||
from django.utils import timezone
|
||
from django.conf import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
@shared_task
|
||
def publish_scheduled_video(video_id):
|
||
"""定时发布视频的任务"""
|
||
from .models import Video
|
||
|
||
try:
|
||
# 获取视频记录
|
||
video = Video.objects.get(id=video_id)
|
||
|
||
# 检查视频状态是否为已排期
|
||
if video.status != 'scheduled':
|
||
logger.warning(f"视频 {video_id} 状态不是'已排期',当前状态: {video.status},跳过发布")
|
||
return
|
||
|
||
# 检查视频文件是否存在
|
||
if not video.local_path or not os.path.exists(video.local_path):
|
||
logger.error(f"视频 {video_id} 的本地文件不存在: {video.local_path}")
|
||
video.status = 'failed'
|
||
video.save()
|
||
return
|
||
|
||
# 模拟上传到平台的过程
|
||
# 在实际应用中,这里需要根据不同平台调用不同的API
|
||
platform_account = video.platform_account
|
||
platform_name = platform_account.platform_name
|
||
|
||
# 模拟成功上传并获取视频URL
|
||
video_url = f"https://example.com/{platform_name}/{video.id}"
|
||
video_id = f"VID_{video.id}"
|
||
|
||
# 在实际应用中,这里应该调用各平台的API
|
||
logger.info(f"模拟上传视频 {video.id} 到 {platform_name} 平台")
|
||
|
||
# 更新视频状态
|
||
video.status = 'published'
|
||
video.publish_time = timezone.now()
|
||
video.video_url = video_url
|
||
video.video_id = video_id
|
||
video.save()
|
||
|
||
logger.info(f"视频 {video.id} 已成功发布到 {platform_name} 平台")
|
||
|
||
# 记录到知识库
|
||
_update_knowledge_base(video)
|
||
|
||
return {
|
||
"success": True,
|
||
"video_id": video.id,
|
||
"platform": platform_name,
|
||
"publish_time": video.publish_time.strftime("%Y-%m-%d %H:%M:%S"),
|
||
"video_url": video_url
|
||
}
|
||
|
||
except Video.DoesNotExist:
|
||
logger.error(f"未找到ID为 {video_id} 的视频记录")
|
||
return {"success": False, "error": f"未找到ID为 {video_id} 的视频记录"}
|
||
except Exception as e:
|
||
logger.error(f"发布视频 {video_id} 失败: {str(e)}")
|
||
|
||
# 尝试更新视频状态为失败
|
||
try:
|
||
video = Video.objects.get(id=video_id)
|
||
video.status = 'failed'
|
||
video.save()
|
||
except:
|
||
pass
|
||
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def _update_knowledge_base(video):
|
||
"""更新知识库中的视频信息"""
|
||
from .models import KnowledgeBase, KnowledgeBaseDocument
|
||
|
||
try:
|
||
# 获取关联的平台账号和运营账号
|
||
platform_account = video.platform_account
|
||
operator = platform_account.operator
|
||
|
||
# 查找对应的知识库
|
||
knowledge_base = KnowledgeBase.objects.filter(
|
||
name__contains=operator.real_name,
|
||
type='private'
|
||
).first()
|
||
|
||
if not knowledge_base:
|
||
logger.warning(f"未找到与运营账号 {operator.real_name} 关联的知识库")
|
||
return
|
||
|
||
# 查找相关的文档
|
||
document = KnowledgeBaseDocument.objects.filter(
|
||
knowledge_base=knowledge_base,
|
||
document_name__contains=video.title,
|
||
status='active'
|
||
).first()
|
||
|
||
if not document:
|
||
logger.warning(f"未找到与视频 {video.title} 关联的知识库文档")
|
||
return
|
||
|
||
# 在实际应用中,这里应该调用外部API更新文档内容
|
||
logger.info(f"更新知识库文档 {document.document_id} 的视频发布状态")
|
||
|
||
# 模拟更新文档内容
|
||
# 这里只记录日志,实际应用中需要调用外部API
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新知识库失败: {str(e)}")
|
||
|
||
@shared_task
|
||
def check_scheduled_videos():
|
||
"""定期检查计划发布的视频,处理未被正确调度的视频"""
|
||
from .models import Video
|
||
from datetime import timedelta
|
||
|
||
try:
|
||
# 查找所有已经过了计划发布时间但仍处于scheduled状态的视频
|
||
now = timezone.now()
|
||
threshold = now - timedelta(minutes=30) # 30分钟容差
|
||
|
||
videos = Video.objects.filter(
|
||
status='scheduled',
|
||
scheduled_time__lt=threshold
|
||
)
|
||
|
||
for video in videos:
|
||
logger.warning(f"发现未按计划发布的视频: {video.id}, 计划发布时间: {video.scheduled_time}")
|
||
# 手动触发发布任务
|
||
publish_scheduled_video.delay(video.id)
|
||
|
||
return f"检查了 {videos.count()} 个未按计划发布的视频"
|
||
|
||
except Exception as e:
|
||
logger.error(f"检查未发布视频失败: {str(e)}")
|
||
return f"检查未发布视频失败: {str(e)}" |