role_based_system/feishu/feishu.py

290 lines
11 KiB
Python
Raw Normal View History

2025-03-29 12:26:50 +08:00
import json
import django
import os
import sys
2025-04-02 12:25:40 +08:00
import pandas as pd
from django.db import transaction
2025-03-29 12:26:50 +08:00
# 设置 Django 环境
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'role_based_system.settings')
django.setup()
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import *
from user_management.models import FeishuCreator
# SDK 使用说明: https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/server-side-sdk/python--sdk/preparations-before-development
# 以下示例代码默认根据文档示例值填充,如果存在代码问题,请在 API 调试台填上相关必要参数后再复制代码使用
def extract_field_value(field_value):
"""提取字段值"""
if isinstance(field_value, list):
if field_value and isinstance(field_value[0], dict):
return field_value[0].get('text', '')
elif isinstance(field_value, dict):
if 'text' in field_value:
return field_value['text']
elif 'link' in field_value:
return field_value['link']
elif 'link_record_ids' in field_value:
return ''
return field_value
def save_to_database(record):
2025-04-02 12:25:40 +08:00
"""从飞书多维表格保存记录到数据库"""
2025-03-29 12:26:50 +08:00
fields = record.fields
record_id = record.record_id
creator_data = {
'record_id': record_id,
'contact_person': extract_field_value(fields.get('对接人', '')),
'handle': extract_field_value(fields.get('Handle', '')),
'tiktok_url': extract_field_value(fields.get('链接', '')),
'fans_count': extract_field_value(fields.get('粉丝数', '')),
'gmv': fields.get('GMV', ''),
'email': extract_field_value(fields.get('邮箱', '')),
'phone': extract_field_value(fields.get('手机号|WhatsApp', '')),
'account_type': extract_field_value(fields.get('账号属性', [])),
'price_quote': fields.get('报价', ''),
'response_speed': fields.get('回复速度', ''),
'cooperation_intention': fields.get('合作意向', ''),
'payment_method': fields.get('支付方式', ''),
'payment_account': fields.get('收款账号', ''),
'address': fields.get('收件地址', ''),
'has_ooin': fields.get('签约OOIN?', ''),
'source': fields.get('渠道来源', ''),
'contact_status': fields.get('建联进度', ''),
'cooperation_brands': fields.get('合作品牌', []),
'system_categories': extract_field_value(fields.get('系统展示的带货品类', [])),
'actual_categories': extract_field_value(fields.get('实际高播放量带货品类', [])),
'human_categories': fields.get('达人标想要货品类', ''),
2025-04-02 12:25:40 +08:00
'creator_base': '',
2025-03-29 12:26:50 +08:00
'notes': extract_field_value(fields.get('父记录', '')),
}
try:
creator, created = FeishuCreator.objects.update_or_create(
record_id=record_id,
defaults=creator_data
)
return creator, created
except Exception as e:
print(f"保存记录时出错: {str(e)}")
print(f"记录数据: {creator_data}")
return None, False
def fetch_all_records(client, app_token, table_id, user_access_token):
"""获取所有记录"""
total_records = []
page_token = None
page_size = 20
while True:
try:
# 构造请求对象
builder = SearchAppTableRecordRequest.builder() \
.app_token(app_token) \
.table_id(table_id) \
.page_size(page_size)
# 如果有page_token添加到请求中
if page_token:
builder = builder.page_token(page_token)
# 构建完整请求
request = builder.request_body(SearchAppTableRecordRequestBody.builder().build()).build()
print(f"发送请求page_token: {page_token}")
# 发起请求
option = lark.RequestOption.builder().user_access_token(user_access_token).build()
response = client.bitable.v1.app_table_record.search(request, option)
if not response.success():
print(f"请求失败: {response.code}, {response.msg}")
break
# 获取当前页记录
current_records = response.data.items
if not current_records:
print("没有更多记录")
break
total_records.extend(current_records)
# 解析响应数据获取分页信息
response_data = json.loads(response.raw.content)
total = response_data["data"]["total"]
print(f"获取到 {len(current_records)} 条记录,当前总计: {len(total_records)}/{total}")
# 获取下一页token
page_token = response_data["data"].get("page_token")
if not page_token or not response_data["data"].get("has_more", False):
print("已获取所有数据")
break
except Exception as e:
print(f"错误: {str(e)}")
import traceback
print(traceback.format_exc())
break
print(f"最终获取到 {len(total_records)} 条记录")
return total_records
2025-04-02 12:25:40 +08:00
def update_from_excel(excel_file_path):
"""从Excel文件更新数据库记录"""
try:
print(f"开始读取Excel文件: {excel_file_path}")
df = pd.read_excel(excel_file_path)
if 'Handle' not in df.columns:
print("错误: Excel文件中缺少'Handle'")
return
update_count = 0
skip_count = 0
error_count = 0
# 获取可更新的字段列表
excluded_fields = {'id', 'record_id', 'created_at', 'updated_at'}
model_fields = {f.name for f in FeishuCreator._meta.get_fields()} - excluded_fields
valid_columns = set(df.columns) & model_fields
print(f"可更新的列: {valid_columns}")
with transaction.atomic():
for index, row in df.iterrows():
try:
handle = str(row['Handle']).strip()
if not handle:
print(f"跳过第{index+2}行: Handle为空")
skip_count += 1
continue
# 查找现有记录
creator = FeishuCreator.objects.filter(handle=handle).first()
if not creator:
print(f"跳过Handle为'{handle}'的记录: 数据库中不存在")
skip_count += 1
continue
# 准备更新数据
update_data = {}
for column in valid_columns:
if column == 'Handle':
continue
value = row[column]
if pd.isna(value):
continue
# 处理特殊类型
if isinstance(value, (list, dict)):
value = json.dumps(value)
elif isinstance(value, (int, float)):
if column in ['fans_count']:
value = int(value)
else:
value = str(value)
else:
value = str(value).strip()
if value:
update_data[column] = value
# 更新记录
if update_data:
for field, value in update_data.items():
setattr(creator, field, value)
creator.save()
update_count += 1
print(f"已更新Handle为'{handle}'的记录")
else:
skip_count += 1
print(f"跳过Handle为'{handle}'的记录: 无需更新")
except Exception as e:
error_count += 1
print(f"处理Handle'{handle}'时出错: {str(e)}")
print("\nExcel更新完成统计信息")
print(f"更新记录数:{update_count}")
print(f"跳过记录数:{skip_count}")
print(f"错误记录数:{error_count}")
except Exception as e:
print(f"处理Excel文件时出错: {str(e)}")
def sync_from_feishu():
"""从飞书多维表格同步数据"""
2025-03-29 12:26:50 +08:00
# 创建client
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 配置参数
APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
TABLE_ID = "tbl3oikG3F8YYtVA"
USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
2025-04-02 12:25:40 +08:00
print("开始从飞书同步数据...")
2025-03-29 12:26:50 +08:00
all_records = fetch_all_records(client, APP_TOKEN, TABLE_ID, USER_ACCESS_TOKEN)
if not all_records:
print("未获取到任何记录")
return
print("\n开始更新数据库...")
created_count = 0
updated_count = 0
error_count = 0
for record in all_records:
creator, created = save_to_database(record)
if creator:
if created:
created_count += 1
2025-04-02 12:25:40 +08:00
if created_count % 10 == 0:
2025-03-29 12:26:50 +08:00
print(f"已创建 {created_count} 条记录...")
else:
updated_count += 1
if updated_count % 10 == 0:
print(f"已更新 {updated_count} 条记录...")
else:
error_count += 1
print(f"处理记录失败")
2025-04-02 12:25:40 +08:00
print("\n飞书同步完成!统计信息:")
2025-03-29 12:26:50 +08:00
print(f"新建记录:{created_count}")
print(f"更新记录:{updated_count}")
print(f"错误记录:{error_count}")
print(f"总记录数:{len(all_records)}")
2025-04-02 12:25:40 +08:00
def main():
"""主函数"""
if len(sys.argv) < 2:
print("使用方法:")
print("1. 从飞书同步: python feishu.py sync")
print("2. 从Excel更新: python feishu.py excel <excel文件路径>")
return
command = sys.argv[1]
if command == 'sync':
sync_from_feishu()
elif command == 'excel':
if len(sys.argv) != 3:
print("使用方法: python feishu.py excel <excel文件路径>")
return
excel_file_path = sys.argv[2]
update_from_excel(excel_file_path)
else:
print("无效的命令。使用方法:")
print("1. 从飞书同步: python feishu.py sync")
print("2. 从Excel更新: python feishu.py excel <excel文件路径>")
2025-03-29 12:26:50 +08:00
if __name__ == "__main__":
main()