daren_project/user_management/management/commands/test_video_upload.py

142 lines
5.9 KiB
Python
Raw Permalink Normal View History

2025-04-29 10:22:57 +08:00
import os
import datetime
from django.core.management.base import BaseCommand
from django.utils import timezone
from user_management.models import PlatformAccount, Video
from django.conf import settings
class Command(BaseCommand):
help = '测试视频上传和定时发布功能'
def add_arguments(self, parser):
parser.add_argument('video_path', type=str, help='视频文件路径')
parser.add_argument('platform_account_id', type=int, help='平台账号ID')
parser.add_argument('--title', type=str, help='视频标题(可选)')
parser.add_argument('--desc', type=str, help='视频描述(可选)')
parser.add_argument('--schedule', type=str, help='计划发布时间,格式: YYYY-MM-DD HH:MM:SS (可选)')
def handle(self, *args, **options):
video_path = options['video_path']
platform_account_id = options['platform_account_id']
title = options.get('title')
desc = options.get('desc')
schedule_str = options.get('schedule')
# 验证视频文件是否存在
if not os.path.exists(video_path):
self.stderr.write(self.style.ERROR(f'错误: 视频文件不存在: {video_path}'))
return
# 验证平台账号是否存在
try:
platform_account = PlatformAccount.objects.get(id=platform_account_id)
except PlatformAccount.DoesNotExist:
self.stderr.write(self.style.ERROR(f'错误: 未找到ID为{platform_account_id}的平台账号'))
return
# 设置标题(如果未提供,则使用文件名)
if not title:
title = os.path.splitext(os.path.basename(video_path))[0]
# 准备保存视频的目录
media_root = getattr(settings, 'MEDIA_ROOT', os.path.join(settings.BASE_DIR, 'media'))
videos_dir = os.path.join(media_root, 'videos')
account_dir = os.path.join(videos_dir, f"{platform_account.platform_name}_{platform_account.account_name}")
if not os.path.exists(videos_dir):
os.makedirs(videos_dir)
if not os.path.exists(account_dir):
os.makedirs(account_dir)
# 生成唯一的文件名
import time
timestamp = int(time.time())
file_name = f"{timestamp}_{os.path.basename(video_path)}"
file_path = os.path.join(account_dir, file_name)
# 复制视频文件
with open(video_path, 'rb') as src_file:
with open(file_path, 'wb') as dest_file:
dest_file.write(src_file.read())
self.stdout.write(self.style.SUCCESS(f'视频文件已复制到: {file_path}'))
# 创建视频记录
video_data = {
'platform_account': platform_account,
'title': title,
'description': desc,
'local_path': file_path,
'status': 'draft',
}
# 处理计划发布时间
if schedule_str:
try:
from dateutil import parser
scheduled_time = parser.parse(schedule_str)
# 如果时间已过设置为当前时间后5分钟
now = timezone.now()
if scheduled_time <= now:
scheduled_time = now + datetime.timedelta(minutes=5)
self.stdout.write(self.style.WARNING(
f'警告: 计划时间已过已调整为当前时间后5分钟: {scheduled_time}'
))
video_data['scheduled_time'] = scheduled_time
video_data['status'] = 'scheduled'
except Exception as e:
self.stderr.write(self.style.ERROR(f'错误: 解析时间失败: {str(e)}'))
return
# 创建视频对象
video = Video.objects.create(**video_data)
self.stdout.write(self.style.SUCCESS(f'创建视频记录成功ID: {video.id}'))
# 如果是计划发布,创建定时任务
if video.status == 'scheduled':
try:
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
scheduled_time = video.scheduled_time
# 创建定时任务
schedule, _ = CrontabSchedule.objects.get_or_create(
minute=scheduled_time.minute,
hour=scheduled_time.hour,
day_of_month=scheduled_time.day,
month_of_year=scheduled_time.month,
)
# 创建周期性任务
task_name = f"Publish_Video_{video.id}_{time.time()}"
PeriodicTask.objects.create(
name=task_name,
task='user_management.tasks.publish_scheduled_video',
crontab=schedule,
args=json.dumps([video.id]),
one_off=True, # 只执行一次
start_time=scheduled_time
)
self.stdout.write(self.style.SUCCESS(
f'创建定时发布任务成功,计划发布时间: {scheduled_time}'
))
except Exception as e:
self.stderr.write(self.style.ERROR(f'创建定时任务失败: {str(e)}'))
self.stdout.write(self.style.SUCCESS('操作完成'))
# 打印执行方式提示
if video.status == 'scheduled':
self.stdout.write(f"\n视频将在 {video.scheduled_time} 自动发布")
self.stdout.write("\n要手动发布,可以使用以下命令:")
else:
self.stdout.write("\n要发布该视频,可以使用以下命令:")
self.stdout.write(f"python manage.py publish_video {video.id}")