import json
import django
import os
import sys
import pandas as pd
import requests
from django.db import transaction
from django.db.models import Q
from django.db import connection
from datetime import datetime
import traceback
import tempfile
import uuid

# Set up the Django environment
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'role_based_system.settings')
django.setup()
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import *
from user_management.models import FeishuCreator, Data, KnowledgeBase, KnowledgeBaseDocument
from django.conf import settings
import logging
from django.contrib.auth import get_user_model
logger = logging.getLogger(__name__)

# SDK guide: https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/server-side-sdk/python--sdk/preparations-before-development
# The sample code below is filled with the example values from the docs; if it does not work as-is,
# fill in the required parameters in the API debugging console first and copy the generated code.

def extract_field_value(field_value):
    """Extract a plain value from a Feishu field payload."""
    if isinstance(field_value, list):
        if field_value and isinstance(field_value[0], dict):
            return field_value[0].get('text', '')
    elif isinstance(field_value, dict):
        if 'text' in field_value:
            return field_value['text']
        elif 'link' in field_value:
            return field_value['link']
        elif 'link_record_ids' in field_value:
            return ''
    return field_value
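
# Illustrative behaviour (the payload shapes are assumptions based on typical Bitable
# responses, not copied from the API docs):
#   extract_field_value([{"text": "Alice"}])                        -> "Alice"
#   extract_field_value({"text": "hello", "link": "https://x.example"}) -> "hello"
#   extract_field_value({"link_record_ids": ["recxxx"]})            -> ""
#   extract_field_value(12345)                                       -> 12345  (returned unchanged)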

def save_to_database(record):
    """Save a record from the Feishu Bitable into the database."""
    fields = record.fields
    record_id = record.record_id
    creator_data = {
        'record_id': record_id,
        'contact_person': extract_field_value(fields.get('对接人', '')),
        'handle': extract_field_value(fields.get('Handle', '')),
        'tiktok_url': extract_field_value(fields.get('链接', '')),
        'fans_count': extract_field_value(fields.get('粉丝数', '')),
        'gmv': fields.get('GMV', ''),
        'email': extract_field_value(fields.get('邮箱', '')),
        'phone': extract_field_value(fields.get('手机号|WhatsApp', '')),
        'account_type': extract_field_value(fields.get('账号属性', [])),
        'price_quote': fields.get('报价', ''),
        'response_speed': fields.get('回复速度', ''),
        'cooperation_intention': fields.get('合作意向', ''),
        'payment_method': fields.get('支付方式', ''),
        'payment_account': fields.get('收款账号', ''),
        'address': fields.get('收件地址', ''),
        'has_ooin': fields.get('签约OOIN?', ''),
        'source': fields.get('渠道来源', ''),
        'contact_status': fields.get('建联进度', ''),
        'cooperation_brands': fields.get('合作品牌', []),
        'system_categories': extract_field_value(fields.get('系统展示的带货品类', [])),
        'actual_categories': extract_field_value(fields.get('实际高播放量带货品类', [])),
        'human_categories': fields.get('达人标想要货品类', ''),
        'creator_base': '',
        'notes': extract_field_value(fields.get('父记录', '')),
    }
    try:
        creator, created = FeishuCreator.objects.update_or_create(
            record_id=record_id,
            defaults=creator_data
        )
        return creator, created
    except Exception as e:
        print(f"保存记录时出错: {str(e)}")
        print(f"记录数据: {creator_data}")
        return None, False
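
# A minimal sketch of exercising save_to_database without hitting the Feishu API,
# assuming the SDK record objects expose `fields` and `record_id` attributes
# (SimpleNamespace is only a stand-in used for illustration):
#
#   from types import SimpleNamespace
#   fake = SimpleNamespace(record_id="recFake001",
#                          fields={"Handle": [{"text": "tiktok_user123"}],
#                                  "邮箱": [{"text": "user@example.com"}]})
#   creator, created = save_to_database(fake)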

def fetch_all_records(client, app_token, table_id, user_access_token):
    """Fetch all records from the table, following pagination."""
    total_records = []
    page_token = None
    page_size = 20
    while True:
        try:
            # Build the request object
            builder = SearchAppTableRecordRequest.builder() \
                .app_token(app_token) \
                .table_id(table_id) \
                .page_size(page_size)
            # Add the page_token to the request if we have one
            if page_token:
                builder = builder.page_token(page_token)
            # Build the full request
            request = builder.request_body(SearchAppTableRecordRequestBody.builder().build()).build()
            print(f"发送请求page_token: {page_token}")
            # Send the request
            option = lark.RequestOption.builder().user_access_token(user_access_token).build()
            response = client.bitable.v1.app_table_record.search(request, option)
            if not response.success():
                print(f"请求失败: {response.code}, {response.msg}")
                break
            # Records on the current page
            current_records = response.data.items
            if not current_records:
                print("没有更多记录")
                break
            total_records.extend(current_records)
            # Parse the raw response to get pagination info
            response_data = json.loads(response.raw.content)
            total = response_data["data"]["total"]
            print(f"获取到 {len(current_records)} 条记录,当前总计: {len(total_records)}/{total}")
            # Token for the next page
            page_token = response_data["data"].get("page_token")
            if not page_token or not response_data["data"].get("has_more", False):
                print("已获取所有数据")
                break
        except Exception as e:
            print(f"错误: {str(e)}")
            print(traceback.format_exc())
            break
    print(f"最终获取到 {len(total_records)} 条记录")
    return total_records
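
# Usage sketch (token values below are placeholders, not real credentials):
#
#   client = lark.Client.builder().enable_set_token(True).build()
#   records = fetch_all_records(client,
#                               app_token="bascnXXXX",
#                               table_id="tblXXXX",
#                               user_access_token="u-XXXX")
#   for rec in records:
#       save_to_database(rec)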

def update_from_excel(excel_file_path):
    """Update database records from an Excel file."""
    try:
        print(f"开始读取Excel文件: {excel_file_path}")
        df = pd.read_excel(excel_file_path)
        if 'Handle' not in df.columns:
            print("错误: Excel文件中缺少'Handle'")
            return
        update_count = 0
        skip_count = 0
        error_count = 0
        # Work out which columns are allowed to be updated
        excluded_fields = {'id', 'record_id', 'created_at', 'updated_at'}
        model_fields = {f.name for f in FeishuCreator._meta.get_fields()} - excluded_fields
        valid_columns = set(df.columns) & model_fields
        print(f"可更新的列: {valid_columns}")
        with transaction.atomic():
            for index, row in df.iterrows():
                try:
                    handle = str(row['Handle']).strip()
                    if not handle:
                        print(f"跳过第{index+2}行: Handle为空")
                        skip_count += 1
                        continue
                    # Look up the existing record
                    creator = FeishuCreator.objects.filter(handle=handle).first()
                    if not creator:
                        print(f"跳过Handle为'{handle}'的记录: 数据库中不存在")
                        skip_count += 1
                        continue
                    # Collect the fields to update
                    update_data = {}
                    for column in valid_columns:
                        if column == 'Handle':
                            continue
                        value = row[column]
                        if pd.isna(value):
                            continue
                        # Normalise special value types
                        if isinstance(value, (list, dict)):
                            value = json.dumps(value)
                        elif isinstance(value, (int, float)):
                            if column in ['fans_count']:
                                value = int(value)
                            else:
                                value = str(value)
                        else:
                            value = str(value).strip()
                        if value:
                            update_data[column] = value
                    # Apply the update
                    if update_data:
                        for field, value in update_data.items():
                            setattr(creator, field, value)
                        creator.save()
                        update_count += 1
                        print(f"已更新Handle为'{handle}'的记录")
                    else:
                        skip_count += 1
                        print(f"跳过Handle为'{handle}'的记录: 无需更新")
                except Exception as e:
                    error_count += 1
                    print(f"处理Handle'{handle}'时出错: {str(e)}")
        print("\nExcel更新完成!统计信息:")
        print(f"更新记录数:{update_count}")
        print(f"跳过记录数:{skip_count}")
        print(f"错误记录数:{error_count}")
    except Exception as e:
        print(f"处理Excel文件时出错: {str(e)}")

def sync_from_feishu(app_token=None, table_id=None, user_access_token=None, filters=None):
    """Sync data from Feishu into the system.

    Args:
        app_token: APP_TOKEN of the Feishu app; falls back to the default if omitted.
        table_id: TABLE_ID of the Feishu Bitable; falls back to the default if omitted.
        user_access_token: user access token; falls back to the default if omitted.
        filters: filter conditions as a dict (accepted but not currently applied).

    Returns:
        dict: sync statistics.
    """
    try:
        # Use the supplied credentials or fall back to the defaults
        app_token = app_token or settings.FEISHU_APP_TOKEN
        table_id = table_id or settings.FEISHU_TABLE_ID
        user_access_token = user_access_token or settings.FEISHU_USER_ACCESS_TOKEN
        if not all([app_token, table_id, user_access_token]):
            logger.error("飞书API凭据不完整")
            return {
                'created': 0,
                'updated': 0,
                'errors': 1,
                'total': 0,
                'error_message': '飞书API凭据不完整'
            }
        # Counters
        created_count = 0
        updated_count = 0
        error_count = 0
        created_creators = []
        # Build the Feishu client
        client = lark.Client.builder() \
            .enable_set_token(True) \
            .log_level(lark.LogLevel.DEBUG) \
            .build()
        print("开始从飞书同步数据...")
        all_records = fetch_all_records(client, app_token, table_id, user_access_token)
        if not all_records:
            print("未获取到任何记录")
            return {
                'created': 0,
                'updated': 0,
                'errors': 0,
                'total': 0
            }
        print("\n开始更新数据库...")
        for record in all_records:
            creator, created = save_to_database(record)
            if creator:
                if created:
                    created_count += 1
                    if created_count % 10 == 0:
                        print(f"已创建 {created_count} 条记录...")
                    created_creators.append(creator)
                else:
                    updated_count += 1
                    if updated_count % 10 == 0:
                        print(f"已更新 {updated_count} 条记录...")
            else:
                error_count += 1
                print("处理记录失败")
        print("\n飞书同步完成!统计信息:")
        print(f"新建记录:{created_count}")
        print(f"更新记录:{updated_count}")
        print(f"错误记录:{error_count}")
        print(f"总记录数:{len(all_records)}")
        # Return the statistics, including the newly created creator objects
        return {
            'created': created_count,
            'updated': updated_count,
            'errors': error_count,
            'total': created_count + updated_count,
            'created_creators': created_creators  # list of newly created FeishuCreator objects
        }
    except Exception as e:
        logger.error(f"从飞书同步数据时出错: {str(e)}")
        logger.error(traceback.format_exc())
        return {
            'created': 0,
            'updated': 0,
            'errors': 1,
            'total': 0,
            'error_message': str(e)
        }
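
# A minimal sketch of consuming the returned statistics (keys as in the dicts above):
#
#   stats = sync_from_feishu()
#   print(stats['created'], stats['updated'], stats['errors'])
#   for creator in stats.get('created_creators', []):
#       sync_to_knowledge_base(creator_id=creator.id)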

def format_field_value(value, field_type='text'):
    """Format a field value so it matches what the Feishu Bitable API expects."""
    if value is None or value == '':
        return None
    if field_type == 'text':
        return {"text": str(value)}
    elif field_type == 'number':
        try:
            return float(value)
        except (ValueError, TypeError):
            return None
    elif field_type == 'checkbox':
        # Treat common truthy strings as checked
        return str(value).lower() in ('true', 'yes', '1')
    elif field_type == 'multi_select':
        # Multi-select fields take a list of options
        if isinstance(value, str):
            items = [item.strip() for item in value.split(',') if item.strip()]
            return [{"text": item} for item in items]
        elif isinstance(value, list):
            return [{"text": str(item)} for item in value]
        return []
    return {"text": str(value)}

def export_to_feishu(query_set=None):
    """Export records from the database to the Feishu Bitable."""
    # Build the client
    client = lark.Client.builder() \
        .enable_set_token(True) \
        .log_level(lark.LogLevel.DEBUG) \
        .build()
    # Connection parameters
    APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
    TABLE_ID = "tbl3oikG3F8YYtVA"
    USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
    print("开始从数据库导出数据到飞书...")
    # If no queryset is supplied, export every record that has no record_id yet
    if query_set is None:
        query_set = FeishuCreator.objects.filter(record_id='')
    if not query_set.exists():
        print("没有需要导出的记录")
        return
    created_count = 0
    updated_count = 0
    error_count = 0
    for creator in query_set:
        try:
            # Assemble the field payload
            fields = {
                "对接人": format_field_value(creator.contact_person),
                "Handle": format_field_value(creator.handle),
                "链接": format_field_value(creator.tiktok_url),
                "粉丝数": format_field_value(creator.fans_count),
                "GMV": format_field_value(creator.gmv),
                "邮箱": format_field_value(creator.email),
                "手机号|WhatsApp": format_field_value(creator.phone),
                "账号属性": format_field_value(creator.account_type, 'multi_select'),
                "报价": format_field_value(creator.price_quote),
                "回复速度": format_field_value(creator.response_speed),
                "合作意向": format_field_value(creator.cooperation_intention),
                "支付方式": format_field_value(creator.payment_method),
                "收款账号": format_field_value(creator.payment_account),
                "收件地址": format_field_value(creator.address),
                "签约OOIN?": format_field_value(creator.has_ooin),
                "渠道来源": format_field_value(creator.source),
                "建联进度": format_field_value(creator.contact_status),
                "合作品牌": creator.cooperation_brands,
                "系统展示的带货品类": format_field_value(creator.system_categories, 'multi_select'),
                "实际高播放量带货品类": format_field_value(creator.actual_categories, 'multi_select'),
                "达人标想要货品类": format_field_value(creator.human_categories)
            }
            # Drop None values
            fields = {k: v for k, v in fields.items() if v is not None}
            if creator.record_id:
                # Update the existing record
                request = UpdateAppTableRecordRequest.builder() \
                    .app_token(APP_TOKEN) \
                    .table_id(TABLE_ID) \
                    .record_id(creator.record_id) \
                    .request_body(UpdateAppTableRecordRequestBody.builder().fields(fields).build()) \
                    .build()
                option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
                response = client.bitable.v1.app_table_record.update(request, option)
                if response.success():
                    updated_count += 1
                    print(f"已更新记录: {creator.handle}")
                else:
                    error_count += 1
                    print(f"更新失败: {creator.handle}, 错误: {response.msg}")
            else:
                # Create a new record
                request = CreateAppTableRecordRequest.builder() \
                    .app_token(APP_TOKEN) \
                    .table_id(TABLE_ID) \
                    .request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
                    .build()
                option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
                response = client.bitable.v1.app_table_record.create(request, option)
                if response.success():
                    # Persist the new record's ID
                    record_id = response.data.record.record_id
                    creator.record_id = record_id
                    creator.save()
                    created_count += 1
                    print(f"已创建记录: {creator.handle}, ID: {record_id}")
                else:
                    error_count += 1
                    print(f"创建失败: {creator.handle}, 错误: {response.msg}")
        except Exception as e:
            error_count += 1
            print(f"处理记录 {creator.handle} 时出错: {str(e)}")
            print(traceback.format_exc())
    print("\n飞书导出完成!统计信息:")
    print(f"新建记录:{created_count}")
    print(f"更新记录:{updated_count}")
    print(f"错误记录:{error_count}")
    print(f"总处理记录数:{created_count + updated_count + error_count}")

def import_from_database_to_feishu(model_name, filters=None, limit=None):
    """Import data from a database model into the Feishu Bitable.

    Args:
        model_name (str): model name, e.g. 'Data' or 'KnowledgeBase'.
        filters (dict): filter conditions, e.g. {'department': '技术部'}.
        limit (int): maximum number of records to import.
    """
    # Build the client
    client = lark.Client.builder() \
        .enable_set_token(True) \
        .log_level(lark.LogLevel.DEBUG) \
        .build()
    # Connection parameters
    APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
    TABLE_ID = "tbl3oikG3F8YYtVA"
    USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
    print(f"开始从{model_name}模型导入数据到飞书...")
    # Resolve the model class
    model_mapping = {
        'Data': Data,
        'KnowledgeBase': KnowledgeBase,
        'FeishuCreator': FeishuCreator,
    }
    model_class = model_mapping.get(model_name)
    if not model_class:
        print(f"错误: 不支持的模型名称 {model_name}")
        return
    # Build the query
    query = model_class.objects.all()
    # Apply filters
    if filters:
        q_objects = Q()
        for field, value in filters.items():
            if isinstance(value, list):
                # OR the values of a list filter together
                q_filter = Q()
                for v in value:
                    q_filter |= Q(**{field: v})
                q_objects &= q_filter
            else:
                q_objects &= Q(**{field: value})
        query = query.filter(q_objects)
    # Apply the limit
    if limit and isinstance(limit, int) and limit > 0:
        query = query[:limit]
    # Check the result set
    total_count = query.count()
    if total_count == 0:
        print("没有符合条件的记录")
        return
    print(f"找到 {total_count} 条记录,准备导入到飞书...")
    # Counters
    created_count = 0
    error_count = 0
    # Map each model's fields onto the Feishu columns
    for index, record in enumerate(query):
        try:
            if model_name == 'Data':
                # Data model -> Feishu fields
                fields = {
                    "名称": format_field_value(record.name),
                    "描述": format_field_value(record.desc),
                    "类型": format_field_value(record.get_type_display()),
                    "部门": format_field_value(record.department),
                    "字符长度": format_field_value(record.char_length, 'number'),
                    "文档数量": format_field_value(record.document_count, 'number'),
                    "创建时间": format_field_value(record.create_time.strftime('%Y-%m-%d %H:%M:%S')),
                    "更新时间": format_field_value(record.update_time.strftime('%Y-%m-%d %H:%M:%S')),
                    "ID": format_field_value(str(record.id)),
                }
            elif model_name == 'KnowledgeBase':
                # KnowledgeBase model -> Feishu fields
                fields = {
                    "名称": format_field_value(record.name),
                    "描述": format_field_value(record.desc),
                    "类型": format_field_value(record.type),
                    "部门": format_field_value(record.department),
                    "小组": format_field_value(record.group),
                    "文档数量": format_field_value(record.document_count, 'number'),
                    "字符长度": format_field_value(record.char_length, 'number'),
                    "创建时间": format_field_value(record.create_time.strftime('%Y-%m-%d %H:%M:%S')),
                    "ID": format_field_value(str(record.id)),
                }
            else:
                # FeishuCreator records are handled by export_to_feishu instead
                continue
            # Drop None values
            fields = {k: v for k, v in fields.items() if v is not None}
            # Create a new record
            request = CreateAppTableRecordRequest.builder() \
                .app_token(APP_TOKEN) \
                .table_id(TABLE_ID) \
                .request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
                .build()
            option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
            response = client.bitable.v1.app_table_record.create(request, option)
            if response.success():
                created_count += 1
                if created_count % 10 == 0 or created_count == total_count:
                    print(f"已导入 {created_count}/{total_count} 条记录...")
            else:
                error_count += 1
                print(f"导入第 {index+1} 条记录失败,错误: {response.msg}")
        except Exception as e:
            error_count += 1
            print(f"处理第 {index+1} 条记录时出错: {str(e)}")
    print("\n飞书导入完成!统计信息:")
    print(f"成功导入记录数:{created_count}")
    print(f"失败记录数:{error_count}")
    print(f"总记录数:{total_count}")

def customize_import_to_feishu(sql_query, field_mapping=None):
    """Import the result of a custom SQL query into the Feishu Bitable.

    Args:
        sql_query (str): the SQL query to run.
        field_mapping (dict): field mapping, {'db_field': 'feishu_field'}.
    """
    # Build the client
    client = lark.Client.builder() \
        .enable_set_token(True) \
        .log_level(lark.LogLevel.DEBUG) \
        .build()
    # Connection parameters
    APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
    TABLE_ID = "tbl3oikG3F8YYtVA"
    USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
    print("开始从自定义SQL查询导入数据到飞书...")
    try:
        # Run the SQL query
        with connection.cursor() as cursor:
            cursor.execute(sql_query)
            columns = [col[0] for col in cursor.description]
            result_set = cursor.fetchall()
        # Check the result set
        if not result_set:
            print("SQL查询没有返回任何结果")
            return
        print(f"查询返回 {len(result_set)} 条记录,准备导入到飞书...")
        # Default mapping: database column name -> Feishu field name
        if not field_mapping:
            field_mapping = {col: col for col in columns}
        # Counters
        created_count = 0
        error_count = 0
        # Process each row
        for index, row in enumerate(result_set):
            try:
                # Build a dict for the row
                record_dict = dict(zip(columns, row))
                # Map the values onto Feishu fields
                fields = {}
                for db_field, feishu_field in field_mapping.items():
                    if db_field in record_dict:
                        value = record_dict[db_field]
                        # Format according to the value type
                        if isinstance(value, (int, float)):
                            fields[feishu_field] = format_field_value(value, 'number')
                        else:
                            fields[feishu_field] = format_field_value(value)
                # Drop None values
                fields = {k: v for k, v in fields.items() if v is not None}
                # Create a new record
                request = CreateAppTableRecordRequest.builder() \
                    .app_token(APP_TOKEN) \
                    .table_id(TABLE_ID) \
                    .request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
                    .build()
                option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
                response = client.bitable.v1.app_table_record.create(request, option)
                if response.success():
                    created_count += 1
                    if created_count % 10 == 0 or created_count == len(result_set):
                        print(f"已导入 {created_count}/{len(result_set)} 条记录...")
                else:
                    error_count += 1
                    print(f"导入第 {index+1} 条记录失败,错误: {response.msg}")
            except Exception as e:
                error_count += 1
                print(f"处理第 {index+1} 条记录时出错: {str(e)}")
                print(traceback.format_exc())
        print("\n自定义SQL导入完成!统计信息:")
        print(f"成功导入记录数:{created_count}")
        print(f"失败记录数:{error_count}")
        print(f"总记录数:{len(result_set)}")
    except Exception as e:
        print(f"执行SQL查询时出错: {str(e)}")
        print(traceback.format_exc())
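
# A sketch of the SQL file consumed by the `custom_import` command (the
# `-- FIELD_MAPPING:` marker is parsed in main() below; the table and column
# names here are hypothetical):
#
#   SELECT handle, fans_count FROM user_management_feishucreator WHERE fans_count > 10000
#   -- FIELD_MAPPING: {"handle": "Handle", "fans_count": "粉丝数"}
#
# Equivalent direct call:
#   customize_import_to_feishu(
#       "SELECT handle, fans_count FROM user_management_feishucreator",
#       {"handle": "Handle", "fans_count": "粉丝数"},
#   )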

def print_help():
    """Print the CLI help text."""
    print("飞书数据同步工具")
    print("================")
    print("\n可用命令:")
    print(" 1. 从飞书导入数据到数据库:")
    print(" python feishu.py sync_from_feishu")
    print()
    print(" 2. 从Excel文件更新数据库记录:")
    print(" python feishu.py update_from_excel <excel文件路径>")
    print()
    print(" 3. 从数据库导出数据到飞书多维表格:")
    print(" python feishu.py export_to_feishu [--handle 关键字] [--limit 数量]")
    print()
    print(" 4. 从特定数据库模型导入数据到飞书:")
    print(" python feishu.py import_from_db <模型名称> [--filters 字段1=值1,字段2=值2,...] [--limit 数量]")
    print(" 支持的模型: Data, KnowledgeBase, FeishuCreator")
    print(" 示例: python feishu.py import_from_db Data --filters type=admin,department=技术部 --limit 50")
    print()
    print(" 5. 从自定义SQL查询导入数据到飞书:")
    print(" python feishu.py custom_import <sql_file>")
    print(" SQL文件中应包含SQL查询语句及可选的字段映射JSON")
    print()
    print(" 6. 将达人信息同步到知识库:")
    print(" python feishu.py sync_to_kb --id <creator_id> | --handle <handle> | --email <email>")
    print(" 示例: python feishu.py sync_to_kb --handle tiktok_user123")
    print()
    print(" 7. 显示帮助信息:")
    print(" python feishu.py help")
    print()

def sync_to_knowledge_base(creator_id=None, handle=None, email=None):
    """Sync a creator's information into the external knowledge base.

    Args:
        creator_id: creator ID.
        handle: creator handle.
        email: creator email.

    Returns:
        tuple: (KnowledgeBase object, whether it was newly created)
    """
    try:
        # Find the creator
        query = FeishuCreator.objects.all()
        if creator_id:
            query = query.filter(id=creator_id)
        elif handle:
            query = query.filter(handle=handle)
        elif email:
            query = query.filter(email=email)
        creator = query.first()
        if not creator:
            logger.error(f"未找到达人: creator_id={creator_id}, handle={handle}, email={email}")
            return None, False
        # The creator needs at least a handle
        if not creator.handle:
            logger.error(f"达人没有handle无法创建知识库: {creator.id}")
            return None, False
        # Check whether a knowledge base is already mapped via Gmail
        gmail_kb = None
        if creator.email:
            from user_management.models import GmailTalentMapping
            gmail_mapping = GmailTalentMapping.objects.filter(
                talent_email=creator.email,
                is_active=True
            ).first()
            if gmail_mapping and gmail_mapping.knowledge_base:
                gmail_kb = gmail_mapping.knowledge_base
                logger.info(f"达人 {creator.handle} 已有Gmail知识库: {gmail_kb.id}")
                return gmail_kb, False
        # Get the current user (team leader)
        User = get_user_model()
        admin_user = User.objects.filter(role='leader').first()
        if not admin_user:
            logger.error("未找到组长用户,无法创建知识库")
            return None, False
        # Build the knowledge base name
        kb_name = f"达人-{creator.handle}"
        # Check whether a knowledge base with this name already exists
        existed_kb = KnowledgeBase.objects.filter(name=kb_name).first()
        if existed_kb:
            logger.info(f"知识库 {kb_name} 已存在ID: {existed_kb.id}")
            return existed_kb, False
        # Create the knowledge base
        kb = KnowledgeBase.objects.create(
            name=kb_name,
            desc=f"达人 {creator.handle} 的信息知识库",
            type='private',  # private knowledge base
            user_id=admin_user.id,
            department=admin_user.department,
            document_count=0
        )
        logger.info(f"已创建知识库: {kb.id}")
        # Create the external dataset through the internal viewset helper
        from user_management.views import KnowledgeBaseViewSet
        kb_viewset = KnowledgeBaseViewSet()
        try:
            external_id = kb_viewset._create_external_dataset(kb)
            # Persist the external_id
            kb.external_id = external_id
            kb.save()
            logger.info(f"已创建外部知识库ID: {external_id}")
        except Exception as e:
            logger.error(f"创建外部知识库失败: {str(e)}")
            # If the external dataset could not be created, remove the local knowledge base
            kb.delete()
            return None, False
        # Assemble the creator profile document
        content = []
        # Basic information
        content.append("# 达人基本信息")
        content.append(f"Handle: {creator.handle}")
        content.append(f"邮箱: {creator.email}")
        content.append(f"对接人: {creator.contact_person}")
        content.append(f"链接: {creator.tiktok_url}")
        content.append(f"粉丝数: {creator.fans_count}")
        content.append(f"GMV: {creator.gmv}")
        content.append(f"电话: {creator.phone}")
        # Account attributes
        content.append("\n# 账号属性")
        content.append(f"账号属性: {creator.account_type}")
        content.append(f"报价: {creator.price_quote}")
        content.append(f"回复速度: {creator.response_speed}")
        # Cooperation details
        content.append("\n# 合作信息")
        content.append(f"合作意向: {creator.cooperation_intention}")
        content.append(f"支付方式: {creator.payment_method}")
        content.append(f"收款账号: {creator.payment_account}")
        content.append(f"收件地址: {creator.address}")
        content.append(f"签约OOIN?: {creator.has_ooin}")
        # Channel and categories
        content.append("\n# 渠道和品类")
        content.append(f"渠道来源: {creator.source}")
        content.append(f"建联进度: {creator.contact_status}")
        content.append(f"合作品牌: {creator.cooperation_brands}")
        content.append(f"系统展示品类: {creator.system_categories}")
        content.append(f"实际品类: {creator.actual_categories}")
        content.append(f"达人想要品类: {creator.human_categories}")
        # Other information
        content.append("\n# 其他信息")
        content.append(f"达人base: {creator.creator_base}")
        content.append(f"备注: {creator.notes}")
        content.append(f"更新时间: {creator.updated_at.strftime('%Y-%m-%d %H:%M:%S')}")
        # Join the sections
        full_content = "\n".join(content)
        # Write a temporary file (the upload itself uses full_content directly)
        with tempfile.NamedTemporaryFile(delete=False, suffix='.txt', mode='w', encoding='utf-8') as temp:
            temp.write(full_content)
            temp_file_path = temp.name
        try:
            # Paragraph payload expected by the upload API
            doc_data = {
                "name": f"达人信息-{creator.handle}.txt",
                "paragraphs": [
                    {
                        "title": "达人基本资料",
                        "content": full_content,
                        "is_active": True,
                        "problem_list": []
                    }
                ]
            }
            # Call the upload API
            upload_response = kb_viewset._call_upload_api(kb.external_id, doc_data)
            if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                # Upload succeeded: record the document in the database
                document_id = upload_response['data']['id']
                doc_record = KnowledgeBaseDocument.objects.create(
                    knowledge_base=kb,
                    document_id=document_id,
                    document_name=f"达人信息-{creator.handle}.txt",
                    external_id=document_id,
                    status='active'
                )
                logger.info(f"文档上传成功ID: {document_id}")
                # Update the knowledge base document count
                kb.document_count = 1
                kb.save()
                # If the creator has an email, create a Gmail mapping
                if creator.email:
                    from user_management.models import GmailTalentMapping
                    GmailTalentMapping.objects.create(
                        user=admin_user,
                        talent_email=creator.email,
                        knowledge_base=kb,
                        conversation_id=f"feishu_{creator.id}",
                        is_active=True
                    )
                    logger.info(f"已创建Gmail映射: {creator.email} -> {kb.id}")
                logger.info(f"成功为达人 {creator.handle} 创建知识库并上传文档: {kb.id}")
                return kb, True
            else:
                # Upload failed: log the error
                error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
                logger.error(f"文档上传失败: {error_msg}")
                # Try to remove the local and external knowledge bases
                try:
                    kb_viewset._delete_external_dataset(kb.external_id)
                except Exception as del_err:
                    logger.error(f"删除外部知识库失败: {str(del_err)}")
                kb.delete()
                return None, False
        except Exception as e:
            logger.error(f"文档上传过程中出错: {str(e)}")
            logger.error(traceback.format_exc())
            # Try to remove the local and external knowledge bases
            try:
                kb_viewset._delete_external_dataset(kb.external_id)
            except Exception as del_err:
                logger.error(f"删除外部知识库失败: {str(del_err)}")
            kb.delete()
            return None, False
        finally:
            # Clean up the temporary file
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass
    except Exception as e:
        logger.error(f"同步达人到知识库时出错: {str(e)}")
        logger.error(traceback.format_exc())
        return None, False
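
# A minimal sketch of calling this directly (the handle is a placeholder):
#
#   kb, created = sync_to_knowledge_base(handle="tiktok_user123")
#   if kb:
#       print(kb.id, kb.name, "created" if created else "already existed")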

def main():
    """CLI entry point."""
    if len(sys.argv) < 2 or sys.argv[1] == "help":
        print_help()
        return
    command = sys.argv[1]
    if command == "sync_from_feishu":
        sync_from_feishu()
    elif command == "update_from_excel":
        if len(sys.argv) < 3:
            print("错误: 未指定Excel文件路径")
            print("用法: python feishu.py update_from_excel <excel文件路径>")
            return
        excel_file_path = sys.argv[2]
        update_from_excel(excel_file_path)
    elif command == "export_to_feishu":
        # Optional arguments
        handle_filter = None
        limit = None
        i = 2
        while i < len(sys.argv):
            if sys.argv[i] == "--handle" and i + 1 < len(sys.argv):
                handle_filter = sys.argv[i+1]
                i += 2
            elif sys.argv[i] == "--limit" and i + 1 < len(sys.argv):
                try:
                    limit = int(sys.argv[i+1])
                except ValueError:
                    print(f"警告: 无效的limit值 '{sys.argv[i+1]}',将使用默认值")
                i += 2
            else:
                i += 1
        # If a handle filter was given, build a custom queryset
        if handle_filter:
            query_set = FeishuCreator.objects.filter(handle__icontains=handle_filter)
            if limit:
                query_set = query_set[:limit]
            export_to_feishu(query_set)
        else:
            export_to_feishu()
    elif command == "import_from_db":
        if len(sys.argv) < 3:
            print("错误: 未指定数据库模型")
            print("用法: python feishu.py import_from_db <模型名称> [--filters 字段1=值1,字段2=值2,...] [--limit 数量]")
            print("支持的模型: Data, KnowledgeBase, FeishuCreator")
            return
        model_name = sys.argv[2]
        filters = {}
        limit = None
        # Parse the extra arguments
        i = 3
        while i < len(sys.argv):
            if sys.argv[i] == "--filters" and i + 1 < len(sys.argv):
                filter_str = sys.argv[i+1]
                for item in filter_str.split(','):
                    if '=' in item:
                        key, value = item.split('=', 1)
                        filters[key.strip()] = value.strip()
                i += 2
            elif sys.argv[i] == "--limit" and i + 1 < len(sys.argv):
                try:
                    limit = int(sys.argv[i+1])
                except ValueError:
                    print(f"警告: 无效的limit值 '{sys.argv[i+1]}',将使用默认值")
                i += 2
            else:
                i += 1
        import_from_database_to_feishu(model_name, filters, limit)
    elif command == "custom_import":
        if len(sys.argv) < 3:
            print("错误: 未指定SQL文件路径")
            print("用法: python feishu.py custom_import <sql_file>")
            return
        sql_file_path = sys.argv[2]
        # Read the SQL file
        try:
            with open(sql_file_path, 'r', encoding='utf-8') as f:
                file_content = f.read().strip()
            # Check whether the file also carries a field mapping
            if '-- FIELD_MAPPING:' in file_content:
                sql_part, mapping_part = file_content.split('-- FIELD_MAPPING:', 1)
                sql_query = sql_part.strip()
                try:
                    field_mapping = json.loads(mapping_part.strip())
                    print(f"使用自定义字段映射: {field_mapping}")
                except json.JSONDecodeError:
                    print("警告: 字段映射格式无效,将使用默认映射")
                    field_mapping = None
            else:
                sql_query = file_content
                field_mapping = None
            # Run the import
            customize_import_to_feishu(sql_query, field_mapping)
        except FileNotFoundError:
            print(f"错误: 找不到文件 '{sql_file_path}'")
        except Exception as e:
            print(f"处理SQL文件时出错: {str(e)}")
    elif command == "sync_to_kb":
        # Sync a creator into the knowledge base
        if len(sys.argv) < 3:
            print("错误: 未指定达人识别信息")
            print("用法: python feishu.py sync_to_kb --id <creator_id> | --handle <handle> | --email <email>")
            return
        creator_id = None
        handle = None
        email = None
        i = 2
        while i < len(sys.argv):
            if sys.argv[i] == "--id" and i + 1 < len(sys.argv):
                creator_id = sys.argv[i+1]
                i += 2
            elif sys.argv[i] == "--handle" and i + 1 < len(sys.argv):
                handle = sys.argv[i+1]
                i += 2
            elif sys.argv[i] == "--email" and i + 1 < len(sys.argv):
                email = sys.argv[i+1]
                i += 2
            else:
                i += 1
        kb, created = sync_to_knowledge_base(creator_id, handle, email)
        if kb:
            status = "新创建" if created else "已存在"
            print(f"达人知识库{status}: {kb.id} ({kb.name})")
            print(f"文档数量: {kb.document_count}")
        else:
            print("同步失败,未能创建或找到达人知识库")
    else:
        print(f"未知命令: {command}")
        print_help()


if __name__ == "__main__":
    main()