ws更新达人与活动的状态

This commit is contained in:
wanjia 2025-05-20 12:17:45 +08:00
parent efafe4c452
commit 4f102eb26b
9 changed files with 593 additions and 10 deletions

182
apps/brands/consumers.py Normal file
View File

@ -0,0 +1,182 @@
import json
import logging
from channels.generic.websocket import WebsocketConsumer
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from .services.status_polling_service import polling_service
from .models import Campaign
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
logger = logging.getLogger('brands')
class CampaignStatusConsumer(WebsocketConsumer):
"""处理活动状态更新的WebSocket消费者"""
def connect(self):
"""处理WebSocket连接请求"""
# 获取活动ID从URL路由参数
self.campaign_id = self.scope['url_route']['kwargs']['campaign_id']
self.group_name = f'campaign_{self.campaign_id}'
# 将连接添加到频道组
async_to_sync(self.channel_layer.group_add)(
self.group_name,
self.channel_name
)
# 接受WebSocket连接
self.accept()
# 发送初始状态
self.send_initial_status()
# 启动轮询
self.start_status_polling()
logger.info(f"WebSocket连接已建立: {self.group_name}")
def disconnect(self, close_code):
"""处理WebSocket断开连接"""
# 将连接从频道组移除
async_to_sync(self.channel_layer.group_discard)(
self.group_name,
self.channel_name
)
logger.info(f"WebSocket连接已断开: {self.group_name}, 关闭代码: {close_code}")
def receive(self, text_data):
"""处理从WebSocket客户端接收的消息"""
try:
# 解析接收到的JSON数据
data = json.loads(text_data)
action = data.get('action')
# 处理刷新请求
if action == 'refresh':
self.send_initial_status()
logger.debug(f"接收到WebSocket消息: {text_data}")
except json.JSONDecodeError:
logger.error(f"接收到无效的JSON数据: {text_data}")
except Exception as e:
logger.error(f"处理WebSocket消息时出错: {str(e)}")
def send_update(self, event):
"""向WebSocket客户端发送更新消息"""
# 直接转发消息
self.send(text_data=event['message'])
def get_creator_data(self):
"""获取创作者数据列表"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 构建达人列表数据
creator_list = []
for cc in creator_campaigns:
creator = cc.creator
# 格式化粉丝数和观看量
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
# 构建响应数据
creator_data = {
"id": str(creator.id),
"name": creator.name,
"avatar": creator.avatar_url,
"category": creator.category,
"followers": followers_formatted,
"views": avg_views_formatted,
"gmv": f"${creator.gmv}k" if creator.gmv else "$0",
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": cc.status
}
creator_list.append(creator_data)
return creator_list
except Exception as e:
logger.error(f"获取创作者数据出错: {str(e)}")
return []
def send_initial_status(self):
"""发送初始状态信息"""
try:
# 获取创作者数据
creator_list = self.get_creator_data()
# 构建并发送标准格式消息
message = {
'code': 200,
'message': '获取成功',
'data': creator_list
}
self.send(text_data=json.dumps(message))
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
message = {
'code': 404,
'message': '找不到活动',
'data': None
}
self.send(text_data=json.dumps(message))
except Exception as e:
logger.error(f"发送初始状态时出错: {str(e)}")
message = {
'code': 500,
'message': f'服务器错误: {str(e)}',
'data': None
}
self.send(text_data=json.dumps(message))
def start_status_polling(self):
"""启动状态轮询"""
try:
# 查询活动信息
campaign = Campaign.objects.get(id=self.campaign_id)
# 查询活动关联的达人
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign.id
).select_related('creator')
# 获取产品ID
product_id = None
if campaign.link_product.exists():
product = campaign.link_product.first()
product_id = product.id
# 如果没有关联产品则使用活动ID作为产品ID
if not product_id:
product_id = campaign.id
# 构建达人-产品对
creator_product_pairs = []
for cc in creator_campaigns:
creator_id = cc.creator_id
creator_product_pairs.append((creator_id, product_id))
# 启动轮询
if creator_product_pairs:
polling_service.start_polling(
campaign_id=self.campaign_id,
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
logger.info(f"已启动活动 {self.campaign_id} 的状态轮询")
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {self.campaign_id}")
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")

7
apps/brands/routing.py Normal file
View File

@ -0,0 +1,7 @@
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/campaigns/(?P<campaign_id>\w+)/status/$', consumers.CampaignStatusConsumer.as_asgi()),
]

View File

@ -1 +1 @@
# 服务模块初始化

View File

@ -0,0 +1,163 @@
import requests
import logging
import json
from django.conf import settings
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
logger = logging.getLogger('brands')
class OfferStatusService:
"""提供获取达人谈判状态的服务"""
@staticmethod
def fetch_status(creator_id, product_id):
"""
获取达人对产品的谈判状态
:param creator_id: 达人ID
:param product_id: 产品ID
:return: 状态字符串
"""
try:
url = "http://127.0.0.1:8000/api/operation/negotiations/offer_status/"
payload = {
'creator_id': str(creator_id),
'product_id': str(product_id)
}
response = requests.post(url, data=payload)
if response.status_code == 200:
data = response.json()
if data['code'] == 200:
return data['data']['status']
else:
logger.error(f"获取谈判状态失败: {data['message']}")
return None
else:
logger.error(f"请求谈判状态接口失败: {response.status_code}")
return None
except Exception as e:
logger.error(f"获取谈判状态时发生错误: {str(e)}")
return None
@staticmethod
def update_creator_status(campaign_id, creator_id, status):
"""
更新达人的状态
:param campaign_id: 活动ID
:param creator_id: 达人ID
:param status: 新状态
:return: 是否更新成功
"""
try:
from apps.daren_detail.models import CreatorCampaign
# 更新数据库中的状态
creator_campaign = CreatorCampaign.objects.get(
campaign_id=campaign_id,
creator_id=creator_id
)
# 如果状态没有变化,则不进行更新
if creator_campaign.status == status:
return False
creator_campaign.status = status
creator_campaign.save()
logger.info(f"已更新数据库中的状态: 活动 {campaign_id}, 达人 {creator_id}, 状态 {status}")
return True
except CreatorCampaign.DoesNotExist:
logger.error(f"找不到关联记录: 活动 {campaign_id}, 达人 {creator_id}")
return False
except Exception as e:
logger.error(f"更新达人状态时发生错误: {str(e)}")
return False
@staticmethod
def get_campaign_creator_data(campaign_id):
"""
获取活动关联的所有达人信息
:param campaign_id: 活动ID
:return: 达人信息列表
"""
try:
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
# 查询与活动关联的所有达人关联记录
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign_id
).select_related('creator')
creator_list = []
for cc in creator_campaigns:
creator = cc.creator
# 格式化粉丝数和观看量
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
# 构建响应数据
creator_data = {
"id": str(creator.id),
"name": creator.name,
"avatar": creator.avatar_url,
"category": creator.category,
"followers": followers_formatted,
"views": avg_views_formatted,
"gmv": f"${creator.gmv}k" if creator.gmv else "$0",
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": cc.status
}
creator_list.append(creator_data)
return creator_list
except Exception as e:
logger.error(f"获取活动达人数据时发生错误: {str(e)}")
return []
@staticmethod
def send_status_update(campaign_id, creator_id, status):
"""
通过WebSocket发送状态更新
:param campaign_id: 活动ID
:param creator_id: 达人ID
:param status: 状态
"""
try:
# 先更新数据库中的状态
updated = OfferStatusService.update_creator_status(campaign_id, creator_id, status)
# 如果状态没有变化,则不发送更新
if not updated:
return
# 获取最新的所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id)
channel_layer = get_channel_layer()
# 构建消息数据 - 使用标准的API响应格式
message = {
'code': 200,
'message': '状态已更新',
'data': creator_list
}
# 发送到活动特定的群组
async_to_sync(channel_layer.group_send)(
f'campaign_{campaign_id}',
{
'type': 'send_update',
'message': json.dumps(message)
}
)
logger.info(f"已发送状态更新: 活动 {campaign_id}, 达人 {creator_id}, 状态 {status}")
except Exception as e:
logger.error(f"发送WebSocket更新失败: {str(e)}")

View File

@ -0,0 +1,101 @@
import threading
import time
import logging
from django.db import close_old_connections
from .offer_status_service import OfferStatusService
logger = logging.getLogger('brands')
class StatusPollingService:
"""提供定时轮询服务,定期获取并更新达人状态"""
def __init__(self):
self._polling_threads = {} # 保存活动ID到线程的映射
self._stop_events = {} # 保存活动ID到停止事件的映射
def start_polling(self, campaign_id, creator_product_pairs, interval=30):
"""
开始轮询指定活动的达人状态
:param campaign_id: 活动ID
:param creator_product_pairs: 达人ID和产品ID的对应关系列表格式为 [(creator_id, product_id), ...]
:param interval: 轮询间隔时间
"""
# 如果该活动已有轮询线程,先停止它
if campaign_id in self._polling_threads:
self.stop_polling(campaign_id)
# 创建停止事件
stop_event = threading.Event()
self._stop_events[campaign_id] = stop_event
# 创建并启动轮询线程
thread = threading.Thread(
target=self._polling_worker,
args=(campaign_id, creator_product_pairs, interval, stop_event),
daemon=True
)
self._polling_threads[campaign_id] = thread
thread.start()
logger.info(f"已启动活动 {campaign_id} 的状态轮询,间隔 {interval}")
def stop_polling(self, campaign_id):
"""
停止指定活动的轮询
:param campaign_id: 活动ID
"""
if campaign_id in self._stop_events:
# 设置停止事件
self._stop_events[campaign_id].set()
# 等待线程结束
if campaign_id in self._polling_threads:
self._polling_threads[campaign_id].join(timeout=5)
# 清理资源
del self._polling_threads[campaign_id]
del self._stop_events[campaign_id]
logger.info(f"已停止活动 {campaign_id} 的状态轮询")
def stop_all(self):
"""停止所有轮询"""
campaign_ids = list(self._polling_threads.keys())
for campaign_id in campaign_ids:
self.stop_polling(campaign_id)
def _polling_worker(self, campaign_id, creator_product_pairs, interval, stop_event):
"""
轮询工作线程
:param campaign_id: 活动ID
:param creator_product_pairs: 达人ID和产品ID的对应关系列表
:param interval: 轮询间隔
:param stop_event: 停止事件
"""
while not stop_event.is_set():
try:
# 关闭旧的数据库连接
close_old_connections()
# 遍历每个达人-产品对,获取并发送状态更新
for creator_id, product_id in creator_product_pairs:
try:
# 获取状态
status = OfferStatusService.fetch_status(creator_id, product_id)
if status:
# 发送状态更新
OfferStatusService.send_status_update(campaign_id, creator_id, status)
except Exception as e:
logger.error(f"处理达人 {creator_id} 状态时出错: {str(e)}")
# 等待指定时间间隔
stop_event.wait(interval)
except Exception as e:
logger.error(f"轮询线程发生错误: {str(e)}")
# 短暂休眠后继续
time.sleep(5)
# 创建单例实例
polling_service = StatusPollingService()

View File

@ -2,6 +2,7 @@ from django.shortcuts import render, get_object_or_404
from rest_framework import viewsets, status from rest_framework import viewsets, status
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.response import Response from rest_framework.response import Response
import logging
from .models import Brand, Product, Campaign, BrandChatSession from .models import Brand, Product, Campaign, BrandChatSession
from .serializers import ( from .serializers import (
@ -11,6 +12,10 @@ from .serializers import (
BrandChatSessionSerializer, BrandChatSessionSerializer,
BrandDetailSerializer BrandDetailSerializer
) )
from .services.status_polling_service import polling_service
from .services.offer_status_service import OfferStatusService
logger = logging.getLogger(__name__)
def api_response(code=200, message="成功", data=None): def api_response(code=200, message="成功", data=None):
"""统一API响应格式""" """统一API响应格式"""
@ -269,6 +274,99 @@ class CampaignViewSet(viewsets.ModelViewSet):
except Exception as e: except Exception as e:
return api_response(code=500, message=f"移除产品失败: {str(e)}", data=None) return api_response(code=500, message=f"移除产品失败: {str(e)}", data=None)
@action(detail=True, methods=['get'])
def creator_list(self, request, pk=None):
"""获取活动关联的达人列表"""
campaign = self.get_object()
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
# 获取所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign.id)
# 启动状态轮询(当有用户请求此接口时)
try:
# 获取产品ID
product_id = None
if campaign.link_product.exists():
product = campaign.link_product.first()
product_id = product.id
# 如果没有关联产品则使用活动ID作为产品ID
if not product_id:
product_id = campaign.id
# 构建达人-产品对
creator_product_pairs = []
for creator_data in creator_list:
creator_id = creator_data['id']
creator_product_pairs.append((creator_id, product_id))
# 启动轮询
if creator_product_pairs:
polling_service.start_polling(
campaign_id=campaign.id,
creator_product_pairs=creator_product_pairs,
interval=30 # 每30秒轮询一次
)
except Exception as e:
logger.error(f"启动状态轮询时出错: {str(e)}")
return api_response(data=creator_list)
@action(detail=True, methods=['post'])
def update_creator_status(self, request, pk=None):
"""手动更新达人状态"""
campaign = self.get_object()
from apps.daren_detail.models import CreatorCampaign
from .services.offer_status_service import OfferStatusService
# 获取传入的达人ID
creator_id = request.data.get('creator_id')
if not creator_id:
return api_response(code=400, message="缺少必要参数: creator_id", data=None)
try:
# 查询达人与活动的关联
creator_campaign = CreatorCampaign.objects.get(
campaign_id=campaign.id,
creator_id=creator_id
)
# 获取产品ID
product_id = None
if campaign.link_product.exists():
product = campaign.link_product.first()
product_id = product.id
# 如果没有关联产品则使用活动ID作为产品ID
if not product_id:
product_id = campaign.id
# 获取最新状态
status = OfferStatusService.fetch_status(creator_id, product_id)
if status:
# 更新状态
creator_campaign.status = status
creator_campaign.save()
# 获取所有达人的最新数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign.id)
# 发送WebSocket更新
OfferStatusService.send_status_update(campaign.id, creator_id, status)
return api_response(message="状态已更新", data=creator_list)
else:
return api_response(code=500, message="获取状态失败", data=None)
except CreatorCampaign.DoesNotExist:
return api_response(code=404, message="找不到达人与活动的关联", data=None)
except Exception as e:
logger.error(f"更新达人状态时出错: {str(e)}")
return api_response(code=500, message=f"更新状态失败: {str(e)}", data=None)
class BrandChatSessionViewSet(viewsets.ModelViewSet): class BrandChatSessionViewSet(viewsets.ModelViewSet):
"""品牌聊天会话API视图集""" """品牌聊天会话API视图集"""

View File

@ -1,16 +1,26 @@
""" """
ASGI config for daren project. ASGI config for daren project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.1/howto/deployment/asgi/
""" """
import os import os
import django
# 设置Django设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'daren.settings')
django.setup() # 添加这一行
from django.core.asgi import get_asgi_application from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'daren.settings') # 确保在django.setup()之后再导入
import apps.brands.routing
application = get_asgi_application() application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': AuthMiddlewareStack(
URLRouter(
apps.brands.routing.websocket_urlpatterns
)
),
})

View File

@ -15,6 +15,8 @@ import os
# Build paths inside the project like this: BASE_DIR / 'subdir'. # Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent BASE_DIR = Path(__file__).resolve().parent.parent
import pymysql
pymysql.install_as_MySQLdb()
# Quick-start development settings - unsuitable for production # Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/5.1/howto/deployment/checklist/ # See https://docs.djangoproject.com/en/5.1/howto/deployment/checklist/
@ -38,6 +40,8 @@ INSTALLED_APPS = [
'django.contrib.messages', 'django.contrib.messages',
'django_filters', 'django_filters',
'django.contrib.staticfiles', 'django.contrib.staticfiles',
'channels',
'rest_framework',
'apps.user.apps.UserConfig', 'apps.user.apps.UserConfig',
"apps.expertproducts.apps.ExpertproductsConfig", "apps.expertproducts.apps.ExpertproductsConfig",
"apps.daren_detail.apps.DarenDetailConfig", "apps.daren_detail.apps.DarenDetailConfig",
@ -77,6 +81,14 @@ TEMPLATES = [
] ]
WSGI_APPLICATION = 'daren.wsgi.application' WSGI_APPLICATION = 'daren.wsgi.application'
ASGI_APPLICATION = 'daren.asgi.application'
# WebSocket配置
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels.layers.InMemoryChannelLayer',
},
}
# Database # Database
@ -93,9 +105,9 @@ DATABASES = {
'OPTIONS': { 'OPTIONS': {
'charset': 'utf8mb4', 'charset': 'utf8mb4',
'init_command': "SET sql_mode='STRICT_TRANS_TABLES'", 'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
'connect_timeout': 60, # 连接超时时间 'connect_timeout': 60,
}, },
'CONN_MAX_AGE': 0, # 强制Django在每次请求后关闭连接 'CONN_MAX_AGE': 0,
} }
} }
@ -172,5 +184,15 @@ LOGGING = {
'level': 'INFO', 'level': 'INFO',
'propagate': True, 'propagate': True,
}, },
'daren_detail': {
'handlers': ['console', 'file'],
'level': 'INFO',
'propagate': True,
},
'brands': {
'handlers': ['console', 'file'],
'level': 'INFO',
'propagate': True,
},
}, },
} }

Binary file not shown.