from django.shortcuts import render import json import uuid import logging from django.db import transaction from django.shortcuts import get_object_or_404 from django.conf import settings from django.utils import timezone from rest_framework import viewsets, status from rest_framework.decorators import action from rest_framework.response import Response from rest_framework.permissions import IsAuthenticated from django.db.models import Q import os from user_management.models import OperatorAccount, PlatformAccount, Video, KnowledgeBase, KnowledgeBaseDocument, User from .serializers import ( OperatorAccountSerializer, PlatformAccountSerializer, VideoSerializer, KnowledgeBaseSerializer, KnowledgeBaseDocumentSerializer, MultiPlatformAccountSerializer ) from .pagination import CustomPagination logger = logging.getLogger(__name__) class OperatorAccountViewSet(viewsets.ModelViewSet): """运营账号管理视图集""" queryset = OperatorAccount.objects.all() serializer_class = OperatorAccountSerializer permission_classes = [IsAuthenticated] pagination_class = CustomPagination def list(self, request, *args, **kwargs): """获取运营账号列表""" queryset = self.filter_queryset(self.get_queryset()) page = self.paginate_queryset(queryset) if page is not None: serializer = self.get_serializer(page, many=True) # 使用自定义分页器的响应 return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True) return Response({ "code": 200, "message": "获取运营账号列表成功", "data": serializer.data }) def retrieve(self, request, *args, **kwargs): """获取运营账号详情""" instance = self.get_object() serializer = self.get_serializer(instance) return Response({ "code": 200, "message": "获取运营账号详情成功", "data": serializer.data }) def update(self, request, *args, **kwargs): """更新运营账号信息""" partial = kwargs.pop('partial', False) instance = self.get_object() serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) self.perform_update(serializer) return Response({ "code": 200, "message": "更新运营账号信息成功", "data": serializer.data }) def partial_update(self, request, *args, **kwargs): """部分更新运营账号信息""" kwargs['partial'] = True return self.update(request, *args, **kwargs) def create(self, request, *args, **kwargs): """创建运营账号并自动创建对应的私有知识库""" with transaction.atomic(): # 1. 创建运营账号 serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) # 2. 手动保存数据而不是使用serializer.save(),确保不传入UUID operator_data = serializer.validated_data operator = OperatorAccount.objects.create(**operator_data) # 3. 为每个运营账号创建一个私有知识库 knowledge_base = KnowledgeBase.objects.create( user_id=request.user.id, # 使用当前用户作为创建者 name=f"{operator.real_name}的运营知识库", desc=f"用于存储{operator.real_name}({operator.username})相关的运营数据", type='private', department=operator.department ) # 4. 创建知识库文档记录 - 运营信息文档 document_data = { "name": f"{operator.real_name}_运营信息", "paragraphs": [ { "title": "运营账号基本信息", "content": f""" 用户名: {operator.username} 真实姓名: {operator.real_name} 邮箱: {operator.email} 电话: {operator.phone} 职位: {operator.get_position_display()} 部门: {operator.department} 创建时间: {operator.created_at.strftime('%Y-%m-%d %H:%M:%S')} uuid: {operator.uuid} """, "is_active": True } ] } # 调用外部API创建文档 document_id = self._create_document(knowledge_base.external_id, document_data) if document_id: # 创建知识库文档记录 KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=document_data["name"], external_id=document_id, uploader_name=request.user.username ) return Response({ "code": 200, "message": "运营账号创建成功,并已创建对应知识库", "data": { "operator": self.get_serializer(operator).data, "knowledge_base": { "id": knowledge_base.id, "name": knowledge_base.name, "external_id": knowledge_base.external_id } } }, status=status.HTTP_201_CREATED) def destroy(self, request, *args, **kwargs): """删除运营账号并更新相关知识库状态""" operator = self.get_object() # 更新知识库状态或删除关联文档 knowledge_bases = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ) for kb in knowledge_bases: # 可以选择删除知识库,或者更新知识库状态 # 这里我们更新对应的文档状态 documents = KnowledgeBaseDocument.objects.filter( knowledge_base=kb, document_name__contains=operator.real_name ) for doc in documents: doc.status = 'deleted' doc.save() operator.is_active = False # 软删除 operator.save() return Response({ "code": 200, "message": "运营账号已停用,相关知识库文档已标记为删除", "data": None }) def _create_document(self, external_id, doc_data): """调用外部API创建文档""" try: if not external_id: logger.error("创建文档失败:知识库external_id为空") return None # 在实际应用中,这里需要调用外部API创建文档 # 模拟创建文档并返回document_id document_id = str(uuid.uuid4()) logger.info(f"模拟创建文档成功,document_id: {document_id}") return document_id except Exception as e: logger.error(f"创建文档失败: {str(e)}") return None class PlatformAccountViewSet(viewsets.ModelViewSet): """平台账号管理视图集""" queryset = PlatformAccount.objects.all() serializer_class = PlatformAccountSerializer permission_classes = [IsAuthenticated] pagination_class = CustomPagination def list(self, request, *args, **kwargs): """获取平台账号列表""" queryset = self.filter_queryset(self.get_queryset()) page = self.paginate_queryset(queryset) if page is not None: serializer = self.get_serializer(page, many=True) # 使用自定义分页器的响应 return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True) return Response({ "code": 200, "message": "获取平台账号列表成功", "data": serializer.data }) def retrieve(self, request, *args, **kwargs): """获取平台账号详情""" instance = self.get_object() serializer = self.get_serializer(instance) return Response({ "code": 200, "message": "获取平台账号详情成功", "data": serializer.data }) def update(self, request, *args, **kwargs): """更新平台账号信息""" partial = kwargs.pop('partial', False) instance = self.get_object() serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) self.perform_update(serializer) return Response({ "code": 200, "message": "更新平台账号信息成功", "data": serializer.data }) def partial_update(self, request, *args, **kwargs): """部分更新平台账号信息""" kwargs['partial'] = True return self.update(request, *args, **kwargs) def create(self, request, *args, **kwargs): """创建平台账号并记录到知识库""" # 检查请求数据中是否包含platforms字段,判断是否为多平台账号创建 if 'platforms' in request.data and isinstance(request.data['platforms'], list): # 使用多平台账号序列化器 serializer = MultiPlatformAccountSerializer(data=request.data) serializer.is_valid(raise_exception=True) created_accounts = [] with transaction.atomic(): # 获取基础账号信息 base_data = serializer.validated_data.copy() platforms_data = base_data.pop('platforms') operator = base_data['operator'] # 遍历平台数据创建多个平台账号 for platform_data in platforms_data: # 创建平台账号数据 platform_account_data = base_data.copy() platform_account_data['platform_name'] = platform_data['platform_name'] platform_account_data['account_url'] = platform_data['platform_url'] # 创建平台账号 platform_account = PlatformAccount.objects.create(**platform_account_data) created_accounts.append(platform_account) # 记录到知识库 self._add_to_knowledge_base(platform_account, request.user) # 将创建的账号序列化返回 result_serializer = self.get_serializer(created_accounts, many=True) return Response({ "code": 200, "message": "多平台账号创建成功,并已添加到知识库", "data": result_serializer.data }, status=status.HTTP_201_CREATED) else: # 传统单平台账号创建流程 with transaction.atomic(): # 处理operator字段,可能是字符串类型的ID data = request.data.copy() if 'operator' in data and isinstance(data['operator'], str): try: # 尝试通过ID查找运营账号 operator_id = data['operator'] try: # 先尝试通过整数ID查找 operator_id_int = int(operator_id) operator = OperatorAccount.objects.get(id=operator_id_int) except (ValueError, OperatorAccount.DoesNotExist): # 如果无法转换为整数或找不到对应账号,尝试通过用户名或真实姓名查找 operator = OperatorAccount.objects.filter( Q(username=operator_id) | Q(real_name=operator_id) ).first() if not operator: return Response({ "code": 404, "message": f"未找到运营账号: {operator_id},请提供有效的ID、用户名或真实姓名", "data": None }, status=status.HTTP_404_NOT_FOUND) # 更新请求数据中的operator字段为找到的operator的ID data['operator'] = operator.id except Exception as e: return Response({ "code": 400, "message": f"处理运营账号ID时出错: {str(e)}", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 创建平台账号 serializer = self.get_serializer(data=data) serializer.is_valid(raise_exception=True) # 手动创建平台账号,不使用serializer.save()避免ID问题 platform_data = serializer.validated_data platform_account = PlatformAccount.objects.create(**platform_data) # 记录到知识库 self._add_to_knowledge_base(platform_account, request.user) return Response({ "code": 200, "message": "平台账号创建成功,并已添加到知识库", "data": self.get_serializer(platform_account).data }, status=status.HTTP_201_CREATED) def _add_to_knowledge_base(self, platform_account, user): """将平台账号添加到知识库""" # 获取关联的运营账号 operator = platform_account.operator # 查找对应的知识库 knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base and knowledge_base.external_id: # 创建平台账号文档 document_data = { "name": f"{platform_account.account_name}_{platform_account.platform_name}_账号信息", "paragraphs": [ { "title": "平台账号基本信息", "content": f""" 平台: {platform_account.get_platform_name_display()} 账号名称: {platform_account.account_name} 账号ID: {platform_account.account_id} 账号状态: {platform_account.get_status_display()} 粉丝数: {platform_account.followers_count} 账号链接: {platform_account.account_url} 账号描述: {platform_account.description or '无'} 标签: {platform_account.tags or '无'} 头像链接: {platform_account.profile_image or '无'} 最后发布时间: {platform_account.last_posting.strftime('%Y-%m-%d %H:%M:%S') if platform_account.last_posting else '未发布'} 创建时间: {platform_account.created_at.strftime('%Y-%m-%d %H:%M:%S')} 最后登录: {platform_account.last_login.strftime('%Y-%m-%d %H:%M:%S') if platform_account.last_login else '从未登录'} """, "is_active": True } ] } # 调用外部API创建文档 document_id = self._create_document(knowledge_base.external_id, document_data) if document_id: # 创建知识库文档记录 KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=document_data["name"], external_id=document_id, uploader_name=user.username ) def destroy(self, request, *args, **kwargs): """删除平台账号并更新相关知识库文档""" platform_account = self.get_object() # 获取关联的运营账号 operator = platform_account.operator # 查找对应的知识库 knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base: # 查找相关文档并标记为删除 documents = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base ).filter( Q(document_name__contains=platform_account.account_name) | Q(document_name__contains=platform_account.platform_name) ) for doc in documents: doc.status = 'deleted' doc.save() # 删除平台账号 self.perform_destroy(platform_account) return Response({ "code": 200, "message": "平台账号已删除,相关知识库文档已标记为删除", "data": None }) def _create_document(self, external_id, doc_data): """调用外部API创建文档""" try: if not external_id: logger.error("创建文档失败:知识库external_id为空") return None # 在实际应用中,这里需要调用外部API创建文档 # 模拟创建文档并返回document_id document_id = str(uuid.uuid4()) logger.info(f"模拟创建文档成功,document_id: {document_id}") return document_id except Exception as e: logger.error(f"创建文档失败: {str(e)}") return None @action(detail=True, methods=['post']) def update_followers(self, request, pk=None): """更新平台账号粉丝数并同步到知识库""" platform_account = self.get_object() followers_count = request.data.get('followers_count') if not followers_count: return Response({ "code": 400, "message": "粉丝数不能为空", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 更新粉丝数 platform_account.followers_count = followers_count platform_account.save() # 同步到知识库 operator = platform_account.operator knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base: # 查找相关文档 document = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).filter( Q(document_name__contains=platform_account.account_name) | Q(document_name__contains=platform_account.platform_name) ).first() if document: # 这里应该调用外部API更新文档内容 # 但由于我们没有实际的API,只做记录 logger.info(f"应当更新文档 {document.document_id} 的粉丝数为 {followers_count}") return Response({ "code": 200, "message": "粉丝数更新成功", "data": { "id": platform_account.id, "account_name": platform_account.account_name, "followers_count": platform_account.followers_count } }) @action(detail=True, methods=['post']) def update_profile(self, request, pk=None): """更新平台账号的头像、标签和最后发布时间""" platform_account = self.get_object() # 获取更新的资料数据 profile_data = {} # 处理标签 if 'tags' in request.data: profile_data['tags'] = request.data['tags'] # 处理头像 if 'profile_image' in request.data: profile_data['profile_image'] = request.data['profile_image'] # 处理最后发布时间 if 'last_posting' in request.data: try: # 尝试解析时间字符串 from dateutil import parser last_posting = parser.parse(request.data['last_posting']) profile_data['last_posting'] = last_posting except Exception as e: return Response({ "code": 400, "message": f"最后发布时间格式错误: {str(e)}", "data": None }, status=status.HTTP_400_BAD_REQUEST) if not profile_data: return Response({ "code": 400, "message": "没有提供任何需更新的资料数据", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 更新平台账号资料 for field, value in profile_data.items(): setattr(platform_account, field, value) platform_account.save() # 同步到知识库 # 在实际应用中应该调用外部API更新文档内容 operator = platform_account.operator knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base: document = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, status='active' ).filter( Q(document_name__contains=platform_account.account_name) | Q(document_name__contains=platform_account.platform_name) ).first() if document: logger.info(f"应当更新文档 {document.document_id} 的平台账号资料数据") return Response({ "code": 200, "message": "平台账号资料更新成功", "data": self.get_serializer(platform_account).data }) class VideoViewSet(viewsets.ModelViewSet): """视频管理视图集""" queryset = Video.objects.all() serializer_class = VideoSerializer permission_classes = [IsAuthenticated] pagination_class = CustomPagination def list(self, request, *args, **kwargs): """获取视频列表""" queryset = self.filter_queryset(self.get_queryset()) page = self.paginate_queryset(queryset) if page is not None: serializer = self.get_serializer(page, many=True) # 使用自定义分页器的响应 return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True) return Response({ "code": 200, "message": "获取视频列表成功", "data": serializer.data }) def retrieve(self, request, *args, **kwargs): """获取视频详情""" instance = self.get_object() serializer = self.get_serializer(instance) return Response({ "code": 200, "message": "获取视频详情成功", "data": serializer.data }) def update(self, request, *args, **kwargs): """更新视频信息""" partial = kwargs.pop('partial', False) instance = self.get_object() serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) self.perform_update(serializer) return Response({ "code": 200, "message": "更新视频信息成功", "data": serializer.data }) def partial_update(self, request, *args, **kwargs): """部分更新视频信息""" kwargs['partial'] = True return self.update(request, *args, **kwargs) def create(self, request, *args, **kwargs): """创建视频并记录到知识库""" with transaction.atomic(): # 处理platform_account字段,可能是字符串类型的ID data = request.data.copy() if 'platform_account' in data and isinstance(data['platform_account'], str): try: # 尝试通过ID查找平台账号 platform_id = data['platform_account'] try: # 先尝试通过整数ID查找 platform_id_int = int(platform_id) platform = PlatformAccount.objects.get(id=platform_id_int) except (ValueError, PlatformAccount.DoesNotExist): # 如果无法转换为整数或找不到对应账号,尝试通过账号名称或账号ID查找 platform = PlatformAccount.objects.filter( Q(account_name=platform_id) | Q(account_id=platform_id) ).first() if not platform: return Response({ "code": 404, "message": f"未找到平台账号: {platform_id},请提供有效的ID、账号名称或账号ID", "data": None }, status=status.HTTP_404_NOT_FOUND) # 更新请求数据中的platform_account字段为找到的platform的ID data['platform_account'] = platform.id except Exception as e: return Response({ "code": 400, "message": f"处理平台账号ID时出错: {str(e)}", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 创建视频 serializer = self.get_serializer(data=data) serializer.is_valid(raise_exception=True) # 手动创建视频,不使用serializer.save()避免ID问题 video_data = serializer.validated_data video = Video.objects.create(**video_data) # 获取关联的平台账号和运营账号 platform_account = video.platform_account operator = platform_account.operator # 查找对应的知识库 knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base and knowledge_base.external_id: # 创建视频文档 document_data = { "name": f"{video.title}_{platform_account.account_name}_视频信息", "paragraphs": [ { "title": "视频基本信息", "content": f""" 标题: {video.title} 平台: {platform_account.get_platform_name_display()} 账号: {platform_account.account_name} 视频ID: {video.video_id} 发布时间: {video.publish_time.strftime('%Y-%m-%d %H:%M:%S') if video.publish_time else '未发布'} 视频链接: {video.video_url} 点赞数: {video.likes_count} 评论数: {video.comments_count} 分享数: {video.shares_count} 观看数: {video.views_count} 视频描述: {video.description or '无'} """, "is_active": True } ] } # 调用外部API创建文档 document_id = self._create_document(knowledge_base.external_id, document_data) if document_id: # 创建知识库文档记录 KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=document_data["name"], external_id=document_id, uploader_name=request.user.username ) return Response({ "code": 200, "message": "视频创建成功,并已添加到知识库", "data": self.get_serializer(video).data }, status=status.HTTP_201_CREATED) def destroy(self, request, *args, **kwargs): """删除视频记录并更新相关知识库文档""" video = self.get_object() # 获取关联的平台账号和运营账号 platform_account = video.platform_account operator = platform_account.operator # 查找对应的知识库 knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base: # 查找相关文档并标记为删除 documents = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, document_name__contains=video.title ) for doc in documents: doc.status = 'deleted' doc.save() # 删除视频记录 self.perform_destroy(video) return Response({ "code": 200, "message": "视频记录已删除,相关知识库文档已标记为删除", "data": None }) def _create_document(self, external_id, doc_data): """调用外部API创建文档""" try: if not external_id: logger.error("创建文档失败:知识库external_id为空") return None # 在实际应用中,这里需要调用外部API创建文档 # 模拟创建文档并返回document_id document_id = str(uuid.uuid4()) logger.info(f"模拟创建文档成功,document_id: {document_id}") return document_id except Exception as e: logger.error(f"创建文档失败: {str(e)}") return None @action(detail=True, methods=['post']) def update_stats(self, request, pk=None): """更新视频统计数据并同步到知识库""" video = self.get_object() # 获取更新的统计数据 stats = {} for field in ['views_count', 'likes_count', 'comments_count', 'shares_count']: if field in request.data: stats[field] = request.data[field] if not stats: return Response({ "code": 400, "message": "没有提供任何统计数据", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 更新视频统计数据 for field, value in stats.items(): setattr(video, field, value) video.save() # 同步到知识库 # 在实际应用中应该调用外部API更新文档内容 platform_account = video.platform_account operator = platform_account.operator knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base: document = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, document_name__contains=video.title, status='active' ).first() if document: logger.info(f"应当更新文档 {document.document_id} 的视频统计数据") return Response({ "code": 200, "message": "视频统计数据更新成功", "data": { "id": video.id, "title": video.title, "views_count": video.views_count, "likes_count": video.likes_count, "comments_count": video.comments_count, "shares_count": video.shares_count } }) @action(detail=True, methods=['post']) def publish(self, request, pk=None): """发布视频并更新状态""" video = self.get_object() # 检查视频状态 if video.status not in ['draft', 'scheduled']: return Response({ "code": 400, "message": f"当前视频状态为 {video.get_status_display()},无法发布", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 获取视频URL video_url = request.data.get('video_url') if not video_url: return Response({ "code": 400, "message": "未提供视频URL", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 更新视频状态和URL video.video_url = video_url video.status = 'published' video.publish_time = timezone.now() video.save() # 同步到知识库 # 在实际应用中应该调用外部API更新文档内容 platform_account = video.platform_account operator = platform_account.operator knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base: document = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, document_name__contains=video.title, status='active' ).first() if document: logger.info(f"应当更新文档 {document.document_id} 的视频发布状态") return Response({ "code": 200, "message": "视频已成功发布", "data": { "id": video.id, "title": video.title, "status": video.status, "video_url": video.video_url, "publish_time": video.publish_time } }) @action(detail=False, methods=['post']) def upload_video(self, request): """上传视频文件并创建视频记录""" try: # 获取上传的视频文件 video_file = request.FILES.get('video_file') if not video_file: return Response({ "code": 400, "message": "未提供视频文件", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 获取平台账号ID platform_account_id = request.data.get('platform_account') if not platform_account_id: return Response({ "code": 400, "message": "未提供平台账号ID", "data": None }, status=status.HTTP_400_BAD_REQUEST) try: platform_account = PlatformAccount.objects.get(id=platform_account_id) except PlatformAccount.DoesNotExist: return Response({ "code": 404, "message": f"未找到ID为{platform_account_id}的平台账号", "data": None }, status=status.HTTP_404_NOT_FOUND) # 创建保存视频的目录 import os from django.conf import settings # 确保文件保存目录存在 media_root = getattr(settings, 'MEDIA_ROOT', os.path.join(settings.BASE_DIR, 'media')) videos_dir = os.path.join(media_root, 'videos') account_dir = os.path.join(videos_dir, f"{platform_account.platform_name}_{platform_account.account_name}") if not os.path.exists(videos_dir): os.makedirs(videos_dir) if not os.path.exists(account_dir): os.makedirs(account_dir) # 生成唯一的文件名 import time timestamp = int(time.time()) file_name = f"{timestamp}_{video_file.name}" file_path = os.path.join(account_dir, file_name) # 保存视频文件 with open(file_path, 'wb+') as destination: for chunk in video_file.chunks(): destination.write(chunk) # 创建视频记录 video_data = { 'platform_account': platform_account, 'title': request.data.get('title', os.path.splitext(video_file.name)[0]), 'description': request.data.get('description', ''), 'local_path': file_path, 'status': 'draft', 'tags': request.data.get('tags', '') } # 如果提供了计划发布时间,则设置状态为已排期 scheduled_time = request.data.get('scheduled_time') if scheduled_time: from dateutil import parser try: parsed_time = parser.parse(scheduled_time) video_data['scheduled_time'] = parsed_time video_data['status'] = 'scheduled' except Exception as e: return Response({ "code": 400, "message": f"计划发布时间格式错误: {str(e)}", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 创建视频记录 video = Video.objects.create(**video_data) # 添加到知识库 self._add_to_knowledge_base(video, platform_account) # 如果是已排期状态,创建定时任务 if video.status == 'scheduled': self._create_publish_task(video) return Response({ "code": 200, "message": "视频上传成功", "data": { "id": video.id, "title": video.title, "status": video.get_status_display(), "scheduled_time": video.scheduled_time } }, status=status.HTTP_201_CREATED) except Exception as e: logger.error(f"视频上传失败: {str(e)}") return Response({ "code": 500, "message": f"视频上传失败: {str(e)}", "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def _add_to_knowledge_base(self, video, platform_account): """将视频添加到知识库""" # 获取关联的运营账号 operator = platform_account.operator # 查找对应的知识库 knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base and knowledge_base.external_id: # 创建视频文档 document_data = { "name": f"{video.title}_{platform_account.account_name}_视频信息", "paragraphs": [ { "title": "视频基本信息", "content": f""" 标题: {video.title} 平台: {platform_account.get_platform_name_display()} 账号: {platform_account.account_name} 状态: {video.get_status_display()} 本地路径: {video.local_path} 计划发布时间: {video.scheduled_time.strftime('%Y-%m-%d %H:%M:%S') if video.scheduled_time else '未设置'} 视频描述: {video.description or '无'} 标签: {video.tags or '无'} 创建时间: {video.created_at.strftime('%Y-%m-%d %H:%M:%S')} """, "is_active": True } ] } # 调用外部API创建文档 document_id = self._create_document(knowledge_base.external_id, document_data) if document_id: # 创建知识库文档记录 KnowledgeBaseDocument.objects.create( knowledge_base=knowledge_base, document_id=document_id, document_name=document_data["name"], external_id=document_id, uploader_name="系统" ) def _create_publish_task(self, video): """创建定时发布任务""" try: from django_celery_beat.models import PeriodicTask, CrontabSchedule import json from datetime import datetime scheduled_time = video.scheduled_time # 创建定时任务 schedule, _ = CrontabSchedule.objects.get_or_create( minute=scheduled_time.minute, hour=scheduled_time.hour, day_of_month=scheduled_time.day, month_of_year=scheduled_time.month, ) # 创建周期性任务 task_name = f"Publish_Video_{video.id}_{datetime.now().timestamp()}" PeriodicTask.objects.create( name=task_name, task='user_management.tasks.publish_scheduled_video', crontab=schedule, args=json.dumps([video.id]), one_off=True, # 只执行一次 start_time=scheduled_time ) logger.info(f"已创建视频 {video.id} 的定时发布任务,计划发布时间: {scheduled_time}") except Exception as e: logger.error(f"创建定时发布任务失败: {str(e)}") # 记录错误但不中断流程 @action(detail=True, methods=['post']) def manual_publish(self, request, pk=None): """手动发布视频""" video = self.get_object() # 检查视频状态是否允许发布 if video.status not in ['draft', 'scheduled']: return Response({ "code": 400, "message": f"当前视频状态为 {video.get_status_display()},无法发布", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 检查视频文件是否存在 if not video.local_path or not os.path.exists(video.local_path): return Response({ "code": 400, "message": "视频文件不存在,无法发布", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 自动发布 - 不依赖Celery任务 try: # 模拟上传到平台 platform_account = video.platform_account platform_name = platform_account.platform_name # 创建模拟的视频URL和ID video_url = f"https://example.com/{platform_name}/{video.id}" video_id = f"VID_{video.id}" # 更新视频状态 video.status = 'published' video.publish_time = timezone.now() video.video_url = video_url video.video_id = video_id video.save() logger.info(f"视频 {video.id} 已手动发布") # 更新知识库文档 platform_account = video.platform_account operator = platform_account.operator knowledge_base = KnowledgeBase.objects.filter( name__contains=operator.real_name, type='private' ).first() if knowledge_base: document = KnowledgeBaseDocument.objects.filter( knowledge_base=knowledge_base, document_name__contains=video.title, status='active' ).first() if document: logger.info(f"应当更新文档 {document.document_id} 的视频发布状态") return Response({ "code": 200, "message": "视频发布成功", "data": { "id": video.id, "title": video.title, "status": "published", "video_url": video_url, "publish_time": video.publish_time.strftime("%Y-%m-%d %H:%M:%S") } }) except Exception as e: logger.error(f"手动发布视频失败: {str(e)}") return Response({ "code": 500, "message": f"发布失败: {str(e)}", "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)