182 lines
6.5 KiB
Python
182 lines
6.5 KiB
Python
![]() |
import json
|
|||
|
import logging
|
|||
|
from channels.generic.websocket import WebsocketConsumer
|
|||
|
from channels.layers import get_channel_layer
|
|||
|
from asgiref.sync import async_to_sync
|
|||
|
from .services.status_polling_service import polling_service
|
|||
|
from .models import Campaign
|
|||
|
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
|
|||
|
|
|||
|
logger = logging.getLogger('brands')
|
|||
|
|
|||
|
class CampaignStatusConsumer(WebsocketConsumer):
|
|||
|
"""处理活动状态更新的WebSocket消费者"""
|
|||
|
|
|||
|
def connect(self):
|
|||
|
"""处理WebSocket连接请求"""
|
|||
|
# 获取活动ID从URL路由参数
|
|||
|
self.campaign_id = self.scope['url_route']['kwargs']['campaign_id']
|
|||
|
self.group_name = f'campaign_{self.campaign_id}'
|
|||
|
|
|||
|
# 将连接添加到频道组
|
|||
|
async_to_sync(self.channel_layer.group_add)(
|
|||
|
self.group_name,
|
|||
|
self.channel_name
|
|||
|
)
|
|||
|
|
|||
|
# 接受WebSocket连接
|
|||
|
self.accept()
|
|||
|
|
|||
|
# 发送初始状态
|
|||
|
self.send_initial_status()
|
|||
|
|
|||
|
# 启动轮询
|
|||
|
self.start_status_polling()
|
|||
|
|
|||
|
logger.info(f"WebSocket连接已建立: {self.group_name}")
|
|||
|
|
|||
|
def disconnect(self, close_code):
|
|||
|
"""处理WebSocket断开连接"""
|
|||
|
# 将连接从频道组移除
|
|||
|
async_to_sync(self.channel_layer.group_discard)(
|
|||
|
self.group_name,
|
|||
|
self.channel_name
|
|||
|
)
|
|||
|
|
|||
|
logger.info(f"WebSocket连接已断开: {self.group_name}, 关闭代码: {close_code}")
|
|||
|
|
|||
|
def receive(self, text_data):
|
|||
|
"""处理从WebSocket客户端接收的消息"""
|
|||
|
try:
|
|||
|
# 解析接收到的JSON数据
|
|||
|
data = json.loads(text_data)
|
|||
|
action = data.get('action')
|
|||
|
|
|||
|
# 处理刷新请求
|
|||
|
if action == 'refresh':
|
|||
|
self.send_initial_status()
|
|||
|
|
|||
|
logger.debug(f"接收到WebSocket消息: {text_data}")
|
|||
|
|
|||
|
except json.JSONDecodeError:
|
|||
|
logger.error(f"接收到无效的JSON数据: {text_data}")
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"处理WebSocket消息时出错: {str(e)}")
|
|||
|
|
|||
|
def send_update(self, event):
|
|||
|
"""向WebSocket客户端发送更新消息"""
|
|||
|
# 直接转发消息
|
|||
|
self.send(text_data=event['message'])
|
|||
|
|
|||
|
def get_creator_data(self):
|
|||
|
"""获取创作者数据列表"""
|
|||
|
try:
|
|||
|
# 查询活动信息
|
|||
|
campaign = Campaign.objects.get(id=self.campaign_id)
|
|||
|
|
|||
|
# 查询活动关联的达人
|
|||
|
creator_campaigns = CreatorCampaign.objects.filter(
|
|||
|
campaign_id=campaign.id
|
|||
|
).select_related('creator')
|
|||
|
|
|||
|
# 构建达人列表数据
|
|||
|
creator_list = []
|
|||
|
for cc in creator_campaigns:
|
|||
|
creator = cc.creator
|
|||
|
|
|||
|
# 格式化粉丝数和观看量
|
|||
|
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
|
|||
|
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
|
|||
|
|
|||
|
# 构建响应数据
|
|||
|
creator_data = {
|
|||
|
"id": str(creator.id),
|
|||
|
"name": creator.name,
|
|||
|
"avatar": creator.avatar_url,
|
|||
|
"category": creator.category,
|
|||
|
"followers": followers_formatted,
|
|||
|
"views": avg_views_formatted,
|
|||
|
"gmv": f"${creator.gmv}k" if creator.gmv else "$0",
|
|||
|
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
|
|||
|
"status": cc.status
|
|||
|
}
|
|||
|
creator_list.append(creator_data)
|
|||
|
|
|||
|
return creator_list
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"获取创作者数据出错: {str(e)}")
|
|||
|
return []
|
|||
|
|
|||
|
def send_initial_status(self):
|
|||
|
"""发送初始状态信息"""
|
|||
|
try:
|
|||
|
# 获取创作者数据
|
|||
|
creator_list = self.get_creator_data()
|
|||
|
|
|||
|
# 构建并发送标准格式消息
|
|||
|
message = {
|
|||
|
'code': 200,
|
|||
|
'message': '获取成功',
|
|||
|
'data': creator_list
|
|||
|
}
|
|||
|
|
|||
|
self.send(text_data=json.dumps(message))
|
|||
|
|
|||
|
except Campaign.DoesNotExist:
|
|||
|
logger.error(f"找不到活动: {self.campaign_id}")
|
|||
|
message = {
|
|||
|
'code': 404,
|
|||
|
'message': '找不到活动',
|
|||
|
'data': None
|
|||
|
}
|
|||
|
self.send(text_data=json.dumps(message))
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"发送初始状态时出错: {str(e)}")
|
|||
|
message = {
|
|||
|
'code': 500,
|
|||
|
'message': f'服务器错误: {str(e)}',
|
|||
|
'data': None
|
|||
|
}
|
|||
|
self.send(text_data=json.dumps(message))
|
|||
|
|
|||
|
def start_status_polling(self):
|
|||
|
"""启动状态轮询"""
|
|||
|
try:
|
|||
|
# 查询活动信息
|
|||
|
campaign = Campaign.objects.get(id=self.campaign_id)
|
|||
|
|
|||
|
# 查询活动关联的达人
|
|||
|
creator_campaigns = CreatorCampaign.objects.filter(
|
|||
|
campaign_id=campaign.id
|
|||
|
).select_related('creator')
|
|||
|
|
|||
|
# 获取产品ID
|
|||
|
product_id = None
|
|||
|
if campaign.link_product.exists():
|
|||
|
product = campaign.link_product.first()
|
|||
|
product_id = product.id
|
|||
|
|
|||
|
# 如果没有关联产品,则使用活动ID作为产品ID
|
|||
|
if not product_id:
|
|||
|
product_id = campaign.id
|
|||
|
|
|||
|
# 构建达人-产品对
|
|||
|
creator_product_pairs = []
|
|||
|
for cc in creator_campaigns:
|
|||
|
creator_id = cc.creator_id
|
|||
|
creator_product_pairs.append((creator_id, product_id))
|
|||
|
|
|||
|
# 启动轮询
|
|||
|
if creator_product_pairs:
|
|||
|
polling_service.start_polling(
|
|||
|
campaign_id=self.campaign_id,
|
|||
|
creator_product_pairs=creator_product_pairs,
|
|||
|
interval=30 # 每30秒轮询一次
|
|||
|
)
|
|||
|
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询")
|
|||
|
|
|||
|
except Campaign.DoesNotExist:
|
|||
|
logger.error(f"找不到活动: {self.campaign_id}")
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"启动状态轮询时出错: {str(e)}")
|