621 lines
23 KiB
Python
621 lines
23 KiB
Python
from itertools import count
|
||
from django.contrib.auth.models import AbstractUser
|
||
from django.db import models
|
||
from django.utils import timezone
|
||
from django.core.exceptions import ValidationError
|
||
import uuid
|
||
import logging
|
||
from django.contrib.auth import get_user_model
|
||
|
||
# Module-level logger named after this module; used by the model methods below.
logger = logging.getLogger(__name__)
|
||
|
||
class User(AbstractUser):
    """Custom user model carrying a role, a department and an optional group.

    The role ('admin', 'leader' or 'member') together with the department
    drives the knowledge-base permission checks implemented below.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

    ROLE_CHOICES = (
        ('admin', '管理员'),
        ('leader', '组长'),
        ('member', '组员'),
    )

    name = models.CharField(max_length=150, verbose_name='真实姓名', default='未设置')
    role = models.CharField(max_length=20, choices=ROLE_CHOICES, verbose_name='角色', default='member')
    department = models.CharField(max_length=100, verbose_name='部门', default='未分配')
    group = models.CharField(max_length=100, null=True, blank=True, verbose_name='小组')

    class Meta:
        db_table = 'users'
        verbose_name = '用户'
        verbose_name_plural = '用户'

    def __str__(self):
        return f"{self.username}({self.name})"

    def can_manage_department(self):
        """Return True when the user's role is 'admin' or 'leader'."""
        return self.role in ['admin', 'leader']

    def can_manage_knowledge_base(self, knowledge_base):
        """Return True when this user may manage ``knowledge_base``.

        Rules, checked in order:
          * admins manage every knowledge base that is not private;
          * leaders manage 'member' knowledge bases of their own department;
          * the creator always manages their own knowledge base.
        """
        if self.role == 'admin' and knowledge_base.type != 'private':
            return True
        if (
            self.role == 'leader'
            and self.department == knowledge_base.department
            and knowledge_base.type == 'member'
        ):
            return True
        # ``user_id`` is a UUIDField, so both sides are normalised to str;
        # comparing a UUID with a str directly is always False.
        return str(knowledge_base.user_id) == str(self.id)

    def _has_approved_permission(self, knowledge_base):
        """Return True if this user holds an approved, unexpired Permission.

        A NULL ``expires_at`` is treated as "never expires", matching
        ``Permission.check_expiration``.
        """
        not_expired = (
            models.Q(expires_at__isnull=True)
            | models.Q(expires_at__gt=timezone.now())
        )
        return Permission.objects.filter(
            not_expired,
            knowledge_base=knowledge_base,
            applicant=self,
            status='approved',
        ).exists()

    def has_access_permission(self, knowledge_base):
        """Return True when this user may access ``knowledge_base``.

        * 'private': the creator, or anyone with an approved permission.
        * 'admin':   admins, or anyone with an approved permission.
        * 'leader':  admins and leaders of the same department.
        * 'member':  everyone in the same department.
        * any other type (e.g. 'secret'): no access.
        """
        kb_type = knowledge_base.type

        if kb_type == 'private':
            if str(knowledge_base.user_id) == str(self.id):
                return True
            return self._has_approved_permission(knowledge_base)

        if kb_type == 'admin':
            if self.role == 'admin':
                return True
            return self._has_approved_permission(knowledge_base)

        if kb_type == 'leader':
            return (
                self.department == knowledge_base.department
                and self.role in ['admin', 'leader']
            )

        if kb_type == 'member':
            return self.department == knowledge_base.department

        # Explicit denial instead of implicitly returning None.
        return False
|
||
|
||
class Data(models.Model):
    """Unified data record owned by a user and classified by role level."""

    DATA_TYPES = (
        ('admin', '管理员数据'),
        ('leader', '组长数据'),
        ('member', '组员数据'),
    )

    # Basic information
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100, help_text="数据名称")
    desc = models.TextField(blank=True, help_text="数据描述")
    type = models.CharField(max_length=10, choices=DATA_TYPES)
    meta = models.JSONField(default=dict, blank=True, help_text="元数据")

    # Ownership
    user_id = models.UUIDField(help_text="创建者ID")
    department = models.CharField(max_length=50, blank=True)

    # Statistics
    char_length = models.IntegerField(null=True, blank=True, help_text="字符长度")
    document_count = models.IntegerField(null=True, blank=True, help_text="文档数量")
    application_mapping_count = models.IntegerField(default=0, help_text="应用映射数量")

    # Timestamps
    create_time = models.DateTimeField(auto_now_add=True)
    update_time = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'user_data'
        indexes = [
            models.Index(fields=['type', 'user_id']),
            models.Index(fields=['create_time']),
        ]

    def __str__(self):
        return f"{self.name} ({self.get_type_display()})"

    # NOTE(review): ``status``, ``published_at`` and ``archived_at`` are not
    # declared as fields on this model, so publish()/archive() raise
    # AttributeError as written. Adding the fields needs a schema migration,
    # so it is left for a follow-up change.

    def publish(self):
        """Move the record from 'draft' to 'published' and save it."""
        if self.status == 'draft':
            self.status = 'published'
            self.published_at = timezone.now()
            self.save()
            # The model has no ``owner`` relation; log the creator id instead.
            logger.info("Data %s published by %s", self.id, self.user_id)

    def archive(self):
        """Move the record from 'published' to 'archived' and save it."""
        if self.status == 'published':
            self.status = 'archived'
            self.archived_at = timezone.now()
            self.save()
            # The model has no ``owner`` relation; log the creator id instead.
            logger.info("Data %s archived by %s", self.id, self.user_id)
|
||
|
||
class Notification(models.Model):
    """A message sent from one user to another, optionally tied to a resource."""

    NOTIFICATION_TYPES = (
        ('permission_request', '权限申请'),
        ('permission_approved', '权限批准'),
        ('permission_rejected', '权限拒绝'),
        ('permission_expired', '权限过期'),
        ('system_notice', '系统通知'),
    )

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    type = models.CharField(max_length=20, choices=NOTIFICATION_TYPES)
    title = models.CharField(max_length=100)
    content = models.TextField()
    sender = models.ForeignKey(
        'User',
        on_delete=models.CASCADE,
        related_name='sent_notifications',
    )
    receiver = models.ForeignKey(
        'User',
        on_delete=models.CASCADE,
        related_name='received_notifications',
    )
    is_read = models.BooleanField(default=False)
    # Identifier of the resource this notification refers to (may be blank).
    related_resource = models.CharField(max_length=100, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Newest notifications first.
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['receiver', '-created_at']),
            models.Index(fields=['type', 'is_read']),
        ]

    def __str__(self):
        type_label = self.get_type_display()
        return f"{type_label} - {self.title}"
|
||
|
||
class Permission(models.Model):
    """A user's request for access to a knowledge base, with its review state."""

    STATUS_CHOICES = (
        ('pending', '待审批'),
        ('approved', '已批准'),
        ('rejected', '已拒绝'),
        # Written by check_expiration(); listed so get_status_display() works.
        ('expired', '已过期'),
    )

    knowledge_base = models.ForeignKey(
        'KnowledgeBase',
        on_delete=models.CASCADE,
        related_name='permissions',
        verbose_name='知识库'
    )
    applicant = models.ForeignKey(
        'User',
        on_delete=models.CASCADE,
        related_name='permission_applications',
        verbose_name='申请人'
    )
    approver = models.ForeignKey(
        'User',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='permission_approvals',
        verbose_name='审批人'
    )

    # Example: {"can_read": true, "can_edit": false, "can_delete": false}
    permissions = models.JSONField(default=dict, verbose_name='权限配置')
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending', verbose_name='状态')
    reason = models.TextField(verbose_name='申请原因')
    response_message = models.TextField(null=True, blank=True, verbose_name='审批意见')
    # NULL means the permission never expires (see check_expiration()).
    expires_at = models.DateTimeField(null=True, blank=True, verbose_name='过期时间')

    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')

    class Meta:
        verbose_name = '权限申请'
        verbose_name_plural = '权限申请'
        ordering = ['-created_at']

    def __str__(self):
        return f"{self.applicant} 申请 {self.knowledge_base} 的权限"

    def clean(self):
        """Validate the request.

        Raises:
            ValidationError: if the applicant created the knowledge base, or
                if the related knowledge base cannot be loaded.
        """
        try:
            # A user cannot request access to their own knowledge base.
            if str(self.knowledge_base.user_id) == str(self.applicant.id):
                raise ValidationError('不能申请访问自己的知识库')
        except KnowledgeBase.DoesNotExist:
            raise ValidationError('知识库不存在')

    def save(self, *args, **kwargs):
        """Run clean() before every save."""
        self.clean()
        super().save(*args, **kwargs)

    def approve(self, approver, response_message=''):
        """Approve a pending request; no-op for any other status."""
        if self.status == 'pending':
            self.status = 'approved'
            self.approver = approver
            self.response_message = response_message
            self.save()

    def reject(self, approver, response_message=''):
        """Reject a pending request; no-op for any other status."""
        if self.status == 'pending':
            self.status = 'rejected'
            self.approver = approver
            self.response_message = response_message
            self.save()

    def check_expiration(self):
        """Mark an approved permission as 'expired' once ``expires_at`` has passed."""
        if self.status == 'approved' and self.expires_at and self.expires_at < timezone.now():
            self.status = 'expired'
            self.save()
            logger.info("Permission %s expired", self.id)

    def send_notification(self, notification_type, title, content):
        """Create a Notification about this request.

        'permission_request' notifications go to the approver; every other
        type goes to the applicant. When no receiver is available (the
        approver is unset) nothing is created, because
        ``Notification.receiver`` is a required foreign key.
        """
        if notification_type == 'permission_request':
            receiver = self.approver
        else:
            receiver = self.applicant

        if receiver is None:
            logger.warning(
                "Permission %s: no receiver for '%s' notification, skipped",
                self.id, notification_type,
            )
            return

        # NOTE(review): the sender is always the applicant, even for approval
        # and rejection results - confirm whether the approver was intended.
        Notification.objects.create(
            type=notification_type,
            title=title,
            content=content,
            sender=self.applicant,
            receiver=receiver,
            related_resource=str(self.id)
        )

    def notify_approver(self):
        """Tell the approver that a new request is waiting."""
        # The model has no ``resource_type`` field, so the message names the
        # knowledge base itself.
        kb_name = self.knowledge_base.name
        self.send_notification(
            'permission_request',
            f'新的权限申请 - {kb_name}',
            f'用户 {self.applicant.username} 申请访问 {kb_name} 的权限'
        )

    def notify_applicant(self, status):
        """Tell the applicant the review outcome.

        ``status`` selects the notification type ('approved' maps to
        'permission_approved', anything else to 'permission_rejected'); the
        message text uses the instance's current status label.
        """
        notification_type = 'permission_approved' if status == 'approved' else 'permission_rejected'
        kb_name = self.knowledge_base.name
        status_label = self.get_status_display()
        title = f'权限申请{status_label} - {kb_name}'
        content = f'您申请的 {kb_name} 权限已{status_label}'
        self.send_notification(notification_type, title, content)
|
||
|
||
class ChatHistory(models.Model):
    """One chat message, grouped into conversations by ``conversation_id``."""

    ROLE_CHOICES = [
        ('user', '用户'),
        ('assistant', 'AI助手'),
        ('system', '系统')
    ]

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Link to the primary knowledge base of the conversation.
    knowledge_base = models.ForeignKey('KnowledgeBase', on_delete=models.CASCADE)
    # Identifies the conversation for a given combination of knowledge bases.
    conversation_id = models.CharField(max_length=100, db_index=True)
    parent_id = models.CharField(max_length=100, null=True, blank=True)
    role = models.CharField(max_length=20, choices=ROLE_CHOICES)
    content = models.TextField()
    tokens = models.IntegerField(default=0, help_text="消息token数")
    # Extra metadata describing the knowledge-base combination.
    metadata = models.JSONField(default=dict, blank=True, help_text="""
    {
        'model_id': 'xxx',
        'dataset_id_list': ['id1', 'id2', ...],
        'dataset_external_id_list': ['ext1', 'ext2', ...],
        'primary_knowledge_base': 'id1'
    }
    """)
    created_at = models.DateTimeField(auto_now_add=True)
    is_deleted = models.BooleanField(default=False)

    class Meta:
        db_table = 'chat_history'
        ordering = ['created_at']
        indexes = [
            models.Index(fields=['conversation_id', 'created_at']),
            models.Index(fields=['user', 'created_at']),
            # Supports lookups of non-deleted messages per conversation.
            models.Index(fields=['conversation_id', 'is_deleted']),
        ]

    def __str__(self):
        return f"{self.user.username} - {self.knowledge_base.name} - {self.created_at}"

    @classmethod
    def get_conversation(cls, conversation_id):
        """Return the non-deleted messages of a conversation, oldest first."""
        return cls.objects.filter(
            conversation_id=conversation_id,
            is_deleted=False
        ).order_by('created_at')

    @classmethod
    def get_conversations_by_knowledge_bases(cls, dataset_ids, user):
        """Return ``user``'s messages for a combination of knowledge bases.

        The conversation id is a UUIDv5 derived from the sorted ids, so the
        same set of ids yields the same conversation whatever their order.
        Ids may be strings or UUID objects.
        """
        # Convert to str first: str.join() rejects UUID objects.
        sorted_kb_ids = sorted(str(kb_id) for kb_id in dataset_ids)
        conversation_id = str(uuid.uuid5(
            uuid.NAMESPACE_DNS,
            '-'.join(sorted_kb_ids)
        ))

        return cls.objects.filter(
            conversation_id=conversation_id,
            user=user,
            is_deleted=False
        ).order_by('created_at')

    @classmethod
    def get_knowledge_base_combinations(cls, user):
        """Return one summary row per conversation of ``user``, newest first.

        Each row holds 'conversation_id', 'last_message' (latest
        ``created_at``), 'message_count' and 'metadata'.
        """
        # Use the ORM aggregates: builtin max() and itertools.count() are not
        # query expressions and cannot be passed to annotate().
        # NOTE(review): selecting 'metadata' after the aggregation may split
        # the grouping by distinct metadata values - verify the generated SQL.
        return cls.objects.filter(
            user=user,
            is_deleted=False
        ).values('conversation_id').annotate(
            last_message=models.Max('created_at'),
            message_count=models.Count('id')
        ).values(
            'conversation_id',
            'last_message',
            'message_count',
            'metadata'
        ).order_by('-last_message')

    def get_knowledge_bases(self):
        """Return the knowledge bases related to this message.

        Uses ``metadata['dataset_id_list']`` when that key is present,
        otherwise falls back to the primary ``knowledge_base``.
        """
        if self.metadata and 'dataset_id_list' in self.metadata:
            return KnowledgeBase.objects.filter(
                id__in=self.metadata['dataset_id_list']
            )
        # The raw FK value avoids loading the related object just for its id.
        return KnowledgeBase.objects.filter(id=self.knowledge_base_id)

    def soft_delete(self):
        """Flag the message as deleted without removing the row."""
        self.is_deleted = True
        self.save()

    def to_dict(self):
        """Serialise the message and its related knowledge bases to a dict."""
        return {
            'id': str(self.id),
            'conversation_id': self.conversation_id,
            'role': self.role,
            'content': self.content,
            'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            'metadata': self.metadata,
            'knowledge_bases': [
                {
                    'id': str(kb.id),
                    'name': kb.name,
                    'type': kb.type
                } for kb in self.get_knowledge_bases()
            ]
        }
|
||
|
||
class UserProfile(models.Model):
    """One-to-one profile record holding a user's department and group."""

    user = models.OneToOneField(
        User,
        on_delete=models.CASCADE,
        related_name='profile',
    )
    department = models.CharField(max_length=100, blank=True, help_text="部门")
    group = models.CharField(max_length=100, blank=True, help_text="小组")

    class Meta:
        db_table = 'user_profiles'

    def __str__(self):
        username = self.user.username
        return f"{username}'s profile"
|
||
|
||
class KnowledgeBasePermission(models.Model):
    """Per-user permission on a knowledge base (many-to-many link with flags)."""

    STATUS_CHOICES = (
        ('active', '生效中'),
        ('expired', '已过期'),
        ('revoked', '已撤销'),
    )

    knowledge_base = models.ForeignKey(
        'KnowledgeBase',
        on_delete=models.CASCADE,
        related_name='user_permissions',
        verbose_name='知识库',
    )
    user = models.ForeignKey(
        'User',
        on_delete=models.CASCADE,
        related_name='knowledge_base_permissions',
        verbose_name='用户',
    )

    # Basic capability flags
    can_read = models.BooleanField(default=False, verbose_name='查看权限')
    can_edit = models.BooleanField(default=False, verbose_name='修改权限')
    can_delete = models.BooleanField(default=False, verbose_name='删除权限')

    # Lifecycle state of the grant
    status = models.CharField(
        max_length=10,
        choices=STATUS_CHOICES,
        default='active',
        verbose_name='状态',
    )

    # Who granted it, when, and until when
    granted_by = models.ForeignKey(
        'User',
        on_delete=models.SET_NULL,
        null=True,
        related_name='granted_permissions',
        verbose_name='授权人',
    )
    granted_at = models.DateTimeField(auto_now_add=True, verbose_name='授权时间')
    expires_at = models.DateTimeField(null=True, blank=True, verbose_name='过期时间')

    class Meta:
        app_label = 'user_management'
        db_table = 'knowledge_base_permissions'
        unique_together = ['knowledge_base', 'user']
        indexes = [
            models.Index(fields=['knowledge_base', 'user', 'status']),
        ]
        verbose_name = '知识库权限'
        verbose_name_plural = '知识库权限'

    def __str__(self):
        username = self.user.username
        kb_name = self.knowledge_base.name
        return f"{username} - {kb_name}"

    def is_valid(self):
        """Return True while the grant is active and not past ``expires_at``.

        Side effect: an active grant whose ``expires_at`` has passed is
        switched to 'expired' and saved before False is returned.
        """
        if self.status == 'active':
            deadline = self.expires_at
            if not (deadline and deadline < timezone.now()):
                return True
            self.status = 'expired'
            self.save()
        return False
|
||
|
||
class KnowledgeBase(models.Model):
    """A knowledge base: a named collection of documents with an access level."""

    KNOWLEDGE_BASE_TYPES = [
        ('admin', '管理级知识库'),
        ('leader', '部门级知识库'),
        ('member', '成员级知识库'),
        ('private', '私有知识库'),
        ('secret', '公司级别私密知识库'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user_id = models.UUIDField(verbose_name='创建者ID')
    name = models.CharField(max_length=100, unique=True, verbose_name='知识库名称')
    desc = models.TextField(verbose_name='知识库描述', null=True, blank=True)
    type = models.CharField(
        max_length=20,
        choices=KNOWLEDGE_BASE_TYPES,
        default='private',
        verbose_name='知识库类型'
    )
    department = models.CharField(max_length=50, null=True, blank=True)
    group = models.CharField(max_length=50, null=True, blank=True)
    documents = models.JSONField(default=list)
    char_length = models.IntegerField(default=0)
    document_count = models.IntegerField(default=0)
    external_id = models.UUIDField(null=True, blank=True)
    create_time = models.DateTimeField(auto_now_add=True)
    update_time = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'knowledge_bases'
        indexes = [
            models.Index(fields=['type']),
            models.Index(fields=['department']),
            models.Index(fields=['group'])
        ]

    def __str__(self):
        return f"{self.name} ({self.get_type_display()})"

    def is_owner(self, user):
        """Return True if ``user`` created this knowledge base or holds an
        active permission with read, edit and delete all granted."""
        return str(user.id) == str(self.user_id) or KnowledgeBasePermission.objects.filter(
            knowledge_base=self,
            user=user,
            can_read=True,
            can_edit=True,
            can_delete=True,
            status='active'
        ).exists()

    def get_owners(self):
        """Return a queryset of the creator plus every user holding an active
        permission with read, edit and delete all granted."""
        # ``User`` is defined in this module, so no import is needed here.
        owner_ids = {self.user_id}
        owner_ids.update(
            KnowledgeBasePermission.objects.filter(
                knowledge_base=self,
                can_read=True,
                can_edit=True,
                can_delete=True,
                status='active'
            ).values_list('user_id', flat=True)
        )
        return User.objects.filter(id__in=owner_ids)

    def calculate_stats(self):
        """Return ``(document_count, total_chars)`` for ``self.documents``.

        ``total_chars`` sums the lengths of the 'content' and 'title' values
        of every paragraph; missing or empty values count as zero.
        """
        documents = self.documents or []
        total_chars = 0
        for doc in documents:
            if not isinstance(doc, dict):
                continue
            for para in doc.get('paragraphs') or []:
                if not isinstance(para, dict):
                    continue
                total_chars += len(para.get('content') or '')
                total_chars += len(para.get('title') or '')
        return len(documents), total_chars

    def save(self, *args, **kwargs):
        """Save the instance, computing document statistics on first insert."""
        # The primary key has a uuid4 default, so ``self.pk`` is set before the
        # first save; ``_state.adding`` is the reliable "new row" signal.
        # Statistics are only derived when documents are present, so values
        # supplied explicitly for an empty knowledge base are kept.
        if self._state.adding and self.documents:
            doc_count, char_count = self.calculate_stats()
            self.document_count = doc_count
            self.char_length = char_count

        super().save(*args, **kwargs)

    @classmethod
    def update_external_id(cls, instance_id, external_id):
        """Set ``external_id`` with a direct UPDATE query (save() is not called)."""
        cls.objects.filter(id=instance_id).update(external_id=external_id)

    def clean(self):
        """Check that the knowledge-base type is allowed for the creator's role.

        Raises:
            ValidationError: if the creator does not exist, a member creates
                a non-private knowledge base, or a leader creates one that is
                neither 'member' nor 'private'.
        """
        try:
            user = User.objects.get(id=self.user_id)
        except User.DoesNotExist:
            raise ValidationError('创建者不存在')

        if user.role == 'member' and self.type != 'private':
            raise ValidationError('组员只能创建私人知识库')
        if user.role == 'leader' and self.type not in ['member', 'private']:
            raise ValidationError('组长只能创建成员级或私人知识库')

    def to_response_dict(self):
        """Serialise the knowledge base into the API response format."""
        # NOTE(review): ``meta``, ``embedding_mode_id`` and
        # ``application_mapping_count`` are not declared as fields in the
        # visible part of this model, so they are read defensively.
        embedding_mode_id = getattr(self, 'embedding_mode_id', None)
        return {
            "create_time": self.create_time.isoformat(),
            "update_time": self.update_time.isoformat(),
            "id": str(self.id),
            "name": self.name,
            "desc": self.desc,
            "type": self.type,
            "meta": getattr(self, 'meta', {}),
            "user_id": str(self.user_id),
            "embedding_mode_id": str(embedding_mode_id) if embedding_mode_id is not None else None,
            "char_length": self.char_length,
            "application_mapping_count": getattr(self, 'application_mapping_count', 0),
            "document_count": self.document_count
        }

    def to_external_format(self):
        """Serialise the knowledge base into the external interface format."""
        return {
            "name": self.name,
            "desc": self.desc,
            "documents": self.documents
        }

    @classmethod
    def from_external_format(cls, data, user_id, embedding_mode_id):
        """Build an unsaved instance from external-format ``data``.

        ``data`` must contain 'name'; 'desc' and 'documents' are optional.
        """
        documents = data.get("documents") or []
        instance = cls(
            name=data["name"],
            desc=data.get("desc"),
            documents=documents,
            user_id=user_id,
            document_count=len(documents)
        )
        # Set as an attribute rather than a constructor kwarg: the model
        # constructor raises TypeError for names that are not fields.
        instance.embedding_mode_id = embedding_mode_id
        return instance
|