daren/apps/discovery/views.py
2025-05-23 13:49:13 +08:00

260 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.shortcuts import render
from django.db.models import Q
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import AllowAny
from django.utils import timezone
from rest_framework.permissions import IsAuthenticated
from apps.user.authentication import CustomTokenAuthentication
from .models import SearchSession, Creator
from .serializers import (
SearchSessionSerializer,
SearchSessionDetailSerializer,
CreatorSerializer,
CreatorDetailSerializer
)
from .pagination import StandardResultsSetPagination
class ApiResponse:
    """Uniform response envelope used by the API views.

    Every body has the shape ``{"code": ..., "message": ..., "data": ...}``.
    """

    @staticmethod
    def success(data=None, message="操作成功"):
        """Return a success envelope whose ``code`` field is fixed at 200.

        Args:
            data: Payload placed under the ``data`` key.
            message: Text placed under the ``message`` key.
        """
        payload = {"code": 200, "message": message, "data": data}
        return Response(payload)

    @staticmethod
    def error(message="操作失败", code=400, data=None):
        """Return an error envelope.

        The HTTP status is always 200; the failure is conveyed by the
        ``code`` and ``message`` fields of the body.

        Args:
            message: Text placed under the ``message`` key.
            code: Value placed under the ``code`` key.
            data: Optional payload placed under the ``data`` key.
        """
        payload = {"code": code, "message": message, "data": data}
        return Response(payload, status=status.HTTP_200_OK)
class SearchSessionViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for search sessions, plus ``results`` and ``summary``.

    Non-paginated responses are wrapped in the ``ApiResponse`` envelope.
    NOTE(review): paginated responses are returned straight from
    ``get_paginated_response``; whether they carry the same envelope depends
    on ``StandardResultsSetPagination`` -- confirm.
    """

    queryset = SearchSession.objects.all()
    serializer_class = SearchSessionSerializer
    authentication_classes = [CustomTokenAuthentication]
    permission_classes = [IsAuthenticated]
    pagination_class = StandardResultsSetPagination

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve`` and the list serializer otherwise."""
        if self.action == 'retrieve':
            return SearchSessionDetailSerializer
        return SearchSessionSerializer

    def list(self, request, *args, **kwargs):
        """List search sessions, paginated when the paginator yields a page."""
        queryset = self.filter_queryset(self.get_queryset())
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(queryset, many=True)
        return ApiResponse.success(serializer.data, "获取搜索会话列表成功")

    def retrieve(self, request, *args, **kwargs):
        """Return a single search session serialized with the detail serializer."""
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        return ApiResponse.success(serializer.data, "获取搜索会话详情成功")

    def create(self, request, *args, **kwargs):
        """Validate the request body and create a search session.

        Invalid input raises the serializer's validation error
        (``raise_exception=True``).
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        return ApiResponse.success(serializer.data, "创建搜索会话成功")

    def update(self, request, *args, **kwargs):
        """Update a search session; ``partial`` in ``kwargs`` enables partial updates."""
        partial = kwargs.pop('partial', False)
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        self.perform_update(serializer)
        return ApiResponse.success(serializer.data, "更新搜索会话成功")

    def destroy(self, request, *args, **kwargs):
        """Delete a search session and return an envelope with no data."""
        instance = self.get_object()
        self.perform_destroy(instance)
        return ApiResponse.success(None, "删除搜索会话成功")

    @action(detail=True, methods=['get'])
    def results(self, request, pk=None):
        """Return the creators attached to the given session."""
        session = self.get_object()
        creators = session.creators.all()
        page = self.paginate_queryset(creators)
        if page is not None:
            serializer = CreatorSerializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = CreatorSerializer(creators, many=True)
        return ApiResponse.success(serializer.data, "获取会话创作者列表成功")

    @action(detail=False, methods=['get'])
    def summary(self, request):
        """Return one summary row per session (unpaginated), e.g. for a table view."""
        sessions = self.filter_queryset(self.get_queryset())
        # Shape each session into the flat dict returned to the client.
        result = [
            {
                'id': session.id,
                'session_number': session.session_number,
                'date': session.date_created,
                'creator_count': session.creator_count,
                'shoppable_creators': session.shoppable_creators,
                'avg_followers': session.avg_followers,
                'avg_gmv': session.avg_gmv,
                'avg_video_views': session.avg_video_views,
            }
            for session in sessions
        ]
        return ApiResponse.success(result, "获取会话摘要列表成功")
class CreatorDiscoveryViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only creator endpoints plus a POST ``search`` action."""

    queryset = Creator.objects.all()
    serializer_class = CreatorSerializer
    authentication_classes = [CustomTokenAuthentication]
    permission_classes = [IsAuthenticated]
    pagination_class = StandardResultsSetPagination

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve`` and the list serializer otherwise."""
        if self.action == 'retrieve':
            return CreatorDetailSerializer
        return CreatorSerializer

    def list(self, request, *args, **kwargs):
        """List creators, paginated when the paginator yields a page."""
        queryset = self.filter_queryset(self.get_queryset())
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(queryset, many=True)
        return ApiResponse.success(serializer.data, "获取创作者列表成功")

    def retrieve(self, request, *args, **kwargs):
        """Return a single creator serialized with the detail serializer."""
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        return ApiResponse.success(serializer.data, "获取创作者详情成功")

    @action(detail=False, methods=['post'])
    def search(self, request):
        """Search creator profiles and record the matches in today's session.

        Reads ``query``, ``category``, ``ecommerce_level`` and
        ``exposure_level`` from the request body, filters ``CreatorProfile``
        with them, creates a ``Creator`` row in today's ``SearchSession`` for
        each match whose name is not already in that session, refreshes the
        session statistics, and returns the session detail.

        Any exception raised inside the ``try`` is logged with its traceback
        and reported through ``ApiResponse.error``.
        """
        # Bind the logger before the ``try`` so the ``except`` handler can
        # always log, even when the CreatorProfile import itself fails.
        import logging
        logger = logging.getLogger(__name__)

        query = request.data.get('query', '')
        category = request.data.get('category', None)
        ecommerce_level = request.data.get('ecommerce_level', None)
        exposure_level = request.data.get('exposure_level', None)

        try:
            # Imported here, as in the original code, so an import failure is
            # reported through the error envelope as well.
            from apps.daren_detail.models import CreatorProfile

            today_session = self._get_or_create_today_session()
            query_filters = self._build_profile_filters(
                query, category, ecommerce_level, exposure_level
            )

            for profile in CreatorProfile.objects.filter(**query_filters):
                self._add_profile_to_session(today_session, profile)

            self._refresh_session_stats(today_session)

            serializer = SearchSessionDetailSerializer(today_session)
            return ApiResponse.success(serializer.data, "搜索创作者成功")
        except Exception as e:
            # logger.exception records the message and the active traceback.
            logger.exception("搜索创作者时发生错误: %s", e)
            # NOTE(review): the exception text is sent back to the client --
            # confirm that exposing it is acceptable.
            return ApiResponse.error(f"搜索创作者失败: {str(e)}")

    @staticmethod
    def _get_or_create_today_session():
        """Return the session dated today, creating it with zeroed stats if absent."""
        # NOTE(review): ``timezone.now().date()`` is the UTC date when USE_TZ
        # is enabled -- confirm that is the intended day boundary.
        today = timezone.now().date()
        session = SearchSession.objects.filter(date_created=today).first()
        if session is not None:
            return session

        # The new session takes the highest existing session_number plus one.
        latest = SearchSession.objects.all().order_by('-session_number').first()
        session_number = latest.session_number + 1 if latest is not None else 1
        return SearchSession.objects.create(
            session_number=session_number,
            creator_count=0,
            shoppable_creators=0,
            avg_followers=0,
            avg_gmv=0,
            avg_video_views=0,
            date_created=today,
        )

    @staticmethod
    def _build_profile_filters(query, category, ecommerce_level, exposure_level):
        """Translate the search parameters into ``CreatorProfile`` filter kwargs.

        Falsy parameters add no filter. An ``ecommerce_level`` string of the
        form ``"L<n>"`` is converted to the integer ``n``; any other truthy
        value is used as-is.
        """
        filters = {}
        if query:
            filters['name__icontains'] = query
        if category:
            filters['category'] = category
        if ecommerce_level:
            has_l_prefix = (
                isinstance(ecommerce_level, str)
                and ecommerce_level.startswith('L')
                and len(ecommerce_level) > 1
            )
            if has_l_prefix:
                try:
                    filters['e_commerce_level'] = int(ecommerce_level[1:])
                except ValueError:
                    # Best effort: a malformed "L..." value adds no level
                    # filter rather than failing the whole search.
                    pass
            else:
                filters['e_commerce_level'] = ecommerce_level
        if exposure_level:
            filters['exposure_level'] = exposure_level
        return filters

    @staticmethod
    def _add_profile_to_session(session, profile):
        """Create a ``Creator`` for ``profile`` unless the session already has that name."""
        # exists() avoids loading a full row just to test for presence.
        if Creator.objects.filter(session=session, name=profile.name).exists():
            return

        has_level = profile.e_commerce_level is not None
        ecommerce_level_str = f"L{profile.e_commerce_level}" if has_level else "New tag"

        Creator.objects.create(
            session=session,
            name=profile.name,
            avatar=profile.avatar_url if profile.avatar_url else None,
            category=profile.category if profile.category else "Other",
            ecommerce_level=ecommerce_level_str,
            exposure_level=profile.exposure_level if profile.exposure_level else "New tag",
            followers=float(profile.followers) if profile.followers else 0,
            gmv=float(profile.gmv) if profile.gmv else 0,
            items_sold=float(profile.items_sold) if profile.items_sold else 0,
            avg_video_views=float(profile.avg_video_views) if profile.avg_video_views else 0,
            has_ecommerce=has_level,
            tiktok_url=profile.tiktok_link if profile.tiktok_link else None,
        )

    @staticmethod
    def _refresh_session_stats(session):
        """Recompute the session's counts and averages from its creators and save.

        Does nothing when the session has no creators.
        """
        # Fetch the creators once and reuse the list for every aggregate.
        creators = list(session.creators.all())
        total_creators = len(creators)
        if total_creators == 0:
            return

        total_followers = sum(creator.followers for creator in creators)
        total_gmv = sum(creator.gmv for creator in creators)
        total_video_views = sum(creator.avg_video_views for creator in creators)

        session.creator_count = total_creators
        session.shoppable_creators = session.creators.filter(has_ecommerce=True).count()
        session.avg_followers = round(total_followers / total_creators, 1)
        session.avg_gmv = round(total_gmv / total_creators, 1)
        session.avg_video_views = round(total_video_views / total_creators, 1)
        session.save()