import json import logging from channels.generic.websocket import WebsocketConsumer from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from .services.status_polling_service import polling_service from .models import Campaign, Product from apps.daren_detail.models import CreatorCampaign, CreatorProfile from .services.offer_status_service import OfferStatusService from django.db.models import Q from django.http.request import QueryDict from urllib.parse import parse_qs from apps.user.models import UserToken from apps.user.authentication import CustomTokenAuthentication from rest_framework.exceptions import AuthenticationFailed logger = logging.getLogger('brands') class CampaignStatusConsumer(WebsocketConsumer): """处理活动状态更新的WebSocket消费者""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 初始化属性,防止disconnect时出现属性不存在的问题 self.group_name = None self.product_group_name = None self.campaign_id = None self.product_id = None def connect(self): """处理WebSocket连接请求""" try: # 获取活动ID和产品ID从URL路由参数 self.campaign_id = self.scope['url_route']['kwargs']['campaign_id'] self.product_id = self.scope['url_route']['kwargs'].get('product_id') # 获取URL查询参数中的token query_string = self.scope.get('query_string', b'').decode() query_params = parse_qs(query_string) token = query_params.get('token', [None])[0] # 验证token if not self.validate_token(token): logger.error(f"WebSocket连接未授权,缺少有效token: {token}") self.close(code=4001) return # 定义标准组名 self.group_name = f'campaign_{self.campaign_id}' # 如果指定了产品ID,则额外加入产品特定的组 if self.product_id: self.product_group_name = f'campaign_{self.campaign_id}_product_{self.product_id}' else: self.product_group_name = None # 将连接添加到频道组 async_to_sync(self.channel_layer.group_add)( self.group_name, self.channel_name ) # 如果有产品特定的组,也加入 if self.product_group_name: async_to_sync(self.channel_layer.group_add)( self.product_group_name, self.channel_name ) # 接受WebSocket连接 self.accept() # 发送初始状态 self.send_initial_status() # 启动轮询 self.start_status_polling() logger.info(f"WebSocket连接已建立: {self.group_name}") except Exception as e: logger.error(f"WebSocket连接失败: {str(e)}") self.close(code=4000) def validate_token(self, token): """验证token的有效性,使用CustomTokenAuthentication""" if not token: logger.warning("未提供token") return False try: # 创建一个模拟的请求对象,包含Authorization头 class MockRequest: def __init__(self, token): self.META = { 'HTTP_AUTHORIZATION': f'Token {token}' } # 创建认证器并尝试认证 auth = CustomTokenAuthentication() mock_request = MockRequest(token) try: user, _ = auth.authenticate(mock_request) if user and user.is_active: return True return False except AuthenticationFailed as e: logger.warning(f"Token认证失败: {str(e)}") return False except Exception as e: logger.error(f"验证token时出错: {str(e)}") return False def disconnect(self, close_code): """处理WebSocket断开连接""" # 安全检查,避免属性不存在的错误 if hasattr(self, 'group_name') and self.group_name: # 将连接从频道组移除 async_to_sync(self.channel_layer.group_discard)( self.group_name, self.channel_name ) # 如果有产品特定组,也移除 if hasattr(self, 'product_group_name') and self.product_group_name: async_to_sync(self.channel_layer.group_discard)( self.product_group_name, self.channel_name ) logger.info(f"WebSocket连接已断开: {getattr(self, 'group_name', 'unknown')}, 关闭代码: {close_code}") def receive(self, text_data): """处理从WebSocket客户端接收的消息""" try: # 解析接收到的JSON数据 data = json.loads(text_data) action = data.get('action') # 处理刷新请求 if action == 'refresh': self.send_initial_status() logger.debug(f"接收到WebSocket消息: {text_data}") except json.JSONDecodeError: logger.error(f"接收到无效的JSON数据: {text_data}") except Exception as e: logger.error(f"处理WebSocket消息时出错: {str(e)}") def send_update(self, event): """向WebSocket客户端发送更新消息""" # 直接转发消息 self.send(text_data=event['message']) def get_creator_data(self): """获取创作者数据列表""" try: # 查询活动信息 campaign = Campaign.objects.get(id=self.campaign_id) # 指定了产品ID时,只获取该产品的数据 if self.product_id: try: # 获取产品信息 product = Product.objects.get(id=self.product_id) products = [product] except Product.DoesNotExist: logger.error(f"找不到产品: {self.product_id}") if campaign.link_product.exists(): products = list(campaign.link_product.all()) else: products = [] # 否则,获取所有关联产品的数据 elif campaign.link_product.exists(): products = list(campaign.link_product.all()) else: products = [] # 如果没有产品,返回空数据 if not products: return { "campaign_id": str(campaign.id), "products": [] } # 查询与活动关联的所有达人关联记录 creator_campaigns = CreatorCampaign.objects.filter( campaign_id=campaign.id ).select_related('creator') # 准备返回数据 products_data = [] # 遍历所有产品 for product in products: creator_list = [] # 遍历所有达人 for cc in creator_campaigns: creator = cc.creator # 获取最新状态 creator_id = creator.id product_id = str(product.id) status = None # 尝试从API获取状态 try: status = OfferStatusService.fetch_status(creator_id, product_id) except Exception as e: logger.error(f"获取达人状态时出错: {str(e)}") # 如果无法获取状态,则使用数据库中的状态 if not status: status = cc.status # 如果状态仍然为空,设置默认值"Unconnected" if not status: status = "Unconnected" # 构建响应数据 creator_data = { "creator_name": creator.name, "category": creator.category, "followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0", "gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0", "views_generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0", "pricing": f"${creator.pricing}" if creator.pricing else "$0", "status": status } creator_list.append(creator_data) # 构建产品数据 product_data = { "product_id": str(product.id), "product_name": product.name, "creators": creator_list } products_data.append(product_data) # 如果只指定了一个产品,返回单产品格式 if self.product_id and len(products_data) == 1: return { "campaign_id": str(campaign.id), "product_id": products_data[0]["product_id"], "product_name": products_data[0]["product_name"], "creators": products_data[0]["creators"] } # 否则返回多产品格式 return { "campaign_id": str(campaign.id), "products": products_data } except Exception as e: logger.error(f"获取创作者数据出错: {str(e)}") return { "campaign_id": self.campaign_id, "product_id": self.product_id if self.product_id else None, "product_name": None, "creators": [] } def send_initial_status(self): """发送初始状态信息""" try: # 获取创作者数据 creator_list = self.get_creator_data() # 构建并发送标准格式消息 message = { 'code': 200, 'message': '获取成功', 'data': creator_list } self.send(text_data=json.dumps(message)) except Campaign.DoesNotExist: logger.error(f"找不到活动: {self.campaign_id}") message = { 'code': 404, 'message': '找不到活动', 'data': None } self.send(text_data=json.dumps(message)) except Exception as e: logger.error(f"发送初始状态时出错: {str(e)}") message = { 'code': 500, 'message': f'服务器错误: {str(e)}', 'data': None } self.send(text_data=json.dumps(message)) def start_status_polling(self): """启动状态轮询""" try: # 查询活动信息 campaign = Campaign.objects.get(id=self.campaign_id) # 查询活动关联的达人 creator_campaigns = CreatorCampaign.objects.filter( campaign_id=campaign.id ).select_related('creator') # 获取所有产品ID列表 product_ids = [] # 如果指定了产品ID,只轮询该产品 if self.product_id: product_ids.append(self.product_id) # 否则,获取所有关联产品 elif campaign.link_product.exists(): for product in campaign.link_product.all(): product_ids.append(str(product.id)) # 如果没有关联产品,使用活动ID作为备选 if not product_ids: product_ids.append(str(self.campaign_id)) # 构建达人-产品对 creator_product_pairs = [] for creator_campaign in creator_campaigns: creator_id = creator_campaign.creator_id for product_id in product_ids: creator_product_pairs.append((creator_id, product_id)) # 启动轮询 if creator_product_pairs: polling_service.start_polling( campaign_id=self.campaign_id, creator_product_pairs=creator_product_pairs, interval=30 # 每30秒轮询一次 ) logger.info(f"已启动活动 {self.campaign_id} 的状态轮询,包含 {len(product_ids)} 个产品和 {len(creator_campaigns)} 个达人") except Campaign.DoesNotExist: logger.error(f"找不到活动: {self.campaign_id}") except Exception as e: logger.error(f"启动状态轮询时出错: {str(e)}")