daren/apps/expertproducts/views.py

803 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

## 根据商品信息生成视频文案
from django.shortcuts import render
import json
import uuid
import logging
from django.db import transaction
from django.shortcuts import get_object_or_404
from django.conf import settings
from django.utils import timezone
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django.db.models import Q
import os
import subprocess
import re
import requests
from django.db import connection
from django.core.mail import EmailMessage
from ollama import Client
from rest_framework.views import APIView
from rest_framework.parsers import MultiPartParser, FormParser
from .models import Product, Negotiation, Message
from .serializers import NegotiationSerializer
from apps.daren_detail.models import CreatorProfile
from apps.brands.models import Product
client = Client(host="http://localhost:11434")
class ContentAnalysisAPI(APIView):
parser_classes = (MultiPartParser, FormParser)
def post(self, request):
# 1. 接收所有字段
data = request.data
files = request.FILES.getlist('files') # 获取上传的文件列表
# 2. 验证必填字段
required_fields = [
"product_info.name", # 商品信息(字典)
"product_info.price",
"product_info.stock",
"posting_suggestion", # 视频要求(字典)
"acceptance_standard",
"selling_points" # 商品卖点(列表)
]
for field in required_fields:
if field not in data:
return Response(
{"error": f"Field '{field}' is required."},
status=status.HTTP_400_BAD_REQUEST
)
# 3. 解析 JSON 字段
try:
product_info = {
"name": data.get('product_info.name'),
"price": data.get('product_info.price'),
"stock": data.get('product_info.stock')
}
video_requirements = {
"posting_suggestion": data.get('posting_suggestion'),
"acceptance_standard": data.get('acceptance_standard')
}
selling_points = data.get('selling_points')
except json.JSONDecodeError as e:
return Response(
{"error": f"Invalid JSON format: {str(e)}"},
status=status.HTTP_400_BAD_REQUEST
)
# 4. 处理文件
file_contents = []
for file in files:
file_contents.append(file.read().decode('utf-8')) # 假设文件是文本文件
# 5. 用 messages 形式调用大模型
try:
system_prompt = self._build_video_prompt(product_info, video_requirements, selling_points)
messages = [{'role': 'system', 'content': system_prompt}]
###### 调用steven的api
url = "http://localhost:8002/api/text"
data = {
"question": "{}".format(system_prompt)
}
response = requests.post(url, data=data)
response_json = response.json() # 解析为字典
full_response = response_json.get('response')
#### 调用本地deepseek
# response = client.chat(
# model="deepseek-r1:70b",
# messages=messages,
# )
# full_response = response['message']['content']
version1, version2 = self.filter_ai_response(full_response)
# print("full_respons:", full_response)
# 6. 返回分析结果
response_data = {
"code": 200,
"message": "成功生成视频文案",
"data": {
"version1": version1,
"version2": version2
}
}
return Response(response_data)
except Exception as e:
response_data = {
"code": 500,
"message": f"AI分析失败, error:{e}",
"data": {
"version1": "",
"version2": ""
}
}
return Response(response_data)
def _build_video_prompt(self, product_info, video_requirements, selling_points):
"""Build the prompt for the price negotiation phase"""
content = f"""
你是一名专业的新媒体短视频文案策划师。请根据以下商品信息、视频要求和商品卖点以及上传文件的内容,生成一段适合达人在短视频中介绍产品的中文口播文案,要求内容流畅、自然、有吸引力,能激发观众的兴趣和购买欲望。文案要突出商品的核心卖点,结合视频要求,适当加入情感化和场景化描述,结尾可以有引导关注或购买的号召。
【商品信息】
{product_info}
【视频要求】
{video_requirements}
【商品卖点】
{selling_points}
请直接输出完整的中文视频口播文案,不要加任何解释说明。
请生成两个版本, 分别是Version1和Version2。
"""
return content
def filter_ai_response(self, ai_response):
"""过滤掉 <think>...</think> 及其内容, 只保留模型回复内容, 并解析出 Version1 和 Version2"""
import re
text = re.sub(r"<think>.*?</think>\s*", "", ai_response, flags=re.DOTALL)
# 更宽松地匹配 Version 1 和 Version 2
match = re.search(
r'(?:\*\*)?Version\s*1:?[\s\*:]*([\s\S]*?)(?:\*\*)?Version\s*2:?([\s\S]*)',
text, re.IGNORECASE
)
if match:
version1 = match.group(1).strip().strip('"')
version2 = match.group(2).strip().strip('"')
return version1, version2
else:
# 兜底:如果没匹配到,返回原文
return text, ""
## 轮训问答
class NegotiationViewSet(viewsets.ModelViewSet):
queryset = Negotiation.objects.all()
serializer_class = NegotiationSerializer
def create(self, request, *args, **kwargs):
"""创建谈判并返回包含初始消息的响应"""
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Extract creator and product from validated data
creator = serializer.validated_data['creator']
product = serializer.validated_data['product']
# 检查该用户是否存在
if not CreatorProfile.objects.filter(id=creator.id).exists():
return Response({
"code": 404,
"message": "未找到指定的达人",
"data": None
})
# Check if the product exists
if not Product.objects.filter(id=product.id).exists():
return Response({
"code": 404,
"message": "未找到指定的商品",
"data": None
})
# Check if a negotiation already exists for the same creator and product
existing_negotiation = Negotiation.objects.filter(creator=creator, product=product).first()
if existing_negotiation:
return Response({
"code": 400,
"message": "谈判已存在",
"data": {
"negotiation_id": existing_negotiation.id
}
})
# 1. 创建谈判记录
negotiation = serializer.save()
# 2. 生成并保存初始消息
initial_message = self._generate_welcome_message(negotiation)
message = Message.objects.create(
negotiation=negotiation,
role='assistant',
content=initial_message,
stage=negotiation.status
)
# 4. 构建响应数据
response_data = {
"code": 200,
"message": "谈判已创建",
"data": {
"id": message.id,
**serializer.data,
"content": initial_message,
"created_at": message.created_at
}
}
headers = self.get_success_headers(serializer.data)
return Response(response_data, status=status.HTTP_201_CREATED, headers=headers)
@action(detail=True, methods=['post'])
def chat(self, request, pk=None):
"""统一对话接口"""
negotiation = self.get_object()
print("negotiation: ", negotiation.id)
user_message = request.data.get("message", "")
# 1. 保存用户消息
Message.objects.create(
negotiation=negotiation,
role='user',
content=user_message,
stage=negotiation.status
)
# 2. 获取本阶段历史消息
history = self._get_stage_messages(negotiation)
# 如果没有历史(首次对话),手动加上本次用户输入
if not history:
history = [{'role': 'user', 'content': user_message}]
# 4. 构建大模型messages
if negotiation.status == 'brand_review':
system_prompt = self._build_brand_review_prompt(negotiation)
elif negotiation.status == 'price_negotiation':
system_prompt = self._build_price_prompt(negotiation)
elif negotiation.status == 'contract_review':
system_prompt = self._build_contract_review_prompt(negotiation)
elif negotiation.status == 'draft_ready':
system_prompt = self._build_draft_ready_prompt(negotiation)
else:
return Response({"error": "谈判已结束"}, status=400)
# 5. 组装最终messages
messages = [{'role': 'system', 'content': system_prompt}] + history
# print("final_messages: ", messages)
# 6. 调用大模型
try:
response = client.chat(
model="deepseek-r1:70b",
messages=messages,
)
ai_response, current_status_json = self.filter_ai_response(response['message']['content'])
if negotiation.status == 'draft_ready':
if current_status_json['is_resend_email']:
self._send_contract_to_email(negotiation)
# print("response: ", response['message']['content'])
except Exception as e:
return Response({"error": f"AI服务错误: {str(e)}"}, status=500)
# 7. 自动状态转换
old_status = negotiation.status # 记录更新前的状态
self._update_negotiation_status(negotiation, current_status_json['status'])
if old_status != 'draft_ready' and negotiation.status == 'draft_ready':
self._send_contract_to_email(negotiation)
# 8. 保存AI回复
Message.objects.create(
negotiation=negotiation,
role='assistant',
content=ai_response,
stage=negotiation.status
)
return Response({
"response": ai_response,
"status": negotiation.status,
})
@action(detail=True, methods=['post'])
def submit(self, request, pk=None):
"""接收一个文件并返回文件名,后续处理由你来实现"""
negotiation = self.get_object()
upload_file = request.FILES.get('file')
if not upload_file:
return Response({
'code': 400,
'message': '未收到文件',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
####### 这里实现对合同文件处理的后续业务 ########
####### 这里实现对合同文件处理的后续业务 ########
# 处理完成后给会话更新状态
self._update_negotiation_status(negotiation, "draft_approved")
return Response({
'code': 200,
'message': '文件上传成功',
'data': {
'filename': upload_file.name,
'size': upload_file.size
}
})
@action(detail=False, methods=['post'], url_path='search_creator/status')
def search_creator_by_status(self, request):
"""根据状态搜索达人"""
status = request.data.get('status')
if not status:
return Response({
'code': 400,
'message': '未收到状态',
'data': None
})
# 查找符合状态的谈判
negotiations = Negotiation.objects.filter(status=status)
if not negotiations.exists():
return Response({
'code': 404,
'message': '未找到符合条件的谈判',
'data': None
})
# 获取所有相关的达人
creators = CreatorProfile.objects.filter(negotiation__in=negotiations).distinct()
if creators.exists():
# 序列化达人数据
creator_data = [{'name': creator.name, 'category': creator.category, 'followers': creator.followers} for
creator in creators]
else:
creator_data = []
return Response({
'code': 200,
'message': '成功找到符合条件的达人',
'data': creator_data
})
@action(detail=False, methods=['post'])
def offer_status(self, request):
"""获取谈判状态"""
# 获取请求参数
creator_id = request.data.get('creator_id')
product_id = request.data.get('product_id')
if not creator_id or not product_id:
return Response({
'code': 400,
'message': '未收到达人名称或产品ID',
'data': None
})
# 查找符合条件的谈判
try:
creator = CreatorProfile.objects.get(id=creator_id)
negotiation = Negotiation.objects.get(creator=creator, product_id=product_id)
return Response({
'code': 200,
'message': '成功获取谈判状态',
'data': {
'status': negotiation.status
}
})
except CreatorProfile.DoesNotExist:
return Response({
'code': 404,
'message': f'找不到ID为{creator_id}的达人',
'data': None
})
except Negotiation.DoesNotExist:
return Response({
'code': 404,
'message': f'不存在与用户id为{creator_id}和商品id为{product_id}的谈判',
'data': None
})
def filter_ai_response(self, ai_response):
"""过滤掉 <think>...</think> 及其内容, 只保留模型回复内容, 并解析末尾的json状态"""
import re
import json
# 1. 过滤 <think>...</think>
text = re.sub(r"<think>.*?</think>\s*", "", ai_response, flags=re.DOTALL)
# 2. 匹配最后一个 {...},允许后面有空白
matches = list(re.finditer(r'\{[\s\S]*\}', text))
status_json = None
if matches:
match = matches[-1]
try:
status_json = json.loads(match.group())
# 去掉json部分
text = text[:match.start()].rstrip()
except Exception:
status_json = None
return text, status_json
def _generate_welcome_message(self, negotiation):
"""生成欢迎消息"""
product = negotiation.product
creator = negotiation.creator
return f"""
尊敬的{creator.name},感谢您对{product.name}的关注!
🔍 商品详情:
- 名称:{product.name}
- 类目:{product.category}
- 核心卖点:{product.description}
🤝 合作优势:
- 我们的产品在市场上具有很高的竞争力和良好的口碑。
- 我们提供灵活的合作方案和丰厚的回报。
- 您的粉丝群体与我们的目标市场高度契合,能够有效提升产品的曝光率和销售量。
🎯 契合度:
- 您在{creator.category}领域的影响力和专业性与我们的产品完美契合。
- 您的创意和内容风格能够为我们的产品增添独特的价值。
我们诚挚地希望能与您合作, 这边想问一下您的报价是多少?
""".strip()
def _build_brand_review_prompt(self, negotiation):
"""Build the prompt for the brand review phase"""
history = self._get_stage_messages(negotiation)
content = f"""
你正在与达人 {negotiation.creator.name} 沟通商品合作。
已发送的商品信息:
{self._get_last_assistant_message(negotiation)}
达人回复:
{history[-1]['content']}
请根据以下要求回应:
1. 回答达人关于商品的问题
2. 自然引导进入价格谈判阶段
3. 如果达人表现出兴趣,直接询问"您希望以什么价格合作?"
"""
return content
def _build_price_prompt(self, negotiation):
"""Build the prompt for the price negotiation phase"""
history = self._get_stage_messages(negotiation)
content = f"""
你是作为一个专业的商务谈判专家, 现在正在与达人 {negotiation.creator.name} 进行价格谈判。
当前谈判轮次:{negotiation.current_round}/4
商品参考价格:{negotiation.product.max_price}
商品最低价格:{negotiation.product.min_price}
达人最新回复:
{history[-1]['content']}
如果说用户的报价合理: 就推进到合同审查阶段。
如果说用户的报价不合理: 请根据我们的最低价格和最高价格的区间来跟用户去讲价, 总之就是尽量压低价格, 但是不要压的太低, 不要超过最高价格。如果用户同意我们的价格, 就推进到合同审查阶段。如果用户不同意我们的价格, 就继续谈判, 但是不要超过4轮谈判。
在回复消息的最后返回一个json格式status, 根据用户回复的情况来选择返回的状态, 示例如下:
{{
"status": "contract_review" # 如果用户同意我们的价格, 就推进到合同审查阶段。
"status": "price_negotiation" # 如果用户不同意我们的价格, 就继续谈判, 但是不要超过4轮谈判。
}}
如果用户同意我们的价格的话或者用户的报价在最低价格和最高价格之间, 我们就顺带将初步合同模版发给用户, 合同模版如下:
{self._generate_contract_template(negotiation)}
"""
return content
def _build_contract_review_prompt(self, negotiation):
"""Build the prompt for the contract review phase"""
history = self._get_stage_messages(negotiation)
content = f"""
当前谈判已进入初步合同审查阶段。
商品名称:{negotiation.product.name}
达人:{negotiation.creator.name}
达人最新回复:
{history[-1]['content']}
请根据以下要求回应:
1. 回答达人关于合同条款的问题
2. 确保合同条款清晰明了
3. 如果达人同意合同条款,确认并推进到签署阶段
4. 如果有异议,记录并准备进一步协商
如果用户对合同条款有异议, 就继续回答相关问题, 但是不要超过4轮问询。
如果用户觉得合同内容没有问题, 我们会将正式合同发送到用户的邮箱地址, 并推进到签署阶段。
如果用户想再次要一下合同模版, 我们就将合同模版发给用户, 合同模版如下:
{self._generate_contract_template(negotiation)}
在回复消息的最后返回一个json格式status, 根据用户回复的情况来选择返回的状态, 示例如下:
{{
"status": "contract_review" # 如果用户对合同仍有疑问或者异议
"status": "draft_ready" # 如果用户觉得合同内容没有问题
}}
"""
return content
def _build_draft_ready_prompt(self, negotiation):
"""Build the prompt for the draft ready phase"""
# history = self._get_stage_messages(negotiation)
content = f"""
当前谈判已进入正式合同的准备阶段。
如果用户表示没有收到合同邮件的话,我们就再次向用户发送合同邮件。
如果用户表示接受的话,我们引导用户去查看邮件并签署合同。
如果用户表示拒绝的话,我们引导用户去查看合同邮件然后在邮件中拒绝签署此合同。
如果用户还有其他疑问的话,我们不做回答,引导用户去看合同邮件。
在回复消息的最后返回一个json格式的status, 内容如下:
{{
"status": "draft_ready",
"is_resend_email": true/false # 默认为false。如果用户表示没有收到合同邮件的话就为true, 否则为false
}}
"""
return content
def _update_negotiation_status(self, negotiation, current_status):
"""状态自动转换"""
# ai_response_lower = ai_response.lower()
if current_status == 'price_negotiation':
negotiation.status = 'price_negotiation'
elif current_status == 'contract_review':
negotiation.status = 'contract_review'
elif current_status == 'draft_ready':
negotiation.status = 'draft_ready'
elif current_status == 'draft_approved':
negotiation.status = 'draft_approved'
elif current_status == 'published':
negotiation.status = 'published'
elif current_status == 'abandoned':
negotiation.status = 'abandoned'
negotiation.save()
def _get_last_assistant_message(self, negotiation):
"""获取上一条系统消息"""
last_msg = negotiation.message_set.filter(role='assistant').last()
return last_msg.content if last_msg else ""
def _get_stage_messages(self, negotiation):
"""获取当前阶段的所有历史消息,按时间排序"""
# 只查当前阶段的消息
messages = negotiation.message_set.filter(stage=negotiation.status).order_by('created_at')
return [{'role': m.role, 'content': m.content} for m in messages]
def _generate_contract_template(self, negotiation):
"""生成合同模版"""
return f"""
以下为本次合作的合同模板内容,请展示给用户:
================ 合同模板 ================
合同编号HT-{negotiation.id:06d}
甲方品牌方XXX公司
乙方(达人):{negotiation.creator.name}
商品名称:{negotiation.product.name}
合作价格:请以最终谈判价格为准
合作内容:乙方需在其社交平台发布与商品相关的推广内容,具体要求以双方沟通为准。
付款方式甲方在乙方完成推广后7个工作日内支付合作费用。
合同生效日期:以双方签署日期为准
其他条款:如有未尽事宜,双方友好协商解决。
========================================
"""
def _send_contract_to_email(self, negotiation,
contract_path='/Users/liuzizhen/OOIN/daren_project/operation/商品合同.docx'):
"""发送合同到用户邮箱,支持附件"""
contract_content = self._generate_contract_template(negotiation)
subject = f"合同文件 - {negotiation.product.name}"
recipient = '3299361176@qq.com'
if not recipient:
logger.error(f"未找到达人 {negotiation.creator.name} 的邮箱,无法发送合同。")
return False
try:
email = EmailMessage(
subject=subject,
body=contract_content,
from_email=getattr(settings, 'DEFAULT_FROM_EMAIL', None),
to=[recipient],
)
# 如果有附件路径,添加附件
if contract_path:
with open(contract_path, 'rb') as f:
email.attach('商品合同.docx', f.read(),
'application/vnd.openxmlformats-officedocument.wordprocessingml.document')
email.send(fail_silently=False)
logger.info(f"合同已发送到 {recipient}")
return True
except Exception as e:
logger.error(f"发送合同邮件失败: {e}")
return False
## 根据接收到的达人列表来查找达人(已弃用)
class TopCreatorsAPI(APIView):
def post(self, request):
# Extract filtering criteria and creator list from the request
criteria = request.data.get('criteria', "")
creators = request.data.get('creators', [])
top_n = request.data.get('top_n', 5) # Default to top 5 if not specified
if not creators:
return Response({"error": "没有合适的达人"}, status=status.HTTP_400_BAD_REQUEST)
# Prepare the message for the Ollama model
messages = self._build_messages(criteria, creators)
try:
# Call the Ollama model
response = client.chat(
model="deepseek-r1:70b",
messages=messages,
)
ranked_creators = self._parse_response(response['message']['content'], top_n)
return Response({
"top_creators": ranked_creators
}, status=status.HTTP_200_OK)
except Exception as e:
return Response({"error": f"AI service error: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def _build_messages(self, criteria, creators):
"""Build the messages for the Ollama model"""
creators_str = "\n".join([
f"name: {creator['name']}, sex: {creator['sex']}, age: {creator['age']}, category: {creator['category']}, followers: {creator['followers']}"
for creator in creators])
content = f"""
以下是给定的筛选标准, 请按照此标准来筛选达人:
{criteria}
评估以下所有达人,从中筛选出符合条件的达人:
{creators_str}
如果没有找到合适的达人, 就按照当前达人最匹配的达人返回。
记住, 我只要json结果, 不要任何别的回答包括你的think部分!!!
请按照以下格式返回结果:
[{{
"name": "达人名称",
"sex": "达人性别",
"age": "达人年龄",
"category": "达人分类",
"followers": "达人粉丝数"
}},
{{
"name": "达人名称",
"sex": "达人性别",
"age": "达人年龄",
"category": "达人分类",
"followers": "达人粉丝数"
}},
...
]]
"""
return [{'role': 'user', 'content': content}]
def _parse_response(self, response, top_n):
"""Parse the response from the Ollama model to extract the top N creators"""
# Use a regular expression to find the JSON block
json_match = re.search(r'```json\n(.*?)\n```', response, re.DOTALL)
if json_match:
json_str = json_match.group(1)
try:
# Parse the JSON string
creators = json.loads(json_str)
# Return the top N creators
return creators[:top_n]
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
return []
else:
print("No JSON found in response")
return []
## 生成sql直接查询达人库
class CreatorSQLSearchAPI(APIView):
def post(self, request):
criteria = request.data.get('criteria', '')
top_n = int(request.data.get('top_n', 10)) # 默认为10
if not criteria:
return Response({"error": "缺少筛选条件"}, status=400)
table_schema = '''
creator_profiles(
id bigint 主键,
name varchar(255) 达人名称,
avatar_url text 头像URL,
email varchar(255) 电子邮箱,
instagram varchar(255) Instagram账号,
tiktok_link varchar(255) TikTok链接,
location varchar(100) 位置,
live_schedule varchar(255) 直播时间表,
category varchar(100) 类别,
e_commerce_level int 电商能力等级,
exposure_level varchar(10) 曝光等级,
followers int 粉丝数,
gmv decimal(12,2) GMV(千美元),
items_sold decimal(12,2) 售出商品数量,
avg_video_views int 平均视频浏览量,
pricing_min decimal(10,2) 最低个人定价,
pricing_max decimal(10,2) 最高个人定价,
pricing_package varchar(100) 套餐定价,
collab_count int 合作次数,
latest_collab varchar(100) 最新合作,
e_commerce_platforms json 电商平台,
gmv_by_channel json GMV按渠道分布,
gmv_by_category json GMV按类别分布,
mcn varchar(255) MCN机构,
create_time datetime 创建时间,
update_time datetime 更新时间
)
'''
prompt = f"""
你是一个SQL专家。下面是MySQL表creator_profiles的结构:
{table_schema}
以下是我对表中每个字段的解释: 方便你写出正确的sql查询语句
请根据以下自然语言筛选条件, 生成一条MySQL的SELECT语句, 查询daren_detail.creator_profiles表(注意一定要写数据库名称.表名称), 返回所有字段。不要加任何解释说明, 只输出SQL语句本身。
筛选条件:{criteria}
"""
# 2. 让大模型生成SQL
response = client.chat(
model="qwen2.5:32b",
messages=[{'role': 'user', 'content': prompt}],
)
sql = self._extract_sql(response['message']['content'])
# 3. 校验SQL只允许select防止注入
if not sql.strip().lower().startswith('select'):
response_data = {
"code": 500,
"message": "生成的SQL不合法",
"data": {
"results": []
}
}
return Response(response_data)
# 4. 执行SQL
with connection.cursor() as cursor:
try:
cursor.execute(sql)
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
response_data = {
"code": 200,
"message": "成功生成SQL",
"data": {
"results": results[:top_n]
}
}
except Exception as e:
response_data = {
"code": 500,
"message": "SQL执行异常",
"data": {
"results": []
}
}
raise
return Response(response_data)
def _extract_sql(self, text):
# 先去除 <think>...</think> 内容
import re
text = re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL)
# 再提取SQL语句
match = re.search(r"select[\s\S]+?;", text, re.IGNORECASE)
if match:
return match.group()
return text.strip()