daren_project/user_management/views.py

2984 lines
120 KiB
Python
Raw Normal View History

from rest_framework import viewsets, status
from rest_framework.decorators import action, api_view, permission_classes
from rest_framework.permissions import IsAuthenticated, AllowAny, IsAdminUser
from rest_framework.response import Response
from rest_framework.exceptions import APIException, PermissionDenied, ValidationError, NotFound
from rest_framework.authentication import TokenAuthentication
from django.utils import timezone
from django.db import connection
from django.db.models import Q, Max, Count, F
from datetime import timedelta, datetime
import mysql.connector
from django.contrib.auth import get_user_model, authenticate, login, logout
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from rest_framework.authtoken.models import Token
import requests
import json
from django.db import transaction
from django.core.exceptions import ObjectDoesNotExist
import sys
import random
import string
import time
import logging
import os
from rest_framework.test import APIRequestFactory
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
from django.http import Http404
from django.db import IntegrityError
from channels.exceptions import ChannelFull
from django.conf import settings
from django.shortcuts import get_object_or_404
from django.db import models
from rest_framework.views import APIView
from django.core.validators import validate_email
# from django.core.exceptions import ValidationError
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
import uuid
from rest_framework import serializers
import traceback
2025-03-17 16:46:37 +08:00
# Model imports
from .models import (
User,
Data, # 替换原来的 AdminData, LeaderData, MemberData
Permission, # 替换原来的 DataPermission, TablePermission
ChatHistory,
KnowledgeBase,
Notification,
KnowledgeBasePermission as KBPermissionModel
)
from .serializers import (
UserSerializer,
DataSerializer, # 需要更新
PermissionSerializer, # 需要更新
ChatHistorySerializer,
KnowledgeBaseSerializer,
KnowledgePermissionSerializer, # 添加这个导入
NotificationSerializer
)
# Custom permission classes
from .permissions import ResourceCRUDPermission, PermissionRequestPermission, DataPermission, KnowledgeBasePermission as KBPermissionClass
from .exceptions import ExternalAPIError
# Resolve the active user model. NOTE: this rebinds the ``User`` name that was
# imported from .models above to whatever get_user_model() returns.
User = get_user_model()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Configure logging at import time: INFO level, timestamped messages and a
# single stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler()  # write to the console
    ]
)
class ChatHistoryViewSet(viewsets.ModelViewSet):
    """Chat-history endpoints restricted to the requesting user's own records.

    Every handler answers with a ``{'code', 'message', 'data'}`` envelope.
    Reads and writes go through ``get_queryset``, so records that belong to
    another user or are flagged ``is_deleted`` are never exposed.
    """

    permission_classes = [IsAuthenticated]
    queryset = ChatHistory.objects.all()

    # strftime pattern shared by every timestamp in the response payloads.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def get_queryset(self):
        """Return the current user's chat records that are not soft-deleted."""
        return ChatHistory.objects.filter(
            user=self.request.user,
            is_deleted=False
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _failure(http_status, message):
        """Build an error envelope whose body ``code`` mirrors the HTTP status."""
        return Response({
            'code': http_status,
            'message': message,
            'data': None
        }, status=http_status)

    def _server_error(self, log_prefix, message_prefix, exc):
        """Log ``exc`` with its traceback and return the HTTP 500 envelope.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` can see the active exception.
        """
        logger.error(f"{log_prefix}: {str(exc)}")
        logger.error(traceback.format_exc())
        return self._failure(
            status.HTTP_500_INTERNAL_SERVER_ERROR,
            f'{message_prefix}: {str(exc)}'
        )

    @staticmethod
    def _read_pagination(request):
        """Return ``(page, page_size)`` parsed from the query string.

        Non-numeric values fall back to ``(1, 10)`` and values below 1 are
        raised to 1, because a negative slice bound on a queryset raises
        instead of returning an empty page.
        """
        try:
            page = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 10))
        except (TypeError, ValueError):
            return 1, 10
        return max(page, 1), max(page_size, 1)

    def _conversation_response(self, query, dataset_id):
        """Return every message of one knowledge base, oldest first."""
        # Evaluate once: the rows are needed both for the name and the list.
        records = list(
            query.filter(knowledge_base__id=dataset_id)
            .select_related('knowledge_base')
            .order_by('created_at')
        )
        conversation = {
            'dataset_id': dataset_id,
            'dataset_name': records[0].knowledge_base.name if records else None,
            'messages': [{
                'id': record.id,
                'role': record.role,
                'content': record.content,
                'created_at': record.created_at.strftime(self._TIME_FORMAT)
            } for record in records]
        }
        return Response({
            'code': 200,
            'message': '获取成功',
            'data': conversation
        })

    def _overview_response(self, query, page, page_size):
        """Return one summary row per conversation, most recent first."""
        # NOTE(review): "latest" is taken to be the row with the largest id;
        # that only matches creation order for monotonically increasing ids.
        latest_chats = query.values(
            'conversation_id',
            'knowledge_base__id',
            'knowledge_base__name'
        ).annotate(
            latest_id=Max('id'),
            message_count=Count('id'),
            last_message=Max('created_at')
        ).order_by('-last_message')

        total = latest_chats.count()
        start = (page - 1) * page_size
        rows = list(latest_chats[start:start + page_size])

        # Fetch the latest message of every conversation on the page in a
        # single query instead of one query per conversation.
        latest_records = {
            record.id: record
            for record in ChatHistory.objects.filter(
                id__in=[row['latest_id'] for row in rows]
            )
        }
        results = [{
            'conversation_id': row['conversation_id'],
            'dataset_id': str(row['knowledge_base__id']),
            'dataset_name': row['knowledge_base__name'],
            'message_count': row['message_count'],
            'last_message': latest_records[row['latest_id']].content,
            'last_time': row['last_message'].strftime(self._TIME_FORMAT)
        } for row in rows]

        return Response({
            'code': 200,
            'message': '获取成功',
            'data': {
                'total': total,
                'page': page,
                'page_size': page_size,
                'results': results
            }
        })

    # ------------------------------------------------------------------
    # Handlers
    # ------------------------------------------------------------------
    def list(self, request, *args, **kwargs):
        """List chat history.

        With ``dataset_id`` in the query string the full message list of that
        knowledge base is returned; otherwise a paginated overview with one
        row per conversation (``page`` / ``page_size``, defaults 1 / 10).
        """
        try:
            dataset_id = request.query_params.get('dataset_id')
            page, page_size = self._read_pagination(request)
            query = self.get_queryset()
            if dataset_id:
                return self._conversation_response(query, dataset_id)
            return self._overview_response(query, page, page_size)
        except Exception as e:
            return self._server_error('获取聊天记录失败', '获取聊天记录失败', e)

    def create(self, request, *args, **kwargs):
        """Store one question/answer pair as two linked records.

        Requires ``dataset_id``, ``dataset_name``, ``question`` and ``answer``
        in the body. ``conversation_id`` is optional; a new UUID is generated
        when it is missing or empty. Returns HTTP 201 carrying the answer
        record, 400 for a missing field or unusable id, 404 when the
        knowledge base does not exist.
        """
        try:
            data = request.data
            for field in ('dataset_id', 'dataset_name', 'question', 'answer'):
                if field not in data:
                    return self._failure(
                        status.HTTP_400_BAD_REQUEST,
                        f'缺少必填字段: {field}'
                    )

            # ``or`` also covers a key that is present but empty/None.
            conversation_id = data.get('conversation_id') or str(uuid.uuid4())

            # The id is passed through as given (no UUID conversion); any
            # lookup error is reported as an invalid id.
            try:
                knowledge_base = KnowledgeBase.objects.filter(id=data['dataset_id']).first()
            except Exception as e:
                return self._failure(
                    status.HTTP_400_BAD_REQUEST,
                    f'无效的知识库ID: {str(e)}'
                )
            if not knowledge_base:
                return self._failure(status.HTTP_404_NOT_FOUND, '知识库不存在')

            model_name = data.get('model_name', 'default')
            # Both rows are written atomically so that a failure on the answer
            # cannot leave an orphaned question behind.
            with transaction.atomic():
                question_record = ChatHistory.objects.create(
                    user=request.user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation_id,
                    role='user',
                    content=data['question'],
                    metadata={'model_name': model_name}
                )
                answer_record = ChatHistory.objects.create(
                    user=request.user,
                    knowledge_base=knowledge_base,
                    conversation_id=conversation_id,
                    parent_id=str(question_record.id),
                    role='assistant',
                    content=data['answer'],
                    metadata={'model_name': model_name}
                )

            return Response({
                'code': 200,
                'message': '创建成功',
                'data': {
                    'id': answer_record.id,
                    'conversation_id': conversation_id,
                    'dataset_id': str(knowledge_base.id),
                    'role': 'assistant',
                    'content': answer_record.content,
                    'created_at': answer_record.created_at.strftime(self._TIME_FORMAT)
                }
            }, status=status.HTTP_201_CREATED)
        except Exception as e:
            return self._server_error('创建聊天记录失败', '创建聊天记录失败', e)

    def update(self, request, pk=None, *args, **kwargs):
        """Update ``content`` and/or merge ``metadata`` of one record.

        ``*args`` / ``**kwargs`` are accepted because DRF's ``partial_update``
        forwards ``partial=True``; without them every PATCH request raised
        ``TypeError``. Returns 404 when the record is missing or not owned by
        the user, 400 when ``metadata`` is not a JSON object.
        """
        try:
            record = self.get_queryset().filter(id=pk).first()
            if not record:
                return self._failure(status.HTTP_404_NOT_FOUND, '记录不存在或无权限')

            data = request.data
            if 'content' in data:
                record.content = data['content']
            if 'metadata' in data:
                incoming = data['metadata']
                if not isinstance(incoming, dict):
                    # Returning before save() means nothing is persisted.
                    return self._failure(
                        status.HTTP_400_BAD_REQUEST,
                        'metadata 必须是对象'
                    )
                # Merge into the stored metadata; incoming keys win.
                record.metadata = {**(record.metadata or {}), **incoming}
            record.save()

            return Response({
                'code': 200,
                'message': '更新成功',
                'data': {
                    'id': record.id,
                    'conversation_id': record.conversation_id,
                    'role': record.role,
                    'content': record.content,
                    'metadata': record.metadata,
                    # timezone.now() follows the project's USE_TZ setting, as
                    # the model timestamps formatted elsewhere do.
                    'updated_at': timezone.now().strftime(self._TIME_FORMAT)
                }
            })
        except Exception as e:
            return self._server_error('更新聊天记录失败', '更新聊天记录失败', e)

    def destroy(self, request, pk=None, *args, **kwargs):
        """Soft-delete one record through the model's ``soft_delete()``."""
        try:
            record = self.get_queryset().filter(id=pk).first()
            if not record:
                return self._failure(status.HTTP_404_NOT_FOUND, '记录不存在或无权限')
            record.soft_delete()
            return Response({
                'code': 200,
                'message': '删除成功',
                'data': None
            })
        except Exception as e:
            return self._server_error('删除聊天记录失败', '删除聊天记录失败', e)

    @action(detail=False, methods=['get'])
    def search(self, request):
        """Search the user's records, newest first.

        Query parameters: ``keyword`` (matched case-insensitively against the
        content and the knowledge-base name), ``dataset_id``, ``start_date``,
        ``end_date``, ``page`` and ``page_size``. When a keyword is given each
        result carries ``highlights.content``.
        """
        try:
            params = request.query_params
            keyword = params.get('keyword', '').strip()
            dataset_id = params.get('dataset_id')
            start_date = params.get('start_date')
            end_date = params.get('end_date')
            page, page_size = self._read_pagination(request)

            query = self.get_queryset()
            if keyword:
                query = query.filter(
                    Q(content__icontains=keyword) |
                    Q(knowledge_base__name__icontains=keyword)
                )
            if dataset_id:
                query = query.filter(knowledge_base__id=dataset_id)
            if start_date:
                query = query.filter(created_at__gte=start_date)
            if end_date:
                query = query.filter(created_at__lte=end_date)

            total = query.count()
            start = (page - 1) * page_size
            end = start + page_size
            # select_related avoids one knowledge-base query per result row.
            records = query.select_related('knowledge_base').order_by('-created_at')[start:end]

            results = []
            for record in records:
                result = {
                    'id': record.id,
                    'conversation_id': record.conversation_id,
                    'dataset_id': str(record.knowledge_base.id),
                    'dataset_name': record.knowledge_base.name,
                    'role': record.role,
                    'content': record.content,
                    'created_at': record.created_at.strftime(self._TIME_FORMAT),
                    'metadata': record.metadata
                }
                if keyword:
                    result['highlights'] = {
                        'content': self._highlight_keyword(record.content, keyword)
                    }
                results.append(result)

            return Response({
                'code': 200,
                'message': '搜索成功',
                'data': {
                    'total': total,
                    'page': page,
                    'page_size': page_size,
                    'results': results
                }
            })
        except Exception as e:
            return self._server_error('搜索聊天记录失败', '搜索失败', e)

    def _highlight_keyword(self, text, keyword):
        """Wrap every occurrence of ``keyword`` in ``text`` with a highlight tag.

        Matching is case-insensitive, like the ``icontains`` filter that
        selected the record, and the matched text keeps its original casing.
        ``text`` is returned unchanged when either argument is empty.

        NOTE(review): neither ``text`` nor ``keyword`` is HTML-escaped here;
        confirm that the consumer sanitises the value before rendering it.
        """
        import re  # local import: ``re`` is not imported at module level

        if not keyword or not text:
            return text
        return re.sub(
            re.escape(keyword),
            lambda match: f'<em class="highlight">{match.group(0)}</em>',
            text,
            flags=re.IGNORECASE
        )
class KnowledgeBaseViewSet(viewsets.ModelViewSet):
serializer_class = KnowledgeBaseSerializer
permission_classes = [IsAuthenticated]
def list(self, request, *args, **kwargs):
    """List the knowledge bases visible to the requester, one page at a time.

    Query parameters:
        keyword: optional; case-insensitive match on name, desc, department
            and group. Matching names/descriptions also get a
            ``highlighted_name`` / ``highlighted_desc`` entry.
        page, page_size: default 1 / 10. Non-numeric values fall back to the
            defaults and values below 1 are raised to 1.

    Every item carries a ``permissions`` dict. An active, unexpired row in
    the permission table wins; otherwise the ``_can_read`` / ``_can_edit`` /
    ``_can_delete`` rules decide.
    """
    import re  # local import: ``re`` is not imported at module level

    try:
        queryset = self.get_queryset()

        keyword = request.query_params.get('keyword', '')
        if keyword:
            queryset = queryset.filter(
                Q(name__icontains=keyword) |
                Q(desc__icontains=keyword) |
                Q(department__icontains=keyword) |
                Q(group__icontains=keyword)
            )

        try:
            page = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 10))
        except ValueError:
            page = 1
            page_size = 10
        # A negative slice bound on a queryset raises, so clamp instead.
        page = max(page, 1)
        page_size = max(page_size, 1)

        user = request.user
        # Active grants that have not expired. A NULL ``expires_at`` is
        # treated as "never expires" (create() writes grants without one).
        active_permissions = KBPermissionModel.objects.filter(
            Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()),
            user=user,
            status='active'
        )
        permission_map = {
            str(perm.knowledge_base_id): {
                'can_read': perm.can_read,
                'can_edit': perm.can_edit,
                'can_delete': perm.can_delete
            }
            for perm in active_permissions
        }

        total = queryset.count()
        start = (page - 1) * page_size
        end = start + page_size
        paginated_queryset = queryset[start:end]

        serializer = self.get_serializer(paginated_queryset, many=True)
        data = serializer.data

        # Compiled once; IGNORECASE mirrors the ``icontains`` filter above.
        pattern = re.compile(re.escape(keyword), re.IGNORECASE) if keyword else None

        def highlight(text):
            """Wrap each match in the highlight tag, keeping its casing."""
            return pattern.sub(
                lambda match: f'<em class="highlight">{match.group(0)}</em>',
                text
            )

        for item in data:
            # The map is keyed by str(id); normalise the serialized id the
            # same way so the lookup also works for non-string ids.
            explicit = permission_map.get(str(item['id']))
            if explicit is not None:
                permissions = dict(explicit)
            else:
                rule_args = (
                    item['type'],
                    user,
                    item.get('department'),
                    item.get('group'),
                    item.get('user_id'),
                )
                permissions = {
                    'can_read': self._can_read(*rule_args),
                    'can_edit': self._can_edit(*rule_args),
                    'can_delete': self._can_delete(*rule_args)
                }
            item['permissions'] = permissions

            if pattern is not None:
                name = item.get('name')
                if name and pattern.search(name):
                    item['highlighted_name'] = highlight(name)
                # desc may be None; only highlight real text.
                if item.get('desc') is not None:
                    desc_text = str(item['desc'])
                    if pattern.search(desc_text):
                        item['highlighted_desc'] = highlight(desc_text)

        return Response({
            "code": 200,
            "message": "获取知识库列表成功",
            "data": {
                "total": total,
                "page": page,
                "page_size": page_size,
                "keyword": keyword if keyword else None,
                "items": data
            }
        })
    except Exception as e:
        logger.error(f"获取知识库列表失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"获取知识库列表失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def get_queryset(self):
    """Return the knowledge bases the current user is allowed to see.

    Admins see everything. Everyone else sees the knowledge bases they
    created, those for which they hold an active, unexpired ``can_read``
    grant, and those that ``_can_read`` allows from their role, department
    and group.
    """
    user = self.request.user

    # Admins need no further lookups, so answer before running any query.
    if user.role == 'admin':
        return KnowledgeBase.objects.all()

    # Explicit grants. A NULL ``expires_at`` is treated as "never expires";
    # expired grants no longer make a knowledge base visible, matching the
    # expiry check used when the permissions are reported.
    granted_ids = KBPermissionModel.objects.filter(
        Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()),
        user=user,
        can_read=True,
        status='active'
    ).values_list('knowledge_base_id', flat=True)

    visible_ids = set(
        KnowledgeBase.objects.filter(
            Q(user_id=user.id) |   # created by the user
            Q(id__in=granted_ids)  # or explicitly granted
        ).values_list('id', flat=True)
    )

    # Apply the role/department/group rules to the rest. Only the columns
    # the rule needs are fetched rather than whole model instances.
    candidates = KnowledgeBase.objects.values_list(
        'id', 'type', 'department', 'group', 'user_id'
    )
    for kb_id, kb_type, department, group, creator_id in candidates:
        if kb_id in visible_ids:
            continue
        if self._can_read(
            type=kb_type,
            user=user,
            department=department,
            group=group,
            creator_id=creator_id
        ):
            visible_ids.add(kb_id)

    # Hand back a queryset so callers can keep filtering and slicing.
    return KnowledgeBase.objects.filter(id__in=visible_ids).distinct()
def create(self, request, *args, **kwargs):
    """Create a knowledge base, mirror it externally and seed its permissions.

    Steps:
        1. ``name`` must be present and unused.
        2. The requester's role must be allowed to create the requested
           ``type`` (default ``private``); ``department`` / ``group`` are
           required or cleared depending on the type.
        3. The payload is validated by the serializer.
        4. Inside one transaction: the local row is saved, the external
           dataset is created and its id stored, then permission rows are
           written for the creator and for every user the type targets.

    If anything fails after the external dataset exists, a best-effort
    delete of that dataset is attempted so the rolled-back local row does
    not leave an orphan behind.

    Returns 400 / 403 envelopes for validation and role failures and a 500
    envelope for any other error.
    """

    def reject(http_status, message):
        """Build an error envelope whose body ``code`` mirrors the status."""
        return Response({
            'code': http_status,
            'message': message,
            'data': None
        }, status=http_status)

    try:
        # 1. Name checks.
        name = request.data.get('name')
        if not name:
            return reject(status.HTTP_400_BAD_REQUEST, '知识库名称不能为空')
        if KnowledgeBase.objects.filter(name=name).exists():
            return reject(status.HTTP_400_BAD_REQUEST, f'知识库名称 "{name}" 已存在')

        # 2. Role checks and per-type required fields.
        user = request.user
        kb_type = request.data.get('type', 'private')
        department = request.data.get('department')
        group = request.data.get('group')

        if kb_type == 'admin':
            if user.role != 'admin':
                return reject(status.HTTP_403_FORBIDDEN, '只有管理员可以创建管理员级知识库')
            department = None
            group = None
        elif kb_type == 'secret':
            if user.role != 'admin':
                return reject(status.HTTP_403_FORBIDDEN, '只有管理员可以创建保密级知识库')
            department = None
            group = None
        elif kb_type == 'leader':
            if user.role != 'admin':
                return reject(status.HTTP_403_FORBIDDEN, '只有管理员可以创建组长级知识库')
            if not department:
                return reject(status.HTTP_400_BAD_REQUEST, '创建组长级知识库时必须指定部门')
        elif kb_type == 'member':
            if user.role not in ['admin', 'leader']:
                return reject(status.HTTP_403_FORBIDDEN, '只有管理员和组长可以创建成员级知识库')
            if user.role == 'admin' and not department:
                return reject(status.HTTP_400_BAD_REQUEST, '管理员创建成员知识库时必须指定部门')
            elif user.role == 'leader':
                # Leaders always create inside their own department.
                department = user.department
            if not group:
                return reject(status.HTTP_400_BAD_REQUEST, '创建成员知识库时必须指定组')
        elif kb_type == 'private':
            # Private knowledge bases carry no department or group.
            department = None
            group = None

        # 3. Validate the payload with the normalised department/group.
        #    user_id is not set here; the serializer is expected to handle it.
        data = request.data.copy()
        data['department'] = department
        data['group'] = group
        serializer = self.get_serializer(data=data)
        if not serializer.is_valid():
            logger.error(f"数据验证失败: {serializer.errors}")
            return Response({
                'code': 400,
                'message': '数据验证失败',
                'data': serializer.errors
            }, status=status.HTTP_400_BAD_REQUEST)

        # Set once the external dataset exists, so a later failure knows
        # there is something to clean up remotely.
        external_id = None
        try:
            with transaction.atomic():
                # 4. Local row.
                try:
                    knowledge_base = serializer.save()
                    logger.info(f"知识库创建成功: id={knowledge_base.id}, name={knowledge_base.name}, user_id={knowledge_base.user_id}")
                except Exception as e:
                    logger.error(f"知识库创建失败: {str(e)}")
                    raise

                # 5. External dataset.
                external_response = None
                try:
                    external_response = self._create_external_dataset(knowledge_base)
                    logger.info(f"外部知识库创建响应: {external_response}")
                    if isinstance(external_response, dict):
                        # Envelope form: {'code': 200, 'data': {'id': ...}}
                        if external_response.get('code') != 200:
                            raise ValueError(f"外部API调用失败: {external_response.get('message', '未知错误')}")
                        external_id = (external_response.get('data') or {}).get('id')
                        if not external_id:
                            raise ValueError("外部API响应中未找到知识库ID")
                    else:
                        # Bare dataset id.
                        external_id = external_response
                    knowledge_base.external_id = external_id
                    knowledge_base.save()
                    logger.info(f"更新knowledge_base的external_id为: {external_id}")
                except Exception as e:
                    logger.error(f"外部知识库创建失败: {str(e)}")
                    logger.error(f"外部API响应内容: {external_response if external_response else 'No response'}")
                    raise ExternalAPIError(f"外部知识库创建失败: {str(e)}") from e

                # 6. Permission rows.
                try:
                    # The creator gets full access.
                    KBPermissionModel.objects.create(
                        knowledge_base=knowledge_base,
                        user=request.user,
                        can_read=True,
                        can_edit=True,
                        can_delete=True,
                        granted_by=request.user,
                        status='active'
                    )
                    logger.info(f"创建者权限创建成功")

                    # Everyone else the type targets gets a read grant.
                    if kb_type == 'admin':
                        grantees = User.objects.exclude(id=request.user.id)
                    elif kb_type == 'secret':
                        grantees = User.objects.filter(role='admin').exclude(id=request.user.id)
                    elif kb_type == 'leader':
                        grantees = User.objects.filter(
                            Q(role='admin') |
                            Q(role='leader', department=department)
                        ).exclude(id=request.user.id)
                    elif kb_type == 'member':
                        grantees = User.objects.filter(
                            Q(role='admin') |
                            Q(department=department, role='leader') |
                            Q(department=department, group=group, role='member')
                        ).exclude(id=request.user.id)
                    else:  # private
                        grantees = User.objects.none()

                    # The edit/delete rules compare the grantee's department
                    # and group with the knowledge base's, so those must be
                    # passed; without them every non-admin grantee was stored
                    # with can_edit=False / can_delete=False.
                    creator_id = knowledge_base.user_id
                    permissions = [
                        KBPermissionModel(
                            knowledge_base=knowledge_base,
                            user=grantee,
                            can_read=True,
                            can_edit=self._can_edit(kb_type, grantee, department, group, creator_id),
                            can_delete=self._can_delete(kb_type, grantee, department, group, creator_id),
                            granted_by=request.user,
                            status='active'
                        ) for grantee in grantees
                    ]
                    if permissions:
                        KBPermissionModel.objects.bulk_create(permissions)
                        logger.info(f"{kb_type}类型权限创建完成: {len(permissions)}条记录")
                except Exception as e:
                    logger.error(f"权限创建失败: {str(e)}")
                    logger.error(traceback.format_exc())
                    raise
        except Exception:
            # The transaction rolled the local rows back; the external
            # dataset is outside it, so remove it on a best-effort basis.
            if external_id:
                try:
                    self._delete_external_dataset(external_id)
                    logger.info(f"已回滚外部知识库: {external_id}")
                except Exception as cleanup_error:
                    logger.error(f"回滚外部知识库失败: external_id={external_id}, error={str(cleanup_error)}")
            raise

        return Response({
            'code': 200,
            'message': '知识库创建成功',
            'data': {
                'knowledge_base': serializer.data,
                'external_id': knowledge_base.external_id
            }
        })
    except Exception as e:
        logger.error(f"创建知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'创建知识库失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-03-10 17:03:58 +08:00
def _can_edit(self, type, user, department=None, group=None, creator_id=None):
    """Tell whether ``user`` may edit a knowledge base of the given ``type``.

    Rules:
        * admins may edit everything;
        * ``leader``: leaders of the same department;
        * ``member``: leaders of the same department, or members of the
          same department and group;
        * ``private``: only the creator (ids compared as strings);
        * every other type (including ``secret``): nobody but admins.
    """
    if user.role == 'admin':
        return True

    if type == 'leader':
        return user.role == 'leader' and user.department == department

    if type == 'member':
        # Only leaders and members of the owning department qualify at all.
        in_department = user.role in ('leader', 'member') and user.department == department
        if not in_department:
            return False
        # Leaders cover every group; members only their own.
        return user.role == 'leader' or user.group == group

    if type == 'private':
        return str(user.id) == str(creator_id)

    # 'secret' and any other type are closed to non-admins.
    return False
2025-03-10 17:03:58 +08:00
def _can_delete(self, type, user, department=None, group=None, creator_id=None):
    """Tell whether ``user`` may delete a knowledge base of the given ``type``.

    Rules:
        * admins may delete everything;
        * ``leader`` and ``member``: leaders of the same department only
          (members can never delete);
        * ``private``: only the creator (ids compared as strings);
        * every other type (including ``admin`` and ``secret``): nobody but
          admins.
    """
    if user.role == 'admin':
        return True

    if type in ('leader', 'member'):
        # Both types share one rule: a leader of the owning department.
        return user.role == 'leader' and user.department == department

    if type == 'private':
        return str(user.id) == str(creator_id)

    # 'admin', 'secret' and unknown types are closed to non-admins.
    return False
def _can_read(self, type, user, department=None, group=None, creator_id=None):
    """Tell whether ``user`` may read a knowledge base of the given ``type``.

    Rules:
        * admins may read everything;
        * ``leader``: anyone in the same department;
        * ``member``: leaders of the same department, or members of the
          same department and group;
        * ``private``: only the creator (ids compared as strings);
        * every other type (including ``secret``): nobody but admins.
    """
    if user.role == 'admin':
        return True

    if type == 'leader':
        return user.department == department

    if type == 'member':
        # Only leaders and members of the owning department qualify at all.
        in_department = user.role in ('leader', 'member') and user.department == department
        if not in_department:
            return False
        # Leaders cover every group; members only their own.
        return user.role == 'leader' or user.group == group

    if type == 'private':
        return str(user.id) == str(creator_id)

    # 'secret' and any other type are hidden from non-admins.
    return False
def _create_external_dataset(self, instance):
    """Create the external dataset for ``instance`` and return its id.

    POSTs the name and description to ``{API_BASE_URL}/api/dataset``.

    Returns:
        The ``data.id`` value of the API response.

    Raises:
        ExternalAPIError: on timeout, transport error, a non-200 HTTP
            status, a business ``code`` other than 200, or a response
            without a dataset id. The original error is chained as
            ``__cause__``.
    """
    api_data = {
        "name": instance.name,
        "desc": instance.desc,
        "type": "0",      # field sent with every dataset request
        "meta": {},       # field sent with every dataset request
        "documents": []   # a new dataset starts without documents
    }
    try:
        response = requests.post(
            f'{settings.API_BASE_URL}/api/dataset',
            json=api_data,
            headers={'Content-Type': 'application/json'},
            timeout=30
        )
        if response.status_code != 200:
            raise ExternalAPIError(f"创建失败,状态码: {response.status_code}, 响应: {response.text}")

        api_response = response.json()
        if not api_response.get('code') == 200:
            raise ExternalAPIError(f"业务处理失败: {api_response.get('message', '未知错误')}")

        # ``or {}`` also covers an explicit ``"data": null``.
        dataset_id = (api_response.get('data') or {}).get('id')
        if not dataset_id:
            raise ExternalAPIError("响应数据中缺少dataset id")
        return dataset_id
    except ExternalAPIError:
        # Already carries a precise message; without this clause the
        # catch-all below would wrap it a second time.
        raise
    except requests.exceptions.Timeout as e:
        raise ExternalAPIError("请求超时,请稍后重试") from e
    except requests.exceptions.RequestException as e:
        raise ExternalAPIError(f"API请求失败: {str(e)}") from e
    except Exception as e:
        raise ExternalAPIError(f"创建外部知识库失败: {str(e)}") from e
def _delete_external_dataset(self, external_id):
    """Delete the external dataset ``external_id``.

    Sends DELETE to ``{API_BASE_URL}/api/dataset/{external_id}``. A 404
    (dataset already gone), a 204, and a 200 whose body is not JSON all
    count as success.

    Returns:
        ``True`` when the dataset is gone.

    Raises:
        ExternalAPIError: for an empty id, a timeout, a transport error,
            any other HTTP status, or a 200 response whose business
            ``code`` is not 200. The original error is chained as
            ``__cause__``.
    """
    try:
        if not external_id:
            raise ExternalAPIError("外部知识库ID不能为空")

        response = requests.delete(
            f'{settings.API_BASE_URL}/api/dataset/{external_id}',
            headers={'Content-Type': 'application/json'},
            timeout=30
        )
        logger.info(f"删除外部知识库响应: status_code={response.status_code}, response={response.text}")

        if response.status_code == 404:
            # Nothing left to delete: treat as success.
            logger.warning(f"外部知识库不存在: {external_id}")
            return True
        if response.status_code not in [200, 204]:
            raise ExternalAPIError(f"删除失败,状态码: {response.status_code}, 响应: {response.text}")

        if response.status_code == 204:
            # Deleted, no body to inspect.
            logger.info(f"外部知识库删除成功: {external_id}")
            return True

        # 200: inspect the business code when the body is JSON.
        try:
            api_response = response.json()
        except ValueError:
            # An unparsable body with HTTP 200 still counts as success.
            logger.warning(f"外部知识库删除响应无法解析JSON但状态码为200视为成功: {external_id}")
            return True
        if api_response.get('code') != 200:
            raise ExternalAPIError(f"业务处理失败: {api_response.get('message', '未知错误')}")
        logger.info(f"外部知识库删除成功: {external_id}")
        return True
    except ExternalAPIError:
        # Already carries a precise message; without this clause the
        # catch-all below would wrap it a second time.
        raise
    except requests.exceptions.Timeout as e:
        logger.error(f"删除外部知识库超时: {external_id}")
        raise ExternalAPIError("请求超时,请稍后重试") from e
    except requests.exceptions.RequestException as e:
        logger.error(f"删除外部知识库请求异常: {external_id}, error={str(e)}")
        raise ExternalAPIError(f"API请求失败: {str(e)}") from e
    except Exception as e:
        logger.error(f"删除外部知识库其他错误: {external_id}, error={str(e)}")
        raise ExternalAPIError(f"删除外部知识库失败: {str(e)}") from e
def update(self, request, *args, **kwargs):
    """Update a knowledge base locally and push name/desc to the external API.

    The requester needs an active, unexpired ``can_edit`` grant, or must be
    allowed by ``_can_edit``. The update is always applied as a partial
    update. The local write and the external call share one transaction,
    so a failing external call rolls the local change back.

    Returns 403 without edit rights, 404 for an unknown knowledge base,
    400 when the payload fails validation and 500 for external or
    unexpected errors.
    """
    try:
        instance = self.get_object()
        user = request.user

        # Explicit grant first. A NULL ``expires_at`` is treated as "never
        # expires"; an expired grant no longer authorises the edit.
        has_permission = KBPermissionModel.objects.filter(
            Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()),
            knowledge_base=instance,
            user=user,
            can_edit=True,
            status='active'
        ).exists()
        if not has_permission:
            # Fall back to the role/department/group rules.
            has_permission = self._can_edit(
                type=instance.type,
                user=user,
                department=instance.department,
                group=instance.group,
                creator_id=instance.user_id
            )
        if not has_permission:
            return Response({
                "code": 403,
                "message": "没有编辑权限",
                "data": None
            }, status=status.HTTP_403_FORBIDDEN)

        with transaction.atomic():
            serializer = self.get_serializer(instance, data=request.data, partial=True)
            serializer.is_valid(raise_exception=True)
            self.perform_update(serializer)

            if instance.external_id:
                try:
                    api_data = {
                        "name": serializer.validated_data.get('name', instance.name),
                        "desc": serializer.validated_data.get('desc', instance.desc),
                        "type": "0",      # same fixed values as on creation
                        "meta": {},
                        "documents": []
                    }
                    response = requests.put(
                        f'{settings.API_BASE_URL}/api/dataset/{instance.external_id}',
                        json=api_data,
                        headers={'Content-Type': 'application/json'},
                        timeout=30
                    )
                    if response.status_code != 200:
                        raise ExternalAPIError(f"更新外部知识库失败,状态码: {response.status_code}, 响应: {response.text}")
                    api_response = response.json()
                    if not api_response.get('code') == 200:
                        raise ExternalAPIError(f"更新外部知识库失败: {api_response.get('message', '未知错误')}")
                    logger.info(f"外部知识库更新成功: {instance.external_id}")
                except ExternalAPIError:
                    # Keep the precise message instead of wrapping it again.
                    raise
                except requests.exceptions.Timeout as e:
                    raise ExternalAPIError("请求超时,请稍后重试") from e
                except requests.exceptions.RequestException as e:
                    raise ExternalAPIError(f"API请求失败: {str(e)}") from e
                except Exception as e:
                    raise ExternalAPIError(f"更新外部知识库失败: {str(e)}") from e

        return Response({
            "code": 200,
            "message": "知识库更新成功",
            "data": serializer.data
        })
    except Http404:
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except ValidationError as e:
        # Invalid input is the client's fault: answer 400, not 500.
        return Response({
            "code": 400,
            "message": "数据验证失败",
            "data": getattr(e, 'detail', str(e))
        }, status=status.HTTP_400_BAD_REQUEST)
    except ExternalAPIError as e:
        logger.error(f"更新外部知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": str(e),
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Exception as e:
        logger.error(f"更新知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"更新知识库失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def destroy(self, request, *args, **kwargs):
    """Delete a knowledge base externally, then locally.

    The requester needs an active, unexpired ``can_delete`` grant, or must
    be allowed by ``_can_delete``. The external dataset is removed first;
    if that fails nothing is deleted locally. Because
    ``_delete_external_dataset`` treats an already-missing dataset as
    success, a request that failed on the local delete can simply be
    retried.

    Returns 403 without delete rights, 404 for an unknown knowledge base
    and 500 when the external or local delete fails.
    """
    try:
        instance = self.get_object()
        user = request.user

        # Explicit grant first. A NULL ``expires_at`` is treated as "never
        # expires"; an expired grant no longer authorises the delete.
        has_permission = KBPermissionModel.objects.filter(
            Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()),
            knowledge_base=instance,
            user=user,
            can_delete=True,
            status='active'
        ).exists()
        if not has_permission:
            # Fall back to the role/department/group rules.
            has_permission = self._can_delete(
                type=instance.type,
                user=user,
                department=instance.department,
                group=instance.group,
                creator_id=instance.user_id
            )
        if not has_permission:
            return Response({
                "code": 403,
                "message": "没有删除权限",
                "data": None
            }, status=status.HTTP_403_FORBIDDEN)

        # Django clears the primary key on the instance once it is deleted,
        # so remember the identifiers needed for the success log.
        kb_id = instance.id
        kb_name = instance.name

        with transaction.atomic():
            if instance.external_id:
                try:
                    self._delete_external_dataset(instance.external_id)
                    logger.info(f"外部知识库删除成功: {instance.external_id}")
                except ExternalAPIError as e:
                    logger.error(f"删除外部知识库失败: {str(e)}")
                    return Response({
                        "code": 500,
                        "message": f"删除外部知识库失败: {str(e)}",
                        "data": None
                    }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

            self.perform_destroy(instance)
            logger.info(f"本地知识库删除成功: id={kb_id}, name={kb_name}")

        return Response({
            "code": 200,
            "message": "知识库删除成功",
            "data": None
        })
    except Http404:
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"删除知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"删除知识库失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['get'])
def permissions(self, request, pk=None):
    """Report the requester's read/edit/delete rights on one knowledge base.

    An active, unexpired row in the permission table is used when one
    exists (a NULL ``expires_at`` is treated as "never expires");
    otherwise the ``_can_read`` / ``_can_edit`` / ``_can_delete`` rules
    decide. This is the same precedence ``retrieve`` applies, so both
    endpoints report the same rights.

    Returns 404 for an unknown knowledge base and 500 on unexpected errors.
    """
    try:
        instance = self.get_object()
        user = request.user

        permission = KBPermissionModel.objects.filter(
            Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()),
            knowledge_base=instance,
            user=user,
            status='active'
        ).first()

        if permission:
            permissions_data = {
                "can_read": permission.can_read,
                "can_edit": permission.can_edit,
                "can_delete": permission.can_delete
            }
        else:
            # No explicit grant: derive the rights from the role rules.
            rule_kwargs = {
                'type': instance.type,
                'user': user,
                'department': instance.department,
                'group': instance.group,
                'creator_id': instance.user_id
            }
            permissions_data = {
                "can_read": self._can_read(**rule_kwargs),
                "can_edit": self._can_edit(**rule_kwargs),
                "can_delete": self._can_delete(**rule_kwargs)
            }

        return Response({
            "code": 200,
            "message": "获取权限信息成功",
            "data": {
                "knowledge_base_id": instance.id,
                "knowledge_base_name": instance.name,
                "permissions": permissions_data
            }
        })
    except Http404:
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"获取权限信息失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"获取权限信息失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def summary(self, request):
    """Return a short summary of every knowledge base except ``secret`` ones.

    Unlike ``list`` this is not restricted to readable knowledge bases:
    each entry carries id, name, desc, type, department and the
    requester's ``permissions``. An active, unexpired row in the permission
    table wins (a NULL ``expires_at`` is treated as "never expires");
    otherwise the shared ``_can_read`` / ``_can_edit`` / ``_can_delete``
    rules decide, so the flags agree with the other endpoints of this view.
    """
    try:
        user = request.user
        queryset = KnowledgeBase.objects.exclude(type='secret')

        active_permissions = KBPermissionModel.objects.filter(
            Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()),
            user=user,
            status='active'
        )
        permission_map = {
            str(perm.knowledge_base_id): {
                'can_read': perm.can_read,
                'can_edit': perm.can_edit,
                'can_delete': perm.can_delete
            }
            for perm in active_permissions
        }

        summaries = []
        for kb in queryset:
            explicit = permission_map.get(str(kb.id))
            if explicit is not None:
                permissions = dict(explicit)
            else:
                # No explicit grant: use the shared role rules rather than a
                # hand-copied variant that could drift from them.
                rule_kwargs = {
                    'type': kb.type,
                    'user': user,
                    'department': kb.department,
                    'group': kb.group,
                    'creator_id': kb.user_id
                }
                permissions = {
                    'can_read': self._can_read(**rule_kwargs),
                    'can_edit': self._can_edit(**rule_kwargs),
                    'can_delete': self._can_delete(**rule_kwargs)
                }

            # Summary fields only.
            summaries.append({
                'id': str(kb.id),
                'name': kb.name,
                'desc': kb.desc,
                'type': kb.type,
                'department': kb.department,
                'permissions': permissions
            })

        return Response({
            'code': 200,
            'message': '获取知识库概要信息成功',
            'data': summaries
        })
    except Exception as e:
        logger.error(f"获取知识库概要信息失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取知识库概要信息失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-03-10 17:03:58 +08:00
def retrieve(self, request, *args, **kwargs):
    """Return one knowledge base together with the requester's permissions.

    The serialized knowledge base gets a ``permissions`` dict. An active,
    unexpired row in the permission table wins (a NULL ``expires_at`` is
    treated as "never expires"); otherwise the ``_can_read`` /
    ``_can_edit`` / ``_can_delete`` rules decide. If the permission lookup
    itself fails, the error is logged and all three flags stay ``False``.

    Returns 404 for an unknown knowledge base and 500 on unexpected errors.
    """
    try:
        instance = self.get_object()
        data = self.get_serializer(instance).data
        user = request.user

        # Defaults, kept when the lookup below fails.
        permissions = {
            'can_read': False,
            'can_edit': False,
            'can_delete': False
        }
        try:
            permission_record = KBPermissionModel.objects.filter(
                Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()),
                knowledge_base=instance,
                user=user,
                status='active'
            ).first()
            if permission_record:
                permissions.update({
                    'can_read': permission_record.can_read,
                    'can_edit': permission_record.can_edit,
                    'can_delete': permission_record.can_delete
                })
            else:
                # No explicit grant: derive the rights from the role rules.
                rule_kwargs = {
                    'type': instance.type,
                    'user': user,
                    'department': instance.department,
                    'group': instance.group,
                    'creator_id': instance.user_id
                }
                permissions.update({
                    'can_read': self._can_read(**rule_kwargs),
                    'can_edit': self._can_edit(**rule_kwargs),
                    'can_delete': self._can_delete(**rule_kwargs)
                })
        except Exception as e:
            # Best effort: fall back to the all-False defaults.
            logger.error(f"获取权限信息失败: {str(e)}")

        data['permissions'] = permissions
        return Response({
            'code': 200,
            'message': '获取知识库详情成功',
            'data': data
        })
    except Http404:
        # get_object() raises Http404 for an unknown or invisible id; report
        # it as 404 like the other detail handlers instead of a 500.
        return Response({
            'code': 404,
            'message': '知识库不存在',
            'data': None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"获取知识库详情失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取知识库详情失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def search(self, request):
    """Search non-secret knowledge bases by keyword, with pagination.

    Query parameters:
        keyword: required; matched case-insensitively against ``name``,
            ``desc``, ``department`` and ``group``.
        page / page_size: optional; non-numeric or non-positive values
            fall back to 1 and 10.

    Every item carries a ``permissions`` dict. Items the caller cannot read
    are reduced to id, name, type, department and permissions. When the
    keyword occurs in ``name`` / ``desc``, ``highlighted_name`` /
    ``highlighted_desc`` hold an HTML-escaped copy of the text with each
    occurrence wrapped in ``<em class="highlight">``.
    """
    # Function-scope imports keep this block self-contained.
    import html
    import re

    try:
        keyword = request.query_params.get('keyword', '')
        if not keyword:
            return Response({
                "code": 400,
                "message": "搜索关键字不能为空",
                "data": None
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            page = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 10))
        except ValueError:
            page = 1
            page_size = 10
        # A non-positive page or size would become a negative slice index,
        # which QuerySet slicing rejects.
        if page < 1:
            page = 1
        if page_size < 1:
            page_size = 10

        query = (
            Q(name__icontains=keyword)
            | Q(desc__icontains=keyword)
            | Q(department__icontains=keyword)
            | Q(group__icontains=keyword)
        )
        # Secret knowledge bases are never searchable.
        queryset = KnowledgeBase.objects.filter(query).exclude(type='secret')

        user = request.user
        # Explicit grants; a grant without an expiry date never expires.
        active_permissions = KBPermissionModel.objects.filter(
            Q(expires_at__gt=timezone.now()) | Q(expires_at__isnull=True),
            user=user,
            status='active'
        ).select_related('knowledge_base')
        permission_map = {
            str(perm.knowledge_base.id): {
                'can_read': perm.can_read,
                'can_edit': perm.can_edit,
                'can_delete': perm.can_delete
            }
            for perm in active_permissions
        }

        total = queryset.count()
        start = (page - 1) * page_size
        paginated_queryset = queryset[start:start + page_size]
        data = self.get_serializer(paginated_queryset, many=True).data

        result_items = []
        for item in data:
            # permission_map keys are strings, so normalise the id first.
            explicit = permission_map.get(str(item['id']))
            if explicit is not None:
                kb_permissions = dict(explicit)
            else:
                # No explicit grant: fall back to the rule-based checks.
                rule_args = (
                    item['type'],
                    user,
                    item.get('department'),
                    item.get('group'),
                    item.get('user_id'),
                )
                kb_permissions = {
                    'can_read': self._can_read(*rule_args),
                    'can_edit': self._can_edit(*rule_args),
                    'can_delete': self._can_delete(*rule_args)
                }

            item['permissions'] = kb_permissions
            if kb_permissions['can_read']:
                result_items.append(item)
            else:
                # Without read access only a summary is exposed.
                result_items.append({
                    'id': item['id'],
                    'name': item['name'],
                    'type': item['type'],
                    'department': item.get('department'),
                    'permissions': kb_permissions
                })

        pattern = re.compile(re.escape(keyword), re.IGNORECASE)

        def highlight(text):
            """Return escaped ``text`` with matches wrapped, or None if no match."""
            pieces = []
            cursor = 0
            for match in pattern.finditer(text):
                pieces.append(html.escape(text[cursor:match.start()]))
                pieces.append(
                    f'<em class="highlight">{html.escape(match.group(0))}</em>'
                )
                cursor = match.end()
            if not pieces:
                return None
            pieces.append(html.escape(text[cursor:]))
            return ''.join(pieces)

        for item in result_items:
            name = item.get('name')
            if isinstance(name, str):
                marked = highlight(name)
                if marked is not None:
                    item['highlighted_name'] = marked
            desc = item.get('desc')
            if desc is not None:
                marked = highlight(str(desc))
                if marked is not None:
                    item['highlighted_desc'] = marked

        return Response({
            "code": 200,
            "message": "搜索知识库成功",
            "data": {
                "total": total,
                "page": page,
                "page_size": page_size,
                "keyword": keyword,
                "items": result_items
            }
        })
    except Exception as e:
        logger.error(f"搜索知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"搜索知识库失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['post'])
def change_type(self, request, pk=None):
    """Change the type (and department / group) of a knowledge base.

    Checks run in this order: member/user roles are refused, then
    non-creators are refused, then the new type is validated, then
    leader-specific limits and per-type required fields are enforced.

    Returns:
        Response: ``code`` 200 with the updated fields, 400 for invalid
        input, 403 when the caller may not perform the change, 404 when
        the knowledge base does not exist, 500 on unexpected errors.
    """
    def failure(code, message, http_status):
        # Every error response shares the same envelope.
        return Response({
            "code": code,
            "message": message,
            "data": None
        }, status=http_status)

    try:
        instance = self.get_object()
        user = request.user

        is_creator = str(user.id) == str(instance.user_id)
        is_admin = user.role == 'admin'
        is_leader = user.role == 'leader'
        is_member = user.role in ('member', 'user')
        # A leader who is not also an admin is subject to extra limits.
        restricted_leader = is_leader and not is_admin

        if is_member and not is_admin and not is_leader:
            return failure(
                403,
                "组员无权修改知识库类型只能使用private类型",
                status.HTTP_403_FORBIDDEN,
            )

        if not is_creator:
            return failure(
                403,
                "只有知识库创建者可以修改知识库类型",
                status.HTTP_403_FORBIDDEN,
            )

        new_type = request.data.get('type')
        if not new_type:
            return failure(400, "新类型不能为空", status.HTTP_400_BAD_REQUEST)

        valid_types = ['private', 'admin', 'secret', 'leader', 'member']
        if new_type not in valid_types:
            return failure(
                400,
                f"无效的知识库类型,可选值: {', '.join(valid_types)}",
                status.HTTP_400_BAD_REQUEST,
            )

        # Restricted leaders may only switch between private and member.
        if restricted_leader and new_type not in ['private', 'member']:
            return failure(
                403,
                "组长只能将知识库设置为private或member类型",
                status.HTTP_403_FORBIDDEN,
            )

        department = request.data.get('department')
        group = request.data.get('group')

        if restricted_leader and new_type == 'member':
            if department and department != user.department:
                return failure(
                    403,
                    "组长只能为本部门设置知识库",
                    status.HTTP_403_FORBIDDEN,
                )
            # The leader's own department is always used.
            department = user.department

        if new_type == 'leader':
            if not department:
                return failure(
                    400, "组长级知识库必须指定部门", status.HTTP_400_BAD_REQUEST
                )
        elif new_type == 'member':
            if not department:
                return failure(
                    400, "成员级知识库必须指定部门", status.HTTP_400_BAD_REQUEST
                )
            if not group:
                return failure(
                    400, "成员级知识库必须指定组", status.HTTP_400_BAD_REQUEST
                )
        elif new_type in ['admin', 'secret']:
            # These types are not scoped to a department or group.
            department = None
            group = None
        elif new_type == 'private':
            # Keep the stored values when the request omits them.
            if department is None:
                department = instance.department
            if group is None:
                group = instance.group

        instance.type = new_type
        instance.department = department
        instance.group = group
        instance.save()

        return Response({
            "code": 200,
            "message": f"知识库类型已更新为{new_type}",
            "data": {
                "id": instance.id,
                "name": instance.name,
                "type": instance.type,
                "department": instance.department,
                "group": instance.group
            }
        })
    except Http404:
        return failure(404, "知识库不存在", status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"修改知识库类型失败: {str(e)}")
        logger.error(traceback.format_exc())
        return failure(
            500,
            f"修改知识库类型失败: {str(e)}",
            status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
class PermissionViewSet(viewsets.ModelViewSet):
serializer_class = PermissionSerializer
permission_classes = [IsAuthenticated]
def can_manage_knowledge_base(self, user, knowledge_base):
    """Return True when ``user`` is the creator of ``knowledge_base``.

    The creator id stored on the knowledge base (``user_id``) and the
    user's ``id`` are compared by their string form.
    """
    creator_id = str(knowledge_base.user_id)
    requester_id = str(user.id)
    return creator_id == requester_id
def get_queryset(self):
    """Return the permission requests visible to the current user.

    A request is visible when the user
    1. submitted it,
    2. holds an active ``can_edit`` grant on its knowledge base, or
    3. created its knowledge base. ``approve`` and ``reject`` authorise
       the creator, so the creator must be able to load the request even
       without a grant row.
    """
    user = self.request.user

    # Knowledge bases the user manages through an explicit edit grant.
    managed_kb_ids = KBPermissionModel.objects.filter(
        user=user,
        can_edit=True,
        status='active'
    ).values_list('knowledge_base_id', flat=True)

    visible = (
        Q(applicant=user)
        | Q(knowledge_base_id__in=managed_kb_ids)
        | Q(knowledge_base__user_id=user.id)
    )

    # select_related preloads the objects the list view serializes.
    return Permission.objects.filter(visible).distinct().select_related(
        'knowledge_base',
        'applicant',
        'approver'
    )
2025-03-17 16:46:37 +08:00
def list(self, request, *args, **kwargs):
    """List the visible permission requests with nested detail.

    Each entry contains the knowledge base, the applicant, the requested
    permissions, status and timestamps; processed requests also carry the
    approver and the response message.

    When pagination is active only the current page is serialized and
    ``total`` is the paginator's overall count. When pagination is
    disabled the whole queryset is serialized.
    """
    def serialize(permission):
        # Build the response entry for a single permission request.
        entry = {
            'id': str(permission.id),
            'knowledge_base': {
                'id': str(permission.knowledge_base.id),
                'name': permission.knowledge_base.name,
                'type': permission.knowledge_base.type,
            },
            'applicant': {
                'id': str(permission.applicant.id),
                'username': permission.applicant.username,
                'name': permission.applicant.name,
                'department': permission.applicant.department,
            },
            'permissions': permission.permissions,
            'status': permission.status,
            'created_at': permission.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            'expires_at': (
                permission.expires_at.strftime('%Y-%m-%d %H:%M:%S')
                if permission.expires_at else None
            ),
        }
        # Approver details exist only once the request was processed.
        if permission.approver:
            entry['approver'] = {
                'id': str(permission.approver.id),
                'username': permission.approver.username,
                'name': permission.approver.name,
                'department': permission.approver.department,
            }
            entry['response_message'] = permission.response_message
        return entry

    queryset = self.get_queryset()
    page = self.paginate_queryset(queryset)

    if page is not None:
        results = [serialize(permission) for permission in page]
        total = self.paginator.page.paginator.count
    else:
        # Pagination disabled: return every visible request.
        results = [serialize(permission) for permission in queryset]
        total = len(results)

    return Response({
        'code': 200,
        'message': '获取权限申请列表成功',
        'data': {
            'total': total,
            'results': results
        }
    })
def perform_create(self, serializer):
    """Create a pending permission request and notify the creator.

    Raises:
        ValidationError: when no permission flag is requested, no expiry
            is given, the applicant already has a pending request, or the
            applicant already holds an approved, unexpired request for
            the same knowledge base.
    """
    applicant = self.request.user
    knowledge_base = serializer.validated_data['knowledge_base']
    requested_permissions = serializer.validated_data.get('permissions') or {}
    expires_at = serializer.validated_data.get('expires_at')

    # Human-readable labels of the requested flags, in display order.
    permission_labels = [
        label
        for flag, label in (
            ('can_read', '读取'),
            ('can_edit', '编辑'),
            ('can_delete', '删除'),
        )
        if requested_permissions.get(flag)
    ]
    if not permission_labels:
        raise ValidationError("至少需要申请一种权限(读/改/删)")
    if not expires_at:
        raise ValidationError("请指定权限到期时间")

    own_requests = Permission.objects.filter(
        knowledge_base=knowledge_base,
        applicant=applicant
    )
    if own_requests.filter(status='pending').exists():
        raise ValidationError("您已有一个待处理的权限申请")
    if own_requests.filter(
        status='approved',
        expires_at__gt=timezone.now()
    ).exists():
        raise ValidationError("您已有此知识库的访问权限")

    permission = serializer.save(
        applicant=applicant,
        status='pending'
    )

    # The request is already stored; a missing creator account must not
    # turn the successful creation into a server error.
    owner = User.objects.filter(id=knowledge_base.user_id).first()
    if owner is None:
        logger.warning(
            f"知识库 {knowledge_base.id} 的创建者 {knowledge_base.user_id} 不存在,跳过通知"
        )
        return

    permission_str = '、'.join(permission_labels)
    self.send_notification(
        user=owner,
        title="新的权限申请",
        content=f"用户 {applicant.name} 申请了知识库 '{knowledge_base.name}'{permission_str}权限",
        notification_type="permission_request",
        related_object_id=permission.id
    )
def send_notification(self, user, title, content, notification_type, related_object_id):
    """Store a notification for ``user`` and push it over the channel layer.

    The current request user is recorded as sender. The call is best
    effort: any exception is logged and swallowed so the calling view
    continues.
    """
    try:
        notification = Notification.objects.create(
            sender=self.request.user,
            receiver=user,
            title=title,
            content=content,
            type=notification_type,
            related_resource=related_object_id,
        )

        sender = notification.sender
        payload = {
            "id": str(notification.id),
            "title": notification.title,
            "content": notification.content,
            "type": notification.type,
            "created_at": notification.created_at.isoformat(),
            "sender": {
                "id": str(sender.id),
                "name": sender.name
            }
        }

        # Real-time delivery to the receiver's channel group.
        group_name = f"notification_user_{user.id}"
        push = async_to_sync(get_channel_layer().group_send)
        push(group_name, {"type": "notification", "data": payload})
    except Exception as e:
        logger.error(f"发送通知时发生错误: {str(e)}")
@action(detail=True, methods=['post'])
def approve(self, request, pk=None):
    """Approve a pending permission request.

    Only the creator of the knowledge base may approve. On success the
    request is marked ``approved`` and the applicant's
    ``KBPermissionModel`` row is created or updated with the requested
    flags and expiry, both inside one transaction. The applicant is then
    notified.

    Returns:
        Response: ``code`` 200 on success, 400 when the request was
        already processed, 403 for non-creators, 404 when the request
        does not exist, 500 on unexpected errors.
    """
    try:
        permission = self.get_object()

        if not self.can_manage_knowledge_base(request.user, permission.knowledge_base):
            logger.warning(f"用户 {request.user.username} 尝试审批知识库 {permission.knowledge_base.name} 的权限申请,但不是创建者")
            return Response({
                'code': 403,
                'message': '只有知识库创建者可以审批此申请',
                'data': None
            }, status=status.HTTP_403_FORBIDDEN)

        # Same guard as reject: a processed request cannot be re-decided.
        if permission.status != 'pending':
            return Response({
                'code': 400,
                'message': '该申请已被处理',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        response_message = request.data.get('response_message', '')
        requested = permission.permissions or {}
        granted_flags = {
            'can_read': requested.get('can_read', False),
            'can_edit': requested.get('can_edit', False),
            'can_delete': requested.get('can_delete', False),
        }

        with transaction.atomic():
            permission.status = 'approved'
            permission.approver = request.user
            permission.response_message = response_message
            permission.save()

            kb_permission = KBPermissionModel.objects.filter(
                knowledge_base=permission.knowledge_base,
                user=permission.applicant
            ).first()
            if kb_permission:
                # Reuse the existing grant row for this user.
                for flag, value in granted_flags.items():
                    setattr(kb_permission, flag, value)
                kb_permission.granted_by = request.user
                kb_permission.status = 'active'
                kb_permission.expires_at = permission.expires_at
                kb_permission.save()
                logger.info(f"更新知识库权限记录: {kb_permission.id}")
            else:
                kb_permission = KBPermissionModel.objects.create(
                    knowledge_base=permission.knowledge_base,
                    user=permission.applicant,
                    granted_by=request.user,
                    status='active',
                    expires_at=permission.expires_at,
                    **granted_flags
                )
                logger.info(f"创建新的知识库权限记录: {kb_permission.id}")

        # Notify only after the transaction block completed.
        self.send_notification(
            user=permission.applicant,
            title="权限申请已通过",
            content=f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已通过",
            notification_type="permission_approved",
            related_object_id=permission.id
        )
        return Response({
            'code': 200,
            'message': '权限申请已批准',
            'data': None
        })
    except (Http404, Permission.DoesNotExist):
        # get_object signals a missing row with Http404.
        return Response({
            'code': 404,
            'message': '权限申请不存在',
            'data': None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"处理权限申请失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'处理权限申请失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['post'])
def reject(self, request, pk=None):
    """Reject a pending permission request.

    Only the creator of the knowledge base may reject, the request must
    still be ``pending``, and a non-empty ``response_message`` (the
    reason) is required. The applicant is notified afterwards.
    """
    def failure(code, message, http_status):
        # Shared envelope for the guard-clause responses.
        return Response({
            'code': code,
            'message': message,
            'data': None
        }, status=http_status)

    permission = self.get_object()
    knowledge_base = permission.knowledge_base

    if not self.can_manage_knowledge_base(request.user, knowledge_base):
        return failure(403, '只有知识库创建者可以审批此申请', status.HTTP_403_FORBIDDEN)

    if permission.status != 'pending':
        return failure(400, '该申请已被处理', status.HTTP_400_BAD_REQUEST)

    reason = request.data.get('response_message')
    if not reason:
        return failure(400, '请填写拒绝原因', status.HTTP_400_BAD_REQUEST)

    permission.status = 'rejected'
    permission.approver = request.user
    permission.response_message = reason
    permission.save()

    notice = f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已被拒绝\n拒绝原因:{reason}"
    self.send_notification(
        user=permission.applicant,
        title="权限申请已拒绝",
        content=notice,
        notification_type="permission_rejected",
        related_object_id=permission.id
    )

    return Response({
        'code': 200,
        'message': '权限申请已拒绝',
        'data': PermissionSerializer(permission).data
    })
@action(detail=True, methods=['post'])
def extend(self, request, pk=None):
    """Extend the expiry of a permission request and of its grant.

    ``expires_at`` from the request body is parsed as an ISO 8601
    datetime (a plain date is taken as midnight) and must lie in the
    future. The request row and the matching ``KBPermissionModel`` row
    are updated in one transaction.

    Returns:
        Response: ``code`` 200 on success, 400 for a missing, malformed
        or past ``expires_at`` or when no grant row exists yet, 403 when
        ``check_extend_permission`` refuses, 500 on unexpected errors.
    """
    # Function-scope imports keep this block self-contained.
    from datetime import datetime
    from django.utils.dateparse import parse_date, parse_datetime

    def bad_request(message):
        return Response({
            "code": 400,
            "message": message,
            "data": None
        }, status=status.HTTP_400_BAD_REQUEST)

    instance = self.get_object()
    user = request.user

    if not self.check_extend_permission(instance, user):
        return Response({
            "code": 403,
            "message": "您没有权限延长此权限",
            "data": None
        }, status=status.HTTP_403_FORBIDDEN)

    raw_expires_at = request.data.get('expires_at')
    if not raw_expires_at:
        return bad_request("请设置新的过期时间")

    # Parse up front so a malformed value is a client error, not a
    # failure inside save().
    try:
        new_expires_at = parse_datetime(str(raw_expires_at))
        if new_expires_at is None:
            day = parse_date(str(raw_expires_at))
            if day is not None:
                new_expires_at = datetime(day.year, day.month, day.day)
    except ValueError:
        new_expires_at = None
    if new_expires_at is None:
        return bad_request("过期时间格式错误")

    if settings.USE_TZ and timezone.is_naive(new_expires_at):
        new_expires_at = timezone.make_aware(new_expires_at)
    if new_expires_at <= timezone.now():
        return bad_request("新的过期时间必须晚于当前时间")

    try:
        # Look the grant up before writing anything.
        kb_permission = KBPermissionModel.objects.filter(
            knowledge_base=instance.knowledge_base,
            user=instance.applicant
        ).first()
        if kb_permission is None:
            return bad_request("该申请尚未生成知识库权限,无法延长")

        with transaction.atomic():
            instance.expires_at = new_expires_at
            instance.save()
            kb_permission.expires_at = new_expires_at
            kb_permission.save()

        return Response({
            "code": 200,
            "message": "权限有效期延长成功",
            "data": PermissionSerializer(instance).data
        })
    except Exception as e:
        return Response({
            "code": 500,
            "message": f"延长权限有效期失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def check_extend_permission(self, permission, user):
    """Return whether ``user`` may extend the expiry of ``permission``.

    Rules by knowledge-base type:
        private: only the creator (``user_id`` equals ``user.id``).
        leader:  only users whose role is ``admin``.
        member:  ``admin`` users, or a ``leader`` whose department equals
                 the knowledge base's department.
        other:   nobody.
    """
    knowledge_base = permission.knowledge_base
    kb_type = knowledge_base.type

    if kb_type == 'private':
        # The creator is identified through ``user_id`` (compared as
        # strings), the same way can_manage_knowledge_base does it.
        return str(knowledge_base.user_id) == str(user.id)

    if kb_type == 'leader':
        return user.role == 'admin'

    if kb_type == 'member':
        same_department_leader = (
            user.role == 'leader'
            and user.department == knowledge_base.department
        )
        return user.role == 'admin' or same_department_leader

    return False
2025-03-17 16:46:37 +08:00
@action(detail=False, methods=['get'])
def user_permissions(self, request):
    """Return every ``active`` knowledge-base grant of one user.

    Query parameters:
        username: required; the user whose grants are listed.

    Returns:
        Response: ``code`` 200 with the user summary and the grants, 400
        when ``username`` is missing, 404 when the user does not exist,
        500 on unexpected errors.
    """
    # NOTE(review): no check restricts whose grants the caller may read;
    # confirm that any authenticated user is meant to query any username.
    def format_time(value):
        return value.strftime('%Y-%m-%d %H:%M:%S') if value else None

    try:
        username = request.query_params.get('username')
        if not username:
            return Response({
                'code': 400,
                'message': '请提供用户名',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            target_user = User.objects.get(username=username)
        except User.DoesNotExist:
            return Response({
                'code': 404,
                'message': f'用户 {username} 不存在',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        permissions = KBPermissionModel.objects.filter(
            user=target_user,
            status='active'
        ).select_related('knowledge_base', 'granted_by')

        permissions_data = []
        for perm in permissions:
            grantor = perm.granted_by
            # all_permissions reads ``granted_at`` from this model; fall
            # back to it when ``created_at`` is absent or empty.
            created_time = (
                getattr(perm, 'created_at', None)
                or getattr(perm, 'granted_at', None)
            )
            permissions_data.append({
                'id': str(perm.id),
                'knowledge_base': {
                    'id': str(perm.knowledge_base.id),
                    'name': perm.knowledge_base.name,
                    'type': perm.knowledge_base.type,
                    'department': perm.knowledge_base.department,
                    'group': perm.knowledge_base.group
                },
                'permissions': {
                    'can_read': perm.can_read,
                    'can_edit': perm.can_edit,
                    'can_delete': perm.can_delete
                },
                'granted_by': {
                    'id': str(grantor.id) if grantor else None,
                    'username': grantor.username if grantor else None,
                    'name': grantor.name if grantor else None
                },
                'created_at': format_time(created_time),
                'expires_at': format_time(perm.expires_at),
                'status': perm.status
            })

        return Response({
            'code': 200,
            'message': '获取用户权限成功',
            'data': {
                'user': {
                    'id': str(target_user.id),
                    'username': target_user.username,
                    'name': target_user.name,
                    'department': target_user.department,
                    'role': target_user.role
                },
                'permissions': permissions_data
            }
        })
    except Exception as e:
        logger.error(f"获取用户权限失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取用户权限失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def all_permissions(self, request):
    """List grants on all non-private knowledge bases (admin only).

    Query parameters:
        page / page_size: optional; non-numeric or non-positive values
            fall back to 1 and 10.
        status: ``active`` (status active and unexpired or no expiry) or
            ``expired`` (expiry passed or status inactive).
        department: filter by the grantee's department.
        kb_type: filter by knowledge-base type.

    Returns:
        Response: ``code`` 200 with ``total``, ``page``, ``page_size`` and
        ``results``; 403 for non-admins; 500 on unexpected errors.
    """
    def format_time(value):
        return value.strftime('%Y-%m-%d %H:%M:%S') if value else None

    try:
        if request.user.role != 'admin':
            return Response({
                'code': 403,
                'message': '只有管理员可以查看所有权限',
                'data': None
            }, status=status.HTTP_403_FORBIDDEN)

        # Bad paging input falls back to defaults instead of a 500.
        try:
            page = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 10))
        except ValueError:
            page = 1
            page_size = 10
        # Non-positive values would become a negative slice index.
        if page < 1:
            page = 1
        if page_size < 1:
            page_size = 10

        status_filter = request.query_params.get('status')
        department = request.query_params.get('department')
        kb_type = request.query_params.get('kb_type')

        queryset = KBPermissionModel.objects.filter(
            ~Q(knowledge_base__type='private')
        ).select_related(
            'user',
            'knowledge_base',
            'granted_by'
        )

        now = timezone.now()
        if status_filter == 'active':
            queryset = queryset.filter(
                Q(expires_at__gt=now) | Q(expires_at__isnull=True),
                status='active'
            )
        elif status_filter == 'expired':
            queryset = queryset.filter(
                Q(expires_at__lte=now) | Q(status='inactive')
            )
        if department:
            queryset = queryset.filter(user__department=department)
        if kb_type:
            queryset = queryset.filter(knowledge_base__type=kb_type)

        total = queryset.count()
        start = (page - 1) * page_size
        # Materialize the page once; it is walked twice below.
        permissions = tuple(
            queryset.order_by('-granted_at')[start:start + page_size]
        )

        # Load all creators of the page in a single query.
        creator_ids = {perm.knowledge_base.user_id for perm in permissions}
        creators = {
            str(creator.id): creator
            for creator in User.objects.filter(id__in=creator_ids)
        }

        permissions_data = []
        for perm in permissions:
            knowledge_base = perm.knowledge_base
            grantee = perm.user
            grantor = perm.granted_by
            creator = creators.get(str(knowledge_base.user_id))
            permissions_data.append({
                'id': str(perm.id),
                'user': {
                    'id': str(grantee.id),
                    'username': grantee.username,
                    'name': getattr(grantee, 'name', grantee.username),
                    'department': getattr(grantee, 'department', None),
                    'role': getattr(grantee, 'role', None)
                },
                'knowledge_base': {
                    'id': str(knowledge_base.id),
                    'name': knowledge_base.name,
                    'type': knowledge_base.type,
                    'department': knowledge_base.department,
                    'group': knowledge_base.group,
                    'creator': {
                        'id': str(knowledge_base.user_id),
                        'name': creator.name if creator else None,
                        'username': creator.username if creator else None
                    }
                },
                'permissions': {
                    'can_read': perm.can_read,
                    'can_edit': perm.can_edit,
                    'can_delete': perm.can_delete
                },
                'granted_by': {
                    'id': str(grantor.id) if grantor else None,
                    'username': grantor.username if grantor else None,
                    'name': getattr(grantor, 'name', grantor.username) if grantor else None
                },
                'granted_at': perm.granted_at.strftime('%Y-%m-%d %H:%M:%S'),
                'expires_at': format_time(perm.expires_at),
                'status': perm.status
            })

        return Response({
            'code': 200,
            'message': '获取权限列表成功',
            'data': {
                'total': total,
                'page': page,
                'page_size': page_size,
                'results': permissions_data
            }
        })
    except Exception as e:
        logger.error(f"获取所有权限失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取所有权限失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['post'])
def update_permission(self, request):
    """Create or update a user's grant on a knowledge base (admin only).

    Request body:
        user_id, knowledge_base_id: required identifiers.
        permissions: required dict containing ``can_read``, ``can_edit``
            and ``can_delete``.
        expires_at: optional ``YYYY-MM-DDThh:mm:ssZ`` string, read as
            UTC; omitted means no expiry.

    Returns:
        Response: ``code`` 200 with the resulting grant, 400 for invalid
        input, 403 for non-admins or another user's private knowledge
        base, 404 for an unknown user or knowledge base, 500 on database
        or unexpected errors.
    """
    # Function-scope import; ``timezone`` in this module is Django's.
    from datetime import datetime, timezone as dt_timezone

    def failure(code, message, http_status):
        return Response({
            'code': code,
            'message': message,
            'data': None
        }, status=http_status)

    try:
        if request.user.role != 'admin':
            return failure(403, '只有管理员可以直接修改权限', status.HTTP_403_FORBIDDEN)

        user_id = request.data.get('user_id')
        knowledge_base_id = request.data.get('knowledge_base_id')
        permissions = request.data.get('permissions')
        expires_at_str = request.data.get('expires_at')

        if not all([user_id, knowledge_base_id, permissions]):
            return failure(400, '缺少必要参数', status.HTTP_400_BAD_REQUEST)

        # ``in`` on a str is a substring test and ``.get`` needs a
        # mapping, so anything but a dict is rejected here.
        required_permission_fields = ['can_read', 'can_edit', 'can_delete']
        if not isinstance(permissions, dict) or not all(
            field in permissions for field in required_permission_fields
        ):
            return failure(
                400,
                '权限参数格式错误,必须包含 can_read、can_edit、can_delete',
                status.HTTP_400_BAD_REQUEST,
            )

        try:
            user = User.objects.get(id=user_id)
            knowledge_base = KnowledgeBase.objects.get(id=knowledge_base_id)
        except User.DoesNotExist:
            return failure(404, f'用户ID {user_id} 不存在', status.HTTP_404_NOT_FOUND)
        except KnowledgeBase.DoesNotExist:
            return failure(404, f'知识库ID {knowledge_base_id} 不存在', status.HTTP_404_NOT_FOUND)

        if knowledge_base.type == 'private' and str(knowledge_base.user_id) != str(user.id):
            return failure(403, '不能修改其他用户的私有知识库权限', status.HTTP_403_FORBIDDEN)

        expires_at = None
        if expires_at_str:
            try:
                expires_at = datetime.strptime(
                    expires_at_str,
                    '%Y-%m-%dT%H:%M:%SZ'
                )
            except (TypeError, ValueError):
                return failure(
                    400,
                    '过期时间格式错误,应为 ISO 格式 (YYYY-MM-DDThh:mm:ssZ)',
                    status.HTTP_400_BAD_REQUEST,
                )
            # The trailing "Z" designates UTC, so the parsed value is
            # tagged as UTC rather than localized to the server zone.
            expires_at = expires_at.replace(tzinfo=dt_timezone.utc)
            if expires_at <= timezone.now():
                return failure(400, '过期时间不能早于或等于当前时间', status.HTTP_400_BAD_REQUEST)

        if user.role == 'member' and permissions.get('can_delete'):
            return failure(400, '普通成员不能获得删除权限', status.HTTP_400_BAD_REQUEST)

        try:
            with transaction.atomic():
                permission, created = KBPermissionModel.objects.update_or_create(
                    user=user,
                    knowledge_base=knowledge_base,
                    defaults={
                        'can_read': permissions.get('can_read', False),
                        'can_edit': permissions.get('can_edit', False),
                        'can_delete': permissions.get('can_delete', False),
                        'granted_by': request.user,
                        'status': 'active',
                        'expires_at': expires_at
                    }
                )
                self.send_notification(
                    user=user,
                    title="知识库权限更新",
                    content=f"管理员已{'授予' if created else '更新'}您对知识库 '{knowledge_base.name}' 的权限",
                    notification_type="permission_updated",
                    related_object_id=permission.id
                )
        except IntegrityError as e:
            return failure(500, f'数据库操作失败: {str(e)}', status.HTTP_500_INTERNAL_SERVER_ERROR)

        return Response({
            'code': 200,
            'message': f"{'创建' if created else '更新'}权限成功",
            'data': {
                'id': str(permission.id),
                'user': {
                    'id': str(user.id),
                    'username': user.username,
                    'name': user.name,
                    'department': user.department,
                    'role': user.role
                },
                'knowledge_base': {
                    'id': str(knowledge_base.id),
                    'name': knowledge_base.name,
                    'type': knowledge_base.type,
                    'department': knowledge_base.department,
                    'group': knowledge_base.group
                },
                'permissions': {
                    'can_read': permission.can_read,
                    'can_edit': permission.can_edit,
                    'can_delete': permission.can_delete
                },
                'granted_by': {
                    'id': str(request.user.id),
                    'username': request.user.username,
                    'name': request.user.name
                },
                'expires_at': permission.expires_at.strftime('%Y-%m-%d %H:%M:%S') if permission.expires_at else None,
                'created': created
            }
        })
    except Exception as e:
        logger.error(f"更新权限失败: {str(e)}")
        logger.error(traceback.format_exc())
        return failure(500, f'更新权限失败: {str(e)}', status.HTTP_500_INTERNAL_SERVER_ERROR)
class NotificationViewSet(viewsets.ModelViewSet):
    """CRUD plus read-state helpers for the current user's notifications."""
    queryset = Notification.objects.all()
    serializer_class = NotificationSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Limit every operation to notifications received by the caller."""
        receiver = self.request.user
        return Notification.objects.filter(receiver=receiver)

    @action(detail=True, methods=['post'])
    def mark_as_read(self, request, pk=None):
        """Flag a single notification as read."""
        target = self.get_object()
        target.is_read = True
        target.save()
        return Response({'status': 'marked as read'})

    @action(detail=False, methods=['post'])
    def mark_all_as_read(self, request):
        """Flag all of the caller's notifications as read in one update."""
        own_notifications = self.get_queryset()
        own_notifications.update(is_read=True)
        return Response({'status': 'all marked as read'})

    @action(detail=False, methods=['get'])
    def unread_count(self, request):
        """Return how many of the caller's notifications are unread."""
        unread = self.get_queryset().filter(is_read=False)
        return Response({'unread_count': unread.count()})

    @action(detail=False, methods=['get'])
    def latest(self, request):
        """Return the five most recently created unread notifications."""
        unread = self.get_queryset().filter(is_read=False)
        newest_five = unread.order_by('-created_at')[:5]
        return Response(self.get_serializer(newest_five, many=True).data)

    def perform_create(self, serializer):
        """Record the requesting user as sender of a new notification."""
        serializer.save(sender=self.request.user)
@method_decorator(csrf_exempt, name='dispatch')
class LoginView(APIView):
    """Username/password login that returns the user's API token."""
    authentication_classes = []  # no authentication runs for this view
    permission_classes = [AllowAny]

    def post(self, request):
        """Authenticate the credentials and return profile data plus token.

        Returns:
            Response: ``code`` 200 with the user fields and ``token``,
            400 when a credential is missing, 401 when authentication
            fails, 500 on unexpected errors (details are only logged).
        """
        try:
            credentials = request.data
            username = credentials.get('username')
            password = credentials.get('password')

            if not (username and password):
                return Response({
                    "code": 400,
                    "message": "请提供用户名和密码",
                    "data": None
                }, status=status.HTTP_400_BAD_REQUEST)

            user = authenticate(request, username=username, password=password)
            if user is None:
                return Response({
                    "code": 401,
                    "message": "用户名或密码错误",
                    "data": None
                }, status=status.HTTP_401_UNAUTHORIZED)

            # Reuse the existing token or issue one on first login.
            token, _ = Token.objects.get_or_create(user=user)
            profile = {
                "id": str(user.id),
                "username": user.username,
                "email": user.email,
                "role": user.role,
                "department": user.department,
                "name": user.name,
                "group": user.group,
                "token": token.key
            }
            return Response({
                "code": 200,
                "message": "登录成功",
                "data": profile
            })
        except Exception as e:
            logger.error(f"登录失败: {str(e)}")
            logger.error(f"错误类型: {type(e)}")
            logger.error(f"错误堆栈: {traceback.format_exc()}")
            return Response({
                "code": 500,
                "message": "登录失败,请稍后重试",
                "data": None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@method_decorator(csrf_exempt, name='dispatch')
class RegisterView(APIView):
    """User registration endpoint, open to unauthenticated callers."""
    permission_classes = [AllowAny]

    @staticmethod
    def _bad_request(message):
        """Build the 400 envelope shared by every validation failure."""
        return Response({
            "code": 400,
            "message": message,
            "data": None
        }, status=status.HTTP_400_BAD_REQUEST)

    def post(self, request):
        """Validate the payload, create the user and return a token.

        Required fields: username, password, email, role, department,
        name. ``group`` is additionally required (and validated against
        the department) when role is ``member``.

        Returns:
            Response: ``code`` 200 (HTTP 201) with the new user and token,
            400 for any validation failure, 500 on unexpected errors.
        """
        # The module-level ``ValidationError`` comes from DRF, which
        # validate_email does not raise; import Django's class locally.
        from django.core.exceptions import ValidationError as DjangoValidationError

        try:
            data = request.data

            required_fields = ['username', 'password', 'email', 'role', 'department', 'name']
            for field in required_fields:
                if not data.get(field):
                    return self._bad_request(f"缺少必填字段: {field}")

            # NOTE(review): callers may self-register with role "admin";
            # confirm this is intended for an AllowAny endpoint.
            valid_roles = ['admin', 'leader', 'member']
            roles_str = ', '.join(valid_roles)
            if data['role'] not in valid_roles:
                return self._bad_request(f"无效的角色,必须是: {roles_str}")

            if data['department'] not in settings.DEPARTMENT_GROUPS:
                return self._bad_request(
                    f"无效的部门,可选部门: {', '.join(settings.DEPARTMENT_GROUPS.keys())}"
                )

            if data['role'] == 'member':
                if not data.get('group'):
                    return self._bad_request("组员必须指定所属小组")
                # The group must belong to the chosen department.
                valid_groups = settings.DEPARTMENT_GROUPS.get(data['department'], [])
                if data['group'] not in valid_groups:
                    return self._bad_request(
                        f"无效的小组,{data['department']}的可选小组: {', '.join(valid_groups)}"
                    )

            if User.objects.filter(username=data['username']).exists():
                return self._bad_request("用户名已存在")

            if User.objects.filter(email=data['email']).exists():
                return self._bad_request("邮箱已被注册")

            if len(data['password']) < 8:
                return self._bad_request("密码长度必须至少为8位")

            try:
                validate_email(data['email'])
            except (DjangoValidationError, ValidationError):
                return self._bad_request("邮箱格式不正确")

            try:
                user = User.objects.create_user(
                    username=data['username'],
                    email=data['email'],
                    password=data['password'],
                    role=data['role'],
                    department=data['department'],
                    name=data['name'],
                    group=data.get('group') if data['role'] == 'member' else None,
                    is_staff=False,
                    is_superuser=False
                )
            except IntegrityError:
                # A concurrent request can pass the existence checks
                # above and still collide on insert.
                return self._bad_request("用户名或邮箱已存在")

            token, _ = Token.objects.get_or_create(user=user)

            return Response({
                "code": 200,
                "message": "注册成功",
                "data": {
                    "id": user.id,
                    "username": user.username,
                    "email": user.email,
                    "role": user.role,
                    "department": user.department,
                    "name": user.name,
                    "group": user.group,
                    "token": token.key,
                    "created_at": user.date_joined.strftime('%Y-%m-%d %H:%M:%S')
                }
            }, status=status.HTTP_201_CREATED)
        except Exception as e:
            # Use the module logger like the other views, not print().
            logger.error(f"注册失败: {str(e)}")
            logger.error(f"错误类型: {type(e)}")
            logger.error(f"错误堆栈: {traceback.format_exc()}")
            return Response({
                "code": 500,
                "message": f"注册失败: {str(e)}",
                "data": None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@method_decorator(csrf_exempt, name='dispatch')
class LogoutView(APIView):
    """Logout endpoint: revokes the caller's token and ends the session."""
    permission_classes = [IsAuthenticated]

    def post(self, request):
        """Delete the user's token (if any) and call Django's ``logout``.

        Returns:
            Response: ``code`` 200 on success, 500 on unexpected errors.
        """
        try:
            # Filter-delete is a no-op when the user has no token row,
            # whereas ``request.user.auth_token`` raises in that case.
            Token.objects.filter(user=request.user).delete()
            logout(request)
            return Response({
                "code": 200,
                "message": "登出成功",
                "data": None
            })
        except Exception as e:
            return Response({
                "code": 500,
                "message": f"登出失败: {str(e)}",
                "data": None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET', 'PUT'])
@permission_classes([IsAuthenticated])
def user_profile(request):
    """Return (GET) or partially update (PUT) the caller's own profile.

    PUT copies ``email``, ``phone`` and ``department`` from the request
    body onto the user when present; other keys are ignored.
    """
    user = request.user

    if request.method == 'GET':
        return Response({
            'id': user.id,
            'username': user.username,
            'email': user.email,
            'role': user.role,
            'department': user.department,
            'phone': user.phone,
            'date_joined': user.date_joined
        })

    if request.method == 'PUT':
        # NOTE(review): values are stored without validation (email
        # format/uniqueness, department membership) - confirm intended.
        editable_fields = ('email', 'phone', 'department')
        changes = {
            name: request.data[name]
            for name in editable_fields
            if name in request.data
        }
        for name, value in changes.items():
            setattr(user, name, value)
        user.save()
        return Response({'message': '用户信息更新成功'})
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def change_password(request):
    """Change the caller's password and rotate the API token.

    Request body:
        old_password: must match the current password.
        new_password: at least 8 characters.

    Returns:
        Response: ``code`` 200 with the new ``token``, 400 for missing or
        invalid input, 500 on unexpected errors.
    """
    def bad_request(message):
        return Response({
            "code": 400,
            "message": message,
            "data": None
        }, status=status.HTTP_400_BAD_REQUEST)

    try:
        old_password = request.data.get('old_password')
        new_password = request.data.get('new_password')

        if not old_password or not new_password:
            return bad_request("请提供旧密码和新密码")

        user = request.user
        if not user.check_password(old_password):
            return bad_request("旧密码错误")

        if len(new_password) < 8:
            return bad_request("新密码长度必须至少为8位")

        # Save the password and rotate the token together so a failure
        # cannot leave the password changed behind an error response.
        with transaction.atomic():
            user.set_password(new_password)
            user.save()
            # Filter-delete does not raise when no token row exists,
            # unlike ``user.auth_token.delete()``.
            Token.objects.filter(user=user).delete()
            token, _ = Token.objects.get_or_create(user=user)

        return Response({
            "code": 200,
            "message": "密码修改成功",
            "data": {
                "token": token.key
            }
        })
    except Exception as e:
        return Response({
            "code": 500,
            "message": f"密码修改失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
@permission_classes([AllowAny])
def user_register(request):
    """Register a new user and issue an auth token.

    Request body (all required unless noted):
        username, password, email, role, department, name;
        group is required when ``role`` is ``'member'`` and ignored otherwise.

    Returns:
        Response: HTTP 201 with the created user's fields and token key;
        HTTP 400 with an ``error`` message when validation fails;
        HTTP 500 with an ``error`` message on any unexpected exception.
    """
    # ``validate_email`` raises Django's ValidationError. Import it under a
    # distinct name so the handler below catches that class, whatever the
    # module-level name ``ValidationError`` is bound to.
    from django.core.exceptions import ValidationError as DjangoValidationError

    def bad_request(message):
        """Build the 400 response used for every validation failure."""
        return Response({'error': message}, status=status.HTTP_400_BAD_REQUEST)

    try:
        data = request.data

        # Required fields must be present and non-empty.
        required_fields = ['username', 'password', 'email', 'role', 'department', 'name']
        for field in required_fields:
            if not data.get(field):
                return bad_request(f'缺少必填字段: {field}')

        # NOTE(review): this endpoint is AllowAny and accepts role='admin'
        # straight from the request body — confirm that self-service
        # registration of admin accounts is intended.
        valid_roles = ['admin', 'leader', 'member']
        if data['role'] not in valid_roles:
            return bad_request(f'无效的角色,必须是: {", ".join(valid_roles)}')

        # Members must belong to a group.
        if data['role'] == 'member' and not data.get('group'):
            return bad_request('组员必须指定所属小组')

        # Username must be unique.
        if User.objects.filter(username=data['username']).exists():
            return bad_request('用户名已存在')

        # Email must be unique.
        if User.objects.filter(email=data['email']).exists():
            return bad_request('邮箱已被注册')

        # Minimum password length.
        if len(data['password']) < 8:
            return bad_request('密码长度必须至少为8位')

        # Email format. Previously a malformed address escaped this handler
        # and surfaced as a 500 from the outer ``except``.
        try:
            validate_email(data['email'])
        except DjangoValidationError:
            return bad_request('邮箱格式不正确')

        # Create the account; self-registered users never get staff or
        # superuser flags.
        user = User.objects.create_user(
            username=data['username'],
            email=data['email'],
            password=data['password'],
            role=data['role'],
            department=data['department'],
            name=data['name'],
            group=data.get('group') if data['role'] == 'member' else None,
            is_staff=False,
            is_superuser=False
        )

        # Issue the authentication token returned to the client.
        token, _ = Token.objects.get_or_create(user=user)
        return Response({
            'message': '注册成功',
            'data': {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'role': user.role,
                'department': user.department,
                'name': user.name,
                'group': user.group,
                'token': token.key,
                'created_at': user.date_joined.strftime('%Y-%m-%d %H:%M:%S')
            }
        }, status=status.HTTP_201_CREATED)
    except Exception as e:
        print(f"注册失败: {str(e)}")
        print(f"错误类型: {type(e)}")
        print(f"错误堆栈: {traceback.format_exc()}")
        return Response({
            'error': f'注册失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def verify_token(request):
    """Confirm that the caller's credentials are valid.

    The ``IsAuthenticated`` permission rejects unauthenticated requests
    before this body runs, so reaching it means authentication succeeded.
    Responds with ``is_valid: True`` and a summary of the authenticated user
    (id, username, email, role, department, name, group).
    """
    try:
        caller = request.user
        summary_fields = (
            "id", "username", "email", "role",
            "department", "name", "group",
        )
        user_summary = {key: getattr(caller, key) for key in summary_fields}
        payload = {
            "code": 200,
            "message": "令牌有效",
            "data": {
                "is_valid": True,
                "user": user_summary,
            },
        }
        return Response(payload)
    except Exception as e:
        failure = {
            "code": 500,
            "message": f"验证失败: {str(e)}",
            "data": None,
        }
        return Response(failure, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_list(request):
    """List the users visible to the caller.

    Visibility depends on the caller's role: ``admin`` sees every user,
    ``leader`` sees users in the same department, and any other role sees
    only their own record. Each entry carries id, username, email, role,
    department, is_active and date_joined.
    """
    viewer = request.user
    role = viewer.role

    if role == 'admin':
        visible = User.objects.all()
    elif role == 'leader':
        visible = User.objects.filter(department=viewer.department)
    else:
        visible = User.objects.filter(id=viewer.id)

    columns = (
        'id', 'username', 'email', 'role',
        'department', 'is_active', 'date_joined',
    )
    rows = []
    for account in visible:
        rows.append({column: getattr(account, column) for column in columns})
    return Response(rows)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_detail(request, pk):
    """Return the details of a single user.

    Args:
        request: the authenticated DRF request.
        pk: the user's primary key, either a ``uuid.UUID`` or a value that
            ``uuid.UUID`` can parse.

    Returns:
        Response: HTTP 200 with the user's fields; HTTP 400 when ``pk`` is
        not a valid UUID; HTTP 404 when no such user exists; HTTP 500 on any
        other unexpected error.
    """
    # NOTE(review): any authenticated caller can read any user's record here,
    # while user_list restricts visibility by role — confirm this is intended.
    try:
        # Normalise pk to a UUID instance.
        if not isinstance(pk, uuid.UUID):
            try:
                pk = uuid.UUID(pk)
            except (ValueError, AttributeError, TypeError):
                # ValueError: malformed string; AttributeError/TypeError:
                # pk is not a string at all (e.g. an int from the URL conf).
                return Response({
                    "code": 400,
                    "message": "无效的用户ID格式",
                    "data": None
                }, status=status.HTTP_400_BAD_REQUEST)

        user = get_object_or_404(User, pk=pk)
        return Response({
            "code": 200,
            "message": "获取用户信息成功",
            "data": {
                "id": str(user.id),
                "username": user.username,
                "email": user.email,
                "name": user.name,
                "role": user.role,
                "department": user.department,
                "group": user.group
            }
        })
    except Http404:
        # get_object_or_404 signals "not found" with Http404, which is an
        # Exception subclass; without this clause the generic handler below
        # reported a missing user as a 500.
        return Response({
            "code": 404,
            "message": "用户不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        return Response({
            "code": 500,
            "message": f"获取用户信息失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['PUT'])
@permission_classes([IsAdminUser])
def user_update(request, pk):
    """Update selected fields of the user identified by ``pk``.

    Only ``email``, ``role``, ``department``, ``is_active`` and ``phone`` are
    copied from the request body (and only when present); other keys are
    ignored. Responds with a confirmation message, or with a 404 message when
    the user does not exist.
    """
    editable = ('email', 'role', 'department', 'is_active', 'phone')
    try:
        target = User.objects.get(pk=pk)
        payload = request.data
        for attr in editable:
            if attr not in payload:
                continue
            setattr(target, attr, payload[attr])
        target.save()
        return Response({'message': '用户信息更新成功'})
    except User.DoesNotExist:
        return Response({'message': '用户不存在'}, status=404)
@api_view(['DELETE'])
@permission_classes([IsAdminUser])
def user_delete(request, pk):
    """Delete the user identified by ``pk``.

    Responds with a confirmation message on success, or with a 404 message
    when no user has that primary key.
    """
    try:
        User.objects.get(pk=pk).delete()
        outcome = Response({'message': '用户删除成功'})
    except User.DoesNotExist:
        outcome = Response({'message': '用户不存在'}, status=404)
    return outcome