daren/apps/daren_detail/views.py
2025-05-19 18:23:59 +08:00

3433 lines
129 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.http import JsonResponse
# from .models import TiktokUserVideos
import logging
import os
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render
import json
import requests
import concurrent.futures
import shutil
import dotenv
import random
import uuid
# Load environment variables from a .env file (python-dotenv) at import time.
dotenv.load_dotenv()
# Logger dedicated to this app.
logger = logging.getLogger('daren_detail')
# NOTE(review): not referenced in the visible part of this file; presumably
# per-directory monitoring state -- confirm against the rest of the module.
directory_monitoring = {}
# Module-level state used to control the monitoring thread.
monitor_thread = None
is_monitoring = False
# # 在文件开头定义日志目录
# BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# LOG_DIR = os.path.join(BASE_DIR, 'logs', 'process_monitor')
# 创建保存视频的基本路径
# TIKTOK_VIDEOS_PATH = os.path.join(BASE_DIR, 'media', 'tiktok_videos')
# 确保基本目录存在
# os.makedirs(TIKTOK_VIDEOS_PATH, exist_ok=True)
# 确保基础目录结构存在,添加 'all' 目录
# for resource_type in ['cpu', 'memory', 'gpu', 'all']:
# os.makedirs(os.path.join(LOG_DIR, resource_type), exist_ok=True)
# 全局变量来跟踪监控的目录
# monitored_directories = set()
# 在文件顶部添加 API 基础 URL
# API_BASE_URL = os.getenv("API_BASE_URL")
#
# @csrf_exempt
# @require_http_methods(["POST"])
# def fetch_tiktok_videos(request):
# """获取TikTok视频"""
# try:
# # 添加全局变量引用
# global all_downloaded_videos
#
# # 如果变量未初始化,则初始化为空列表
# if 'all_downloaded_videos' not in globals():
# all_downloaded_videos = []
#
# data = json.loads(request.body)
# unique_id = data.get('unique_id')
#
# if not unique_id:
# return JsonResponse({
# 'status': 'error',
# 'message': '请提供TikTok用户ID(unique_id)'
# }, json_dumps_params={'ensure_ascii': False})
#
# # 检查数据库中是否已存在该unique_id
# if TiktokUserVideos.objects.filter(unique_id=unique_id).exists():
# logger.info(f"用户 {unique_id} 已存在于数据库中,跳过处理")
# return JsonResponse({
# 'status': 'success',
# 'message': f'用户 {unique_id} 已存在,跳过处理'
# }, json_dumps_params={'ensure_ascii': False})
#
# # 获取creator_oec_id
# creator_oec_id = data.get('creator_oec_id')
# logger.info(f"从请求数据中获取到creator_oec_id: {creator_oec_id}")
#
# # 调用API获取用户资料和secUid
# logger.info(f"正在获取用户 {unique_id} 的资料...")
# # user_profile = fetch_user_profile(unique_id)
# user_profile = None
# if not user_profile or 'data' not in user_profile:
# # 尝试使用备用方法获取用户资料
# logger.info(f"尝试使用备用方法获取用户 {unique_id} 的资料...")
#
# # 从data中获取creator_oec_id
# try:
# # 从响应中提取creator_oec_id
# creator_oec_id = data.get('creator_oec_id')
# logger.info(f"从请求数据中获取到creator_oec_id: {creator_oec_id}")
#
# # 如果有creator_oec_id则使用备用方法获取用户资料
# if creator_oec_id:
# user_profile = fetch_user_profile_1(creator_oec_id)
#
# if not user_profile or 'data' not in user_profile:
# return JsonResponse({
# 'status': 'error',
# 'message': f'无法获取用户 {unique_id} 的资料(两种方法均失败)'
# }, json_dumps_params={'ensure_ascii': False})
# else:
# return JsonResponse({
# 'status': 'error',
# 'message': f'无法获取用户 {unique_id} 的资料'
# }, json_dumps_params={'ensure_ascii': False})
# except Exception as e:
# logger.error(f"备用方法获取用户资料失败: {e}")
# return JsonResponse({
# 'status': 'error',
# 'message': f'无法获取用户 {unique_id} 的资料: {str(e)}'
# }, json_dumps_params={'ensure_ascii': False})
#
# # 从API响应中提取secUid和其他用户信息
# try:
# user_info = user_profile['data']['userInfo']['user']
# sec_uid = user_info['secUid']
#
# # 提取其他用户信息
# nickname = user_info.get('nickname', f'用户_{unique_id}')
# signature = user_info.get('signature', '')
# avatar_url = user_info.get('avatarLarger', '')
# user_stats = user_profile['data']['userInfo']['stats']
# follower_count = user_stats.get('followerCount', 0)
# heart_count = user_stats.get('heartCount', 0)
#
# logger.info(f"成功获取用户secUid: {sec_uid}")
# except (KeyError, TypeError) as e:
# logger.error(f"解析用户资料出错: {e}")
# return JsonResponse({
# 'status': 'error',
# 'message': f'解析用户资料出错: {str(e)}'
# }, json_dumps_params={'ensure_ascii': False})
#
# # 确保用户目录存在
# user_dir = os.path.join(TIKTOK_VIDEOS_PATH, unique_id)
# os.makedirs(user_dir, exist_ok=True)
#
# # 获取用户视频
# downloaded_videos = []
# videos_data = fetch_user_videos(sec_uid)
# if videos_data and 'data' in videos_data and 'itemList' in videos_data['data']:
# videos = videos_data['data']['itemList']
# user_dir = os.path.join(TIKTOK_VIDEOS_PATH, unique_id)
# os.makedirs(user_dir, exist_ok=True)
#
# # 获取前10个热门视频
# top_videos = sorted(videos, key=lambda x: x.get('stats', {}).get('playCount', 0), reverse=True)[:10]
#
# # 定义下载任务函数
# def download_task(video):
# try:
# video_id = video.get('id', '')
# if not video_id:
# return None
#
# save_path = os.path.join(user_dir, f"{video_id}.mp4")
#
# stats = video.get('stats', {})
# play_count = int(stats.get('playCount', 0))
#
# # 检查是否已下载过该视频
# is_downloaded = os.path.exists(save_path) and os.path.getsize(save_path) > 0
#
# if is_downloaded and is_valid_video_file(save_path):
# logger.info(f"视频已存在且有效,跳过下载: {video_id}")
# # 即使跳过下载,也添加到已下载列表中
# return {
# 'id': video_id,
# 'desc': video.get('desc', ''),
# 'play_count': play_count,
# 'user_unique_id': unique_id,
# 'skipped': True
# }
# elif download_video(video_id, unique_id, save_path):
# # download_video函数内部已经检查了文件有效性
# return {
# 'id': video_id,
# 'desc': video.get('desc', ''),
# 'play_count': play_count,
# 'user_unique_id': unique_id,
# 'skipped': False
# }
# return None
# except Exception as e:
# logger.error(f"下载视频时出错: {e}")
# return None
#
# # 使用线程池并发下载
# with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# # 提交所有下载任务
# future_to_video = {executor.submit(download_task, video): video for video in top_videos}
#
# # 获取结果
# for future in concurrent.futures.as_completed(future_to_video):
# result = future.result()
# if result:
# downloaded_videos.append(result)
#
# all_downloaded_videos.extend(downloaded_videos)
#
# # 视频下载完成后先调用API分析视频然后再保存到数据库
# if downloaded_videos: # 只有当有视频下载成功时才处理
# video_info_json = json.dumps([{
# 'id': v['id'],
# 'desc': v['desc'],
# 'play_count': v['play_count']
# } for v in downloaded_videos], ensure_ascii=False)
#
# # 先调用视频分析和文档创建API
# document_id = analyze_videos_and_create_document(user_dir, nickname, unique_id, sec_uid)
#
# # 只有在API调用成功后才写入数据库
# if document_id:
# user_record = TiktokUserVideos.objects.update_or_create(
# sec_user_id=sec_uid,
# defaults={
# 'unique_id': unique_id, # 添加unique_id字段
# 'nickname': nickname,
# 'signature': signature,
# 'follower_count': follower_count,
# 'total_favorited': heart_count,
# 'avatar_url': avatar_url,
# 'videos_folder': user_dir,
# 'video_paths': video_info_json,
# 'creator_oec_id': creator_oec_id,
# 'document_id': document_id
# }
# )
# logger.info(
# f"API调用成功用户 {nickname} 的数据已写入数据库,下载视频数: {len(downloaded_videos)}document_id: {document_id}")
# else:
# logger.warning(f"API调用失败跳过用户 {nickname} 的数据库写入")
# else:
# logger.warning(f"未获取到用户 {unique_id} 的视频数据")
# document_id = None
#
# return JsonResponse({
# 'status': 'success',
# 'message': '处理完成'
# }, json_dumps_params={'ensure_ascii': False})
#
# except Exception as e:
# logger.error(f"处理TikTok视频失败: {e}")
# import traceback
# logger.error(f"详细错误: {traceback.format_exc()}")
# return JsonResponse({
# 'status': 'error',
# 'message': f'处理TikTok视频失败: {str(e)}'
# }, json_dumps_params={'ensure_ascii': False})
#
#
# @require_http_methods(["GET"])
# def get_tiktok_user_videos(request):
# """获取已下载的TikTok用户视频列表"""
# try:
# sec_user_id = request.GET.get('sec_user_id')
#
# if not sec_user_id:
# # 如果没有指定用户ID返回所有用户列表
# users = TiktokUserVideos.objects.all().values('sec_user_id', 'unique_id', 'nickname', 'follower_count',
# 'videos_folder', 'create_time', 'avatar_url', 'signature',
# 'creator_oec_id')
# return JsonResponse({
# 'status': 'success',
# 'users': list(users)
# }, json_dumps_params={'ensure_ascii': False})
#
# # 查询指定用户信息
# try:
# user = TiktokUserVideos.objects.get(sec_user_id=sec_user_id)
# # 解析视频信息JSON
# video_info = json.loads(user.video_paths) if user.video_paths else []
#
# # 获取文件夹中的文件列表
# videos_folder = user.videos_folder
# video_files = []
# if os.path.exists(videos_folder):
# video_files = [f for f in os.listdir(videos_folder) if os.path.isfile(os.path.join(videos_folder, f))]
#
# except TiktokUserVideos.DoesNotExist:
# return JsonResponse({
# 'status': 'error',
# 'message': f'用户 {sec_user_id} 不存在'
# }, json_dumps_params={'ensure_ascii': False})
#
# return JsonResponse({
# 'status': 'success',
# 'user_info': {
# 'sec_user_id': user.sec_user_id,
# 'unique_id': user.unique_id,
# 'nickname': user.nickname,
# 'signature': user.signature,
# 'follower_count': user.follower_count,
# 'total_favorited': user.total_favorited,
# 'avatar_url': user.avatar_url,
# 'creator_oec_id': user.creator_oec_id, # 添加creator_oec_id字段
# 'create_time': user.create_time.strftime('%Y-%m-%d %H:%M:%S'),
# 'update_time': user.update_time.strftime('%Y-%m-%d %H:%M:%S')
# },
# 'videos_folder': videos_folder,
# 'video_files': video_files,
# 'video_info': video_info
# }, json_dumps_params={'ensure_ascii': False})
#
# except Exception as e:
# logger.error(f"获取TikTok视频列表失败: {str(e)}")
# return JsonResponse({
# 'status': 'error',
# 'message': f'获取TikTok视频列表失败: {str(e)}'
# }, json_dumps_params={'ensure_ascii': False})
#
#
# # 辅助函数
#
# def fetch_user_videos(sec_uid, cursor=0, count=30):
# """获取用户视频列表"""
# url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_post?secUid={sec_uid}&cursor={cursor}&count={count}"
#
# try:
# response = requests.get(url, timeout=30)
#
# if response.status_code == 200:
# data = response.json()
# logger.info(f"成功获取用户视频,共 {len(data['data'].get('itemList', []))} 个视频")
# return data
# else:
# logger.error(f"获取用户视频失败: {response.status_code}")
# return None
# except Exception as e:
# logger.error(f"获取用户视频异常: {e}")
# return None
#
#
# def fetch_user_profile(unique_id):
# """获取用户基本信息"""
# url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_profile?uniqueId={unique_id}"
#
# try:
# logger.info(f"正在请求用户资料: {url}")
# response = requests.get(url, timeout=30)
#
# if response.status_code == 200:
# data = response.json()
# logger.info(f"成功获取用户资料: {unique_id}")
#
# # 打印完整响应以便调试
# logger.info(f"API原始响应: {data}")
#
# # 验证数据完整性
# if 'data' not in data or not data['data']:
# logger.error(f"API响应缺少data字段: {data}")
# return None
#
# if 'userInfo' not in data['data'] or not data['data']['userInfo']:
# logger.error(f"API响应缺少userInfo字段: {data['data']}")
# return None
#
# if 'user' not in data['data']['userInfo'] or not data['data']['userInfo']['user']:
# logger.error(f"API响应缺少user字段: {data['data']['userInfo']}")
# return None
#
# # 打印用户信息
# logger.info(f"用户信息: {data['data']['userInfo']['user']}")
#
# return data
# else:
# logger.error(f"获取用户信息失败: HTTP {response.status_code}, 响应: {response.text[:500]}")
# return None
# except Exception as e:
# logger.error(f"获取用户信息异常: {e}")
# return None
#
#
# def get_sec_user_id(unique_id):
# """
# 通过TikTok用户名获取secUid
#
# Args:
# unique_id: TikTok用户名
#
# Returns:
# secUid字符串如果获取失败则返回None
# """
# url = f"{API_BASE_URL}/api/tiktok/web/get_sec_user_id?url=https://www.tiktok.com/@{unique_id}"
#
# try:
# logger.info(f"正在请求secUid: {url}")
# response = requests.get(url, timeout=30)
#
# if response.status_code == 200:
# data = response.json()
# logger.info(f"成功获取secUid: {data}")
#
# if 'data' in data and data['data']:
# sec_uid = data['data']
# logger.info(f"获取到secUid: {sec_uid}")
# return sec_uid
# else:
# logger.error(f"API响应缺少data字段或data为空: {data}")
# return None
# else:
# logger.error(f"获取secUid失败: HTTP {response.status_code}, 响应: {response.text[:500]}")
# return None
# except Exception as e:
# logger.error(f"获取secUid异常: {e}")
# return None
#
#
# def fetch_user_profile_1(creator_oec_id):
# """
# 通过creator_oec_id获取用户基本信息
#
# 当fetch_user_profile无法获取用户信息时使用此备用方法
# 通过POST请求获取用户详细资料
#
# Args:
# creator_oec_id: 创作者OEC ID
#
# Returns:
# 与fetch_user_profile相同格式的用户信息
# """
# url = "http://100.65.206.105:8899/profile"
#
# try:
# logger.info(f"正在通过creator_oec_id请求用户资料: {creator_oec_id}")
#
# # 构建请求体
# payload = {
# "creator_oec_id": creator_oec_id,
# "profile_types": [1, 6]
# }
#
# # 发送POST请求
# response = requests.post(url, json=payload, timeout=30)
#
# if response.status_code == 200:
# data = response.json()
# logger.info(f"成功获取用户资料(方法2): {creator_oec_id}")
#
# # 验证数据完整性
# if 'creator_profile' not in data:
# logger.error(f"API响应缺少creator_profile字段: {data}")
# return None
#
# # 转换为与fetch_user_profile相同的格式
# creator_profile = data.get('creator_profile', {})
#
# # 获取uniqueId
# unique_id = creator_profile.get('handle', {}).get('value', '')
#
# # 通过uniqueId获取secUid
# sec_uid = None
# if unique_id:
# sec_uid = get_sec_user_id(unique_id)
#
# # 如果无法获取secUid使用一个占位符
# if not sec_uid:
# logger.warning(f"无法获取用户 {unique_id} 的secUid使用占位符")
# sec_uid = f"placeholder_secuid_{creator_oec_id}"
#
# # 构建与原API相同格式的响应
# formatted_data = {
# 'data': {
# 'userInfo': {
# 'user': {
# 'secUid': sec_uid,
# 'uniqueId': unique_id,
# 'nickname': creator_profile.get('nickname', {}).get('value', ''),
# 'signature': creator_profile.get('bio', {}).get('value', ''),
# 'avatarLarger': creator_profile.get('avatar', {}).get('value', {}).get('url_list', [''])[0]
# },
# 'stats': {
# 'followerCount': int(creator_profile.get('follower_cnt', {}).get('value', '0')),
# 'heartCount': 0, # 没有对应字段设为0
# 'followingCount': 0, # 没有对应字段设为0
# 'videoCount': 0 # 没有对应字段设为0
# }
# }
# }
# }
#
# return formatted_data
# else:
# logger.error(f"获取用户信息失败(方法2): HTTP {response.status_code}, 响应: {response.text[:500]}")
# return None
# except Exception as e:
# logger.error(f"获取用户信息异常(方法2): {e}")
# import traceback
# logger.error(f"详细错误: {traceback.format_exc()}")
# return None
#
#
# def is_valid_video_file(file_path):
# """
# 检查视频文件是否有效
#
# Args:
# file_path: 视频文件路径
#
# Returns:
# bool: 如果文件有效返回True否则返回False
# """
# try:
# # 检查文件是否存在
# if not os.path.exists(file_path):
# logger.error(f"视频文件不存在: {file_path}")
# return False
#
# # 检查文件大小是否合理 (大于10KB)
# file_size = os.path.getsize(file_path)
# if file_size < 10 * 1024: # 小于10KB的文件可能是无效的
# logger.warning(f"视频文件过小,可能无效: {file_path},大小: {file_size}字节")
# return False
#
# # 检查文件是否可读
# with open(file_path, 'rb') as f:
# # 尝试读取前100字节
# data = f.read(100)
# if not data:
# logger.warning(f"视频文件内容为空: {file_path}")
# return False
#
# return True
#
# except Exception as e:
# logger.error(f"验证视频文件时出错: {e}")
# return False
#
#
# def download_video(video_id, unique_id, save_path):
# """使用API的直接下载接口下载TikTok视频"""
# # 确保视频ID是纯数字
# if not str(video_id).isdigit():
# logger.error(f"无效的视频ID: {video_id},必须是纯数字")
# return False
#
# # 检查文件是否已存在
# if os.path.exists(save_path):
# file_size = os.path.getsize(save_path)
# if file_size > 0: # 确保文件不是空文件
# # 进一步验证文件有效性
# if is_valid_video_file(save_path):
# logger.info(f"视频文件已存在且有效,跳过下载: {save_path},文件大小: {file_size}字节")
# return True
# else:
# logger.warning(f"发现无效视频文件,将重新下载: {save_path}")
# # 删除无效文件
# os.remove(save_path)
# else:
# logger.warning(f"发现空视频文件,将重新下载: {save_path}")
# # 删除空文件
# os.remove(save_path)
#
# # 构建标准TikTok视频URL
# tiktok_url = f"https://www.tiktok.com/@{unique_id}/video/{video_id}"
# logger.info(f"构建的TikTok URL: {tiktok_url}")
#
# # 构建完整的API请求URL
# api_url = f"{API_BASE_URL}/api/download"
# full_url = f"{api_url}?url={tiktok_url}&prefix=true&with_watermark=false"
# logger.info(f"完整的API请求URL: {full_url}")
#
# try:
# # 直接使用完整URL发送请求
# response = requests.get(full_url, stream=True, timeout=60)
#
# # 检查响应状态
# if response.status_code != 200:
# logger.error(
# f"下载视频失败: {response.status_code} - {response.text[:200] if response.text else '无响应内容'}")
# return False
#
# # 获取内容类型
# content_type = response.headers.get('Content-Type', '')
# logger.info(f"响应内容类型: {content_type}")
#
# # 保存文件
# with open(save_path, 'wb') as f:
# for chunk in response.iter_content(chunk_size=8192):
# if chunk:
# f.write(chunk)
#
# file_size = os.path.getsize(save_path)
# logger.info(f"视频已下载到: {save_path},文件大小: {file_size}字节")
#
# # 验证下载的文件是否有效
# if is_valid_video_file(save_path):
# logger.info(f"视频文件验证通过: {save_path}")
# return True
# else:
# logger.error(f"下载的视频文件无效: {save_path}")
# # 删除无效文件
# if os.path.exists(save_path):
# os.remove(save_path)
# return False
#
# except Exception as e:
# logger.error(f"下载视频异常: {e}")
# import traceback
# logger.error(f"详细错误: {traceback.format_exc()}")
# return False
#
#
# def analyze_videos_and_create_document(videos_folder, nickname, unique_id, sec_uid):
# """
# 分析视频目录并创建Yanxi文档
#
# Args:
# videos_folder: 视频所在目录路径
# nickname: 用户昵称
# unique_id: TikTok用户ID
# sec_uid: 用户的安全ID
#
# Returns:
# str: 如果API调用成功返回document_id否则返回None
# """
# try:
# # 直接调用analyze_directory函数进行视频分析
# from app.api.summary.summary import analyze_directory
#
# logger.info(f"开始分析视频目录: {videos_folder}")
# analyze_result = analyze_directory(videos_folder)
#
# if analyze_result:
# logger.info(f"分析成功")
#
# # 提取分析结果中的用户画像和聚合摘要
# user_persona = analyze_result.get('user_persona', '')
# content_summary = analyze_result.get('content_summary', '')
#
# content = f"{user_persona}\n\n{content_summary}"
#
# # 调用yanxi API创建文档
# try:
# yanxi_url = "http://localhost:8899/yanxi/api/dataset/231aaa42-fc97-11ef-a8ef-0242ac120006/document"
# yanxi_data = {
# "name": f"{nickname}@{unique_id}",
# "paragraphs": [
# {
# "content": content,
# "title": sec_uid,
# "is_active": True
# }
# ]
# }
#
# logger.info(f"调用Yanxi API: {yanxi_url}")
# yanxi_response = requests.post(
# yanxi_url,
# json=yanxi_data,
# headers={"Content-Type": "application/json"},
# timeout=60
# )
#
# if yanxi_response.status_code in [200, 201]:
# response_data = yanxi_response.json()
# document_id = response_data.get('data', {}).get('id')
# if document_id:
# logger.info(f"文档创建成功document_id: {document_id}")
# return document_id
# else:
# logger.error("文档创建成功但未获取到document_id")
# return None
# else:
# logger.error(f"文档创建失败: {yanxi_response.status_code}, {yanxi_response.text}")
# return None
# except Exception as e:
# logger.error(f"调用Yanxi API出错: {e}")
# return None
# else:
# logger.error(f"分析失败")
# return None
# except Exception as e:
# logger.error(f"视频分析出错: {e}")
# return None
#
#
# @csrf_exempt
# @require_http_methods(["DELETE"])
# def delete_tiktok_user(request):
# """删除TikTok用户及其相关视频和文档"""
# try:
# data = json.loads(request.body)
# sec_uid = data.get('sec_uid')
#
# if not sec_uid:
# return JsonResponse({
# 'status': 'error',
# 'message': 'invalid params'
# }, json_dumps_params={'ensure_ascii': False})
#
# try:
# user = TiktokUserVideos.objects.get(sec_user_id=sec_uid)
# except TiktokUserVideos.DoesNotExist:
# return JsonResponse({
# 'status': 'error',
# 'message': f'用户 {sec_uid} 不存在'
# }, json_dumps_params={'ensure_ascii': False})
#
# # 获取必要信息
# videos_folder = user.videos_folder
# document_id = user.document_id
#
# # 1. 删除视频文件夹
# if videos_folder and os.path.exists(videos_folder):
# try:
# shutil.rmtree(videos_folder)
# logger.info(f"成功删除视频文件夹: {videos_folder}")
# except Exception as e:
# logger.error(f"删除视频文件夹失败: {e}")
#
# # 2. 删除文档
# if document_id:
# try:
# delete_url = f"http://localhost:8899/yanxi/api/dataset/231aaa42-fc97-11ef-a8ef-0242ac120006/document/{document_id}"
# delete_response = requests.delete(
# delete_url,
# headers={"Content-Type": "application/json"},
# timeout=30
# )
#
# if delete_response.status_code in [200, 204]:
# logger.info(f"成功删除文档: {document_id}")
# else:
# logger.error(f"删除文档失败: {delete_response.status_code}, {delete_response.text}")
# except Exception as e:
# logger.error(f"调用删除文档API出错: {e}")
#
# # 3. 删除数据库记录
# user.delete()
# logger.info(f"成功删除用户数据: {sec_uid}")
#
# return JsonResponse({
# 'status': 'success',
# 'message': f'成功删除用户 {sec_uid} 及其相关数据'
# }, json_dumps_params={'ensure_ascii': False})
#
# except Exception as e:
# logger.error(f"删除用户异常: {e}")
# import traceback
# logger.error(f"详细错误: {traceback.format_exc()}")
# return JsonResponse({
# 'status': 'error',
# 'message': f'删除用户时发生错误: {str(e)}'
# }, json_dumps_params={'ensure_ascii': False})
# Filter label -> (lower, upper) bounds applied to ``gmv``. The response built
# below renders the stored value as "$<gmv>k", so the bounds use that same
# unit. ``None`` as the upper bound leaves the range open-ended.
_GMV_RANGE_BOUNDS = {
    "$0-$5k": (0, 5),
    "$5k-$25k": (5, 25),
    "$25k-$50k": (25, 50),
    "$50k-$150k": (50, 150),
    "$150k-$400k": (150, 400),
    "$400k-$1500k": (400, 1500),
    "$1500k+": (1500, None),
}

# Filter label -> (lower, upper) bounds applied to ``avg_video_views``.
# NOTE(review): the labels jump from "0-100" to "1k-10k"; the first bucket may
# have been meant as "0-1k" -- confirm with the front end before changing it.
_VIEWS_RANGE_BOUNDS = {
    "0-100": (0, 100),
    "1k-10k": (1000, 10000),
    "10k-100k": (10000, 100000),
    "100k-250k": (100000, 250000),
    "250k-500k": (250000, 500000),
    "500k+": (500000, None),
}


def _single_choice(filter_data, key):
    """Return the selected value of a single-select filter, or None if unset.

    A list/tuple value contributes its first element. A bare scalar is
    accepted as well, so that a string such as "L3" is not indexed character
    by character.
    """
    value = filter_data.get(key)
    if isinstance(value, (list, tuple)):
        return value[0] if value else None
    return value if value else None


def _filter_level_number(value):
    """Convert "L3" (or the integer 3) to 3; return None for anything else.

    Raises ValueError when the text after the leading "L" is not an integer.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, str) and value.startswith('L'):
        return int(value[1:])
    return None


def _apply_bounds(query, field, bounds):
    """Narrow ``query`` to ``lower <= field <= upper``; unknown labels are ignored."""
    if bounds is None:
        return query
    lower, upper = bounds
    # A lower bound of 0 is not applied, matching the previous behaviour.
    if lower > 0:
        query = query.filter(**{f"{field}__gte": lower})
    if upper is not None:
        query = query.filter(**{f"{field}__lte": upper})
    return query


def _parse_price_bound(text):
    """Parse one side of a "min-max" price range, tolerating a leading "$"."""
    return float(text.strip().lstrip('$'))


def _build_creator_query(query, filter_data):
    """Apply every supported single-select filter to ``query``.

    Raises ValueError/TypeError when a filter value is malformed.
    """
    category = _single_choice(filter_data, 'category')
    if category is not None:
        query = query.filter(category=category)

    # E-commerce level arrives as "L1".."L7" and is filtered by its number.
    level = _filter_level_number(_single_choice(filter_data, 'e_commerce_level'))
    if level is not None:
        query = query.filter(e_commerce_level=level)

    # Exposure level (e.g. "KOL-1", "KOC-1") is matched verbatim.
    exposure = _single_choice(filter_data, 'exposure_level')
    if exposure is not None:
        query = query.filter(exposure_level=exposure)

    gmv_label = _single_choice(filter_data, 'gmv_range')
    query = _apply_bounds(query, 'gmv', _GMV_RANGE_BOUNDS.get(gmv_label))

    views_label = _single_choice(filter_data, 'views_range')
    query = _apply_bounds(query, 'avg_video_views', _VIEWS_RANGE_BOUNDS.get(views_label))

    # Pricing is a "min-max" string; values without a dash are ignored.
    pricing = _single_choice(filter_data, 'pricing')
    if isinstance(pricing, str) and '-' in pricing:
        low_text, high_text = pricing.split('-')
        query = query.filter(
            pricing_min__gte=_parse_price_bound(low_text),
            pricing_max__lte=_parse_price_bound(high_text),
        )
    return query


def _format_creator_row(creator):
    """Build the display dictionary returned for one creator in the list."""
    if creator.pricing_min is not None and creator.pricing_max is not None:
        pricing_range = f"${creator.pricing_min}-{creator.pricing_max}"
    else:
        pricing_range = None
    return {
        "Creator": {
            "name": creator.name,
            "avatar": creator.avatar_url,
        },
        "Category": creator.category,
        "E-commerce Level": f"L{creator.e_commerce_level}" if creator.e_commerce_level else None,
        "Exposure Level": creator.exposure_level,
        "Followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
        "GMV": f"${creator.gmv}k" if creator.gmv else "$0",
        "Items Sold": f"{creator.items_sold}k" if creator.items_sold else None,
        "Avg. Video Views": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
        "Pricing": {
            "Range": pricing_range,
            "Pack. P": creator.pricing_package,
        },
        "# Collab": creator.collab_count,
        "Latest Collab.": creator.latest_collab,
        "E-commerce": creator.e_commerce_platforms,
    }


@csrf_exempt
@require_http_methods(["POST"])
def filter_creators(request):
    """Filter creators by the posted criteria and return one page of results.

    The JSON body may contain:
      * ``filter``: object with the single-select keys ``category``,
        ``e_commerce_level``, ``exposure_level``, ``gmv_range``,
        ``views_range`` and ``pricing``;
      * ``page`` (default 1) and ``page_size`` (default 10), both >= 1.

    Returns a JSON envelope ``{code, message, data}``. ``code`` is 200 with
    the page of creators and paging totals, 400 when the body or a parameter
    is malformed, and 500 on any unexpected failure.
    """
    try:
        from .models import CreatorProfile

        try:
            data = json.loads(request.body)
            if not isinstance(data, dict):
                raise ValueError("request body must be a JSON object")
            filter_data = data.get('filter') or {}
            if not isinstance(filter_data, dict):
                raise ValueError("filter must be a JSON object")
            page = int(data.get('page', 1))
            page_size = int(data.get('page_size', 10))
            # page < 1 would produce a negative slice and page_size == 0 a
            # division by zero further down, so reject both up front.
            if page < 1 or page_size < 1:
                raise ValueError("page and page_size must be positive integers")
            query = _build_creator_query(CreatorProfile.objects.all(), filter_data)
        except (TypeError, ValueError) as exc:
            logger.warning("筛选达人信息参数无效: %s", exc)
            return JsonResponse({
                'code': 400,
                'message': f'请求参数无效: {exc}',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        total_count = query.count()

        # Slicing an unordered queryset can repeat or skip rows between pages;
        # fall back to primary-key order only when no ordering is in effect.
        if not query.ordered:
            query = query.order_by('pk')

        start = (page - 1) * page_size
        creator_list = [
            _format_creator_row(creator)
            for creator in query[start:start + page_size]
        ]
        total_pages = (total_count + page_size - 1) // page_size

        return JsonResponse({
            'code': 200,
            'message': '获取成功',
            'data': {
                'total_count': total_count,
                'total_pages': total_pages,
                'current_page': page,
                'page_size': page_size,
                'count': len(creator_list),
                'creators': creator_list
            }
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as exc:
        # logger.exception records the traceback together with the message.
        logger.exception("筛选达人信息失败: %s", exc)
        return JsonResponse({
            'code': 500,
            'message': f'筛选达人信息失败: {exc}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
# Payload keys copied onto the model unchanged whenever they are present.
_CREATOR_PLAIN_FIELDS = (
    'category',
    'exposure_level',
    'followers',
    'collab_count',
    'latest_collab',
    'e_commerce_platforms',
    'is_active',
    'mcn',
)

# field -> (required prefix, multiplier) for values written as "<prefix><n>k".
# gmv and items_sold keep the number as written ("$534.1k" -> 534.1), while
# avg_video_views is expanded to a raw count ("12k" -> 12000).
_CREATOR_K_FIELDS = {
    'gmv': ('$', 1),
    'items_sold': ('', 1),
    'avg_video_views': ('', 1000),
}


def _parse_k_value(value, prefix, multiplier):
    """Return the number carried by ``value``, or None if it is not usable.

    Numbers are returned unchanged (including 0). Strings must look like
    "<prefix><number>k"; any other string is ignored, as before. Raises
    ValueError when such a string does not contain a valid number.
    """
    if isinstance(value, (int, float)):
        return value
    if isinstance(value, str) and value.startswith(prefix) and value.endswith('k'):
        return float(value.strip(prefix + 'k')) * multiplier
    return None


def _collect_creator_updates(data):
    """Translate the request payload into a ``{model_field: value}`` dict.

    Raises ValueError when a value is present but malformed, so the caller
    can reject the request before touching the database.
    """
    updates = {field: data[field] for field in _CREATOR_PLAIN_FIELDS if field in data}

    # E-commerce level is accepted either as "L3" or as the integer 3.
    level = data.get('e_commerce_level')
    if isinstance(level, str) and level.startswith('L'):
        updates['e_commerce_level'] = int(level[1:])
    elif isinstance(level, int):
        updates['e_commerce_level'] = level

    for field, (prefix, multiplier) in _CREATOR_K_FIELDS.items():
        parsed = _parse_k_value(data.get(field), prefix, multiplier)
        # ``is not None`` rather than truthiness, so an explicit 0 is stored.
        if parsed is not None:
            updates[field] = parsed

    pricing = data.get('pricing')
    if pricing:
        if not isinstance(pricing, dict):
            raise ValueError("pricing must be a JSON object")
        # NOTE(review): the other views visible in this file read pricing_min /
        # pricing_max, never pricing_individual -- confirm the model still
        # declares that field.
        if 'individual' in pricing:
            updates['pricing_individual'] = pricing['individual']
        if 'package' in pricing:
            updates['pricing_package'] = pricing['package']
    return updates


def _add_creator_bad_request(message):
    """Build the 400 response envelope used by ``add_creator``."""
    return JsonResponse({
        'code': 400,
        'message': message,
        'data': None
    }, json_dumps_params={'ensure_ascii': False})


@csrf_exempt
@require_http_methods(["POST"])
def add_creator(request):
    """Create a creator by ``name`` or update the existing one.

    The JSON body must contain ``name``. Optional keys: ``avatar_url``,
    ``category``, ``e_commerce_level`` ("L3" or 3), ``exposure_level``,
    ``followers``, ``gmv`` ("$534.1k" or a number), ``items_sold`` ("12k" or
    a number), ``avg_video_views`` ("12k" or a number), ``pricing``
    (``{"individual": ..., "package": ...}``), ``collab_count``,
    ``latest_collab``, ``e_commerce_platforms``, ``is_active`` and ``mcn``.

    Returns a JSON envelope ``{code, message, data}``: 200 with ``created``
    and ``creator_id``, 400 for a malformed body or value, 500 otherwise.
    """
    try:
        from django.db import transaction
        from .models import CreatorProfile

        try:
            data = json.loads(request.body)
        except ValueError as exc:
            return _add_creator_bad_request(f'请求参数无效: {exc}')
        if not isinstance(data, dict):
            return _add_creator_bad_request('请求参数无效: request body must be a JSON object')

        name = data.get('name')
        if not name:
            return _add_creator_bad_request('名称是必填项')

        # Parse everything before writing, so a bad value cannot leave a
        # half-populated row behind.
        try:
            updates = _collect_creator_updates(data)
        except (TypeError, ValueError) as exc:
            return _add_creator_bad_request(f'请求参数无效: {exc}')

        # get_or_create followed by save() is two writes; keep them atomic.
        with transaction.atomic():
            creator, created = CreatorProfile.objects.get_or_create(
                name=name,
                defaults={
                    'avatar_url': data.get('avatar_url'),
                }
            )
            # ``defaults`` only applies on creation; refresh the avatar of an
            # existing creator when a new one is supplied.
            if not created and data.get('avatar_url') is not None:
                creator.avatar_url = data['avatar_url']
            for field, value in updates.items():
                setattr(creator, field, value)
            creator.save()

        return JsonResponse({
            'code': 200,
            'message': '达人信息已添加/更新',
            'data': {
                'created': created,
                'creator_id': creator.id
            }
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as exc:
        # logger.exception records the traceback together with the message.
        logger.exception("添加/更新达人信息失败: %s", exc)
        return JsonResponse({
            'code': 500,
            'message': f'添加/更新达人信息失败: {exc}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def get_campaigns(request):
    """Return every active campaign as ``{id, name, brand, product}``.

    ``name`` is a display label: "<brand> <product>" when the campaign has a
    product, otherwise just the brand. The response is a JSON envelope
    ``{code, message, data}`` with code 200 on success and 500 on failure.
    """
    try:
        # NOTE(review): add_to_campaign imports Campaign from
        # apps.brands.models instead -- confirm both refer to the same model.
        from .models import Campaign

        campaign_list = []
        # Only active campaigns are listed.
        for campaign in Campaign.objects.filter(is_active=True):
            # The previous initial assignment from campaign.name was always
            # overwritten by one of these two branches, so it was removed.
            if campaign.product:
                display_name = f"{campaign.brand} {campaign.product}"
            else:
                display_name = campaign.brand
            campaign_list.append({
                "id": campaign.id,
                "name": display_name,
                "brand": campaign.brand,
                "product": campaign.product
            })

        return JsonResponse({
            'code': 200,
            'message': '获取成功',
            'data': campaign_list
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as exc:
        # logger.exception records the traceback together with the message.
        logger.exception("获取营销活动失败: %s", exc)
        return JsonResponse({
            'code': 500,
            'message': f'获取营销活动失败: {exc}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
def _add_to_campaign_error(code, message):
    """Build the error response envelope used by ``add_to_campaign``."""
    return JsonResponse({
        'code': code,
        'message': message,
        'data': None
    }, json_dumps_params={'ensure_ascii': False})


@csrf_exempt
@require_http_methods(["POST"])
def add_to_campaign(request):
    """Replace the creators bound to a campaign with the posted list.

    The JSON body must contain ``campaign_id`` and a non-empty list
    ``creator_ids``. Every existing creator binding of the campaign is
    deleted and one new binding with status ``pending`` is created for each
    id that matches a creator; unknown ids are skipped and logged.

    Returns a JSON envelope ``{code, message, data}``: 200 with the campaign,
    the added creators and the added/skipped counts; 400 for a malformed
    body; 404 when no active campaign has that id; 500 otherwise.
    """
    try:
        from django.db import transaction
        from .models import CreatorProfile, CreatorCampaign
        from apps.brands.models import Campaign

        try:
            data = json.loads(request.body)
        except ValueError as exc:
            return _add_to_campaign_error(400, f'请求参数无效: {exc}')
        if not isinstance(data, dict):
            return _add_to_campaign_error(400, '请求参数无效: request body must be a JSON object')

        campaign_id = data.get('campaign_id')
        creator_ids = data.get('creator_ids', [])
        if not campaign_id or not creator_ids:
            return _add_to_campaign_error(400, '缺少必要参数campaign_id 或 creator_ids')
        # A bare string would otherwise be iterated character by character.
        if not isinstance(creator_ids, (list, tuple)):
            return _add_to_campaign_error(400, 'creator_ids 必须是数组')
        try:
            # Drop repeated ids (order preserved) so a creator is bound once.
            creator_ids = list(dict.fromkeys(creator_ids))
        except TypeError:
            return _add_to_campaign_error(400, 'creator_ids 必须是ID数组')

        # Look the campaign up with the id exactly as received.
        try:
            logger.info(f"尝试查找活动ID: {campaign_id}")
            campaign = Campaign.objects.raw(
                'SELECT * FROM campaigns WHERE id = %s AND is_active = 1',
                [campaign_id]
            )[0]
            logger.info(f"找到活动: {campaign.name}, Active: {campaign.is_active}")
        except IndexError:
            logger.warning(f"找不到ID为 {campaign_id} 的活动")
            return _add_to_campaign_error(404, f'找不到ID为 {campaign_id} 的活跃营销活动')
        except Exception as exc:
            logger.error(f"查询活动时发生错误: {str(exc)}")
            return _add_to_campaign_error(500, f'查询活动时发生错误: {str(exc)}')

        added_creators = []
        skipped_count = 0
        # Delete-then-recreate must be all-or-nothing: without the transaction
        # a failure part-way through would leave the campaign with its old
        # bindings gone and only some of the new ones created.
        with transaction.atomic():
            deleted_count = CreatorCampaign.objects.filter(campaign=campaign).delete()[0]
            logger.info(f"已删除活动 {campaign_id} 下的 {deleted_count} 个原有达人关联")

            for creator_id in creator_ids:
                try:
                    # Inactive creators are deliberately allowed here.
                    creator = CreatorProfile.objects.get(id=creator_id)
                except CreatorProfile.DoesNotExist:
                    skipped_count += 1
                    logger.warning(f"找不到ID为 {creator_id} 的达人")
                    continue
                CreatorCampaign.objects.create(
                    creator=creator,
                    campaign=campaign,
                    status='pending'
                )
                added_creators.append({
                    'id': creator.id,
                    'name': creator.name
                })

        return JsonResponse({
            'code': 200,
            'message': '成功添加达人到活动',
            'data': {
                'campaign': {
                    'id': str(campaign.id),
                    'name': campaign.name
                },
                'added_creators': added_creators,
                # Counts were tracked before but never reported to the caller.
                'added_count': len(added_creators),
                'skipped_count': skipped_count
            }
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as exc:
        # logger.exception records the traceback together with the message.
        logger.exception("更新活动达人失败: %s", exc)
        return JsonResponse({
            'code': 500,
            'message': f'更新活动达人失败: {exc}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
@require_http_methods(["GET"])
def get_creator_detail(request, creator_id):
    """Return the detail view of a single creator.

    Looks the creator up by ``creator_id`` and responds with a JSON envelope
    ``{code, message, data}``. ``data`` groups the record into ``creator``
    (identity and contact), ``metrics`` (formatted and raw figures plus the
    derived ``gpm`` and ``gmv_per_customer``), ``business`` and ``analytics``.
    ``code`` is 404 when the creator does not exist and 500 on any other
    failure.
    """
    try:
        from .models import CreatorProfile

        try:
            profile = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            not_found = {
                'code': 404,
                'message': '未找到该达人信息',
                'data': None
            }
            return JsonResponse(not_found, json_dumps_params={'ensure_ascii': False})

        views = profile.avg_video_views
        category = profile.category

        # Display labels; empty or zero values fall back to a fixed placeholder.
        level_label = None
        if profile.e_commerce_level:
            level_label = f"L{profile.e_commerce_level}"
        gmv_label = "$0"
        if profile.gmv:
            gmv_label = f"${profile.gmv}k"
        followers_label = "0"
        if profile.followers:
            followers_label = f"{int(profile.followers / 1000)}k"
        views_label = "0"
        if views:
            views_label = f"{int(views / 1000)}k"

        # gpm: minimum price scaled to one thousand average video views.
        gpm = 0
        if views and views > 0 and profile.pricing_min:
            try:
                gpm = round((float(profile.pricing_min) * 1000) / views, 2)
            except (ValueError, AttributeError):
                pass

        # GMV divided by the number of items sold.
        gmv_per_customer = 0
        if profile.gmv and profile.items_sold and profile.items_sold > 0:
            gmv_per_customer = round(float(profile.gmv) / float(profile.items_sold), 2)

        # Pipe-separated categories become a list; no category gives [].
        categories = category.split('|') if category else []

        price_range = None
        if profile.pricing_min is not None and profile.pricing_max is not None:
            price_range = f"${profile.pricing_min}-{profile.pricing_max}"

        identity = {
            "id": profile.id,
            "name": profile.name,
            "avatar": profile.avatar_url,
            "email": profile.email,
            "social_accounts": {
                "instagram": profile.instagram,
                "tiktok": profile.tiktok_link
            },
            "location": profile.location,
            "live_schedule": profile.live_schedule
        }
        metrics = {
            "e_commerce_level": level_label,
            "exposure_level": profile.exposure_level,
            "followers": followers_label,
            "actual_followers": profile.followers,
            "gmv": gmv_label,
            "actual_gmv": profile.gmv,
            "items_sold": f"{profile.items_sold}",
            "avg_video_views": views_label,
            "actual_views": views,
            "gpm": f"${gpm}",
            "gmv_per_customer": f"${gmv_per_customer}"
        }
        business = {
            "category": category,
            "categories": categories,
            "mcn": profile.mcn or "",
            "pricing": {
                "range": price_range,
                "package": profile.pricing_package
            },
            "collab_count": profile.collab_count,
            "latest_collab": profile.latest_collab,
            "e_commerce_platforms": profile.e_commerce_platforms or []
        }
        analytics = {
            "gmv_by_channel": profile.gmv_by_channel,
            "gmv_by_category": profile.gmv_by_category
        }

        payload = {
            'code': 200,
            'message': '获取成功',
            'data': {
                "creator": identity,
                "metrics": metrics,
                "business": business,
                "analytics": analytics
            }
        }
        return JsonResponse(payload, json_dumps_params={'ensure_ascii': False})
    except Exception as exc:
        logger.error(f"获取达人详情失败: {exc}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        failure = {
            'code': 500,
            'message': f'获取达人详情失败: {str(exc)}',
            'data': None
        }
        return JsonResponse(failure, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["POST"])
def update_creator_detail(request):
    """Update the editable profile fields of an existing creator.

    The request body must be a JSON object. ``creator_id`` is required. Each
    of ``email``, ``instagram``, ``tiktok_link``, ``location``,
    ``live_schedule``, ``mcn``, ``gmv_by_channel`` and ``gmv_by_category``
    overwrites the stored value only when its key is present in the body.

    Returns:
        JsonResponse whose payload carries a ``code`` field: 200 on success,
        400 for a malformed body or a missing ``creator_id``, 404 when no
        creator has that id, and 500 for any other failure.
    """

    def respond(code, message, data=None):
        # All outcomes share one envelope; ensure_ascii=False emits the
        # non-ASCII messages verbatim instead of as \u escapes.
        return JsonResponse(
            {'code': code, 'message': message, 'data': data},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile
        import json

        # Malformed JSON is the caller's mistake, not a server fault.
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        try:
            data = json.loads(request.body)
        except ValueError:
            data = None
        if not isinstance(data, dict):
            return respond(400, '请求体必须是有效的JSON对象')

        # Required identifier
        creator_id = data.get('creator_id')
        if not creator_id:
            return respond(400, '缺少必要参数creator_id')

        # The creator must already exist; this endpoint never creates one.
        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond(404, f'找不到ID为 {creator_id} 的达人信息')

        # Only keys actually sent are written, so an omitted key leaves the
        # stored value untouched while an explicit null clears it.
        updatable_fields = (
            'email',
            'instagram',
            'tiktok_link',
            'location',
            'live_schedule',
            'mcn',
            'gmv_by_channel',
            'gmv_by_category',
        )
        for field in updatable_fields:
            if field in data:
                setattr(creator, field, data[field])

        creator.save()
        return respond(200, '达人详细信息已更新', {
            'creator_id': creator.id,
            'name': creator.name
        })
    except Exception as e:
        logger.error(f"更新达人详细信息失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond(500, f'更新达人详细信息失败: {str(e)}')
def _serialize_brand_campaign_entry(brand_campaign, status, campaign_id, campaign_name):
    """Build the response entry for one BrandCampaign row.

    Args:
        brand_campaign: BrandCampaign row supplying brand, pricing, dates and results.
        status: Collaboration status taken from the creator's CreatorCampaign row.
        campaign_id: Id of the campaign the entry is reported under (may be None).
        campaign_name: Name of that campaign (may be None).
    """
    return {
        "brand": {
            "id": brand_campaign.brand_id,
            "name": brand_campaign.brand_name,
            "color": brand_campaign.brand_color
        },
        "pricing_detail": brand_campaign.pricing_detail,
        "start_date": brand_campaign.start_date.strftime('%m/%d/%Y'),
        "end_date": brand_campaign.end_date.strftime('%m/%d/%Y'),
        "status": status,
        "gmv_achieved": brand_campaign.gmv_achieved,
        "views_achieved": brand_campaign.views_achieved,
        "video_link": brand_campaign.video_link,
        "campaign_id": campaign_id,
        "campaign_name": campaign_name
    }


# Fetch the collaboration details between one creator and each brand
@csrf_exempt
@require_http_methods(["GET"])
def get_creator_brand_campaigns(request, creator_id=None):
    """Return a paginated list of a creator's brand collaborations.

    ``creator_id`` comes from the URL or, failing that, from the
    ``creator_id`` query parameter. ``page`` (default 1) and ``page_size``
    (default 10) must be positive integers.

    Entries are resolved in three steps, each used only if the previous one
    produced nothing:
      1. BrandCampaign rows linked to a campaign the creator takes part in.
      2. BrandCampaign rows whose ``brand_id`` equals the upper-cased first
         letter of one of those campaigns' ``brand``.
      3. One placeholder entry per CreatorCampaign row, with hard-coded values.

    Returns:
        JsonResponse with ``code`` 200 plus ``data``, ``pagination`` and
        ``creator``; ``code`` 400 for a missing id or invalid pagination,
        404 for an unknown creator, 500 for any other failure.
    """

    def respond_error(code, message):
        return JsonResponse(
            {'code': code, 'message': message, 'data': None},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile, BrandCampaign, CreatorCampaign, Campaign

        # The id may arrive via the URL or the query string.
        if not creator_id:
            creator_id = request.GET.get('creator_id')
        if not creator_id:
            return respond_error(400, '缺少必要参数: creator_id')

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond_error(404, f'未找到ID为 {creator_id} 的达人')

        # Validate pagination up front: a non-integer value, page < 1, or
        # page_size < 1 would otherwise surface as a 500 (ValueError /
        # ZeroDivisionError) or as a meaningless negative slice.
        try:
            page = int(request.GET.get('page', 1))
            page_size = int(request.GET.get('page_size', 10))
        except (TypeError, ValueError):
            page = page_size = 0
        if page < 1 or page_size < 1:
            return respond_error(400, '分页参数无效: page 和 page_size 必须为正整数')

        # Every campaign participation of this creator.
        creator_campaigns = list(
            CreatorCampaign.objects.filter(creator_id=creator_id).select_related('campaign')
        )
        campaign_ids = [cc.campaign_id for cc in creator_campaigns]

        # First participation per campaign id; replaces a linear scan per row.
        creator_campaign_by_campaign_id = {}
        for cc in creator_campaigns:
            creator_campaign_by_campaign_id.setdefault(cc.campaign_id, cc)

        # Step 1: brand campaigns directly linked to those campaigns.
        linked_brand_campaigns = list(
            BrandCampaign.objects.filter(campaign__id__in=campaign_ids).select_related('campaign')
        )

        # Step 2 is prepared only when step 1 found nothing: collect the
        # brand initials that will be matched against BrandCampaign.brand_id.
        campaigns = []
        brands_needed = []
        if not linked_brand_campaigns:
            campaigns = list(Campaign.objects.filter(id__in=campaign_ids))
            brands_needed = [c.brand[:1].upper() for c in campaigns if c.brand]

        campaign_list = []

        for brand_campaign in linked_brand_campaigns:
            creator_campaign = creator_campaign_by_campaign_id.get(brand_campaign.campaign_id)
            if creator_campaign is None:
                continue
            linked = brand_campaign.campaign
            campaign_list.append(_serialize_brand_campaign_entry(
                brand_campaign,
                creator_campaign.status,  # status lives on CreatorCampaign
                linked.id if linked else None,
                linked.name if linked else None,
            ))

        # Step 2: match by brand initial.
        if not campaign_list and brands_needed:
            campaign_by_initial = {}
            for campaign in campaigns:
                if campaign.brand:
                    campaign_by_initial.setdefault(campaign.brand[:1].upper(), campaign)

            for brand_campaign in BrandCampaign.objects.filter(brand_id__in=brands_needed):
                matching_campaign = campaign_by_initial.get(brand_campaign.brand_id)
                if matching_campaign is None:
                    continue
                creator_campaign = creator_campaign_by_campaign_id.get(matching_campaign.id)
                if creator_campaign is None:
                    continue
                campaign_list.append(_serialize_brand_campaign_entry(
                    brand_campaign,
                    creator_campaign.status,
                    matching_campaign.id,
                    matching_campaign.name,
                ))

        # Step 3: nothing matched, so emit one placeholder per participation.
        if not campaign_list:
            # Brand initial -> display colour; unknown initials fall back to black.
            colors = {
                'U': '#E74694',  # pink
                'R': '#17CDCB',  # cyan
                'X': '#6E41BF',  # purple
                'Q': '#F9975D',  # orange
                'A': '#B4B4B4',  # grey
                'M': '#333333',  # black
            }
            for cc in creator_campaigns:
                campaign = cc.campaign
                brand_id = campaign.brand[:1].upper() if campaign.brand else 'U'
                # NOTE(review): these are fabricated defaults, and the default
                # end date precedes the default start date - confirm intent.
                campaign_list.append({
                    "brand": {
                        "id": brand_id,
                        "name": campaign.brand or "brand",
                        "color": colors.get(brand_id, '#000000')
                    },
                    "pricing_detail": "$80",  # default price
                    "start_date": "05/31/2024",  # default date
                    "end_date": "05/29/2024",  # default date
                    "status": cc.status,
                    "gmv_achieved": "$120",  # default GMV
                    "views_achieved": "650",  # default view count
                    "video_link": f"https://example.com/video/{campaign.id}",
                    "campaign_id": campaign.id,
                    "campaign_name": campaign.name
                })

        # Paginate in memory; slicing past the end simply yields a short page.
        total_count = len(campaign_list)
        start = (page - 1) * page_size
        paged_campaigns = campaign_list[start:start + page_size]
        total_pages = (total_count + page_size - 1) // page_size  # ceiling division

        pagination = {
            "current_page": page,
            "total_pages": total_pages,
            "total_count": total_count,
            "has_next": page < total_pages,
            "has_prev": page > 1
        }

        creator_info = {
            "id": creator.id,
            "name": creator.name,
            "avatar": creator.avatar_url,
            "category": creator.category,
            "exposure_level": creator.exposure_level,
        }

        return JsonResponse({
            'code': 200,
            'message': '获取成功',
            'data': paged_campaigns,
            'pagination': pagination,
            'creator': creator_info
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"获取达人品牌合作详情失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond_error(500, f'获取达人品牌合作详情失败: {str(e)}')
def _latest_creator_metrics_record(model, label, filters):
    """Return the row of ``model`` matching ``filters`` with the latest end_date.

    Query errors are logged (``label`` names the metric in the log line) and
    reported as ``None`` so one failing metric does not fail the whole response.
    """
    try:
        return model.objects.filter(**filters).order_by('-end_date').first()
    except Exception as e:
        logger.error(f"查询{label}出错: {e}")
        return None


def _format_creator_metrics_date_range(record):
    """Render a record's start/end dates as e.g. ``'Mar 29, 2025 - Apr 28, 2025'``."""
    return f"{record.start_date.strftime('%b %d, %Y')} - {record.end_date.strftime('%b %d, %Y')}"


def _format_creator_media_metrics(record, count_field):
    """Build the response dict shared by video and live metrics.

    ``count_field`` is the attribute, and response key, holding the item
    count (``videos_count`` or ``lives_count``).
    """
    views = record.avg_views
    # Values of 1000 and above are shown in thousands with a "k" suffix.
    if views >= 1000:
        avg_views = f"{float(views) / 1000}k"
    else:
        avg_views = f"{float(views)}"
    return {
        'gpm': f"${record.gpm}",
        count_field: getattr(record, count_field),
        'avg_views': avg_views,
        'avg_engagement': f"{record.avg_engagement}%",
        'avg_likes': record.avg_likes,
        'date_range': _format_creator_metrics_date_range(record)
    }


# Fetch a creator's collaboration, video and live metrics
@csrf_exempt
@require_http_methods(["GET"])
def get_creator_metrics(request, creator_id):
    """Return the latest collaboration, video and live metrics for a creator.

    Optional query parameters ``start_date`` and ``end_date`` (YYYY-MM-DD)
    restrict the lookup to records whose own date span overlaps the requested
    one. The filter is applied only when both are supplied. For each metric
    kind the record with the latest ``end_date`` is reported, or ``None`` when
    there is no record or its query failed.

    Returns:
        JsonResponse with ``code`` 200 and the metrics under ``data``;
        ``code`` 400 for a missing id or bad date format, 404 for an unknown
        creator, 500 for any other failure.
    """

    def respond(code, message, data=None):
        return JsonResponse(
            {'code': code, 'message': message, 'data': data},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile, CollaborationMetrics, VideoMetrics, LiveMetrics
        from datetime import datetime

        if not creator_id:
            return respond(400, '缺少必要参数: creator_id')

        # Date range parameters, formatted yyyy-mm-dd
        start_date_str = request.GET.get('start_date')
        end_date_str = request.GET.get('end_date')

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond(404, f'未找到ID为 {creator_id} 的创作者')

        filter_kwargs = {'creator': creator}

        if start_date_str and end_date_str:
            try:
                start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
                end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
            except ValueError:
                return respond(400, '日期格式不正确请使用YYYY-MM-DD格式')
            # Overlap test: the record starts on or before the requested end
            # and ends on or after the requested start.
            filter_kwargs.update({
                'start_date__lte': end_date,
                'end_date__gte': start_date
            })

        metrics_data = {
            'collaboration_metrics': None,
            'video': None,
            'shoppable_video': None,
            'live': None,
            'shoppable_live': None
        }

        collab_metrics = _latest_creator_metrics_record(CollaborationMetrics, '协作指标', filter_kwargs)
        if collab_metrics:
            metrics_data['collaboration_metrics'] = {
                'avg_commission_rate': f"{collab_metrics.avg_commission_rate}%",
                'products_count': collab_metrics.products_count,
                'brand_collaborations': collab_metrics.brand_collaborations,
                'product_price': f"${collab_metrics.min_product_price} - ${collab_metrics.max_product_price}",
                'date_range': _format_creator_metrics_date_range(collab_metrics)
            }

        # (response key, model, type field, type value, count field, log label)
        media_specs = (
            ('video', VideoMetrics, 'video_type', 'regular', 'videos_count', '普通视频指标'),
            ('shoppable_video', VideoMetrics, 'video_type', 'shoppable', 'videos_count', '可购物视频指标'),
            ('live', LiveMetrics, 'live_type', 'regular', 'lives_count', '普通直播指标'),
            ('shoppable_live', LiveMetrics, 'live_type', 'shoppable', 'lives_count', '可购物直播指标'),
        )
        for key, model, type_field, type_value, count_field, label in media_specs:
            filters = dict(filter_kwargs)
            filters[type_field] = type_value
            record = _latest_creator_metrics_record(model, label, filters)
            if record:
                metrics_data[key] = _format_creator_media_metrics(record, count_field)

        return respond(200, '获取成功', metrics_data)
    except Exception as e:
        logger.error(f"获取创作者指标数据失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond(500, f'获取创作者指标数据失败: {str(e)}')
def _creator_metric_text_to_float(value, symbol):
    """Convert a display string such as ``'$12.50'`` or ``'3.2%'`` to float.

    ``symbol`` and thousands separators are stripped from strings. Non-string
    values (numbers, ``None``) are returned unchanged. Raises ValueError when
    a string is not numeric after stripping.
    """
    if isinstance(value, str):
        return float(value.replace(symbol, '').replace(',', ''))
    return value


def _creator_metric_views_to_float(value):
    """Convert a view count such as ``2130``, ``'2130'`` or ``'2.13k'`` to float.

    A trailing ``k``/``K`` multiplies by 1000. Raises TypeError/ValueError for
    values that cannot be converted.
    """
    if isinstance(value, str):
        text = value.replace(',', '').strip()
        if text[-1:].lower() == 'k':
            return float(text[:-1]) * 1000
        return float(text)
    return float(value)


def _creator_metric_price_range(value):
    """Split ``'$10 - $20'`` (spaces optional) into ``(min, max)`` floats.

    Anything that is not a string containing a hyphen yields ``(0, 0)``.
    """
    if isinstance(value, str):
        parts = value.split('-')
        if len(parts) >= 2:
            return (
                _creator_metric_text_to_float(parts[0], '$'),
                _creator_metric_text_to_float(parts[1], '$'),
            )
    return 0, 0


def _creator_collaboration_defaults(metrics_data):
    """Extract the CollaborationMetrics column values from the request payload."""
    min_price, max_price = _creator_metric_price_range(
        metrics_data.get('product_price', '$0 - $0')
    )
    return {
        'avg_commission_rate': _creator_metric_text_to_float(
            metrics_data.get('avg_commission_rate'), '%'
        ),
        'products_count': int(metrics_data.get('products_count', 0)),
        'brand_collaborations': int(metrics_data.get('brand_collaborations', 0)),
        'min_product_price': min_price,
        'max_product_price': max_price
    }


def _creator_media_defaults(metrics_data, count_field):
    """Extract the VideoMetrics/LiveMetrics column values from the payload.

    ``count_field`` is ``videos_count`` or ``lives_count``; it names both the
    payload key and the model column.
    """
    return {
        'gpm': _creator_metric_text_to_float(metrics_data.get('gpm', '$0'), '$'),
        count_field: int(metrics_data.get(count_field, 0)),
        'avg_views': _creator_metric_views_to_float(metrics_data.get('avg_views', '0')),
        'avg_engagement': _creator_metric_text_to_float(
            metrics_data.get('avg_engagement', '0%'), '%'
        ),
        'avg_likes': int(metrics_data.get('avg_likes', 0))
    }


# Update a creator's metrics
@csrf_exempt
@require_http_methods(["POST"])
def update_creator_metrics(request):
    """Create or update one metrics record for a creator.

    JSON body fields:
        creator_id: id of an existing creator.
        metrics_type: ``collaboration``, ``video``, ``shoppable_video``,
            ``live`` or ``shoppable_live``.
        metrics_data: object with ``start_date``/``end_date`` (YYYY-MM-DD)
            plus the metric values. Values may be numbers or display strings
            (``'$12'``, ``'3%'``, ``'2.5k'``, ``'$10 - $20'``).

    The record is keyed by creator, date span and, for video/live, the
    regular/shoppable type; an existing record with the same key is updated.

    Returns:
        JsonResponse with ``code`` 200 and ``{'created', 'metrics_id'}``;
        ``code`` 400 for missing/invalid input or an unsupported type, 404
        for an unknown creator, 500 for any other failure.
    """

    def respond(code, message, data=None):
        return JsonResponse(
            {'code': code, 'message': message, 'data': data},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile, CollaborationMetrics, VideoMetrics, LiveMetrics
        import json
        from datetime import datetime

        data = json.loads(request.body)

        creator_id = data.get('creator_id')
        metrics_type = data.get('metrics_type')
        metrics_data = data.get('metrics_data', {})
        # A non-object metrics_data cannot be read below, so it is rejected
        # together with the missing-parameter cases.
        if (not creator_id or not metrics_type or not metrics_data
                or not isinstance(metrics_data, dict)):
            return respond(400, '缺少必要参数: creator_id, metrics_type 或 metrics_data')

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond(404, f'未找到ID为 {creator_id} 的创作者')

        start_date_str = metrics_data.get('start_date')
        end_date_str = metrics_data.get('end_date')
        if not start_date_str or not end_date_str:
            return respond(400, '缺少必要参数: start_date 或 end_date')

        try:
            start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
            end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
        except (TypeError, ValueError):
            # TypeError covers non-string dates such as a JSON number.
            return respond(400, '日期格式不正确请使用YYYY-MM-DD格式')

        # metrics_type -> (model, type column, type value, count column)
        media_specs = {
            'video': (VideoMetrics, 'video_type', 'regular', 'videos_count'),
            'shoppable_video': (VideoMetrics, 'video_type', 'shoppable', 'videos_count'),
            'live': (LiveMetrics, 'live_type', 'regular', 'lives_count'),
            'shoppable_live': (LiveMetrics, 'live_type', 'shoppable', 'lives_count'),
        }

        if metrics_type == 'collaboration':
            model = CollaborationMetrics
            lookup = {}
            count_field = None
            success_message = '协作指标数据已更新'
        elif isinstance(metrics_type, str) and metrics_type in media_specs:
            model, type_field, type_value, count_field = media_specs[metrics_type]
            lookup = {type_field: type_value}
            success_message = f'{metrics_type}指标数据已更新'
        else:
            return respond(400, f'不支持的指标类型: {metrics_type}')

        # Unparseable metric values are a client error, not a server fault.
        try:
            if count_field is None:
                defaults = _creator_collaboration_defaults(metrics_data)
            else:
                defaults = _creator_media_defaults(metrics_data, count_field)
        except (TypeError, ValueError) as e:
            return respond(400, f'指标数据格式不正确: {e}')

        record, created = model.objects.update_or_create(
            creator=creator,
            start_date=start_date,
            end_date=end_date,
            defaults=defaults,
            **lookup
        )
        return respond(200, success_message, {
            'created': created,
            'metrics_id': record.id
        })
    except Exception as e:
        logger.error(f"更新创作者指标数据失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond(500, f'更新创作者指标数据失败: {str(e)}')
@csrf_exempt
@require_http_methods(["GET"])
def get_creator_followers_metrics(request, creator_id=None):
    """Return the most recent follower demographics for a creator.

    ``creator_id`` comes from the URL or, failing that, from the
    ``creator_id`` query parameter. The FollowerMetrics row with the latest
    ``end_date`` is reported as gender, age-bracket and location breakdowns.
    When the creator has no such row, an all-zero placeholder with a fixed
    date range is returned instead.

    Returns:
        JsonResponse with ``code`` 200 and the breakdown under ``data``;
        ``code`` 400 for a missing id, 404 for an unknown creator, 500 for
        any other failure.
    """

    def respond(code, message, data=None):
        return JsonResponse(
            {'code': code, 'message': message, 'data': data},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile, FollowerMetrics
        import json
        from datetime import datetime

        # Fall back to the query string when the URL carried no id.
        creator_id = creator_id or request.GET.get('creator_id')
        if not creator_id:
            return respond(400, '缺少必要参数: creator_id')

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond(404, f'未找到ID为 {creator_id} 的创作者')

        latest = FollowerMetrics.objects.filter(creator=creator).order_by('-end_date').first()

        # Response label -> model attribute for each age bracket.
        age_brackets = (
            ('18-24', 'age_18_24_percentage'),
            ('25-34', 'age_25_34_percentage'),
            ('35-44', 'age_35_44_percentage'),
            ('45-54', 'age_45_54_percentage'),
            ('55+', 'age_55_plus_percentage'),
        )

        if latest:
            follower_data = {
                'gender': {
                    'female': latest.female_percentage,
                    'male': latest.male_percentage
                },
                'age': {label: getattr(latest, attr) for label, attr in age_brackets},
                'locations': latest.location_data,
                'date_range': {
                    'start_date': latest.start_date.strftime('%Y-%m-%d'),
                    'end_date': latest.end_date.strftime('%Y-%m-%d')
                }
            }
        else:
            # No stored metrics: answer with a zeroed placeholder.
            follower_data = {
                'gender': {'female': 0, 'male': 0},
                'age': {label: 0 for label, _ in age_brackets},
                'locations': dict.fromkeys(('TX', 'FL', 'NY', 'GE', 'CA'), 0),
                'date_range': {
                    'start_date': '2025-03-29',
                    'end_date': '2025-04-28'
                }
            }

        return respond(200, '获取成功', follower_data)
    except Exception as e:
        logger.error(f"获取创作者粉丝指标数据失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond(500, f'获取创作者粉丝指标数据失败: {str(e)}')
@csrf_exempt
@require_http_methods(["GET"])
def get_creator_trends(request, creator_id=None):
    """Return per-day trend series (GMV, items sold, followers, views, engagement).

    ``creator_id`` comes from the URL or, failing that, from the
    ``creator_id`` query parameter. When both ``start_date`` and ``end_date``
    (YYYY-MM-DD) are supplied they bound the query; otherwise the last 30
    days are used. If no TrendMetrics rows fall in the range, 31 days of
    randomly generated sample data ending today are returned instead.

    Returns:
        JsonResponse with ``code`` 200 and, under ``data``, one list per
        series plus ``dates`` and ``date_range``; ``code`` 400 for a missing
        id or bad date format, 404 for an unknown creator, 500 for any other
        failure.
    """

    def respond(code, message, data=None):
        return JsonResponse(
            {'code': code, 'message': message, 'data': data},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile, TrendMetrics
        import json
        from datetime import datetime, timedelta

        creator_id = creator_id or request.GET.get('creator_id')
        if not creator_id:
            return respond(400, '缺少必要参数: creator_id')

        # Date range parameters, formatted yyyy-mm-dd
        start_date_str = request.GET.get('start_date')
        end_date_str = request.GET.get('end_date')

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond(404, f'未找到ID为 {creator_id} 的创作者')

        if start_date_str and end_date_str:
            try:
                start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
                end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
            except ValueError:
                return respond(400, '日期格式不正确请使用YYYY-MM-DD格式')
        else:
            # Default window: the most recent 30 days.
            end_date = datetime.now().date()
            start_date = end_date - timedelta(days=30)

        trends = TrendMetrics.objects.filter(
            creator=creator,
            date__gte=start_date,
            date__lte=end_date
        ).order_by('date')

        series_keys = ('gmv', 'items_sold', 'followers', 'video_views', 'engagement_rate')
        trend_data = {key: [] for key in series_keys}
        trend_data['dates'] = []

        if trends.exists():
            # Real data: one point per stored row, oldest first.
            for row in trends:
                point = (
                    row.gmv,
                    row.items_sold,
                    row.followers_count,
                    row.video_views,
                    row.engagement_rate,
                )
                for key, value in zip(series_keys, point):
                    trend_data[key].append(value)
                trend_data['dates'].append(row.date.strftime('%Y-%m-%d'))

            dates = trend_data['dates']
            trend_data['date_range'] = {
                'start_date': dates[0] if dates else start_date.strftime('%Y-%m-%d'),
                'end_date': dates[-1] if dates else end_date.strftime('%Y-%m-%d')
            }
        else:
            # No data: fabricate 31 days of sample values ending today.
            sample_dates = [
                (datetime.now().date() - timedelta(days=days_back)).strftime('%Y-%m-%d')
                for days_back in range(30, -1, -1)
            ]
            trend_data['dates'] = sample_dates

            import random
            # One random walk per series:
            # [key, current level, max relative drop, max relative rise, truncate to int]
            walks = [
                ['gmv', random.uniform(500, 3000), 0.2, 0.3, False],
                ['items_sold', random.randint(500, 2000), 0.15, 0.25, True],
                ['followers', random.randint(1000, 3000), 0.05, 0.1, True],
                ['video_views', random.randint(1000, 3000), 0.2, 0.4, True],
                ['engagement_rate', random.uniform(1.0, 3.0), 0.1, 0.15, False],
            ]
            for _ in range(31):
                for walk in walks:
                    key, level, drop, rise, as_int = walk
                    value = level + random.uniform(-level * drop, level * rise)
                    if as_int:
                        value = int(value)
                    # Reported values are clamped at zero...
                    trend_data[key].append(max(0, value))
                    # ...but the walk continues from the unclamped value so
                    # consecutive days stay correlated.
                    walk[1] = value

            trend_data['date_range'] = {
                'start_date': sample_dates[0],
                'end_date': sample_dates[-1]
            }

        return respond(200, '获取成功', trend_data)
    except Exception as e:
        logger.error(f"获取创作者趋势指标数据失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond(500, f'获取创作者趋势指标数据失败: {str(e)}')
@csrf_exempt
@require_http_methods(["POST"])
def update_creator_followers(request):
    """Create or update a creator's follower demographics for one date span.

    JSON body fields:
        creator_id: id of an existing creator.
        follower_data: object with ``date_range`` (``start_date`` and
            ``end_date`` as YYYY-MM-DD), plus optional ``gender``
            (``female``/``male``), ``age`` (``18-24`` ... ``55+``) and
            ``locations``. Missing percentages default to 0 and missing
            locations to an empty object.

    The record is keyed by creator and date span; an existing record with the
    same key is updated.

    Returns:
        JsonResponse with ``code`` 200 and ``{'created', 'metrics_id'}``;
        ``code`` 400 for missing parameters or a bad date format, 404 for an
        unknown creator, 500 for any other failure.
    """

    def respond(code, message, data=None):
        return JsonResponse(
            {'code': code, 'message': message, 'data': data},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile, FollowerMetrics
        import json
        from datetime import datetime

        payload = json.loads(request.body)

        creator_id = payload.get('creator_id')
        follower_data = payload.get('follower_data', {})
        if not (creator_id and follower_data):
            return respond(400, '缺少必要参数: creator_id 或 follower_data')

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond(404, f'未找到ID为 {creator_id} 的创作者')

        # Both ends of the date span are mandatory.
        span = follower_data.get('date_range', {})
        start_date_str = span.get('start_date')
        end_date_str = span.get('end_date')
        if not (start_date_str and end_date_str):
            return respond(400, '缺少必要参数: date_range.start_date 或 date_range.end_date')

        try:
            start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
            end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
        except ValueError:
            return respond(400, '日期格式不正确请使用YYYY-MM-DD格式')

        # Gender split
        gender = follower_data.get('gender', {})
        column_values = {
            'female_percentage': gender.get('female', 0),
            'male_percentage': gender.get('male', 0),
        }

        # Age brackets: payload label -> model column
        ages = follower_data.get('age', {})
        for label, column in (
            ('18-24', 'age_18_24_percentage'),
            ('25-34', 'age_25_34_percentage'),
            ('35-44', 'age_35_44_percentage'),
            ('45-54', 'age_45_54_percentage'),
            ('55+', 'age_55_plus_percentage'),
        ):
            column_values[column] = ages.get(label, 0)

        # Location breakdown is stored as received.
        column_values['location_data'] = follower_data.get('locations', {})

        record, created = FollowerMetrics.objects.update_or_create(
            creator=creator,
            start_date=start_date,
            end_date=end_date,
            defaults=column_values
        )

        return respond(200, '粉丝统计数据已更新', {
            'created': created,
            'metrics_id': record.id
        })
    except Exception as e:
        logger.error(f"更新创作者粉丝统计数据失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond(500, f'更新创作者粉丝统计数据失败: {str(e)}')
@csrf_exempt
@require_http_methods(["POST"])
def update_creator_trend(request):
    """Create or update a creator's trend data point for a single day.

    JSON body fields:
        creator_id: id of an existing creator.
        date: the day, formatted YYYY-MM-DD.
        metrics: object with optional ``gmv``, ``items_sold``, ``followers``,
            ``video_views`` and ``engagement_rate``; each defaults to 0.

    The record is keyed by creator and date; an existing record with the same
    key is updated.

    Returns:
        JsonResponse with ``code`` 200 and ``{'created', 'metrics_id',
        'date'}``; ``code`` 400 for missing parameters or a bad date format,
        404 for an unknown creator, 500 for any other failure.
    """

    def respond(code, message, data=None):
        return JsonResponse(
            {'code': code, 'message': message, 'data': data},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile, TrendMetrics
        import json
        from datetime import datetime

        payload = json.loads(request.body)

        creator_id = payload.get('creator_id')
        trend_date = payload.get('date')
        metrics = payload.get('metrics', {})
        if not (creator_id and trend_date and metrics):
            return respond(400, '缺少必要参数: creator_id, date 或 metrics')

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond(404, f'未找到ID为 {creator_id} 的创作者')

        try:
            day = datetime.strptime(trend_date, '%Y-%m-%d').date()
        except ValueError:
            return respond(400, '日期格式不正确请使用YYYY-MM-DD格式')

        # Model column -> payload key. Only the follower count is named
        # differently on the two sides.
        column_to_key = (
            ('gmv', 'gmv'),
            ('items_sold', 'items_sold'),
            ('followers_count', 'followers'),
            ('video_views', 'video_views'),
            ('engagement_rate', 'engagement_rate'),
        )
        column_values = {column: metrics.get(key, 0) for column, key in column_to_key}

        record, created = TrendMetrics.objects.update_or_create(
            creator=creator,
            date=day,
            defaults=column_values
        )

        return respond(200, '趋势数据已更新', {
            'created': created,
            'metrics_id': record.id,
            'date': trend_date
        })
    except Exception as e:
        logger.error(f"更新创作者趋势数据失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond(500, f'更新创作者趋势数据失败: {str(e)}')
def _sample_creator_video_entry(video_id, badge, with_product=False):
    """Build one hard-coded sample video for creators that have no videos yet."""
    entry = {
        "id": video_id,
        "title": "Collagen + Biotin = your beauty routine's new besties. For hair, skin, nails, and join...",
        "thumbnail_url": "#",
        "badge": badge,
        "view_count": 2130,
        "like_count": 20,
        "release_date": "2025-03-31"
    }
    if with_product:
        entry["has_product"] = True
        entry["product_name"] = "Collagen + Biotin"
        entry["product_url"] = "#"
    return entry


def _serialize_creator_video_entry(video, with_product=False):
    """Build the response dict for one CreatorVideo row.

    Product fields are included only when ``with_product`` is true.
    """
    entry = {
        "id": video.id,
        "title": video.title,
        "description": video.description,
        "thumbnail_url": video.thumbnail_url,
        "video_url": video.video_url,
        "badge": video.badge,
        "view_count": video.view_count,
        "like_count": video.like_count,
        "comment_count": video.comment_count,
        "release_date": video.release_date.strftime("%d.%m.%Y")
    }
    if with_product:
        entry["has_product"] = video.has_product
        entry["product_name"] = video.product_name
        entry["product_url"] = video.product_url
    return entry


@csrf_exempt
@require_http_methods(["GET"])
def get_creator_videos(request, creator_id=None):
    """Return a creator's videos, split into regular and product videos.

    ``creator_id`` comes from the URL or, failing that, from the
    ``creator_id`` query parameter. ``page`` (default 1) and ``page_size``
    (default 6) must be positive integers; the same page window is applied to
    both lists, each ordered by newest release date first.

    Hard-coded sample videos are returned only when the creator has no videos
    of either kind. A page past the end of real data yields empty lists.

    Returns:
        JsonResponse with ``code`` 200 and ``regular_videos`` /
        ``product_videos`` sections (videos, total, page, page_size,
        total_pages); ``code`` 400 for a missing id or invalid pagination,
        404 for an unknown creator, 500 for any other failure.
    """

    def respond(code, message, data=None):
        return JsonResponse(
            {'code': code, 'message': message, 'data': data},
            json_dumps_params={'ensure_ascii': False},
        )

    try:
        from .models import CreatorProfile, CreatorVideo

        if not creator_id:
            creator_id = request.GET.get('creator_id')
        if not creator_id:
            return respond(400, '缺少必要参数: creator_id')

        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return respond(404, f'未找到ID为 {creator_id} 的创作者')

        # Reject bad pagination explicitly: page_size=0 would divide by zero
        # below and a negative offset cannot be used to slice a queryset.
        try:
            page = int(request.GET.get('page', 1))
            page_size = int(request.GET.get('page_size', 6))  # 6 videos per page by default
        except (TypeError, ValueError):
            page = page_size = 0
        if page < 1 or page_size < 1:
            return respond(400, '分页参数无效: page 和 page_size 必须为正整数')

        offset = (page - 1) * page_size

        regular_queryset = CreatorVideo.objects.filter(
            creator=creator,
            video_type='regular',
            has_product=False
        )
        product_queryset = CreatorVideo.objects.filter(
            creator=creator,
            video_type='product',
            has_product=True
        )

        # Totals drive both the pagination info and the sample-data decision.
        regular_videos_count = regular_queryset.count()
        product_videos_count = product_queryset.count()

        if regular_videos_count == 0 and product_videos_count == 0:
            # The creator has no videos at all: show sample content.
            # NOTE(review): sample release dates use YYYY-MM-DD while real
            # rows are rendered as DD.MM.YYYY - confirm which one clients expect.
            regular_videos_data = [
                _sample_creator_video_entry(1, "red"),
                _sample_creator_video_entry(2, "red"),
                _sample_creator_video_entry(3, "gold"),
            ]
            product_videos_data = [
                _sample_creator_video_entry(4, "red", with_product=True),
                _sample_creator_video_entry(5, "red", with_product=True),
                _sample_creator_video_entry(6, "gold", with_product=True),
            ]
        else:
            regular_page = regular_queryset.order_by('-release_date')[offset:offset + page_size]
            product_page = product_queryset.order_by('-release_date')[offset:offset + page_size]
            regular_videos_data = [
                _serialize_creator_video_entry(video) for video in regular_page
            ]
            product_videos_data = [
                _serialize_creator_video_entry(video, with_product=True) for video in product_page
            ]

        # Ceiling division
        regular_total_pages = (regular_videos_count + page_size - 1) // page_size
        product_total_pages = (product_videos_count + page_size - 1) // page_size

        response_data = {
            "regular_videos": {
                "videos": regular_videos_data,
                "total": regular_videos_count,
                "page": page,
                "page_size": page_size,
                "total_pages": regular_total_pages
            },
            "product_videos": {
                "videos": product_videos_data,
                "total": product_videos_count,
                "page": page,
                "page_size": page_size,
                "total_pages": product_total_pages
            }
        }

        return respond(200, '获取成功', response_data)
    except Exception as e:
        logger.error(f"获取创作者视频列表失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return respond(500, f'获取创作者视频列表失败: {str(e)}')
@csrf_exempt
@require_http_methods(["POST"])
def add_creator_video(request):
    """Create or update a creator's video from a JSON request body.

    Required body fields: ``creator_id``, ``title`` and ``release_date``
    (``YYYY-MM-DD``). Optional fields: ``video_type`` (default ``'regular'``),
    ``description``, ``thumbnail_url``, ``video_url``, ``video_id``, ``badge``
    (default ``'red'``), ``view_count``, ``like_count``, ``comment_count``,
    ``has_product``, ``product_name`` and ``product_url``.

    The row is matched on ``(creator, video_id)`` through
    ``update_or_create``, so posting an existing ``video_id`` overwrites the
    stored fields of that video.

    Returns:
        JsonResponse carrying a ``code``/``message``/``data`` envelope.
        ``code`` is 200 on success (``data`` holds the row id and a
        ``created`` flag), 400 for a malformed body or parameter, 404 when
        the creator does not exist and 500 for any unexpected error.
    """
    try:
        from .models import CreatorProfile, CreatorVideo
        from datetime import datetime

        # A body that is not valid JSON, or not a JSON object, is a client
        # error and must not be reported as a server failure.
        try:
            data = json.loads(request.body)
        except ValueError:
            data = None
        if not isinstance(data, dict):
            return JsonResponse({
                'code': 400,
                'message': '请求体必须是有效的JSON对象',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Required parameters
        creator_id = data.get('creator_id')
        title = data.get('title')
        video_type = data.get('video_type', 'regular')  # plain video by default
        release_date_str = data.get('release_date')
        if not creator_id or not title or not release_date_str:
            return JsonResponse({
                'code': 400,
                'message': '缺少必要参数: creator_id, title 或 release_date',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Look up the creator
        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return JsonResponse({
                'code': 404,
                'message': f'未找到ID为 {creator_id} 的创作者',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Parse the release date (TypeError covers a non-string JSON value)
        try:
            release_date = datetime.strptime(release_date_str, '%Y-%m-%d').date()
        except (TypeError, ValueError):
            return JsonResponse({
                'code': 400,
                'message': '日期格式不正确请使用YYYY-MM-DD格式',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Counters: a missing key or an explicit null counts as zero;
        # anything that is not integer-like is rejected.
        try:
            view_count = int(data.get('view_count') or 0)
            like_count = int(data.get('like_count') or 0)
            comment_count = int(data.get('comment_count') or 0)
        except (TypeError, ValueError):
            return JsonResponse({
                'code': 400,
                'message': 'view_count, like_count 和 comment_count 必须为整数',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Remaining optional parameters
        description = data.get('description', '')
        thumbnail_url = data.get('thumbnail_url', '')
        video_url = data.get('video_url', '')
        # `or` (rather than a .get default) so an explicit null / empty
        # video_id also falls back to a generated identifier.
        video_id = data.get('video_id') or f"vid_{int(datetime.now().timestamp())}"
        badge = data.get('badge', 'red')

        # Product information
        has_product = data.get('has_product', False)
        product_name = data.get('product_name', '')
        product_url = data.get('product_url', '')
        # A product-type video always carries a product
        if video_type == 'product':
            has_product = True

        # Create the video, or update it when (creator, video_id) exists
        video, created = CreatorVideo.objects.update_or_create(
            creator=creator,
            video_id=video_id,
            defaults={
                'title': title,
                'description': description,
                'thumbnail_url': thumbnail_url,
                'video_url': video_url,
                'video_type': video_type,
                'badge': badge,
                'view_count': view_count,
                'like_count': like_count,
                'comment_count': comment_count,
                'has_product': has_product,
                'product_name': product_name,
                'product_url': product_url,
                'release_date': release_date
            }
        )
        return JsonResponse({
            'code': 200,
            'message': '视频添加成功',
            'data': {
                'video_id': video.id,
                'created': created
            }
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"添加创作者视频失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'code': 500,
            'message': f'添加创作者视频失败: {str(e)}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
######################################## 公有达人和私有达人 ########################################
@csrf_exempt
@require_http_methods(["GET"])
def get_public_creators(request):
    """Return one page of the public creator pool.

    Query parameters:
        page: 1-based page number (default 1).
        page_size: number of rows per page (default 10).
        category: optional exact match on the pool entry's category.
        keyword: optional case-insensitive substring matched against the
            creator's name or the creator's category.

    Returns:
        JsonResponse with ``code``, ``message``, ``data`` (list of formatted
        creators) and ``pagination``. ``code`` is 400 when ``page`` or
        ``page_size`` is not a positive integer and 500 on unexpected errors.
    """
    try:
        from django.db.models import Q
        from .models import PublicCreatorPool

        # Pagination parameters: reject anything that is not a positive
        # integer up front, since zero/negative values would otherwise end in
        # a division by zero or a negative queryset slice.
        try:
            page = int(request.GET.get('page', 1))
            page_size = int(request.GET.get('page_size', 10))
        except ValueError:
            page = page_size = None
        if page is None or page < 1 or page_size < 1:
            return JsonResponse({
                'code': 400,
                'message': '分页参数 page 和 page_size 必须为正整数',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Filter parameters
        category = request.GET.get('category')
        keyword = request.GET.get('keyword')

        # Base query; select_related loads each entry's creator in the same
        # query instead of one extra query per row in the loop below.
        public_creators = PublicCreatorPool.objects.select_related('creator')
        if category:
            public_creators = public_creators.filter(category=category)
        if keyword:
            public_creators = public_creators.filter(
                Q(creator__name__icontains=keyword)
                | Q(creator__category__icontains=keyword)
            )

        # Total number of matching rows
        total_count = public_creators.count()

        # Slice out the requested page
        start = (page - 1) * page_size
        end = start + page_size
        creators = public_creators[start:end]

        creator_list = []
        for public_creator in creators:
            creator = public_creator.creator
            # E-commerce level, e.g. "L3"
            e_commerce_level_formatted = f"L{creator.e_commerce_level}" if creator.e_commerce_level else None
            # GMV
            gmv_formatted = f"${creator.gmv}k" if creator.gmv else "$0"
            # Followers and average views, expressed in thousands
            followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
            avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
            # Price range, only when both bounds are known
            if creator.pricing_min is not None and creator.pricing_max is not None:
                pricing_range = f"${creator.pricing_min}-{creator.pricing_max}"
            else:
                pricing_range = None

            creator_list.append({
                "public_id": public_creator.id,
                "creator_id": creator.id,
                "name": creator.name,
                "avatar": creator.avatar_url,
                "category": creator.category,
                "e_commerce_level": e_commerce_level_formatted,
                "exposure_level": creator.exposure_level,
                "followers": followers_formatted,
                "gmv": gmv_formatted,
                "avg_video_views": avg_views_formatted,
                "pricing": pricing_range,
                "collab_count": creator.collab_count,
                "remark": public_creator.remark,
                "category_public": public_creator.category
            })

        # Ceiling division for the page count
        total_pages = (total_count + page_size - 1) // page_size
        pagination = {
            "current_page": page,
            "total_pages": total_pages,
            "total_count": total_count,
            "has_next": page < total_pages,
            "has_prev": page > 1
        }
        return JsonResponse({
            'code': 200,
            'message': '获取成功',
            'data': creator_list,
            'pagination': pagination
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"获取公有达人库列表失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'code': 500,
            'message': f'获取公有达人库列表失败: {str(e)}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["POST"])
def add_to_public_pool(request):
    """Add a creator to the public pool, or update its existing entry.

    JSON body fields:
        creator_id: required id of the creator profile.
        category: optional category for the pool entry.
        remark: optional remark for the pool entry.

    When the creator already has a pool entry, only the non-empty values
    among ``category`` / ``remark`` replace the stored ones.

    Returns:
        JsonResponse with a ``code``/``message``/``data`` envelope. ``code``
        is 200 on success, 400 for a malformed body or missing
        ``creator_id``, 404 when the creator does not exist and 500 on
        unexpected errors.
    """
    try:
        from .models import PublicCreatorPool, CreatorProfile

        # A body that is not a JSON object is a client error, not a 500.
        try:
            data = json.loads(request.body)
        except ValueError:
            data = None
        if not isinstance(data, dict):
            return JsonResponse({
                'code': 400,
                'message': '请求体必须是有效的JSON对象',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Required / optional parameters
        creator_id = data.get('creator_id')
        category = data.get('category')
        remark = data.get('remark')
        if not creator_id:
            return JsonResponse({
                'code': 400,
                'message': '缺少必要参数: creator_id',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Look up the creator
        try:
            creator = CreatorProfile.objects.get(id=creator_id)
        except CreatorProfile.DoesNotExist:
            return JsonResponse({
                'code': 404,
                'message': f'找不到ID为 {creator_id} 的达人',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Fetch-or-create in one call instead of exists() followed by get():
        # fewer queries and no window between the check and the read.
        public_creator, created = PublicCreatorPool.objects.get_or_create(
            creator=creator,
            defaults={'category': category, 'remark': remark},
        )
        if created:
            action = "添加"
        else:
            # Existing entry: keep stored values unless a new one was supplied
            if category:
                public_creator.category = category
            if remark:
                public_creator.remark = remark
            public_creator.save()
            action = "更新"

        return JsonResponse({
            'code': 200,
            'message': f'成功{action}达人到公有库',
            'data': {
                'creator': {
                    'id': creator.id,
                    'name': creator.name
                },
                'public_pool': {
                    'id': public_creator.id,
                    'category': public_creator.category,
                    'remark': public_creator.remark
                }
            }
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"添加达人到公有库失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'code': 500,
            'message': f'添加达人到公有库失败: {str(e)}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def get_private_pools(request):
    """List the private creator pools that belong to one user.

    Reads ``user_id`` from the query string. Each pool in the result carries
    its id, name, description, default flag, the number of relations whose
    status is ``"active"`` and its creation date (``YYYY-MM-DD``).

    Returns:
        JsonResponse with a ``code``/``message``/``data`` envelope: ``code``
        200 with the pool list, 400 when ``user_id`` is missing, 500 on any
        unexpected error.
    """
    try:
        from .models import PrivateCreatorPool

        owner_id = request.GET.get('user_id')
        if owner_id:
            # One summary dict per pool owned by the user
            payload = {
                'code': 200,
                'message': '获取成功',
                'data': [
                    {
                        "id": entry.id,
                        "name": entry.name,
                        "description": entry.description,
                        "is_default": entry.is_default,
                        # only relations still marked active are counted
                        "creator_count": entry.creator_relations.filter(status="active").count(),
                        "created_at": entry.created_at.strftime('%Y-%m-%d'),
                    }
                    for entry in PrivateCreatorPool.objects.filter(user=owner_id)
                ],
            }
        else:
            payload = {
                'code': 400,
                'message': '缺少必要参数: user_id',
                'data': None,
            }
        return JsonResponse(payload, json_dumps_params={'ensure_ascii': False})
    except Exception as exc:
        from traceback import format_exc
        logger.error(f"获取私有达人库列表失败: {exc}")
        logger.error(f"详细错误: {format_exc()}")
        failure = {
            'code': 500,
            'message': f'获取私有达人库列表失败: {str(exc)}',
            'data': None,
        }
        return JsonResponse(failure, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["POST"])
def create_private_pool(request):
    """Create a private creator pool for a user.

    JSON body fields:
        user_id: required; must be convertible to an integer.
        name: required pool name; must not duplicate another pool of the
            same user.
        description: optional description.
        is_default: optional flag (default ``False``). When truthy, every
            other pool of the user that is currently the default is reset.

    Returns:
        JsonResponse with a ``code``/``message``/``data`` envelope. ``code``
        is 200 on success, 400 for a malformed body, missing parameters or a
        non-integer ``user_id``, 404 when the user does not exist, 409 when
        a pool with the same name already exists and 500 on unexpected
        errors.
    """
    try:
        from django.db import transaction
        from .models import PrivateCreatorPool
        from apps.user.models import User

        # A body that is not a JSON object is a client error, not a 500.
        try:
            data = json.loads(request.body)
        except ValueError:
            data = None
        if not isinstance(data, dict):
            return JsonResponse({
                'code': 400,
                'message': '请求体必须是有效的JSON对象',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})
        logger.info(f"创建私有达人库请求数据: {data}")

        # Required / optional parameters
        user_id = data.get('user_id')
        name = data.get('name')
        description = data.get('description')
        is_default = data.get('is_default', False)
        logger.info(f"解析后的参数: user_id={user_id}, name={name}, description={description}, is_default={is_default}")
        if not user_id or not name:
            return JsonResponse({
                'code': 400,
                'message': '缺少必要参数: user_id 或 name',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # user_id must be an integer; TypeError covers JSON lists/objects
        logger.info(f"尝试查询用户ID: {user_id}")
        try:
            user_id = int(user_id)
        except (TypeError, ValueError) as e:
            logger.error(f"用户ID格式错误: {str(e)}")
            return JsonResponse({
                'code': 400,
                'message': f'用户ID格式错误: {str(e)}',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Look up the user
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            logger.error(f"用户不存在: {user_id}")
            return JsonResponse({
                'code': 404,
                'message': f'找不到ID为 {user_id} 的用户',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})
        logger.info(f"查询到的用户信息: {user}")

        # Reject a second pool with the same name for this user
        if PrivateCreatorPool.objects.filter(user_id=user.id, name=name).exists():
            return JsonResponse({
                'code': 409,
                'message': f'已存在同名私有库: {name}',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Clearing the previous default and inserting the new pool happen in
        # one transaction, so a failed insert cannot leave the user without
        # any default pool.
        with transaction.atomic():
            if is_default:
                PrivateCreatorPool.objects.filter(
                    user_id=user.id, is_default=True
                ).update(is_default=False)
            private_pool = PrivateCreatorPool.objects.create(
                user_id=user.id,  # pass the id rather than the user instance
                name=name,
                description=description,
                is_default=is_default
            )

        return JsonResponse({
            'code': 200,
            'message': '私有库创建成功',
            'data': {
                'id': private_pool.id,
                'name': private_pool.name,
                'is_default': private_pool.is_default,
                'creator_count': 0,
                'created_at': private_pool.created_at.strftime('%Y-%m-%d')
            }
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"创建私有达人库失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'code': 500,
            'message': f'创建私有达人库失败: {str(e)}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def get_private_pool_creators(request, pool_id=None):
    """Return one page of the creators stored in a private pool.

    Args:
        request: the incoming GET request.
        pool_id: id of the private pool; when not supplied by the URL it is
            read from the ``pool_id`` query parameter.

    Query parameters:
        page: 1-based page number (default 1).
        page_size: number of rows per page (default 10).
        status: relation status to filter on (default ``'active'``); an
            empty value disables the status filter.
        keyword: optional case-insensitive substring matched against the
            creator's name, the creator's category or the relation notes.

    Returns:
        JsonResponse with ``code``, ``message``, ``data`` (list of formatted
        creators), ``pagination`` and ``pool_info``. ``code`` is 400 when
        ``pool_id`` is missing or the pagination parameters are not positive
        integers, 404 when the pool does not exist and 500 on unexpected
        errors.
    """
    try:
        from django.db.models import Q
        from .models import PrivateCreatorPool, PrivateCreatorRelation

        # pool_id may come from the URL or from the query string
        if not pool_id:
            pool_id = request.GET.get('pool_id')
        if not pool_id:
            return JsonResponse({
                'code': 400,
                'message': '缺少必要参数: pool_id',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Pagination parameters: reject anything that is not a positive
        # integer up front, since zero/negative values would otherwise end in
        # a division by zero or a negative queryset slice.
        try:
            page = int(request.GET.get('page', 1))
            page_size = int(request.GET.get('page_size', 10))
        except ValueError:
            page = page_size = None
        if page is None or page < 1 or page_size < 1:
            return JsonResponse({
                'code': 400,
                'message': '分页参数 page 和 page_size 必须为正整数',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Filter parameters
        status = request.GET.get('status', 'active')  # active relations only by default
        keyword = request.GET.get('keyword')

        # Look up the pool
        try:
            private_pool = PrivateCreatorPool.objects.get(id=pool_id)
        except PrivateCreatorPool.DoesNotExist:
            return JsonResponse({
                'code': 404,
                'message': f'找不到ID为 {pool_id} 的私有库',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Relations of this pool, with the creator loaded in the same query
        creator_relations = PrivateCreatorRelation.objects.filter(
            private_pool=private_pool
        ).select_related('creator')
        if status:
            creator_relations = creator_relations.filter(status=status)
        if keyword:
            creator_relations = creator_relations.filter(
                Q(creator__name__icontains=keyword)
                | Q(creator__category__icontains=keyword)
                | Q(notes__icontains=keyword)
            )

        # Total number of matching rows
        total_count = creator_relations.count()

        # Slice out the requested page
        start = (page - 1) * page_size
        end = start + page_size
        paged_relations = creator_relations[start:end]

        creator_list = []
        for relation in paged_relations:
            creator = relation.creator
            # E-commerce level, e.g. "L3"
            e_commerce_level_formatted = f"L{creator.e_commerce_level}" if creator.e_commerce_level else None
            # GMV
            gmv_formatted = f"${creator.gmv}k" if creator.gmv else "$0"
            # Followers and average views, expressed in thousands
            followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
            avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
            # Price range, only when both bounds are known
            if creator.pricing_min is not None and creator.pricing_max is not None:
                pricing_range = f"${creator.pricing_min}-{creator.pricing_max}"
            else:
                pricing_range = None

            creator_list.append({
                "relation_id": relation.id,
                "creator_id": creator.id,
                "name": creator.name,
                "avatar": creator.avatar_url,
                "category": creator.category,
                "e_commerce_level": e_commerce_level_formatted,
                "exposure_level": creator.exposure_level,
                "followers": followers_formatted,
                "gmv": gmv_formatted,
                "avg_video_views": avg_views_formatted,
                "pricing": pricing_range,
                "collab_count": creator.collab_count,
                "notes": relation.notes,
                "status": relation.status,
                "added_from_public": relation.added_from_public,
                "added_at": relation.created_at.strftime('%Y-%m-%d')
            })

        # Ceiling division for the page count
        total_pages = (total_count + page_size - 1) // page_size
        pagination = {
            "current_page": page,
            "total_pages": total_pages,
            "total_count": total_count,
            "has_next": page < total_pages,
            "has_prev": page > 1
        }
        # Summary of the pool itself
        pool_info = {
            "id": private_pool.id,
            "name": private_pool.name,
            "description": private_pool.description,
            "is_default": private_pool.is_default,
            "user_id": private_pool.user_id,
            "created_at": private_pool.created_at.strftime('%Y-%m-%d')
        }
        return JsonResponse({
            'code': 200,
            'message': '获取成功',
            'data': creator_list,
            'pagination': pagination,
            'pool_info': pool_info
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"获取私有库达人列表失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'code': 500,
            'message': f'获取私有库达人列表失败: {str(e)}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["POST"])
def add_creator_to_private_pool(request):
    """Add one or several creators to a private pool.

    JSON body fields:
        pool_id: required id of the private pool.
        creator_id: id of a single creator to add.
        creator_ids: ids of several creators to add; takes precedence over
            ``creator_id`` when non-empty. A single scalar value is treated
            as a one-element list.
        notes: optional notes stored on each relation.

    A creator that is already in the pool is updated instead: an
    ``'archived'`` relation is set back to ``'active'`` and non-empty
    ``notes`` replace the stored ones. Ids that cannot be resolved to a
    creator are reported under ``skipped`` rather than failing the request.

    Returns:
        JsonResponse with a ``code``/``message``/``data`` envelope. ``code``
        is 200 on success (``data`` lists ``added`` and ``skipped`` creators
        plus the pool), 400 for a malformed body or missing parameters, 404
        when the pool does not exist and 500 on unexpected errors.
    """
    try:
        from .models import PrivateCreatorPool, PrivateCreatorRelation, CreatorProfile, PublicCreatorPool

        # A body that is not a JSON object is a client error, not a 500.
        try:
            data = json.loads(request.body)
        except ValueError:
            data = None
        if not isinstance(data, dict):
            return JsonResponse({
                'code': 400,
                'message': '请求体必须是有效的JSON对象',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Required / optional parameters
        pool_id = data.get('pool_id')
        creator_id = data.get('creator_id')
        notes = data.get('notes')

        # Batch form. A scalar (e.g. a string id) is wrapped so it is not
        # iterated element by element further down.
        creator_ids = data.get('creator_ids') or []
        if not isinstance(creator_ids, (list, tuple)):
            creator_ids = [creator_ids]
        if creator_id and not creator_ids:
            creator_ids = [creator_id]
        if not pool_id or not creator_ids:
            return JsonResponse({
                'code': 400,
                'message': '缺少必要参数: pool_id 或 creator_id/creator_ids',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        # Look up the pool
        try:
            private_pool = PrivateCreatorPool.objects.get(id=pool_id)
        except PrivateCreatorPool.DoesNotExist:
            return JsonResponse({
                'code': 404,
                'message': f'找不到ID为 {pool_id} 的私有库',
                'data': None
            }, json_dumps_params={'ensure_ascii': False})

        added_creators = []
        skipped_creators = []
        for cid in creator_ids:
            # Resolve the creator. A malformed id (wrong type / not numeric)
            # is skipped like a missing one, so one bad entry cannot abort a
            # batch that has already been partly written.
            try:
                creator = CreatorProfile.objects.get(id=cid)
            except (CreatorProfile.DoesNotExist, ValueError, TypeError):
                skipped_creators.append({
                    'id': cid,
                    'reason': '达人不存在或已失效'
                })
                continue

            # Single lookup instead of exists() followed by get()
            relation = PrivateCreatorRelation.objects.filter(
                private_pool=private_pool,
                creator=creator
            ).first()
            if relation is not None:
                # Already in the pool: re-activate if archived, refresh notes
                if relation.status == 'archived':
                    relation.status = 'active'
                if notes:
                    relation.notes = notes
                relation.save()
                action = '更新'
            else:
                # New relation; record whether the creator is currently in
                # the public pool (only needed when creating).
                from_public = PublicCreatorPool.objects.filter(creator=creator).exists()
                PrivateCreatorRelation.objects.create(
                    private_pool=private_pool,
                    creator=creator,
                    notes=notes,
                    added_from_public=from_public,
                    status='active'
                )
                action = '添加'
            added_creators.append({
                'id': creator.id,
                'name': creator.name,
                'action': action
            })

        return JsonResponse({
            'code': 200,
            'message': '操作成功',
            'data': {
                'added': added_creators,
                'skipped': skipped_creators,
                'pool': {
                    'id': private_pool.id,
                    'name': private_pool.name
                }
            }
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"添加达人到私有库失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'code': 500,
            'message': f'添加达人到私有库失败: {str(e)}',
            'data': None
        }, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["POST"])
def update_creator_in_private_pool(request):
    """Change the status and/or notes of a creator relation in a private pool.

    JSON body fields:
        relation_id: required id of the relation row.
        status: new status; applied only when non-empty.
        notes: new notes; applied whenever the key is present with a
            non-null value, so an empty string clears the notes.

    At least one of ``status`` / ``notes`` must be supplied.

    Returns:
        JsonResponse with a ``code``/``message``/``data`` envelope: ``code``
        200 with the updated relation, 400 when parameters are missing, 404
        when the relation does not exist, 500 on any unexpected error.
    """
    try:
        from .models import PrivateCreatorRelation

        body = json.loads(request.body)
        relation_id = body.get('relation_id')
        new_status = body.get('status')
        new_notes = body.get('notes')

        # Something to change: a non-empty status, or notes that are not null
        has_change = bool(new_status) or new_notes is not None
        if not (relation_id and has_change):
            missing = {
                'code': 400,
                'message': '缺少必要参数: relation_id 或 status/notes',
                'data': None,
            }
            return JsonResponse(missing, json_dumps_params={'ensure_ascii': False})

        # Fetch the relation row
        try:
            link = PrivateCreatorRelation.objects.get(id=relation_id)
        except PrivateCreatorRelation.DoesNotExist:
            not_found = {
                'code': 404,
                'message': f'找不到ID为 {relation_id} 的关联记录',
                'data': None,
            }
            return JsonResponse(not_found, json_dumps_params={'ensure_ascii': False})

        # Apply whichever fields were provided, then persist
        if new_notes is not None:  # an empty string is allowed and clears the notes
            link.notes = new_notes
        if new_status:
            link.status = new_status
        link.save()

        updated = {
            'relation_id': link.id,
            'creator_id': link.creator_id,
            'pool_id': link.private_pool_id,
            'status': link.status,
            'notes': link.notes,
        }
        return JsonResponse(
            {'code': 200, 'message': '更新成功', 'data': updated},
            json_dumps_params={'ensure_ascii': False},
        )
    except Exception as exc:
        from traceback import format_exc
        logger.error(f"更新私有库达人失败: {exc}")
        logger.error(f"详细错误: {format_exc()}")
        failure = {
            'code': 500,
            'message': f'更新私有库达人失败: {str(exc)}',
            'data': None,
        }
        return JsonResponse(failure, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["POST"])
def remove_creator_from_private_pool(request):
    """Delete creator relations from a private pool.

    The JSON body selects the rows in one of two ways:

    1. ``relation_id`` / ``relation_ids``: delete by relation id.
    2. ``pool_id`` together with ``creator_id`` / ``creator_ids``: delete the
       relations of those creators inside that pool.

    Relation ids win when both forms are supplied. For each pair, the plural
    key is used when it is non-empty, otherwise the singular one.

    Returns:
        JsonResponse with a ``code``/``message``/``data`` envelope: ``code``
        200 with the first element of ``QuerySet.delete()``'s result as
        ``deleted_count`` and the ids that were used, 400 when neither
        selection form is complete, 500 on any unexpected error.
    """
    try:
        from .models import PrivateCreatorRelation

        body = json.loads(request.body)

        # Form 1: relation ids
        by_relation = body.get('relation_ids', [])
        single_relation = body.get('relation_id')
        if not by_relation and single_relation:
            by_relation = [single_relation]

        # Form 2: pool id plus creator ids
        pool_id = body.get('pool_id')
        by_creator = body.get('creator_ids', [])
        single_creator = body.get('creator_id')
        if not by_creator and single_creator:
            by_creator = [single_creator]

        # Pick the selection form; bail out when neither is usable
        if by_relation:
            selector_key, selector_ids = 'relation_ids', by_relation
            doomed = PrivateCreatorRelation.objects.filter(id__in=by_relation)
        elif pool_id and by_creator:
            selector_key, selector_ids = 'creator_ids', by_creator
            doomed = PrivateCreatorRelation.objects.filter(
                private_pool_id=pool_id,
                creator_id__in=by_creator,
            )
        else:
            invalid = {
                'code': 400,
                'message': '缺少必要参数: 需要提供 relation_id/relation_ids 或同时提供 pool_id 和 creator_id/creator_ids',
                'data': None,
            }
            return JsonResponse(invalid, json_dumps_params={'ensure_ascii': False})

        # delete() returns (total_deleted, per_model_counts); keep the total
        removed_total, _ = doomed.delete()

        summary = {
            'deleted_count': removed_total,
            selector_key: selector_ids,
            'pool_id': pool_id or None,
        }
        return JsonResponse(
            {'code': 200, 'message': '移除成功', 'data': summary},
            json_dumps_params={'ensure_ascii': False},
        )
    except Exception as exc:
        from traceback import format_exc
        logger.error(f"从私有库移除达人失败: {exc}")
        logger.error(f"详细错误: {format_exc()}")
        failure = {
            'code': 500,
            'message': f'从私有库移除达人失败: {str(exc)}',
            'data': None,
        }
        return JsonResponse(failure, json_dumps_params={'ensure_ascii': False})