# role_based_system/gmail/quickstart.py
# 2025-03-29 12:26:50 +08:00
#
# 171 lines
# 5.8 KiB
# Python

from apiclient import discovery
from httplib2 import Http
from oauth2client import file, client, tools
import base64
from bs4 import BeautifulSoup
import dateutil.parser as parser
from datetime import datetime
import os
import json
# Proxy configuration: point both proxy environment variables at the local
# proxy before any HTTP client is created.
_PROXY_URL = 'http://127.0.0.1:7890'
os.environ.update(HTTP_PROXY=_PROXY_URL, HTTPS_PROXY=_PROXY_URL)

# Gmail API authentication (read-only scope).
# Cached credentials are read from storage.json; when none are found or they
# are flagged invalid, the OAuth flow is run from client_secret.json with
# `store` handed to run_flow.
SCOPES = 'https://www.googleapis.com/auth/gmail.readonly'
store = file.Storage('storage.json')
creds = store.get()
if not creds or creds.invalid:
    flow = client.flow_from_clientsecrets('client_secret.json', SCOPES)
    creds = tools.run_flow(flow, store)

# Module-level Gmail service object used by the functions below.
_authorized_http = creds.authorize(Http())
GMAIL = discovery.build('gmail', 'v1', http=_authorized_http)
def _decode_body_data(data):
    """Decode a base64url-encoded body string into text."""
    # Restore any stripped '=' padding so unpadded input does not raise.
    padded = data + '=' * (-len(data) % 4)
    # Undecodable bytes are replaced instead of discarding the whole message.
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')


def _find_plain_text(parts):
    """Return the first non-empty text/plain body found depth-first in parts."""
    for part in parts:
        if part.get('mimeType') == 'text/plain':
            data = part.get('body', {}).get('data', '')
            if data:
                return _decode_body_data(data)
        # Multipart containers carry their own 'parts' list; descend into it.
        nested = _find_plain_text(part.get('parts', []))
        if nested:
            return nested
    return ''


def get_email_content(message):
    """Extract subject, sender, date and plain-text body from a message dict.

    Args:
        message: Message dict that must contain a 'payload' entry. The payload
            may carry 'headers' (list of {'name', 'value'} dicts), 'parts'
            (possibly nested) and/or a 'body' dict with base64url 'data'.

    Returns:
        A dict with the keys 'subject', 'from', 'date' and 'body', or None if
        the message could not be processed. 'date' is formatted as
        '%Y-%m-%d %H:%M:%S' in whatever UTC offset the Date header carried
        (it is not normalized); if the header cannot be parsed the raw header
        value is kept. Missing fields are empty strings.
    """
    try:
        payload = message['payload']
        email_data = {
            'subject': '',
            'from': '',
            'date': '',
            'body': ''
        }

        # Header field names are case-insensitive, so compare in lower case.
        for header in payload.get('headers', []):
            name = header['name'].lower()
            value = header['value']
            if name in ('subject', 'from'):
                email_data[name] = value
            elif name == 'date':
                try:
                    parsed = parser.parse(value)
                    email_data['date'] = parsed.strftime('%Y-%m-%d %H:%M:%S')
                except (ValueError, OverflowError):
                    # Keep the raw value rather than losing the whole message.
                    email_data['date'] = value

        if 'parts' in payload:
            # Multipart message: look for text/plain at any nesting depth.
            email_data['body'] = _find_plain_text(payload['parts'])
        elif 'body' in payload:
            # Single-part message: decode its body regardless of MIME type.
            data = payload['body'].get('data', '')
            if data:
                email_data['body'] = _decode_body_data(data)

        return email_data
    except Exception as e:
        print(f"Error processing email: {str(e)}")
        return None
def get_conversations(email1, email2):
    """Fetch every message exchanged between two addresses.

    Searches the authenticated mailbox ('me') for messages whose sender and
    recipient are each one of the two addresses, downloads every match, and
    extracts its content with get_email_content.

    Args:
        email1: First e-mail address.
        email2: Second e-mail address.

    Returns:
        A list of dicts as produced by get_email_content, sorted by their
        'date' string. Messages that could not be processed are skipped.
        Returns an empty list if any API call fails.
    """
    try:
        # Build the search query.
        query = f"from:({email1} OR {email2}) to:({email1} OR {email2})"

        # Collect the ids of all matching messages, following pagination.
        messages = []
        page_token = None
        while True:
            list_args = {'userId': 'me', 'q': query}
            if page_token:
                list_args['pageToken'] = page_token
            response = GMAIL.users().messages().list(**list_args).execute()
            # A page may come back without a 'messages' key; treat as empty.
            messages.extend(response.get('messages', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                break

        # Fetch and parse the full content of each message.
        conversations = []
        for msg in messages:
            message = GMAIL.users().messages().get(userId='me', id=msg['id']).execute()
            email_data = get_email_content(message)
            if email_data:
                conversations.append(email_data)

        # Order chronologically by the formatted date string.
        conversations.sort(key=lambda x: x['date'])
        return conversations
    except Exception as e:
        print(f"Error getting conversations: {str(e)}")
        return []
def save_conversations(conversations, output_file):
    """Save conversation records as text and as JSON (overwrite mode).

    Writes a human-readable report to output_file, then dumps the same records
    as JSON next to it, using output_file's path with its extension replaced
    by '.json'. Both files are overwritten if they exist.

    Args:
        conversations: List of dicts with the keys 'date', 'from', 'subject'
            and 'body'.
        output_file: Path of the text report.

    Returns:
        None. Errors are printed, not raised.
    """
    try:
        # 'w' mode: existing content is replaced, not appended to.
        with open(output_file, 'w', encoding='utf-8') as f:
            # Timestamp banner.
            f.write("=" * 50 + "\n")
            f.write(f"记录时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 50 + "\n\n")
            # One entry per message.
            for msg in conversations:
                f.write(f"时间: {msg['date']}\n")
                f.write(f"发件人: {msg['from']}\n")
                f.write(f"主题: {msg['subject']}\n")
                f.write("内容:\n")
                f.write(f"{msg['body']}\n")
                f.write("-" * 50 + "\n")
        print(f"对话记录已保存到: {output_file}")

        # JSON copy. splitext only strips an extension from the final path
        # component, so dots in directory names or a leading dot are preserved.
        json_file = os.path.splitext(output_file)[0] + '.json'
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(conversations, f, ensure_ascii=False, indent=2)
        print(f"JSON 格式对话记录已保存到: {json_file}")
    except Exception as e:
        print(f"Error saving conversations: {str(e)}")
def main(email1="crushwds@gmail.com",
         email2="ardonisierni@gmail.com",
         output_file="email_conversations.txt"):
    """Export the conversation between two addresses and print a summary.

    Fetches the messages with get_conversations, saves them with
    save_conversations, then prints the total and a per-sender message count.

    Args:
        email1: First e-mail address of the conversation.
        email2: Second e-mail address of the conversation.
        output_file: Path of the text report. The file name is fixed by
            default, so every run overwrites the previous export.
    """
    print(f"正在获取 {email1} 和 {email2} 之间的对话...")

    # Fetch the conversation records.
    conversations = get_conversations(email1, email2)
    if not conversations:
        print("未找到对话记录")
        return

    print(f"找到 {len(conversations)} 条对话记录")
    # Save the records (overwrites any existing export).
    save_conversations(conversations, output_file)

    # Print conversation statistics.
    print("\n对话统计:")
    print(f"总消息数: {len(conversations)}")
    senders = {}
    for msg in conversations:
        sender = msg['from']
        senders[sender] = senders.get(sender, 0) + 1
    for sender, count in senders.items():
        print(f"{sender}: {count} 条消息")
# Call main() only when this file is executed as a script.
if __name__ == "__main__":
    main()