# apps/knowledge_base/views.py
import logging
import json
import traceback

from django.db.models import Q
from django.db import transaction
from django.utils import timezone
from django.http import Http404
from rest_framework import viewsets, status
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import action
import requests

from apps.user.models import User
from apps.knowledge_base.models import KnowledgeBase, KnowledgeBaseDocument
from apps.common.services.external_api_service import (
    create_external_dataset,
    delete_external_dataset,
    call_split_api_multiple,
    call_upload_api,
    call_delete_document_api,
    ExternalAPIError,
    get_external_document_list,
    get_external_document_paragraphs,
)
from apps.knowledge_base.serializers import KnowledgeBaseSerializer, KnowledgeBaseDocumentSerializer
from daren import settings
from apps.user.authentication import CustomTokenAuthentication

logger = logging.getLogger(__name__)

# Timeout (seconds) for the direct ``requests`` calls made in this module.
# Without a timeout ``requests`` waits indefinitely for the external service.
# Can be overridden by defining ``EXTERNAL_API_TIMEOUT`` in ``daren.settings``.
EXTERNAL_API_TIMEOUT = getattr(settings, 'EXTERNAL_API_TIMEOUT', 30)

# Pagination defaults used by ``list`` and ``search``.
DEFAULT_PAGE = 1
DEFAULT_PAGE_SIZE = 10


def _api_response(code, message, data=None, http_status=status.HTTP_200_OK):
    """Build the ``{"code", "message", "data"}`` envelope used by every endpoint."""
    return Response({"code": code, "message": message, "data": data}, status=http_status)


def _not_found_response():
    """Return the standard 404 envelope for a knowledge base that does not exist."""
    return _api_response(404, "知识库不存在", http_status=status.HTTP_404_NOT_FOUND)


def _parse_pagination(query_params):
    """Parse ``page`` / ``page_size`` from query params into a ``(page, page_size)`` tuple.

    If either value is not an integer, BOTH fall back to the defaults (1, 10).
    Values below 1 are clamped to 1 so the computed slice offsets are never
    negative (Django querysets reject negative slicing).
    """
    try:
        page = int(query_params.get('page', DEFAULT_PAGE))
        page_size = int(query_params.get('page_size', DEFAULT_PAGE_SIZE))
    except (TypeError, ValueError):
        return DEFAULT_PAGE, DEFAULT_PAGE_SIZE
    return max(page, 1), max(page_size, 1)


def _keyword_query(keyword):
    """Case-insensitive match of *keyword* against name, desc, department or group."""
    return (
        Q(name__icontains=keyword)
        | Q(desc__icontains=keyword)
        | Q(department__icontains=keyword)
        | Q(group__icontains=keyword)
    )


def _full_permissions():
    """Return a fresh "everything allowed" permission dict (per-user checks are disabled)."""
    return {'can_read': True, 'can_edit': True, 'can_delete': True}


class KnowledgeBaseViewSet(viewsets.ModelViewSet):
    """CRUD and document-management API for knowledge bases.

    Each local ``KnowledgeBase`` mirrors a dataset in an external service,
    linked through ``external_id``. create/update/destroy keep the two in
    sync, and the document actions proxy uploads, listing, content retrieval
    and deletion to that service.

    Every response uses the envelope ``{"code", "message", "data"}``.

    Permission model: per-user permission records (``KBPermissionModel``) are
    currently disabled. Every authenticated user can read, edit and delete
    every knowledge base, and ``permissions`` fields in responses always
    report full access.
    """

    serializer_class = KnowledgeBaseSerializer
    permission_classes = [IsAuthenticated]
    authentication_classes = [CustomTokenAuthentication]

    def list(self, request, *args, **kwargs):
        """Return a paginated, optionally keyword-filtered list of knowledge bases.

        Query params: ``keyword`` (see ``_keyword_query``), ``page`` and
        ``page_size`` (1-based; see ``_parse_pagination``).
        """
        try:
            queryset = self.get_queryset()
            keyword = request.query_params.get('keyword', '')
            if keyword:
                queryset = queryset.filter(_keyword_query(keyword))

            page, page_size = _parse_pagination(request.query_params)
            total = queryset.count()
            start = (page - 1) * page_size
            serializer = self.get_serializer(queryset[start:start + page_size], many=True)

            return _api_response(200, "获取知识库列表成功", {
                "total": total,
                "page": page,
                "page_size": page_size,
                "keyword": keyword or None,
                "items": serializer.data,
            })
        except Exception as e:
            logger.exception(f"获取知识库列表失败: {str(e)}")
            return _api_response(500, f"获取知识库列表失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def get_queryset(self):
        # Every knowledge base is visible to every user (no permission filtering).
        return KnowledgeBase.objects.all()

    def create(self, request, *args, **kwargs):
        """Create a local knowledge base and its mirror dataset in the external service.

        Both steps run inside one DB transaction. If ``create_external_dataset``
        raises, the local insert is rolled back. On success, the returned
        external id is stored on the local row.
        """
        try:
            name = request.data.get('name')
            if not name:
                return _api_response(400, '知识库名称不能为空',
                                     http_status=status.HTTP_400_BAD_REQUEST)
            if KnowledgeBase.objects.filter(name=name).exists():
                return _api_response(400, f'知识库名称 "{name}" 已存在',
                                     http_status=status.HTTP_400_BAD_REQUEST)

            # NOTE(review): an earlier comment claimed type/department/group are
            # ignored here, but this copy forwards every submitted field to the
            # serializer. Confirm which fields the serializer actually accepts.
            data = request.data.copy()
            # get_serializer() already supplies the default context (request, view, format).
            serializer = self.get_serializer(data=data)
            if not serializer.is_valid():
                logger.error(f"数据验证失败: {serializer.errors}")
                return _api_response(400, '数据验证失败', serializer.errors,
                                     http_status=status.HTTP_400_BAD_REQUEST)

            with transaction.atomic():
                knowledge_base = serializer.save()
                logger.info(f"知识库创建成功: id={knowledge_base.id}, name={knowledge_base.name}, user_id={knowledge_base.user_id}")

                external_id = create_external_dataset(knowledge_base)
                logger.info(f"外部知识库创建成功,获取ID: {external_id}")

                knowledge_base.external_id = external_id
                knowledge_base.save()
                logger.info(f"更新knowledge_base的external_id为: {external_id}")

            return _api_response(200, '知识库创建成功', {
                'knowledge_base': serializer.data,
                'external_id': knowledge_base.external_id,
            })
        except ExternalAPIError as e:
            logger.error(f"外部知识库创建失败: {str(e)}")
            return _api_response(500, f'创建知识库失败: {str(e)}',
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        except Exception as e:
            logger.exception(f"创建知识库失败: {str(e)}")
            return _api_response(500, f'创建知识库失败: {str(e)}',
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def _sync_external_dataset(self, instance, validated_data):
        """Push the knowledge base's name/desc to its external dataset via HTTP PUT.

        Raises:
            ExternalAPIError: on timeout, transport failure, a non-200 HTTP
                status, or a response body whose ``code`` is not 200.
        """
        api_data = {
            "name": validated_data.get('name', instance.name),
            "desc": validated_data.get('desc', instance.desc),
            "type": "0",
            "meta": {},
            "documents": [],
        }
        try:
            response = requests.put(
                f'{settings.API_BASE_URL}/api/dataset/{instance.external_id}',
                json=api_data,
                headers={'Content-Type': 'application/json'},
                timeout=EXTERNAL_API_TIMEOUT,
            )
            if response.status_code != 200:
                raise ExternalAPIError(f"更新外部知识库失败,状态码: {response.status_code}, 响应: {response.text}")
            api_response = response.json()
            if api_response.get('code') != 200:
                raise ExternalAPIError(f"更新外部知识库失败: {api_response.get('message', '未知错误')}")
            logger.info(f"外部知识库更新成功: {instance.external_id}")
        except ExternalAPIError:
            # Already descriptive; don't let the generic handler below re-wrap it.
            raise
        except requests.exceptions.Timeout as e:
            raise ExternalAPIError("请求超时,请稍后重试") from e
        except requests.exceptions.RequestException as e:
            raise ExternalAPIError(f"API请求失败: {str(e)}") from e
        except Exception as e:
            raise ExternalAPIError(f"更新外部知识库失败: {str(e)}") from e

    def update(self, request, *args, **kwargs):
        """Partially update a knowledge base and mirror name/desc to the external dataset.

        The local update and the external sync share one transaction. If the
        external sync fails, the local change is rolled back.
        """
        try:
            instance = self.get_object()
            with transaction.atomic():
                serializer = self.get_serializer(instance, data=request.data, partial=True)
                serializer.is_valid(raise_exception=True)
                self.perform_update(serializer)
                if instance.external_id:
                    self._sync_external_dataset(instance, serializer.validated_data)

            return _api_response(200, "知识库更新成功", serializer.data)
        except Http404:
            return _not_found_response()
        except ValidationError as e:
            # Client input error: report 400 (previously fell through to 500).
            return _api_response(400, '数据验证失败', e.detail,
                                 http_status=status.HTTP_400_BAD_REQUEST)
        except ExternalAPIError as e:
            logger.exception(f"更新外部知识库失败: {str(e)}")
            return _api_response(500, str(e), http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        except Exception as e:
            logger.exception(f"更新知识库失败: {str(e)}")
            return _api_response(500, f"更新知识库失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def destroy(self, request, *args, **kwargs):
        """Delete the external dataset (best effort), then the local knowledge base.

        A failed external delete does not block the local delete. It is
        reported in the (still 200) response message instead.
        """
        try:
            instance = self.get_object()
            # Capture identifiers first: Django resets the instance pk to None on delete().
            kb_id, kb_name = instance.id, instance.name

            external_error_message = None
            if instance.external_id and not delete_external_dataset(instance.external_id):
                external_error_message = "外部知识库删除失败"
                logger.warning(f"外部知识库删除失败,将继续删除本地知识库: {external_error_message}")

            self.perform_destroy(instance)
            logger.info(f"本地知识库删除成功: id={kb_id}, name={kb_name}")

            if external_error_message:
                return _api_response(200, f"知识库已删除,但外部知识库删除失败: {external_error_message}")
            return _api_response(200, "知识库删除成功")
        except Http404:
            return _not_found_response()
        except Exception as e:
            logger.exception(f"删除知识库失败: {str(e)}")
            return _api_response(500, f"删除知识库失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=True, methods=['get'])
    def permissions(self, request, pk=None):
        """Report the caller's permissions on one knowledge base (always full access)."""
        try:
            instance = self.get_object()
            return _api_response(200, "获取权限信息成功", {
                "knowledge_base_id": instance.id,
                "knowledge_base_name": instance.name,
                "permissions": _full_permissions(),
            })
        except Http404:
            return _not_found_response()
        except Exception as e:
            logger.exception(f"获取权限信息失败: {str(e)}")
            return _api_response(500, f"获取权限信息失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=False, methods=['get'])
    def summary(self, request):
        """Return a short, unpaginated summary of every knowledge base."""
        try:
            summaries = [
                {
                    'id': str(kb.id),
                    'name': kb.name,
                    'desc': kb.desc,
                    'type': kb.type,
                    'department': kb.department,
                    'permissions': _full_permissions(),
                }
                for kb in KnowledgeBase.objects.all()
            ]
            return _api_response(200, '获取知识库概要信息成功', summaries)
        except Exception as e:
            logger.exception(f"获取知识库概要信息失败: {str(e)}")
            return _api_response(500, f'获取知识库概要信息失败: {str(e)}',
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def retrieve(self, request, *args, **kwargs):
        """Return one knowledge base plus the caller's (always full) permissions."""
        try:
            instance = self.get_object()
            data = self.get_serializer(instance).data
            data['permissions'] = _full_permissions()
            return _api_response(200, '获取知识库详情成功', data)
        except Http404:
            return _not_found_response()
        except Exception as e:
            logger.exception(f"获取知识库详情失败: {str(e)}")
            return _api_response(500, f'获取知识库详情失败: {str(e)}',
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=False, methods=['get'])
    def search(self, request):
        """Keyword search with pagination. ``keyword`` is required.

        Items whose name/desc contain the keyword (case-insensitively) also get
        ``highlighted_name`` / ``highlighted_desc`` fields.
        """
        try:
            keyword = request.query_params.get('keyword', '')
            if not keyword:
                return _api_response(400, "搜索关键字不能为空",
                                     http_status=status.HTTP_400_BAD_REQUEST)

            page, page_size = _parse_pagination(request.query_params)
            queryset = KnowledgeBase.objects.filter(_keyword_query(keyword))
            total = queryset.count()
            start = (page - 1) * page_size
            serializer = self.get_serializer(queryset[start:start + page_size], many=True)

            lowered = keyword.lower()
            result_items = []
            for item in serializer.data:
                item['permissions'] = _full_permissions()
                # NOTE(review): the replacement text equals the keyword itself, so
                # the "highlighted" fields currently equal the originals. Highlight
                # markup appears to have been lost; confirm the intended wrapper.
                if 'name' in item and lowered in item['name'].lower():
                    item['highlighted_name'] = item['name'].replace(
                        keyword, f'{keyword}'
                    )
                if 'desc' in item and item.get('desc') is not None:
                    desc_text = str(item['desc'])
                    if lowered in desc_text.lower():
                        item['highlighted_desc'] = desc_text.replace(
                            keyword, f'{keyword}'
                        )
                result_items.append(item)

            return _api_response(200, "搜索知识库成功", {
                "total": total,
                "page": page,
                "page_size": page_size,
                "keyword": keyword,
                "items": result_items,
            })
        except Exception as e:
            logger.exception(f"搜索知识库失败: {str(e)}")
            return _api_response(500, f"搜索知识库失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @staticmethod
    def _collect_uploaded_files(uploaded):
        """Return the uploaded files from ``request.FILES`` under any accepted field name.

        Checks, in order: ``files``, ``file`` (all values of each), indexed
        ``files[...]`` keys, indexed ``file[...]`` keys, then every uploaded
        file. Returns an empty list when nothing was uploaded.
        """
        if 'files' in uploaded:
            return uploaded.getlist('files')
        if 'file' in uploaded:
            return uploaded.getlist('file')
        for prefix in ('files[', 'file['):
            if any(key.startswith(prefix) for key in uploaded):
                return [file for key, file in uploaded.items() if key.startswith(prefix)]
        return list(uploaded.values())

    def _verify_external_dataset(self, external_id):
        """Check that the external dataset is reachable.

        Returns ``None`` on success, otherwise the error Response to send back.
        """
        try:
            verify_response = requests.get(
                f'{settings.API_BASE_URL}/api/dataset/{external_id}',
                timeout=EXTERNAL_API_TIMEOUT,
            )
            if verify_response.status_code != 200:
                logger.error(f"外部知识库不存在或无法访问: {external_id}, 状态码: {verify_response.status_code}")
                return _api_response(404, f"外部知识库不存在或无法访问: {external_id}",
                                     http_status=status.HTTP_404_NOT_FOUND)

            verify_data = verify_response.json()
            if verify_data.get('code') != 200:
                logger.error(f"验证外部知识库失败: {verify_data.get('message')}")
                return _api_response(verify_data.get('code', 500),
                                     f"验证外部知识库失败: {verify_data.get('message', '未知错误')}",
                                     http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

            logger.info(f"外部知识库验证成功: {external_id}")
            return None
        except Exception as e:
            logger.error(f"验证外部知识库时出错: {str(e)}")
            return _api_response(500, f"验证外部知识库时出错: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @staticmethod
    def _all_files_failed_response(files, message, error):
        """Build the 400 response used when splitting fails for the whole batch."""
        return _api_response(400, message, {
            "uploaded_count": 0,
            "failed_count": len(files),
            "total_files": len(files),
            "documents": [],
            "failed_documents": [{"name": file.name, "error": error} for file in files],
        }, http_status=status.HTTP_400_BAD_REQUEST)

    @staticmethod
    def _build_upload_payload(doc_name, doc_content):
        """Convert one split document into the external upload payload.

        An empty paragraph list is replaced by a single placeholder paragraph.
        """
        if not doc_content:
            doc_content = [{
                'title': '文档内容',
                'content': '文件内容无法自动分割,请检查文件格式。'
            }]
        return {
            "name": doc_name,
            "paragraphs": [
                {
                    "content": paragraph.get('content', ''),
                    "title": paragraph.get('title', ''),
                    "is_active": True,
                    "problem_list": [],
                }
                for paragraph in doc_content
            ],
        }

    @action(detail=True, methods=['post'])
    def upload_document(self, request, pk=None):
        """Split uploaded files with the external splitter and upload each resulting document.

        Each document uploaded successfully is recorded locally as a
        ``KnowledgeBaseDocument``. Responds 200 when at least one document was
        uploaded (per-document failures are listed), otherwise 400.
        """
        try:
            instance = self.get_object()
            user = request.user

            logger.info(f"请求内容: {request.data}")
            logger.info(f"请求FILES: {request.FILES}")

            files = self._collect_uploaded_files(request.FILES)
            if not files:
                return _api_response(400, "未找到上传文件,请确保表单字段名为'files'或'file'", {
                    "available_fields": list(request.FILES.keys())
                }, http_status=status.HTTP_400_BAD_REQUEST)

            logger.info(f"接收到 {len(files)} 个文件上传请求")

            if not instance.external_id:
                return _api_response(400, "知识库没有有效的external_id,请先创建知识库",
                                     http_status=status.HTTP_400_BAD_REQUEST)

            verify_error = self._verify_external_dataset(instance.external_id)
            if verify_error is not None:
                return verify_error

            # Split every file in one batch call.
            split_response = call_split_api_multiple(files)
            if not split_response or split_response.get('code') != 200:
                error_msg = f"文件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}"
                logger.error(error_msg)
                return self._all_files_failed_response(files, error_msg, error_msg)

            documents_data = split_response.get('data', [])
            if not documents_data:
                logger.warning("批量分割API未返回文档数据")
                return self._all_files_failed_response(files, "文件分割未返回有效数据", "分割未返回有效数据")

            logger.info(f"成功分割出 {len(documents_data)} 个文档,准备上传")

            saved_documents = []
            failed_documents = []
            for doc in documents_data:
                doc_name = doc.get('name', '未命名文档')
                doc_content = doc.get('content', [])
                logger.info(f"处理文档: {doc_name}, 包含 {len(doc_content)} 个段落")

                upload_response = call_upload_api(
                    instance.external_id, self._build_upload_payload(doc_name, doc_content)
                )
                if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                    document_id = upload_response['data']['id']
                    doc_record = KnowledgeBaseDocument.objects.create(
                        knowledge_base=instance,
                        document_id=document_id,
                        document_name=doc_name,
                        external_id=document_id,
                        uploader_name=user.name
                    )
                    saved_documents.append({
                        "id": str(doc_record.id),
                        "name": doc_record.document_name,
                        "external_id": doc_record.external_id
                    })
                    logger.info(f"文档 '{doc_name}' 上传成功,ID: {document_id}")
                else:
                    error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
                    logger.error(f"文档 '{doc_name}' 上传失败: {error_msg}")
                    failed_documents.append({"name": doc_name, "error": error_msg})

            if saved_documents:
                return _api_response(200, f"文档上传完成,成功: {len(saved_documents)},失败: {len(failed_documents)}", {
                    "uploaded_count": len(saved_documents),
                    "failed_count": len(failed_documents),
                    "total_files": len(files),
                    "documents": saved_documents,
                    "failed_documents": failed_documents,
                })
            return _api_response(400, "所有文档上传失败", {
                "uploaded_count": 0,
                "failed_count": len(failed_documents),
                "total_files": len(files),
                "documents": [],
                "failed_documents": failed_documents,
            }, http_status=status.HTTP_400_BAD_REQUEST)
        except Http404:
            return _not_found_response()
        except Exception as e:
            logger.exception(f"文档上传失败: {str(e)}")
            return _api_response(500, f"文档上传失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @staticmethod
    def _sync_external_documents(knowledge_base, external_documents):
        """Upsert the external document list into local ``KnowledgeBaseDocument`` rows.

        Entries without an id or a name are skipped. ``is_active`` (default
        True) maps to status ``active`` / ``deleted``.
        """
        for doc in external_documents:
            external_id = doc.get('id')
            doc_name = doc.get('name')
            if not (external_id and doc_name):
                continue
            _, created = KnowledgeBaseDocument.objects.update_or_create(
                knowledge_base=knowledge_base,
                external_id=external_id,
                defaults={
                    'document_id': external_id,
                    'document_name': doc_name,
                    'status': 'active' if doc.get('is_active', True) else 'deleted'
                }
            )
            if created:
                logger.info(f"同步创建文档: {doc_name}, ID: {external_id}")
            else:
                logger.info(f"同步更新文档: {doc_name}, ID: {external_id}")

    @action(detail=True, methods=['get'])
    def documents(self, request, pk=None):
        """List a knowledge base's active documents, syncing from the external service first."""
        try:
            instance = self.get_object()
            if not instance.external_id:
                return _api_response(400, "知识库没有有效的external_id",
                                     http_status=status.HTTP_400_BAD_REQUEST)

            try:
                external_documents = get_external_document_list(instance.external_id)
                self._sync_external_documents(instance, external_documents)

                # Index external metadata by id once, instead of scanning the list
                # three times per document. setdefault keeps the FIRST entry for a
                # duplicated id, matching the previous next()-based lookup.
                external_by_id = {}
                for external_doc in external_documents:
                    external_by_id.setdefault(external_doc.get('id'), external_doc)

                local_documents = KnowledgeBaseDocument.objects.filter(
                    knowledge_base=instance,
                    status='active'
                ).order_by('-create_time')

                documents_data = []
                for doc in local_documents:
                    external_doc = external_by_id.get(doc.external_id, {})
                    documents_data.append({
                        "id": str(doc.id),
                        "document_id": doc.document_id,
                        "name": doc.document_name,
                        "external_id": doc.external_id,
                        "created_at": doc.create_time.strftime('%Y-%m-%d %H:%M:%S'),
                        "char_length": external_doc.get('char_length', 0),
                        "paragraph_count": external_doc.get('paragraph_count', 0),
                        "is_active": external_doc.get('is_active', True),
                        "uploader_name": doc.uploader_name
                    })

                return _api_response(200, "获取文档列表成功", documents_data)
            except ExternalAPIError as e:
                logger.error(f"获取文档列表失败: {str(e)}")
                return _api_response(500, str(e), http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        except Http404:
            return _not_found_response()
        except Exception as e:
            logger.exception(f"获取文档列表失败: {str(e)}")
            return _api_response(500, f"获取文档列表失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=True, methods=['get'])
    def document_content(self, request, pk=None):
        """Return the paragraphs of one active document (query param ``document_id``)."""
        try:
            knowledge_base = self.get_object()

            document_id = request.query_params.get('document_id')
            if not document_id:
                return _api_response(400, "缺少document_id参数",
                                     http_status=status.HTTP_400_BAD_REQUEST)

            document = KnowledgeBaseDocument.objects.filter(
                knowledge_base=knowledge_base,
                document_id=document_id,
                status='active'
            ).first()
            if not document:
                return _api_response(404, "文档不存在或已删除",
                                     http_status=status.HTTP_404_NOT_FOUND)

            try:
                paragraphs = get_external_document_paragraphs(knowledge_base.external_id, document.external_id)
                # External paragraph data is returned as-is.
                return _api_response(200, "获取文档内容成功", {
                    "document_id": document_id,
                    "name": document.document_name,
                    "paragraphs": paragraphs
                })
            except ExternalAPIError as e:
                logger.error(f"获取文档段落内容失败: {str(e)}")
                return _api_response(500, str(e), http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        except Http404:
            return _not_found_response()
        except Exception as e:
            logger.exception(f"获取文档内容失败: {str(e)}")
            return _api_response(500, f"获取文档内容失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=True, methods=['delete'])
    def delete_document(self, request, pk=None):
        """Delete a document externally and soft-delete it locally (query param ``document_id``).

        The local row is marked ``deleted`` whatever the external API returns.
        An external failure is reported in a 200 response.
        """
        try:
            knowledge_base = self.get_object()

            document_id = request.query_params.get('document_id')
            if not document_id:
                return _api_response(400, "缺少document_id参数",
                                     http_status=status.HTTP_400_BAD_REQUEST)

            document = KnowledgeBaseDocument.objects.filter(
                knowledge_base=knowledge_base,
                document_id=document_id,
                status='active'
            ).first()
            if not document:
                return _api_response(404, "文档不存在或已删除",
                                     http_status=status.HTTP_404_NOT_FOUND)

            try:
                delete_result = call_delete_document_api(knowledge_base.external_id, document.external_id)
            except ExternalAPIError as e:
                # Keep the "always update local status" contract even if the call raises.
                delete_result = {'message': str(e)}
            # Guard against a None/empty result so the checks below cannot crash.
            delete_result = delete_result or {}

            # Regardless of the external result, mark the local record deleted.
            document.status = 'deleted'
            document.save()

            if delete_result.get('code') != 200:
                logger.warning(f"外部API删除文档失败,但本地标记已更新: {delete_result.get('message')}")
                return _api_response(200, "文档在系统中已标记为删除,但外部API调用失败", {
                    "document_id": document_id,
                    "name": document.document_name,
                    "error": delete_result.get('message')
                })

            return _api_response(200, "文档删除成功", {
                "document_id": document_id,
                "name": document.document_name
            })
        except Http404:
            return _not_found_response()
        except Exception as e:
            logger.exception(f"删除文档失败: {str(e)}")
            return _api_response(500, f"删除文档失败: {str(e)}",
                                 http_status=status.HTTP_500_INTERNAL_SERVER_ERROR)