# daren/apps/feishu/services/data_sync_service.py
import logging
import json
from django.db import connection, models, transaction
from django.apps import apps
from django.db.models.fields import CharField, TextField, EmailField, IntegerField, FloatField
from django.db.models.fields.json import JSONField
from django.db.utils import ProgrammingError, DataError
# The Feishu SDK is no longer needed
# from lark_oapi.api.bitable.v1 import *
# from lark_oapi.api.bitable.v1.model.app_table_field import ListAppTableFieldRequest, ListAppTableFieldResponse
from ..models import FeishuTableMapping
from .bitable_service import BitableService
logger = logging.getLogger(__name__)
class DataSyncService:
    """
    Data synchronisation service.

    Copies the records of a Feishu Bitable table into a database table whose
    Django model is built at runtime from the Bitable field list.
    """

    # Columns owned by the generated model itself. A Feishu field whose
    # normalised name matches one of these is never mapped onto it.
    _RESERVED_FIELD_NAMES = frozenset({'id', 'feishu_record_id', 'created_at', 'updated_at'})

    @staticmethod
    def _normalize_identifier(name):
        """Lower-case *name* and replace spaces and hyphens with underscores."""
        return name.lower().replace(' ', '_').replace('-', '_')

    @staticmethod
    def _default_table_name(feishu_table_name):
        """Return the ``feishu_``-prefixed table name used by ``sync_data_to_db``."""
        return f"feishu_{DataSyncService._normalize_identifier(feishu_table_name)}"

    @staticmethod
    def _derive_table_name(feishu_table_name, table_id):
        """
        Build a default table name for a mapping that has no custom name.

        The name is ``feishu_<normalised name>`` restricted to ``[a-zA-Z0-9_]``
        and capped at 50 characters. When nothing of the Feishu name survives
        the character filter (e.g. a fully non-ASCII name), the table id is
        used instead so that different tables do not all collapse to
        ``feishu_``.
        """
        import re

        def sanitize(base_name):
            prefixed = f"feishu_{DataSyncService._normalize_identifier(base_name)}"
            return re.sub(r'[^a-zA-Z0-9_]', '', prefixed)

        fallback_base = f"table_{table_id}"
        table_name = sanitize(feishu_table_name or fallback_base)
        if table_name.strip('_') == 'feishu':
            # Only the prefix is left: fall back to the id-based name.
            table_name = sanitize(fallback_base)

        # Keep the name short; the id suffix keeps truncated names distinct.
        if len(table_name) > 50:
            table_name = table_name[:45] + "_" + table_id[-4:]
        return table_name

    @staticmethod
    def _build_model_field(field_type, verbose_name):
        """Return the Django field instance used for a Bitable field type."""
        if field_type == 'number':
            return models.FloatField(blank=True, null=True, verbose_name=verbose_name)
        if field_type == 'datetime':
            return models.DateTimeField(blank=True, null=True, verbose_name=verbose_name)
        if field_type == 'checkbox':
            return models.BooleanField(default=False, verbose_name=verbose_name)
        if field_type in ('attachment', 'person'):
            return models.JSONField(default=list, blank=True, null=True, verbose_name=verbose_name)
        # 'text', 'single_select', 'multi_select' and any unknown type are
        # stored as text.
        return models.TextField(blank=True, null=True, verbose_name=verbose_name)

    @staticmethod
    def _build_record_data(record_id, fields_data, model_field_names):
        """
        Convert one Bitable record into ``update_or_create`` defaults.

        Args:
            record_id: Feishu record id; always stored as ``feishu_record_id``.
            fields_data: mapping of Feishu field name -> value (may be None).
            model_field_names: collection of field names defined on the model.
        Returns:
            dict: column name -> value. Reserved columns and names that are
            not fields of the model are skipped; list/dict values are
            serialised with ``json.dumps``.
        """
        data = {'feishu_record_id': record_id}
        for field_name, field_value in (fields_data or {}).items():
            column_name = DataSyncService._normalize_identifier(field_name)
            if column_name in DataSyncService._RESERVED_FIELD_NAMES:
                continue
            if column_name not in model_field_names:
                continue
            # NOTE(review): values are JSON-encoded even when the target
            # column is a JSONField, which stores them as a JSON string.
            # Kept as-is because changing it would alter stored data.
            if isinstance(field_value, (list, dict)):
                data[column_name] = json.dumps(field_value)
            else:
                data[column_name] = field_value
        return data

    @staticmethod
    def _fetch_all_records(app_token, table_id, access_token, page_size=100):
        """Page through ``BitableService.search_records`` and return every item."""
        all_records = []
        page_token = None
        while True:
            result = BitableService.search_records(
                app_token=app_token,
                table_id=table_id,
                access_token=access_token,
                page_size=page_size,
                page_token=page_token
            )
            records = result.get('items', [])
            all_records.extend(records)
            # Stop when there is no continuation token or the page was empty.
            page_token = result.get('page_token')
            if not page_token or not records:
                break
        return all_records

    @staticmethod
    def get_or_create_table_mapping(app_token, table_id, table_url, feishu_table_name=None, table_name=None):
        """
        Fetch the mapping for a Bitable table, creating it when missing.

        An existing mapping is refreshed with any differing ``table_name``,
        ``feishu_table_name`` or ``table_url`` in a single save.

        Args:
            app_token: Bitable app token
            table_id: Bitable table id
            table_url: Bitable table URL
            feishu_table_name: table name as shown in Feishu
            table_name: custom database table name; derived when omitted
        Returns:
            mapping: the FeishuTableMapping object
            created: True when the mapping was newly created
        """
        try:
            mapping = FeishuTableMapping.objects.get(app_token=app_token, table_id=table_id)
        except FeishuTableMapping.DoesNotExist:
            # No custom name supplied: fall back to the default naming rule.
            if not table_name:
                table_name = DataSyncService._derive_table_name(feishu_table_name, table_id)
            mapping = FeishuTableMapping.objects.create(
                app_token=app_token,
                table_id=table_id,
                table_url=table_url,
                table_name=table_name,
                feishu_table_name=feishu_table_name
            )
            return mapping, True

        # Collect every changed column so the row is written at most once.
        changed_fields = []
        if table_name and mapping.table_name != table_name:
            mapping.table_name = table_name
            changed_fields.append('table_name')
        if feishu_table_name and mapping.feishu_table_name != feishu_table_name:
            mapping.feishu_table_name = feishu_table_name
            changed_fields.append('feishu_table_name')
        if mapping.table_url != table_url:
            mapping.table_url = table_url
            changed_fields.append('table_url')
        if changed_fields:
            mapping.save(update_fields=changed_fields)
        return mapping, False

    @staticmethod
    def create_model_from_fields(table_name, fields, app_label='feishu'):
        """
        Build a Django model class at runtime for a synced table.

        If a model with the derived name is already registered under
        ``app_label`` it is returned unchanged, regardless of ``fields``.

        Args:
            table_name: database table name
            fields: list of field descriptions (dicts with 'field_name' and 'type')
            app_label: Django app label
        Returns:
            model: the model class
        """
        model_name = ''.join(word.capitalize() for word in table_name.split('_'))
        try:
            return apps.get_model(app_label, model_name)
        except LookupError:
            pass  # Not registered yet: build it below.

        attrs = {
            '__module__': f'{app_label}.models',
            'Meta': type('Meta', (), {
                'db_table': table_name,
                'app_label': app_label,
                'verbose_name': table_name,
                'verbose_name_plural': table_name,
            }),
            'feishu_record_id': models.CharField(max_length=255, unique=True, verbose_name='飞书记录ID'),
            'created_at': models.DateTimeField(auto_now_add=True, verbose_name='创建时间'),
            'updated_at': models.DateTimeField(auto_now=True, verbose_name='更新时间'),
        }

        for field in fields or []:
            raw_name = field.get('field_name', '')
            if not raw_name:
                continue
            # Normalise BEFORE the reserved-name check so that e.g.
            # "Created At" cannot replace the built-in created_at column.
            column_name = DataSyncService._normalize_identifier(raw_name)
            if column_name in DataSyncService._RESERVED_FIELD_NAMES:
                continue
            attrs[column_name] = DataSyncService._build_model_field(field.get('type', 'text'), raw_name)

        attrs['id'] = models.AutoField(primary_key=True)
        return type(model_name, (models.Model,), attrs)

    @staticmethod
    def create_table_from_model(model):
        """
        Create the database table for ``model``.

        Args:
            model: model class
        Returns:
            bool: True when the table was created, False when creation failed
                  (the error is logged)
        """
        try:
            with connection.schema_editor() as schema_editor:
                schema_editor.create_model(model)
            return True
        except Exception as e:
            logger.exception(f"创建表失败: {str(e)}")
            return False

    @staticmethod
    def check_table_exists(table_name):
        """
        Check whether a table exists in the current database.

        Args:
            table_name: table name
        Returns:
            bool: True when the table exists
        """
        with connection.cursor() as cursor:
            return table_name in connection.introspection.table_names(cursor)

    @staticmethod
    def sync_data_to_db(table_url, access_token, table_name=None, primary_key=None, auto_sync=False):
        """
        Sync a Feishu Bitable table into the database.

        Args:
            table_url: Bitable table URL
            access_token: access token
            table_name: custom table name; defaults to a name derived from the
                Feishu table name
            primary_key: accepted for interface compatibility but not read;
                records are always matched on ``feishu_record_id``
            auto_sync: when True, reuse the table name of a saved mapping if
                one exists
        Returns:
            dict: on success ``success``, ``table_name``, ``feishu_table_name``,
            ``total_records``, ``created_count``, ``updated_count``,
            ``app_token`` and ``table_id``; on failure ``success=False`` and
            ``error``.
        """
        try:
            app_token, table_id = BitableService.extract_params_from_url(table_url)

            # 1. Table metadata. `or` also covers an explicit None name.
            metadata = BitableService.get_metadata(app_token, table_id, access_token)
            feishu_table_name = metadata.get('name') or f'table_{table_id}'
            default_table_name = DataSyncService._default_table_name(feishu_table_name)

            # 2. Resolve the mapping and the final table name.
            if auto_sync:
                try:
                    mapping = FeishuTableMapping.objects.get(app_token=app_token, table_id=table_id)
                except FeishuTableMapping.DoesNotExist:
                    mapping, _ = DataSyncService.get_or_create_table_mapping(
                        app_token,
                        table_id,
                        table_url,
                        feishu_table_name,
                        table_name or default_table_name
                    )
                final_table_name = mapping.table_name
            else:
                # Manual mode: a caller-supplied name wins over the default.
                final_table_name = table_name or default_table_name
                mapping, _ = DataSyncService.get_or_create_table_mapping(
                    app_token,
                    table_id,
                    table_url,
                    feishu_table_name,
                    final_table_name
                )

            # 3-4. Field list and runtime model.
            fields = BitableService.get_table_fields(app_token, table_id, access_token)
            model = DataSyncService.create_model_from_fields(final_table_name, fields)

            # 5. Make sure the table exists; abort when it cannot be created
            #    instead of failing on every single record afterwards.
            if DataSyncService.check_table_exists(final_table_name):
                logger.info(f"{final_table_name} 已存在,将按 feishu_record_id 更新或追加数据")
            elif DataSyncService.create_table_from_model(model):
                logger.info(f"{final_table_name} 创建成功")
            else:
                raise RuntimeError(f"failed to create table {final_table_name}")

            # 6. Fetch every record.
            all_records = DataSyncService._fetch_all_records(app_token, table_id, access_token)

            # 7. Write the records.
            model_field_names = {f.name for f in model._meta.get_fields()}
            created_count = 0
            updated_count = 0
            with transaction.atomic():
                for record in all_records:
                    record_id = record.get('record_id')
                    data = DataSyncService._build_record_data(
                        record_id, record.get('fields'), model_field_names
                    )
                    try:
                        # Per-record savepoint: a failing row is rolled back
                        # on its own and the outer transaction stays usable.
                        with transaction.atomic():
                            _, created = model.objects.update_or_create(
                                feishu_record_id=record_id,
                                defaults=data
                            )
                    except Exception as e:
                        logger.error(f"更新或创建记录失败: {str(e)}, 记录ID: {record_id}")
                        continue
                    if created:
                        created_count += 1
                    else:
                        updated_count += 1

                # last_sync_time is not assigned here; presumably it is an
                # auto_now field on FeishuTableMapping - verify on the model.
                mapping.total_records = len(all_records)
                mapping.save(update_fields=['total_records', 'last_sync_time'])

            return {
                'success': True,
                'table_name': final_table_name,
                'feishu_table_name': feishu_table_name,
                'total_records': len(all_records),
                'created_count': created_count,
                'updated_count': updated_count,
                'app_token': app_token,
                'table_id': table_id,
            }
        except Exception as e:
            logger.exception(f"数据同步失败: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }