daren/apps/brands/consumers.py

344 lines
13 KiB
Python
Raw Permalink Normal View History

2025-05-20 12:17:45 +08:00
import json
import logging
from channels.generic.websocket import WebsocketConsumer
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from .services.status_polling_service import polling_service
from .models import Campaign, Product
2025-05-20 12:17:45 +08:00
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
from .services.offer_status_service import OfferStatusService
from django.db.models import Q
from django.http.request import QueryDict
from urllib.parse import parse_qs
from apps.user.models import UserToken
from apps.user.authentication import CustomTokenAuthentication
from rest_framework.exceptions import AuthenticationFailed
2025-05-20 12:17:45 +08:00
logger = logging.getLogger('brands')
class CampaignStatusConsumer(WebsocketConsumer):
"""处理活动状态更新的WebSocket消费者"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 初始化属性防止disconnect时出现属性不存在的问题
self.group_name = None
self.product_group_name = None
self.campaign_id = None
self.product_id = None
2025-05-20 12:17:45 +08:00
def connect(self):
"""处理WebSocket连接请求"""
try:
# 获取活动ID和产品ID从URL路由参数
self.campaign_id = self.scope['url_route']['kwargs']['campaign_id']
self.product_id = self.scope['url_route']['kwargs'].get('product_id')
# 获取URL查询参数中的token
query_string = self.scope.get('query_string', b'').decode()
query_params = parse_qs(query_string)
token = query_params.get('token', [None])[0]
# 验证token
if not self.validate_token(token):
logger.error(f"WebSocket连接未授权缺少有效token: {token}")
self.close(code=4001)
return
# 定义标准组名
self.group_name = f'campaign_{self.campaign_id}'
# 如果指定了产品ID则额外加入产品特定的组
if self.product_id:
self.product_group_name = f'campaign_{self.campaign_id}_product_{self.product_id}'
else:
self.product_group_name = None
# 将连接添加到频道组
async_to_sync(self.channel_layer.group_add)(
self.group_name,
self.channel_name
)
# 如果有产品特定的组,也加入
if self.product_group_name:
async_to_sync(self.channel_layer.group_add)(
self.product_group_name,
self.channel_name
)
# 接受WebSocket连接
self.accept()
# 发送初始状态
self.send_initial_status()
# 启动轮询
self.start_status_polling()
logger.info(f"WebSocket连接已建立: {self.group_name}")
except Exception as e:
logger.error(f"WebSocket连接失败: {str(e)}")
self.close(code=4000)
def validate_token(self, token):
"""验证token的有效性使用CustomTokenAuthentication"""
if not token:
logger.warning("未提供token")
return False
2025-05-20 12:17:45 +08:00
try:
# 创建一个模拟的请求对象包含Authorization头
class MockRequest:
def __init__(self, token):
self.META = {
'HTTP_AUTHORIZATION': f'Token {token}'
}
# 创建认证器并尝试认证
auth = CustomTokenAuthentication()
mock_request = MockRequest(token)
try:
user, _ = auth.authenticate(mock_request)
if user and user.is_active:
return True
return False
except AuthenticationFailed as e:
logger.warning(f"Token认证失败: {str(e)}")
return False
except Exception as e:
logger.error(f"验证token时出错: {str(e)}")
return False
2025-05-20 12:17:45 +08:00
def disconnect(self, close_code):
"""处理WebSocket断开连接"""
# 安全检查,避免属性不存在的错误
if hasattr(self, 'group_name') and self.group_name:
# 将连接从频道组移除
async_to_sync(self.channel_layer.group_discard)(
self.group_name,
self.channel_name
)
# 如果有产品特定组,也移除
if hasattr(self, 'product_group_name') and self.product_group_name:
async_to_sync(self.channel_layer.group_discard)(
self.product_group_name,
self.channel_name
)
2025-05-20 12:17:45 +08:00
logger.info(f"WebSocket连接已断开: {getattr(self, 'group_name', 'unknown')}, 关闭代码: {close_code}")
2025-05-20 12:17:45 +08:00
def receive(self, text_data):
"""处理从WebSocket客户端接收的消息"""
try:
# 解析接收到的JSON数据
data = json.loads(text_data)
action = data.get('action')
# 处理刷新请求
if action == 'refresh':
self.send_initial_status()
logger.debug(f"接收到WebSocket消息: {text_data}")
except json.JSONDecodeError:
logger.error(f"接收到无效的JSON数据: {text_data}")
except Exception as e:
logger.error(f"处理WebSocket消息时出错: {str(e)}")
def send_update(self, event):
"""向WebSocket客户端发送更新消息"""
# 直接转发消息
self.send(text_data=event['message'])
def get_creator_data(self):
"""获取创作者数据列表"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 指定了产品ID时只获取该产品的数据
if self.product_id:
try:
# 获取产品信息
product = Product.objects.get(id=self.product_id)
products = [product]
except Product.DoesNotExist:
logger.error(f"找不到产品: {self.product_id}")
if campaign.link_product.exists():
products = list(campaign.link_product.all())
else:
products = []
# 否则,获取所有关联产品的数据
elif campaign.link_product.exists():
products = list(campaign.link_product.all())
else:
products = []
# 如果没有产品,返回空数据
if not products:
return {
"campaign_id": str(campaign.id),
"products": []
}
# 查询与活动关联的所有达人关联记录
2025-05-20 12:17:45 +08:00
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 准备返回数据
products_data = []
# 遍历所有产品
for product in products:
creator_list = []
2025-05-20 12:17:45 +08:00
# 遍历所有达人
for cc in creator_campaigns:
creator = cc.creator
# 获取最新状态
creator_id = creator.id
product_id = str(product.id)
status = None
# 尝试从API获取状态
try:
status = OfferStatusService.fetch_status(creator_id, product_id)
except Exception as e:
logger.error(f"获取达人状态时出错: {str(e)}")
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
2025-05-22 12:09:55 +08:00
# 如果状态仍然为空,设置默认值"Unconnected"
if not status:
status = "Unconnected"
# 构建响应数据
creator_data = {
"creator_name": creator.name,
"category": creator.category,
"followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
2025-05-23 12:11:03 +08:00
"pricing": f"${creator.pricing}" if creator.pricing else "$0",
"status": status
}
creator_list.append(creator_data)
2025-05-20 12:17:45 +08:00
# 构建产品数据
product_data = {
"product_id": str(product.id),
"product_name": product.name,
"creators": creator_list
2025-05-20 12:17:45 +08:00
}
products_data.append(product_data)
# 如果只指定了一个产品,返回单产品格式
if self.product_id and len(products_data) == 1:
return {
"campaign_id": str(campaign.id),
"product_id": products_data[0]["product_id"],
"product_name": products_data[0]["product_name"],
"creators": products_data[0]["creators"]
}
# 否则返回多产品格式
return {
"campaign_id": str(campaign.id),
"products": products_data
}
2025-05-20 12:17:45 +08:00
except Exception as e:
logger.error(f"获取创作者数据出错: {str(e)}")
return {
"campaign_id": self.campaign_id,
"product_id": self.product_id if self.product_id else None,
"product_name": None,
"creators": []
}
2025-05-20 12:17:45 +08:00
def send_initial_status(self):
"""发送初始状态信息"""
try:
# 获取创作者数据
creator_list = self.get_creator_data()
# 构建并发送标准格式消息
message = {
'code': 200,
'message': '获取成功',
'data': creator_list
}
self.send(text_data=json.dumps(message))
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
message = {
'code': 404,
'message': '找不到活动',
'data': None
}
self.send(text_data=json.dumps(message))
except Exception as e:
logger.error(f"发送初始状态时出错: {str(e)}")
message = {
'code': 500,
'message': f'服务器错误: {str(e)}',
'data': None
}
self.send(text_data=json.dumps(message))
def start_status_polling(self):
"""启动状态轮询"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 获取所有产品ID列表
product_ids = []
2025-05-20 12:17:45 +08:00
# 如果指定了产品ID只轮询该产品
if self.product_id:
product_ids.append(self.product_id)
# 否则,获取所有关联产品
elif campaign.link_product.exists():
for product in campaign.link_product.all():
product_ids.append(str(product.id))
# 如果没有关联产品使用活动ID作为备选
if not product_ids:
product_ids.append(str(self.campaign_id))
2025-05-20 12:17:45 +08:00
# 构建达人-产品对
creator_product_pairs = []
for creator_campaign in creator_campaigns:
creator_id = creator_campaign.creator_id
for product_id in product_ids:
creator_product_pairs.append((creator_id, product_id))
2025-05-20 12:17:45 +08:00
# 启动轮询
if creator_product_pairs:
polling_service.start_polling(
campaign_id=self.campaign_id,
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询,包含 {len(product_ids)} 个产品和 {len(creator_campaigns)} 个达人")
2025-05-20 12:17:45 +08:00
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")