daren/apps/feishu/services/gmail_extraction_service.py
2025-05-29 10:11:19 +08:00

153 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import re
import json
from django.db import transaction
from apps.gmail.models import GmailConversation
from apps.user.models import User
from ..models import FeishuTableMapping
from .bitable_service import BitableService
from apps.gmail.services.gmail_service import GmailService
logger = logging.getLogger(__name__)
class GmailExtractionService:
    """
    Service that extracts Gmail addresses from a Feishu Bitable (multi-dimensional
    table) and creates conversations for them.
    """

    # A Gmail address that is NOT immediately followed by more domain characters.
    # The negative lookahead rejects e.g. "x@gmail.community" and "x@gmail.com.cn"
    # (which would otherwise be truncated to a bogus "x@gmail.com") while still
    # accepting an address followed by sentence punctuation such as "x@gmail.com.".
    _GMAIL_PATTERN = re.compile(
        r'[a-zA-Z0-9._%+-]+@gmail\.com(?![a-zA-Z0-9_-]|\.[a-zA-Z0-9])'
    )

    @staticmethod
    def extract_gmail_addresses(text):
        """
        Extract Gmail addresses from a piece of text.

        Args:
            text: Text to scan. Anything that is empty or not a ``str`` yields
                an empty list.

        Returns:
            list: Unique, lower-cased Gmail addresses in order of first appearance.
        """
        if not text or not isinstance(text, str):
            return []
        # Matching is done on the lower-cased text so results are case-normalised.
        matches = GmailExtractionService._GMAIL_PATTERN.findall(text.lower())
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return list(dict.fromkeys(matches))

    @staticmethod
    def _iter_text_fragments(value):
        """
        Yield every plain-text fragment contained in a Bitable cell value.

        Handles a bare string, a dict carrying a string under ``'text'``, and
        (nested) lists/tuples of those. Other value types yield nothing.
        """
        # NOTE(review): Feishu text cells can presumably arrive as a list of
        # segments such as {"text": ..., "type": ...}; confirm against what
        # BitableService.search_records actually returns.
        if isinstance(value, str):
            yield value
        elif isinstance(value, dict):
            fragment = value.get('text')
            if isinstance(fragment, str):
                yield fragment
        elif isinstance(value, (list, tuple)):
            for item in value:
                yield from GmailExtractionService._iter_text_fragments(item)

    @staticmethod
    def find_duplicate_emails(db_table_name, feishu_table_url, access_token, email_field_name, user):
        """
        Collect the Gmail addresses found in one field of a Feishu Bitable.

        Despite its name, this method performs no comparison against a database
        table: ``db_table_name`` and ``user`` are accepted for interface
        compatibility but are not used.

        Args:
            db_table_name: Database table name (currently unused).
            feishu_table_url: URL of the Feishu Bitable.
            access_token: Access token passed to the Bitable service.
            email_field_name: Name of the field that contains Gmail addresses.
            user: Current user object (currently unused).

        Returns:
            tuple: ``(emails, None)`` on success, where ``emails`` is a list of
            unique addresses in order of first appearance, or ``(None, message)``
            on failure.
        """
        try:
            # Derive app_token and table_id from the table URL.
            app_token, table_id = BitableService.extract_params_from_url(feishu_table_url)
            # A single request is issued here; whether search_records paginates
            # internally is not visible from this file.
            feishu_records = BitableService.search_records(
                app_token=app_token,
                table_id=table_id,
                access_token=access_token,
                page_size=1000,
            )
            if not feishu_records or 'items' not in feishu_records:
                return None, "无法获取飞书表格数据"

            feishu_emails = []
            for record in feishu_records.get('items', []):
                field_data = record.get('fields', {})
                if email_field_name not in field_data:
                    continue
                # Scan each fragment separately so that neighbouring segments are
                # never glued together into a malformed address.
                for fragment in GmailExtractionService._iter_text_fragments(
                    field_data[email_field_name]
                ):
                    feishu_emails.extend(
                        GmailExtractionService.extract_gmail_addresses(fragment)
                    )

            # De-duplicate across records while keeping first-seen order.
            feishu_emails = list(dict.fromkeys(feishu_emails))
            logger.info(f"从飞书表格中提取出 {len(feishu_emails)} 个Gmail邮箱")
            return feishu_emails, None
        except Exception as e:
            logger.exception(f"查找重复的Gmail邮箱失败: {str(e)}")
            return None, f"查找重复的Gmail邮箱失败: {str(e)}"

    @staticmethod
    def _create_conversation_if_missing(user, user_email, email, kb_id):
        """
        Create the conversation for one address inside its own savepoint.

        Returns:
            True if a conversation was created, False if creation failed,
            None if a matching conversation already existed and was skipped.
        """
        try:
            # Nested atomic() creates a savepoint: if a database error occurs for
            # this address, only this savepoint is rolled back and the enclosing
            # transaction remains usable for the remaining addresses.
            with transaction.atomic():
                already_exists = GmailConversation.objects.filter(
                    user=user,
                    user_email=user_email,
                    influencer_email=email,
                ).exists()
                if already_exists:
                    logger.info(f"用户 {user.name} 与 {email} 的对话已存在,跳过")
                    return None
                conversation_id, error = GmailService.save_conversations_to_chat(
                    user=user,
                    user_email=user_email,
                    influencer_email=email,
                    kb_id=kb_id,
                )
            if conversation_id:
                logger.info(f"成功创建与 {email} 的对话ID: {conversation_id}")
                return True
            logger.error(f"创建与 {email} 的对话失败: {error}")
            return False
        except Exception as e:
            logger.exception(f"处理邮箱 {email} 时出错: {str(e)}")
            return False

    @staticmethod
    def create_conversations_for_emails(user, user_email, emails, kb_id=None):
        """
        Create conversations for the extracted Gmail addresses.

        Addresses that already have a conversation for this ``user`` /
        ``user_email`` pair are skipped and count neither as success nor failure.

        Args:
            user: Current user object.
            user_email: The user's own Gmail address.
            emails: List of influencer Gmail addresses.
            kb_id: Knowledge-base ID forwarded to the Gmail service.

        Returns:
            tuple: ``(number_of_conversations_created, error_message_or_None)``.
        """
        if not emails:
            return 0, "没有提供邮箱列表"

        # Equivalent to decorating the method with @transaction.atomic, but lets
        # the empty-input guard above return without opening a transaction.
        with transaction.atomic():
            try:
                success_count = 0
                failed_emails = []
                for email in emails:
                    outcome = GmailExtractionService._create_conversation_if_missing(
                        user, user_email, email, kb_id
                    )
                    if outcome is True:
                        success_count += 1
                    elif outcome is False:
                        failed_emails.append(email)

                if not failed_emails:
                    return success_count, None

                # List at most five failed addresses; mark the list as truncated
                # when there are more.
                shown = ', '.join(map(str, failed_emails[:5]))
                suffix = '...' if len(failed_emails) > 5 else ''
                return success_count, (
                    f"成功创建 {success_count} 个对话,失败 {len(failed_emails)} 个: {shown}{suffix}"
                )
            except Exception as e:
                logger.exception(f"创建Gmail对话失败: {str(e)}")
                return 0, f"创建Gmail对话失败: {str(e)}"