daren/apps/brands/services/offer_status_service.py

320 lines
13 KiB
Python
Raw Normal View History

2025-05-20 12:17:45 +08:00
import requests
import logging
import json
from django.conf import settings
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
logger = logging.getLogger('brands')
class OfferStatusService:
"""提供获取达人谈判状态的服务"""
@staticmethod
def fetch_status(creator_id, product_id):
"""
获取达人对产品的谈判状态
:param creator_id: 达人ID
:param product_id: 产品ID
:return: 状态字符串
"""
try:
url = "http://81.69.223.133:58099/api/operation/negotiations/offer_status/"
2025-05-20 12:17:45 +08:00
payload = {
'creator_id': str(creator_id),
'product_id': str(product_id)
}
# 记录请求详情
logger.info(f"发送状态查询请求: URL={url}, 参数={payload}")
2025-05-20 12:17:45 +08:00
response = requests.post(url, data=payload)
# 记录响应状态码
logger.info(f"接收状态查询响应: 状态码={response.status_code}")
2025-05-20 12:17:45 +08:00
if response.status_code == 200:
data = response.json()
# 记录响应内容
logger.info(f"状态查询响应内容: {data}")
2025-05-20 12:17:45 +08:00
if data['code'] == 200:
return data['data']['status']
else:
logger.error(f"获取谈判状态失败: {data['message']}")
return None
else:
# 记录详细错误信息
error_content = None
try:
error_content = response.text[:200] # 只取前200个字符
except:
pass
logger.error(f"请求谈判状态接口失败: {response.status_code}, 响应内容: {error_content}")
2025-05-20 12:17:45 +08:00
return None
except Exception as e:
logger.error(f"获取谈判状态时发生错误: {str(e)}")
return None
@staticmethod
def update_creator_status(campaign_id, creator_id, status):
"""
更新达人的状态
:param campaign_id: 活动ID
:param creator_id: 达人ID
:param status: 新状态
:return: 是否更新成功
"""
try:
from apps.daren_detail.models import CreatorCampaign
# 更新数据库中的状态
creator_campaign = CreatorCampaign.objects.get(
campaign_id=campaign_id,
creator_id=creator_id
)
# 如果状态没有变化,则不进行更新
if creator_campaign.status == status:
return False
creator_campaign.status = status
creator_campaign.save()
logger.info(f"已更新数据库中的状态: 活动 {campaign_id}, 达人 {creator_id}, 状态 {status}")
return True
except CreatorCampaign.DoesNotExist:
logger.error(f"找不到关联记录: 活动 {campaign_id}, 达人 {creator_id}")
return False
except Exception as e:
logger.error(f"更新达人状态时发生错误: {str(e)}")
return False
@staticmethod
def get_campaign_creator_data(campaign_id, product_id=None):
2025-05-20 12:17:45 +08:00
"""
获取活动关联的所有达人信息
:param campaign_id: 活动ID
:param product_id: 产品ID可选
2025-05-20 12:17:45 +08:00
:return: 达人信息列表
"""
try:
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
from ..models import Campaign, Product
# 查询活动信息
campaign = Campaign.objects.get(id=campaign_id)
# 获取产品信息
product_name = None
if product_id:
try:
product = Product.objects.get(id=product_id)
product_name = product.name
except Product.DoesNotExist:
logger.error(f"找不到产品: {product_id}")
product_id = None
# 如果没有指定产品ID则使用活动关联的第一个产品
if not product_id and campaign.link_product.exists():
product = campaign.link_product.first()
product_id = str(product.id)
product_name = product.name
2025-05-20 12:17:45 +08:00
# 查询与活动关联的所有达人关联记录
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign_id
).select_related('creator')
creator_list = []
for cc in creator_campaigns:
creator = cc.creator
# 获取最新状态
status = None
if product_id:
status = OfferStatusService.fetch_status(creator.id, product_id)
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
2025-05-20 12:17:45 +08:00
# 格式化粉丝数和观看量
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
# 构建响应数据
creator_data = {
"creator_name": creator.name,
2025-05-20 12:17:45 +08:00
"category": creator.category,
"followers": followers_formatted,
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": avg_views_formatted,
2025-05-20 12:17:45 +08:00
"pricing": f"${creator.pricing_min}" if creator.pricing_min else "$0",
"status": status
2025-05-20 12:17:45 +08:00
}
creator_list.append(creator_data)
return creator_list
except Exception as e:
logger.error(f"获取活动达人数据时发生错误: {str(e)}")
return []
@staticmethod
def send_status_update(campaign_id, creator_id, status, product_id=None):
2025-05-20 12:17:45 +08:00
"""
通过WebSocket发送状态更新
:param campaign_id: 活动ID
:param creator_id: 达人ID
:param status: 状态
:param product_id: 产品ID可选
2025-05-20 12:17:45 +08:00
"""
try:
# 先更新数据库中的状态
updated = OfferStatusService.update_creator_status(campaign_id, creator_id, status)
# 如果状态没有变化,则不发送更新
if not updated:
return
channel_layer = get_channel_layer()
# 构建消息数据
from ..models import Campaign, Product
2025-05-20 12:17:45 +08:00
try:
campaign = Campaign.objects.get(id=campaign_id)
# 1. 发送到产品特定的群组
if product_id:
# 获取该产品的所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, product_id)
# 查询产品名称
try:
product = Product.objects.get(id=product_id)
product_name = product.name
except Product.DoesNotExist:
logger.error(f"找不到产品: {product_id}")
product_name = None
# 构建产品特定的消息
product_message = {
'code': 200,
'message': '状态已更新',
'data': {
'campaign_id': str(campaign_id),
'product_id': str(product_id),
'product_name': product_name,
'creators': creator_list
}
}
# 发送到产品特定的群组
product_group_name = f'campaign_{campaign_id}_product_{product_id}'
async_to_sync(channel_layer.group_send)(
product_group_name,
{
'type': 'send_update',
'message': json.dumps(product_message)
}
)
logger.info(f"已发送状态更新到产品特定组: {product_group_name}")
# 2. 发送到活动通用群组,包含所有产品数据
# 获取所有产品
products_data = []
# 如果活动有关联产品
if campaign.link_product.exists():
for product in campaign.link_product.all():
# 获取该产品的所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, str(product.id))
# 构建产品数据
product_data = {
'product_id': str(product.id),
'product_name': product.name,
'creators': creator_list
}
products_data.append(product_data)
# 如果没有关联产品仍然需要使用活动ID作为产品ID
else:
# 使用活动ID作为产品ID
fallback_product_id = str(campaign_id)
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, fallback_product_id)
# 构建产品数据
product_data = {
'product_id': fallback_product_id,
'product_name': campaign.name,
'creators': creator_list
}
products_data.append(product_data)
# 构建活动通用消息
campaign_message = {
'code': 200,
'message': '状态已更新',
'data': {
'campaign_id': str(campaign_id),
'products': products_data
}
2025-05-20 12:17:45 +08:00
}
# 发送到活动通用群组
campaign_group_name = f'campaign_{campaign_id}'
async_to_sync(channel_layer.group_send)(
campaign_group_name,
{
'type': 'send_update',
'message': json.dumps(campaign_message)
}
)
logger.info(f"已发送状态更新到活动通用组: {campaign_group_name}")
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {campaign_id}")
except Exception as e:
logger.error(f"构建消息数据时出错: {str(e)}")
# 如果构建消息失败,发送简单的状态更新消息
simple_message = {
'code': 200,
'message': '状态已更新',
'data': {
'campaign_id': str(campaign_id),
'creator_id': str(creator_id),
'status': status
}
}
# 发送到活动通用群组
campaign_group_name = f'campaign_{campaign_id}'
async_to_sync(channel_layer.group_send)(
campaign_group_name,
{
'type': 'send_update',
'message': json.dumps(simple_message)
}
)
# 如果有产品ID也发送到产品特定群组
if product_id:
product_group_name = f'campaign_{campaign_id}_product_{product_id}'
async_to_sync(channel_layer.group_send)(
product_group_name,
{
'type': 'send_update',
'message': json.dumps(simple_message)
}
)
2025-05-20 12:17:45 +08:00
logger.info(f"已发送状态更新: 活动 {campaign_id}, 产品 {product_id}, 达人 {creator_id}, 状态 {status}")
2025-05-20 12:17:45 +08:00
except Exception as e:
logger.error(f"发送WebSocket更新失败: {str(e)}")