daren/apps/brands/consumers.py
2025-05-20 12:17:45 +08:00

182 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from channels.generic.websocket import WebsocketConsumer
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from .services.status_polling_service import polling_service
from .models import Campaign
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
logger = logging.getLogger('brands')
class CampaignStatusConsumer(WebsocketConsumer):
"""处理活动状态更新的WebSocket消费者"""
def connect(self):
"""处理WebSocket连接请求"""
# 获取活动ID从URL路由参数
self.campaign_id = self.scope['url_route']['kwargs']['campaign_id']
self.group_name = f'campaign_{self.campaign_id}'
# 将连接添加到频道组
async_to_sync(self.channel_layer.group_add)(
self.group_name,
self.channel_name
)
# 接受WebSocket连接
self.accept()
# 发送初始状态
self.send_initial_status()
# 启动轮询
self.start_status_polling()
logger.info(f"WebSocket连接已建立: {self.group_name}")
def disconnect(self, close_code):
"""处理WebSocket断开连接"""
# 将连接从频道组移除
async_to_sync(self.channel_layer.group_discard)(
self.group_name,
self.channel_name
)
logger.info(f"WebSocket连接已断开: {self.group_name}, 关闭代码: {close_code}")
def receive(self, text_data):
"""处理从WebSocket客户端接收的消息"""
try:
# 解析接收到的JSON数据
data = json.loads(text_data)
action = data.get('action')
# 处理刷新请求
if action == 'refresh':
self.send_initial_status()
logger.debug(f"接收到WebSocket消息: {text_data}")
except json.JSONDecodeError:
logger.error(f"接收到无效的JSON数据: {text_data}")
except Exception as e:
logger.error(f"处理WebSocket消息时出错: {str(e)}")
def send_update(self, event):
"""向WebSocket客户端发送更新消息"""
# 直接转发消息
self.send(text_data=event['message'])
def get_creator_data(self):
"""获取创作者数据列表"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 构建达人列表数据
creator_list = []
for cc in creator_campaigns:
creator = cc.creator
# 格式化粉丝数和观看量
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
# 构建响应数据
creator_data = {
"id": str(creator.id),
"name": creator.name,
"avatar": creator.avatar_url,
"category": creator.category,
"followers": followers_formatted,
"views": avg_views_formatted,
"gmv": f"${creator.gmv}k" if creator.gmv else "$0",
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": cc.status
}
creator_list.append(creator_data)
return creator_list
except Exception as e:
logger.error(f"获取创作者数据出错: {str(e)}")
return []
def send_initial_status(self):
"""发送初始状态信息"""
try:
# 获取创作者数据
creator_list = self.get_creator_data()
# 构建并发送标准格式消息
message = {
'code': 200,
'message': '获取成功',
'data': creator_list
}
self.send(text_data=json.dumps(message))
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
message = {
'code': 404,
'message': '找不到活动',
'data': None
}
self.send(text_data=json.dumps(message))
except Exception as e:
logger.error(f"发送初始状态时出错: {str(e)}")
message = {
'code': 500,
'message': f'服务器错误: {str(e)}',
'data': None
}
self.send(text_data=json.dumps(message))
def start_status_polling(self):
"""启动状态轮询"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 获取产品ID
product_id = None
if campaign.link_product.exists():
product = campaign.link_product.first()
product_id = product.id
# 如果没有关联产品则使用活动ID作为产品ID
if not product_id:
product_id = campaign.id
# 构建达人-产品对
creator_product_pairs = []
for cc in creator_campaigns:
creator_id = cc.creator_id
creator_product_pairs.append((creator_id, product_id))
# 启动轮询
if creator_product_pairs:
polling_service.start_polling(
campaign_id=self.campaign_id,
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询")
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")