daren_project/feishu/feishu_ai_chat.py

1189 lines
43 KiB
Python
Raw Normal View History

2025-04-17 16:14:00 +08:00
import os
import sys
import json
import logging
import traceback
import requests
from datetime import datetime
import django
from django.db import transaction
from django.contrib.auth import get_user_model
import time
# 设置Django环境
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'role_based_system.settings')
django.setup()
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import *
from user_management.models import (
FeishuCreator, KnowledgeBase, UserGoal,
GmailTalentMapping, ChatHistory, ConversationSummary
)
from django.conf import settings
from user_management.gmail_integration import GmailIntegration
from feishu.feishu import sync_to_knowledge_base
logger = logging.getLogger(__name__)
def get_tenant_access_token(app_id, app_secret):
"""
获取飞书应用的tenant_access_token
"""
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
headers = {
"Content-Type": "application/json"
}
data = {
"app_id": app_id,
"app_secret": app_secret
}
try:
response = requests.post(url, json=data, headers=headers)
response_json = response.json()
if response.status_code == 200 and response_json.get("code") == 0:
return {
"status": "success",
"access_token": response_json.get("tenant_access_token"),
"expire": response_json.get("expire")
}
else:
logging.error(f"获取tenant_access_token失败: {response_json}")
return {
"status": "error",
"message": response_json.get("msg", "未知错误")
}
except Exception as e:
logging.error(f"获取tenant_access_token异常: {str(e)}")
return {
"status": "error",
"message": str(e)
}
def fetch_table_records(app_token, table_id, view_id, access_token=None, app_id=None, app_secret=None):
"""
从飞书多维表格获取记录
参数:
app_token: 应用ID
table_id: 表格ID
view_id: 视图ID
access_token: 访问令牌(可选如果提供则直接使用)
app_id: 应用ID(可选用于自动获取token)
app_secret: 应用密钥(可选用于自动获取token)
"""
# 如果没有提供access_token但提供了app_id和app_secret则自动获取
if not access_token and app_id and app_secret:
token_result = get_tenant_access_token(app_id, app_secret)
if token_result["status"] == "success":
access_token = token_result["access_token"]
else:
logging.error(f"无法获取访问令牌: {token_result['message']}")
return []
total_records = []
page_token = None
page_size = 20
try:
# 初始化客户端
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
logger.info(f"开始从飞书表格获取数据: app_token={app_token}, table_id={table_id}, view_id={view_id}")
# 尝试两种方法获取记录
# 方法1: 使用list接口
list_success = False
try:
while True:
try:
# 构造请求对象
builder = ListAppTableRecordRequest.builder() \
.app_token(app_token) \
.table_id(table_id) \
.page_size(page_size) \
.view_id(view_id)
# 如果有page_token添加到请求中
if page_token:
builder = builder.page_token(page_token)
# 构建完整请求
request = builder.build()
logger.debug(f"发送list请求page_token: {page_token}")
# 发起请求
option = lark.RequestOption.builder().user_access_token(access_token).build()
response = client.bitable.v1.app_table_record.list(request, option)
if not response.success():
logger.error(f"list请求失败: {response.code}, {response.msg}")
break
# 获取当前页记录
current_records = response.data.items
if not current_records:
logger.info("没有更多记录")
break
total_records.extend(current_records)
list_success = True
# 解析响应数据获取分页信息
response_data = json.loads(response.raw.content)
has_more = response_data["data"].get("has_more", False)
total = response_data["data"].get("total", 0)
logger.info(f"list方法获取到 {len(current_records)} 条记录,当前总计: {len(total_records)}/{total}")
# 获取下一页token
page_token = response_data["data"].get("page_token")
if not page_token or not has_more:
logger.info("list方法已获取所有数据")
break
except Exception as e:
logger.error(f"list方法获取记录时出错: {str(e)}")
logger.error(traceback.format_exc())
list_success = False
break
except Exception as e:
logger.error(f"list方法整体出错: {str(e)}")
list_success = False
# 如果list方法失败尝试search方法
if not list_success or not total_records:
logger.info("尝试使用search方法获取数据...")
page_token = None
total_records = []
while True:
try:
# 构造search请求对象
request = SearchAppTableRecordRequest.builder() \
.app_token(app_token) \
.table_id(table_id) \
.page_size(page_size)
# 如果有page_token添加到请求中
if page_token:
request = request.page_token(page_token)
# 添加请求体 - 可以根据需要添加过滤条件
request = request.request_body(
SearchAppTableRecordRequestBody.builder().build()
).build()
logger.debug(f"发送search请求page_token: {page_token}")
# 发起请求
option = lark.RequestOption.builder().user_access_token(access_token).build()
response = client.bitable.v1.app_table_record.search(request, option)
if not response.success():
logger.error(f"search请求失败: {response.code}, {response.msg}")
break
# 获取当前页记录
current_records = response.data.items
if not current_records:
logger.info("search方法没有更多记录")
break
total_records.extend(current_records)
# 解析响应数据获取分页信息
response_data = json.loads(response.raw.content)
has_more = response_data["data"].get("has_more", False)
total = response_data["data"].get("total", 0)
logger.info(f"search方法获取到 {len(current_records)} 条记录,当前总计: {len(total_records)}/{total}")
# 获取下一页token
page_token = response_data["data"].get("page_token")
if not page_token or not has_more:
logger.info("search方法已获取所有数据")
break
except Exception as e:
logger.error(f"search方法获取记录时出错: {str(e)}")
logger.error(traceback.format_exc())
break
logger.info(f"最终获取到 {len(total_records)} 条记录")
return total_records
except Exception as e:
logger.error(f"获取飞书表格记录时出错: {str(e)}")
logger.error(traceback.format_exc())
return []
def extract_field_value(field_value):
"""
提取字段值
参数:
field_value: 飞书返回的字段值
返回:
任意类型: 提取后的值
"""
if isinstance(field_value, list):
if field_value and isinstance(field_value[0], dict):
return field_value[0].get('text', '')
elif isinstance(field_value, dict):
if 'text' in field_value:
return field_value['text']
elif 'link' in field_value:
return field_value['link']
elif 'link_record_ids' in field_value:
return ''
return field_value
def find_duplicate_email_creators(records):
"""
查找记录中重复邮箱的创作者
参数:
records: 飞书记录列表
返回:
dict: 以邮箱为键记录列表为值的字典
"""
email_map = {}
for record in records:
fields = record.fields
email = extract_field_value(fields.get('邮箱', ''))
if email:
if email not in email_map:
email_map[email] = []
email_map[email].append(record)
# 过滤出现次数>1的邮箱
duplicate_emails = {email: records for email, records in email_map.items() if len(records) > 1}
logger.info(f"发现 {len(duplicate_emails)} 个重复邮箱")
return duplicate_emails
def create_or_update_knowledge_base(email, user=None):
"""
为创作者创建或更新知识库
参数:
email: 创作者邮箱
user: 用户对象默认为None将选择一个组长
返回:
tuple: (KnowledgeBase对象, 是否新创建)
"""
# 优先使用给定的用户,否则获取一个组长用户
if not user:
User = get_user_model()
user = User.objects.filter(role='leader').first()
if not user:
logger.error("未找到组长用户,无法创建知识库")
return None, False
# 首先查找创作者
creator = FeishuCreator.objects.filter(email=email).first()
if not creator:
logger.error(f"找不到邮箱为 {email} 的创作者")
return None, False
# 使用sync_to_knowledge_base函数创建知识库
kb, created = sync_to_knowledge_base(creator_id=creator.id)
if kb:
logger.info(f"邮箱 {email} 的知识库{'已创建' if created else '已存在'}: {kb.id}")
else:
logger.error(f"为邮箱 {email} 创建知识库失败")
return kb, created
def set_user_goal(user, email, goal_content):
"""
设置用户总目标
参数:
user: 用户对象
email: 创作者邮箱
goal_content: 目标内容
返回:
dict: 包含操作结果和目标信息
"""
try:
# 创建Gmail集成实例
gmail_integration = GmailIntegration(user)
# 设置总目标
result = gmail_integration.manage_user_goal(goal_content)
if result['status'] == 'success':
logger.info(f"为用户 {user.username} 设置总目标成功: {result['action']}")
return result
else:
logger.error(f"为用户 {user.username} 设置总目标失败: {result.get('message', 'Unknown error')}")
return result
except Exception as e:
logger.error(f"设置用户总目标时出错: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': str(e)
}
def create_chat_with_ai(user, talent_email, goal_content):
"""
创建与AI的聊天会话
参数:
user: 用户对象
talent_email: 达人邮箱
goal_content: 目标内容
返回:
dict: 操作结果
"""
try:
# 1. 获取或创建知识库
kb, kb_created = create_or_update_knowledge_base(talent_email, user)
if not kb:
return {
'status': 'error',
'message': f"为邮箱 {talent_email} 创建知识库失败"
}
# 2. 设置用户总目标
goal_result = set_user_goal(user, talent_email, goal_content)
if goal_result['status'] != 'success':
return goal_result
# 3. 检查是否已有对应的Gmail映射关系
mapping = GmailTalentMapping.objects.filter(
user=user,
talent_email=talent_email,
is_active=True
).first()
if not mapping:
# 创建映射关系
mapping = GmailTalentMapping.objects.create(
user=user,
talent_email=talent_email,
knowledge_base=kb,
conversation_id=f"feishu_ai_{kb.id}",
is_active=True
)
logger.info(f"创建新的Gmail映射: {talent_email} -> {kb.id}")
elif mapping.knowledge_base_id != kb.id:
# 更新现有映射关系
mapping.knowledge_base = kb
mapping.save()
logger.info(f"更新Gmail映射: {talent_email} -> {kb.id}")
return {
'status': 'success',
'action': 'create',
'knowledge_base': {
'id': str(kb.id),
'name': kb.name,
'created': kb_created
},
'goal': goal_result.get('goal'),
'mapping': {
'id': str(mapping.id),
'conversation_id': mapping.conversation_id
}
}
except Exception as e:
logger.error(f"创建AI聊天时出错: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': str(e)
}
def process_duplicate_emails(duplicate_emails, goal_content=None):
"""
处理重复邮箱记录
参数:
duplicate_emails: 重复邮箱记录字典
goal_content: 目标内容模板可包含{email}{handle}占位符
返回:
dict: 处理结果统计
"""
if not goal_content:
goal_content = "与达人{handle}建立联系并了解其账号情况,处理合作需求,最终目标是达成合作并签约。"
User = get_user_model()
leader = User.objects.filter(role='leader').first()
if not leader:
logger.error("未找到组长用户,无法处理重复邮箱")
return {
'status': 'error',
'message': "未找到组长用户"
}
results = {
'total': len(duplicate_emails),
'success': 0,
'failure': 0,
'details': []
}
for email, records in duplicate_emails.items():
try:
# 获取一个Handle作为示例
handle = extract_field_value(records[0].fields.get('Handle', email.split('@')[0]))
# 格式化目标内容
formatted_goal = goal_content.format(email=email, handle=handle)
# 创建AI聊天
result = create_chat_with_ai(leader, email, formatted_goal)
if result['status'] == 'success':
results['success'] += 1
logger.info(f"成功为邮箱 {email} 创建AI聊天")
else:
results['failure'] += 1
logger.error(f"为邮箱 {email} 创建AI聊天失败: {result.get('message', 'Unknown error')}")
results['details'].append({
'email': email,
'handle': handle,
'status': result['status'],
'message': result.get('message', '')
})
except Exception as e:
results['failure'] += 1
logger.error(f"处理邮箱 {email} 时出错: {str(e)}")
logger.error(traceback.format_exc())
results['details'].append({
'email': email,
'status': 'error',
'message': str(e)
})
return results
def generate_ai_response(conversation_history, user_goal):
"""
调用DeepSeek API生成AI响应
参数:
conversation_history: 对话历史
user_goal: 用户总目标
返回:
str: AI响应内容
"""
try:
# 使用有效的API密钥
api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf"
# 如果上面的密钥不正确,可以尝试从环境变量或数据库中获取
# 从Django设置中获取密钥
if hasattr(settings, 'DEEPSEEK_API_KEY') and settings.DEEPSEEK_API_KEY:
api_key = settings.DEEPSEEK_API_KEY
url = "https://api.siliconflow.cn/v1/chat/completions"
# 系统消息指定AI助手的角色和总目标
system_message = {
"role": "system",
"content": f"""你是一位专业的电商客服和达人助手。你的任务是与达人进行对话,帮助实现以下总目标:
{user_goal}
你应该主动推进对话引导达人朝着目标方向发展每次回复应该简洁明了专业且有帮助
如果你认为总目标已经达成请在回复的最后一行添加标记: [目标已达成]"""
}
messages = [system_message]
# 添加对话历史但限制消息数量避免超出token限制
# 如果对话历史太长,可能需要进一步处理或分割
if len(conversation_history) > 20:
# 选取关键消息:第一条、最后几条以及中间的一些重要消息
selected_messages = (
conversation_history[:2] + # 前两条
conversation_history[len(conversation_history)//2-2:len(conversation_history)//2+2] + # 中间四条
conversation_history[-12:] # 最后12条
)
messages.extend(selected_messages)
else:
messages.extend(conversation_history)
# 构建API请求
payload = {
"model": "deepseek-ai/DeepSeek-V3",
"messages": messages,
"stream": False,
"max_tokens": 1500, # 增加token上限以容纳完整回复
"temperature": 0.3, # 降低随机性,使回复更加确定性
"top_p": 0.9,
"top_k": 50,
"frequency_penalty": 0.5,
"presence_penalty": 0.2,
"n": 1,
"stop": [],
"response_format": {
"type": "text"
}
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
logger.info("开始调用DeepSeek API生成对话回复")
response = requests.post(url, json=payload, headers=headers)
if response.status_code != 200:
logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
return None
result = response.json()
logger.debug(f"DeepSeek API返回: {result}")
# 提取回复内容
if 'choices' in result and len(result['choices']) > 0:
reply = result['choices'][0]['message']['content']
# 如果返回的内容为空直接返回None
if not reply or reply.strip() == '':
logger.warning("DeepSeek API返回的回复内容为空")
return None
return reply
logger.warning(f"DeepSeek API返回格式异常: {result}")
return None
except Exception as e:
logger.error(f"调用DeepSeek API生成回复失败: {str(e)}")
logger.error(traceback.format_exc())
return None
def check_goal_achieved(response):
"""
检查目标是否已达成
参数:
response: AI的回复内容
返回:
bool: 是否达成目标
"""
if not response:
return False
# 检查回复中是否包含目标达成标记
goal_markers = [
"[目标已达成]",
"【目标已达成】",
"目标已达成",
"已达成目标"
]
for marker in goal_markers:
if marker in response:
logger.info(f"检测到目标达成标记: {marker}")
return True
return False
def auto_chat_session(user, talent_email, max_turns=10):
"""
执行自动聊天会话
参数:
user: 用户对象
talent_email: 达人邮箱
max_turns: 最大对话轮次默认10轮
返回:
dict: 会话结果
"""
try:
# 1. 获取用户目标
gmail_integration = GmailIntegration(user)
goal_result = gmail_integration.manage_user_goal()
if goal_result['status'] != 'success' or not goal_result.get('goal'):
logger.error(f"获取用户目标失败: {goal_result.get('message', 'No goal found')}")
return {
'status': 'error',
'message': "无法获取用户总目标"
}
user_goal = goal_result['goal']['content']
# 2. 获取Gmail映射关系
mapping = GmailTalentMapping.objects.filter(
user=user,
talent_email=talent_email,
is_active=True
).first()
if not mapping:
logger.error(f"找不到与邮箱 {talent_email} 的映射关系")
return {
'status': 'error',
'message': f"找不到与邮箱 {talent_email} 的映射关系"
}
kb = mapping.knowledge_base
conversation_id = mapping.conversation_id
# 3. 获取现有对话历史
chat_messages = ChatHistory.objects.filter(
user=user,
knowledge_base=kb,
conversation_id=conversation_id,
is_deleted=False
).order_by('created_at')
conversation_history = []
for msg in chat_messages:
conversation_history.append({
"role": msg.role,
"content": msg.content
})
# 如果没有对话历史,添加一条系统消息作为开始
if not conversation_history:
# 创建一条初始的系统消息
system_msg = ChatHistory.objects.create(
user=user,
knowledge_base=kb,
conversation_id=conversation_id,
role='system',
content=f"与达人 {talent_email} 的对话开始。总目标: {user_goal}"
)
conversation_history.append({
"role": "system",
"content": system_msg.content
})
# 发送第一封邮件来开始对话
first_subject = "关于合作的洽谈"
first_content = f"您好,\n\n我是{user.username},我们正在寻找合适的达人合作伙伴,注意到您的账号非常适合我们的产品。\n\n请问您有兴趣了解更多关于我们合作的细节吗?\n\n期待您的回复。\n\n祝好,\n{user.username}"
# 记录首次发送消息到对话历史
initial_msg = ChatHistory.objects.create(
user=user,
knowledge_base=kb,
conversation_id=conversation_id,
role='assistant',
content=first_content
)
conversation_history.append({
"role": "assistant",
"content": initial_msg.content
})
# 实际发送邮件
email_result = gmail_integration.send_email(
to_email=talent_email,
subject=first_subject,
body=first_content,
conversation_id=conversation_id
)
if email_result['status'] != 'success':
logger.error(f"发送首次邮件失败: {email_result.get('message', 'Unknown error')}")
return {
'status': 'error',
'message': f"发送首次邮件失败: {email_result.get('message', 'Unknown error')}"
}
logger.info(f"已发送首次邮件到 {talent_email}")
# 首次邮件发送后,直接返回不进行后续对话,等待达人回复
return {
'status': 'success',
'message': '已发送首次邮件,等待达人回复',
'turns_completed': 1,
'goal_achieved': False,
'email_sent': True,
'conversation_id': conversation_id
}
# 4. 检查最新消息是否来自达人(user),如果是,才进行回复
last_message = chat_messages.last()
if last_message and last_message.role == 'user':
# 有达人回复生成AI回复并发送邮件
# 生成AI回复
ai_response = generate_ai_response(conversation_history, user_goal)
if not ai_response:
logger.error("生成AI回复失败")
return {
'status': 'error',
'message': '生成AI回复失败'
}
# 检查目标是否已达成
goal_achieved = check_goal_achieved(ai_response)
# 保存AI回复到对话历史
ai_msg = ChatHistory.objects.create(
user=user,
knowledge_base=kb,
conversation_id=conversation_id,
role='assistant',
content=ai_response
)
# 使用最近的主题作为回复主题
subject = "回复: " + (last_message.subject if hasattr(last_message, 'subject') and last_message.subject else "关于合作的洽谈")
# 实际发送邮件
email_result = gmail_integration.send_email(
to_email=talent_email,
subject=subject,
body=ai_response,
conversation_id=conversation_id
)
if email_result['status'] != 'success':
logger.error(f"发送邮件回复失败: {email_result.get('message', 'Unknown error')}")
return {
'status': 'error',
'message': f"发送邮件回复失败: {email_result.get('message', 'Unknown error')}"
}
# 如果目标已达成,发送通知
if goal_achieved:
send_goal_achieved_notification(user, talent_email, conversation_id)
# 生成对话总结
summary_result = gmail_integration.generate_conversation_summary(talent_email)
summary_status = "success" if summary_result.get('status') == 'success' else "failed"
else:
summary_status = "not_needed"
return {
'status': 'success',
'turns_completed': 1,
'goal_achieved': goal_achieved,
'summary_status': summary_status,
'email_sent': True,
'conversation_id': conversation_id
}
else:
# 没有新的达人回复,不需要回复
return {
'status': 'success',
'message': '没有新的达人回复,不需要回复',
'turns_completed': 0,
'goal_achieved': False,
'email_sent': False,
'conversation_id': conversation_id
}
except Exception as e:
logger.error(f"自动聊天会话出错: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': str(e)
}
def simulate_user_reply(ai_response, turn_count):
"""
模拟用户回复
参数:
ai_response: AI的回复内容
turn_count: 当前对话轮次
返回:
str: 模拟的用户回复
"""
# 根据对话轮次和AI回复生成不同的回复模板
if turn_count == 0:
return "你好,很高兴认识你。我对你提到的合作很感兴趣,能详细说说你们公司的情况吗?"
if "价格" in ai_response or "报价" in ai_response:
return "价格是我比较关注的,你们能提供什么样的价格方案?我希望能有一个灵活的合作模式。"
if "合作模式" in ai_response or "合作方式" in ai_response:
return "这种合作模式听起来不错。我目前的粉丝群体主要是25-35岁的女性她们对美妆和生活方式类产品比较感兴趣。你觉得这和你们的产品匹配吗"
if "产品" in ai_response:
return "产品听起来很不错。我想了解一下发货和售后是如何处理的?这对我的粉丝体验很重要。"
if "合同" in ai_response or "协议" in ai_response:
return "我对合同条款没有太大问题,但希望能保持一定的创作自由度。什么时候可以开始合作?"
# 默认回复
return "这些信息很有帮助,谢谢。我需要再考虑一下,有什么其他你想告诉我的吗?"
def send_goal_achieved_notification(user, talent_email, conversation_id):
"""
发送目标达成通知
参数:
user: 用户对象
talent_email: 达人邮箱
conversation_id: 对话ID
"""
try:
from user_management.models import Notification
# 创建通知
notification = Notification.objects.create(
type='system_notice',
title=f"目标已达成 - {talent_email}",
content=f"与达人 {talent_email} 的自动对话已达成总目标。请查看对话详情并进行后续处理。",
sender=user, # 这里发送者设为当前用户
receiver=user, # 通知发给当前用户
related_resource=conversation_id
)
logger.info(f"已创建目标达成通知: {notification.id}")
except Exception as e:
logger.error(f"发送目标达成通知失败: {str(e)}")
logger.error(traceback.format_exc())
def print_help():
"""打印帮助信息"""
print("飞书多维表格自动AI对话工具")
print("=======================")
print("\n可用命令:")
print(" 1. 从飞书读取表格并处理重复邮箱:")
print(" python feishu_ai_chat.py process_table --app_token 应用TOKEN --table_id 表格ID --view_id 视图ID --access_token 访问令牌")
print()
print(" 2. 为指定邮箱执行自动对话:")
print(" python feishu_ai_chat.py auto_chat --email 达人邮箱 [--turns 对话轮数]")
print()
print(" 3. 设置用户总目标:")
print(" python feishu_ai_chat.py set_goal --email 达人邮箱 --goal \"总目标内容\"")
print()
print(" 4. 检查目标完成状态:")
print(" python feishu_ai_chat.py check_goal --email 达人邮箱")
print()
print(" 5. 帮助信息:")
print(" python feishu_ai_chat.py help")
print()
print("示例:")
print(" python feishu_ai_chat.py process_table --table_id tblcck2za8GZBliz --view_id vewSOIsmxc")
print(" python feishu_ai_chat.py auto_chat --email example@gmail.com --turns 5")
print()
def handle_process_table(args):
"""处理飞书表格命令"""
import argparse
parser = argparse.ArgumentParser(description='处理飞书表格')
parser.add_argument('--app_token', default="XYE6bMQUOaZ5y5svj4vcWohGnmg", help='飞书应用TOKEN')
parser.add_argument('--table_id', required=True, help='表格ID')
parser.add_argument('--view_id', required=True, help='视图ID')
parser.add_argument('--access_token', default=None, help='用户访问令牌')
parser.add_argument('--app_id', default="cli_a5c97daacb9e500d", help='应用ID')
parser.add_argument('--app_secret', default="fdVeOCLXmuIHZVmSV0VbJh9wd0Kq1o5y", help='应用密钥')
parser.add_argument('--goal_template', default="与达人{handle}(邮箱:{email})建立联系并了解其账号情况,评估合作潜力,处理合作需求,最终目标是达成合作并签约。", help='目标内容模板')
parser.add_argument('--auto_chat', action='store_true', help='是否自动执行AI对话')
parser.add_argument('--turns', type=int, default=5, help='自动对话轮次')
params = parser.parse_args(args)
# 从飞书表格获取记录
records = fetch_table_records(
params.app_token,
params.table_id,
params.view_id,
params.access_token,
params.app_id,
params.app_secret
)
if not records:
logger.error("未获取到任何记录")
return
# 查找重复邮箱的创作者
duplicate_emails = find_duplicate_email_creators(records)
if not duplicate_emails:
logger.info("未发现重复邮箱")
return
logger.info(f"发现 {len(duplicate_emails)} 个重复邮箱,开始处理...")
# 处理重复邮箱记录
results = process_duplicate_emails(duplicate_emails, params.goal_template)
# 打印处理结果
logger.info(f"处理完成: 总计 {results['total']} 个邮箱,成功 {results['success']} 个,失败 {results['failure']}")
# 如果需要自动对话
if params.auto_chat:
# 获取组长用户
User = get_user_model()
leader = User.objects.filter(role='leader').first()
if not leader:
logger.error("未找到组长用户,无法进行自动对话")
return
# 为每个成功创建的记录执行自动对话
chat_results = []
for detail in results['details']:
if detail['status'] == 'success':
email = detail['email']
logger.info(f"开始为邮箱 {email} 执行自动对话...")
chat_result = auto_chat_session(leader, email, max_turns=params.turns)
chat_results.append({
'email': email,
'result': chat_result
})
logger.info(f"邮箱 {email} 自动对话完成: {chat_result['status']}")
# 打印对话结果
logger.info(f"自动对话完成: 总计 {len(chat_results)} 个对话")
for chat in chat_results:
result = chat['result']
if result['status'] == 'success':
logger.info(f"邮箱 {chat['email']} 对话成功,轮次: {result['turns_completed']},目标达成: {result['goal_achieved']}")
else:
logger.info(f"邮箱 {chat['email']} 对话失败: {result.get('message', 'Unknown error')}")
def handle_auto_chat(args):
"""处理自动对话命令"""
import argparse
parser = argparse.ArgumentParser(description='执行自动对话')
parser.add_argument('--email', required=True, help='达人邮箱')
parser.add_argument('--force_send', action='store_true', help='是否强制发送新邮件')
parser.add_argument('--subject', help='邮件主题(仅当force_send=true时使用)')
parser.add_argument('--content', help='邮件内容(仅当force_send=true时使用)')
params = parser.parse_args(args)
# 获取组长用户
User = get_user_model()
leader = User.objects.filter(role='leader').first()
if not leader:
logger.error("未找到组长用户,无法进行自动对话")
return
# 如果是强制发送模式
if params.force_send:
if not params.content:
logger.error("当force_send=true时必须提供content参数")
return
try:
# 获取知识库映射
mapping = GmailTalentMapping.objects.filter(
user=leader,
talent_email=params.email,
is_active=True
).first()
if not mapping:
logger.error(f"找不到与邮箱 {params.email} 的映射关系")
return
# 创建Gmail集成实例
gmail_integration = GmailIntegration(leader)
# 直接发送邮件
mail_subject = params.subject if params.subject else "关于合作的洽谈"
mail_result = gmail_integration.send_email(
to_email=params.email,
subject=mail_subject,
body=params.content,
conversation_id=mapping.conversation_id
)
if mail_result['status'] != 'success':
logger.error(f"邮件发送失败: {mail_result.get('message', 'Unknown error')}")
return
# 保存发送的内容到对话历史
ChatHistory.objects.create(
user=leader,
knowledge_base=mapping.knowledge_base,
conversation_id=mapping.conversation_id,
role='assistant',
content=params.content
)
logger.info(f"已强制发送邮件到 {params.email}")
return {
'status': 'success',
'message': f"已强制发送邮件到 {params.email}",
'email_sent': True,
'conversation_id': mapping.conversation_id
}
except Exception as e:
logger.error(f"强制发送邮件时出错: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': str(e)
}
# 执行自动对话
result = auto_chat_session(leader, params.email)
# 打印结果
if result['status'] == 'success':
if result.get('email_sent'):
logger.info(f"自动对话成功,已发送邮件到 {params.email}")
else:
logger.info(f"没有新的达人回复,不需要发送邮件")
else:
logger.error(f"自动对话失败: {result.get('message', 'Unknown error')}")
return result
def handle_set_goal(args):
"""处理设置目标命令"""
import argparse
parser = argparse.ArgumentParser(description='设置用户总目标')
parser.add_argument('--email', required=True, help='达人邮箱')
parser.add_argument('--goal', required=True, help='目标内容')
params = parser.parse_args(args)
# 获取组长用户
User = get_user_model()
leader = User.objects.filter(role='leader').first()
if not leader:
logger.error("未找到组长用户,无法设置目标")
return
# 设置总目标
gmail_integration = GmailIntegration(leader)
result = gmail_integration.manage_user_goal(params.goal)
# 打印结果
if result['status'] == 'success':
logger.info(f"设置总目标成功: {result['action']}")
else:
logger.error(f"设置总目标失败: {result.get('message', 'Unknown error')}")
return result
def handle_check_goal(args):
"""处理检查目标完成状态命令"""
import argparse
parser = argparse.ArgumentParser(description='检查目标完成状态')
parser.add_argument('--email', required=True, help='达人邮箱')
params = parser.parse_args(args)
# 获取组长用户
User = get_user_model()
leader = User.objects.filter(role='leader').first()
if not leader:
logger.error("未找到组长用户,无法检查目标")
return
# 查找Gmail映射关系
mapping = GmailTalentMapping.objects.filter(
user=leader,
talent_email=params.email,
is_active=True
).first()
if not mapping:
logger.error(f"找不到与邮箱 {params.email} 的映射关系")
return {
'status': 'error',
'message': f"找不到与邮箱 {params.email} 的映射关系"
}
# 获取对话历史中最后的AI回复
last_ai_message = ChatHistory.objects.filter(
user=leader,
knowledge_base=mapping.knowledge_base,
conversation_id=mapping.conversation_id,
role='assistant',
is_deleted=False
).order_by('-created_at').first()
if not last_ai_message:
logger.error(f"找不到与邮箱 {params.email} 的对话历史")
return {
'status': 'error',
'message': f"找不到与邮箱 {params.email} 的对话历史"
}
# 检查目标是否已达成
goal_achieved = check_goal_achieved(last_ai_message.content)
# 获取对话总结
summary = ConversationSummary.objects.filter(
user=leader,
talent_email=params.email,
is_active=True
).order_by('-updated_at').first()
result = {
'status': 'success',
'email': params.email,
'goal_achieved': goal_achieved,
'last_message_time': last_ai_message.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'summary': summary.summary if summary else None
}
# 打印结果
logger.info(f"目标状态检查结果:")
logger.info(f"邮箱: {params.email}")
logger.info(f"目标达成: {goal_achieved}")
logger.info(f"最后消息时间: {result['last_message_time']}")
return result
def main():
"""命令行入口函数"""
import sys
if len(sys.argv) < 2 or sys.argv[1] == 'help':
print_help()
return
command = sys.argv[1]
args = sys.argv[2:]
if command == 'process_table':
handle_process_table(args)
elif command == 'auto_chat':
handle_auto_chat(args)
elif command == 'set_goal':
handle_set_goal(args)
elif command == 'check_goal':
handle_check_goal(args)
else:
print(f"未知命令: {command}")
print_help()
if __name__ == "__main__":
main()