role_based_system/feishu/feishu.py
2025-04-02 12:25:40 +08:00

290 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import django
import os
import sys
import pandas as pd
from django.db import transaction
# 设置 Django 环境
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'role_based_system.settings')
django.setup()
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import *
from user_management.models import FeishuCreator
# SDK 使用说明: https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/server-side-sdk/python--sdk/preparations-before-development
# 以下示例代码默认根据文档示例值填充,如果存在代码问题,请在 API 调试台填上相关必要参数后再复制代码使用
def extract_field_value(field_value):
"""提取字段值"""
if isinstance(field_value, list):
if field_value and isinstance(field_value[0], dict):
return field_value[0].get('text', '')
elif isinstance(field_value, dict):
if 'text' in field_value:
return field_value['text']
elif 'link' in field_value:
return field_value['link']
elif 'link_record_ids' in field_value:
return ''
return field_value
def save_to_database(record):
"""从飞书多维表格保存记录到数据库"""
fields = record.fields
record_id = record.record_id
creator_data = {
'record_id': record_id,
'contact_person': extract_field_value(fields.get('对接人', '')),
'handle': extract_field_value(fields.get('Handle', '')),
'tiktok_url': extract_field_value(fields.get('链接', '')),
'fans_count': extract_field_value(fields.get('粉丝数', '')),
'gmv': fields.get('GMV', ''),
'email': extract_field_value(fields.get('邮箱', '')),
'phone': extract_field_value(fields.get('手机号|WhatsApp', '')),
'account_type': extract_field_value(fields.get('账号属性', [])),
'price_quote': fields.get('报价', ''),
'response_speed': fields.get('回复速度', ''),
'cooperation_intention': fields.get('合作意向', ''),
'payment_method': fields.get('支付方式', ''),
'payment_account': fields.get('收款账号', ''),
'address': fields.get('收件地址', ''),
'has_ooin': fields.get('签约OOIN?', ''),
'source': fields.get('渠道来源', ''),
'contact_status': fields.get('建联进度', ''),
'cooperation_brands': fields.get('合作品牌', []),
'system_categories': extract_field_value(fields.get('系统展示的带货品类', [])),
'actual_categories': extract_field_value(fields.get('实际高播放量带货品类', [])),
'human_categories': fields.get('达人标想要货品类', ''),
'creator_base': '',
'notes': extract_field_value(fields.get('父记录', '')),
}
try:
creator, created = FeishuCreator.objects.update_or_create(
record_id=record_id,
defaults=creator_data
)
return creator, created
except Exception as e:
print(f"保存记录时出错: {str(e)}")
print(f"记录数据: {creator_data}")
return None, False
def fetch_all_records(client, app_token, table_id, user_access_token):
"""获取所有记录"""
total_records = []
page_token = None
page_size = 20
while True:
try:
# 构造请求对象
builder = SearchAppTableRecordRequest.builder() \
.app_token(app_token) \
.table_id(table_id) \
.page_size(page_size)
# 如果有page_token添加到请求中
if page_token:
builder = builder.page_token(page_token)
# 构建完整请求
request = builder.request_body(SearchAppTableRecordRequestBody.builder().build()).build()
print(f"发送请求page_token: {page_token}")
# 发起请求
option = lark.RequestOption.builder().user_access_token(user_access_token).build()
response = client.bitable.v1.app_table_record.search(request, option)
if not response.success():
print(f"请求失败: {response.code}, {response.msg}")
break
# 获取当前页记录
current_records = response.data.items
if not current_records:
print("没有更多记录")
break
total_records.extend(current_records)
# 解析响应数据获取分页信息
response_data = json.loads(response.raw.content)
total = response_data["data"]["total"]
print(f"获取到 {len(current_records)} 条记录,当前总计: {len(total_records)}/{total}")
# 获取下一页token
page_token = response_data["data"].get("page_token")
if not page_token or not response_data["data"].get("has_more", False):
print("已获取所有数据")
break
except Exception as e:
print(f"错误: {str(e)}")
import traceback
print(traceback.format_exc())
break
print(f"最终获取到 {len(total_records)} 条记录")
return total_records
def update_from_excel(excel_file_path):
"""从Excel文件更新数据库记录"""
try:
print(f"开始读取Excel文件: {excel_file_path}")
df = pd.read_excel(excel_file_path)
if 'Handle' not in df.columns:
print("错误: Excel文件中缺少'Handle'")
return
update_count = 0
skip_count = 0
error_count = 0
# 获取可更新的字段列表
excluded_fields = {'id', 'record_id', 'created_at', 'updated_at'}
model_fields = {f.name for f in FeishuCreator._meta.get_fields()} - excluded_fields
valid_columns = set(df.columns) & model_fields
print(f"可更新的列: {valid_columns}")
with transaction.atomic():
for index, row in df.iterrows():
try:
handle = str(row['Handle']).strip()
if not handle:
print(f"跳过第{index+2}行: Handle为空")
skip_count += 1
continue
# 查找现有记录
creator = FeishuCreator.objects.filter(handle=handle).first()
if not creator:
print(f"跳过Handle为'{handle}'的记录: 数据库中不存在")
skip_count += 1
continue
# 准备更新数据
update_data = {}
for column in valid_columns:
if column == 'Handle':
continue
value = row[column]
if pd.isna(value):
continue
# 处理特殊类型
if isinstance(value, (list, dict)):
value = json.dumps(value)
elif isinstance(value, (int, float)):
if column in ['fans_count']:
value = int(value)
else:
value = str(value)
else:
value = str(value).strip()
if value:
update_data[column] = value
# 更新记录
if update_data:
for field, value in update_data.items():
setattr(creator, field, value)
creator.save()
update_count += 1
print(f"已更新Handle为'{handle}'的记录")
else:
skip_count += 1
print(f"跳过Handle为'{handle}'的记录: 无需更新")
except Exception as e:
error_count += 1
print(f"处理Handle'{handle}'时出错: {str(e)}")
print("\nExcel更新完成统计信息")
print(f"更新记录数:{update_count}")
print(f"跳过记录数:{skip_count}")
print(f"错误记录数:{error_count}")
except Exception as e:
print(f"处理Excel文件时出错: {str(e)}")
def sync_from_feishu():
"""从飞书多维表格同步数据"""
# 创建client
client = lark.Client.builder() \
.enable_set_token(True) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 配置参数
APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg"
TABLE_ID = "tbl3oikG3F8YYtVA"
USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy"
print("开始从飞书同步数据...")
all_records = fetch_all_records(client, APP_TOKEN, TABLE_ID, USER_ACCESS_TOKEN)
if not all_records:
print("未获取到任何记录")
return
print("\n开始更新数据库...")
created_count = 0
updated_count = 0
error_count = 0
for record in all_records:
creator, created = save_to_database(record)
if creator:
if created:
created_count += 1
if created_count % 10 == 0:
print(f"已创建 {created_count} 条记录...")
else:
updated_count += 1
if updated_count % 10 == 0:
print(f"已更新 {updated_count} 条记录...")
else:
error_count += 1
print(f"处理记录失败")
print("\n飞书同步完成!统计信息:")
print(f"新建记录:{created_count}")
print(f"更新记录:{updated_count}")
print(f"错误记录:{error_count}")
print(f"总记录数:{len(all_records)}")
def main():
"""主函数"""
if len(sys.argv) < 2:
print("使用方法:")
print("1. 从飞书同步: python feishu.py sync")
print("2. 从Excel更新: python feishu.py excel <excel文件路径>")
return
command = sys.argv[1]
if command == 'sync':
sync_from_feishu()
elif command == 'excel':
if len(sys.argv) != 3:
print("使用方法: python feishu.py excel <excel文件路径>")
return
excel_file_path = sys.argv[2]
update_from_excel(excel_file_path)
else:
print("无效的命令。使用方法:")
print("1. 从飞书同步: python feishu.py sync")
print("2. 从Excel更新: python feishu.py excel <excel文件路径>")
if __name__ == "__main__":
main()