344 lines
13 KiB
Python
344 lines
13 KiB
Python
import json
|
||
import logging
|
||
from channels.generic.websocket import WebsocketConsumer
|
||
from channels.layers import get_channel_layer
|
||
from asgiref.sync import async_to_sync
|
||
from .services.status_polling_service import polling_service
|
||
from .models import Campaign, Product
|
||
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
|
||
from .services.offer_status_service import OfferStatusService
|
||
from django.db.models import Q
|
||
from django.http.request import QueryDict
|
||
from urllib.parse import parse_qs
|
||
from apps.user.models import UserToken
|
||
from apps.user.authentication import CustomTokenAuthentication
|
||
from rest_framework.exceptions import AuthenticationFailed
|
||
|
||
logger = logging.getLogger('brands')
|
||
|
||
class CampaignStatusConsumer(WebsocketConsumer):
|
||
"""处理活动状态更新的WebSocket消费者"""
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
super().__init__(*args, **kwargs)
|
||
# 初始化属性,防止disconnect时出现属性不存在的问题
|
||
self.group_name = None
|
||
self.product_group_name = None
|
||
self.campaign_id = None
|
||
self.product_id = None
|
||
|
||
def connect(self):
|
||
"""处理WebSocket连接请求"""
|
||
try:
|
||
# 获取活动ID和产品ID从URL路由参数
|
||
self.campaign_id = self.scope['url_route']['kwargs']['campaign_id']
|
||
self.product_id = self.scope['url_route']['kwargs'].get('product_id')
|
||
|
||
# 获取URL查询参数中的token
|
||
query_string = self.scope.get('query_string', b'').decode()
|
||
query_params = parse_qs(query_string)
|
||
token = query_params.get('token', [None])[0]
|
||
|
||
# 验证token
|
||
if not self.validate_token(token):
|
||
logger.error(f"WebSocket连接未授权,缺少有效token: {token}")
|
||
self.close(code=4001)
|
||
return
|
||
|
||
# 定义标准组名
|
||
self.group_name = f'campaign_{self.campaign_id}'
|
||
|
||
# 如果指定了产品ID,则额外加入产品特定的组
|
||
if self.product_id:
|
||
self.product_group_name = f'campaign_{self.campaign_id}_product_{self.product_id}'
|
||
else:
|
||
self.product_group_name = None
|
||
|
||
# 将连接添加到频道组
|
||
async_to_sync(self.channel_layer.group_add)(
|
||
self.group_name,
|
||
self.channel_name
|
||
)
|
||
|
||
# 如果有产品特定的组,也加入
|
||
if self.product_group_name:
|
||
async_to_sync(self.channel_layer.group_add)(
|
||
self.product_group_name,
|
||
self.channel_name
|
||
)
|
||
|
||
# 接受WebSocket连接
|
||
self.accept()
|
||
|
||
# 发送初始状态
|
||
self.send_initial_status()
|
||
|
||
# 启动轮询
|
||
self.start_status_polling()
|
||
|
||
logger.info(f"WebSocket连接已建立: {self.group_name}")
|
||
except Exception as e:
|
||
logger.error(f"WebSocket连接失败: {str(e)}")
|
||
self.close(code=4000)
|
||
|
||
def validate_token(self, token):
|
||
"""验证token的有效性,使用CustomTokenAuthentication"""
|
||
if not token:
|
||
logger.warning("未提供token")
|
||
return False
|
||
|
||
try:
|
||
# 创建一个模拟的请求对象,包含Authorization头
|
||
class MockRequest:
|
||
def __init__(self, token):
|
||
self.META = {
|
||
'HTTP_AUTHORIZATION': f'Token {token}'
|
||
}
|
||
|
||
# 创建认证器并尝试认证
|
||
auth = CustomTokenAuthentication()
|
||
mock_request = MockRequest(token)
|
||
|
||
try:
|
||
user, _ = auth.authenticate(mock_request)
|
||
if user and user.is_active:
|
||
return True
|
||
return False
|
||
except AuthenticationFailed as e:
|
||
logger.warning(f"Token认证失败: {str(e)}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"验证token时出错: {str(e)}")
|
||
return False
|
||
|
||
def disconnect(self, close_code):
|
||
"""处理WebSocket断开连接"""
|
||
# 安全检查,避免属性不存在的错误
|
||
if hasattr(self, 'group_name') and self.group_name:
|
||
# 将连接从频道组移除
|
||
async_to_sync(self.channel_layer.group_discard)(
|
||
self.group_name,
|
||
self.channel_name
|
||
)
|
||
|
||
# 如果有产品特定组,也移除
|
||
if hasattr(self, 'product_group_name') and self.product_group_name:
|
||
async_to_sync(self.channel_layer.group_discard)(
|
||
self.product_group_name,
|
||
self.channel_name
|
||
)
|
||
|
||
logger.info(f"WebSocket连接已断开: {getattr(self, 'group_name', 'unknown')}, 关闭代码: {close_code}")
|
||
|
||
def receive(self, text_data):
|
||
"""处理从WebSocket客户端接收的消息"""
|
||
try:
|
||
# 解析接收到的JSON数据
|
||
data = json.loads(text_data)
|
||
action = data.get('action')
|
||
|
||
# 处理刷新请求
|
||
if action == 'refresh':
|
||
self.send_initial_status()
|
||
|
||
logger.debug(f"接收到WebSocket消息: {text_data}")
|
||
|
||
except json.JSONDecodeError:
|
||
logger.error(f"接收到无效的JSON数据: {text_data}")
|
||
except Exception as e:
|
||
logger.error(f"处理WebSocket消息时出错: {str(e)}")
|
||
|
||
def send_update(self, event):
|
||
"""向WebSocket客户端发送更新消息"""
|
||
# 直接转发消息
|
||
self.send(text_data=event['message'])
|
||
|
||
def get_creator_data(self):
|
||
"""获取创作者数据列表"""
|
||
try:
|
||
# 查询活动信息
|
||
campaign = Campaign.objects.get(id=self.campaign_id)
|
||
|
||
# 指定了产品ID时,只获取该产品的数据
|
||
if self.product_id:
|
||
try:
|
||
# 获取产品信息
|
||
product = Product.objects.get(id=self.product_id)
|
||
products = [product]
|
||
except Product.DoesNotExist:
|
||
logger.error(f"找不到产品: {self.product_id}")
|
||
if campaign.link_product.exists():
|
||
products = list(campaign.link_product.all())
|
||
else:
|
||
products = []
|
||
# 否则,获取所有关联产品的数据
|
||
elif campaign.link_product.exists():
|
||
products = list(campaign.link_product.all())
|
||
else:
|
||
products = []
|
||
|
||
# 如果没有产品,返回空数据
|
||
if not products:
|
||
return {
|
||
"campaign_id": str(campaign.id),
|
||
"products": []
|
||
}
|
||
|
||
# 查询与活动关联的所有达人关联记录
|
||
creator_campaigns = CreatorCampaign.objects.filter(
|
||
campaign_id=campaign.id
|
||
).select_related('creator')
|
||
|
||
# 准备返回数据
|
||
products_data = []
|
||
|
||
# 遍历所有产品
|
||
for product in products:
|
||
creator_list = []
|
||
|
||
# 遍历所有达人
|
||
for cc in creator_campaigns:
|
||
creator = cc.creator
|
||
|
||
# 获取最新状态
|
||
creator_id = creator.id
|
||
product_id = str(product.id)
|
||
status = None
|
||
|
||
# 尝试从API获取状态
|
||
try:
|
||
status = OfferStatusService.fetch_status(creator_id, product_id)
|
||
except Exception as e:
|
||
logger.error(f"获取达人状态时出错: {str(e)}")
|
||
|
||
# 如果无法获取状态,则使用数据库中的状态
|
||
if not status:
|
||
status = cc.status
|
||
|
||
# 如果状态仍然为空,设置默认值"Unconnected"
|
||
if not status:
|
||
status = "Unconnected"
|
||
|
||
# 构建响应数据
|
||
creator_data = {
|
||
"creator_name": creator.name,
|
||
"category": creator.category,
|
||
"followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
|
||
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
|
||
"views_generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
|
||
"pricing": f"${creator.pricing}" if creator.pricing else "$0",
|
||
"status": status
|
||
}
|
||
creator_list.append(creator_data)
|
||
|
||
# 构建产品数据
|
||
product_data = {
|
||
"product_id": str(product.id),
|
||
"product_name": product.name,
|
||
"creators": creator_list
|
||
}
|
||
products_data.append(product_data)
|
||
|
||
# 如果只指定了一个产品,返回单产品格式
|
||
if self.product_id and len(products_data) == 1:
|
||
return {
|
||
"campaign_id": str(campaign.id),
|
||
"product_id": products_data[0]["product_id"],
|
||
"product_name": products_data[0]["product_name"],
|
||
"creators": products_data[0]["creators"]
|
||
}
|
||
|
||
# 否则返回多产品格式
|
||
return {
|
||
"campaign_id": str(campaign.id),
|
||
"products": products_data
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取创作者数据出错: {str(e)}")
|
||
return {
|
||
"campaign_id": self.campaign_id,
|
||
"product_id": self.product_id if self.product_id else None,
|
||
"product_name": None,
|
||
"creators": []
|
||
}
|
||
|
||
def send_initial_status(self):
|
||
"""发送初始状态信息"""
|
||
try:
|
||
# 获取创作者数据
|
||
creator_list = self.get_creator_data()
|
||
|
||
# 构建并发送标准格式消息
|
||
message = {
|
||
'code': 200,
|
||
'message': '获取成功',
|
||
'data': creator_list
|
||
}
|
||
|
||
self.send(text_data=json.dumps(message))
|
||
|
||
except Campaign.DoesNotExist:
|
||
logger.error(f"找不到活动: {self.campaign_id}")
|
||
message = {
|
||
'code': 404,
|
||
'message': '找不到活动',
|
||
'data': None
|
||
}
|
||
self.send(text_data=json.dumps(message))
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送初始状态时出错: {str(e)}")
|
||
message = {
|
||
'code': 500,
|
||
'message': f'服务器错误: {str(e)}',
|
||
'data': None
|
||
}
|
||
self.send(text_data=json.dumps(message))
|
||
|
||
def start_status_polling(self):
|
||
"""启动状态轮询"""
|
||
try:
|
||
# 查询活动信息
|
||
campaign = Campaign.objects.get(id=self.campaign_id)
|
||
|
||
# 查询活动关联的达人
|
||
creator_campaigns = CreatorCampaign.objects.filter(
|
||
campaign_id=campaign.id
|
||
).select_related('creator')
|
||
|
||
# 获取所有产品ID列表
|
||
product_ids = []
|
||
|
||
# 如果指定了产品ID,只轮询该产品
|
||
if self.product_id:
|
||
product_ids.append(self.product_id)
|
||
# 否则,获取所有关联产品
|
||
elif campaign.link_product.exists():
|
||
for product in campaign.link_product.all():
|
||
product_ids.append(str(product.id))
|
||
# 如果没有关联产品,使用活动ID作为备选
|
||
if not product_ids:
|
||
product_ids.append(str(self.campaign_id))
|
||
|
||
# 构建达人-产品对
|
||
creator_product_pairs = []
|
||
for creator_campaign in creator_campaigns:
|
||
creator_id = creator_campaign.creator_id
|
||
for product_id in product_ids:
|
||
creator_product_pairs.append((creator_id, product_id))
|
||
|
||
# 启动轮询
|
||
if creator_product_pairs:
|
||
polling_service.start_polling(
|
||
campaign_id=self.campaign_id,
|
||
creator_product_pairs=creator_product_pairs,
|
||
interval=30 # 每30秒轮询一次
|
||
)
|
||
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询,包含 {len(product_ids)} 个产品和 {len(creator_campaigns)} 个达人")
|
||
|
||
except Campaign.DoesNotExist:
|
||
logger.error(f"找不到活动: {self.campaign_id}")
|
||
except Exception as e:
|
||
logger.error(f"启动状态轮询时出错: {str(e)}") |