daren_project/user_management/feishu_chat_views.py
2025-04-29 10:22:57 +08:00

712 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import traceback
import os
import pandas as pd
from django.http import FileResponse, HttpResponse
from django.conf import settings
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from user_management.models import (
GmailTalentMapping, ChatHistory, ConversationSummary
)
from user_management.gmail_integration import GmailIntegration
from feishu.feishu_ai_chat import (
fetch_table_records, find_duplicate_email_creators,
process_duplicate_emails, auto_chat_session,
check_goal_achieved, export_feishu_creators_to_excel,
export_matching_emails_to_excel
)
logger = logging.getLogger(__name__)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def process_feishu_table(request):
"""
从飞书多维表格读取数据,处理重复邮箱
请求参数:
table_id: 表格ID
view_id: 视图ID
app_token: 飞书应用TOKEN (可选)
access_token: 用户访问令牌 (可选)
app_id: 应用ID (可选)
app_secret: 应用密钥 (可选)
goal_template: 目标内容模板 (可选)
auto_chat: 是否自动执行AI对话 (可选)
turns: 自动对话轮次 (可选)
"""
try:
# 检查用户权限 - 只允许组长使用
if request.user.role != 'leader':
return Response(
{"error": "只有组长角色的用户可以使用此功能"},
status=status.HTTP_403_FORBIDDEN
)
# 获取参数
table_id = request.data.get("table_id", "tbl3oikG3F8YYtVA") # 默认表格ID
view_id = request.data.get("view_id", "vewSOIsmxc") # 默认视图ID
app_token = request.data.get("app_token", "XYE6bMQUOaZ5y5svj4vcWohGnmg")
access_token = request.data.get("access_token", "u-fK0HvbXVte.G2xzYs5oxV6k1nHu1glvFgG00l0Ma24VD")
app_id = request.data.get("app_id", "cli_a5c97daacb9e500d")
app_secret = request.data.get("app_secret", "fdVeOCLXmuIHZVmSV0VbJh9wd0Kq1o5y")
goal_template = request.data.get(
"goal_template",
"与达人{handle}(邮箱:{email})建立联系并了解其账号情况,评估合作潜力,处理合作需求,最终目标是达成合作并签约。"
)
auto_chat = request.data.get("auto_chat", False)
turns = request.data.get("turns", 5)
logger.info(f"处理飞书表格数据: table_id={table_id}, view_id={view_id}, app_id={app_id}")
# 从飞书表格获取记录
records = fetch_table_records(
app_token,
table_id,
view_id,
access_token,
app_id,
app_secret
)
if not records:
logger.warning("未获取到任何记录可能是表格ID或视图ID不正确或无权限访问")
# 尝试使用SDK中的search方法直接获取
try:
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import (
SearchAppTableRecordRequest,
SearchAppTableRecordRequestBody
)
# 创建client
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 构造请求对象
request = SearchAppTableRecordRequest.builder() \
.app_token(app_token) \
.table_id(table_id) \
.page_size(20) \
.request_body(SearchAppTableRecordRequestBody.builder().build()) \
.build()
# 发起请求
option = lark.RequestOption.builder().user_access_token(access_token).build()
response = client.bitable.v1.app_table_record.search(request, option)
if not response.success():
logger.error(f"直接搜索请求失败: {response.code}, {response.msg}")
return Response(
{"message": f"未获取到任何记录,错误: {response.msg}"},
status=status.HTTP_404_NOT_FOUND
)
# 获取记录
records = response.data.items
if not records:
return Response(
{"message": "未获取到任何记录"},
status=status.HTTP_404_NOT_FOUND
)
except Exception as e:
logger.error(f"尝试直接搜索时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"message": f"未获取到任何记录,错误: {str(e)}"},
status=status.HTTP_404_NOT_FOUND
)
# 查找重复邮箱的创作者
duplicate_emails = find_duplicate_email_creators(records)
if not duplicate_emails:
return Response(
{"message": "未找到与系统中已有creator匹配的邮箱"},
status=status.HTTP_200_OK
)
# 处理匹配的邮箱记录
results = process_duplicate_emails(duplicate_emails, goal_template)
# 如果需要自动对话
chat_results = []
if auto_chat and results['success'] > 0:
# 为每个成功创建的记录执行自动对话
for detail in results['details']:
if detail['status'] == 'success':
email = detail['email']
chat_result = auto_chat_session(request.user, email, max_turns=turns)
chat_results.append({
'email': email,
'result': chat_result
})
# 返回处理结果
return Response({
'status': 'success',
'records_count': len(records),
'duplicate_emails_count': len(duplicate_emails),
'processing_results': results,
'chat_results': chat_results
}, status=status.HTTP_200_OK)
except Exception as e:
logger.error(f"处理飞书表格时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def run_auto_chat(request):
"""
为指定邮箱执行自动对话
请求参数:
email: 达人邮箱
force_send: 是否强制发送新邮件(即使没有新回复)(可选)
subject: 邮件主题(可选仅当force_send=true时使用)
content: 邮件内容(可选仅当force_send=true时使用)
"""
try:
# 检查用户权限 - 只允许组长使用
if request.user.role != 'leader':
return Response(
{"error": "只有组长角色的用户可以使用此功能"},
status=status.HTTP_403_FORBIDDEN
)
# 获取参数
email = request.data.get("email")
force_send = request.data.get("force_send", False)
subject = request.data.get("subject")
content = request.data.get("content")
# 验证必要参数
if not email:
return Response(
{"error": "缺少参数email"},
status=status.HTTP_400_BAD_REQUEST
)
# 如果强制发送且没有提供内容
if force_send and not content:
return Response(
{"error": "当force_send=true时必须提供content参数"},
status=status.HTTP_400_BAD_REQUEST
)
# 首先尝试同步最新邮件
try:
# 创建Gmail集成实例
gmail_integration = GmailIntegration(request.user)
# 同步最新邮件
logger.info(f"正在同步与 {email} 的最新邮件...")
sync_result = gmail_integration.sync_talent_emails(email)
if sync_result.get('status') == 'success':
logger.info(f"成功同步邮件: {sync_result.get('message', 'No message')}")
else:
logger.warning(f"同步邮件警告: {sync_result.get('message', 'Unknown warning')}")
except Exception as e:
logger.error(f"同步邮件出错: {str(e)}")
logger.error(traceback.format_exc())
# 仅记录错误,不中断流程
# 如果是强制发送模式
if force_send:
try:
# 创建Gmail集成实例
gmail_integration = GmailIntegration(request.user)
# 检查Gmail服务是否已正确初始化
if not hasattr(gmail_integration, 'gmail_service') or gmail_integration.gmail_service is None:
return Response(
{"error": "Gmail服务未正确初始化请检查Gmail API配置"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
# 获取知识库映射
mapping = GmailTalentMapping.objects.filter(
user=request.user,
talent_email=email,
is_active=True
).first()
if not mapping:
return Response(
{"error": f"找不到与邮箱 {email} 的映射关系"},
status=status.HTTP_404_NOT_FOUND
)
# 直接发送邮件
mail_subject = subject if subject else "关于合作的洽谈"
mail_result = gmail_integration.send_email(
to_email=email,
subject=mail_subject,
body=content,
conversation_id=mapping.conversation_id
)
if mail_result['status'] != 'success':
return Response(
{"error": f"邮件发送失败: {mail_result.get('message', 'Unknown error')}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
# 保存发送的内容到对话历史
ChatHistory.objects.create(
user=request.user,
knowledge_base=mapping.knowledge_base,
conversation_id=mapping.conversation_id,
role='assistant',
content=content
)
return Response({
'status': 'success',
'message': f"已强制发送邮件到 {email}",
'email_sent': True,
'conversation_id': mapping.conversation_id
}, status=status.HTTP_200_OK)
except Exception as e:
logger.error(f"强制发送邮件时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
# 执行自动对话
result = auto_chat_session(request.user, email)
# 返回结果
return Response(result, status=status.HTTP_200_OK)
except Exception as e:
logger.error(f"执行自动对话时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@api_view(['GET', 'POST'])
@permission_classes([IsAuthenticated])
def feishu_user_goal(request):
"""
设置或获取用户总目标
GET 请求:
获取当前用户总目标
POST 请求参数:
email: 达人邮箱
goal: 目标内容
"""
try:
# 检查用户权限 - 只允许组长使用
if request.user.role != 'leader':
return Response(
{"error": "只有组长角色的用户可以使用此功能"},
status=status.HTTP_403_FORBIDDEN
)
if request.method == 'GET':
# 创建Gmail集成实例
gmail_integration = GmailIntegration(request.user)
# 获取总目标
result = gmail_integration.manage_user_goal()
return Response(result, status=status.HTTP_200_OK)
elif request.method == 'POST':
# 获取参数
email = request.data.get("email")
goal = request.data.get("goal")
# 验证必要参数
if not email:
return Response(
{"error": "缺少参数email"},
status=status.HTTP_400_BAD_REQUEST
)
if not goal:
return Response(
{"error": "缺少参数goal"},
status=status.HTTP_400_BAD_REQUEST
)
# 设置用户总目标
gmail_integration = GmailIntegration(request.user)
result = gmail_integration.manage_user_goal(goal)
return Response(result, status=status.HTTP_200_OK)
except Exception as e:
logger.error(f"管理用户总目标时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def check_goal_status(request):
"""
检查目标完成状态
请求参数:
email: 达人邮箱
"""
try:
# 检查用户权限 - 只允许组长使用
if request.user.role != 'leader':
return Response(
{"error": "只有组长角色的用户可以使用此功能"},
status=status.HTTP_403_FORBIDDEN
)
# 获取参数
email = request.query_params.get("email")
# 验证必要参数
if not email:
return Response(
{"error": "缺少参数email"},
status=status.HTTP_400_BAD_REQUEST
)
# 查找Gmail映射关系
mapping = GmailTalentMapping.objects.filter(
user=request.user,
talent_email=email,
is_active=True
).first()
if not mapping:
return Response(
{"error": f"找不到与邮箱 {email} 的映射关系"},
status=status.HTTP_404_NOT_FOUND
)
# 获取对话历史中最后的AI回复
last_ai_message = ChatHistory.objects.filter(
user=request.user,
knowledge_base=mapping.knowledge_base,
conversation_id=mapping.conversation_id,
role='assistant',
is_deleted=False
).order_by('-created_at').first()
if not last_ai_message:
return Response(
{"error": f"找不到与邮箱 {email} 的对话历史"},
status=status.HTTP_404_NOT_FOUND
)
# 检查目标是否已达成
goal_achieved = check_goal_achieved(last_ai_message.content)
# 获取对话总结
summary = ConversationSummary.objects.filter(
user=request.user,
talent_email=email,
is_active=True
).order_by('-updated_at').first()
result = {
'status': 'success',
'email': email,
'goal_achieved': goal_achieved,
'last_message_time': last_ai_message.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'last_message': last_ai_message.content,
'summary': summary.summary if summary else None
}
return Response(result, status=status.HTTP_200_OK)
except Exception as e:
logger.error(f"检查目标状态时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def export_creators_data(request):
"""
导出匹配的FeishuCreator数据到Excel文件
请求参数:
table_id: 表格ID
view_id: 视图ID
app_token: 飞书应用TOKEN (可选)
access_token: 用户访问令牌 (可选)
app_id: 应用ID (可选)
app_secret: 应用密钥 (可选)
export_type: 导出类型,'creators''feishu',默认为'creators'
format: 格式,支持'excel''csv',默认为'excel'
"""
try:
# 检查用户权限 - 只允许组长使用
if request.user.role != 'leader':
return Response(
{"error": "只有组长角色的用户可以使用此功能"},
status=status.HTTP_403_FORBIDDEN
)
# 获取参数
table_id = request.data.get("table_id", "tbl3oikG3F8YYtVA") # 默认表格ID
view_id = request.data.get("view_id", "vewSOIsmxc") # 默认视图ID
app_token = request.data.get("app_token", "XYE6bMQUOaZ5y5svj4vcWohGnmg")
access_token = request.data.get("access_token", "u-fK0HvbXVte.G2xzYs5oxV6k1nHu1glvFgG00l0Ma24VD")
app_id = request.data.get("app_id", "cli_a5c97daacb9e500d")
app_secret = request.data.get("app_secret", "fdVeOCLXmuIHZVmSV0VbJh9wd0Kq1o5y")
export_type = request.data.get("export_type", "creators") # 导出类型creators或feishu
export_format = request.data.get("format", "excel") # 导出格式excel或csv
if export_format not in ["excel", "csv"]:
return Response(
{"error": "当前支持的格式有: excel, csv"},
status=status.HTTP_400_BAD_REQUEST
)
logger.info(f"导出飞书数据: table_id={table_id}, view_id={view_id}, type={export_type}, format={export_format}")
# 从飞书表格获取记录
records = fetch_table_records(
app_token,
table_id,
view_id,
access_token,
app_id,
app_secret
)
if not records:
logger.warning("未获取到任何记录可能是表格ID或视图ID不正确或无权限访问")
return Response(
{"message": "未获取到任何记录"},
status=status.HTTP_404_NOT_FOUND
)
# 查找重复邮箱的创作者
duplicate_emails = find_duplicate_email_creators(records)
if not duplicate_emails:
return Response(
{"message": "未找到与系统中已有creator匹配的邮箱"},
status=status.HTTP_404_NOT_FOUND
)
# 创建存储导出文件的目录
export_dir = os.path.join(settings.MEDIA_ROOT, 'exports')
os.makedirs(export_dir, exist_ok=True)
# 根据导出类型和格式选择输出文件名
if export_type == "creators":
file_prefix = "feishu_creators"
else:
file_prefix = "feishu_data"
if export_format == "excel":
file_ext = ".xlsx"
else:
file_ext = ".csv"
output_filename = f"{file_prefix}_{request.user.id}{file_ext}"
output_path = os.path.join(export_dir, output_filename)
# 根据导出类型选择导出函数
if export_type == "creators":
# 导出FeishuCreator数据
if export_format == "excel":
excel_path = export_feishu_creators_to_excel(duplicate_emails, output_path)
if not excel_path:
return Response(
{"error": "导出FeishuCreator数据失败"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
else:
# CSV格式导出直接处理
try:
# 获取所有匹配的邮箱
emails = list(duplicate_emails.keys())
# 从数据库获取FeishuCreator记录
from user_management.models import FeishuCreator
creators = FeishuCreator.objects.filter(email__in=emails)
if not creators.exists():
return Response(
{"error": "没有找到匹配的FeishuCreator记录"},
status=status.HTTP_404_NOT_FOUND
)
# 创建数据列表
data = []
for creator in creators:
# 处理datetime字段移除时区信息
created_at = creator.created_at
if hasattr(created_at, 'tzinfo') and created_at.tzinfo is not None:
created_at = created_at.replace(tzinfo=None)
updated_at = creator.updated_at
if hasattr(updated_at, 'tzinfo') and updated_at.tzinfo is not None:
updated_at = updated_at.replace(tzinfo=None)
row = {
'id': str(creator.id),
'handle': creator.handle,
'email': creator.email,
'phone': creator.phone,
'created_at': created_at,
'updated_at': updated_at,
# 其他需要的字段...
}
data.append(row)
# 创建DataFrame并导出到CSV
df = pd.DataFrame(data)
df.to_csv(output_path, index=False, encoding='utf-8-sig') # 使用BOM标记以支持中文
excel_path = output_path
except Exception as e:
logger.error(f"导出CSV时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": f"导出CSV失败: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
elif export_type == "feishu":
# 导出飞书原始数据
if export_format == "excel":
excel_path = export_matching_emails_to_excel(duplicate_emails, records, output_path)
if not excel_path:
return Response(
{"error": "导出飞书数据失败"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
else:
# CSV格式导出
try:
# 创建数据列表
data = []
for email, email_records in duplicate_emails.items():
for record in email_records:
fields = record.fields
row = {
'Email': email,
'RecordID': record.record_id
}
# 提取所有字段
for field_name, field_value in fields.items():
row[field_name] = extract_field_value(field_value)
data.append(row)
# 创建DataFrame并导出到CSV
df = pd.DataFrame(data)
df.to_csv(output_path, index=False, encoding='utf-8-sig')
excel_path = output_path
except Exception as e:
logger.error(f"导出CSV时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": f"导出CSV失败: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
else:
return Response(
{"error": f"不支持的导出类型: {export_type},可选值为'creators''feishu'"},
status=status.HTTP_400_BAD_REQUEST
)
# 获取服务器域名,考虑各种情况
domain = request.build_absolute_uri('/').rstrip('/')
# 如果是本地开发环境使用127.0.0.1:8000
if 'localhost' in domain or '127.0.0.1' in domain:
# 从请求头获取Host
host = request.META.get('HTTP_HOST', '127.0.0.1:8000')
if ':' not in host:
host = f"{host}:8000" # 添加默认端口
domain = f"http://{host}"
# 构建下载URL
file_url = f"{domain}/api/feishu/download/{output_filename}"
return Response({
"status": "success",
"message": f"成功导出{export_type}数据,格式为{export_format}",
"matched_emails": len(duplicate_emails),
"file_url": file_url,
"file_name": output_filename
}, status=status.HTTP_200_OK)
except Exception as e:
logger.error(f"导出数据时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def download_exported_file(request, filename):
"""
下载已导出的Excel文件
URL参数:
filename: 文件名
"""
try:
# 检查用户权限 - 只允许组长使用
if request.user.role != 'leader':
return Response(
{"error": "只有组长角色的用户可以使用此功能"},
status=status.HTTP_403_FORBIDDEN
)
# 构建文件路径
file_path = os.path.join(settings.MEDIA_ROOT, 'exports', filename)
# 检查文件是否存在
if not os.path.exists(file_path):
return Response(
{"error": f"文件不存在: {filename}"},
status=status.HTTP_404_NOT_FOUND
)
# 返回文件
response = FileResponse(open(file_path, 'rb'))
response['Content-Disposition'] = f'attachment; filename="{filename}"'
return response
except Exception as e:
logger.error(f"下载文件时出错: {str(e)}")
logger.error(traceback.format_exc())
return Response(
{"error": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)