daren/apps/brands/consumers.py

182 lines
6.5 KiB
Python
Raw Normal View History

2025-05-20 12:17:45 +08:00
import json
import logging
from channels.generic.websocket import WebsocketConsumer
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from .services.status_polling_service import polling_service
from .models import Campaign
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
logger = logging.getLogger('brands')
class CampaignStatusConsumer(WebsocketConsumer):
"""处理活动状态更新的WebSocket消费者"""
def connect(self):
"""处理WebSocket连接请求"""
# 获取活动ID从URL路由参数
self.campaign_id = self.scope['url_route']['kwargs']['campaign_id']
self.group_name = f'campaign_{self.campaign_id}'
# 将连接添加到频道组
async_to_sync(self.channel_layer.group_add)(
self.group_name,
self.channel_name
)
# 接受WebSocket连接
self.accept()
# 发送初始状态
self.send_initial_status()
# 启动轮询
self.start_status_polling()
logger.info(f"WebSocket连接已建立: {self.group_name}")
def disconnect(self, close_code):
"""处理WebSocket断开连接"""
# 将连接从频道组移除
async_to_sync(self.channel_layer.group_discard)(
self.group_name,
self.channel_name
)
logger.info(f"WebSocket连接已断开: {self.group_name}, 关闭代码: {close_code}")
def receive(self, text_data):
"""处理从WebSocket客户端接收的消息"""
try:
# 解析接收到的JSON数据
data = json.loads(text_data)
action = data.get('action')
# 处理刷新请求
if action == 'refresh':
self.send_initial_status()
logger.debug(f"接收到WebSocket消息: {text_data}")
except json.JSONDecodeError:
logger.error(f"接收到无效的JSON数据: {text_data}")
except Exception as e:
logger.error(f"处理WebSocket消息时出错: {str(e)}")
def send_update(self, event):
"""向WebSocket客户端发送更新消息"""
# 直接转发消息
self.send(text_data=event['message'])
def get_creator_data(self):
"""获取创作者数据列表"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 构建达人列表数据
creator_list = []
for cc in creator_campaigns:
creator = cc.creator
# 格式化粉丝数和观看量
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
# 构建响应数据
creator_data = {
"id": str(creator.id),
"name": creator.name,
"avatar": creator.avatar_url,
"category": creator.category,
"followers": followers_formatted,
"views": avg_views_formatted,
"gmv": f"${creator.gmv}k" if creator.gmv else "$0",
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": cc.status
}
creator_list.append(creator_data)
return creator_list
except Exception as e:
logger.error(f"获取创作者数据出错: {str(e)}")
return []
def send_initial_status(self):
"""发送初始状态信息"""
try:
# 获取创作者数据
creator_list = self.get_creator_data()
# 构建并发送标准格式消息
message = {
'code': 200,
'message': '获取成功',
'data': creator_list
}
self.send(text_data=json.dumps(message))
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
message = {
'code': 404,
'message': '找不到活动',
'data': None
}
self.send(text_data=json.dumps(message))
except Exception as e:
logger.error(f"发送初始状态时出错: {str(e)}")
message = {
'code': 500,
'message': f'服务器错误: {str(e)}',
'data': None
}
self.send(text_data=json.dumps(message))
def start_status_polling(self):
"""启动状态轮询"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 获取产品ID
product_id = None
if campaign.link_product.exists():
product = campaign.link_product.first()
product_id = product.id
# 如果没有关联产品则使用活动ID作为产品ID
if not product_id:
product_id = campaign.id
# 构建达人-产品对
creator_product_pairs = []
for cc in creator_campaigns:
creator_id = cc.creator_id
creator_product_pairs.append((creator_id, product_id))
# 启动轮询
if creator_product_pairs:
polling_service.start_polling(
campaign_id=self.campaign_id,
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询")
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")