# daren_project/feishu/feishu.py

import json
import django
import os
import sys
import pandas as pd
import requests
from django.db import transaction
from django.db.models import Q
from django.db import connection
from datetime import datetime
import traceback
import tempfile
import uuid
# 设置 Django 环境
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'role_based_system.settings')
django.setup()
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import *
from user_management.models import FeishuCreator, Data, KnowledgeBase, KnowledgeBaseDocument
from django.conf import settings
import logging
from django.contrib.auth import get_user_model
logger = logging.getLogger(__name__)
# SDK 使用说明: https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/server-side-sdk/python--sdk/preparations-before-development
# 以下示例代码默认根据文档示例值填充,如果存在代码问题,请在 API 调试台填上相关必要参数后再复制代码使用
def extract_field_value(field_value):
    """提取字段值"""
    if isinstance(field_value, list):
        if not field_value:
            return ''
        if isinstance(field_value[0], dict):
            # 文本/链接类字段通常形如 [{'text': ...}, ...],拼接所有文本段
            return ''.join(item.get('text', '') for item in field_value if isinstance(item, dict))
        return field_value
    elif isinstance(field_value, dict):
        if 'text' in field_value:
            return field_value['text']
        elif 'link' in field_value:
            return field_value['link']
        elif 'link_record_ids' in field_value:
            return ''
    return field_value
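# 用法示意(假设的示例值,仅作说明,模块中不会调用):展示 extract_field_value 能处理的几种飞书字段结构。
def _extract_field_value_examples():
    assert extract_field_value([{'text': 'Alice'}]) == 'Alice'                 # 文本段列表
    assert extract_field_value({'text': 'hi', 'link': 'x'}) == 'hi'            # 含text的字典
    assert extract_field_value({'link': 'https://example.com'}) == 'https://example.com'  # 纯链接字典
    assert extract_field_value('plain value') == 'plain value'                 # 其他类型原样返回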
def save_to_database(record):
"""从飞书多维表格保存记录到数据库"""
fields = record.fields
record_id = record.record_id
creator_data = {
'record_id': record_id,
'contact_person': extract_field_value(fields.get('对接人', '')),
'handle': extract_field_value(fields.get('Handle', '')),
'tiktok_url': extract_field_value(fields.get('链接', '')),
'fans_count': extract_field_value(fields.get('粉丝数', '')),
'gmv': fields.get('GMV', ''),
'email': extract_field_value(fields.get('邮箱', '')),
'phone': extract_field_value(fields.get('手机号|WhatsApp', '')),
'account_type': extract_field_value(fields.get('账号属性', [])),
'price_quote': fields.get('报价', ''),
'response_speed': fields.get('回复速度', ''),
'cooperation_intention': fields.get('合作意向', ''),
'payment_method': fields.get('支付方式', ''),
'payment_account': fields.get('收款账号', ''),
'address': fields.get('收件地址', ''),
'has_ooin': fields.get('签约OOIN?', ''),
'source': fields.get('渠道来源', ''),
'contact_status': fields.get('建联进度', ''),
'cooperation_brands': fields.get('合作品牌', []),
'system_categories': extract_field_value(fields.get('系统展示的带货品类', [])),
'actual_categories': extract_field_value(fields.get('实际高播放量带货品类', [])),
'human_categories': fields.get('达人标想要货品类', ''),
'creator_base': '',
'notes': extract_field_value(fields.get('父记录', '')),
}
try:
creator, created = FeishuCreator.objects.update_or_create(
record_id=record_id,
defaults=creator_data
)
return creator, created
except Exception as e:
print(f"保存记录时出错: {str(e)}")
print(f"记录数据: {creator_data}")
return None, False
def fetch_all_records(client, app_token, table_id, user_access_token):
"""获取所有记录"""
total_records = []
page_token = None
page_size = 20
while True:
try:
# 构造请求对象
builder = SearchAppTableRecordRequest.builder() \
.app_token(app_token) \
.table_id(table_id) \
.page_size(page_size)
# 如果有page_token添加到请求中
if page_token:
builder = builder.page_token(page_token)
# 构建完整请求
request = builder.request_body(SearchAppTableRecordRequestBody.builder().build()).build()
print(f"发送请求page_token: {page_token}")
# 发起请求
option = lark.RequestOption.builder().user_access_token(user_access_token).build()
response = client.bitable.v1.app_table_record.search(request, option)
if not response.success():
print(f"请求失败: {response.code}, {response.msg}")
break
# 获取当前页记录
current_records = response.data.items
if not current_records:
print("没有更多记录")
break
total_records.extend(current_records)
# 解析响应数据获取分页信息
response_data = json.loads(response.raw.content)
total = response_data["data"]["total"]
print(f"获取到 {len(current_records)} 条记录,当前总计: {len(total_records)}/{total}")
# 获取下一页token
page_token = response_data["data"].get("page_token")
if not page_token or not response_data["data"].get("has_more", False):
print("已获取所有数据")
break
except Exception as e:
print(f"错误: {str(e)}")
import traceback
print(traceback.format_exc())
break
print(f"最终获取到 {len(total_records)} 条记录")
return total_records
def update_from_excel(excel_file_path):
"""从Excel文件更新数据库记录"""
try:
print(f"开始读取Excel文件: {excel_file_path}")
df = pd.read_excel(excel_file_path)
if 'Handle' not in df.columns:
print("错误: Excel文件中缺少'Handle'")
return
update_count = 0
skip_count = 0
error_count = 0
# 获取可更新的字段列表
excluded_fields = {'id', 'record_id', 'created_at', 'updated_at'}
model_fields = {f.name for f in FeishuCreator._meta.get_fields()} - excluded_fields
valid_columns = set(df.columns) & model_fields
print(f"可更新的列: {valid_columns}")
with transaction.atomic():
for index, row in df.iterrows():
try:
                    handle = '' if pd.isna(row['Handle']) else str(row['Handle']).strip()
                    if not handle:
print(f"跳过第{index+2}行: Handle为空")
skip_count += 1
continue
# 查找现有记录
creator = FeishuCreator.objects.filter(handle=handle).first()
if not creator:
print(f"跳过Handle为'{handle}'的记录: 数据库中不存在")
skip_count += 1
continue
# 准备更新数据
update_data = {}
for column in valid_columns:
                        if column == 'handle':
continue
value = row[column]
if pd.isna(value):
continue
# 处理特殊类型
if isinstance(value, (list, dict)):
value = json.dumps(value)
elif isinstance(value, (int, float)):
if column in ['fans_count']:
value = int(value)
else:
value = str(value)
else:
value = str(value).strip()
if value:
update_data[column] = value
# 更新记录
if update_data:
for field, value in update_data.items():
setattr(creator, field, value)
creator.save()
update_count += 1
print(f"已更新Handle为'{handle}'的记录")
else:
skip_count += 1
print(f"跳过Handle为'{handle}'的记录: 无需更新")
                except Exception as e:
                    error_count += 1
                    print(f"处理第{index+2}行时出错: {str(e)}")
print("\nExcel更新完成统计信息")
print(f"更新记录数:{update_count}")
print(f"跳过记录数:{skip_count}")
print(f"错误记录数:{error_count}")
except Exception as e:
print(f"处理Excel文件时出错: {str(e)}")
def sync_from_feishu(app_token=None, table_id=None, user_access_token=None, filters=None):
    """从飞书同步数据到系统
    参数:
        app_token: 飞书应用的APP_TOKEN,如不提供则使用默认值
        table_id: 飞书多维表格的TABLE_ID,如不提供则使用默认值
        user_access_token: 用户访问令牌,如不提供则使用默认值
        filters: 筛选条件,字典格式(当前版本尚未使用,仅作为保留参数)
    返回:
        dict: 包含同步统计信息的字典
    """
try:
# 使用传入的参数或默认值
        app_token = app_token or getattr(settings, 'FEISHU_APP_TOKEN', None)
        table_id = table_id or getattr(settings, 'FEISHU_TABLE_ID', None)
        user_access_token = user_access_token or getattr(settings, 'FEISHU_USER_ACCESS_TOKEN', None)
if not all([app_token, table_id, user_access_token]):
logger.error("飞书API凭据不完整")
return {
'created': 0,
'updated': 0,
'errors': 1,
'total': 0,
'error_message': '飞书API凭据不完整'
}
# 初始化统计信息
created_count = 0
updated_count = 0
error_count = 0
created_creators = []
# 获取飞书数据
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
print("开始从飞书同步数据...")
all_records = fetch_all_records(client, app_token, table_id, user_access_token)
if not all_records:
print("未获取到任何记录")
return {
'created': 0,
'updated': 0,
'errors': 0,
'total': 0
}
print("\n开始更新数据库...")
for record in all_records:
creator, created = save_to_database(record)
if creator:
if created:
created_count += 1
if created_count % 10 == 0:
print(f"已创建 {created_count} 条记录...")
created_creators.append(creator)
else:
updated_count += 1
if updated_count % 10 == 0:
print(f"已更新 {updated_count} 条记录...")
else:
error_count += 1
print(f"处理记录失败")
print("\n飞书同步完成!统计信息:")
print(f"新建记录:{created_count}")
print(f"更新记录:{updated_count}")
print(f"错误记录:{error_count}")
print(f"总记录数:{len(all_records)}")
# 返回统计信息增加了created_creators列表
return {
'created': created_count,
'updated': updated_count,
'errors': error_count,
'total': created_count + updated_count,
'created_creators': created_creators # 新增:返回新创建的记录对象列表
}
except Exception as e:
logger.error(f"从飞书同步数据时出错: {str(e)}")
logger.error(traceback.format_exc())
return {
'created': 0,
'updated': 0,
'errors': 1,
'total': 0,
'error_message': str(e)
}
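# 用法示意(仅作演示,模块中不会调用):调用方如何消费上面文档字符串描述的统计字典;
# 不传参数时默认读取 Django settings 中的飞书凭据。
def _sync_from_feishu_example():
    stats = sync_from_feishu()
    if stats.get('error_message'):
        logger.error("同步失败: %s", stats['error_message'])
    else:
        logger.info("同步完成: 新建 %s, 更新 %s, 共 %s",
                    stats['created'], stats['updated'], stats['total'])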
def format_field_value(value, field_type='text'):
"""格式化字段值以符合飞书多维表格API要求"""
if value is None or value == '':
return None
if field_type == 'text':
return {"text": str(value)}
elif field_type == 'number':
try:
return float(value)
except (ValueError, TypeError):
return None
    elif field_type == 'checkbox':
        return str(value).strip().lower() in ('true', 'yes', '1')
elif field_type == 'multi_select':
# 多选项需要返回一个选项ID列表
if isinstance(value, str):
items = [item.strip() for item in value.split(',') if item.strip()]
return [{"text": item} for item in items]
elif isinstance(value, list):
return [{"text": str(item)} for item in value]
return []
return {"text": str(value)}
def export_to_feishu(query_set=None):
"""从数据库导出记录到飞书多维表格"""
# 创建client
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 配置参数
APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
TABLE_ID = "tbl3oikG3F8YYtVA"
USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
print("开始从数据库导出数据到飞书...")
    # 如果没有提供查询集,则默认导出所有没有record_id的记录
    if query_set is None:
        query_set = FeishuCreator.objects.filter(record_id='')
if not query_set.exists():
print("没有需要导出的记录")
return
created_count = 0
updated_count = 0
error_count = 0
for creator in query_set:
try:
# 准备字段数据
fields = {
"对接人": format_field_value(creator.contact_person),
"Handle": format_field_value(creator.handle),
"链接": format_field_value(creator.tiktok_url),
"粉丝数": format_field_value(creator.fans_count),
"GMV": format_field_value(creator.gmv),
"邮箱": format_field_value(creator.email),
"手机号|WhatsApp": format_field_value(creator.phone),
"账号属性": format_field_value(creator.account_type, 'multi_select'),
"报价": format_field_value(creator.price_quote),
"回复速度": format_field_value(creator.response_speed),
"合作意向": format_field_value(creator.cooperation_intention),
"支付方式": format_field_value(creator.payment_method),
"收款账号": format_field_value(creator.payment_account),
"收件地址": format_field_value(creator.address),
"签约OOIN?": format_field_value(creator.has_ooin),
"渠道来源": format_field_value(creator.source),
"建联进度": format_field_value(creator.contact_status),
"合作品牌": creator.cooperation_brands,
"系统展示的带货品类": format_field_value(creator.system_categories, 'multi_select'),
"实际高播放量带货品类": format_field_value(creator.actual_categories, 'multi_select'),
"达人标想要货品类": format_field_value(creator.human_categories)
}
# 过滤掉None值
fields = {k: v for k, v in fields.items() if v is not None}
if creator.record_id:
# 更新现有记录
request = UpdateAppTableRecordRequest.builder() \
.app_token(APP_TOKEN) \
.table_id(TABLE_ID) \
.record_id(creator.record_id) \
.request_body(UpdateAppTableRecordRequestBody.builder().fields(fields).build()) \
.build()
option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
response = client.bitable.v1.app_table_record.update(request, option)
if response.success():
updated_count += 1
print(f"已更新记录: {creator.handle}")
else:
error_count += 1
print(f"更新失败: {creator.handle}, 错误: {response.msg}")
else:
# 创建新记录
request = CreateAppTableRecordRequest.builder() \
.app_token(APP_TOKEN) \
.table_id(TABLE_ID) \
.request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
.build()
option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
response = client.bitable.v1.app_table_record.create(request, option)
if response.success():
                    # 获取新记录的ID并保存
                    record_id = response.data.record.record_id
                    creator.record_id = record_id
                    creator.save()
created_count += 1
print(f"已创建记录: {creator.handle}, ID: {record_id}")
else:
error_count += 1
print(f"创建失败: {creator.handle}, 错误: {response.msg}")
except Exception as e:
error_count += 1
print(f"处理记录 {creator.handle} 时出错: {str(e)}")
import traceback
print(traceback.format_exc())
print("\n飞书导出完成!统计信息:")
print(f"新建记录:{created_count}")
print(f"更新记录:{updated_count}")
print(f"错误记录:{error_count}")
print(f"总处理记录数:{created_count + updated_count + error_count}")
def import_from_database_to_feishu(model_name, filters=None, limit=None):
"""从指定数据库模型导入数据到飞书多维表格
参数:
model_name (str): 模型名称,如 'Data', 'KnowledgeBase'
filters (dict): 过滤条件,如 {'department': '技术部'}
limit (int): 限制导入的记录数量
"""
# 创建client
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 配置参数
APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
TABLE_ID = "tbl3oikG3F8YYtVA"
USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
print(f"开始从{model_name}模型导入数据到飞书...")
# 选择模型
model_mapping = {
'Data': Data,
'KnowledgeBase': KnowledgeBase,
'FeishuCreator': FeishuCreator,
}
model_class = model_mapping.get(model_name)
if not model_class:
print(f"错误: 不支持的模型名称 {model_name}")
return
# 构建查询
query = model_class.objects.all()
# 应用过滤条件
if filters:
q_objects = Q()
for field, value in filters.items():
if isinstance(value, list):
# 对列表值使用OR查询
q_filter = Q()
for v in value:
q_filter |= Q(**{field: v})
q_objects &= q_filter
else:
q_objects &= Q(**{field: value})
query = query.filter(q_objects)
# 限制记录数量
if limit and isinstance(limit, int) and limit > 0:
query = query[:limit]
# 检查查询结果
total_count = query.count()
if total_count == 0:
print("没有符合条件的记录")
return
print(f"找到 {total_count} 条记录,准备导入到飞书...")
# 记录统计
created_count = 0
error_count = 0
# 处理不同模型的字段映射
for index, record in enumerate(query):
try:
if model_name == 'Data':
# Data模型到飞书字段的映射
fields = {
"名称": format_field_value(record.name),
"描述": format_field_value(record.desc),
"类型": format_field_value(record.get_type_display()),
"部门": format_field_value(record.department),
"字符长度": format_field_value(record.char_length, 'number'),
"文档数量": format_field_value(record.document_count, 'number'),
"创建时间": format_field_value(record.create_time.strftime('%Y-%m-%d %H:%M:%S')),
"更新时间": format_field_value(record.update_time.strftime('%Y-%m-%d %H:%M:%S')),
"ID": format_field_value(str(record.id)),
}
elif model_name == 'KnowledgeBase':
# KnowledgeBase模型到飞书字段的映射
fields = {
"名称": format_field_value(record.name),
"描述": format_field_value(record.desc),
"类型": format_field_value(record.type),
"部门": format_field_value(record.department),
"小组": format_field_value(record.group),
"文档数量": format_field_value(record.document_count, 'number'),
"字符长度": format_field_value(record.char_length, 'number'),
"创建时间": format_field_value(record.create_time.strftime('%Y-%m-%d %H:%M:%S')),
"ID": format_field_value(str(record.id)),
}
else:
# 对于FeishuCreator模型使用export_to_feishu函数的逻辑
continue
# 过滤掉None值
fields = {k: v for k, v in fields.items() if v is not None}
# 创建新记录
request = CreateAppTableRecordRequest.builder() \
.app_token(APP_TOKEN) \
.table_id(TABLE_ID) \
.request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
.build()
option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
response = client.bitable.v1.app_table_record.create(request, option)
if response.success():
created_count += 1
if created_count % 10 == 0 or created_count == total_count:
print(f"已导入 {created_count}/{total_count} 条记录...")
else:
error_count += 1
print(f"导入第 {index+1} 条记录失败,错误: {response.msg}")
except Exception as e:
error_count += 1
print(f"处理第 {index+1} 条记录时出错: {str(e)}")
print("\n飞书导入完成!统计信息:")
print(f"成功导入记录数:{created_count}")
print(f"失败记录数:{error_count}")
print(f"总记录数:{total_count}")
def customize_import_to_feishu(sql_query, field_mapping=None):
"""从自定义SQL查询结果导入数据到飞书多维表格
参数:
sql_query (str): SQL查询语句
field_mapping (dict): 字段映射,如 {'db_field': 'feishu_field'}
"""
# 创建client
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 配置参数
APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
TABLE_ID = "tbl3oikG3F8YYtVA"
USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
print("开始从自定义SQL查询导入数据到飞书...")
try:
# 执行SQL查询
with connection.cursor() as cursor:
cursor.execute(sql_query)
columns = [col[0] for col in cursor.description]
result_set = cursor.fetchall()
# 检查查询结果
if not result_set:
print("SQL查询没有返回任何结果")
return
print(f"查询返回 {len(result_set)} 条记录,准备导入到飞书...")
# 默认字段映射:数据库字段名 -> 飞书字段名
if not field_mapping:
field_mapping = {col: col for col in columns}
# 记录统计
created_count = 0
error_count = 0
# 处理每条记录
for index, row in enumerate(result_set):
try:
# 创建记录字典
record_dict = dict(zip(columns, row))
# 将数据映射到飞书字段
fields = {}
for db_field, feishu_field in field_mapping.items():
if db_field in record_dict:
value = record_dict[db_field]
# 处理不同类型的值
if isinstance(value, (int, float)):
fields[feishu_field] = format_field_value(value, 'number')
else:
fields[feishu_field] = format_field_value(value)
# 过滤掉None值
fields = {k: v for k, v in fields.items() if v is not None}
# 创建新记录
request = CreateAppTableRecordRequest.builder() \
.app_token(APP_TOKEN) \
.table_id(TABLE_ID) \
.request_body(CreateAppTableRecordRequestBody.builder().fields(fields).build()) \
.build()
option = lark.RequestOption.builder().user_access_token(USER_ACCESS_TOKEN).build()
response = client.bitable.v1.app_table_record.create(request, option)
if response.success():
created_count += 1
if created_count % 10 == 0 or created_count == len(result_set):
print(f"已导入 {created_count}/{len(result_set)} 条记录...")
else:
error_count += 1
print(f"导入第 {index+1} 条记录失败,错误: {response.msg}")
except Exception as e:
error_count += 1
print(f"处理第 {index+1} 条记录时出错: {str(e)}")
import traceback
print(traceback.format_exc())
print("\n自定义SQL导入完成统计信息")
print(f"成功导入记录数:{created_count}")
print(f"失败记录数:{error_count}")
print(f"总记录数:{len(result_set)}")
except Exception as e:
print(f"执行SQL查询时出错: {str(e)}")
import traceback
print(traceback.format_exc())
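# 用法示意(表名与列名为假设值,未必与实际数据库一致,仅作演示,模块中不会调用):
# 直接传入 SQL 与字段映射,效果等同于 custom_import 命令从 SQL 文件解析出的内容(见 main())。
def _customize_import_example():
    sql = "SELECT handle, fans_count FROM user_management_feishucreator LIMIT 10"
    mapping = {'handle': 'Handle', 'fans_count': '粉丝数'}
    customize_import_to_feishu(sql, mapping)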
def print_help():
"""打印帮助信息"""
print("飞书数据同步工具")
print("================")
print("\n可用命令:")
print(" 1. 从飞书导入数据到数据库:")
print(" python feishu.py sync_from_feishu")
print()
print(" 2. 从Excel文件更新数据库记录:")
print(" python feishu.py update_from_excel <excel文件路径>")
print()
print(" 3. 从数据库导出数据到飞书多维表格:")
print(" python feishu.py export_to_feishu [--handle 关键字] [--limit 数量]")
print()
print(" 4. 从特定数据库模型导入数据到飞书:")
print(" python feishu.py import_from_db <模型名称> [--filters 字段1=值1,字段2=值2,...] [--limit 数量]")
print(" 支持的模型: Data, KnowledgeBase, FeishuCreator")
print(" 示例: python feishu.py import_from_db Data --filters type=admin,department=技术部 --limit 50")
print()
print(" 5. 从自定义SQL查询导入数据到飞书:")
print(" python feishu.py custom_import <sql_file>")
print(" SQL文件中应包含SQL查询语句及可选的字段映射JSON")
print()
print(" 6. 将达人信息同步到知识库:")
print(" python feishu.py sync_to_kb --id <creator_id> | --handle <handle> | --email <email>")
print(" 示例: python feishu.py sync_to_kb --handle tiktok_user123")
print()
print(" 7. 显示帮助信息:")
print(" python feishu.py help")
print()
def sync_to_knowledge_base(creator_id=None, handle=None, email=None):
"""将达人信息同步到外部知识库
参数:
creator_id: 达人ID
handle: 达人Handle
email: 达人Email
返回:
tuple: (KnowledgeBase对象, 是否新创建)
"""
try:
# 查找达人
query = FeishuCreator.objects.all()
if creator_id:
query = query.filter(id=creator_id)
elif handle:
query = query.filter(handle=handle)
elif email:
query = query.filter(email=email)
creator = query.first()
if not creator:
logger.error(f"未找到达人: creator_id={creator_id}, handle={handle}, email={email}")
return None, False
# 检查达人是否有足够信息
if not creator.handle:
logger.error(f"达人没有handle无法创建知识库: {creator.id}")
return None, False
# 检查是否已有通过Gmail映射的知识库
gmail_kb = None
if creator.email:
from user_management.models import GmailTalentMapping
gmail_mapping = GmailTalentMapping.objects.filter(
talent_email=creator.email,
is_active=True
).first()
if gmail_mapping and gmail_mapping.knowledge_base:
gmail_kb = gmail_mapping.knowledge_base
logger.info(f"达人 {creator.handle} 已有Gmail知识库: {gmail_kb.id}")
return gmail_kb, False
# 获取当前用户(组长)
User = get_user_model()
admin_user = User.objects.filter(role='leader').first()
if not admin_user:
logger.error("未找到组长用户,无法创建知识库")
return None, False
# 生成知识库名称
kb_name = f"达人-{creator.handle}"
# 检查是否已存在同名知识库
existed_kb = KnowledgeBase.objects.filter(name=kb_name).first()
if existed_kb:
logger.info(f"知识库 {kb_name} 已存在ID: {existed_kb.id}")
return existed_kb, False
# 创建新知识库
kb = KnowledgeBase.objects.create(
name=kb_name,
desc=f"达人 {creator.handle} 的信息知识库",
type='private', # 设置为私有知识库
user_id=admin_user.id,
department=admin_user.department,
document_count=0
)
logger.info(f"已创建知识库: {kb.id}")
# 调用内部方法创建外部知识库
from user_management.views import KnowledgeBaseViewSet
kb_viewset = KnowledgeBaseViewSet()
try:
external_id = kb_viewset._create_external_dataset(kb)
# 保存external_id
kb.external_id = external_id
kb.save()
logger.info(f"已创建外部知识库ID: {external_id}")
except Exception as e:
logger.error(f"创建外部知识库失败: {str(e)}")
# 如果外部知识库创建失败,删除本地知识库
kb.delete()
return None, False
# 准备达人信息文档
content = []
# 基本信息
content.append("# 达人基本信息")
content.append(f"Handle: {creator.handle}")
content.append(f"邮箱: {creator.email}")
content.append(f"对接人: {creator.contact_person}")
content.append(f"链接: {creator.tiktok_url}")
content.append(f"粉丝数: {creator.fans_count}")
content.append(f"GMV: {creator.gmv}")
content.append(f"电话: {creator.phone}")
# 账号属性
content.append("\n# 账号属性")
content.append(f"账号属性: {creator.account_type}")
content.append(f"报价: {creator.price_quote}")
content.append(f"回复速度: {creator.response_speed}")
# 合作信息
content.append("\n# 合作信息")
content.append(f"合作意向: {creator.cooperation_intention}")
content.append(f"支付方式: {creator.payment_method}")
content.append(f"收款账号: {creator.payment_account}")
content.append(f"收件地址: {creator.address}")
content.append(f"签约OOIN?: {creator.has_ooin}")
# 渠道和品类
content.append("\n# 渠道和品类")
content.append(f"渠道来源: {creator.source}")
content.append(f"建联进度: {creator.contact_status}")
content.append(f"合作品牌: {creator.cooperation_brands}")
content.append(f"系统展示品类: {creator.system_categories}")
content.append(f"实际品类: {creator.actual_categories}")
content.append(f"达人想要品类: {creator.human_categories}")
# 其他信息
content.append("\n# 其他信息")
content.append(f"达人base: {creator.creator_base}")
content.append(f"备注: {creator.notes}")
content.append(f"更新时间: {creator.updated_at.strftime('%Y-%m-%d %H:%M:%S')}")
# 合并内容
full_content = "\n".join(content)
try:
# 准备文档上传需要的段落数据
doc_data = {
"name": f"达人信息-{creator.handle}.txt",
"paragraphs": [
{
"title": "达人基本资料",
"content": full_content,
"is_active": True,
"problem_list": []
}
]
}
# 调用上传API
upload_response = kb_viewset._call_upload_api(kb.external_id, doc_data)
if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
# 上传成功,保存记录到数据库
document_id = upload_response['data']['id']
doc_record = KnowledgeBaseDocument.objects.create(
knowledge_base=kb,
document_id=document_id,
document_name=f"达人信息-{creator.handle}.txt",
external_id=document_id,
status='active'
)
logger.info(f"文档上传成功ID: {document_id}")
# 更新知识库文档计数
kb.document_count = 1
kb.save()
# 如果有邮箱创建Gmail映射
if creator.email:
from user_management.models import GmailTalentMapping
GmailTalentMapping.objects.create(
user=admin_user,
talent_email=creator.email,
knowledge_base=kb,
conversation_id=f"feishu_{creator.id}",
is_active=True
)
logger.info(f"已创建Gmail映射: {creator.email} -> {kb.id}")
logger.info(f"成功为达人 {creator.handle} 创建知识库并上传文档: {kb.id}")
return kb, True
else:
# 上传失败,记录错误信息
error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
logger.error(f"文档上传失败: {error_msg}")
# 尝试删除创建的知识库和外部知识库
try:
kb_viewset._delete_external_dataset(kb.external_id)
except Exception as del_err:
logger.error(f"删除外部知识库失败: {str(del_err)}")
kb.delete()
return None, False
except Exception as e:
logger.error(f"文档上传过程中出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 尝试删除创建的知识库和外部知识库
try:
kb_viewset._delete_external_dataset(kb.external_id)
except Exception as del_err:
logger.error(f"删除外部知识库失败: {str(del_err)}")
kb.delete()
return None, False
except Exception as e:
logger.error(f"同步达人到知识库时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return None, False
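# 用法示意(handle 为假设值,仅作演示,模块中不会调用):
# 调用方拿到 (KnowledgeBase, 是否新建) 元组,失败时为 (None, False)。
def _sync_to_knowledge_base_example():
    kb, created = sync_to_knowledge_base(handle='tiktok_user123')
    if kb is None:
        logger.error("同步达人到知识库失败")
    else:
        logger.info("知识库 %s %s", kb.id, "新创建" if created else "已存在")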
def main():
"""主函数"""
if len(sys.argv) < 2 or sys.argv[1] == "help":
print_help()
return
command = sys.argv[1]
if command == "sync_from_feishu":
sync_from_feishu()
elif command == "update_from_excel":
if len(sys.argv) < 3:
print("错误: 未指定Excel文件路径")
print("用法: python feishu.py update_from_excel <excel文件路径>")
return
excel_file_path = sys.argv[2]
update_from_excel(excel_file_path)
elif command == "export_to_feishu":
# 可选参数处理
handle_filter = None
limit = None
i = 2
while i < len(sys.argv):
if sys.argv[i] == "--handle" and i + 1 < len(sys.argv):
handle_filter = sys.argv[i+1]
i += 2
elif sys.argv[i] == "--limit" and i + 1 < len(sys.argv):
try:
limit = int(sys.argv[i+1])
except ValueError:
print(f"警告: 无效的limit值 '{sys.argv[i+1]}',将使用默认值")
i += 2
else:
i += 1
# 如果提供了handle过滤器创建自定义查询集
if handle_filter:
query_set = FeishuCreator.objects.filter(handle__icontains=handle_filter)
if limit:
query_set = query_set[:limit]
export_to_feishu(query_set)
else:
export_to_feishu()
elif command == "import_from_db":
if len(sys.argv) < 3:
print("错误: 未指定数据库模型")
print("用法: python feishu.py import_from_db <模型名称> [--filters 字段1=值1,字段2=值2,...] [--limit 数量]")
print("支持的模型: Data, KnowledgeBase, FeishuCreator")
return
model_name = sys.argv[2]
filters = {}
limit = None
# 解析额外参数
i = 3
while i < len(sys.argv):
if sys.argv[i] == "--filters" and i + 1 < len(sys.argv):
filter_str = sys.argv[i+1]
for item in filter_str.split(','):
if '=' in item:
key, value = item.split('=', 1)
filters[key.strip()] = value.strip()
i += 2
elif sys.argv[i] == "--limit" and i + 1 < len(sys.argv):
try:
limit = int(sys.argv[i+1])
except ValueError:
print(f"警告: 无效的limit值 '{sys.argv[i+1]}',将使用默认值")
i += 2
else:
i += 1
import_from_database_to_feishu(model_name, filters, limit)
elif command == "custom_import":
if len(sys.argv) < 3:
print("错误: 未指定SQL文件路径")
print("用法: python feishu.py custom_import <sql_file>")
return
sql_file_path = sys.argv[2]
# 读取SQL文件
try:
with open(sql_file_path, 'r', encoding='utf-8') as f:
file_content = f.read().strip()
# 判断是否包含字段映射
if '-- FIELD_MAPPING:' in file_content:
sql_part, mapping_part = file_content.split('-- FIELD_MAPPING:', 1)
sql_query = sql_part.strip()
try:
field_mapping = json.loads(mapping_part.strip())
print(f"使用自定义字段映射: {field_mapping}")
except json.JSONDecodeError:
print(f"警告: 字段映射格式无效,将使用默认映射")
field_mapping = None
else:
sql_query = file_content
field_mapping = None
# 执行导入
customize_import_to_feishu(sql_query, field_mapping)
except FileNotFoundError:
print(f"错误: 找不到文件 '{sql_file_path}'")
except Exception as e:
print(f"处理SQL文件时出错: {str(e)}")
elif command == "sync_to_kb":
# 同步达人信息到知识库
if len(sys.argv) < 3:
print("错误: 未指定达人识别信息")
print("用法: python feishu.py sync_to_kb --id <creator_id> | --handle <handle> | --email <email>")
return
creator_id = None
handle = None
email = None
i = 2
while i < len(sys.argv):
if sys.argv[i] == "--id" and i + 1 < len(sys.argv):
creator_id = sys.argv[i+1]
i += 2
elif sys.argv[i] == "--handle" and i + 1 < len(sys.argv):
handle = sys.argv[i+1]
i += 2
elif sys.argv[i] == "--email" and i + 1 < len(sys.argv):
email = sys.argv[i+1]
i += 2
else:
i += 1
kb, created = sync_to_knowledge_base(creator_id, handle, email)
if kb:
status = "新创建" if created else "已存在"
print(f"达人知识库{status}: {kb.id} ({kb.name})")
print(f"文档数量: {kb.document_count}")
else:
print("同步失败,未能创建或找到达人知识库")
else:
print(f"未知命令: {command}")
print_help()
if __name__ == "__main__":
main()