实现数据集相关接口

This commit is contained in:
Zixiao Wang 2025-06-09 15:40:07 +08:00
parent fe6b149f06
commit 287ed16fb1

View File

@ -3,22 +3,297 @@ import os
import shutil
import glob
import zipfile
import duckdb
import json
from django.shortcuts import render
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.core.files.storage import FileSystemStorage
from .models import Dataset, Project
from .models import Dataset, Project, CreatorProfile, Product, CreatorProductMatch
from .tool import check_directory_structure_detect, delete_temp_dir, get_categories
from .minio_tools import upload_to_minio, delete_from_minio, get_dataset_link
from .config import TEMP_ROOT_DIR, LOCAL_IP, BUCKET_NAME
from .config import TEMP_ROOT_DIR, LOCAL_IP, BUCKET_NAME, LOCAL_DATA_DIR
from concurrent.futures import ThreadPoolExecutor
from accounts.models import User
from django.forms import model_to_dict
from django.db.models.fields.files import ImageFieldFile
import pandas as pd
import numpy as np
from decimal import Decimal
from datetime import datetime
# Module-level thread pool capped at 10 worker threads.
# NOTE(review): nothing in the visible part of this file submits work to it;
# presumably the legacy upload views further down do -- confirm before changing.
executor = ThreadPoolExecutor(max_workers=10)
# Maps the short task-type keys 'Detect' / 'Classify' to their long names.
task_type_map = {'Detect':'Detection', 'Classify':'Classification'}
# Create your views here.
@csrf_exempt
def create_dataset(request):
    """Create a dataset record for a user and prepare its directory on disk.

    Expects a POST request whose body is a JSON object with:
      - ``name`` (required): dataset name; must be unique per user.
      - ``user_id`` (required): id of the owning ``User``.
      - ``task_type``, ``size``, ``description`` (optional, default ``''``).
      - ``categories`` (optional, default ``[]``).

    Returns a ``JsonResponse`` shaped ``{'code', 'message', 'data'}``.
    On success ``data`` holds the new ``dataset_id``; every validation
    failure (wrong method, bad JSON, missing parameter, unknown user,
    duplicate name) answers with code 400 and HTTP status 400.
    """
    if request.method != 'POST':
        return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400)

    try:
        data = json.loads(request.body.decode('utf-8'))
    except Exception:
        return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400)
    # A JSON array or scalar parses fine but has no .get(); reject it here
    # instead of crashing with AttributeError below.
    if not isinstance(data, dict):
        return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400)

    name = data.get('name')
    user_id = data.get('user_id')
    task_type = data.get('task_type', '')
    size = data.get('size', '')
    description = data.get('description', '')
    categories = data.get('categories', [])

    # Required-parameter check.
    if not name or not user_id:
        return JsonResponse({'code': 400, 'message': '缺少必要参数: name 或 user_id', 'data': {}}, status=400)

    # The user must exist. A malformed id (e.g. a non-numeric string or a
    # list) makes the ORM raise ValueError/TypeError; treat it as "not found".
    try:
        user = User.objects.get(id=user_id)
    except (User.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400)

    # Dataset names are unique within one user.
    if Dataset.objects.filter(name=name, user=user).exists():
        return JsonResponse({'code': 400, 'message': '该用户下数据集名称已存在', 'data': {}}, status=400)

    dataset = Dataset(
        name=name,
        user=user,
        task_type=task_type,
        size=size,
        description=description,
        categories=categories,
    )
    dataset.save()

    # On-disk layout: <LOCAL_DATA_DIR>/<user id>/<dataset id>/.
    # exist_ok avoids the check-then-create race between concurrent requests.
    dataset_dir = os.path.join(LOCAL_DATA_DIR, str(user.id), str(dataset.id))
    os.makedirs(dataset_dir, exist_ok=True)

    # Remove any stale Parquet file left at this path so the new dataset
    # starts empty. The file itself is not created here.
    parquet_path = os.path.join(dataset_dir, f"{dataset.id}.parquet")
    try:
        os.remove(parquet_path)
    except FileNotFoundError:
        pass

    return JsonResponse({'code': 200, 'message': '数据集创建成功', 'data': {'dataset_id': dataset.id}})
@csrf_exempt
def add_data_to_dataset(request):
    """Append one creator/product row to a dataset's Parquet file.

    Expects a POST request whose body is a JSON object with ``product_id``,
    ``creator_id``, ``dataset_id`` and ``user_id`` (all required) plus
    ``is_matched`` (stored as given, not validated).

    The row is the field dict of the ``CreatorProfile`` merged with the field
    dict of the ``Product`` (their ``id`` keys renamed to ``creator_id`` /
    ``product_id``), plus ``is_matched`` and a ``create_time`` timestamp.
    If the Parquet file does not exist it is created with this single row;
    otherwise the row is appended unless the same (product_id, creator_id)
    pair is already present.

    Returns a ``JsonResponse`` shaped ``{'code', 'message', 'data'}``;
    every failure answers with code 400 and HTTP status 400.
    """
    if request.method != 'POST':
        return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400)

    try:
        data = json.loads(request.body.decode('utf-8'))
    except Exception:
        return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400)
    # A JSON array or scalar has no .get(); reject it instead of crashing.
    if not isinstance(data, dict):
        return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400)

    product_id = data.get('product_id')
    creator_id = data.get('creator_id')
    is_matched = data.get('is_matched')
    dataset_id = data.get('dataset_id')
    user_id = data.get('user_id')
    if not product_id or not creator_id or not dataset_id or not user_id:
        return JsonResponse({'code': 400, 'message': '缺少product_id或creator_id或dataset_id或user_id参数', 'data': {}}, status=400)

    # Resolve every referenced object. Malformed ids make the ORM raise
    # ValueError/TypeError; those are reported the same way as "not found".
    try:
        product = Product.objects.get(id=product_id)
    except (Product.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '产品不存在', 'data': {}}, status=400)
    try:
        User.objects.get(id=user_id)
    except (User.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400)
    try:
        creator = CreatorProfile.objects.get(id=creator_id)
    except (CreatorProfile.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '达人不存在', 'data': {}}, status=400)
    try:
        dataset = Dataset.objects.get(id=dataset_id, user_id=user_id)
    except (Dataset.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '数据集不存在', 'data': {}}, status=400)

    dataset_dir = os.path.join(LOCAL_DATA_DIR, str(dataset.user.id), str(dataset.id))
    parquet_path = os.path.join(dataset_dir, f"{dataset.id}.parquet")

    def clean_filefields(fields):
        """Return a copy of *fields* with image-file values replaced by None."""
        return {
            key: (None if isinstance(value, ImageFieldFile) else value)
            for key, value in fields.items()
        }

    creator_dict = clean_filefields(model_to_dict(creator))
    product_dict = clean_filefields(model_to_dict(product))
    # Rename the primary keys so the two models' ids do not collide.
    creator_dict['creator_id'] = creator_dict.pop('id')
    product_dict['product_id'] = product_dict.pop('id')
    # NOTE(review): any other field name shared by both models is silently
    # overwritten by the product's value in this merge -- confirm intended.
    row = {
        **creator_dict,
        **product_dict,
        'is_matched': is_matched,
        'create_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

    if not os.path.exists(parquet_path):
        # First row for this dataset: create the file.
        pd.DataFrame([row]).to_parquet(parquet_path)
        return JsonResponse({'code': 200, 'message': '匹配关系创建成功', 'data': {}})

    # File exists: check whether this pair is already stored. The ids are
    # bound as query parameters rather than interpolated into the SQL text;
    # the path is server-built and only needs its quotes escaped.
    quoted_path = parquet_path.replace("'", "''")
    con = duckdb.connect()
    try:
        result = con.execute(
            f"SELECT COUNT(*) FROM read_parquet('{quoted_path}') "
            "WHERE product_id = ? AND creator_id = ?",
            [product.id, creator.id],
        ).fetchone()
    finally:
        # Always release the connection, even when the query fails.
        con.close()
    if result and result[0] > 0:
        return JsonResponse({'code': 400, 'message': '该达人和产品的匹配已存在', 'data': {}}, status=400)

    # Append by rewriting the whole file.
    # NOTE(review): concurrent requests for the same dataset can race here.
    df = pd.read_parquet(parquet_path)
    df = pd.concat([df, pd.DataFrame([row])], ignore_index=True)
    df.to_parquet(parquet_path)
    return JsonResponse({'code': 200, 'message': '匹配关系创建成功', 'data': {}})
@csrf_exempt
def get_dataset_list(request):
    """List all datasets owned by the user named in the ``user_id`` query parameter.

    GET only. Returns a ``JsonResponse`` shaped ``{'code', 'message', 'data'}``;
    on success ``data['datasets']`` is a list with one field dict per dataset.
    A wrong method, a missing ``user_id`` or an unknown user answers with
    code 400 and HTTP status 400.
    """
    if request.method != 'GET':
        return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400)

    uid = request.GET.get('user_id')
    if not uid:
        return JsonResponse({'code': 400, 'message': '缺少user_id参数', 'data': {}}, status=400)

    try:
        owner = User.objects.get(id=uid)
    except User.DoesNotExist:
        return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400)

    # .values() yields plain dicts, which JsonResponse can serialize directly.
    rows = list(Dataset.objects.filter(user=owner).values())
    payload = {'code': 200, 'message': '查询成功', 'data': {'datasets': rows}}
    return JsonResponse(payload)
@csrf_exempt
def get_dataset_detail(request):
    """Return one dataset, looked up by ``dataset_id`` and owning ``user_id``.

    GET only; both values come from the query string. Returns a
    ``JsonResponse`` shaped ``{'code', 'message', 'data'}``; on success
    ``data['dataset']`` is the ``model_to_dict`` form of the dataset.
    A wrong method, a missing parameter, an unknown user, or a dataset that
    does not belong to that user answers with code 400 and HTTP status 400.
    """
    if request.method != 'GET':
        return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400)

    params = request.GET
    ds_id = params.get('dataset_id')
    uid = params.get('user_id')
    if not (ds_id and uid):
        return JsonResponse({'code': 400, 'message': '缺少dataset_id或user_id参数', 'data': {}}, status=400)

    try:
        owner = User.objects.get(id=uid)
    except User.DoesNotExist:
        return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400)

    # Filtering on the owner as well keeps users from reading others' datasets.
    try:
        record = Dataset.objects.get(id=ds_id, user=owner)
    except Dataset.DoesNotExist:
        return JsonResponse({'code': 400, 'message': '数据集不存在', 'data': {}}, status=400)

    body = {'code': 200, 'message': '查询成功', 'data': {'dataset': model_to_dict(record)}}
    return JsonResponse(body)
@csrf_exempt
def delete_dataset_api(request):
    """Delete a dataset's on-disk directory and its database record.

    Expects a POST request whose body is a JSON object with ``user_id`` and
    ``dataset_id`` (both required). The dataset must belong to that user.

    Returns a ``JsonResponse`` shaped ``{'code', 'message', 'data'}``;
    every failure (wrong method, bad JSON, missing parameter, unknown user
    or dataset) answers with code 400 and HTTP status 400.
    """
    if request.method != 'POST':
        return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400)

    try:
        data = json.loads(request.body.decode('utf-8'))
    except Exception:
        return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400)
    # A JSON array or scalar has no .get(); reject it instead of crashing.
    if not isinstance(data, dict):
        return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400)

    user_id = data.get('user_id')
    dataset_id = data.get('dataset_id')
    if not user_id or not dataset_id:
        return JsonResponse({'code': 400, 'message': '缺少user_id或dataset_id参数', 'data': {}}, status=400)

    # Malformed ids make the ORM raise ValueError/TypeError; report them the
    # same way as ids that match nothing.
    try:
        user = User.objects.get(id=user_id)
    except (User.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400)
    try:
        dataset = Dataset.objects.get(id=dataset_id, user=user)
    except (Dataset.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '数据集不存在', 'data': {}}, status=400)

    # Remove <LOCAL_DATA_DIR>/<user id>/<dataset id>/ if it exists.
    dataset_dir = os.path.join(LOCAL_DATA_DIR, str(user.id), str(dataset.id))
    if os.path.exists(dataset_dir):
        shutil.rmtree(dataset_dir)

    # Then drop the database record.
    dataset.delete()
    return JsonResponse({'code': 200, 'message': '数据集删除成功', 'data': {}})
@csrf_exempt
def get_match_list(request):
    """Return one page of rows from a dataset's Parquet file.

    GET only. Query parameters (all required): ``page`` and ``page_size``
    (positive integers, 1-based paging), ``user_id`` and ``dataset_id``.

    Returns a ``JsonResponse`` shaped ``{'code', 'message', 'data'}``.
    On success ``data`` holds ``total`` (row count of the whole file) and
    ``matches`` (the rows of the requested page, missing values replaced by
    ``''``). A dataset without a Parquet file yields an empty result with
    code 200. Validation failures answer with code 400 / HTTP 400; a
    Parquet file that cannot be read answers with code 500 / HTTP 500.
    """
    if request.method != 'GET':
        return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400)

    page = request.GET.get('page')
    page_size = request.GET.get('page_size')
    user_id = request.GET.get('user_id')
    dataset_id = request.GET.get('dataset_id')

    # Parameter validation.
    if not page or not page_size or not user_id or not dataset_id:
        return JsonResponse({'code': 400, 'message': '缺少参数: page, page_size, user_id, dataset_id', 'data': {}}, status=400)
    try:
        page = int(page)
        page_size = int(page_size)
        if page < 1 or page_size < 1:
            raise ValueError
    except ValueError:
        return JsonResponse({'code': 400, 'message': 'page和page_size必须为正整数', 'data': {}}, status=400)

    # The user must exist and own the dataset. Malformed ids make the ORM
    # raise ValueError/TypeError; report them like ids that match nothing.
    try:
        user = User.objects.get(id=user_id)
    except (User.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '用户不存在', 'data': {}}, status=400)
    try:
        dataset = Dataset.objects.get(id=dataset_id, user=user)
    except (Dataset.DoesNotExist, ValueError, TypeError):
        return JsonResponse({'code': 400, 'message': '数据集不存在', 'data': {}}, status=400)

    # Parquet file lives at <LOCAL_DATA_DIR>/<user id>/<dataset id>/<dataset id>.parquet.
    dataset_dir = os.path.join(LOCAL_DATA_DIR, str(user.id), str(dataset.id))
    parquet_path = os.path.join(dataset_dir, f"{dataset.id}.parquet")
    if not os.path.exists(parquet_path):
        return JsonResponse({'code': 200, 'message': '暂无数据', 'data': {'total': 0, 'matches': []}})

    try:
        df = pd.read_parquet(parquet_path)
    except Exception as e:
        return JsonResponse({'code': 500, 'message': f'Parquet文件读取失败: {str(e)}', 'data': {}}, status=500)

    total = len(df)
    start = (page - 1) * page_size
    # A page past the end simply slices to an empty frame.
    page_df = df.iloc[start:start + page_size]
    matches = page_df.fillna('').to_dict(orient='records')

    def convert_obj(obj):
        """Recursively turn numpy arrays into lists and Decimals into floats."""
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, dict):
            return {k: convert_obj(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [convert_obj(i) for i in obj]
        return obj

    matches = [convert_obj(item) for item in matches]
    return JsonResponse({'code': 200, 'message': '查询成功', 'data': {'total': total, 'matches': matches}})
####### 老接口
@csrf_exempt
def upload(request):
if request.method == 'POST':
# 检查文件是否存在
@ -224,3 +499,32 @@ def get_dataset_is_upload(request):
return JsonResponse({'success': False, 'message': '请求方法错误'})
"""根据传入的product_id和creator_id插入一条CreatorProductMatch记录统一返回格式"""
if request.method == 'POST':
try:
data = json.loads(request.body.decode('utf-8'))
except Exception:
return JsonResponse({'code': 400, 'message': '请求体不是有效的JSON', 'data': {}}, status=400)
product_id = data.get('product_id')
creator_id = data.get('creator_id')
if not product_id or not creator_id:
return JsonResponse({'code': 400, 'message': '缺少product_id或creator_id参数', 'data': {}}, status=400)
try:
product = Product.objects.get(id=product_id)
except Product.DoesNotExist:
return JsonResponse({'code': 400, 'message': '产品不存在', 'data': {}}, status=400)
try:
creator = CreatorProfile.objects.get(id=creator_id)
except CreatorProfile.DoesNotExist:
return JsonResponse({'code': 400, 'message': '达人不存在', 'data': {}}, status=400)
# 检查是否已存在
if CreatorProductMatch.objects.filter(product=product, creator=creator).exists():
return JsonResponse({'code': 400, 'message': '该达人和产品的匹配已存在', 'data': {}}, status=400)
match = CreatorProductMatch.objects.create(product=product, creator=creator)
return JsonResponse({'code': 200, 'message': '匹配关系创建成功', 'data': {'match_id': match.id}})
return JsonResponse({'code': 400, 'message': '请求方法错误', 'data': {}}, status=400)