daren/apps/brands/services/offer_status_service.py
2025-05-23 12:11:03 +08:00

320 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import logging
import json
from django.conf import settings
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
logger = logging.getLogger('brands')
class OfferStatusService:
"""提供获取达人谈判状态的服务"""
@staticmethod
def fetch_status(creator_id, product_id):
"""
获取达人对产品的谈判状态
:param creator_id: 达人ID
:param product_id: 产品ID
:return: 状态字符串
"""
try:
url = "http://81.69.223.133:58099/api/operation/negotiations/offer_status/"
payload = {
'creator_id': str(creator_id),
'product_id': str(product_id)
}
# 记录请求详情
logger.info(f"发送状态查询请求: URL={url}, 参数={payload}")
response = requests.post(url, data=payload)
# 记录响应状态码
logger.info(f"接收状态查询响应: 状态码={response.status_code}")
if response.status_code == 200:
data = response.json()
# 记录响应内容
logger.info(f"状态查询响应内容: {data}")
if data['code'] == 200:
return data['data']['status']
else:
logger.error(f"获取谈判状态失败: {data['message']}")
return None
else:
# 记录详细错误信息
error_content = None
try:
error_content = response.text[:200] # 只取前200个字符
except:
pass
logger.error(f"请求谈判状态接口失败: {response.status_code}, 响应内容: {error_content}")
return None
except Exception as e:
logger.error(f"获取谈判状态时发生错误: {str(e)}")
return None
@staticmethod
def update_creator_status(campaign_id, creator_id, status):
"""
更新达人的状态
:param campaign_id: 活动ID
:param creator_id: 达人ID
:param status: 新状态
:return: 是否更新成功
"""
try:
from apps.daren_detail.models import CreatorCampaign
# 更新数据库中的状态
creator_campaign = CreatorCampaign.objects.get(
campaign_id=campaign_id,
creator_id=creator_id
)
# 如果状态没有变化,则不进行更新
if creator_campaign.status == status:
return False
creator_campaign.status = status
creator_campaign.save()
logger.info(f"已更新数据库中的状态: 活动 {campaign_id}, 达人 {creator_id}, 状态 {status}")
return True
except CreatorCampaign.DoesNotExist:
logger.error(f"找不到关联记录: 活动 {campaign_id}, 达人 {creator_id}")
return False
except Exception as e:
logger.error(f"更新达人状态时发生错误: {str(e)}")
return False
@staticmethod
def get_campaign_creator_data(campaign_id, product_id=None):
"""
获取活动关联的所有达人信息
:param campaign_id: 活动ID
:param product_id: 产品ID可选
:return: 达人信息列表
"""
try:
from apps.daren_detail.models import CreatorCampaign, CreatorProfile
from ..models import Campaign, Product
# 查询活动信息
campaign = Campaign.objects.get(id=campaign_id)
# 获取产品信息
product_name = None
if product_id:
try:
product = Product.objects.get(id=product_id)
product_name = product.name
except Product.DoesNotExist:
logger.error(f"找不到产品: {product_id}")
product_id = None
# 如果没有指定产品ID则使用活动关联的第一个产品
if not product_id and campaign.link_product.exists():
product = campaign.link_product.first()
product_id = str(product.id)
product_name = product.name
# 查询与活动关联的所有达人关联记录
creator_campaigns = CreatorCampaign.objects.filter(
campaign_id=campaign_id
).select_related('creator')
creator_list = []
for cc in creator_campaigns:
creator = cc.creator
# 获取最新状态
status = None
if product_id:
status = OfferStatusService.fetch_status(creator.id, product_id)
# 如果无法获取状态,则使用数据库中的状态
if not status:
status = cc.status
# 格式化粉丝数和观看量
followers_formatted = f"{int(creator.followers / 1000)}k" if creator.followers else "0"
avg_views_formatted = f"{int(creator.avg_video_views / 1000)}k" if creator.avg_video_views else "0"
# 构建响应数据
creator_data = {
"creator_name": creator.name,
"category": creator.category,
"followers": followers_formatted,
"gmv_generated": f"${creator.gmv}k" if creator.gmv else "$0",
"views_generated": avg_views_formatted,
"pricing": f"${creator.pricing}" if creator.pricing else "$0",
"status": status
}
creator_list.append(creator_data)
return creator_list
except Exception as e:
logger.error(f"获取活动达人数据时发生错误: {str(e)}")
return []
@staticmethod
def send_status_update(campaign_id, creator_id, status, product_id=None):
"""
通过WebSocket发送状态更新
:param campaign_id: 活动ID
:param creator_id: 达人ID
:param status: 状态
:param product_id: 产品ID可选
"""
try:
# 先更新数据库中的状态
updated = OfferStatusService.update_creator_status(campaign_id, creator_id, status)
# 如果状态没有变化,则不发送更新
if not updated:
return
channel_layer = get_channel_layer()
# 构建消息数据
from ..models import Campaign, Product
try:
campaign = Campaign.objects.get(id=campaign_id)
# 1. 发送到产品特定的群组
if product_id:
# 获取该产品的所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, product_id)
# 查询产品名称
try:
product = Product.objects.get(id=product_id)
product_name = product.name
except Product.DoesNotExist:
logger.error(f"找不到产品: {product_id}")
product_name = None
# 构建产品特定的消息
product_message = {
'code': 200,
'message': '状态已更新',
'data': {
'campaign_id': str(campaign_id),
'product_id': str(product_id),
'product_name': product_name,
'creators': creator_list
}
}
# 发送到产品特定的群组
product_group_name = f'campaign_{campaign_id}_product_{product_id}'
async_to_sync(channel_layer.group_send)(
product_group_name,
{
'type': 'send_update',
'message': json.dumps(product_message)
}
)
logger.info(f"已发送状态更新到产品特定组: {product_group_name}")
# 2. 发送到活动通用群组,包含所有产品数据
# 获取所有产品
products_data = []
# 如果活动有关联产品
if campaign.link_product.exists():
for product in campaign.link_product.all():
# 获取该产品的所有达人数据
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, str(product.id))
# 构建产品数据
product_data = {
'product_id': str(product.id),
'product_name': product.name,
'creators': creator_list
}
products_data.append(product_data)
# 如果没有关联产品仍然需要使用活动ID作为产品ID
else:
# 使用活动ID作为产品ID
fallback_product_id = str(campaign_id)
creator_list = OfferStatusService.get_campaign_creator_data(campaign_id, fallback_product_id)
# 构建产品数据
product_data = {
'product_id': fallback_product_id,
'product_name': campaign.name,
'creators': creator_list
}
products_data.append(product_data)
# 构建活动通用消息
campaign_message = {
'code': 200,
'message': '状态已更新',
'data': {
'campaign_id': str(campaign_id),
'products': products_data
}
}
# 发送到活动通用群组
campaign_group_name = f'campaign_{campaign_id}'
async_to_sync(channel_layer.group_send)(
campaign_group_name,
{
'type': 'send_update',
'message': json.dumps(campaign_message)
}
)
logger.info(f"已发送状态更新到活动通用组: {campaign_group_name}")
except Campaign.DoesNotExist:
logger.error(f"找不到活动: {campaign_id}")
except Exception as e:
logger.error(f"构建消息数据时出错: {str(e)}")
# 如果构建消息失败,发送简单的状态更新消息
simple_message = {
'code': 200,
'message': '状态已更新',
'data': {
'campaign_id': str(campaign_id),
'creator_id': str(creator_id),
'status': status
}
}
# 发送到活动通用群组
campaign_group_name = f'campaign_{campaign_id}'
async_to_sync(channel_layer.group_send)(
campaign_group_name,
{
'type': 'send_update',
'message': json.dumps(simple_message)
}
)
# 如果有产品ID也发送到产品特定群组
if product_id:
product_group_name = f'campaign_{campaign_id}_product_{product_id}'
async_to_sync(channel_layer.group_send)(
product_group_name,
{
'type': 'send_update',
'message': json.dumps(simple_message)
}
)
logger.info(f"已发送状态更新: 活动 {campaign_id}, 产品 {product_id}, 达人 {creator_id}, 状态 {status}")
except Exception as e:
logger.error(f"发送WebSocket更新失败: {str(e)}")