import json import django import os import sys import pandas as pd from django.db import transaction # 设置 Django 环境 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'role_based_system.settings') django.setup() import lark_oapi as lark from lark_oapi.api.bitable.v1 import * from user_management.models import FeishuCreator # SDK 使用说明: https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/server-side-sdk/python--sdk/preparations-before-development # 以下示例代码默认根据文档示例值填充,如果存在代码问题,请在 API 调试台填上相关必要参数后再复制代码使用 def extract_field_value(field_value): """提取字段值""" if isinstance(field_value, list): if field_value and isinstance(field_value[0], dict): return field_value[0].get('text', '') elif isinstance(field_value, dict): if 'text' in field_value: return field_value['text'] elif 'link' in field_value: return field_value['link'] elif 'link_record_ids' in field_value: return '' return field_value def save_to_database(record): """从飞书多维表格保存记录到数据库""" fields = record.fields record_id = record.record_id creator_data = { 'record_id': record_id, 'contact_person': extract_field_value(fields.get('对接人', '')), 'handle': extract_field_value(fields.get('Handle', '')), 'tiktok_url': extract_field_value(fields.get('链接', '')), 'fans_count': extract_field_value(fields.get('粉丝数', '')), 'gmv': fields.get('GMV', ''), 'email': extract_field_value(fields.get('邮箱', '')), 'phone': extract_field_value(fields.get('手机号|WhatsApp', '')), 'account_type': extract_field_value(fields.get('账号属性', [])), 'price_quote': fields.get('报价', ''), 'response_speed': fields.get('回复速度', ''), 'cooperation_intention': fields.get('合作意向', ''), 'payment_method': fields.get('支付方式', ''), 'payment_account': fields.get('收款账号', ''), 'address': fields.get('收件地址', ''), 'has_ooin': fields.get('签约OOIN?', ''), 'source': fields.get('渠道来源', ''), 'contact_status': fields.get('建联进度', ''), 'cooperation_brands': fields.get('合作品牌', []), 'system_categories': extract_field_value(fields.get('系统展示的带货品类', [])), 'actual_categories': extract_field_value(fields.get('实际高播放量带货品类', [])), 'human_categories': fields.get('达人标想要货品类', ''), 'creator_base': '', 'notes': extract_field_value(fields.get('父记录', '')), } try: creator, created = FeishuCreator.objects.update_or_create( record_id=record_id, defaults=creator_data ) return creator, created except Exception as e: print(f"保存记录时出错: {str(e)}") print(f"记录数据: {creator_data}") return None, False def fetch_all_records(client, app_token, table_id, user_access_token): """获取所有记录""" total_records = [] page_token = None page_size = 20 while True: try: # 构造请求对象 builder = SearchAppTableRecordRequest.builder() \ .app_token(app_token) \ .table_id(table_id) \ .page_size(page_size) # 如果有page_token,添加到请求中 if page_token: builder = builder.page_token(page_token) # 构建完整请求 request = builder.request_body(SearchAppTableRecordRequestBody.builder().build()).build() print(f"发送请求,page_token: {page_token}") # 发起请求 option = lark.RequestOption.builder().user_access_token(user_access_token).build() response = client.bitable.v1.app_table_record.search(request, option) if not response.success(): print(f"请求失败: {response.code}, {response.msg}") break # 获取当前页记录 current_records = response.data.items if not current_records: print("没有更多记录") break total_records.extend(current_records) # 解析响应数据获取分页信息 response_data = json.loads(response.raw.content) total = response_data["data"]["total"] print(f"获取到 {len(current_records)} 条记录,当前总计: {len(total_records)}/{total} 条") # 获取下一页token page_token = response_data["data"].get("page_token") if not page_token or not response_data["data"].get("has_more", False): print("已获取所有数据") break except Exception as e: print(f"错误: {str(e)}") import traceback print(traceback.format_exc()) break print(f"最终获取到 {len(total_records)} 条记录") return total_records def update_from_excel(excel_file_path): """从Excel文件更新数据库记录""" try: print(f"开始读取Excel文件: {excel_file_path}") df = pd.read_excel(excel_file_path) if 'Handle' not in df.columns: print("错误: Excel文件中缺少'Handle'列") return update_count = 0 skip_count = 0 error_count = 0 # 获取可更新的字段列表 excluded_fields = {'id', 'record_id', 'created_at', 'updated_at'} model_fields = {f.name for f in FeishuCreator._meta.get_fields()} - excluded_fields valid_columns = set(df.columns) & model_fields print(f"可更新的列: {valid_columns}") with transaction.atomic(): for index, row in df.iterrows(): try: handle = str(row['Handle']).strip() if not handle: print(f"跳过第{index+2}行: Handle为空") skip_count += 1 continue # 查找现有记录 creator = FeishuCreator.objects.filter(handle=handle).first() if not creator: print(f"跳过Handle为'{handle}'的记录: 数据库中不存在") skip_count += 1 continue # 准备更新数据 update_data = {} for column in valid_columns: if column == 'Handle': continue value = row[column] if pd.isna(value): continue # 处理特殊类型 if isinstance(value, (list, dict)): value = json.dumps(value) elif isinstance(value, (int, float)): if column in ['fans_count']: value = int(value) else: value = str(value) else: value = str(value).strip() if value: update_data[column] = value # 更新记录 if update_data: for field, value in update_data.items(): setattr(creator, field, value) creator.save() update_count += 1 print(f"已更新Handle为'{handle}'的记录") else: skip_count += 1 print(f"跳过Handle为'{handle}'的记录: 无需更新") except Exception as e: error_count += 1 print(f"处理Handle'{handle}'时出错: {str(e)}") print("\nExcel更新完成!统计信息:") print(f"更新记录数:{update_count}") print(f"跳过记录数:{skip_count}") print(f"错误记录数:{error_count}") except Exception as e: print(f"处理Excel文件时出错: {str(e)}") def sync_from_feishu(): """从飞书多维表格同步数据""" # 创建client client = lark.Client.builder() \ .enable_set_token(True) \ .log_level(lark.LogLevel.DEBUG) \ .build() # 配置参数 APP_TOKEN = "XYE6bMQUOaZ5y5svj4vcWohGnmg" TABLE_ID = "tbl3oikG3F8YYtVA" USER_ACCESS_TOKEN = "u-ecM5BmzKx4uHz3sG0FouQSk1l9kxgl_3Xa00l5Ma24Jy" print("开始从飞书同步数据...") all_records = fetch_all_records(client, APP_TOKEN, TABLE_ID, USER_ACCESS_TOKEN) if not all_records: print("未获取到任何记录") return print("\n开始更新数据库...") created_count = 0 updated_count = 0 error_count = 0 for record in all_records: creator, created = save_to_database(record) if creator: if created: created_count += 1 if created_count % 10 == 0: print(f"已创建 {created_count} 条记录...") else: updated_count += 1 if updated_count % 10 == 0: print(f"已更新 {updated_count} 条记录...") else: error_count += 1 print(f"处理记录失败") print("\n飞书同步完成!统计信息:") print(f"新建记录:{created_count}") print(f"更新记录:{updated_count}") print(f"错误记录:{error_count}") print(f"总记录数:{len(all_records)}") def main(): """主函数""" if len(sys.argv) < 2: print("使用方法:") print("1. 从飞书同步: python feishu.py sync") print("2. 从Excel更新: python feishu.py excel ") return command = sys.argv[1] if command == 'sync': sync_from_feishu() elif command == 'excel': if len(sys.argv) != 3: print("使用方法: python feishu.py excel ") return excel_file_path = sys.argv[2] update_from_excel(excel_file_path) else: print("无效的命令。使用方法:") print("1. 从飞书同步: python feishu.py sync") print("2. 从Excel更新: python feishu.py excel ") if __name__ == "__main__": main()