import json import logging from channels.generic.websocket import WebsocketConsumer from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from .services.status_polling_service import polling_service from .models import Campaign from apps.daren_detail.models import CreatorCampaign, CreatorProfile logger = logging.getLogger('brands') class CampaignStatusConsumer(WebsocketConsumer): """处理活动状态更新的WebSocket消费者""" def connect(self): """处理WebSocket连接请求""" # 获取活动ID从URL路由参数 self.campaign_id = self.scope['url_route']['kwargs']['campaign_id'] self.group_name = f'campaign_{self.campaign_id}' # 将连接添加到频道组 async_to_sync(self.channel_layer.group_add)( self.group_name, self.channel_name ) # 接受WebSocket连接 self.accept() # 发送初始状态 self.send_initial_status() # 启动轮询 self.start_status_polling() logger.info(f"WebSocket连接已建立: {self.group_name}") def disconnect(self, close_code): """处理WebSocket断开连接""" # 将连接从频道组移除 async_to_sync(self.channel_layer.group_discard)( self.group_name, self.channel_name ) logger.info(f"WebSocket连接已断开: {self.group_name}, 关闭代码: {close_code}") def receive(self, text_data): """处理从WebSocket客户端接收的消息""" try: # 解析接收到的JSON数据 data = json.loads(text_data) action = data.get('action') # 处理刷新请求 if action == 'refresh': self.send_initial_status() logger.debug(f"接收到WebSocket消息: {text_data}") except json.JSONDecodeError: logger.error(f"接收到无效的JSON数据: {text_data}") except Exception as e: logger.error(f"处理WebSocket消息时出错: {str(e)}") def send_update(self, event): """向WebSocket客户端发送更新消息""" # 直接转发消息 self.send(text_data=event['message']) def get_creator_data(self): """获取创作者数据列表""" try: # 查询活动信息 campaign = Campaign.objects.get(id=self.campaign_id) # 查询活动关联的达人 creator_campaigns = CreatorCampaign.objects.filter( campaign_id=campaign.id ).select_related('creator') # 构建达人列表数据 creator_list = [] for cc in creator_campaigns: creator = cc.creator # 格式化粉丝数和观看量 followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0" avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0" # 构建响应数据 creator_data = { "id": str(creator.id), "name": creator.name, "avatar": creator.avatar_url, "category": creator.category, "followers": followers_formatted, "views": avg_views_formatted, "gmv": f"${creator.gmv}k" if creator.gmv else "$0", "pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0", "status": cc.status } creator_list.append(creator_data) return creator_list except Exception as e: logger.error(f"获取创作者数据出错: {str(e)}") return [] def send_initial_status(self): """发送初始状态信息""" try: # 获取创作者数据 creator_list = self.get_creator_data() # 构建并发送标准格式消息 message = { 'code': 200, 'message': '获取成功', 'data': creator_list } self.send(text_data=json.dumps(message)) except Campaign.DoesNotExist: logger.error(f"找不到活动: {self.campaign_id}") message = { 'code': 404, 'message': '找不到活动', 'data': None } self.send(text_data=json.dumps(message)) except Exception as e: logger.error(f"发送初始状态时出错: {str(e)}") message = { 'code': 500, 'message': f'服务器错误: {str(e)}', 'data': None } self.send(text_data=json.dumps(message)) def start_status_polling(self): """启动状态轮询""" try: # 查询活动信息 campaign = Campaign.objects.get(id=self.campaign_id) # 查询活动关联的达人 creator_campaigns = CreatorCampaign.objects.filter( campaign_id=campaign.id ).select_related('creator') # 获取产品ID product_id = None if campaign.link_product.exists(): product = campaign.link_product.first() product_id = product.id # 如果没有关联产品,则使用活动ID作为产品ID if not product_id: product_id = campaign.id # 构建达人-产品对 creator_product_pairs = [] for cc in creator_campaigns: creator_id = cc.creator_id creator_product_pairs.append((creator_id, product_id)) # 启动轮询 if creator_product_pairs: polling_service.start_polling( campaign_id=self.campaign_id, creator_product_pairs=creator_product_pairs, interval=30 # 每30秒轮询一次 ) logger.info(f"已启动活动 {self.campaign_id} 的状态轮询") except Campaign.DoesNotExist: logger.error(f"找不到活动: {self.campaign_id}") except Exception as e: logger.error(f"启动状态轮询时出错: {str(e)}")