import os import logging import requests from celery import shared_task from django.utils import timezone from django.conf import settings logger = logging.getLogger(__name__) @shared_task def publish_scheduled_video(video_id): """定时发布视频的任务""" from .models import Video try: # 获取视频记录 video = Video.objects.get(id=video_id) # 检查视频状态是否为已排期 if video.status != 'scheduled': logger.warning(f"视频 {video_id} 状态不是'已排期',当前状态: {video.status},跳过发布") return # 检查视频文件是否存在 if not video.local_path or not os.path.exists(video.local_path): logger.error(f"视频 {video_id} 的本地文件不存在: {video.local_path}") video.status = 'failed' video.save() return # 模拟上传到平台的过程 # 在实际应用中,这里需要根据不同平台调用不同的API platform_account = video.platform_account platform_name = platform_account.platform_name # 模拟成功上传并获取视频URL video_url = f"https://example.com/{platform_name}/{video.id}" video_id = f"VID_{video.id}" # 在实际应用中,这里应该调用各平台的API logger.info(f"模拟上传视频 {video.id} 到 {platform_name} 平台") # 更新视频状态 video.status = 'published' video.publish_time = timezone.now() video.video_url = video_url video.video_id = video_id video.save() logger.info(f"视频 {video.id} 已成功发布到 {platform_name} 平台") # 记录到知识库 _update_knowledge_base(video) return { "success": True, "video_id": video.id, "platform": platform_name, "publish_time": video.publish_time.strftime("%Y-%m-%d %H:%M:%S"), "video_url": video_url } except Video.DoesNotExist: logger.error(f"未找到ID为 {video_id} 的视频记录") return {"success": False, "error": f"未找到ID为 {video_id} 的视频记录"} except Exception as e: logger.error(f"发布视频 {video_id} 失败: {str(e)}") # 尝试更新视频状态为失败 try: video = Video.objects.get(id=video_id) video.status = 'failed' video.save() except: pass return {"success": False, "error": str(e)} def _update_knowledge_base(video): """更新知识库中的视频信息""" from .models import KnowledgeBase, KnowledgeBaseDocument try: # 获取关联的平台账号和运营账号 platform_account = video.platform_account operator = platform_account.operator # 查找对应的知识库 knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if not knowledge_base: logger.warning(f"未找到与运营账号 {operator.real_name} 关联的知识库") return # 查找相关的文档 document = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, document_name__contains=video.title, status='active' ).first() if not document: logger.warning(f"未找到与视频 {video.title} 关联的知识库文档") return # 在实际应用中,这里应该调用外部API更新文档内容 logger.info(f"更新知识库文档 {document.document_id} 的视频发布状态") # 模拟更新文档内容 # 这里只记录日志,实际应用中需要调用外部API except Exception as e: logger.error(f"更新知识库失败: {str(e)}") @shared_task def check_scheduled_videos(): """定期检查计划发布的视频,处理未被正确调度的视频""" from .models import Video from datetime import timedelta try: # 查找所有已经过了计划发布时间但仍处于scheduled状态的视频 now = timezone.now() threshold = now - timedelta(minutes=30) # 30分钟容差 videos = Video.objects.filter( status='scheduled', scheduled_time__lt=threshold ) for video in videos: logger.warning(f"发现未按计划发布的视频: {video.id}, 计划发布时间: {video.scheduled_time}") # 手动触发发布任务 publish_scheduled_video.delay(video.id) return f"检查了 {videos.count()} 个未按计划发布的视频" except Exception as e: logger.error(f"检查未发布视频失败: {str(e)}") return f"检查未发布视频失败: {str(e)}"