import os import datetime from django.core.management.base import BaseCommand from django.utils import timezone from user_management.models import PlatformAccount, Video from django.conf import settings class Command(BaseCommand): help = '测试视频上传和定时发布功能' def add_arguments(self, parser): parser.add_argument('video_path', type=str, help='视频文件路径') parser.add_argument('platform_account_id', type=int, help='平台账号ID') parser.add_argument('--title', type=str, help='视频标题(可选)') parser.add_argument('--desc', type=str, help='视频描述(可选)') parser.add_argument('--schedule', type=str, help='计划发布时间,格式: YYYY-MM-DD HH:MM:SS (可选)') def handle(self, *args, **options): video_path = options['video_path'] platform_account_id = options['platform_account_id'] title = options.get('title') desc = options.get('desc') schedule_str = options.get('schedule') # 验证视频文件是否存在 if not os.path.exists(video_path): self.stderr.write(self.style.ERROR(f'错误: 视频文件不存在: {video_path}')) return # 验证平台账号是否存在 try: platform_account = PlatformAccount.objects.get(id=platform_account_id) except PlatformAccount.DoesNotExist: self.stderr.write(self.style.ERROR(f'错误: 未找到ID为{platform_account_id}的平台账号')) return # 设置标题(如果未提供,则使用文件名) if not title: title = os.path.splitext(os.path.basename(video_path))[0] # 准备保存视频的目录 media_root = getattr(settings, 'MEDIA_ROOT', os.path.join(settings.BASE_DIR, 'media')) videos_dir = os.path.join(media_root, 'videos') account_dir = os.path.join(videos_dir, f"{platform_account.platform_name}_{platform_account.account_name}") if not os.path.exists(videos_dir): os.makedirs(videos_dir) if not os.path.exists(account_dir): os.makedirs(account_dir) # 生成唯一的文件名 import time timestamp = int(time.time()) file_name = f"{timestamp}_{os.path.basename(video_path)}" file_path = os.path.join(account_dir, file_name) # 复制视频文件 with open(video_path, 'rb') as src_file: with open(file_path, 'wb') as dest_file: dest_file.write(src_file.read()) self.stdout.write(self.style.SUCCESS(f'视频文件已复制到: {file_path}')) # 创建视频记录 video_data = { 'platform_account': platform_account, 'title': title, 'description': desc, 'local_path': file_path, 'status': 'draft', } # 处理计划发布时间 if schedule_str: try: from dateutil import parser scheduled_time = parser.parse(schedule_str) # 如果时间已过,设置为当前时间后5分钟 now = timezone.now() if scheduled_time <= now: scheduled_time = now + datetime.timedelta(minutes=5) self.stdout.write(self.style.WARNING( f'警告: 计划时间已过,已调整为当前时间后5分钟: {scheduled_time}' )) video_data['scheduled_time'] = scheduled_time video_data['status'] = 'scheduled' except Exception as e: self.stderr.write(self.style.ERROR(f'错误: 解析时间失败: {str(e)}')) return # 创建视频对象 video = Video.objects.create(**video_data) self.stdout.write(self.style.SUCCESS(f'创建视频记录成功,ID: {video.id}')) # 如果是计划发布,创建定时任务 if video.status == 'scheduled': try: from django_celery_beat.models import PeriodicTask, CrontabSchedule import json scheduled_time = video.scheduled_time # 创建定时任务 schedule, _ = CrontabSchedule.objects.get_or_create( minute=scheduled_time.minute, hour=scheduled_time.hour, day_of_month=scheduled_time.day, month_of_year=scheduled_time.month, ) # 创建周期性任务 task_name = f"Publish_Video_{video.id}_{time.time()}" PeriodicTask.objects.create( name=task_name, task='user_management.tasks.publish_scheduled_video', crontab=schedule, args=json.dumps([video.id]), one_off=True, # 只执行一次 start_time=scheduled_time ) self.stdout.write(self.style.SUCCESS( f'创建定时发布任务成功,计划发布时间: {scheduled_time}' )) except Exception as e: self.stderr.write(self.style.ERROR(f'创建定时任务失败: {str(e)}')) self.stdout.write(self.style.SUCCESS('操作完成')) # 打印执行方式提示 if video.status == 'scheduled': self.stdout.write(f"\n视频将在 {video.scheduled_time} 自动发布") self.stdout.write("\n要手动发布,可以使用以下命令:") else: self.stdout.write("\n要发布该视频,可以使用以下命令:") self.stdout.write(f"python manage.py publish_video {video.id}")