153 lines
5.6 KiB
Python
153 lines
5.6 KiB
Python
import logging
|
||
import re
|
||
import json
|
||
from django.db import transaction
|
||
from apps.gmail.models import GmailConversation
|
||
from apps.accounts.models import User
|
||
from ..models import FeishuTableMapping
|
||
from .bitable_service import BitableService
|
||
from apps.gmail.services.gmail_service import GmailService
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class GmailExtractionService:
|
||
"""
|
||
从飞书多维表格中提取Gmail邮箱并创建对话的服务
|
||
"""
|
||
|
||
@staticmethod
|
||
def extract_gmail_addresses(text):
|
||
"""
|
||
从文本中提取Gmail邮箱地址
|
||
|
||
Args:
|
||
text: 需要提取邮箱的文本
|
||
|
||
Returns:
|
||
list: Gmail邮箱地址列表
|
||
"""
|
||
if not text:
|
||
return []
|
||
|
||
# Gmail邮箱正则表达式模式
|
||
gmail_pattern = r'[a-zA-Z0-9._%+-]+@gmail\.com'
|
||
|
||
# 查找所有匹配
|
||
matches = re.findall(gmail_pattern, text.lower())
|
||
|
||
# 返回唯一的Gmail地址
|
||
return list(set(matches))
|
||
|
||
@staticmethod
|
||
def find_duplicate_emails(db_table_name, feishu_table_url, access_token, email_field_name, user):
|
||
"""
|
||
查找数据库表和飞书多维表格中重复的Gmail邮箱
|
||
|
||
Args:
|
||
db_table_name: 数据库表名
|
||
feishu_table_url: 飞书多维表格URL
|
||
access_token: 访问令牌
|
||
email_field_name: 包含Gmail邮箱的字段名
|
||
user: 当前用户对象
|
||
|
||
Returns:
|
||
tuple: (邮箱列表, 错误信息)
|
||
"""
|
||
try:
|
||
# 从URL中提取app_token和table_id
|
||
app_token, table_id = BitableService.extract_params_from_url(feishu_table_url)
|
||
|
||
# 获取飞书表格中的所有记录
|
||
feishu_records = BitableService.search_records(
|
||
app_token=app_token,
|
||
table_id=table_id,
|
||
access_token=access_token,
|
||
page_size=1000 # 获取足够多的记录
|
||
)
|
||
|
||
if not feishu_records or 'items' not in feishu_records:
|
||
return None, "无法获取飞书表格数据"
|
||
|
||
# 提取每条记录中的Gmail邮箱
|
||
feishu_emails = []
|
||
for record in feishu_records.get('items', []):
|
||
field_data = record.get('fields', {})
|
||
if email_field_name in field_data:
|
||
email_value = field_data[email_field_name]
|
||
if isinstance(email_value, str):
|
||
gmail_addresses = GmailExtractionService.extract_gmail_addresses(email_value)
|
||
feishu_emails.extend(gmail_addresses)
|
||
|
||
# 确保邮箱地址唯一
|
||
feishu_emails = list(set(feishu_emails))
|
||
|
||
logger.info(f"从飞书表格中提取出 {len(feishu_emails)} 个Gmail邮箱")
|
||
|
||
return feishu_emails, None
|
||
|
||
except Exception as e:
|
||
logger.error(f"查找重复的Gmail邮箱失败: {str(e)}")
|
||
return None, f"查找重复的Gmail邮箱失败: {str(e)}"
|
||
|
||
@staticmethod
|
||
@transaction.atomic
|
||
def create_conversations_for_emails(user, user_email, emails, kb_id=None):
|
||
"""
|
||
为提取的Gmail邮箱创建对话
|
||
|
||
Args:
|
||
user: 当前用户对象
|
||
user_email: 用户Gmail邮箱
|
||
emails: 达人Gmail邮箱列表
|
||
kb_id: 知识库ID
|
||
|
||
Returns:
|
||
tuple: (成功创建的对话数量, 错误信息)
|
||
"""
|
||
try:
|
||
if not emails:
|
||
return 0, "没有提供邮箱列表"
|
||
|
||
success_count = 0
|
||
failed_emails = []
|
||
|
||
for email in emails:
|
||
try:
|
||
# 检查是否已存在对话
|
||
existing_conversation = GmailConversation.objects.filter(
|
||
user=user,
|
||
user_email=user_email,
|
||
influencer_email=email
|
||
).exists()
|
||
|
||
if existing_conversation:
|
||
logger.info(f"用户 {user.username} 与 {email} 的对话已存在,跳过")
|
||
continue
|
||
|
||
# 创建新对话
|
||
conversation_id, error = GmailService.save_conversations_to_chat(
|
||
user=user,
|
||
user_email=user_email,
|
||
influencer_email=email,
|
||
kb_id=kb_id
|
||
)
|
||
|
||
if conversation_id:
|
||
success_count += 1
|
||
logger.info(f"成功创建与 {email} 的对话,ID: {conversation_id}")
|
||
else:
|
||
logger.error(f"创建与 {email} 的对话失败: {error}")
|
||
failed_emails.append(email)
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理邮箱 {email} 时出错: {str(e)}")
|
||
failed_emails.append(email)
|
||
|
||
if failed_emails:
|
||
return success_count, f"成功创建 {success_count} 个对话,失败 {len(failed_emails)} 个: {', '.join(failed_emails[:5])}{' 等' if len(failed_emails) > 5 else ''}"
|
||
else:
|
||
return success_count, None
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建Gmail对话失败: {str(e)}")
|
||
return 0, f"创建Gmail对话失败: {str(e)}" |