daren/apps/knowledge_base/views.py

1270 lines
55 KiB
Python
Raw Normal View History

2025-05-29 17:21:16 +08:00
# # apps/knowledge_base/views.py
# import logging
# import json
# import traceback
# from django.db.models import Q
# from django.db import transaction
# from django.utils import timezone
# from django.http import Http404
# from rest_framework import viewsets, status
# from rest_framework.response import Response
# from rest_framework.permissions import IsAuthenticated
# from rest_framework.decorators import action
# import requests
# from apps.user.models import User
# from apps.knowledge_base.models import KnowledgeBase, KnowledgeBaseDocument
# from apps.permissions.models import KnowledgeBasePermission as KBPermissionModel
# from apps.permissions.services.permission_service import KnowledgeBasePermissionMixin
# from apps.knowledge_base.serializers import KnowledgeBaseSerializer, KnowledgeBaseDocumentSerializer
# from apps.common.services.external_api_service import (
# create_external_dataset, delete_external_dataset, call_split_api_multiple,
# call_upload_api, call_delete_document_api, ExternalAPIError,
# get_external_document_list, get_external_document_paragraphs,
# call_delete_document_api
# )
# from daren import settings
# logger = logging.getLogger(__name__)
# class KnowledgeBaseViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet):
# serializer_class = KnowledgeBaseSerializer
# permission_classes = [IsAuthenticated]
# def list(self, request, *args, **kwargs):
# try:
# queryset = self.get_queryset()
# keyword = request.query_params.get('keyword', '')
# if keyword:
# query = Q(name__icontains=keyword) | Q(desc__icontains=keyword) | \
# Q(department__icontains=keyword) | Q(group__icontains=keyword)
# queryset = queryset.filter(query)
# try:
# page = int(request.query_params.get('page', 1))
# page_size = int(request.query_params.get('page_size', 10))
# except ValueError:
# page = 1
# page_size = 10
# total = queryset.count()
# start = (page - 1) * page_size
# end = start + page_size
# paginated_queryset = queryset[start:end]
# serializer = self.get_serializer(paginated_queryset, many=True)
# data = serializer.data
# user = request.user
# for item in data:
# kb_type = item['type']
# department = item.get('department')
# group = item.get('group')
# creator_id = item.get('user_id')
# kb_id = item['id']
# explicit_permission = KBPermissionModel.objects.filter(
# knowledge_base_id=kb_id,
# user=user,
# status='active'
# ).first()
# if explicit_permission:
# item['permissions'] = {
# 'can_read': explicit_permission.can_read,
# 'can_edit': explicit_permission.can_edit,
# 'can_delete': explicit_permission.can_delete
# }
# item['expires_at'] = explicit_permission.expires_at.strftime("%Y-%m-%d %H:%M:%S") if explicit_permission.expires_at else None
# else:
# item['permissions'] = {
# 'can_read': self._can_read(kb_type, user, department, group, creator_id, kb_id),
# 'can_edit': self._can_edit(kb_type, user, department, group, creator_id, kb_id),
# 'can_delete': self._can_delete(kb_type, user, department, group, creator_id, kb_id)
# }
# item['expires_at'] = None if kb_type == 'admin' else None
# if keyword:
# if 'name' in item and keyword.lower() in item['name'].lower():
# item['highlighted_name'] = item['name'].replace(
# keyword, f'<em class="highlight">{keyword}</em>'
# )
# if 'desc' in item and item.get('desc') is not None:
# desc_text = str(item['desc'])
# if keyword.lower() in desc_text.lower():
# item['highlighted_desc'] = desc_text.replace(
# keyword, f'<em class="highlight">{keyword}</em>'
# )
# return Response({
# "code": 200,
# "message": "获取知识库列表成功",
# "data": {
# "total": total,
# "page": page,
# "page_size": page_size,
# "keyword": keyword if keyword else None,
# "items": data
# }
# })
# except Exception as e:
# logger.error(f"获取知识库列表失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"获取知识库列表失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# def get_queryset(self):
# user = self.request.user
# queryset = KnowledgeBase.objects.all()
# permission_conditions = Q()
# permission_conditions |= Q(type='admin')
# permission_conditions |= Q(user_id=user.id)
# active_permissions = KBPermissionModel.objects.filter(
# user=user,
# can_read=True,
# status='active',
# expires_at__gt=timezone.now()
# ).values_list('knowledge_base_id', flat=True)
# if active_permissions:
# permission_conditions |= Q(id__in=active_permissions)
# if user.role == 'admin':
# permission_conditions |= ~Q(type='private') | Q(user_id=user.id)
# elif user.role == 'leader':
# permission_conditions |= Q(type__in=['leader', 'member'], department=user.department)
# elif user.role in ['member', 'user']:
# permission_conditions |= Q(type='leader', department=user.department)
# permission_conditions |= Q(type='member', department=user.department, group=user.group)
# return queryset.filter(permission_conditions).distinct()
# def create(self, request, *args, **kwargs):
# try:
# name = request.data.get('name')
# if not name:
# return Response({
# 'code': 400,
# 'message': '知识库名称不能为空',
# 'data': None
# }, status=status.HTTP_400_BAD_REQUEST)
# if KnowledgeBase.objects.filter(name=name).exists():
# return Response({
# 'code': 400,
# 'message': f'知识库名称 "{name}" 已存在',
# 'data': None
# }, status=status.HTTP_400_BAD_REQUEST)
# user = request.user
# type = request.data.get('type', 'private')
# department = request.data.get('department')
# group = request.data.get('group')
# if type == 'admin':
# department = None
# group = None
# elif type == 'secret':
# if user.role != 'admin':
# return Response({
# 'code': 403,
# 'message': '只有管理员可以创建保密级知识库',
# 'data': None
# }, status=status.HTTP_403_FORBIDDEN)
# department = None
# group = None
# elif type == 'leader':
# if user.role != 'admin':
# return Response({
# 'code': 403,
# 'message': '只有管理员可以创建组长级知识库',
# 'data': None
# }, status=status.HTTP_403_FORBIDDEN)
# if not department:
# return Response({
# 'code': 400,
# 'message': '创建组长级知识库时必须指定部门',
# 'data': None
# }, status=status.HTTP_400_BAD_REQUEST)
# elif type == 'member':
# if user.role not in ['admin', 'leader']:
# return Response({
# 'code': 403,
# 'message': '只有管理员和组长可以创建成员级知识库',
# 'data': None
# }, status=status.HTTP_403_FORBIDDEN)
# if user.role == 'admin' and not department:
# return Response({
# 'code': 400,
# 'message': '管理员创建成员知识库时必须指定部门',
# 'data': None
# }, status=status.HTTP_400_BAD_REQUEST)
# elif user.role == 'leader':
# department = user.department
# if not group:
# return Response({
# 'code': 400,
# 'message': '创建成员知识库时必须指定组',
# 'data': None
# }, status=status.HTTP_400_BAD_REQUEST)
# elif type == 'private':
# department = None
# group = None
# data = request.data.copy()
# data['department'] = department
# data['group'] = group
# serializer = self.get_serializer(data=data, context={'request': request})
# if not serializer.is_valid():
# logger.error(f"数据验证失败: {serializer.errors}")
# return Response({
# 'code': 400,
# 'message': '数据验证失败',
# 'data': serializer.errors
# }, status=status.HTTP_400_BAD_REQUEST)
# with transaction.atomic():
# knowledge_base = serializer.save()
# logger.info(f"知识库创建成功: id={knowledge_base.id}, name={knowledge_base.name}, user_id={knowledge_base.user_id}")
# external_id = create_external_dataset(knowledge_base)
# logger.info(f"外部知识库创建成功获取ID: {external_id}")
# knowledge_base.external_id = external_id
# knowledge_base.save()
# logger.info(f"更新knowledge_base的external_id为: {external_id}")
# KBPermissionModel.objects.create(
# knowledge_base=knowledge_base,
# user=request.user,
# can_read=True,
# can_edit=True,
# can_delete=True,
# granted_by=request.user,
# status='active'
# )
# logger.info(f"创建者权限创建成功")
# permissions = []
# if type == 'admin':
# users_query = User.objects.exclude(id=request.user.id)
# permissions = [
# KBPermissionModel(
# knowledge_base=knowledge_base,
# user=user,
# can_read=True,
# can_edit=True,
# can_delete=True,
# granted_by=request.user,
# status='active'
# ) for user in users_query
# ]
# elif type == 'secret':
# users_query = User.objects.filter(role='admin').exclude(id=request.user.id)
# permissions = [
# KBPermissionModel(
# knowledge_base=knowledge_base,
# user=user,
# can_read=True,
# can_edit=self._can_edit(type, user),
# can_delete=self._can_delete(type, user),
# granted_by=request.user,
# status='active'
# ) for user in users_query
# ]
# elif type == 'leader':
# users_query = User.objects.filter(
# Q(role='admin') | Q(role='leader', department=department)
# ).exclude(id=request.user.id)
# permissions = [
# KBPermissionModel(
# knowledge_base=knowledge_base,
# user=user,
# can_read=True,
# can_edit=self._can_edit(type, user),
# can_delete=self._can_delete(type, user),
# granted_by=request.user,
# status='active'
# ) for user in users_query
# ]
# elif type == 'member':
# users_query = User.objects.filter(
# Q(role='admin') | Q(department=department, role='leader') |
# Q(department=department, group=group, role='member')
# ).exclude(id=request.user.id)
# permissions = [
# KBPermissionModel(
# knowledge_base=knowledge_base,
# user=user,
# can_read=True,
# can_edit=self._can_edit(type, user),
# can_delete=self._can_delete(type, user),
# granted_by=request.user,
# status='active'
# ) for user in users_query
# ]
# if permissions:
# KBPermissionModel.objects.bulk_create(permissions)
# logger.info(f"{type}类型权限创建完成: {len(permissions)}条记录")
# return Response({
# 'code': 200,
# 'message': '知识库创建成功',
# 'data': {
# 'knowledge_base': serializer.data,
# 'external_id': knowledge_base.external_id
# }
# })
# except ExternalAPIError as e:
# logger.error(f"外部知识库创建失败: {str(e)}")
# return Response({
# 'code': 500,
# 'message': f'创建知识库失败: {str(e)}',
# 'data': None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# except Exception as e:
# logger.error(f"创建知识库失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# 'code': 500,
# 'message': f'创建知识库失败: {str(e)}',
# 'data': None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# def update(self, request, *args, **kwargs):
# try:
# instance = self.get_object()
# user = request.user
# if not self.check_knowledge_base_permission(instance, user, 'edit'):
# return Response({
# "code": 403,
# "message": "没有编辑权限",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# with transaction.atomic():
# serializer = self.get_serializer(instance, data=request.data, partial=True)
# serializer.is_valid(raise_exception=True)
# self.perform_update(serializer)
# if instance.external_id:
# try:
# api_data = {
# "name": serializer.validated_data.get('name', instance.name),
# "desc": serializer.validated_data.get('desc', instance.desc),
# "type": "0",
# "meta": {},
# "documents": []
# }
# response = requests.put(
# f'{settings.API_BASE_URL}/api/dataset/{instance.external_id}',
# json=api_data,
# headers={'Content-Type': 'application/json'},
# )
# if response.status_code != 200:
# raise ExternalAPIError(f"更新外部知识库失败,状态码: {response.status_code}, 响应: {response.text}")
# api_response = response.json()
# if not api_response.get('code') == 200:
# raise ExternalAPIError(f"更新外部知识库失败: {api_response.get('message', '未知错误')}")
# logger.info(f"外部知识库更新成功: {instance.external_id}")
# except requests.exceptions.Timeout:
# raise ExternalAPIError("请求超时,请稍后重试")
# except requests.exceptions.RequestException as e:
# raise ExternalAPIError(f"API请求失败: {str(e)}")
# except Exception as e:
# raise ExternalAPIError(f"更新外部知识库失败: {str(e)}")
# return Response({
# "code": 200,
# "message": "知识库更新成功",
# "data": serializer.data
# })
# except Http404:
# return Response({
# "code": 404,
# "message": "知识库不存在",
# "data": None
# }, status=status.HTTP_404_NOT_FOUND)
# except ExternalAPIError as e:
# logger.error(f"更新外部知识库失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": str(e),
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# except Exception as e:
# logger.error(f"更新知识库失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"更新知识库失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# def destroy(self, request, *args, **kwargs):
# try:
# instance = self.get_object()
# user = request.user
# if not self.check_knowledge_base_permission(instance, user, 'delete'):
# return Response({
# "code": 403,
# "message": "没有删除权限",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# external_delete_success = True
# external_error_message = None
# if instance.external_id:
# external_delete_success = delete_external_dataset(instance.external_id)
# if not external_delete_success:
# external_error_message = "外部知识库删除失败"
# logger.warning(f"外部知识库删除失败,将继续删除本地知识库: {external_error_message}")
# self.perform_destroy(instance)
# logger.info(f"本地知识库删除成功: id={instance.id}, name={instance.name}")
# if not external_delete_success:
# return Response({
# "code": 200,
# "message": f"知识库已删除,但外部知识库删除失败: {external_error_message}",
# "data": None
# })
# return Response({
# "code": 200,
# "message": "知识库删除成功",
# "data": None
# })
# except Http404:
# return Response({
# "code": 404,
# "message": "知识库不存在",
# "data": None
# }, status=status.HTTP_404_NOT_FOUND)
# except Exception as e:
# logger.error(f"删除知识库失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"删除知识库失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# @action(detail=True, methods=['get'])
# def permissions(self, request, pk=None):
# try:
# instance = self.get_object()
# user = request.user
# permissions_data = {
# "can_read": self.check_knowledge_base_permission(instance, user, 'read'),
# "can_edit": self.check_knowledge_base_permission(instance, user, 'edit'),
# "can_delete": self.check_knowledge_base_permission(instance, user, 'delete')
# }
# return Response({
# "code": 200,
# "message": "获取权限信息成功",
# "data": {
# "knowledge_base_id": instance.id,
# "knowledge_base_name": instance.name,
# "permissions": permissions_data
# }
# })
# except Http404:
# return Response({
# "code": 404,
# "message": "知识库不存在",
# "data": None
# }, status=status.HTTP_404_NOT_FOUND)
# except Exception as e:
# logger.error(f"获取权限信息失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"获取权限信息失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# @action(detail=False, methods=['get'])
# def summary(self, request):
# try:
# user = request.user
# queryset = KnowledgeBase.objects.exclude(type='secret')
# summaries = []
# for kb in queryset:
# permissions = {
# 'can_read': self.check_knowledge_base_permission(kb, user, 'read'),
# 'can_edit': self.check_knowledge_base_permission(kb, user, 'edit'),
# 'can_delete': self.check_knowledge_base_permission(kb, user, 'delete')
# }
# explicit_permission = KBPermissionModel.objects.filter(
# knowledge_base_id=kb.id,
# user=user,
# status='active'
# ).first()
# expires_at = None
# if explicit_permission:
# expires_at = explicit_permission.expires_at.strftime("%Y-%m-%d %H:%M:%S") if explicit_permission.expires_at else None
# elif kb.type == 'admin':
# expires_at = None
# summary = {
# 'id': str(kb.id),
# 'name': kb.name,
# 'desc': kb.desc,
# 'type': kb.type,
# 'department': kb.department,
# 'permissions': permissions,
# 'expires_at': expires_at
# }
# summaries.append(summary)
# return Response({
# 'code': 200,
# 'message': '获取知识库概要信息成功',
# 'data': summaries
# })
# except Exception as e:
# logger.error(f"获取知识库概要信息失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# 'code': 500,
# 'message': f'获取知识库概要信息失败: {str(e)}',
# 'data': None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# def retrieve(self, request, *args, **kwargs):
# try:
# instance = self.get_object()
# serializer = self.get_serializer(instance)
# data = serializer.data
# user = request.user
# data['permissions'] = {
# 'can_read': self.check_knowledge_base_permission(instance, user, 'read'),
# 'can_edit': self.check_knowledge_base_permission(instance, user, 'edit'),
# 'can_delete': self.check_knowledge_base_permission(instance, user, 'delete')
# }
# explicit_permission = KBPermissionModel.objects.filter(
# knowledge_base_id=instance.id,
# user=user,
# status='active'
# ).first()
# if explicit_permission:
# data['expires_at'] = explicit_permission.expires_at.strftime("%Y-%m-%d %H:%M:%S") if explicit_permission.expires_at else None
# else:
# data['expires_at'] = None if instance.type == 'admin' else None
# return Response({
# 'code': 200,
# 'message': '获取知识库详情成功',
# 'data': data
# })
# except Exception as e:
# logger.error(f"获取知识库详情失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# 'code': 500,
# 'message': f'获取知识库详情失败: {str(e)}',
# 'data': None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# @action(detail=False, methods=['get'])
# def search(self, request):
# try:
# keyword = request.query_params.get('keyword', '')
# if not keyword:
# return Response({
# "code": 400,
# "message": "搜索关键字不能为空",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# try:
# page = int(request.query_params.get('page', 1))
# page_size = int(request.query_params.get('page_size', 10))
# except ValueError:
# page = 1
# page_size = 10
# query = Q(name__icontains=keyword) | Q(desc__icontains=keyword) | \
# Q(department__icontains=keyword) | Q(group__icontains=keyword)
# queryset = KnowledgeBase.objects.filter(query).exclude(type='secret')
# user = request.user
# active_permissions = KBPermissionModel.objects.filter(
# user=user,
# status='active',
# expires_at__gt=timezone.now()
# ).select_related('knowledge_base')
# permission_map = {
# str(perm.knowledge_base.id): {
# 'can_read': perm.can_read,
# 'can_edit': perm.can_edit,
# 'can_delete': perm.can_delete
# }
# for perm in active_permissions
# }
# total = queryset.count()
# start = (page - 1) * page_size
# end = start + page_size
# paginated_queryset = queryset[start:end]
# serializer = self.get_serializer(paginated_queryset, many=True)
# data = serializer.data
# result_items = []
# for item in data:
# temp_kb = KnowledgeBase(
# id=item['id'],
# type=item['type'],
# department=item.get('department'),
# group=item.get('group'),
# user_id=item.get('user_id')
# )
# explicit_permission = KBPermissionModel.objects.filter(
# knowledge_base_id=item['id'],
# user=user,
# status='active'
# ).first()
# if explicit_permission:
# kb_permissions = {
# 'can_read': explicit_permission.can_read,
# 'can_edit': explicit_permission.can_edit,
# 'can_delete': explicit_permission.can_delete
# }
# item['expires_at'] = explicit_permission.expires_at.strftime("%Y-%m-%d %H:%M:%S") if explicit_permission.expires_at else None
# else:
# kb_permissions = {
# 'can_read': self.check_knowledge_base_permission(temp_kb, user, 'read'),
# 'can_edit': self.check_knowledge_base_permission(temp_kb, user, 'edit'),
# 'can_delete': self.check_knowledge_base_permission(temp_kb, user, 'delete')
# }
# item['expires_at'] = None if item['type'] == 'admin' else None
# item['permissions'] = kb_permissions
# if kb_permissions['can_read']:
# result_items.append(item)
# else:
# summary_info = {
# 'id': item['id'],
# 'name': item['name'],
# 'type': item['type'],
# 'department': item.get('department'),
# 'permissions': kb_permissions
# }
# result_items.append(summary_info)
# if 'name' in item and keyword.lower() in item['name'].lower():
# item['highlighted_name'] = item['name'].replace(
# keyword, f'<em class="highlight">{keyword}</em>'
# )
# if 'desc' in item and item.get('desc') is not None:
# desc_text = str(item['desc'])
# if keyword.lower() in desc_text.lower():
# item['highlighted_desc'] = desc_text.replace(
# keyword, f'<em class="highlight">{keyword}</em>'
# )
# return Response({
# "code": 200,
# "message": "搜索知识库成功",
# "data": {
# "total": total,
# "page": page,
# "page_size": page_size,
# "keyword": keyword,
# "items": result_items
# }
# })
# except Exception as e:
# logger.error(f"搜索知识库失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"搜索知识库失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# @action(detail=True, methods=['post'])
# def change_type(self, request, pk=None):
# try:
# instance = self.get_object()
# user = request.user
# if not self.check_knowledge_base_permission(instance, user, 'edit'):
# return Response({
# "code": 403,
# "message": "没有修改权限",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# new_type = request.data.get('type')
# if not new_type:
# return Response({
# "code": 400,
# "message": "新类型不能为空",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# valid_types = ['private', 'admin', 'secret', 'leader', 'member']
# if new_type not in valid_types:
# return Response({
# "code": 400,
# "message": f"无效的知识库类型,可选值: {', '.join(valid_types)}",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# if new_type == 'leader' and not user.role == 'admin':
# if new_type not in ['private', 'member']:
# return Response({
# "code": 403,
# "message": "组长只能将知识库设置为private或member类型",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# department = request.data.get('department')
# group = request.data.get('group')
# if new_type == 'leader' and not user.role == 'admin':
# if department and department != user.department:
# return Response({
# "code": 403,
# "message": "组长只能为本部门设置知识库",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# department = user.department
# if new_type == 'leader':
# if not department:
# return Response({
# "code": 400,
# "message": "组长级知识库必须指定部门",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# if new_type == 'member':
# if not department:
# return Response({
# "code": 400,
# "message": "成员级知识库必须指定部门",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# if not group:
# return Response({
# "code": 400,
# "message": "成员级知识库必须指定组",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# if new_type in ['admin', 'secret']:
# department = None
# group = None
# if new_type == 'private':
# if department is None:
# department = instance.department
# if group is None:
# group = instance.group
# instance.type = new_type
# instance.department = department
# instance.group = group
# instance.save()
# return Response({
# "code": 200,
# "message": f"知识库类型已更新为{new_type}",
# "data": {
# "id": instance.id,
# "name": instance.name,
# "type": instance.type,
# "department": instance.department,
# "group": instance.group
# }
# })
# except Http404:
# return Response({
# "code": 404,
# "message": "知识库不存在",
# "data": None
# }, status=status.HTTP_404_NOT_FOUND)
# except Exception as e:
# logger.error(f"修改知识库类型失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"修改知识库类型失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# @action(detail=True, methods=['post'])
# def upload_document(self, request, pk=None):
# try:
# instance = self.get_object()
# user = request.user
# if not self.check_knowledge_base_permission(instance, user, 'edit'):
# return Response({
# "code": 403,
# "message": "没有编辑权限",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# logger.info(f"请求内容: {request.data}")
# logger.info(f"请求FILES: {request.FILES}")
# files = []
# if 'files' in request.FILES:
# files = request.FILES.getlist('files')
# elif 'file' in request.FILES:
# files = request.FILES.getlist('file')
# elif any(key.startswith('files[') for key in request.FILES):
# files = [file for key, file in request.FILES.items() if key.startswith('files[')]
# elif any(key.startswith('file[') for key in request.FILES):
# files = [file for key, file in request.FILES.items() if key.startswith('file[')]
# elif len(request.FILES) > 0:
# files = list(request.FILES.values())
# if not files:
# return Response({
# "code": 400,
# "message": "未找到上传文件,请确保表单字段名为'files'或'file'",
# "data": {
# "available_fields": list(request.FILES.keys())
# }
# }, status=status.HTTP_400_BAD_REQUEST)
# logger.info(f"接收到 {len(files)} 个文件上传请求")
# saved_documents = []
# failed_documents = []
# if not instance.external_id:
# return Response({
# "code": 400,
# "message": "知识库没有有效的external_id请先创建知识库",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# try:
# verify_url = f'{settings.API_BASE_URL}/api/dataset/{instance.external_id}'
# verify_response = requests.get(verify_url)
# if verify_response.status_code != 200:
# logger.error(f"外部知识库不存在或无法访问: {instance.external_id}, 状态码: {verify_response.status_code}")
# return Response({
# "code": 404,
# "message": f"外部知识库不存在或无法访问: {instance.external_id}",
# "data": None
# }, status=status.HTTP_404_NOT_FOUND)
# verify_data = verify_response.json()
# if verify_data.get('code') != 200:
# logger.error(f"验证外部知识库失败: {verify_data.get('message')}")
# return Response({
# "code": verify_data.get('code', 500),
# "message": f"验证外部知识库失败: {verify_data.get('message', '未知错误')}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# logger.info(f"外部知识库验证成功: {instance.external_id}")
# except Exception as e:
# logger.error(f"验证外部知识库时出错: {str(e)}")
# return Response({
# "code": 500,
# "message": f"验证外部知识库时出错: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# # Process all files in a single batch
# split_response = call_split_api_multiple(files)
# if not split_response or split_response.get('code') != 200:
# error_msg = f"文件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}"
# logger.error(error_msg)
# return Response({
# "code": 400,
# "message": error_msg,
# "data": {
# "uploaded_count": 0,
# "failed_count": len(files),
# "total_files": len(files),
# "documents": [],
# "failed_documents": [{"name": file.name, "error": error_msg} for file in files]
# }
# }, status=status.HTTP_400_BAD_REQUEST)
# # Handle the split results
# documents_data = split_response.get('data', [])
# if not documents_data:
# logger.warning(f"批量分割API未返回文档数据")
# return Response({
# "code": 400,
# "message": "文件分割未返回有效数据",
# "data": {
# "uploaded_count": 0,
# "failed_count": len(files),
# "total_files": len(files),
# "documents": [],
# "failed_documents": [{"name": file.name, "error": "分割未返回有效数据"} for file in files]
# }
# }, status=status.HTTP_400_BAD_REQUEST)
# logger.info(f"成功分割出 {len(documents_data)} 个文档,准备上传")
# # Process each document
# for doc in documents_data:
# doc_name = doc.get('name', '未命名文档')
# doc_content = doc.get('content', [])
# logger.info(f"处理文档: {doc_name}, 包含 {len(doc_content)} 个段落")
# if not doc_content:
# doc_content = [{
# 'title': '文档内容',
# 'content': '文件内容无法自动分割,请检查文件格式。'
# }]
# doc_data = {
# "name": doc_name,
# "paragraphs": []
# }
# for paragraph in doc_content:
# doc_data["paragraphs"].append({
# "content": paragraph.get('content', ''),
# "title": paragraph.get('title', ''),
# "is_active": True,
# "problem_list": []
# })
# upload_response = call_upload_api(instance.external_id, doc_data)
# if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
# document_id = upload_response['data']['id']
# doc_record = KnowledgeBaseDocument.objects.create(
# knowledge_base=instance,
# document_id=document_id,
# document_name=doc_name,
# external_id=document_id,
# uploader_name=user.name
# )
# saved_documents.append({
# "id": str(doc_record.id),
# "name": doc_record.document_name,
# "external_id": doc_record.external_id
# })
# logger.info(f"文档 '{doc_name}' 上传成功ID: {document_id}")
# else:
# error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
# logger.error(f"文档 '{doc_name}' 上传失败: {error_msg}")
# failed_documents.append({
# "name": doc_name,
# "error": error_msg
# })
# if saved_documents:
# return Response({
# "code": 200,
# "message": f"文档上传完成,成功: {len(saved_documents)},失败: {len(failed_documents)}",
# "data": {
# "uploaded_count": len(saved_documents),
# "failed_count": len(failed_documents),
# "total_files": len(files),
# "documents": saved_documents,
# "failed_documents": failed_documents
# }
# })
# else:
# return Response({
# "code": 400,
# "message": f"所有文档上传失败",
# "data": {
# "uploaded_count": 0,
# "failed_count": len(failed_documents),
# "total_files": len(files),
# "documents": [],
# "failed_documents": failed_documents
# }
# }, status=status.HTTP_400_BAD_REQUEST)
# except Exception as e:
# logger.error(f"文档上传失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"文档上传失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# @action(detail=True, methods=['get'])
# def documents(self, request, pk=None):
# """Get the knowledge base's document list."""
# try:
# instance = self.get_object()
# user = request.user
# # Permission check
# if not self.check_knowledge_base_permission(instance, user, 'read'):
# return Response({
# "code": 403,
# "message": "没有查看权限",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# # Check whether external_id exists
# if not instance.external_id:
# return Response({
# "code": 400,
# "message": "知识库没有有效的external_id",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# # Call the external API to fetch the document list
# try:
# external_documents = get_external_document_list(instance.external_id)
# # Sync external documents into the local database
# for doc in external_documents:
# external_id = doc.get('id')
# doc_name = doc.get('name')
# if external_id and doc_name:
# kb_doc, created = KnowledgeBaseDocument.objects.update_or_create(
# knowledge_base=instance,
# external_id=external_id,
# defaults={
# 'document_id': external_id,
# 'document_name': doc_name,
# 'status': 'active' if doc.get('is_active', True) else 'deleted'
# }
# )
# if created:
# logger.info(f"同步创建文档: {doc_name}, ID: {external_id}")
# else:
# logger.info(f"同步更新文档: {doc_name}, ID: {external_id}")
# # Fetch the latest local document data
# documents = KnowledgeBaseDocument.objects.filter(
# knowledge_base=instance,
# status='active'
# ).order_by('-create_time')
# # Build the response data
# documents_data = [{
# "id": str(doc.id),
# "document_id": doc.document_id,
# "name": doc.document_name,
# "external_id": doc.external_id,
# "created_at": doc.create_time.strftime('%Y-%m-%d %H:%M:%S'),
# "char_length": next((d.get('char_length', 0) for d in external_documents if d.get('id') == doc.external_id), 0),
# "paragraph_count": next((d.get('paragraph_count', 0) for d in external_documents if d.get('id') == doc.external_id), 0),
# "is_active": next((d.get('is_active', True) for d in external_documents if d.get('id') == doc.external_id), True),
# "uploader_name": doc.uploader_name
# } for doc in documents]
# return Response({
# "code": 200,
# "message": "获取文档列表成功",
# "data": documents_data
# })
# except ExternalAPIError as e:
# logger.error(f"获取文档列表失败: {str(e)}")
# return Response({
# "code": 500,
# "message": str(e),
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# except Exception as e:
# logger.error(f"获取文档列表失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"获取文档列表失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# @action(detail=True, methods=['get'])
# def document_content(self, request, pk=None):
# """Get document content - paragraph list."""
# try:
# knowledge_base = self.get_object()
# user = request.user
# # Permission check
# if not self.check_knowledge_base_permission(knowledge_base, user, 'read'):
# return Response({
# "code": 403,
# "message": "没有查看权限",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# # Get the document ID
# document_id = request.query_params.get('document_id')
# if not document_id:
# return Response({
# "code": 400,
# "message": "缺少document_id参数",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# # Verify the document exists
# document = KnowledgeBaseDocument.objects.filter(
# knowledge_base=knowledge_base,
# document_id=document_id,
# status='active'
# ).first()
# if not document:
# return Response({
# "code": 404,
# "message": "文档不存在或已删除",
# "data": None
# }, status=status.HTTP_404_NOT_FOUND)
# # Call the external API to fetch the document's paragraph content
# try:
# paragraphs = get_external_document_paragraphs(knowledge_base.external_id, document.external_id)
# # Return the external API's paragraph data as-is
# return Response({
# "code": 200,
# "message": "获取文档内容成功",
# "data": {
# "document_id": document_id,
# "name": document.document_name,
# "paragraphs": paragraphs
# }
# })
# except ExternalAPIError as e:
# logger.error(f"获取文档段落内容失败: {str(e)}")
# return Response({
# "code": 500,
# "message": str(e),
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# except Exception as e:
# logger.error(f"获取文档内容失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"获取文档内容失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# @action(detail=True, methods=['delete'])
# def delete_document(self, request, pk=None):
# """Delete a knowledge base document."""
# try:
# knowledge_base = self.get_object()
# user = request.user
# # Permission check
# if not self.check_knowledge_base_permission(knowledge_base, user, 'edit'):
# return Response({
# "code": 403,
# "message": "没有编辑权限",
# "data": None
# }, status=status.HTTP_403_FORBIDDEN)
# # Get the document ID
# document_id = request.query_params.get('document_id')
# if not document_id:
# return Response({
# "code": 400,
# "message": "缺少document_id参数",
# "data": None
# }, status=status.HTTP_400_BAD_REQUEST)
# # Verify the document exists
# document = KnowledgeBaseDocument.objects.filter(
# knowledge_base=knowledge_base,
# document_id=document_id,
# status='active'
# ).first()
# if not document:
# return Response({
# "code": 404,
# "message": "文档不存在或已删除",
# "data": None
# }, status=status.HTTP_404_NOT_FOUND)
# # Call the external API to delete the document
# external_id = document.external_id
# delete_result = call_delete_document_api(knowledge_base.external_id, external_id)
# # Update the local status regardless of the external API result
# document.status = 'deleted'
# document.save()
# # Check the external API result
# if delete_result.get('code') != 200:
# logger.warning(f"外部API删除文档失败但本地标记已更新: {delete_result.get('message')}")
# return Response({
# "code": 200,
# "message": "文档在系统中已标记为删除但外部API调用失败",
# "data": {
# "document_id": document_id,
# "name": document.document_name,
# "error": delete_result.get('message')
# }
# })
# return Response({
# "code": 200,
# "message": "文档删除成功",
# "data": {
# "document_id": document_id,
# "name": document.document_name
# }
# })
# except Exception as e:
# logger.error(f"删除文档失败: {str(e)}")
# logger.error(traceback.format_exc())
# return Response({
# "code": 500,
# "message": f"删除文档失败: {str(e)}",
# "data": None
# }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)