diff --git a/datasets/views.py b/datasets/views.py index d5ffee5..90c5ec3 100644 --- a/datasets/views.py +++ b/datasets/views.py @@ -3,22 +3,297 @@ import os import shutil import glob import zipfile +import duckdb +import json from django.shortcuts import render from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from django.core.files.storage import FileSystemStorage -from .models import Dataset, Project +from .models import Dataset, Project, CreatorProfile, Product, CreatorProductMatch from .tool import check_directory_structure_detect, delete_temp_dir, get_categories from .minio_tools import upload_to_minio, delete_from_minio, get_dataset_link -from .config import TEMP_ROOT_DIR, LOCAL_IP, BUCKET_NAME +from .config import TEMP_ROOT_DIR, LOCAL_IP, BUCKET_NAME, LOCAL_DATA_DIR from concurrent.futures import ThreadPoolExecutor - +from accounts.models import User +from django.forms import model_to_dict +from django.db.models.fields.files import ImageFieldFile +import pandas as pd +import numpy as np +from decimal import Decimal +from datetime import datetime executor = ThreadPoolExecutor(max_workers=10) task_type_map = {'Detect':'Detection', 'Classify':'Classification'} # Create your views here. @csrf_exempt +def create_dataset(request): + """创建一个数据集的接口,包含参数校验、用户校验、名称唯一性校验,统一返回格式""" + if request.method == 'POST': + try: + data = json.loads(request.body.decode('utf-8')) + except Exception: + return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400) + name = data.get('name') + user_id = data.get('user_id') + task_type = data.get('task_type', '') + size = data.get('size', '') + description = data.get('description', '') + categories = data.get('categories', []) + # 参数完整性校验 + if not name or not user_id: + return JsonResponse({'code': 400, 'message': '缺少必要参数: name 或 user_id', 'data': {}}, status=400) + # 用户存在性校验 + try: + user = User.objects.get(id=user_id) + except User.DoesNotExist: + return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400) + # 名称唯一性校验(同一用户下) + if Dataset.objects.filter(name=name, user=user).exists(): + return JsonResponse({'code': 400, 'message': '该用户下数据集名称已存在', 'data': {}}, status=400) + # 创建数据集 + dataset = Dataset( + name=name, + user=user, + task_type=task_type, + size=size, + description=description, + categories=categories + ) + dataset.save() + + # 创建本地磁盘目录结构 + user_dir = os.path.join(LOCAL_DATA_DIR, str(user.id)) + if not os.path.exists(user_dir): + os.makedirs(user_dir) + dataset_dir = os.path.join(user_dir, str(dataset.id)) + if not os.path.exists(dataset_dir): + os.makedirs(dataset_dir) + + # 检查/创建Parquet文件 + parquet_path = os.path.join(dataset_dir, f"{dataset.id}.parquet") + if os.path.exists(parquet_path): + os.remove(parquet_path) + + # # 创建空表结构的Parquet文件(可根据后续实际字段调整) + # empty_df = pd.DataFrame() + # empty_df.to_parquet(parquet_path) + + return JsonResponse({'code': 200, 'message': '数据集创建成功', 'data': {'dataset_id': dataset.id}}) + return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400) + +@csrf_exempt +def add_data_to_dataset(request): + """根据传入的product_id和creator_id插入一条CreatorProductMatch记录,统一返回格式""" + if request.method == 'POST': + try: + data = json.loads(request.body.decode('utf-8')) + except Exception: + return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400) + product_id = data.get('product_id') + creator_id = data.get('creator_id') + is_matched = data.get('is_matched') + dataset_id = data.get('dataset_id') + user_id = data.get('user_id') + if not product_id or not creator_id or not dataset_id or not user_id: + return JsonResponse({'code': 400, 'message': '缺少product_id或creator_id或dataset_id或user_id参数', 'data': {}}, status=400) + try: + product = Product.objects.get(id=product_id) + except Product.DoesNotExist: + return JsonResponse({'code': 400, 'message': '产品不存在', 'data': {}}, status=400) + + # 用户存在性校验 + try: + user = User.objects.get(id=user_id) + except User.DoesNotExist: + return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400) + + try: + creator = CreatorProfile.objects.get(id=creator_id) + except CreatorProfile.DoesNotExist: + return JsonResponse({'code': 400, 'message': '达人不存在', 'data': {}}, status=400) + + try: + dataset = Dataset.objects.get(id=dataset_id, user_id=user_id) + except Dataset.DoesNotExist: + return JsonResponse({'code': 400, 'message': '数据集不存在', 'data': {}}, status=400) + + user_dir = os.path.join(LOCAL_DATA_DIR, str(dataset.user.id)) + dataset_dir = os.path.join(user_dir, str(dataset.id)) + parquet_path = os.path.join(dataset_dir, f"{dataset.id}.parquet") + + # 组装json数据 + def clean_filefields(d): + for k, v in d.items(): + if isinstance(v, ImageFieldFile): + d[k] = None + return d + + creator_dict = model_to_dict(creator) + product_dict = model_to_dict(product) + creator_dict = clean_filefields(creator_dict) + product_dict = clean_filefields(product_dict) + # 字段重命名 + creator_dict['creator_id'] = creator_dict.pop('id') + product_dict['product_id'] = product_dict.pop('id') + row = {**creator_dict, **product_dict, 'is_matched': is_matched, 'create_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")} + + if not os.path.exists(parquet_path): + # 文件不存在,直接创建新文件并写入当前数据 + df = pd.DataFrame([row]) + df.to_parquet(parquet_path) + return JsonResponse({'code': 200, 'message': '匹配关系创建成功', 'data': {}}) + else: + # 文件存在,检查是否已包含该数据 + con = duckdb.connect() + query = f"SELECT COUNT(*) FROM read_parquet('{parquet_path}') WHERE product_id={product_id} AND creator_id={creator_id}" + result = con.execute(query).fetchone() + exists = result and result[0] > 0 + con.close() + if exists: + return JsonResponse({'code': 400, 'message': '该达人和产品的匹配已存在', 'data': {}}) + # 合并数据 + df = pd.read_parquet(parquet_path) + df = pd.concat([df, pd.DataFrame([row])], ignore_index=True) + df.to_parquet(parquet_path) + return JsonResponse({'code': 200, 'message': '匹配关系创建成功', 'data': {}}) + return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400) + +@csrf_exempt +def get_dataset_list(request): + """根据传入的用户id返回该用户所有数据集信息,统一返回格式""" + if request.method == 'GET': + user_id = request.GET.get('user_id') + if not user_id: + return JsonResponse({'code': 400, 'message': '缺少user_id参数', 'data': {}}, status=400) + try: + user = User.objects.get(id=user_id) + except User.DoesNotExist: + return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400) + datasets = Dataset.objects.filter(user=user) + dataset_list = list(datasets.values()) + return JsonResponse({'code': 200, 'message': '查询成功', 'data': {'datasets': dataset_list}}) + return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400) + +@csrf_exempt +def get_dataset_detail(request): + """根据传入的dataset_id和user_id返回该数据集的信息,统一返回格式""" + if request.method == 'GET': + dataset_id = request.GET.get('dataset_id') + user_id = request.GET.get('user_id') + if not dataset_id or not user_id: + return JsonResponse({'code': 400, 'message': '缺少dataset_id或user_id参数', 'data': {}}, status=400) + try: + user = User.objects.get(id=user_id) + except User.DoesNotExist: + return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400) + try: + dataset = Dataset.objects.get(id=dataset_id, user=user) + except Dataset.DoesNotExist: + return JsonResponse({'code': 400, 'message': '数据集不存在', 'data': {}}, status=400) + return JsonResponse({'code': 200, 'message': '查询成功', 'data': {'dataset': model_to_dict(dataset)}}) + return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400) + +@csrf_exempt +def delete_dataset_api(request): + """根据user_id和dataset_id删除数据集,统一返回格式""" + if request.method == 'POST': + try: + data = json.loads(request.body.decode('utf-8')) + except Exception: + return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400) + user_id = data.get('user_id') + dataset_id = data.get('dataset_id') + if not user_id or not dataset_id: + return JsonResponse({'code': 400, 'message': '缺少user_id或dataset_id参数', 'data': {}}, status=400) + try: + user = User.objects.get(id=user_id) + except User.DoesNotExist: + return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400) + try: + dataset = Dataset.objects.get(id=dataset_id, user=user) + except Dataset.DoesNotExist: + return JsonResponse({'code': 400, 'message': '数据集不存在', 'data': {}}, status=400) + + # 删除本地磁盘数据集文件夹 + user_dir = os.path.join(LOCAL_DATA_DIR, str(user.id)) + dataset_dir = os.path.join(user_dir, str(dataset.id)) + if os.path.exists(dataset_dir): + shutil.rmtree(dataset_dir) + + # 删除数据库中的数据集 + dataset.delete() + return JsonResponse({'code': 200, 'message': '数据集删除成功', 'data': {}}) + return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400) + +@csrf_exempt +def get_match_list(request): + """获取CreatorProductMatch数据列表,支持分页和筛选(从Parquet文件读取)""" + if request.method == 'GET': + page = request.GET.get('page') + page_size = request.GET.get('page_size') + user_id = request.GET.get('user_id') + dataset_id = request.GET.get('dataset_id') + # 参数校验 + if not page or not page_size or not user_id or not dataset_id: + return JsonResponse({'code': 400, 'message': '缺少参数: page, page_size, user_id, dataset_id', 'data': {}}, status=400) + try: + page = int(page) + page_size = int(page_size) + if page < 1 or page_size < 1: + raise ValueError + except ValueError: + return JsonResponse({'code': 400, 'message': 'page和page_size必须为正整数', 'data': {}}, status=400) + # 校验用户和数据集 + try: + user = User.objects.get(id=user_id) + except User.DoesNotExist: + return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400) + try: + dataset = Dataset.objects.get(id=dataset_id, user=user) + except Dataset.DoesNotExist: + return JsonResponse({'code': 400, 'message': '数据集不存在', 'data': {}}, status=400) + # 拼接Parquet文件路径 + user_dir = os.path.join(LOCAL_DATA_DIR, str(user.id)) + dataset_dir = os.path.join(user_dir, str(dataset.id)) + parquet_path = os.path.join(dataset_dir, f"{dataset.id}.parquet") + if not os.path.exists(parquet_path): + return JsonResponse({'code': 200, 'message': '暂无数据', 'data': {'total': 0, 'matches': []}}) + try: + df = pd.read_parquet(parquet_path) + except Exception as e: + return JsonResponse({'code': 500, 'message': f'Parquet文件读取失败: {str(e)}', 'data': {}}) + total = len(df) + start = (page - 1) * page_size + end = start + page_size + # 分页 + page_df = df.iloc[start:end] + # 转为dict + matches = page_df.fillna('').to_dict(orient='records') + + def convert_obj(obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + if isinstance(obj, Decimal): + return float(obj) + if isinstance(obj, dict): + return {k: convert_obj(v) for k, v in obj.items()} + if isinstance(obj, list): + return [convert_obj(i) for i in obj] + return obj + matches = [convert_obj(item) for item in matches] + print(matches) + return JsonResponse({'code': 200, 'message': '查询成功', 'data': {'total': total, 'matches': matches}}) + return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}) + + + + + + + +####### 老接口 +@csrf_exempt def upload(request): if request.method == 'POST': # 检查文件是否存在 @@ -224,3 +499,32 @@ def get_dataset_is_upload(request): return JsonResponse({'success': False, 'message': '请求方法错误'}) + + + """根据传入的product_id和creator_id插入一条CreatorProductMatch记录,统一返回格式""" + if request.method == 'POST': + try: + data = json.loads(request.body.decode('utf-8')) + except Exception: + return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400) + product_id = data.get('product_id') + creator_id = data.get('creator_id') + if not product_id or not creator_id: + return JsonResponse({'code': 400, 'message': '缺少product_id或creator_id参数', 'data': {}}, status=400) + try: + product = Product.objects.get(id=product_id) + except Product.DoesNotExist: + return JsonResponse({'code': 400, 'message': '产品不存在', 'data': {}}, status=400) + try: + creator = CreatorProfile.objects.get(id=creator_id) + except CreatorProfile.DoesNotExist: + return JsonResponse({'code': 400, 'message': '达人不存在', 'data': {}}, status=400) + # 检查是否已存在 + if CreatorProductMatch.objects.filter(product=product, creator=creator).exists(): + return JsonResponse({'code': 400, 'message': '该达人和产品的匹配已存在', 'data': {}}, status=400) + match = CreatorProductMatch.objects.create(product=product, creator=creator) + return JsonResponse({'code': 200, 'message': '匹配关系创建成功', 'data': {'match_id': match.id}}) + return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400) + + +