import requests
import os
import shutil
import glob
import zipfile
import duckdb
import json
from django.shortcuts import render
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.core.files.storage import FileSystemStorage
from .models import Dataset, Project, CreatorProfile, Product, CreatorProductMatch
from .tool import check_directory_structure_detect, delete_temp_dir, get_categories
from .minio_tools import upload_to_minio, delete_from_minio, get_dataset_link
from .config import TEMP_ROOT_DIR, LOCAL_IP, BUCKET_NAME, LOCAL_DATA_DIR
from concurrent.futures import ThreadPoolExecutor
from accounts.models import User
from django.forms import model_to_dict
from django.db.models.fields.files import ImageFieldFile
import pandas as pd
import numpy as np
from decimal import Decimal
from datetime import datetime

# Module-wide thread pool shared by the views below for fire-and-forget background
# work (the legacy endpoints submit MinIO upload/delete and temp-dir cleanup to it).
executor = ThreadPoolExecutor(max_workers=10)

# Front-end task type -> data folder name ('Detect' -> 'Detection', 'Classify' -> 'Classification').
# NOTE(review): not referenced by the views in this module as written — confirm before removing.
task_type_map = {'Detect':'Detection', 'Classify':'Classification'}

# View functions follow.
# ---------------------------------------------------------------------------
# Helpers shared by the JSON API endpoints
# ---------------------------------------------------------------------------

def _api_error(message, status=400):
    """Build the unified ``{'code', 'message', 'data'}`` error envelope.

    ``code`` mirrors the HTTP status so clients can check either one.
    """
    return JsonResponse({'code': status, 'message': message, 'data': {}}, status=status)


def _parse_json_body(request):
    """Decode ``request.body`` as a JSON object.

    Returns the decoded dict, or None when the body is not UTF-8, is not valid
    JSON, or is valid JSON that is not an object (callers rely on ``dict.get``).
    """
    try:
        data = json.loads(request.body.decode('utf-8'))
    except (ValueError, RecursionError):  # UnicodeDecodeError and JSONDecodeError are ValueErrors
        return None
    return data if isinstance(data, dict) else None


def _get_or_none(model, **lookup):
    """Return the single ``model`` row matching ``lookup``, or None.

    Malformed ids (e.g. ``"abc"`` for an integer primary key) make Django raise
    ValueError/TypeError while preparing the query; those are treated as
    "not found" instead of surfacing as an unhandled 500.
    """
    try:
        return model.objects.get(**lookup)
    except (model.DoesNotExist, ValueError, TypeError):
        return None


def _dataset_paths(user_id, dataset_id):
    """Return ``(dataset_dir, parquet_path)`` for a dataset's local storage.

    Layout: ``LOCAL_DATA_DIR/<user_id>/<dataset_id>/<dataset_id>.parquet``.
    """
    dataset_dir = os.path.join(LOCAL_DATA_DIR, str(user_id), str(dataset_id))
    return dataset_dir, os.path.join(dataset_dir, f"{dataset_id}.parquet")


def _clean_filefields(values):
    """Replace every ImageFieldFile value in ``values`` with None (in place) and return it."""
    for key, value in values.items():
        if isinstance(value, ImageFieldFile):
            values[key] = None  # overwriting an existing key while iterating is safe
    return values


def _to_json_safe(obj):
    """Recursively convert numpy arrays and Decimals into plain JSON-friendly values."""
    if isinstance(obj, np.ndarray):
        # tolist() may still contain dicts/Decimals (object arrays), so recurse.
        return _to_json_safe(obj.tolist())
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, dict):
        return {k: _to_json_safe(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_to_json_safe(i) for i in obj]
    return obj


def _safe_join(root, *parts):
    """Join request-supplied path components under ``root``; return None if any is unsafe.

    Each part must be a non-empty string that is not ``.``/``..`` and contains no
    path separator or NUL byte. This keeps parameters such as ``user`` and
    ``datasetName`` from escaping ``root`` (or collapsing onto a parent directory)
    before the path is handed to ``shutil.rmtree`` / ``delete_temp_dir``.
    """
    forbidden = ('/', '\\', '\x00')
    for part in parts:
        if not isinstance(part, str) or part in ('', '.', '..'):
            return None
        if any(ch in part for ch in forbidden):
            return None
    return os.path.join(root, *parts)


# ---------------------------------------------------------------------------
# JSON API endpoints
# ---------------------------------------------------------------------------

@csrf_exempt
def create_dataset(request):
    """Create an empty dataset for a user.

    POST JSON body: ``name`` and ``user_id`` (required); ``task_type``, ``size``,
    ``description``, ``categories`` (optional). Names must be unique per user.
    On success the directory ``LOCAL_DATA_DIR/<user_id>/<dataset_id>/`` exists and
    holds no parquet file; ``add_data_to_dataset`` creates that file lazily.

    Returns the envelope ``{'code', 'message', 'data'}``; on success ``data`` is
    ``{'dataset_id': <id>}``.
    """
    if request.method != 'POST':
        return _api_error('请求方法错误')
    data = _parse_json_body(request)
    if data is None:
        return _api_error('请求体不是有效的JSON')

    name = data.get('name')
    user_id = data.get('user_id')
    if not name or not user_id:
        return _api_error('缺少必要参数: name 或 user_id')

    user = _get_or_none(User, id=user_id)
    if user is None:
        return _api_error('用户不存在')

    # Dataset names are unique per user.
    if Dataset.objects.filter(name=name, user=user).exists():
        return _api_error('该用户下数据集名称已存在')

    dataset = Dataset.objects.create(
        name=name,
        user=user,
        task_type=data.get('task_type', ''),
        size=data.get('size', ''),
        description=data.get('description', ''),
        categories=data.get('categories', []),
    )

    dataset_dir, parquet_path = _dataset_paths(user.id, dataset.id)
    os.makedirs(dataset_dir, exist_ok=True)
    # A leftover file at this path would belong to an older dataset; start empty.
    if os.path.exists(parquet_path):
        os.remove(parquet_path)

    return JsonResponse({'code': 200, 'message': '数据集创建成功', 'data': {'dataset_id': dataset.id}})


@csrf_exempt
def add_data_to_dataset(request):
    """Append one creator/product row to a dataset's parquet file.

    POST JSON body: ``product_id``, ``creator_id``, ``dataset_id``, ``user_id``
    (required) and ``is_matched`` (stored as given). The row is the union of
    ``model_to_dict`` of the creator and the product (ids renamed to
    ``creator_id`` / ``product_id``) plus ``is_matched`` and ``create_time``.
    A (product_id, creator_id) pair may appear at most once per dataset.
    """
    if request.method != 'POST':
        return _api_error('请求方法错误')
    data = _parse_json_body(request)
    if data is None:
        return _api_error('请求体不是有效的JSON')

    product_id = data.get('product_id')
    creator_id = data.get('creator_id')
    is_matched = data.get('is_matched')
    dataset_id = data.get('dataset_id')
    user_id = data.get('user_id')
    if not product_id or not creator_id or not dataset_id or not user_id:
        return _api_error('缺少product_id或creator_id或dataset_id或user_id参数')

    product = _get_or_none(Product, id=product_id)
    if product is None:
        return _api_error('产品不存在')
    user = _get_or_none(User, id=user_id)
    if user is None:
        return _api_error('用户不存在')
    creator = _get_or_none(CreatorProfile, id=creator_id)
    if creator is None:
        return _api_error('达人不存在')
    dataset = _get_or_none(Dataset, id=dataset_id, user=user)
    if dataset is None:
        return _api_error('数据集不存在')

    creator_dict = _clean_filefields(model_to_dict(creator))
    product_dict = _clean_filefields(model_to_dict(product))
    creator_dict['creator_id'] = creator_dict.pop('id')
    product_dict['product_id'] = product_dict.pop('id')
    # NOTE(review): any other field name shared by both models keeps the product's value.
    row = {
        **creator_dict,
        **product_dict,
        'is_matched': is_matched,
        'create_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    new_row = pd.DataFrame([row])

    dataset_dir, parquet_path = _dataset_paths(user.id, dataset.id)
    if os.path.exists(parquet_path):
        df = pd.read_parquet(parquet_path)
        # Compare against the validated model ids rather than splicing raw
        # request values into a query string.
        already_present = (
            {'product_id', 'creator_id'}.issubset(df.columns)
            and bool(((df['product_id'] == product.id) & (df['creator_id'] == creator.id)).any())
        )
        if already_present:
            return _api_error('该达人和产品的匹配已存在')
        df = pd.concat([df, new_row], ignore_index=True)
    else:
        # The directory may be missing (e.g. removed manually); create it first.
        os.makedirs(dataset_dir, exist_ok=True)
        df = new_row

    df.to_parquet(parquet_path)
    return JsonResponse({'code': 200, 'message': '匹配关系创建成功', 'data': {}})


@csrf_exempt
def get_dataset_list(request):
    """GET ``user_id`` -> every dataset of that user as ``data.datasets`` (list of dicts)."""
    if request.method != 'GET':
        return _api_error('请求方法错误')
    user_id = request.GET.get('user_id')
    if not user_id:
        return _api_error('缺少user_id参数')
    user = _get_or_none(User, id=user_id)
    if user is None:
        return _api_error('用户不存在')

    datasets = list(Dataset.objects.filter(user=user).values())
    return JsonResponse({'code': 200, 'message': '查询成功', 'data': {'datasets': datasets}})


@csrf_exempt
def get_dataset_detail(request):
    """GET ``dataset_id`` + ``user_id`` -> that dataset as ``data.dataset`` (``model_to_dict``)."""
    if request.method != 'GET':
        return _api_error('请求方法错误')
    dataset_id = request.GET.get('dataset_id')
    user_id = request.GET.get('user_id')
    if not dataset_id or not user_id:
        return _api_error('缺少dataset_id或user_id参数')
    user = _get_or_none(User, id=user_id)
    if user is None:
        return _api_error('用户不存在')
    dataset = _get_or_none(Dataset, id=dataset_id, user=user)
    if dataset is None:
        return _api_error('数据集不存在')

    return JsonResponse({'code': 200, 'message': '查询成功', 'data': {'dataset': model_to_dict(dataset)}})


@csrf_exempt
def delete_dataset_api(request):
    """POST ``user_id`` + ``dataset_id``: remove the dataset's local directory, then its DB row."""
    if request.method != 'POST':
        return _api_error('请求方法错误')
    data = _parse_json_body(request)
    if data is None:
        return _api_error('请求体不是有效的JSON')

    user_id = data.get('user_id')
    dataset_id = data.get('dataset_id')
    if not user_id or not dataset_id:
        return _api_error('缺少user_id或dataset_id参数')
    user = _get_or_none(User, id=user_id)
    if user is None:
        return _api_error('用户不存在')
    dataset = _get_or_none(Dataset, id=dataset_id, user=user)
    if dataset is None:
        return _api_error('数据集不存在')

    dataset_dir, _ = _dataset_paths(user.id, dataset.id)
    if os.path.exists(dataset_dir):
        shutil.rmtree(dataset_dir)
    dataset.delete()
    return JsonResponse({'code': 200, 'message': '数据集删除成功', 'data': {}})


@csrf_exempt
def get_match_list(request):
    """Return one page of a dataset's parquet rows.

    GET params: ``page`` and ``page_size`` (positive ints, 1-based page),
    ``user_id``, ``dataset_id``. Responds with ``data.total`` (total row count)
    and ``data.matches`` (the page's rows, NaN replaced by ``''``). A dataset
    without a parquet file yields ``total=0`` and an empty list.
    """
    if request.method != 'GET':
        return _api_error('请求方法错误')
    page = request.GET.get('page')
    page_size = request.GET.get('page_size')
    user_id = request.GET.get('user_id')
    dataset_id = request.GET.get('dataset_id')
    if not page or not page_size or not user_id or not dataset_id:
        return _api_error('缺少参数: page, page_size, user_id, dataset_id')

    try:
        page = int(page)
        page_size = int(page_size)
    except ValueError:
        return _api_error('page和page_size必须为正整数')
    if page < 1 or page_size < 1:
        return _api_error('page和page_size必须为正整数')

    user = _get_or_none(User, id=user_id)
    if user is None:
        return _api_error('用户不存在')
    dataset = _get_or_none(Dataset, id=dataset_id, user=user)
    if dataset is None:
        return _api_error('数据集不存在')

    _, parquet_path = _dataset_paths(user.id, dataset.id)
    if not os.path.exists(parquet_path):
        return JsonResponse({'code': 200, 'message': '暂无数据', 'data': {'total': 0, 'matches': []}})

    try:
        df = pd.read_parquet(parquet_path)
    except Exception as e:  # report any reader failure to the client instead of a bare 500
        return _api_error(f'Parquet文件读取失败: {e}', status=500)

    start = (page - 1) * page_size
    page_df = df.iloc[start:start + page_size]
    matches = [_to_json_safe(record) for record in page_df.fillna('').to_dict(orient='records')]
    return JsonResponse({'code': 200, 'message': '查询成功', 'data': {'total': len(df), 'matches': matches}})


# ---------------------------------------------------------------------------
# Legacy endpoints
# NOTE(review): these pass ``user`` as a raw request string to Dataset/Project.
# If ``Dataset.user`` is a ForeignKey (as the new API's ``user=user`` lookups
# suggest), ``Dataset(user=<str>)`` will fail — confirm before relying on them.
# ---------------------------------------------------------------------------

@csrf_exempt
def upload(request):
    """Legacy: accept a zipped dataset, validate it and hand it to MinIO in the background.

    POST form fields: ``name``, ``user``, ``taskType``, ``size``, ``description``
    and a ``file`` upload. The zip is extracted under ``TEMP_ROOT_DIR/<user>/<name>``.
    Its top-level folder (named like the zip) is renamed to ``Classification`` for
    ``taskType == 'Classify'`` and ``Detection`` otherwise. That folder is then
    checked, its first ``*.yaml`` supplies the categories, and the Dataset row is
    saved before ``upload_to_minio`` is submitted to the executor.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'message': '请求错误'})
    if 'file' not in request.FILES:
        return JsonResponse({'success': False, 'message': '没有文件上传'})

    dataset_name = request.POST.get('name')
    user = request.POST.get('user')
    task_type = request.POST.get('taskType')
    size = request.POST.get('size')
    description = request.POST.get('description')
    uploaded_file = request.FILES.get('file')

    if not user or not dataset_name:
        return JsonResponse({'success': False, 'message': '用户或数据集名称未提供'})
    # user/name come straight from the request and end up in rmtree(); reject traversal.
    temp_dir = _safe_join(TEMP_ROOT_DIR, user, dataset_name)
    if temp_dir is None:
        return JsonResponse({'success': False, 'message': '用户或数据集名称不合法'})

    if Dataset.objects.filter(name=dataset_name, user=user).exists():
        return JsonResponse({'success': False, 'message': f'数据集 {dataset_name} 名称已存在'})

    # Start from an empty temp dir so files from an earlier attempt cannot mix in.
    if os.path.exists(temp_dir) and os.listdir(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(temp_dir, exist_ok=True)

    file_path = os.path.join(temp_dir, uploaded_file.name)
    with open(file_path, 'wb') as f:
        for chunk in uploaded_file.chunks():
            f.write(chunk)
    print(f"数据集 {dataset_name} 保存到临时目录成功")

    # The extracted folder must be renamed to the same folder that is checked below
    # (previously it was always 'Detection', so Classify uploads could never pass).
    data_folder = 'Classification' if task_type == 'Classify' else 'Detection'
    data_path = os.path.join(temp_dir, data_folder)
    try:
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        print(f"数据集 {dataset_name} 解压到临时目录成功")
        extracted_folder_path = os.path.join(temp_dir, os.path.splitext(uploaded_file.name)[0])
        os.rename(extracted_folder_path, data_path)
    except Exception as e:
        print(f"数据集{dataset_name}解压失败", e)
        delete_temp_dir(temp_dir)
        return JsonResponse({'success': False, 'message': f'数据集 {dataset_name} 解压失败'})

    # Every task type is validated with the detection checker.
    flag, message = check_directory_structure_detect(data_path)
    yaml_files = glob.glob(os.path.join(data_path, "*.yaml")) if flag else []
    if flag and not yaml_files:
        flag, message = False, '缺少yaml配置文件'
    if not flag:
        print(f"数据集 {dataset_name} 结构检查失败")
        delete_temp_dir(temp_dir)
        return JsonResponse({'success': False, 'message': f'数据集 {dataset_name} 结构不符合要求: {message}'})

    categories = get_categories(yaml_files[0])
    print(f"数据集 {dataset_name} 结构检查成功")

    dataset = Dataset(
        name=dataset_name,
        user=user,
        task_type=task_type,
        size=size,
        number=0,
        description=description,
        categories=categories,
    )
    # Save before submitting so the worker thread never receives an unsaved instance.
    dataset.save()
    executor.submit(upload_to_minio, data_path, dataset)
    return JsonResponse({'success': True, 'message': '文件已成功上传'})


def get_user_datasets(request):
    """Legacy: list every dataset whose ``user`` matches the ``user`` query param."""
    user = request.GET.get('user')
    datasets = Dataset.objects.filter(user=user)
    return JsonResponse({'datasets': list(datasets.values())})


@csrf_exempt  # development only; use proper CSRF protection in production
def create_project(request):
    """Legacy: create a Project from the ``name``/``user``/``taskType``/``description`` form fields."""
    project_name = request.POST.get('name')
    user = request.POST.get('user')
    task_type = request.POST.get('taskType')
    description = request.POST.get('description')
    project = Project(name=project_name, user=user, task_type=task_type, description=description)
    project.save()
    return JsonResponse({'success': True, 'message': '项目已创建'})


def get_user_projects(request):
    """Legacy: list every project whose ``user`` matches the ``user`` query param."""
    user = request.GET.get('user')
    projects = Project.objects.filter(user=user)
    return JsonResponse({'projects': list(projects.values())})


@csrf_exempt  # development only; use proper CSRF protection in production
def delete_project(request):
    """Legacy: delete projects matching ``user`` + ``projectName``; success iff any row was removed."""
    user = request.GET.get('user')
    project_name = request.GET.get('projectName')
    deleted_count, _ = Project.objects.filter(user=user, name=project_name).delete()
    return JsonResponse({'success': deleted_count > 0})


@csrf_exempt  # development only; use proper CSRF protection in production
def delete_dataset(request):
    """Legacy: delete a dataset row, then clean up its MinIO objects and temp dir in the background.

    Cleanup is only scheduled when a row was actually deleted, so arbitrary
    ``user``/``datasetName`` values cannot trigger deletion of unrelated storage.
    """
    user = request.GET.get('user')
    dataset_name = request.GET.get('datasetName')
    deleted_count, _ = Dataset.objects.filter(user=user, name=dataset_name).delete()
    if deleted_count == 0:
        return JsonResponse({'success': False})

    if user and dataset_name:
        executor.submit(delete_from_minio, user + '/' + dataset_name)
        temp_dir = _safe_join(TEMP_ROOT_DIR, user, dataset_name)
        if temp_dir is not None:  # never hand a traversing path to delete_temp_dir
            executor.submit(delete_temp_dir, temp_dir)
    return JsonResponse({'success': True})


@csrf_exempt  # development only; use proper CSRF protection in production
def get_project(request):
    """Legacy: return the first project matching ``user`` + ``projectName``, or ``project: None``."""
    user = request.GET.get('user')
    project_name = request.GET.get('projectName')
    # A QuerySet is never None; .first() makes the not-found branch reachable
    # instead of raising IndexError on an empty result.
    project = Project.objects.filter(user=user, name=project_name).values().first()
    if project is not None:
        return JsonResponse({'success': True, 'project': project})
    return JsonResponse({'success': False, 'project': None})


@csrf_exempt  # development only; use proper CSRF protection in production
def get_dataset(request):
    """Legacy: return the first dataset matching ``user`` + ``datasetName``, or ``dataset: None``."""
    user = request.GET.get('user')
    dataset_name = request.GET.get('datasetName')
    dataset = Dataset.objects.filter(user=user, name=dataset_name).values().first()
    if dataset is not None:
        return JsonResponse({'success': True, 'dataset': dataset})
    return JsonResponse({'success': False, 'dataset': None})


@csrf_exempt  # development only; use proper CSRF protection in production
def get_minio_links(request):
    """Legacy: page through a dataset's image links in MinIO.

    ``nextImage`` is the last link the client saw (a full URL under
    ``http://LOCAL_IP:9000/BUCKET_NAME``) or empty for the first page; it is turned
    back into an object key before calling ``get_dataset_link``.
    """
    user = request.GET.get('user')
    dataset_name = request.GET.get('datasetName')
    next_image = request.GET.get('nextImage') or ''  # a missing param means "first page"
    page_size = request.GET.get('pageSize')
    if next_image:
        next_image = os.path.relpath(next_image, f"http://{LOCAL_IP}:9000/{BUCKET_NAME}")
    task_type = request.GET.get('taskType')

    # MinIO prefix holding the dataset's images.
    object_name = user + '/' + dataset_name + '/' + task_type + '/images'
    http_links = get_dataset_link(object_name, next_image, page_size)
    return JsonResponse({'success': True, 'links': http_links})


@csrf_exempt
def get_dataset_is_upload(request):
    """Legacy: report the ``is_upload`` flag of the dataset matching ``user`` + ``datasetName``."""
    if request.method != 'GET':
        return JsonResponse({'success': False, 'message': '请求方法错误'})

    user = request.GET.get('user')
    dataset_name = request.GET.get('datasetName')
    if not user or not dataset_name:
        return JsonResponse({'success': False, 'message': '用户或数据集名称未提供'})

    dataset = Dataset.objects.filter(user=user, name=dataset_name).first()
    if dataset is None:
        return JsonResponse({'success': False, 'message': '数据集未找到'})
    return JsonResponse({'success': True, 'is_upload': dataset.is_upload})