daren/apps/brands/consumers.py
2025-05-23 12:11:03 +08:00

344 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from channels.generic.websocket import WebsocketConsumer
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from .services.status_polling_service import polling_service
from .models import Campaign, Product
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
from .services.offer_status_service import OfferStatusService
from django.db.models import Q
from django.http.request import QueryDict
from urllib.parse import parse_qs
from apps.user.models import UserToken
from apps.user.authentication import CustomTokenAuthentication
from rest_framework.exceptions import AuthenticationFailed
logger = logging.getLogger('brands')
class CampaignStatusConsumer(WebsocketConsumer):
"""处理活动状态更新的WebSocket消费者"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 初始化属性防止disconnect时出现属性不存在的问题
self.group_name = None
self.product_group_name = None
self.campaign_id = None
self.product_id = None
def connect(self):
"""处理WebSocket连接请求"""
try:
# 获取活动ID和产品ID从URL路由参数
self.campaign_id = self.scope['url_route']['kwargs']['campaign_id']
self.product_id = self.scope['url_route']['kwargs'].get('product_id')
# 获取URL查询参数中的token
query_string = self.scope.get('query_string', b'').decode()
query_params = parse_qs(query_string)
token = query_params.get('token', [None])[0]
# 验证token
if not self.validate_token(token):
logger.error(f"WebSocket连接未授权缺少有效token: {token}")
self.close(code=4001)
return
# 定义标准组名
self.group_name = f'campaign_{self.campaign_id}'
# 如果指定了产品ID则额外加入产品特定的组
if self.product_id:
self.product_group_name = f'campaign_{self.campaign_id}_product_{self.product_id}'
else:
self.product_group_name = None
# 将连接添加到频道组
async_to_sync(self.channel_layer.group_add)(
self.group_name,
self.channel_name
)
# 如果有产品特定的组,也加入
if self.product_group_name:
async_to_sync(self.channel_layer.group_add)(
self.product_group_name,
self.channel_name
)
# 接受WebSocket连接
self.accept()
# 发送初始状态
self.send_initial_status()
# 启动轮询
self.start_status_polling()
logger.info(f"WebSocket连接已建立: {self.group_name}")
except Exception as e:
logger.error(f"WebSocket连接失败: {str(e)}")
self.close(code=4000)
def validate_token(self, token):
"""验证token的有效性使用CustomTokenAuthentication"""
if not token:
logger.warning("未提供token")
return False
try:
# 创建一个模拟的请求对象包含Authorization头
class MockRequest:
def __init__(self, token):
self.META = {
'HTTP_AUTHORIZATION': f'Token {token}'
}
# 创建认证器并尝试认证
auth = CustomTokenAuthentication()
mock_request = MockRequest(token)
try:
user, _ = auth.authenticate(mock_request)
if user and user.is_active:
return True
return False
except AuthenticationFailed as e:
logger.warning(f"Token认证失败: {str(e)}")
return False
except Exception as e:
logger.error(f"验证token时出错: {str(e)}")
return False
def disconnect(self, close_code):
"""处理WebSocket断开连接"""
# 安全检查,避免属性不存在的错误
if hasattr(self, 'group_name') and self.group_name:
# 将连接从频道组移除
async_to_sync(self.channel_layer.group_discard)(
self.group_name,
self.channel_name
)
# 如果有产品特定组,也移除
if hasattr(self, 'product_group_name') and self.product_group_name:
async_to_sync(self.channel_layer.group_discard)(
self.product_group_name,
self.channel_name
)
logger.info(f"WebSocket连接已断开: {getattr(self, 'group_name', 'unknown')}, 关闭代码: {close_code}")
def receive(self, text_data):
"""处理从WebSocket客户端接收的消息"""
try:
# 解析接收到的JSON数据
data = json.loads(text_data)
action = data.get('action')
# 处理刷新请求
if action == 'refresh':
self.send_initial_status()
logger.debug(f"接收到WebSocket消息: {text_data}")
except json.JSONDecodeError:
logger.error(f"接收到无效的JSON数据: {text_data}")
except Exception as e:
logger.error(f"处理WebSocket消息时出错: {str(e)}")
def send_update(self, event):
"""向WebSocket客户端发送更新消息"""
# 直接转发消息
self.send(text_data=event['message'])
def get_creator_data(self):
"""获取创作者数据列表"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 指定了产品ID时只获取该产品的数据
if self.product_id:
try:
# 获取产品信息
product = Product.objects.get(id=self.product_id)
products = [product]
except Product.DoesNotExist:
logger.error(f"找不到产品: {self.product_id}")
if campaign.link_product.exists():
products = list(campaign.link_product.all())
else:
products = []
# 否则,获取所有关联产品的数据
elif campaign.link_product.exists():
products = list(campaign.link_product.all())
else:
products = []
# 如果没有产品,返回空数据
if not products:
return {
"campaign_id": str(campaign.id),
"products": []
}
# 查询与活动关联的所有达人关联记录
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 准备返回数据
products_data = []
# 遍历所有产品
for product in products:
creator_list = []
# 遍历所有达人
for cc in creator_campaigns:
creator = cc.creator
# 获取最新状态
creator_id = creator.id
product_id = str(product.id)
status = None
# 尝试从API获取状态
try:
status = OfferStatusService.fetch_status(creator_id, product_id)
except Exception as e:
logger.error(f"获取达人状态时出错: {str(e)}")
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
# 如果状态仍然为空,设置默认值"Unconnected"
if not status:
status = "Unconnected"
# 构建响应数据
creator_data = {
"creator_name": creator.name,
"category": creator.category,
"followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
"pricing": f"${creator.pricing}" if creator.pricing else "$0",
"status": status
}
creator_list.append(creator_data)
# 构建产品数据
product_data = {
"product_id": str(product.id),
"product_name": product.name,
"creators": creator_list
}
products_data.append(product_data)
# 如果只指定了一个产品,返回单产品格式
if self.product_id and len(products_data) == 1:
return {
"campaign_id": str(campaign.id),
"product_id": products_data[0]["product_id"],
"product_name": products_data[0]["product_name"],
"creators": products_data[0]["creators"]
}
# 否则返回多产品格式
return {
"campaign_id": str(campaign.id),
"products": products_data
}
except Exception as e:
logger.error(f"获取创作者数据出错: {str(e)}")
return {
"campaign_id": self.campaign_id,
"product_id": self.product_id if self.product_id else None,
"product_name": None,
"creators": []
}
def send_initial_status(self):
"""发送初始状态信息"""
try:
# 获取创作者数据
creator_list = self.get_creator_data()
# 构建并发送标准格式消息
message = {
'code': 200,
'message': '获取成功',
'data': creator_list
}
self.send(text_data=json.dumps(message))
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
message = {
'code': 404,
'message': '找不到活动',
'data': None
}
self.send(text_data=json.dumps(message))
except Exception as e:
logger.error(f"发送初始状态时出错: {str(e)}")
message = {
'code': 500,
'message': f'服务器错误: {str(e)}',
'data': None
}
self.send(text_data=json.dumps(message))
def start_status_polling(self):
"""启动状态轮询"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 获取所有产品ID列表
product_ids = []
# 如果指定了产品ID只轮询该产品
if self.product_id:
product_ids.append(self.product_id)
# 否则,获取所有关联产品
elif campaign.link_product.exists():
for product in campaign.link_product.all():
product_ids.append(str(product.id))
# 如果没有关联产品使用活动ID作为备选
if not product_ids:
product_ids.append(str(self.campaign_id))
# 构建达人-产品对
creator_product_pairs = []
for creator_campaign in creator_campaigns:
creator_id = creator_campaign.creator_id
for product_id in product_ids:
creator_product_pairs.append((creator_id, product_id))
# 启动轮询
if creator_product_pairs:
polling_service.start_polling(
campaign_id=self.campaign_id,
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询,包含 {len(product_ids)} 个产品和 {len(creator_campaigns)} 个达人")
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")