活动id和商品id获取状态定时轮询更新加token和停止轮询

This commit is contained in:
wanjia 2025-05-21 17:18:59 +08:00
parent f882c25093
commit 687f61fae8
6 changed files with 999 additions and 132 deletions

View File

@ -4,26 +4,69 @@ from channels.generic.websocket import WebsocketConsumer
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from .services.status_polling_service import polling_service
from .models import Campaign
from .models import Campaign, Product
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
from .services.offer_status_service import OfferStatusService
from django.db.models import Q
from django.http.request import QueryDict
from urllib.parse import parse_qs
from apps.user.models import UserToken
from apps.user.authentication import CustomTokenAuthentication
from rest_framework.exceptions import AuthenticationFailed
logger = logging.getLogger('brands')
class CampaignStatusConsumer(WebsocketConsumer):
"""处理活动状态更新的WebSocket消费者"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 初始化属性防止disconnect时出现属性不存在的问题
self.group_name = None
self.product_group_name = None
self.campaign_id = None
self.product_id = None
def connect(self):
"""处理WebSocket连接请求"""
# 获取活动ID从URL路由参数
try:
# 获取活动ID和产品ID从URL路由参数
self.campaign_id = self.scope['url_route']['kwargs']['campaign_id']
self.product_id = self.scope['url_route']['kwargs'].get('product_id')
# 获取URL查询参数中的token
query_string = self.scope.get('query_string', b'').decode()
query_params = parse_qs(query_string)
token = query_params.get('token', [None])[0]
# 验证token
if not self.validate_token(token):
logger.error(f"WebSocket连接未授权缺少有效token: {token}")
self.close(code=4001)
return
# 定义标准组名
self.group_name = f'campaign_{self.campaign_id}'
# 如果指定了产品ID则额外加入产品特定的组
if self.product_id:
self.product_group_name = f'campaign_{self.campaign_id}_product_{self.product_id}'
else:
self.product_group_name = None
# 将连接添加到频道组
async_to_sync(self.channel_layer.group_add)(
self.group_name,
self.channel_name
)
# 如果有产品特定的组,也加入
if self.product_group_name:
async_to_sync(self.channel_layer.group_add)(
self.product_group_name,
self.channel_name
)
# 接受WebSocket连接
self.accept()
@ -34,16 +77,59 @@ class CampaignStatusConsumer(WebsocketConsumer):
self.start_status_polling()
logger.info(f"WebSocket连接已建立: {self.group_name}")
except Exception as e:
logger.error(f"WebSocket连接失败: {str(e)}")
self.close(code=4000)
def validate_token(self, token):
"""验证token的有效性使用CustomTokenAuthentication"""
if not token:
logger.warning("未提供token")
return False
try:
# 创建一个模拟的请求对象包含Authorization头
class MockRequest:
def __init__(self, token):
self.META = {
'HTTP_AUTHORIZATION': f'Token {token}'
}
# 创建认证器并尝试认证
auth = CustomTokenAuthentication()
mock_request = MockRequest(token)
try:
user, _ = auth.authenticate(mock_request)
if user and user.is_active:
return True
return False
except AuthenticationFailed as e:
logger.warning(f"Token认证失败: {str(e)}")
return False
except Exception as e:
logger.error(f"验证token时出错: {str(e)}")
return False
def disconnect(self, close_code):
"""处理WebSocket断开连接"""
# 安全检查,避免属性不存在的错误
if hasattr(self, 'group_name') and self.group_name:
# 将连接从频道组移除
async_to_sync(self.channel_layer.group_discard)(
self.group_name,
self.channel_name
)
logger.info(f"WebSocket连接已断开: {self.group_name}, 关闭代码: {close_code}")
# 如果有产品特定组,也移除
if hasattr(self, 'product_group_name') and self.product_group_name:
async_to_sync(self.channel_layer.group_discard)(
self.product_group_name,
self.channel_name
)
logger.info(f"WebSocket连接已断开: {getattr(self, 'group_name', 'unknown')}, 关闭代码: {close_code}")
def receive(self, text_data):
"""处理从WebSocket客户端接收的消息"""
@ -74,38 +160,105 @@ class CampaignStatusConsumer(WebsocketConsumer):
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
# 指定了产品ID时只获取该产品的数据
if self.product_id:
try:
# 获取产品信息
product = Product.objects.get(id=self.product_id)
products = [product]
except Product.DoesNotExist:
logger.error(f"找不到产品: {self.product_id}")
if campaign.link_product.exists():
products = list(campaign.link_product.all())
else:
products = []
# 否则,获取所有关联产品的数据
elif campaign.link_product.exists():
products = list(campaign.link_product.all())
else:
products = []
# 如果没有产品,返回空数据
if not products:
return {
"campaign_id": str(campaign.id),
"products": []
}
# 查询与活动关联的所有达人关联记录
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 构建达人列表数据
# 准备返回数据
products_data = []
# 遍历所有产品
for product in products:
creator_list = []
# 遍历所有达人
for cc in creator_campaigns:
creator = cc.creator
# 格式化粉丝数和观看量
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
# 获取最新状态
creator_id = creator.id
product_id = str(product.id)
status = None
# 尝试从API获取状态
try:
status = OfferStatusService.fetch_status(creator_id, product_id)
except Exception as e:
logger.error(f"获取达人状态时出错: {str(e)}")
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
# 构建响应数据
creator_data = {
"id": str(creator.id),
"name": creator.name,
"avatar": creator.avatar_url,
"creator_name": creator.name,
"category": creator.category,
"followers": followers_formatted,
"views": avg_views_formatted,
"gmv": f"${creator.gmv}k" if creator.gmv else "$0",
"followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": cc.status
"status": status
}
creator_list.append(creator_data)
return creator_list
# 构建产品数据
product_data = {
"product_id": str(product.id),
"product_name": product.name,
"creators": creator_list
}
products_data.append(product_data)
# 如果只指定了一个产品,返回单产品格式
if self.product_id and len(products_data) == 1:
return {
"campaign_id": str(campaign.id),
"product_id": products_data[0]["product_id"],
"product_name": products_data[0]["product_name"],
"creators": products_data[0]["creators"]
}
# 否则返回多产品格式
return {
"campaign_id": str(campaign.id),
"products": products_data
}
except Exception as e:
logger.error(f"获取创作者数据出错: {str(e)}")
return []
return {
"campaign_id": self.campaign_id,
"product_id": self.product_id if self.product_id else None,
"product_name": None,
"creators": []
}
def send_initial_status(self):
"""发送初始状态信息"""
@ -151,20 +304,25 @@ class CampaignStatusConsumer(WebsocketConsumer):
campaign_id=campaign.id
).select_related('creator')
# 获取产品ID
product_id = None
if campaign.link_product.exists():
product = campaign.link_product.first()
product_id = product.id
# 获取所有产品ID列表
product_ids = []
# 如果没有关联产品则使用活动ID作为产品ID
if not product_id:
product_id = campaign.id
# 如果指定了产品ID只轮询该产品
if self.product_id:
product_ids.append(self.product_id)
# 否则,获取所有关联产品
elif campaign.link_product.exists():
for product in campaign.link_product.all():
product_ids.append(str(product.id))
# 如果没有关联产品使用活动ID作为备选
if not product_ids:
product_ids.append(str(self.campaign_id))
# 构建达人-产品对
creator_product_pairs = []
for cc in creator_campaigns:
creator_id = cc.creator_id
for creator_campaign in creator_campaigns:
creator_id = creator_campaign.creator_id
for product_id in product_ids:
creator_product_pairs.append((creator_id, product_id))
# 启动轮询
@ -174,7 +332,7 @@ class CampaignStatusConsumer(WebsocketConsumer):
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询")
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询,包含 {len(product_ids)} 个产品和 {len(creator_campaigns)} 个达人")
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")

View File

@ -3,5 +3,7 @@ from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
# 支持带有查询参数的WebSocket路由
re_path(r'ws/campaigns/(?P<campaign_id>\w+)/status/$', consumers.CampaignStatusConsumer.as_asgi()),
re_path(r'ws/campaigns/(?P<campaign_id>\w+)/products/(?P<product_id>[\w-]+)/status/$', consumers.CampaignStatusConsumer.as_asgi()),
]

View File

@ -19,24 +19,38 @@ class OfferStatusService:
:return: 状态字符串
"""
try:
url = "http://127.0.0.1:8000/api/operation/negotiations/offer_status/"
url = "http://81.69.223.133:58099/api/operation/negotiations/offer_status/"
payload = {
'creator_id': str(creator_id),
'product_id': str(product_id)
}
# 记录请求详情
logger.info(f"发送状态查询请求: URL={url}, 参数={payload}")
response = requests.post(url, data=payload)
# 记录响应状态码
logger.info(f"接收状态查询响应: 状态码={response.status_code}")
if response.status_code == 200:
data = response.json()
# 记录响应内容
logger.info(f"状态查询响应内容: {data}")
if data['code'] == 200:
return data['data']['status']
else:
logger.error(f"获取谈判状态失败: {data['message']}")
return None
else:
logger.error(f"请求谈判状态接口失败: {response.status_code}")
# 记录详细错误信息
error_content = None
try:
error_content = response.text[:200] # 只取前200个字符
except:
pass
logger.error(f"请求谈判状态接口失败: {response.status_code}, 响应内容: {error_content}")
return None
except Exception as e:
@ -78,14 +92,35 @@ class OfferStatusService:
return False
@staticmethod
def get_campaign_creator_data(campaign_id):
def get_campaign_creator_data(campaign_id, product_id=None):
"""
获取活动关联的所有达人信息
:param campaign_id: 活动ID
:param product_id: 产品ID可选
:return: 达人信息列表
"""
try:
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
from ..models import Campaign, Product
# 查询活动信息
campaign = Campaign.objects.get(id=campaign_id)
# 获取产品信息
product_name = None
if product_id:
try:
product = Product.objects.get(id=product_id)
product_name = product.name
except Product.DoesNotExist:
logger.error(f"找不到产品: {product_id}")
product_id = None
# 如果没有指定产品ID则使用活动关联的第一个产品
if not product_id and campaign.link_product.exists():
product = campaign.link_product.first()
product_id = str(product.id)
product_name = product.name
# 查询与活动关联的所有达人关联记录
creator_campaigns = CreatorCampaign.objects.filter(
@ -96,21 +131,28 @@ class OfferStatusService:
for cc in creator_campaigns:
creator = cc.creator
# 获取最新状态
status = None
if product_id:
status = OfferStatusService.fetch_status(creator.id, product_id)
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
# 格式化粉丝数和观看量
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
# 构建响应数据
creator_data = {
"id": str(creator.id),
"name": creator.name,
"avatar": creator.avatar_url,
"creator_name": creator.name,
"category": creator.category,
"followers": followers_formatted,
"views": avg_views_formatted,
"gmv": f"${creator.gmv}k" if creator.gmv else "$0",
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": avg_views_formatted,
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": cc.status
"status": status
}
creator_list.append(creator_data)
@ -121,12 +163,13 @@ class OfferStatusService:
return []
@staticmethod
def send_status_update(campaign_id, creator_id, status):
def send_status_update(campaign_id, creator_id, status, product_id=None):
"""
通过WebSocket发送状态更新
:param campaign_id: 活动ID
:param creator_id: 达人ID
:param status: 状态
:param product_id: 产品ID可选
"""
try:
# 先更新数据库中的状态
@ -136,28 +179,142 @@ class OfferStatusService:
if not updated:
return
# 获取最新的所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id)
channel_layer = get_channel_layer()
# 构建消息数据 - 使用标准的API响应格式
message = {
# 构建消息数据
from ..models import Campaign, Product
try:
campaign = Campaign.objects.get(id=campaign_id)
# 1. 发送到产品特定的群组
if product_id:
# 获取该产品的所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, product_id)
# 查询产品名称
try:
product = Product.objects.get(id=product_id)
product_name = product.name
except Product.DoesNotExist:
logger.error(f"找不到产品: {product_id}")
product_name = None
# 构建产品特定的消息
product_message = {
'code': 200,
'message': '状态已更新',
'data': creator_list
'data': {
'campaign_id': str(campaign_id),
'product_id': str(product_id),
'product_name': product_name,
'creators': creator_list
}
}
# 发送到活动特定的群组
# 发送到产品特定的群组
product_group_name = f'campaign_{campaign_id}_product_{product_id}'
async_to_sync(channel_layer.group_send)(
f'campaign_{campaign_id}',
product_group_name,
{
'type': 'send_update',
'message': json.dumps(message)
'message': json.dumps(product_message)
}
)
logger.info(f"已发送状态更新: 活动 {campaign_id}, 达人 {creator_id}, 状态 {status}")
logger.info(f"已发送状态更新到产品特定组: {product_group_name}")
# 2. 发送到活动通用群组,包含所有产品数据
# 获取所有产品
products_data = []
# 如果活动有关联产品
if campaign.link_product.exists():
for product in campaign.link_product.all():
# 获取该产品的所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, str(product.id))
# 构建产品数据
product_data = {
'product_id': str(product.id),
'product_name': product.name,
'creators': creator_list
}
products_data.append(product_data)
# 如果没有关联产品仍然需要使用活动ID作为产品ID
else:
# 使用活动ID作为产品ID
fallback_product_id = str(campaign_id)
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, fallback_product_id)
# 构建产品数据
product_data = {
'product_id': fallback_product_id,
'product_name': campaign.name,
'creators': creator_list
}
products_data.append(product_data)
# 构建活动通用消息
campaign_message = {
'code': 200,
'message': '状态已更新',
'data': {
'campaign_id': str(campaign_id),
'products': products_data
}
}
# 发送到活动通用群组
campaign_group_name = f'campaign_{campaign_id}'
async_to_sync(channel_layer.group_send)(
campaign_group_name,
{
'type': 'send_update',
'message': json.dumps(campaign_message)
}
)
logger.info(f"已发送状态更新到活动通用组: {campaign_group_name}")
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {campaign_id}")
except Exception as e:
logger.error(f"构建消息数据时出错: {str(e)}")
# 如果构建消息失败,发送简单的状态更新消息
simple_message = {
'code': 200,
'message': '状态已更新',
'data': {
'campaign_id': str(campaign_id),
'creator_id': str(creator_id),
'status': status
}
}
# 发送到活动通用群组
campaign_group_name = f'campaign_{campaign_id}'
async_to_sync(channel_layer.group_send)(
campaign_group_name,
{
'type': 'send_update',
'message': json.dumps(simple_message)
}
)
# 如果有产品ID也发送到产品特定群组
if product_id:
product_group_name = f'campaign_{campaign_id}_product_{product_id}'
async_to_sync(channel_layer.group_send)(
product_group_name,
{
'type': 'send_update',
'message': json.dumps(simple_message)
}
)
logger.info(f"已发送状态更新: 活动 {campaign_id}, 产品 {product_id}, 达人 {creator_id}, 状态 {status}")
except Exception as e:
logger.error(f"发送WebSocket更新失败: {str(e)}")

View File

@ -50,19 +50,47 @@ class StatusPollingService:
# 等待线程结束
if campaign_id in self._polling_threads:
self._polling_threads[campaign_id].join(timeout=5)
thread = self._polling_threads[campaign_id]
logger.info(f"正在停止活动 {campaign_id} 的状态轮询...")
# 设置超时,避免无限等待
thread.join(timeout=5)
if thread.is_alive():
logger.warning(f"活动 {campaign_id} 的轮询线程在5秒内未能停止")
else:
logger.info(f"活动 {campaign_id} 的轮询线程已成功停止")
# 清理资源
del self._polling_threads[campaign_id]
del self._stop_events[campaign_id]
logger.info(f"已停止活动 {campaign_id} 的状态轮询")
return True
else:
logger.warning(f"未找到活动 {campaign_id} 的轮询任务")
return False
def stop_all(self):
"""停止所有轮询"""
campaign_ids = list(self._polling_threads.keys())
logger.info(f"正在停止所有轮询任务,活动数量: {len(campaign_ids)}")
success_count = 0
for campaign_id in campaign_ids:
self.stop_polling(campaign_id)
if self.stop_polling(campaign_id):
success_count += 1
logger.info(f"已停止 {success_count}/{len(campaign_ids)} 个轮询任务")
return success_count
def get_active_pollings(self):
"""获取当前正在运行的所有轮询任务信息"""
polling_info = []
for campaign_id, thread in self._polling_threads.items():
polling_info.append({
'campaign_id': campaign_id,
'thread_name': thread.name,
'is_alive': thread.is_alive(),
'daemon': thread.daemon
})
return polling_info
def _polling_worker(self, campaign_id, creator_product_pairs, interval, stop_event):
"""
@ -84,8 +112,8 @@ class StatusPollingService:
status = OfferStatusService.fetch_status(creator_id, product_id)
if status:
# 发送状态更新
OfferStatusService.send_status_update(campaign_id, creator_id, status)
# 发送状态更新传递产品ID
OfferStatusService.send_status_update(campaign_id, creator_id, status, product_id)
except Exception as e:
logger.error(f"处理达人 {creator_id} 状态时出错: {str(e)}")

View File

@ -164,6 +164,52 @@ class CampaignViewSet(viewsets.ModelViewSet):
authentication_classes = [CustomTokenAuthentication]
permission_classes = [IsAuthenticated]
def get_permissions(self):
"""根据不同的操作设置不同的权限"""
if self.action in ['stop_polling', 'active_pollings', 'token_info']:
# 这些操作不需要身份验证
return []
return super().get_permissions()
@action(detail=False, methods=['get'], url_path='token-info')
def token_info(self, request):
"""获取当前用户的token信息和WebSocket URL示例"""
# 检查用户是否已认证
if not request.user.is_authenticated:
return api_response(code=401, message="未授权,请先登录", data=None)
# 获取当前用户的token
from apps.user.models import UserToken
token = None
user_token = UserToken.objects.filter(user=request.user).first()
if user_token:
token = user_token.token
# 如果没有token返回错误
if not token:
return api_response(code=404, message="未找到有效的token请重新登录", data=None)
# 构建示例WebSocket URL
base_url = request.get_host()
ws_protocol = 'wss' if request.is_secure() else 'ws'
# 构建示例URL
ws_examples = {
"活动状态WebSocket": f"{ws_protocol}://{base_url}/ws/campaigns/1/status/?token={token}",
"活动产品状态WebSocket": f"{ws_protocol}://{base_url}/ws/campaigns/1/products/123/status/?token={token}",
}
# 构建响应
data = {
"user_id": request.user.id,
"email": request.user.email,
"token": token,
"token_expired_at": user_token.expired_at.strftime('%Y-%m-%d %H:%M:%S') if hasattr(user_token, 'expired_at') else None,
"websocket_examples": ws_examples
}
return api_response(data=data)
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
serializer = self.get_serializer(queryset, many=True)
@ -268,25 +314,46 @@ class CampaignViewSet(viewsets.ModelViewSet):
campaign = self.get_object()
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
# 获取所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign.id)
# 启动状态轮询(当有用户请求此接口时)
try:
# 获取产品ID
product_id = None
if campaign.link_product.exists():
product = campaign.link_product.first()
product_id = product.id
# 获取活动关联的所有产品
products = campaign.link_product.all()
# 如果没有关联产品,则使用活动ID作为产品ID
if not product_id:
product_id = campaign.id
# 如果没有关联产品,使用活动本身作为产品
if not products.exists():
products = [campaign]
all_creator_list = []
# 遍历每个产品,获取相关达人
for product in products:
# 查询与活动关联的所有达人关联记录
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
for cc in creator_campaigns:
creator = cc.creator
# 构建响应数据
creator_data = {
"name": creator.name,
"category": creator.category,
"followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
"GMV Generated": f"${creator.gmv}k" if creator.gmv else "$0",
"Views Generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
"Pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"Status": cc.status
}
all_creator_list.append(creator_data)
# 启动状态轮询
try:
# 构建达人-产品对
creator_product_pairs = []
for creator_data in creator_list:
creator_id = creator_data['id']
for product in products:
product_id = product.id
for cc in creator_campaigns:
creator_id = cc.creator_id
creator_product_pairs.append((creator_id, product_id))
# 启动轮询
@ -299,7 +366,33 @@ class CampaignViewSet(viewsets.ModelViewSet):
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")
return api_response(data=creator_list)
# 构建活动基本信息
campaign_info = {
"name": campaign.name,
"description": campaign.description,
"image_url": campaign.image_url,
"service": campaign.service,
"creator_type": campaign.creator_type,
"creator_level": campaign.creator_level,
"creator_category": campaign.creator_category,
"creators_count": len(all_creator_list),
"gmv": campaign.gmv,
"followers": campaign.followers,
"views": campaign.views,
"budget": campaign.budget,
"start_date": campaign.start_date.strftime('%Y-%m-%d') if campaign.start_date else None,
"end_date": campaign.end_date.strftime('%Y-%m-%d') if campaign.end_date else None,
"status": campaign.status
}
return api_response(data={
"campaign": campaign_info,
"creators": all_creator_list
})
except Exception as e:
logger.error(f"获取活动达人列表失败: {str(e)}")
return api_response(code=500, message=f"获取活动达人列表失败: {str(e)}", data=None)
@action(detail=True, methods=['post'])
def update_creator_status(self, request, pk=None):
@ -308,8 +401,9 @@ class CampaignViewSet(viewsets.ModelViewSet):
from apps.daren_detail.models import CreatorCampaign
from .services.offer_status_service import OfferStatusService
# 获取传入的达人ID
# 获取传入的达人ID和产品ID
creator_id = request.data.get('creator_id')
product_id = request.data.get('product_id')
if not creator_id:
return api_response(code=400, message="缺少必要参数: creator_id", data=None)
@ -321,15 +415,13 @@ class CampaignViewSet(viewsets.ModelViewSet):
creator_id=creator_id
)
# 获取产品ID
product_id = None
# 如果没有提供产品ID获取活动的第一个关联产品或使用活动ID
if not product_id:
if campaign.link_product.exists():
product = campaign.link_product.first()
product_id = product.id
# 如果没有关联产品则使用活动ID作为产品ID
if not product_id:
product_id = campaign.id
product_id = str(product.id)
else:
product_id = str(campaign.id)
# 获取最新状态
status = OfferStatusService.fetch_status(creator_id, product_id)
@ -342,8 +434,8 @@ class CampaignViewSet(viewsets.ModelViewSet):
# 获取所有达人的最新数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign.id)
# 发送WebSocket更新
OfferStatusService.send_status_update(campaign.id, creator_id, status)
# 发送WebSocket更新传递产品ID
OfferStatusService.send_status_update(campaign.id, creator_id, status, product_id)
return api_response(message="状态已更新", data=creator_list)
else:
@ -355,6 +447,272 @@ class CampaignViewSet(viewsets.ModelViewSet):
logger.error(f"更新达人状态时出错: {str(e)}")
return api_response(code=500, message=f"更新状态失败: {str(e)}", data=None)
@action(detail=True, methods=['get'])
def product_creators(self, request, pk=None):
"""根据活动ID和产品ID获取达人列表"""
campaign = self.get_object()
product_id = request.query_params.get('product_id')
try:
# 获取与活动关联的所有达人
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
# 查询与活动关联的所有达人关联记录
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 如果指定了产品ID返回单产品的达人数据
if product_id:
# 获取产品信息
product = get_object_or_404(Product, id=product_id)
creator_list = []
# 构建达人-产品对,用于获取状态
creator_product_pairs = []
for cc in creator_campaigns:
creator = cc.creator
creator_id = creator.id
# 添加到达人-产品对列表
creator_product_pairs.append((creator_id, product_id))
# 获取状态
status = OfferStatusService.fetch_status(creator_id, product_id)
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
else:
# 更新数据库中的状态
cc.status = status
cc.save(update_fields=['status', 'update_time'])
# 构建响应数据
creator_data = {
"creator_name": creator.name,
"category": creator.category,
"followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": status
}
creator_list.append(creator_data)
# 启动轮询服务
if creator_product_pairs:
try:
polling_service.start_polling(
campaign_id=campaign.id,
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")
# 构建单产品响应
response_data = {
"campaign_id": str(campaign.id),
"product_id": str(product.id),
"product_name": product.name,
"creators": creator_list
}
return api_response(data=response_data)
# 如果没有指定产品ID返回所有产品的达人数据
else:
# 获取活动关联的所有产品
products = campaign.link_product.all()
# 如果没有关联产品,使用活动本身作为产品
if not products.exists():
products = []
# 构建达人-产品对使用活动ID作为产品ID
creator_product_pairs = []
fallback_product_id = str(campaign.id)
for cc in creator_campaigns:
creator_id = cc.creator_id
creator_product_pairs.append((creator_id, fallback_product_id))
# 获取所有达人数据
creator_list = []
for cc in creator_campaigns:
creator = cc.creator
creator_id = creator.id
# 获取状态
status = OfferStatusService.fetch_status(creator_id, fallback_product_id)
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
else:
# 更新数据库中的状态
cc.status = status
cc.save(update_fields=['status', 'update_time'])
# 构建响应数据
creator_data = {
"creator_name": creator.name,
"category": creator.category,
"followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": status
}
creator_list.append(creator_data)
# 启动轮询服务
if creator_product_pairs:
try:
polling_service.start_polling(
campaign_id=campaign.id,
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")
# 构建单产品响应(使用活动作为产品)
response_data = {
"campaign_id": str(campaign.id),
"product_id": fallback_product_id,
"product_name": campaign.name,
"creators": creator_list
}
return api_response(data=response_data)
# 如果有关联产品,返回所有产品的达人数据
products_data = []
all_creator_product_pairs = []
for product in products:
product_id = str(product.id)
creator_list = []
for cc in creator_campaigns:
creator = cc.creator
creator_id = creator.id
# 添加到达人-产品对列表
all_creator_product_pairs.append((creator_id, product_id))
# 获取状态
status = OfferStatusService.fetch_status(creator_id, product_id)
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
else:
# 更新数据库中的状态
cc.status = status
cc.save(update_fields=['status', 'update_time'])
# 构建响应数据
creator_data = {
"creator_name": creator.name,
"category": creator.category,
"followers": f"{int(creator.followers / 1000)}k" if creator.followers else "0",
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0",
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": status
}
creator_list.append(creator_data)
# 构建产品数据
product_data = {
"product_id": product_id,
"product_name": product.name,
"creators": creator_list
}
products_data.append(product_data)
# 启动轮询服务
if all_creator_product_pairs:
try:
polling_service.start_polling(
campaign_id=campaign.id,
creator_product_pairs=all_creator_product_pairs,
interval=30 # 每30秒轮询一次
)
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")
# 构建多产品响应
response_data = {
"campaign_id": str(campaign.id),
"products": products_data
}
return api_response(data=response_data)
except Exception as e:
logger.error(f"获取活动产品达人列表时出错: {str(e)}")
return api_response(code=500, message=f"获取活动产品达人列表失败: {str(e)}", data=None)
@action(detail=False, methods=['post'], url_path='stop-polling')
def stop_polling(self, request):
"""停止指定活动或所有活动的状态轮询"""
campaign_id = request.data.get('campaign_id')
if campaign_id:
# 停止指定活动的轮询
result = polling_service.stop_polling(campaign_id)
if result:
return api_response(message=f"已停止活动 {campaign_id} 的状态轮询")
else:
return api_response(code=404, message=f"未找到活动 {campaign_id} 的轮询任务")
else:
# 停止所有轮询
count = polling_service.stop_all()
return api_response(message=f"已停止 {count} 个活动的状态轮询")
@action(detail=False, methods=['get'], url_path='active-pollings')
def active_pollings(self, request):
"""获取当前正在运行的所有轮询任务信息"""
active_pollings = polling_service.get_active_pollings()
return api_response(data=active_pollings)
@action(detail=True, methods=['get'], url_path='websocket-url')
def get_websocket_url(self, request, pk=None):
"""获取带认证的WebSocket连接URL"""
campaign = self.get_object()
product_id = request.query_params.get('product_id')
# 获取当前用户的token
from apps.user.models import UserToken
token = None
if request.user.is_authenticated:
user_token = UserToken.objects.filter(user=request.user).first()
if user_token:
token = user_token.token
# 如果没有token返回错误
if not token:
return api_response(code=401, message="未授权,请先登录", data=None)
# 构建基础URL
base_url = request.get_host()
ws_protocol = 'wss' if request.is_secure() else 'ws'
# 根据是否有产品ID构建不同的WebSocket URL
if product_id:
ws_url = f"{ws_protocol}://{base_url}/ws/campaigns/{campaign.id}/products/{product_id}/status/?token={token}"
else:
ws_url = f"{ws_protocol}://{base_url}/ws/campaigns/{campaign.id}/status/?token={token}"
return api_response(data={"websocket_url": ws_url})
class BrandChatSessionViewSet(viewsets.ModelViewSet):
"""品牌聊天会话API视图集"""

View File

@ -29,3 +29,167 @@ ERROR 2025-05-20 17:40:33,375 offer_status_service
ERROR 2025-05-20 17:41:03,425 offer_status_service 获取谈判状态失败: 不存在与用户id为14和商品id为241a67e0-1c99-44de-a5dd-40622ffa23b6的谈判
ERROR 2025-05-20 17:41:33,477 offer_status_service 获取谈判状态失败: 不存在与用户id为14和商品id为241a67e0-1c99-44de-a5dd-40622ffa23b6的谈判
ERROR 2025-05-20 17:42:03,529 offer_status_service 获取谈判状态失败: 不存在与用户id为14和商品id为241a67e0-1c99-44de-a5dd-40622ffa23b6的谈判
INFO 2025-05-21 15:45:25,705 consumers WebSocket连接已建立: campaign_1
INFO 2025-05-21 16:21:02,647 consumers WebSocket连接已断开: campaign_1, 关闭代码: None
ERROR 2025-05-21 16:21:26,879 consumers 获取创作者数据出错: ['“2” is not a valid UUID.']
INFO 2025-05-21 16:21:26,888 consumers WebSocket连接已建立: campaign_1_product_2
INFO 2025-05-21 16:21:43,287 consumers WebSocket连接已断开: campaign_1_product_2, 关闭代码: None
INFO 2025-05-21 16:23:05,257 consumers WebSocket连接已建立: campaign_1_product_5b3c3ee6-ba51-48ad-97ae-0f15a50ae5b7
INFO 2025-05-21 16:26:53,083 consumers WebSocket连接已建立: campaign_1_product_5b3c3ee6-ba51-48ad-97ae-0f15a50ae5b7
INFO 2025-05-21 16:27:02,643 consumers WebSocket连接已断开: campaign_1_product_5b3c3ee6-ba51-48ad-97ae-0f15a50ae5b7, 关闭代码: None
INFO 2025-05-21 16:27:23,523 consumers WebSocket连接已建立: campaign_1_product_5b3c3ee6-ba51-48ad-97ae-0f15a50ae5b7
INFO 2025-05-21 16:30:34,682 consumers WebSocket连接已建立: campaign_1_product_5b3c3ee6-ba51-48ad-97ae-0f15a50ae5b7
INFO 2025-05-21 16:32:45,178 consumers WebSocket连接已建立: campaign_1_product_5b3c3ee6-ba51-48ad-97ae-0f15a50ae5b7
INFO 2025-05-21 16:34:58,033 consumers WebSocket连接已建立: campaign_1
ERROR 2025-05-21 16:36:53,382 consumers 找不到产品: 5b3c3ee6-ba51-48ad-97ae-0f15a50ae5b7
ERROR 2025-05-21 16:36:56,662 offer_status_service 请求谈判状态接口失败: 502
INFO 2025-05-21 16:36:56,673 status_polling_service 已启动活动 1 的状态轮询,间隔 30 秒
INFO 2025-05-21 16:36:56,676 consumers 已启动活动 1 的状态轮询,包含 1 个产品和 1 个达人
INFO 2025-05-21 16:36:56,676 consumers WebSocket连接已建立: campaign_1
ERROR 2025-05-21 16:36:59,897 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:37:33,115 offer_status_service 请求谈判状态接口失败: 502
INFO 2025-05-21 16:38:03,664 consumers WebSocket连接已断开: campaign_1, 关闭代码: None
ERROR 2025-05-21 16:38:06,337 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:38:09,534 offer_status_service 请求谈判状态接口失败: 502
INFO 2025-05-21 16:38:09,560 status_polling_service 已停止活动 1 的状态轮询
INFO 2025-05-21 16:38:09,560 status_polling_service 已启动活动 1 的状态轮询,间隔 30 秒
INFO 2025-05-21 16:38:09,561 consumers 已启动活动 1 的状态轮询,包含 1 个产品和 1 个达人
INFO 2025-05-21 16:38:09,561 consumers WebSocket连接已建立: campaign_1
ERROR 2025-05-21 16:38:12,838 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:38:46,381 offer_status_service 请求谈判状态接口失败: 502
INFO 2025-05-21 16:38:49,978 consumers WebSocket连接已断开: campaign_1, 关闭代码: None
ERROR 2025-05-21 16:38:57,358 offer_status_service 请求谈判状态接口失败: 502
INFO 2025-05-21 16:38:57,386 status_polling_service 已停止活动 1 的状态轮询
INFO 2025-05-21 16:38:57,387 status_polling_service 已启动活动 1 的状态轮询,间隔 30 秒
INFO 2025-05-21 16:38:57,388 consumers 已启动活动 1 的状态轮询,包含 1 个产品和 1 个达人
INFO 2025-05-21 16:38:57,389 consumers WebSocket连接已建立: campaign_1
ERROR 2025-05-21 16:39:00,777 offer_status_service 请求谈判状态接口失败: 502
INFO 2025-05-21 16:39:06,175 consumers WebSocket连接已断开: campaign_1, 关闭代码: None
ERROR 2025-05-21 16:39:35,384 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:40:08,976 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:40:42,262 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:41:16,044 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:41:49,269 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:42:23,263 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:42:57,384 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:43:31,316 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:44:04,535 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:44:37,749 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:45:11,002 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:45:44,220 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:46:17,688 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:46:51,162 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:47:34,922 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:48:08,121 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:48:39,328 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:49:13,738 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:49:47,160 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:50:20,406 offer_status_service 请求谈判状态接口失败: 502
ERROR 2025-05-21 16:50:55,630 offer_status_service 请求谈判状态接口失败: 502
INFO 2025-05-21 16:51:23,845 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:51:27,038 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:51:27,038 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:51:27,069 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:51:27,069 status_polling_service 已启动活动 1 的状态轮询,间隔 30 秒
INFO 2025-05-21 16:51:27,070 consumers 已启动活动 1 的状态轮询,包含 1 个产品和 1 个达人
INFO 2025-05-21 16:51:27,071 consumers WebSocket连接已建立: campaign_1
INFO 2025-05-21 16:51:30,275 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:51:30,275 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:52:00,282 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:52:03,480 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:52:03,480 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:52:33,493 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:52:36,702 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:52:36,702 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:53:06,707 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:53:09,918 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:53:09,918 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:53:39,927 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:53:43,328 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:53:43,328 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:54:13,339 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:54:16,588 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:54:16,588 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:54:46,595 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:54:49,853 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:54:49,853 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:55:19,857 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:55:24,235 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:55:24,235 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:55:54,248 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:55:57,459 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:55:57,459 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:56:00,237 consumers WebSocket连接已断开: campaign_1, 关闭代码: None
INFO 2025-05-21 16:56:27,475 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:56:30,674 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:56:30,675 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:57:00,687 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 16:57:03,910 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 16:57:03,910 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 16:57:33,923 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:03:54,985 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:03:58,230 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:03:58,230 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:03:58,343 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:03:58,343 status_polling_service 已启动活动 1 的状态轮询,间隔 30 秒
INFO 2025-05-21 17:03:58,343 consumers 已启动活动 1 的状态轮询,包含 1 个产品和 1 个达人
INFO 2025-05-21 17:03:58,345 consumers WebSocket连接已建立: campaign_1
INFO 2025-05-21 17:04:01,648 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:04:01,648 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:04:31,655 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:04:35,089 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:04:35,089 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:05:05,101 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:05:08,340 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:05:08,340 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:05:53,832 status_polling_service 正在停止所有轮询任务,活动数量: 0
INFO 2025-05-21 17:05:53,833 status_polling_service 已停止 0/0 个轮询任务
WARNING 2025-05-21 17:05:57,260 status_polling_service 未找到活动 1 的轮询任务
INFO 2025-05-21 17:06:03,394 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:06:06,619 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:06:06,619 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:06:06,649 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:06:06,649 status_polling_service 已启动活动 1 的状态轮询,间隔 30 秒
INFO 2025-05-21 17:06:06,650 consumers 已启动活动 1 的状态轮询,包含 1 个产品和 1 个达人
INFO 2025-05-21 17:06:06,651 consumers WebSocket连接已建立: campaign_1
INFO 2025-05-21 17:06:09,886 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:06:09,886 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:06:35,565 status_polling_service 正在停止活动 1 的状态轮询...
INFO 2025-05-21 17:06:35,567 status_polling_service 活动 1 的轮询线程已成功停止
INFO 2025-05-21 17:06:35,567 status_polling_service 已停止活动 1 的状态轮询
WARNING 2025-05-21 17:10:49,067 consumers 未提供token
ERROR 2025-05-21 17:10:49,067 consumers WebSocket连接未授权缺少有效token: None
WARNING 2025-05-21 17:10:59,589 consumers 未提供token
ERROR 2025-05-21 17:10:59,590 consumers WebSocket连接未授权缺少有效token: None
WARNING 2025-05-21 17:11:21,414 consumers 未提供token
ERROR 2025-05-21 17:11:21,414 consumers WebSocket连接未授权缺少有效token: None
WARNING 2025-05-21 17:13:59,426 consumers 未提供token
ERROR 2025-05-21 17:13:59,426 consumers WebSocket连接未授权缺少有效token: None
INFO 2025-05-21 17:13:59,427 consumers WebSocket连接已断开: None, 关闭代码: 1006
WARNING 2025-05-21 17:14:01,039 consumers 未提供token
ERROR 2025-05-21 17:14:01,039 consumers WebSocket连接未授权缺少有效token: None
INFO 2025-05-21 17:14:01,040 consumers WebSocket连接已断开: None, 关闭代码: 1006
WARNING 2025-05-21 17:14:34,895 consumers 未提供token
ERROR 2025-05-21 17:14:34,896 consumers WebSocket连接未授权缺少有效token: None
INFO 2025-05-21 17:14:34,897 consumers WebSocket连接已断开: None, 关闭代码: 1006
WARNING 2025-05-21 17:15:06,268 consumers Token认证失败: 无效或过期的token
ERROR 2025-05-21 17:15:06,268 consumers WebSocket连接未授权缺少有效token: c859e71135f1031a8cf007681c22734f2b443a80
INFO 2025-05-21 17:15:06,271 consumers WebSocket连接已断开: None, 关闭代码: 1006
INFO 2025-05-21 17:15:19,040 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:15:22,320 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:15:22,320 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:15:22,353 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:15:22,353 status_polling_service 已启动活动 1 的状态轮询,间隔 30 秒
INFO 2025-05-21 17:15:22,355 consumers 已启动活动 1 的状态轮询,包含 1 个产品和 1 个达人
INFO 2025-05-21 17:15:22,356 consumers WebSocket连接已建立: campaign_1
INFO 2025-05-21 17:15:25,811 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:15:25,811 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:15:55,825 offer_status_service 发送状态查询请求: URL=http://81.69.223.133:58099/api/operation/negotiations/offer_status/, 参数={'creator_id': '14', 'product_id': '241a67e0-1c99-44de-a5dd-40622ffa23b6'}
INFO 2025-05-21 17:15:59,101 offer_status_service 接收状态查询响应: 状态码=502
ERROR 2025-05-21 17:15:59,101 offer_status_service 请求谈判状态接口失败: 502, 响应内容:
INFO 2025-05-21 17:16:15,343 status_polling_service 正在停止所有轮询任务,活动数量: 1
INFO 2025-05-21 17:16:15,343 status_polling_service 正在停止活动 1 的状态轮询...
INFO 2025-05-21 17:16:15,343 status_polling_service 活动 1 的轮询线程已成功停止
INFO 2025-05-21 17:16:15,343 status_polling_service 已停止活动 1 的状态轮询
INFO 2025-05-21 17:16:15,345 status_polling_service 已停止 1/1 个轮询任务
INFO 2025-05-21 17:16:36,532 consumers WebSocket连接已断开: campaign_1, 关闭代码: None