通过飞书同步数据库创建达人知识库

This commit is contained in:
wanjia 2025-04-15 16:48:19 +08:00
parent 7ffb0de74c
commit 90656ab36a
11 changed files with 2626 additions and 252 deletions

178
feishu/README.md Normal file
View File

@ -0,0 +1,178 @@
# 飞书数据同步工具
这个工具提供了与飞书多维表格进行数据同步的功能,支持双向同步:从飞书导入数据到数据库,以及从数据库导出数据到飞书。同时支持将达人数据同步到外部知识库。
## 功能特点
- 从飞书多维表格导入数据到数据库
- 从Excel文件更新数据库记录
- 从数据库导出数据到飞书多维表格
- 从特定数据库模型导入数据到飞书
- 从自定义SQL查询导入数据到飞书
- 将达人信息同步到外部知识库
- 支持Gmail和飞书达人信息的统一管理
- 支持自定义字段映射
- 支持记录过滤和数量限制
## 使用方法
### 显示帮助信息
```bash
python feishu.py help
```
### 从飞书导入数据到数据库
```bash
python feishu.py sync_from_feishu
```
### 从Excel文件更新数据库记录
```bash
python feishu.py update_from_excel <excel文件路径>
```
### 从数据库导出数据到飞书多维表格
```bash
python feishu.py export_to_feishu [--handle 关键字] [--limit 数量]
```
例如导出Handle包含"tiktok"的前10条记录
```bash
python feishu.py export_to_feishu --handle tiktok --limit 10
```
### 从特定数据库模型导入数据到飞书
```bash
python feishu.py import_from_db <模型名称> [--filters 字段1=值1,字段2=值2,...] [--limit 数量]
```
支持的模型:
- Data数据模型
- KnowledgeBase知识库模型
- FeishuCreator飞书创作者模型
例如,导入技术部的管理员数据:
```bash
python feishu.py import_from_db Data --filters type=admin,department=技术部 --limit 50
```
### 从自定义SQL查询导入数据到飞书
```bash
python feishu.py custom_import <sql_file>
```
SQL文件格式示例
```sql
-- 查询语句
SELECT id, name, type FROM your_table WHERE condition;
-- FIELD_MAPPING:
{
"id": "飞书ID字段",
"name": "飞书名称字段",
"type": "飞书类型字段"
}
```
### 将达人信息同步到知识库
```bash
python feishu.py sync_to_kb --id <creator_id> | --handle <handle> | --email <email>
```
例如,同步特定达人信息到知识库:
```bash
python feishu.py sync_to_kb --handle tiktok_user123
```
## 示例
在`examples`目录中提供了几个示例文件:
- `custom_query.sql`自定义SQL查询示例包含字段映射
- `creator_kb_query.sql`查询达人信息并检查知识库映射的SQL示例
- `sample_data_template.txt`Excel导入模板说明
## REST API接口
除了命令行工具外还提供了以下REST API接口
### 1. 飞书数据同步API
```
POST /api/feishu/sync
```
参数:
- `sync_type` (可选):同步类型,默认为"all"
- `handle` (可选)指定特定的达人Handle
- `create_kb` (可选)是否创建知识库默认为false
### 2. 达人信息同步到知识库API
```
POST /api/feishu/to_kb
```
参数:
- `creator_id`、`handle`或`email`:至少提供一个参数用于识别达人
### 3. 检查达人知识库API
```
POST /api/feishu/check_kb
```
参数:
- `creator_id`、`handle`或`email`:至少提供一个参数用于识别达人
## 与Gmail集成
本工具支持与Gmail系统集成实现达人信息的统一管理
1. 当通过飞书同步达人信息到知识库时会检查该达人是否已有Gmail映射
2. 如果该达人邮箱已有Gmail映射则使用现有知识库
3. 如果没有映射,则创建新的知识库并建立映射关系
4. 知识库文档采用追加模式,保留历史记录
## 配置
飞书API配置位于代码中包括
- APP_TOKEN飞书应用的AppToken
- TABLE_ID飞书多维表格的TableID
- USER_ACCESS_TOKEN用户访问令牌
如需修改这些配置,请编辑相应的函数中的变量。
## 注意事项
1. 确保已安装所需的依赖:`lark_oapi`, `pandas`, `django`
2. 在执行数据库操作前,确保数据库连接已正确配置
3. 自定义SQL查询应避免执行修改数据的操作(INSERT, UPDATE, DELETE等)
4. 导入大量数据时,可能需要分批处理,使用`--limit`参数控制数量
5. 创建知识库需要管理员权限,确保有可用的管理员账号
## 常见问题
### Q: 导入数据时出现字段不匹配错误?
A: 确保字段名称与飞书多维表格中的字段名称完全匹配,或使用字段映射功能。
### Q: 如何添加新的数据模型支持?
A: 在`import_from_database_to_feishu`函数中添加新的模型映射,并按照现有模式实现字段映射逻辑。
### Q: 如何处理Gmail和飞书同一个达人的数据合并
A: 系统会自动检查达人邮箱如果在Gmail中已有映射会使用同一个知识库避免重复创建。

View File

@ -0,0 +1,36 @@
-- 查询所有有邮箱的达人,检查是否已有知识库映射
SELECT
fc.id as creator_id,
fc.handle as handle,
fc.email as email,
fc.contact_person as contact_person,
fc.fans_count as fans_count,
fc.tiktok_url as tiktok_url,
gtm.knowledge_base_id as kb_id,
kb.name as kb_name,
kb.document_count as doc_count
FROM
feishu_creators fc
LEFT JOIN
gmail_talent_mappings gtm ON fc.email = gtm.talent_email AND gtm.is_active = 1
LEFT JOIN
knowledge_bases kb ON gtm.knowledge_base_id = kb.id
WHERE
fc.email IS NOT NULL AND fc.email != ''
ORDER BY
fc.updated_at DESC
LIMIT
50;
-- FIELD_MAPPING:
{
"creator_id": "达人ID",
"handle": "达人Handle",
"email": "达人邮箱",
"contact_person": "对接人",
"fans_count": "粉丝数",
"tiktok_url": "链接",
"kb_id": "知识库ID",
"kb_name": "知识库名称",
"doc_count": "文档数量"
}

View File

@ -0,0 +1,42 @@
# Excel文件模板说明
您可以使用Excel文件来批量更新数据库中的记录。Excel文件应包含以下列
## 必需字段
- `Handle`:用于匹配数据库中的记录,此字段必填
## 可选字段(仅更新不为空的字段)
- `contact_person`:对接人
- `tiktok_url`:链接
- `fans_count`:粉丝数
- `gmv`GMV
- `email`:邮箱
- `phone`:手机号|WhatsApp
- `account_type`:账号属性
- `price_quote`:报价
- `response_speed`:回复速度
- `cooperation_intention`:合作意向
- `payment_method`:支付方式
- `payment_account`:收款账号
- `address`:收件地址
- `has_ooin`签约OOIN?
- `source`:渠道来源
- `contact_status`:建联进度
- `system_categories`:系统展示的带货品类
- `actual_categories`:实际高播放量带货品类
- `human_categories`:达人标想要货品类
## 示例
| Handle | contact_person | fans_count | email | phone |
|------------|----------------|------------|---------------------|------------|
| user1 | 张三 | 10000 | user1@example.com | 13800000001|
| user2 | 李四 | 20000 | user2@example.com | 13800000002|
## 注意事项
1. 导入时会根据Handle匹配记录如果找不到匹配的记录则会跳过
2. 只有非空字段会被更新,空字段会保留原值
3. 确保Excel文件保存为.xlsx格式

View File

@ -3,7 +3,14 @@ import django
import os import os
import sys import sys
import pandas as pd import pandas as pd
import requests
from django.db import transaction from django.db import transaction
from django.db.models import Q
from django.db import connection
from datetime import datetime
import traceback
import tempfile
import uuid
# 设置 Django 环境 # 设置 Django 环境
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@ -12,8 +19,12 @@ django.setup()
import lark_oapi as lark import lark_oapi as lark
from lark_oapi.api.bitable.v1 import * from lark_oapi.api.bitable.v1 import *
from user_management.models import FeishuCreator from user_management.models import FeishuCreator, Data, KnowledgeBase, KnowledgeBaseDocument
from django.conf import settings
import logging
from django.contrib.auth import get_user_model
logger = logging.getLogger(__name__)
# SDK 使用说明: https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/server-side-sdk/python--sdk/preparations-before-development # SDK 使用说明: https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/server-side-sdk/python--sdk/preparations-before-development
# 以下示例代码默认根据文档示例值填充,如果存在代码问题,请在 API 调试台填上相关必要参数后再复制代码使用 # 以下示例代码默认根据文档示例值填充,如果存在代码问题,请在 API 调试台填上相关必要参数后再复制代码使用
@ -217,8 +228,128 @@ def update_from_excel(excel_file_path):
except Exception as e: except Exception as e:
print(f"处理Excel文件时出错: {str(e)}") print(f"处理Excel文件时出错: {str(e)}")
def sync_from_feishu(): def sync_from_feishu(app_token=None, table_id=None, user_access_token=None, filters=None):
"""从飞书多维表格同步数据""" """从飞书同步数据到系统
参数:
app_token: 飞书应用的APP_TOKEN如不提供则使用默认值
table_id: 飞书多维表格的TABLE_ID如不提供则使用默认值
user_access_token: 用户访问令牌如不提供则使用默认值
filters: 筛选条件字典格式
返回:
dict: 包含同步统计信息的字典
"""
try:
# 使用传入的参数或默认值
app_token = app_token or settings.FEISHU_APP_TOKEN
table_id = table_id or settings.FEISHU_TABLE_ID
user_access_token = user_access_token or settings.FEISHU_USER_ACCESS_TOKEN
if not all([app_token, table_id, user_access_token]):
logger.error("飞书API凭据不完整")
return {
'created': 0,
'updated': 0,
'errors': 1,
'total': 0,
'error_message': '飞书API凭据不完整'
}
# 初始化统计信息
created_count = 0
updated_count = 0
error_count = 0
created_creators = []
# 获取飞书数据
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
print("开始从飞书同步数据...")
all_records = fetch_all_records(client, app_token, table_id, user_access_token)
if not all_records:
print("未获取到任何记录")
return {
'created': 0,
'updated': 0,
'errors': 0,
'total': 0
}
print("\n开始更新数据库...")
for record in all_records:
creator, created = save_to_database(record)
if creator:
if created:
created_count += 1
if created_count % 10 == 0:
print(f"已创建 {created_count} 条记录...")
created_creators.append(creator)
else:
updated_count += 1
if updated_count % 10 == 0:
print(f"已更新 {updated_count} 条记录...")
else:
error_count += 1
print(f"处理记录失败")
print("\n飞书同步完成!统计信息:")
print(f"新建记录:{created_count}")
print(f"更新记录:{updated_count}")
print(f"错误记录:{error_count}")
print(f"总记录数:{len(all_records)}")
# 返回统计信息增加了created_creators列表
return {
'created': created_count,
'updated': updated_count,
'errors': error_count,
'total': created_count + updated_count,
'created_creators': created_creators # 新增:返回新创建的记录对象列表
}
except Exception as e:
logger.error(f"从飞书同步数据时出错: {str(e)}")
logger.error(traceback.format_exc())
return {
'created': 0,
'updated': 0,
'errors': 1,
'total': 0,
'error_message': str(e)
}
def format_field_value(value, field_type='text'):
"""格式化字段值以符合飞书多维表格API要求"""
if value is None or value == '':
return None
if field_type == 'text':
return {"text": str(value)}
elif field_type == 'number':
try:
return float(value)
except (ValueError, TypeError):
return None
elif field_type == 'checkbox':
return value.lower() in ('true', 'yes', '', '1')
elif field_type == 'multi_select':
# 多选项需要返回一个选项ID列表
if isinstance(value, str):
items = [item.strip() for item in value.split(',') if item.strip()]
return [{"text": item} for item in items]
elif isinstance(value, list):
return [{"text": str(item)} for item in value]
return []
return {"text": str(value)}
def export_to_feishu(query_set=None):
"""从数据库导出记录到飞书多维表格"""
# 创建client # 创建client
client = lark.Client.builder() \ client = lark.Client.builder() \
.enable_set_token(True) \ .enable_set_token(True) \
@ -230,61 +361,740 @@ def sync_from_feishu():
TABLE_ID = "tbl3oikG3F8YYtVA" TABLE_ID = "tbl3oikG3F8YYtVA"
USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy" USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
print("开始从飞书同步数据...") print("开始从数据库导出数据到飞书...")
all_records = fetch_all_records(client, APP_TOKEN, TABLE_ID, USER_ACCESS_TOKEN)
if not all_records: # 如果没有提供查询集则使用所有没有record_id的记录
print("未获取到任何记录") if not query_set:
query_set = FeishuCreator.objects.filter(record_id='')
if not query_set.exists():
print("没有需要导出的记录")
return return
print("\n开始更新数据库...")
created_count = 0 created_count = 0
updated_count = 0 updated_count = 0
error_count = 0 error_count = 0
for record in all_records: for creator in query_set:
creator, created = save_to_database(record) try:
if creator: # 准备字段数据
if created: fields = {
created_count += 1 "对接人": format_field_value(creator.contact_person),
if created_count % 10 == 0: "Handle": format_field_value(creator.handle),
print(f"已创建 {created_count} 条记录...") "链接": format_field_value(creator.tiktok_url),
else: "粉丝数": format_field_value(creator.fans_count),
updated_count += 1 "GMV": format_field_value(creator.gmv),
if updated_count % 10 == 0: "邮箱": format_field_value(creator.email),
print(f"已更新 {updated_count} 条记录...") "手机号|WhatsApp": format_field_value(creator.phone),
else: "账号属性": format_field_value(creator.account_type, 'multi_select'),
error_count += 1 "报价": format_field_value(creator.price_quote),
print(f"处理记录失败") "回复速度": format_field_value(creator.response_speed),
"合作意向": format_field_value(creator.cooperation_intention),
"支付方式": format_field_value(creator.payment_method),
"收款账号": format_field_value(creator.payment_account),
"收件地址": format_field_value(creator.address),
"签约OOIN?": format_field_value(creator.has_ooin),
"渠道来源": format_field_value(creator.source),
"建联进度": format_field_value(creator.contact_status),
"合作品牌": creator.cooperation_brands,
"系统展示的带货品类": format_field_value(creator.system_categories, 'multi_select'),
"实际高播放量带货品类": format_field_value(creator.actual_categories, 'multi_select'),
"达人标想要货品类": format_field_value(creator.human_categories)
}
print("\n飞书同步完成!统计信息:") # 过滤掉None值
fields = {k: v for k, v in fields.items() if v is not None}
if creator.record_id:
# 更新现有记录
request = UpdateAppTableRecordRequest.builder() \
.app_token(APP_TOKEN) \
.table_id(TABLE_ID) \
.record_id(creator.record_id) \
.request_body(UpdateAppTableRecordRequestBody.builder().fields(fields).build()) \
.build()
option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
response = client.bitable.v1.app_table_record.update(request, option)
if response.success():
updated_count += 1
print(f"已更新记录: {creator.handle}")
else:
error_count += 1
print(f"更新失败: {creator.handle}, 错误: {response.msg}")
else:
# 创建新记录
request = CreateAppTableRecordRequest.builder() \
.app_token(APP_TOKEN) \
.table_id(TABLE_ID) \
.request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
.build()
option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
response = client.bitable.v1.app_table_record.create(request, option)
if response.success():
# 获取新记录的ID并保存
record_id = response.data.record_id
creator.record_id = record_id
creator.save()
created_count += 1
print(f"已创建记录: {creator.handle}, ID: {record_id}")
else:
error_count += 1
print(f"创建失败: {creator.handle}, 错误: {response.msg}")
except Exception as e:
error_count += 1
print(f"处理记录 {creator.handle} 时出错: {str(e)}")
import traceback
print(traceback.format_exc())
print("\n飞书导出完成!统计信息:")
print(f"新建记录:{created_count}") print(f"新建记录:{created_count}")
print(f"更新记录:{updated_count}") print(f"更新记录:{updated_count}")
print(f"错误记录:{error_count}") print(f"错误记录:{error_count}")
print(f"总记录数:{len(all_records)}") print(f"总处理记录数:{created_count + updated_count + error_count}")
def import_from_database_to_feishu(model_name, filters=None, limit=None):
"""从指定数据库模型导入数据到飞书多维表格
参数:
model_name (str): 模型名称 'Data', 'KnowledgeBase'
filters (dict): 过滤条件 {'department': '技术部'}
limit (int): 限制导入的记录数量
"""
# 创建client
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 配置参数
APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
TABLE_ID = "tbl3oikG3F8YYtVA"
USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
print(f"开始从{model_name}模型导入数据到飞书...")
# 选择模型
model_mapping = {
'Data': Data,
'KnowledgeBase': KnowledgeBase,
'FeishuCreator': FeishuCreator,
}
model_class = model_mapping.get(model_name)
if not model_class:
print(f"错误: 不支持的模型名称 {model_name}")
return
# 构建查询
query = model_class.objects.all()
# 应用过滤条件
if filters:
q_objects = Q()
for field, value in filters.items():
if isinstance(value, list):
# 对列表值使用OR查询
q_filter = Q()
for v in value:
q_filter |= Q(**{field: v})
q_objects &= q_filter
else:
q_objects &= Q(**{field: value})
query = query.filter(q_objects)
# 限制记录数量
if limit and isinstance(limit, int) and limit > 0:
query = query[:limit]
# 检查查询结果
total_count = query.count()
if total_count == 0:
print("没有符合条件的记录")
return
print(f"找到 {total_count} 条记录,准备导入到飞书...")
# 记录统计
created_count = 0
error_count = 0
# 处理不同模型的字段映射
for index, record in enumerate(query):
try:
if model_name == 'Data':
# Data模型到飞书字段的映射
fields = {
"名称": format_field_value(record.name),
"描述": format_field_value(record.desc),
"类型": format_field_value(record.get_type_display()),
"部门": format_field_value(record.department),
"字符长度": format_field_value(record.char_length, 'number'),
"文档数量": format_field_value(record.document_count, 'number'),
"创建时间": format_field_value(record.create_time.strftime('%Y-%m-%d %H:%M:%S')),
"更新时间": format_field_value(record.update_time.strftime('%Y-%m-%d %H:%M:%S')),
"ID": format_field_value(str(record.id)),
}
elif model_name == 'KnowledgeBase':
# KnowledgeBase模型到飞书字段的映射
fields = {
"名称": format_field_value(record.name),
"描述": format_field_value(record.desc),
"类型": format_field_value(record.type),
"部门": format_field_value(record.department),
"小组": format_field_value(record.group),
"文档数量": format_field_value(record.document_count, 'number'),
"字符长度": format_field_value(record.char_length, 'number'),
"创建时间": format_field_value(record.create_time.strftime('%Y-%m-%d %H:%M:%S')),
"ID": format_field_value(str(record.id)),
}
else:
# 对于FeishuCreator模型使用export_to_feishu函数的逻辑
continue
# 过滤掉None值
fields = {k: v for k, v in fields.items() if v is not None}
# 创建新记录
request = CreateAppTableRecordRequest.builder() \
.app_token(APP_TOKEN) \
.table_id(TABLE_ID) \
.request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
.build()
option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
response = client.bitable.v1.app_table_record.create(request, option)
if response.success():
created_count += 1
if created_count % 10 == 0 or created_count == total_count:
print(f"已导入 {created_count}/{total_count} 条记录...")
else:
error_count += 1
print(f"导入第 {index+1} 条记录失败,错误: {response.msg}")
except Exception as e:
error_count += 1
print(f"处理第 {index+1} 条记录时出错: {str(e)}")
print("\n飞书导入完成!统计信息:")
print(f"成功导入记录数:{created_count}")
print(f"失败记录数:{error_count}")
print(f"总记录数:{total_count}")
def customize_import_to_feishu(sql_query, field_mapping=None):
"""从自定义SQL查询结果导入数据到飞书多维表格
参数:
sql_query (str): SQL查询语句
field_mapping (dict): 字段映射 {'db_field': 'feishu_field'}
"""
# 创建client
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 配置参数
APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
TABLE_ID = "tbl3oikG3F8YYtVA"
USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
print("开始从自定义SQL查询导入数据到飞书...")
try:
# 执行SQL查询
with connection.cursor() as cursor:
cursor.execute(sql_query)
columns = [col[0] for col in cursor.description]
result_set = cursor.fetchall()
# 检查查询结果
if not result_set:
print("SQL查询没有返回任何结果")
return
print(f"查询返回 {len(result_set)} 条记录,准备导入到飞书...")
# 默认字段映射:数据库字段名 -> 飞书字段名
if not field_mapping:
field_mapping = {col: col for col in columns}
# 记录统计
created_count = 0
error_count = 0
# 处理每条记录
for index, row in enumerate(result_set):
try:
# 创建记录字典
record_dict = dict(zip(columns, row))
# 将数据映射到飞书字段
fields = {}
for db_field, feishu_field in field_mapping.items():
if db_field in record_dict:
value = record_dict[db_field]
# 处理不同类型的值
if isinstance(value, (int, float)):
fields[feishu_field] = format_field_value(value, 'number')
else:
fields[feishu_field] = format_field_value(value)
# 过滤掉None值
fields = {k: v for k, v in fields.items() if v is not None}
# 创建新记录
request = CreateAppTableRecordRequest.builder() \
.app_token(APP_TOKEN) \
.table_id(TABLE_ID) \
.request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
.build()
option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
response = client.bitable.v1.app_table_record.create(request, option)
if response.success():
created_count += 1
if created_count % 10 == 0 or created_count == len(result_set):
print(f"已导入 {created_count}/{len(result_set)} 条记录...")
else:
error_count += 1
print(f"导入第 {index+1} 条记录失败,错误: {response.msg}")
except Exception as e:
error_count += 1
print(f"处理第 {index+1} 条记录时出错: {str(e)}")
import traceback
print(traceback.format_exc())
print("\n自定义SQL导入完成统计信息")
print(f"成功导入记录数:{created_count}")
print(f"失败记录数:{error_count}")
print(f"总记录数:{len(result_set)}")
except Exception as e:
print(f"执行SQL查询时出错: {str(e)}")
import traceback
print(traceback.format_exc())
def print_help():
"""打印帮助信息"""
print("飞书数据同步工具")
print("================")
print("\n可用命令:")
print(" 1. 从飞书导入数据到数据库:")
print(" python feishu.py sync_from_feishu")
print()
print(" 2. 从Excel文件更新数据库记录:")
print(" python feishu.py update_from_excel <excel文件路径>")
print()
print(" 3. 从数据库导出数据到飞书多维表格:")
print(" python feishu.py export_to_feishu [--handle 关键字] [--limit 数量]")
print()
print(" 4. 从特定数据库模型导入数据到飞书:")
print(" python feishu.py import_from_db <模型名称> [--filters 字段1=值1,字段2=值2,...] [--limit 数量]")
print(" 支持的模型: Data, KnowledgeBase, FeishuCreator")
print(" 示例: python feishu.py import_from_db Data --filters type=admin,department=技术部 --limit 50")
print()
print(" 5. 从自定义SQL查询导入数据到飞书:")
print(" python feishu.py custom_import <sql_file>")
print(" SQL文件中应包含SQL查询语句及可选的字段映射JSON")
print()
print(" 6. 将达人信息同步到知识库:")
print(" python feishu.py sync_to_kb --id <creator_id> | --handle <handle> | --email <email>")
print(" 示例: python feishu.py sync_to_kb --handle tiktok_user123")
print()
print(" 7. 显示帮助信息:")
print(" python feishu.py help")
print()
def sync_to_knowledge_base(creator_id=None, handle=None, email=None):
"""将达人信息同步到外部知识库
参数:
creator_id: 达人ID
handle: 达人Handle
email: 达人Email
返回:
tuple: (KnowledgeBase对象, 是否新创建)
"""
try:
import json
import requests
import tempfile
import os
from datetime import datetime
from django.conf import settings
from user_management.models import FeishuCreator, KnowledgeBase, User, KnowledgeBaseDocument
# 查找达人
query = FeishuCreator.objects.all()
if creator_id:
query = query.filter(id=creator_id)
elif handle:
query = query.filter(handle=handle)
elif email:
query = query.filter(email=email)
creator = query.first()
if not creator:
logger.error(f"未找到达人: creator_id={creator_id}, handle={handle}, email={email}")
return None, False
# 检查达人是否有足够信息
if not creator.handle:
logger.error(f"达人没有handle无法创建知识库: {creator.id}")
return None, False
# 检查是否已有通过Gmail映射的知识库
gmail_kb = None
if creator.email:
from user_management.models import GmailTalentMapping
gmail_mapping = GmailTalentMapping.objects.filter(
talent_email=creator.email,
is_active=True
).first()
if gmail_mapping and gmail_mapping.knowledge_base:
gmail_kb = gmail_mapping.knowledge_base
logger.info(f"达人 {creator.handle} 已有Gmail知识库: {gmail_kb.id}")
return gmail_kb, False
# 获取当前用户(组长)
User = get_user_model()
admin_user = User.objects.filter(role='leader').first()
if not admin_user:
logger.error("未找到组长用户,无法创建知识库")
return None, False
# 生成知识库名称
kb_name = f"达人-{creator.handle}"
# 检查是否已存在同名知识库
existed_kb = KnowledgeBase.objects.filter(name=kb_name).first()
if existed_kb:
logger.info(f"知识库 {kb_name} 已存在ID: {existed_kb.id}")
return existed_kb, False
# 创建新知识库
kb = KnowledgeBase.objects.create(
name=kb_name,
desc=f"达人 {creator.handle} 的信息知识库",
type='private', # 设置为私有知识库
user_id=admin_user.id,
department=admin_user.department,
document_count=0
)
logger.info(f"已创建知识库: {kb.id}")
# 调用内部方法创建外部知识库
from user_management.views import KnowledgeBaseViewSet
kb_viewset = KnowledgeBaseViewSet()
try:
external_id = kb_viewset._create_external_dataset(kb)
# 保存external_id
kb.external_id = external_id
kb.save()
logger.info(f"已创建外部知识库ID: {external_id}")
except Exception as e:
logger.error(f"创建外部知识库失败: {str(e)}")
# 如果外部知识库创建失败,删除本地知识库
kb.delete()
return None, False
# 准备达人信息文档
content = []
# 基本信息
content.append("# 达人基本信息")
content.append(f"Handle: {creator.handle}")
content.append(f"邮箱: {creator.email}")
content.append(f"对接人: {creator.contact_person}")
content.append(f"链接: {creator.tiktok_url}")
content.append(f"粉丝数: {creator.fans_count}")
content.append(f"GMV: {creator.gmv}")
content.append(f"电话: {creator.phone}")
# 账号属性
content.append("\n# 账号属性")
content.append(f"账号属性: {creator.account_type}")
content.append(f"报价: {creator.price_quote}")
content.append(f"回复速度: {creator.response_speed}")
# 合作信息
content.append("\n# 合作信息")
content.append(f"合作意向: {creator.cooperation_intention}")
content.append(f"支付方式: {creator.payment_method}")
content.append(f"收款账号: {creator.payment_account}")
content.append(f"收件地址: {creator.address}")
content.append(f"签约OOIN?: {creator.has_ooin}")
# 渠道和品类
content.append("\n# 渠道和品类")
content.append(f"渠道来源: {creator.source}")
content.append(f"建联进度: {creator.contact_status}")
content.append(f"合作品牌: {creator.cooperation_brands}")
content.append(f"系统展示品类: {creator.system_categories}")
content.append(f"实际品类: {creator.actual_categories}")
content.append(f"达人想要品类: {creator.human_categories}")
# 其他信息
content.append("\n# 其他信息")
content.append(f"达人base: {creator.creator_base}")
content.append(f"备注: {creator.notes}")
content.append(f"更新时间: {creator.updated_at.strftime('%Y-%m-%d %H:%M:%S')}")
# 合并内容
full_content = "\n".join(content)
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False, suffix='.txt', mode='w', encoding='utf-8') as temp:
temp.write(full_content)
temp_file_path = temp.name
try:
# 准备文档上传需要的段落数据
doc_data = {
"name": f"达人信息-{creator.handle}.txt",
"paragraphs": [
{
"title": "达人基本资料",
"content": full_content,
"is_active": True,
"problem_list": []
}
]
}
# 调用上传API
upload_response = kb_viewset._call_upload_api(kb.external_id, doc_data)
if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
# 上传成功,保存记录到数据库
document_id = upload_response['data']['id']
doc_record = KnowledgeBaseDocument.objects.create(
knowledge_base=kb,
document_id=document_id,
document_name=f"达人信息-{creator.handle}.txt",
external_id=document_id,
status='active'
)
logger.info(f"文档上传成功ID: {document_id}")
# 更新知识库文档计数
kb.document_count = 1
kb.save()
# 如果有邮箱创建Gmail映射
if creator.email:
from user_management.models import GmailTalentMapping
GmailTalentMapping.objects.create(
user=admin_user,
talent_email=creator.email,
knowledge_base=kb,
conversation_id=f"feishu_{creator.id}",
is_active=True
)
logger.info(f"已创建Gmail映射: {creator.email} -> {kb.id}")
logger.info(f"成功为达人 {creator.handle} 创建知识库并上传文档: {kb.id}")
return kb, True
else:
# 上传失败,记录错误信息
error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
logger.error(f"文档上传失败: {error_msg}")
# 尝试删除创建的知识库和外部知识库
try:
kb_viewset._delete_external_dataset(kb.external_id)
except Exception as del_err:
logger.error(f"删除外部知识库失败: {str(del_err)}")
kb.delete()
return None, False
except Exception as e:
logger.error(f"文档上传过程中出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 尝试删除创建的知识库和外部知识库
try:
kb_viewset._delete_external_dataset(kb.external_id)
except Exception as del_err:
logger.error(f"删除外部知识库失败: {str(del_err)}")
kb.delete()
return None, False
finally:
# 清理临时文件
try:
os.unlink(temp_file_path)
except:
pass
except Exception as e:
logger.error(f"同步达人到知识库时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return None, False
def main(): def main():
"""主函数""" """主函数"""
if len(sys.argv) < 2: if len(sys.argv) < 2 or sys.argv[1] == "help":
print("使用方法:") print_help()
print("1. 从飞书同步: python feishu.py sync")
print("2. 从Excel更新: python feishu.py excel <excel文件路径>")
return return
command = sys.argv[1] command = sys.argv[1]
if command == 'sync': if command == "sync_from_feishu":
sync_from_feishu() sync_from_feishu()
elif command == 'excel': elif command == "update_from_excel":
if len(sys.argv) != 3: if len(sys.argv) < 3:
print("使用方法: python feishu.py excel <excel文件路径>") print("错误: 未指定Excel文件路径")
print("用法: python feishu.py update_from_excel <excel文件路径>")
return return
excel_file_path = sys.argv[2] excel_file_path = sys.argv[2]
update_from_excel(excel_file_path) update_from_excel(excel_file_path)
elif command == "export_to_feishu":
# 可选参数处理
handle_filter = None
limit = None
i = 2
while i < len(sys.argv):
if sys.argv[i] == "--handle" and i + 1 < len(sys.argv):
handle_filter = sys.argv[i+1]
i += 2
elif sys.argv[i] == "--limit" and i + 1 < len(sys.argv):
try:
limit = int(sys.argv[i+1])
except ValueError:
print(f"警告: 无效的limit值 '{sys.argv[i+1]}',将使用默认值")
i += 2
else:
i += 1
# 如果提供了handle过滤器创建自定义查询集
if handle_filter:
query_set = FeishuCreator.objects.filter(handle__icontains=handle_filter)
if limit:
query_set = query_set[:limit]
export_to_feishu(query_set)
else:
export_to_feishu()
elif command == "import_from_db":
if len(sys.argv) < 3:
print("错误: 未指定数据库模型")
print("用法: python feishu.py import_from_db <模型名称> [--filters 字段1=值1,字段2=值2,...] [--limit 数量]")
print("支持的模型: Data, KnowledgeBase, FeishuCreator")
return
model_name = sys.argv[2]
filters = {}
limit = None
# 解析额外参数
i = 3
while i < len(sys.argv):
if sys.argv[i] == "--filters" and i + 1 < len(sys.argv):
filter_str = sys.argv[i+1]
for item in filter_str.split(','):
if '=' in item:
key, value = item.split('=', 1)
filters[key.strip()] = value.strip()
i += 2
elif sys.argv[i] == "--limit" and i + 1 < len(sys.argv):
try:
limit = int(sys.argv[i+1])
except ValueError:
print(f"警告: 无效的limit值 '{sys.argv[i+1]}',将使用默认值")
i += 2
else:
i += 1
import_from_database_to_feishu(model_name, filters, limit)
elif command == "custom_import":
if len(sys.argv) < 3:
print("错误: 未指定SQL文件路径")
print("用法: python feishu.py custom_import <sql_file>")
return
sql_file_path = sys.argv[2]
# 读取SQL文件
try:
with open(sql_file_path, 'r', encoding='utf-8') as f:
file_content = f.read().strip()
# 判断是否包含字段映射
if '-- FIELD_MAPPING:' in file_content:
sql_part, mapping_part = file_content.split('-- FIELD_MAPPING:', 1)
sql_query = sql_part.strip()
try:
field_mapping = json.loads(mapping_part.strip())
print(f"使用自定义字段映射: {field_mapping}")
except json.JSONDecodeError:
print(f"警告: 字段映射格式无效,将使用默认映射")
field_mapping = None
else:
sql_query = file_content
field_mapping = None
# 执行导入
customize_import_to_feishu(sql_query, field_mapping)
except FileNotFoundError:
print(f"错误: 找不到文件 '{sql_file_path}'")
except Exception as e:
print(f"处理SQL文件时出错: {str(e)}")
elif command == "sync_to_kb":
# 同步达人信息到知识库
if len(sys.argv) < 3:
print("错误: 未指定达人识别信息")
print("用法: python feishu.py sync_to_kb --id <creator_id> | --handle <handle> | --email <email>")
return
creator_id = None
handle = None
email = None
i = 2
while i < len(sys.argv):
if sys.argv[i] == "--id" and i + 1 < len(sys.argv):
creator_id = sys.argv[i+1]
i += 2
elif sys.argv[i] == "--handle" and i + 1 < len(sys.argv):
handle = sys.argv[i+1]
i += 2
elif sys.argv[i] == "--email" and i + 1 < len(sys.argv):
email = sys.argv[i+1]
i += 2
else:
i += 1
kb, created = sync_to_knowledge_base(creator_id, handle, email)
if kb:
status = "新创建" if created else "已存在"
print(f"达人知识库{status}: {kb.id} ({kb.name})")
print(f"文档数量: {kb.document_count}")
else:
print("同步失败,未能创建或找到达人知识库")
else: else:
print("无效的命令。使用方法:") print(f"未知命令: {command}")
print("1. 从飞书同步: python feishu.py sync") print_help()
print("2. 从Excel更新: python feishu.py excel <excel文件路径>")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

Binary file not shown.

View File

@ -18,7 +18,7 @@ from django.contrib import admin
from django.urls import path, include from django.urls import path, include
from django.conf import settings from django.conf import settings
from django.conf.urls.static import static from django.conf.urls.static import static
from user_management.views import gmail_webhook # 直接导入视图函数 from user_management.views import gmail_webhook, feishu_sync_api, feishu_to_kb_api, check_creator_kb_api # 直接导入视图函数
urlpatterns = [ urlpatterns = [
# 管理后台 # 管理后台
@ -31,6 +31,11 @@ urlpatterns = [
path('api/user/gmail/webhook/', gmail_webhook, name='root_gmail_webhook'), # 修改为正确路径 path('api/user/gmail/webhook/', gmail_webhook, name='root_gmail_webhook'), # 修改为正确路径
path('gmail/webhook/', gmail_webhook, name='alt_gmail_webhook'), # 添加备用路径 path('gmail/webhook/', gmail_webhook, name='alt_gmail_webhook'), # 添加备用路径
# 飞书相关API
path('api/feishu/sync', feishu_sync_api, name='feishu_sync_api'),
path('api/feishu/to_kb', feishu_to_kb_api, name='feishu_to_kb_api'),
path('api/feishu/check_kb', check_creator_kb_api, name='check_creator_kb_api'),
# 媒体文件服务 # 媒体文件服务
*static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT), *static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT),

View File

@ -1793,7 +1793,11 @@ class GmailIntegration:
return False return False
def _get_recommended_reply_from_deepseek(self, conversation_history): def _get_recommended_reply_from_deepseek(self, conversation_history):
"""调用DeepSeek V3 API生成推荐回复""" """
调用DeepSeek V3 API生成推荐回复
现在结合用户总目标和对话总结生成更有针对性的回复
"""
try: try:
# 使用有效的API密钥 # 使用有效的API密钥
api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf" api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf"
@ -1805,10 +1809,53 @@ class GmailIntegration:
url = "https://api.siliconflow.cn/v1/chat/completions" url = "https://api.siliconflow.cn/v1/chat/completions"
# 直接使用默认系统消息,不进行复杂处理,尽量模仿文档示例 # 获取用户总目标
from .models import UserGoal, ConversationSummary
user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()
goal_content = user_goal.content if user_goal else None
# 尝试获取对话总结
talent_email = None
conversation_id = None
# 从对话历史中尝试提取达人邮箱
for message in conversation_history:
if message.get('role') == 'user' and 'metadata' in message and 'from_email' in message['metadata']:
talent_email = message['metadata']['from_email']
break
# 从对话历史中尝试提取对话ID
for message in conversation_history:
if 'metadata' in message and 'conversation_id' in message['metadata']:
conversation_id = message['metadata']['conversation_id']
break
# 获取对话总结
conversation_summary = None
if talent_email and conversation_id:
summary_obj = ConversationSummary.objects.filter(
user=self.user,
talent_email=talent_email,
conversation_id=conversation_id,
is_active=True
).first()
if summary_obj:
conversation_summary = summary_obj.summary
# 构建系统提示,结合用户总目标和对话总结
system_content = "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。"
if goal_content:
system_content += f"\n\n【用户总目标】\n{goal_content}\n\n"
if conversation_summary:
system_content += f"【对话总结】\n{conversation_summary}\n\n"
system_content += "请针对达人最后一条消息结合用户总目标和对话总结生成一个有针对性的专业回复。回复必须对达人当前问题直接响应同时与用户的总体合作目标保持一致。回复应该有至少100个字符必须提供有实质内容的回复。"
system_message = { system_message = {
"role": "system", "role": "system",
"content": "你是一位专业的电商客服和达人助手。你的任务是针对用户最近的消息生成一个有帮助、礼貌且详细的回复。即使用户消息很短或不明确也必须提供有实质内容的回复。禁止返回空白内容。回复应该有至少100个字符。" "content": system_content
} }
messages = [system_message] messages = [system_message]
@ -1822,7 +1869,7 @@ class GmailIntegration:
# 添加一个系统消息作为用户的最后一条消息 # 添加一个系统消息作为用户的最后一条消息
messages.append({ messages.append({
"role": "user", "role": "user",
"content": "请针对我之前的消息提供详细的回复建议" "content": "请针对达人最后一条消息生成专业的回复,考虑我们之前的对话历史和我设定的总目标"
}) })
# 完全按照文档提供的参数格式构建请求 # 完全按照文档提供的参数格式构建请求
@ -2305,50 +2352,293 @@ class GmailIntegration:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
def get_recent_emails(self, from_email=None, max_results=10): def get_recent_emails(self, from_email=None, max_results=10):
""" """获取最近的邮件"""
获取最近的邮件不依赖history_id
Args:
from_email (str, optional): 发件人邮箱过滤
max_results (int, optional): 最大结果数默认10封
Returns:
list: 邮件列表
"""
try: try:
# 构建查询 service = self.service
query = f"from:{from_email}" if from_email else None if not service:
logger.info(f"查询最近邮件: query={query}, max_results={max_results}") logger.error("Gmail服务未初始化")
# 查询邮件列表
response = self.gmail_service.users().messages().list(
userId='me',
q=query,
maxResults=max_results
).execute()
if 'messages' not in response:
logger.info("未找到匹配邮件")
return [] return []
# 获取邮件详情 query = ""
messages = [] if from_email:
for msg in response['messages']: query = f"from:{from_email}"
try:
message = self.gmail_service.users().messages().get(
userId='me', id=msg['id']
).execute()
email_data = self._extract_email_content(message) # 获取收件箱中的邮件列表
if email_data: results = service.users().messages().list(
messages.append(email_data) userId='me',
except Exception as msg_error: maxResults=max_results,
logger.error(f"处理邮件 {msg['id']} 失败: {str(msg_error)}") q=query
).execute()
logger.info(f"获取到 {len(messages)} 封邮件") messages = results.get('messages', [])
return messages emails = []
for message in messages:
msg = service.users().messages().get(userId='me', id=message['id']).execute()
email_data = self._extract_email_content(msg)
emails.append(email_data)
return emails
except Exception as e: except Exception as e:
logger.error(f"获取最近邮件失败: {str(e)}") logger.error(f"获取最近邮件失败: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
return [] return []
def manage_user_goal(self, goal_content=None):
"""
创建或更新用户总目标
Args:
goal_content (str): 用户总目标内容如果为None则返回当前总目标
Returns:
dict: 包含操作结果和总目标信息
"""
from .models import UserGoal
try:
# 如果未提供内容,则返回当前总目标
if goal_content is None:
user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()
if user_goal:
return {
'status': 'success',
'action': 'retrieve',
'goal': {
'id': str(user_goal.id),
'content': user_goal.content,
'created_at': user_goal.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': user_goal.updated_at.strftime('%Y-%m-%d %H:%M:%S')
}
}
else:
return {
'status': 'success',
'action': 'retrieve',
'goal': None
}
# 查找当前活跃的用户总目标
user_goal = UserGoal.objects.filter(user=self.user, is_active=True).first()
# 如果已存在,则更新
if user_goal:
user_goal.content = goal_content
user_goal.save()
action = 'update'
else:
# 否则创建新的总目标
user_goal = UserGoal.objects.create(
user=self.user,
content=goal_content
)
action = 'create'
return {
'status': 'success',
'action': action,
'goal': {
'id': str(user_goal.id),
'content': user_goal.content,
'created_at': user_goal.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': user_goal.updated_at.strftime('%Y-%m-%d %H:%M:%S')
}
}
except Exception as e:
logger.error(f"管理用户总目标失败: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': f"管理用户总目标失败: {str(e)}"
}
def generate_conversation_summary(self, talent_email):
"""
为用户与特定达人的所有对话生成总结
Args:
talent_email (str): 达人的邮箱地址
Returns:
dict: 包含操作结果和总结信息
"""
from .models import ConversationSummary
try:
# 获取与该达人的所有对话
conversations = self.get_conversations(talent_email)
if not conversations:
return {
'status': 'error',
'message': f"未找到与{talent_email}的对话记录"
}
# 准备对话历史记录
conversation_history = []
for conversation in conversations:
if 'subject' in conversation and conversation['subject']:
conversation_history.append({
'role': 'system',
'content': f"邮件主题: {conversation['subject']}"
})
for message in conversation.get('messages', []):
role = 'user' if message.get('from_email') == talent_email else 'assistant'
content = message.get('body', '')
if content:
conversation_history.append({
'role': role,
'content': content
})
# 调用DeepSeek API生成总结
summary = self._generate_summary_from_deepseek(conversation_history)
if not summary:
return {
'status': 'error',
'message': "生成对话总结失败"
}
# 保存或更新总结
conversation_id = conversations[0].get('id') if conversations else None
if not conversation_id:
return {
'status': 'error',
'message': "无法确定对话ID"
}
# 查找现有总结
conversation_summary = ConversationSummary.objects.filter(
user=self.user,
talent_email=talent_email,
conversation_id=conversation_id,
is_active=True
).first()
# 如果已存在,则更新
if conversation_summary:
conversation_summary.summary = summary
conversation_summary.save()
action = 'update'
else:
# 否则创建新的总结
conversation_summary = ConversationSummary.objects.create(
user=self.user,
talent_email=talent_email,
conversation_id=conversation_id,
summary=summary
)
action = 'create'
return {
'status': 'success',
'action': action,
'summary': {
'id': str(conversation_summary.id),
'talent_email': talent_email,
'conversation_id': conversation_id,
'summary': summary,
'created_at': conversation_summary.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': conversation_summary.updated_at.strftime('%Y-%m-%d %H:%M:%S')
}
}
except Exception as e:
logger.error(f"生成对话总结失败: {str(e)}")
logger.error(traceback.format_exc())
return {
'status': 'error',
'message': f"生成对话总结失败: {str(e)}"
}
def _generate_summary_from_deepseek(self, conversation_history):
"""调用DeepSeek API生成对话总结"""
try:
# 使用有效的API密钥
api_key = "sk-xqbujijjqqmlmlvkhvxeogqjtzslnhdtqxqgiyuhwpoqcjvf"
# 如果上面的密钥不正确,可以尝试从环境变量或数据库中获取
# 从Django设置中获取密钥
from django.conf import settings
if hasattr(settings, 'DEEPSEEK_API_KEY') and settings.DEEPSEEK_API_KEY:
api_key = settings.DEEPSEEK_API_KEY
url = "https://api.siliconflow.cn/v1/chat/completions"
# 系统消息指定生成总结的任务
system_message = {
"role": "system",
"content": "你是一位专业的电商客服和达人助手。你的任务是分析用户与达人之间的所有对话历史并生成一份简明扼要的总结。总结应包括1. 主要讨论的产品或服务2. 达人的主要关注点和需求3. 已经达成的共识或协议4. 未解决的问题或后续需要跟进的事项。总结应该客观、全面、结构清晰。"
}
messages = [system_message]
# 添加对话历史但限制消息数量避免超出token限制
# 如果对话历史太长,可能需要进一步处理或分割
if len(conversation_history) > 20:
# 选取关键消息:第一条、最后几条以及中间的一些重要消息
selected_messages = (
conversation_history[:2] + # 前两条
conversation_history[len(conversation_history)//2-2:len(conversation_history)//2+2] + # 中间四条
conversation_history[-6:] # 最后六条
)
messages.extend(selected_messages)
else:
messages.extend(conversation_history)
# 添加用户指令
messages.append({
"role": "user",
"content": "请根据以上对话历史生成一份全面的总结。"
})
# 构建API请求
payload = {
"model": "deepseek-ai/DeepSeek-V3",
"messages": messages,
"stream": False,
"max_tokens": 1500, # 增加token上限以容纳完整总结
"temperature": 0.3, # 降低随机性,使总结更加确定性
"top_p": 0.9,
"top_k": 50,
"frequency_penalty": 0.5,
"presence_penalty": 0.2,
"n": 1,
"stop": [],
"response_format": {
"type": "text"
}
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
logger.info(f"开始调用DeepSeek API生成对话总结")
response = requests.post(url, json=payload, headers=headers)
if response.status_code != 200:
logger.error(f"DeepSeek API调用失败: {response.status_code}, {response.text}")
return None
result = response.json()
logger.debug(f"DeepSeek API返回: {result}")
# 提取回复内容
if 'choices' in result and len(result['choices']) > 0:
summary = result['choices'][0]['message']['content']
# 如果返回的内容为空直接返回None
if not summary or summary.strip() == '':
logger.warning("DeepSeek API返回的总结内容为空")
return None
return summary
logger.warning(f"DeepSeek API返回格式异常: {result}")
return None
except Exception as e:
logger.error(f"调用DeepSeek API生成总结失败: {str(e)}")
logger.error(traceback.format_exc())
return None

View File

@ -0,0 +1,50 @@
# Generated by Django 5.1.5 on 2025-04-14 04:27
import django.db.models.deletion
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('user_management', '0008_gmailcredential_gmail_email_and_more'),
]
operations = [
migrations.CreateModel(
name='ConversationSummary',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('talent_email', models.EmailField(max_length=254, verbose_name='达人邮箱')),
('conversation_id', models.CharField(max_length=100, verbose_name='对话ID')),
('summary', models.TextField(verbose_name='对话总结')),
('is_active', models.BooleanField(default=True, verbose_name='是否激活')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
('updated_at', models.DateTimeField(auto_now=True, verbose_name='更新时间')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='conversation_summaries', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': '对话总结',
'verbose_name_plural': '对话总结',
'db_table': 'conversation_summaries',
},
),
migrations.CreateModel(
name='UserGoal',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('content', models.TextField(verbose_name='总目标内容')),
('is_active', models.BooleanField(default=True, verbose_name='是否激活')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
('updated_at', models.DateTimeField(auto_now=True, verbose_name='更新时间')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='goals', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': '用户总目标',
'verbose_name_plural': '用户总目标',
'db_table': 'user_goals',
},
),
]

View File

@ -765,4 +765,40 @@ class GmailAttachment(models.Model):
verbose_name_plural = 'Gmail附件' verbose_name_plural = 'Gmail附件'
def __str__(self): def __str__(self):
return f"{self.filename} ({self.gmail_message_id})" return f"{self.filename} ({self.filesize} bytes)"
class UserGoal(models.Model):
"""用户总目标模型"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='goals')
content = models.TextField(verbose_name='总目标内容')
is_active = models.BooleanField(default=True, verbose_name='是否激活')
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
class Meta:
db_table = 'user_goals'
verbose_name = '用户总目标'
verbose_name_plural = '用户总目标'
def __str__(self):
return f"{self.user.username}的总目标 - {self.content[:50]}..."
class ConversationSummary(models.Model):
"""对话总结模型"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='conversation_summaries')
talent_email = models.EmailField(verbose_name='达人邮箱')
conversation_id = models.CharField(max_length=100, verbose_name='对话ID')
summary = models.TextField(verbose_name='对话总结')
is_active = models.BooleanField(default=True, verbose_name='是否激活')
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
class Meta:
db_table = 'conversation_summaries'
verbose_name = '对话总结'
verbose_name_plural = '对话总结'
def __str__(self):
return f"{self.user.username}{self.talent_email}的对话总结"

View File

@ -15,7 +15,6 @@ from .views import (
LogoutView, LogoutView,
ChatHistoryViewSet, ChatHistoryViewSet,
user_profile, user_profile,
user_register,
setup_gmail_integration, setup_gmail_integration,
send_gmail_message, send_gmail_message,
gmail_webhook, gmail_webhook,
@ -25,7 +24,10 @@ from .views import (
refresh_gmail_watch, refresh_gmail_watch,
check_gmail_auth, check_gmail_auth,
import_gmail_from_sender, import_gmail_from_sender,
sync_talent_emails sync_talent_emails,
manage_user_goal,
generate_conversation_summary,
get_recommended_reply
) )
# 创建路由器 # 创建路由器
@ -51,6 +53,7 @@ urlpatterns = [
# 用户管理相关 # 用户管理相关
path('users/', user_list, name='user-list'), path('users/', user_list, name='user-list'),
path('users/profile/', user_profile, name='user-profile'),
path('users/<str:pk>/', user_detail, name='user-detail'), path('users/<str:pk>/', user_detail, name='user-detail'),
path('users/<str:pk>/update/', user_update, name='user-update'), path('users/<str:pk>/update/', user_update, name='user-update'),
path('users/<str:pk>/delete/', user_delete, name='user-delete'), path('users/<str:pk>/delete/', user_delete, name='user-delete'),
@ -66,4 +69,9 @@ urlpatterns = [
path('gmail/check-auth/', check_gmail_auth, name='check_gmail_auth'), path('gmail/check-auth/', check_gmail_auth, name='check_gmail_auth'),
path('gmail/import-from-sender/', import_gmail_from_sender, name='import_gmail_from_sender'), path('gmail/import-from-sender/', import_gmail_from_sender, name='import_gmail_from_sender'),
path('gmail/sync-talent/', sync_talent_emails, name='sync_talent_emails'), path('gmail/sync-talent/', sync_talent_emails, name='sync_talent_emails'),
# 新增功能API
path('user-goal/', manage_user_goal, name='manage_user_goal'),
path('conversation-summary/', generate_conversation_summary, name='generate_conversation_summary'),
path('recommended-reply/', get_recommended_reply, name='get_recommended_reply'),
] ]

File diff suppressed because it is too large Load Diff