From 45d49d4b4acd82dee4c0a502e76f10a50527defc Mon Sep 17 00:00:00 2001 From: wanjia Date: Thu, 10 Apr 2025 18:25:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E6=8F=90=E4=BA=A4=20-=20Gmai?= =?UTF-8?q?l=E9=9B=86=E6=88=90=E5=8A=9F=E8=83=BD=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gmail/client_secret.json | 2 +- gmail/email_conversations.json | 82 +- gmail/email_conversations.txt | 158 +- gmail/quickstart.py | 2 +- gmail/storage.json | 2 +- gmail/token.json | 1 - gmail_attachments/1960eec066745682_test2.txt | 10 + ..._c3f7f4a3-3419-42a4-a2c4-6d490e3b50e0.json | 1 + role_based_system/settings.py | 25 +- role_based_system/urls.py | 5 + temp_client_secret.json | 1 + user_management/gmail_integration.py | 2187 +++++++++++++++++ user_management/management/__init__.py | 1 + .../management/commands/__init__.py | 1 + .../commands/update_gmail_credentials.py | 80 + ...ment_gmailcredential_gmailtalentmapping.py | 71 + .../0006_gmailcredential_credentials.py | 18 + ..._alter_gmailcredential_options_and_more.py | 57 + user_management/models.py | 58 + user_management/urls.py | 26 +- user_management/views.py | 1113 ++++++++- 21 files changed, 3815 insertions(+), 86 deletions(-) delete mode 100644 gmail/token.json create mode 100644 gmail_attachments/1960eec066745682_test2.txt create mode 100644 gmail_tokens/gmail_token_c3f7f4a3-3419-42a4-a2c4-6d490e3b50e0.json create mode 100644 temp_client_secret.json create mode 100644 user_management/gmail_integration.py create mode 100644 user_management/management/commands/update_gmail_credentials.py create mode 100644 user_management/migrations/0005_gmailattachment_gmailcredential_gmailtalentmapping.py create mode 100644 user_management/migrations/0006_gmailcredential_credentials.py create mode 100644 user_management/migrations/0007_alter_gmailcredential_options_and_more.py diff --git a/gmail/client_secret.json b/gmail/client_secret.json index c9852c2..a59f8b1 
100644 --- a/gmail/client_secret.json +++ b/gmail/client_secret.json @@ -1 +1 @@ -{"installed":{"client_id":"240872828479-570luspoc31259l1faab6kmjmpcsa9n9.apps.googleusercontent.com","project_id":"first-renderer-454910-c1","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"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","redirect_uris":["http://localhost"]}} \ No newline at end of file +{"installed":{"client_id":"266164728215-v84lngbp3vgr4ulql01sqkg5vaigf4a5.apps.googleusercontent.com","project_id":"knowledge-454905","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"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","redirect_uris":["http://localhost"]}} \ No newline at end of file diff --git a/gmail/email_conversations.json b/gmail/email_conversations.json index beded6f..f270226 100644 --- a/gmail/email_conversations.json +++ b/gmail/email_conversations.json @@ -34,8 +34,88 @@ "filename": "test2.txt", "mimeType": "text/plain", "size": 996, - "attachmentId": "ANGjdJ-f7-vJBk-5aJnh1zPE0rwbH0PXFzJHsXv9XL5LavJ0r4yQAayw1dw1pQ0AUhgwWyTP_hxcROnZO2C5W3MhBGGFmrcVpjwWE-yFr5BYA-I1-6sn24oEPFqX8Hem4dZxcYd2h6RgSfUfq3BqOe0AtZru0GWvUxRK7kadSCjpP2kigFkVhxvl18MiNmeR5B9aLkkjB6kwSCsTekw8uNTZxh01ZCkjOB1tRiCJnchmh4-TPfOrcZBfV0WNFcbend0DfK9tMRmcs3Gv5aWzFe0mF6eXE5dTAACMEkLwwOrvP-aXRq526DN43RFVasGMvq7-E81pjtcZN_8xPWNs" + "attachmentId": "ANGjdJ8O1uGve4uPqFSC2C4sMeC5jXJ3DGilhB1By705ZLGbOF30m6uITRtJHWsnB7yREKVslhYRdu4GKKvrrkWw-63ogqaZPGgi0WuSoB0OxIWXwbQSjaayUOPvc3P8y1g9A2mMAm5k0DkjH_LP0QMmulhXMg8ZgUcm4CZR7-HTBYztZSfWOoUeYkNugdzV5_2ax3GT34P5uaGHh3i4Ge6y-XDN-TDq3i1w9u_eTjZowoyVzjTj48uaQmzmFU36TQxg8ncovHk0sNZEygzCE1gbHbS1uM9N1tUHOOa6FWEpajHgz2aQwLh65SsMRqKe-LognFES-J_082IHddWs" } ] + }, + { + "id": "1961a1981e915eb3", + "subject": 
"", + "from": "crush wds ", + "date": "2025-04-09 18:29:51", + "body": "测试角色1\r\n", + "attachments": [] + }, + { + "id": "1961a19cc997f933", + "subject": "", + "from": "wds crush ", + "date": "2025-04-09 18:30:21", + "body": "测试角色2\r\n", + "attachments": [] + }, + { + "id": "1961de790e0b39e6", + "subject": "", + "from": "crushwds@gmail.com", + "date": "2025-04-10 00:13:57", + "body": "您好,关于我们之前讨论的产品,我想确认一些细节...", + "attachments": [] + }, + { + "id": "1961dee46652dc86", + "subject": "", + "from": "crushwds@gmail.com", + "date": "2025-04-10 00:21:17", + "body": "您好,关于我们之前讨论的产品,我想确认一些细节...", + "attachments": [] + }, + { + "id": "1961dfd7d36bf92c", + "subject": "", + "from": "crushwds@gmail.com", + "date": "2025-04-10 00:37:54", + "body": "您好,关于我们之前讨论的产品,我想确认一些细节...", + "attachments": [] + }, + { + "id": "1961e39ad02da5f0", + "subject": "", + "from": "crushwds@gmail.com", + "date": "2025-04-10 01:43:38", + "body": "您好,关于我们之前讨论的产品,我想确认一些细节...", + "attachments": [] + }, + { + "id": "1961df3bd554a4af", + "subject": "", + "from": "crush wds ", + "date": "2025-04-10 12:27:03", + "body": "yes\r\n", + "attachments": [] + }, + { + "id": "1961e3944c8aab04", + "subject": "", + "from": "crush wds ", + "date": "2025-04-10 13:42:58", + "body": "1\r\n", + "attachments": [] + }, + { + "id": "1961e39660feebd9", + "subject": "", + "from": "crush wds ", + "date": "2025-04-10 13:43:07", + "body": "2\r\n", + "attachments": [] + }, + { + "id": "1961ec2028c16037", + "subject": "", + "from": "crush wds ", + "date": "2025-04-10 16:12:20", + "body": "你好\r\n", + "attachments": [] } ] \ No newline at end of file diff --git a/gmail/email_conversations.txt b/gmail/email_conversations.txt index 8b10c07..1b78942 100644 --- a/gmail/email_conversations.txt +++ b/gmail/email_conversations.txt @@ -1,51 +1,107 @@ -================================================== -记录时间: 2025-04-07 14:24:38 -================================================== - -时间: 2025-03-27 12:04:29 -发件人: crush wds -主题: 这是主题 -内容: -你好呀 
- - --------------------------------------------------- -时间: 2025-03-27 12:05:54 -发件人: wds crush -主题: -内容: -你那里天气怎么样 - - --------------------------------------------------- -时间: 2025-03-27 13:13:03 -发件人: wds crush -主题: -内容: -吃饭了吗 - - --------------------------------------------------- -时间: 2025-04-07 14:24:28 -发件人: wds crush -主题: Re: 这是主题 -内容: -测试附件内容 - - - - - -crush wds 于2025年3月27日周四 12:04写道: - - - -> 你好呀 - -> - - - -附件: - - test2.txt (text/plain, 996 字节) --------------------------------------------------- +================================================== +记录时间: 2025-04-10 16:52:49 +================================================== + +时间: 2025-03-27 12:04:29 +发件人: crush wds +主题: 这是主题 +内容: +你好呀 + +-------------------------------------------------- +时间: 2025-03-27 12:05:54 +发件人: wds crush +主题: +内容: +你那里天气怎么样 + +-------------------------------------------------- +时间: 2025-03-27 13:13:03 +发件人: wds crush +主题: +内容: +吃饭了吗 + +-------------------------------------------------- +时间: 2025-04-07 14:24:28 +发件人: wds crush +主题: Re: 这是主题 +内容: +测试附件内容 + + +crush wds 于2025年3月27日周四 12:04写道: + +> 你好呀 +> + + +附件: + - test2.txt (text/plain, 996 字节) +-------------------------------------------------- +时间: 2025-04-09 18:29:51 +发件人: crush wds +主题: +内容: +测试角色1 + +-------------------------------------------------- +时间: 2025-04-09 18:30:21 +发件人: wds crush +主题: +内容: +测试角色2 + +-------------------------------------------------- +时间: 2025-04-10 00:13:57 +发件人: crushwds@gmail.com +主题: +内容: +您好,关于我们之前讨论的产品,我想确认一些细节... +-------------------------------------------------- +时间: 2025-04-10 00:21:17 +发件人: crushwds@gmail.com +主题: +内容: +您好,关于我们之前讨论的产品,我想确认一些细节... +-------------------------------------------------- +时间: 2025-04-10 00:37:54 +发件人: crushwds@gmail.com +主题: +内容: +您好,关于我们之前讨论的产品,我想确认一些细节... +-------------------------------------------------- +时间: 2025-04-10 01:43:38 +发件人: crushwds@gmail.com +主题: +内容: +您好,关于我们之前讨论的产品,我想确认一些细节... 
+-------------------------------------------------- +时间: 2025-04-10 12:27:03 +发件人: crush wds +主题: +内容: +yes + +-------------------------------------------------- +时间: 2025-04-10 13:42:58 +发件人: crush wds +主题: +内容: +1 + +-------------------------------------------------- +时间: 2025-04-10 13:43:07 +发件人: crush wds +主题: +内容: +2 + +-------------------------------------------------- +时间: 2025-04-10 16:12:20 +发件人: crush wds +主题: +内容: +你好 + +-------------------------------------------------- diff --git a/gmail/quickstart.py b/gmail/quickstart.py index eda5118..afc58a4 100644 --- a/gmail/quickstart.py +++ b/gmail/quickstart.py @@ -13,7 +13,7 @@ os.environ['HTTP_PROXY'] = 'http://127.0.0.1:7890' os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890' # Gmail API 认证 -SCOPES = 'https://www.googleapis.com/auth/gmail.readonly' +SCOPES = ['https://mail.google.com/'] store = file.Storage('storage.json') creds = store.get() if not creds or creds.invalid: diff --git a/gmail/storage.json b/gmail/storage.json index efc6c8c..8903fb2 100644 --- a/gmail/storage.json +++ b/gmail/storage.json @@ -1 +1 @@ -{"access_token": "ya29.a0AZYkNZjEUAU0zL5oOFtnyDomfMZSzeAVMn5tvmHziV8CaMb4LaK3FAwZ_YC3itk2XlUQlIX0jZ4LQbqZZvdgk7fuwlimFzOHIrfkLt2nd6TPt-L3OqJbmYbYsJRgZ6twrlkX4-nvDsvK0br-WByC5WZj4Ih2FevlMb_-n6J-aCgYKAUUSARISFQHGX2MiYlBZMAAMwlcMNHHm-HnGiA0175", "client_id": "240872828479-570luspoc31259l1faab6kmjmpcsa9n9.apps.googleusercontent.com", "client_secret": "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", "refresh_token": "1//0e45xAmjtJtxpCgYIARAAGA4SNwF-L9Ir7GKiW_h2fGY6auAsSxOtmpocidE68QpFfuUDtwPhSJYHhS_1ZfYDNu8pRcD85X1Kh_Y", "token_expiry": "2025-04-07T07:20:36Z", "token_uri": "https://oauth2.googleapis.com/token", "user_agent": null, "revoke_uri": "https://oauth2.googleapis.com/revoke", "id_token": null, "id_token_jwt": null, "token_response": {"access_token": 
"ya29.a0AZYkNZjEUAU0zL5oOFtnyDomfMZSzeAVMn5tvmHziV8CaMb4LaK3FAwZ_YC3itk2XlUQlIX0jZ4LQbqZZvdgk7fuwlimFzOHIrfkLt2nd6TPt-L3OqJbmYbYsJRgZ6twrlkX4-nvDsvK0br-WByC5WZj4Ih2FevlMb_-n6J-aCgYKAUUSARISFQHGX2MiYlBZMAAMwlcMNHHm-HnGiA0175", "expires_in": 3599, "refresh_token": "1//0e45xAmjtJtxpCgYIARAAGA4SNwF-L9Ir7GKiW_h2fGY6auAsSxOtmpocidE68QpFfuUDtwPhSJYHhS_1ZfYDNu8pRcD85X1Kh_Y", "scope": "https://www.googleapis.com/auth/gmail.readonly", "token_type": "Bearer", "refresh_token_expires_in": 604799}, "scopes": ["https://www.googleapis.com/auth/gmail.readonly"], "token_info_uri": "https://oauth2.googleapis.com/tokeninfo", "invalid": false, "_class": "OAuth2Credentials", "_module": "oauth2client.client"} \ No newline at end of file +{"access_token": "ya29.a0AZYkNZga-tjDnp1lsXRohu1Tji-eVV88RaLnPjxr3HpYuBDW_6boys1aqnRnete1pT-E7ygZ5drpb0Hhbt9o15ryqbfeaKqS4HTDG_iIVvFn3npNNLSqIdvsf98burhBOnR-Nf6ty7xCsPLyFaO15bG2LybRgGL1mubVNMXSaCgYKAdQSARISFQHGX2MicVi2eoShd196_WeptFDUZg0175", "client_id": "266164728215-v84lngbp3vgr4ulql01sqkg5vaigf4a5.apps.googleusercontent.com", "client_secret": "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", "refresh_token": "1//0eAXpVapw8WjjCgYIARAAGA4SNwF-L9Irm0iHkQzqzM7Hn39nctE-DOWKTsm89Ge3nG0bfdfqloRvLMiN4YWHEKcDpLdPIuZel0Q", "token_expiry": "2025-04-10T09:51:34Z", "token_uri": "https://oauth2.googleapis.com/token", "user_agent": null, "revoke_uri": "https://oauth2.googleapis.com/revoke", "id_token": null, "id_token_jwt": null, "token_response": {"access_token": "ya29.a0AZYkNZga-tjDnp1lsXRohu1Tji-eVV88RaLnPjxr3HpYuBDW_6boys1aqnRnete1pT-E7ygZ5drpb0Hhbt9o15ryqbfeaKqS4HTDG_iIVvFn3npNNLSqIdvsf98burhBOnR-Nf6ty7xCsPLyFaO15bG2LybRgGL1mubVNMXSaCgYKAdQSARISFQHGX2MicVi2eoShd196_WeptFDUZg0175", "expires_in": 3599, "refresh_token": "1//0eAXpVapw8WjjCgYIARAAGA4SNwF-L9Irm0iHkQzqzM7Hn39nctE-DOWKTsm89Ge3nG0bfdfqloRvLMiN4YWHEKcDpLdPIuZel0Q", "scope": "https://mail.google.com/", "token_type": "Bearer", "refresh_token_expires_in": 604799}, "scopes": ["https://mail.google.com/"], 
"token_info_uri": "https://oauth2.googleapis.com/tokeninfo", "invalid": false, "_class": "OAuth2Credentials", "_module": "oauth2client.client"} \ No newline at end of file diff --git a/gmail/token.json b/gmail/token.json deleted file mode 100644 index c608f0f..0000000 --- a/gmail/token.json +++ /dev/null @@ -1 +0,0 @@ -{"token": "ya29.a0AeXRPp5MvarP9U4JZ3ZlV_xpIeHNL7IK1HEQLX6qhY-thS5IOFsdHrcNwbdNqkYrClFm4yGiMCXzWO-IilVbdSrDfEDw-Qfs7V9oTsZS2RYjbgcSR9rLUtZlL8ObnaljQie5VVzQZysHPZcMSdWiqabcQq2UsU5TNuOWo4J7_waCgYKAQESARISFQHGX2MiAR1O_HNYd0F6z0Rj-vSybA0177", "refresh_token": "1//0e1Usc30XxlXRCgYIARAAGA4SNwF-L9IrOOjEN7EkOSatDMHzTy0KaNdp_jf1XtLn7a5pFSljo3IKnrDdvVnxTO_FX5Asj_UDiAw", "token_uri": "https://oauth2.googleapis.com/token", "client_id": "240872828479-570luspoc31259l1faab6kmjmpcsa9n9.apps.googleusercontent.com", "client_secret": "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", "scopes": ["https://www.googleapis.com/auth/gmail.readonly"], "universe_domain": "googleapis.com", "account": "", "expiry": "2025-03-27T04:39:31.274041Z"} \ No newline at end of file diff --git a/gmail_attachments/1960eec066745682_test2.txt b/gmail_attachments/1960eec066745682_test2.txt new file mode 100644 index 0000000..b16ac8e --- /dev/null +++ b/gmail_attachments/1960eec066745682_test2.txt @@ -0,0 +1,10 @@ +本期节目内容简介 +在参加各类比赛时,肯定会遇到竞争对手。英语单词 rival、opponent、competitor 和 contestant 的含义相似,都可以用来指“与他人之间存在竞争关系的人或团队”。在本集《你问我答》节目中,我们将通过和体育比赛有关的实例来为大家阐释这四个近义词之间的区别和用法。 + +欢迎你加入并和我们一起讨论英语学习的方方面面。请通过微博“BBC英语教学”或邮件与我们取得联系。我们的邮箱地址是 questions.chinaelt@bbc.co.uk。 + +文字稿 +(关于台词的备注: 请注意这不是广播节目的逐字稿件。本文稿可能没有体现录制、编辑过程中对节目做出的改变。) + +Feifei +大家好,欢迎收听 BBC 英语教学的《你问我答》节目,我是冯菲菲。每集节目中,我们会回答大家在英语学习时遇到的一个问题。本集的问题来自 Adela。我们来听一下她的问题。 diff --git a/gmail_tokens/gmail_token_c3f7f4a3-3419-42a4-a2c4-6d490e3b50e0.json b/gmail_tokens/gmail_token_c3f7f4a3-3419-42a4-a2c4-6d490e3b50e0.json new file mode 100644 index 0000000..8903fb2 --- /dev/null +++ b/gmail_tokens/gmail_token_c3f7f4a3-3419-42a4-a2c4-6d490e3b50e0.json @@ -0,0 +1 @@ 
+{"access_token": "ya29.a0AZYkNZga-tjDnp1lsXRohu1Tji-eVV88RaLnPjxr3HpYuBDW_6boys1aqnRnete1pT-E7ygZ5drpb0Hhbt9o15ryqbfeaKqS4HTDG_iIVvFn3npNNLSqIdvsf98burhBOnR-Nf6ty7xCsPLyFaO15bG2LybRgGL1mubVNMXSaCgYKAdQSARISFQHGX2MicVi2eoShd196_WeptFDUZg0175", "client_id": "266164728215-v84lngbp3vgr4ulql01sqkg5vaigf4a5.apps.googleusercontent.com", "client_secret": "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", "refresh_token": "1//0eAXpVapw8WjjCgYIARAAGA4SNwF-L9Irm0iHkQzqzM7Hn39nctE-DOWKTsm89Ge3nG0bfdfqloRvLMiN4YWHEKcDpLdPIuZel0Q", "token_expiry": "2025-04-10T09:51:34Z", "token_uri": "https://oauth2.googleapis.com/token", "user_agent": null, "revoke_uri": "https://oauth2.googleapis.com/revoke", "id_token": null, "id_token_jwt": null, "token_response": {"access_token": "ya29.a0AZYkNZga-tjDnp1lsXRohu1Tji-eVV88RaLnPjxr3HpYuBDW_6boys1aqnRnete1pT-E7ygZ5drpb0Hhbt9o15ryqbfeaKqS4HTDG_iIVvFn3npNNLSqIdvsf98burhBOnR-Nf6ty7xCsPLyFaO15bG2LybRgGL1mubVNMXSaCgYKAdQSARISFQHGX2MicVi2eoShd196_WeptFDUZg0175", "expires_in": 3599, "refresh_token": "1//0eAXpVapw8WjjCgYIARAAGA4SNwF-L9Irm0iHkQzqzM7Hn39nctE-DOWKTsm89Ge3nG0bfdfqloRvLMiN4YWHEKcDpLdPIuZel0Q", "scope": "https://mail.google.com/", "token_type": "Bearer", "refresh_token_expires_in": 604799}, "scopes": ["https://mail.google.com/"], "token_info_uri": "https://oauth2.googleapis.com/tokeninfo", "invalid": false, "_class": "OAuth2Credentials", "_module": "oauth2client.client"} \ No newline at end of file diff --git a/role_based_system/settings.py b/role_based_system/settings.py index ea6ebb7..cc8939b 100644 --- a/role_based_system/settings.py +++ b/role_based_system/settings.py @@ -142,7 +142,7 @@ TIME_ZONE = 'Asia/Shanghai' USE_I18N = True -USE_TZ = False +USE_TZ = True # 将此项设置为True以启用时区支持 @@ -294,3 +294,26 @@ REST_FRAMEWORK = { 'rest_framework.parsers.MultiPartParser' ], } + +# Gmail API配置 +GOOGLE_CLOUD_PROJECT = 'knowledge-454905' # 更新为当前使用的项目ID +GMAIL_API_SCOPES = ['https://mail.google.com/'] +GMAIL_TOPIC_NAME = 'gmail-watch-topic' + +# Gmail webhook地址 
(开发环境使用本机内网穿透地址) +GMAIL_WEBHOOK_URL = 'https://a7a4-116-227-35-74.ngrok-free.app/api/user/gmail/webhook/' + +# 如果在生产环境,使用以下固定地址 +# GMAIL_WEBHOOK_URL = 'https://你的域名/api/user/gmail/webhook/' + +# 媒体文件目录 +MEDIA_ROOT = os.path.join(BASE_DIR, 'media') +MEDIA_URL = '/media/' + +# 时区设置 +TIME_ZONE = 'Asia/Shanghai' +USE_I18N = True +USE_L10N = True +USE_TZ = True # 将此项设置为True以启用时区支持 + + diff --git a/role_based_system/urls.py b/role_based_system/urls.py index 2f51078..14b4f5f 100644 --- a/role_based_system/urls.py +++ b/role_based_system/urls.py @@ -18,6 +18,7 @@ from django.contrib import admin from django.urls import path, include from django.conf import settings from django.conf.urls.static import static +from user_management.views import gmail_webhook # 直接导入视图函数 urlpatterns = [ # 管理后台 @@ -26,6 +27,10 @@ urlpatterns = [ # API路由 path('api/', include('user_management.urls')), + # 专用Gmail Webhook路由 - 直接匹配根路径 + path('api/user/gmail/webhook/', gmail_webhook, name='root_gmail_webhook'), # 修改为正确路径 + path('gmail/webhook/', gmail_webhook, name='alt_gmail_webhook'), # 添加备用路径 + # 媒体文件服务 *static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT), diff --git a/temp_client_secret.json b/temp_client_secret.json new file mode 100644 index 0000000..49a0ba3 --- /dev/null +++ b/temp_client_secret.json @@ -0,0 +1 @@ +{"installed": {"client_id": "240872828479-570luspoc31259l1faab6kmjmpcsa9n9.apps.googleusercontent.com", "project_id": "first-renderer-454910-c1", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_secret": "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", "redirect_uris": ["http://localhost"]}} \ No newline at end of file diff --git a/user_management/gmail_integration.py b/user_management/gmail_integration.py new file mode 100644 index 0000000..e726464 --- /dev/null +++ b/user_management/gmail_integration.py @@ -0,0 +1,2187 @@ +import os 
+import json +import uuid +import base64 +import pickle +import logging +import traceback # 确保导入traceback模块 +from pathlib import Path +import dateutil.parser as parser +from datetime import datetime +from bs4 import BeautifulSoup +from django.utils import timezone +from django.conf import settings # 添加Django设置导入 +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.mime.base import MIMEBase +from email import encoders +import warnings +import mimetypes + +# 忽略oauth2client相关的所有警告 +warnings.filterwarnings('ignore', message='file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth') +warnings.filterwarnings('ignore', category=DeprecationWarning, module='oauth2client') +warnings.filterwarnings('ignore', message='.*google-auth.*') +warnings.filterwarnings('ignore', message='.*oauth2client.*') + +from apiclient import discovery +from httplib2 import Http +from oauth2client import file, client, tools + +from .models import GmailCredential, KnowledgeBase, KnowledgeBaseDocument, User, GmailTalentMapping, ChatHistory, GmailAttachment + +logger = logging.getLogger(__name__) + +# Gmail服务单例管理器 +class GmailServiceManager: + _instances = {} # 以用户ID为键存储Gmail服务实例 + + @classmethod + def get_instance(cls, user): + """获取用户的Gmail服务实例,如果不存在则创建""" + user_id = str(user.id) + + if user_id not in cls._instances: + try: + # 从数据库获取认证信息 + credential = GmailCredential.objects.filter(user=user, is_active=True).first() + + if credential and credential.credentials: + # 反序列化凭证 + creds = pickle.loads(credential.credentials) + + # 检查凭证是否有效 + if not creds.invalid: + # 初始化服务 + gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http())) + + # 存储实例 + cls._instances[user_id] = { + 'service': gmail_service, + 'credentials': creds, + 'timestamp': timezone.now(), + 'user': user + } + + logger.info(f"创建用户 {user.username} 的Gmail服务单例") + return cls._instances[user_id] + except Exception as e: + logger.error(f"创建Gmail服务单例失败: {e}") + 
else: + # 检查实例是否过期(超过30分钟) + instance = cls._instances[user_id] + time_diff = timezone.now() - instance['timestamp'] + + if time_diff.total_seconds() > 1800: # 30分钟过期 + del cls._instances[user_id] + return cls.get_instance(user) # 递归调用,重新创建 + + # 更新时间戳 + cls._instances[user_id]['timestamp'] = timezone.now() + logger.info(f"复用用户 {user.username} 的Gmail服务单例") + return cls._instances[user_id] + + return None + + @classmethod + def update_instance(cls, user, credentials, service): + """更新用户的Gmail服务实例""" + user_id = str(user.id) + cls._instances[user_id] = { + 'service': service, + 'credentials': credentials, + 'timestamp': timezone.now(), + 'user': user + } + + @classmethod + def clear_instance(cls, user): + """清除用户的Gmail服务实例""" + user_id = str(user.id) + if user_id in cls._instances: + del cls._instances[user_id] + +class GmailIntegration: + """Gmail集成类""" + + # Gmail API 权限范围 + SCOPES = ['https://mail.google.com/'] + + def __init__(self, user, email=None, client_secret_json=None, use_proxy=True, proxy_url='http://127.0.0.1:7890'): + self.user = user + self.user_email = user.email if user else None + self.email = email # 目标邮箱 + self.client_secret = client_secret_json + self.credentials = None + self.gmail_service = None + self.use_proxy = use_proxy + self.proxy_url = proxy_url + + # 设置代理 + if self.use_proxy: + logger.info(f"设置Gmail API代理: {proxy_url}") + os.environ['HTTP_PROXY'] = proxy_url + os.environ['HTTPS_PROXY'] = proxy_url + + # 设置Token文件存储路径 + if user: + # 使用用户ID创建唯一的token存储路径 + token_file = f"gmail_token_{user.id}.json" + self.token_storage_path = os.path.join("gmail_tokens", token_file) + + # 确保目录存在 + os.makedirs("gmail_tokens", exist_ok=True) + logger.info(f"设置Token存储路径: {self.token_storage_path}") + + # 尝试从数据库加载凭证 + try: + gmail_cred = GmailCredential.objects.filter(user=user, is_active=True).first() + if gmail_cred and gmail_cred.credentials: + logger.info(f"从数据库加载用户 {user.username} 的Gmail凭证") + self.credentials = pickle.loads(gmail_cred.credentials) + + # 
初始化Gmail服务 + self.gmail_service = discovery.build('gmail', 'v1', http=self.credentials.authorize(Http())) + logger.info("从数据库凭证初始化Gmail服务成功") + except Exception as e: + logger.error(f"从数据库加载凭证失败: {str(e)}") + # 继续使用文件方式 + else: + self.token_storage_path = "gmail_token.json" + logger.warning("未提供用户,将使用默认Token存储路径") + + # 存储对象 + self.storage = file.Storage(self.token_storage_path) + + def authenticate(self): + """获取Gmail API凭证并认证""" + try: + # 优先尝试使用单例服务 + if self.user: + instance = GmailServiceManager.get_instance(self.user) + if instance: + self.gmail_service = instance['service'] + self.credentials = instance['credentials'] + logger.info("使用现有的Gmail服务单例") + return True + + # 以下是原有的认证逻辑... + # 写入client_secret.json + if self.client_secret: + client_secret_path = 'client_secret.json' + with open(client_secret_path, 'w') as f: + if isinstance(self.client_secret, str): + try: + # 确保是有效的JSON + json_data = json.loads(self.client_secret) + json.dump(json_data, f) + except json.JSONDecodeError as e: + logger.error(f"client_secret不是有效的JSON: {str(e)}") + return False + else: + json.dump(self.client_secret, f) + + logger.info(f"已将client_secret写入临时文件: {client_secret_path}") + + # 使用与quickstart.py相同的流程 + logger.info(f"创建或读取token存储: {self.token_storage_path}") + + # 确保token目录存在 + token_dir = os.path.dirname(self.token_storage_path) + if token_dir and not os.path.exists(token_dir): + logger.info(f"创建token目录: {token_dir}") + os.makedirs(token_dir) + + store = file.Storage(self.token_storage_path) + creds = store.get() + + if not creds or creds.invalid: + logger.info("没有有效的凭证,需要重新授权") + if not self.client_secret: + logger.error("没有提供client_secret_json且找不到有效凭证") + return False + + # 提取重定向URI + redirect_uri = None + if isinstance(self.client_secret, dict): + for key in ['web', 'installed']: + if key in self.client_secret and 'redirect_uris' in self.client_secret[key]: + redirect_uri = self.client_secret[key]['redirect_uris'][0] + break + elif isinstance(self.client_secret, str): + 
try: + json_data = json.loads(self.client_secret) + for key in ['web', 'installed']: + if key in json_data and 'redirect_uris' in json_data[key]: + redirect_uri = json_data[key]['redirect_uris'][0] + break + except: + pass + + # 如果找不到重定向URI,使用默认值 + if not redirect_uri or redirect_uri == 'urn:ietf:wg:oauth:2.0:oob': + logger.info("使用非浏览器认证模式") + redirect_uri = 'urn:ietf:wg:oauth:2.0:oob' + flow = client.flow_from_clientsecrets('client_secret.json', self.SCOPES) + flow.redirect_uri = redirect_uri + else: + logger.info(f"使用重定向URI: {redirect_uri}") + flow = client.flow_from_clientsecrets('client_secret.json', self.SCOPES) + flow.redirect_uri = redirect_uri + + # 获取授权URL并抛出异常 + auth_url = flow.step1_get_authorize_url() + logger.info(f"获取授权URL: {auth_url[:50]}...") + raise Exception(f"Please visit this URL to authorize: {auth_url}") + + # 如果有有效凭证,初始化服务 + self.credentials = creds + logger.info("使用现有凭证初始化Gmail服务") + self.gmail_service = discovery.build('gmail', 'v1', http=creds.authorize(Http())) + logger.info("Gmail服务初始化成功") + + # 获取Gmail账号信息 + gmail_email = None + try: + # 调用Gmail API获取用户资料 + profile = self.gmail_service.users().getProfile(userId='me').execute() + gmail_email = profile.get('emailAddress') + logger.info(f"获取到Gmail账号: {gmail_email}") + except Exception as profile_error: + logger.error(f"获取Gmail账号失败: {str(profile_error)}") + + # 即使获取Gmail账号失败,也要尝试获取消息以提取邮箱 + if not gmail_email: + try: + # 尝试获取一条消息来提取邮箱 + messages = self.gmail_service.users().messages().list(userId='me', maxResults=1).execute() + if 'messages' in messages and len(messages['messages']) > 0: + msg_id = messages['messages'][0]['id'] + msg = self.gmail_service.users().messages().get(userId='me', id=msg_id).execute() + # 从消息中查找与当前用户匹配的邮箱 + if 'payload' in msg and 'headers' in msg['payload']: + for header in msg['payload']['headers']: + if header['name'] in ['From', 'To', 'Cc', 'Bcc']: + if '<' in header['value'] and '>' in header['value']: + email = header['value'].split('<')[1].split('>')[0] + # 
如果找到一个邮箱就使用它 + if not gmail_email: + gmail_email = email + logger.info(f"从消息中提取到Gmail账号: {gmail_email}") + except Exception as msg_error: + logger.error(f"尝试从消息中提取Gmail账号失败: {str(msg_error)}") + + # 保存凭证到数据库 + if self.user: + from django.utils import timezone + logger.info("保存凭证到数据库") + + # 将凭证对象序列化 + credentials_data = pickle.dumps(creds) + + # 更新或创建凭证记录,添加gmail_email字段 + gmail_credential, created = GmailCredential.objects.update_or_create( + user=self.user, + defaults={ + 'credentials': credentials_data, + 'token_path': self.token_storage_path, + 'gmail_email': gmail_email, # 添加实际Gmail账号 + 'updated_at': timezone.now(), + 'is_active': True + } + ) + action = "创建" if created else "更新" + logger.info(f"已{action}用户 {self.user.username} 的Gmail凭证记录,实际Gmail账号: {gmail_email}") + + # 认证成功后更新单例 + if self.user and self.gmail_service and self.credentials: + GmailServiceManager.update_instance(self.user, self.credentials, self.gmail_service) + + return True + + except Exception as e: + # 保留授权URL异常,视图层会处理 + if "Please visit this URL" in str(e): + raise e + logger.error(f"Gmail认证失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return False + finally: + # 清理client_secret.json文件 + if self.client_secret and os.path.exists('client_secret.json'): + logger.info("删除临时client_secret文件") + os.unlink('client_secret.json') + + def create_talent_knowledge_base(self, talent_email): + """ + 创建或获取与talent_email关联的知识库 + + Args: + talent_email (str): 达人Gmail邮箱地址 + + Returns: + tuple: (knowledge_base, created) - 知识库对象和是否新创建的标志 + """ + try: + # 优先查找现有的关联关系 + mapping = GmailTalentMapping.objects.filter( + user=self.user, + talent_email=talent_email, + is_active=True + ).first() + + if mapping and mapping.knowledge_base: + logger.info(f"找到现有的Gmail-达人映射: {talent_email} -> {mapping.knowledge_base.name}") + return mapping.knowledge_base, False + + # 查找与该达人邮箱关联的知识库 + # 根据达人邮箱生成一个唯一的标识名称 + kb_name = f"Gmail-{talent_email.split('@')[0]}" + + # 检查该名称的知识库是否已存在 + existing_kb = 
KnowledgeBase.objects.filter( + name=kb_name, + user_id=self.user.id + ).first() + + if existing_kb: + logger.info(f"找到现有知识库: {kb_name}") + + # 创建映射关系 + GmailTalentMapping.objects.update_or_create( + user=self.user, + talent_email=talent_email, + defaults={ + 'knowledge_base': existing_kb, + 'is_active': True + } + ) + + return existing_kb, False + + # 没有找到现有知识库,直接使用基本方法创建 + logger.info(f"使用基本方法创建知识库: {kb_name}") + return self._create_knowledge_base_basic(talent_email) + + except Exception as e: + logger.error(f"创建或获取Gmail-达人知识库失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + + # 创建失败时,尝试使用基本方法创建 + logger.info("尝试使用基本方法创建知识库") + return self._create_knowledge_base_basic(talent_email) + + def _create_knowledge_base_basic(self, talent_email): + """ + 使用基本方法创建知识库(当KnowledgeBaseViewSet创建失败时的后备方案) + + Args: + talent_email (str): 达人Gmail邮箱地址 + + Returns: + tuple: (knowledge_base, created) - 知识库对象和是否新创建的标志 + """ + try: + # 根据达人邮箱生成一个唯一的标识名称 + kb_name = f"Gmail-{talent_email.split('@')[0]}" + + # 检查该名称的知识库是否已存在 + existing_kb = KnowledgeBase.objects.filter( + name=kb_name, + user_id=self.user.id + ).first() + + if existing_kb: + logger.info(f"找到现有知识库: {kb_name}") + + # 创建映射关系 + GmailTalentMapping.objects.update_or_create( + user=self.user, + talent_email=talent_email, + defaults={ + 'knowledge_base': existing_kb, + 'is_active': True + } + ) + + return existing_kb, False + + # 创建新知识库 + knowledge_base = KnowledgeBase.objects.create( + name=kb_name, + desc=f"与{talent_email}的Gmail邮件交流记录", + type="private", + user_id=self.user.id, + documents=[] + ) + + # 创建外部知识库 + try: + from .views import KnowledgeBaseViewSet + kb_viewset = KnowledgeBaseViewSet() + external_id = kb_viewset._create_external_dataset(knowledge_base) + if external_id: + knowledge_base.external_id = external_id + knowledge_base.save() + logger.info(f"成功创建外部知识库: {external_id}") + except Exception as e: + logger.error(f"创建外部知识库失败: {str(e)}") + # 继续执行,不影响基本功能 + + # 创建映射关系 + 
GmailTalentMapping.objects.create( + user=self.user, + talent_email=talent_email, + knowledge_base=knowledge_base, + is_active=True + ) + + logger.info(f"成功创建新知识库: {kb_name}, ID: {knowledge_base.id}") + return knowledge_base, True + + except Exception as e: + logger.error(f"创建或获取Gmail-达人知识库基本方法失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + raise + + def get_conversations(self, talent_gmail): + """获取与特定用户的所有邮件对话""" + try: + if not self.gmail_service: + logger.error("Gmail服务未初始化") + return [] + + # 使用crushwds@gmail.com作为固定邮箱 + email1 = "crushwds@gmail.com" # 固定设置为quickstart.py中的邮箱 + email2 = talent_gmail # 使用参数传入的目标邮箱 + + logger.info(f"执行Gmail查询: {email1} 与 {email2}") + + # 构建搜索查询 + query = f"from:({email1} OR {email2}) to:({email1} OR {email2})" + + # 获取所有匹配的邮件 + response = self.gmail_service.users().messages().list(userId='me', q=query).execute() + messages = [] + + if 'messages' in response: + messages.extend(response['messages']) + message_count = len(messages) + logger.info(f"找到 {message_count} 封邮件") + + # 分页获取所有邮件 + while 'nextPageToken' in response: + page_token = response['nextPageToken'] + response = self.gmail_service.users().messages().list( + userId='me', + q=query, + pageToken=page_token + ).execute() + if 'messages' in response: + new_messages = response['messages'] + messages.extend(new_messages) + new_count = len(new_messages) + message_count += new_count + logger.info(f"加载额外 {new_count} 封邮件,当前总数: {message_count}") + else: + logger.warning(f"未找到匹配邮件") + + # 处理每封邮件 + conversations = [] + if messages: + logger.info(f"开始获取 {len(messages)} 封邮件详情...") + + for i, msg in enumerate(messages): + try: + message = self.gmail_service.users().messages().get(userId='me', id=msg['id']).execute() + email_data = self._extract_email_content(message) + if email_data: + conversations.append(email_data) + if (i+1) % 5 == 0 or i+1 == len(messages): # 每5封邮件或最后一封邮件时记录进度 + logger.info(f"已处理 {i+1}/{len(messages)} 封邮件") + else: + logger.warning(f"邮件 
{i+1}/{len(messages)} 内容提取失败") + except Exception as e: + logger.error(f"获取邮件 {msg['id']} 详情失败: {str(e)}") + + # 按时间排序 + conversations.sort(key=lambda x: x['date']) + + logger.info(f"总共找到并解析了 {len(conversations)} 封邮件") + return conversations + + except Exception as e: + logger.error(f"获取Gmail对话失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return [] + + def _extract_email_content(self, message): + """提取邮件内容,完全按照quickstart.py的get_email_content函数实现""" + try: + message_id = message['id'] # 获取邮件ID + payload = message['payload'] + headers = payload['headers'] + + # 获取邮件基本信息 + email_data = { + 'id': message_id, # 保存邮件ID + 'subject': '', + 'from': '', + 'date': '', + 'body': '', + 'attachments': [] # 新增附件列表 + } + + # 提取头部信息 + for header in headers: + if header['name'] == 'Subject': + email_data['subject'] = header['value'] + elif header['name'] == 'From': + email_data['from'] = header['value'] + elif header['name'] == 'Date': + try: + date = parser.parse(header['value']) + # 转换为与Django时区一致的格式 + if hasattr(settings, 'USE_TZ') and settings.USE_TZ: + date = timezone.make_aware(date) + email_data['date'] = date.strftime('%Y-%m-%d %H:%M:%S') + except Exception as e: + logger.error(f"解析日期失败: {str(e)}") + email_data['date'] = header['value'] + + # 定义一个递归函数来处理所有部分和附件 + def process_parts(parts): + for part in parts: + # 检查是否是附件 + if 'filename' in part and part['filename']: + attachment = { + 'filename': part['filename'], + 'mimeType': part['mimeType'], + 'size': part['body'].get('size', 0) + } + + # 如果有附件内容数据,可以获取附件ID + if 'attachmentId' in part['body']: + attachment['attachmentId'] = part['body']['attachmentId'] + + email_data['attachments'].append(attachment) + + # 处理文本内容 + if part['mimeType'] == 'text/plain' and not email_data['body']: + data = part['body'].get('data', '') + if data: + try: + text = base64.urlsafe_b64decode(data).decode('utf-8') + email_data['body'] = text + except Exception as e: + logger.error(f"解码邮件内容失败: {str(e)}") + + # 递归处理多部分内容 
+ if 'parts' in part: + process_parts(part['parts']) + + # 处理邮件正文和附件 + if 'parts' in payload: + process_parts(payload['parts']) + elif 'body' in payload and 'data' in payload['body']: + # 没有parts,直接处理body + data = payload['body'].get('data', '') + if data: + try: + text = base64.urlsafe_b64decode(data).decode('utf-8') + email_data['body'] = text + except Exception as e: + logger.error(f"解码邮件内容失败: {str(e)}") + + return email_data + + except Exception as e: + logger.error(f"处理邮件内容时出错: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return None + + def download_attachment(self, message_id, attachment_id, filename): + """下载邮件附件""" + try: + attachment = self.gmail_service.users().messages().attachments().get( + userId='me', messageId=message_id, id=attachment_id + ).execute() + + data = attachment['data'] + file_data = base64.urlsafe_b64decode(data) + + # 创建附件目录 + attachments_dir = 'gmail_attachments' + if not os.path.exists(attachments_dir): + os.makedirs(attachments_dir) + + # 保存附件 + filepath = os.path.join(attachments_dir, f"{message_id}_{filename}") + with open(filepath, 'wb') as f: + f.write(file_data) + + return filepath + except Exception as e: + logger.error(f"下载附件失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return None + + def save_conversations_to_knowledge_base(self, conversations, knowledge_base): + """ + 将Gmail对话保存到知识库 + + Args: + conversations: Gmail邮件列表 + knowledge_base: 保存到的知识库对象 + """ + try: + # 导入所需模型 + from .models import GmailAttachment, ChatHistory + + # 检查入参 + if not conversations or not knowledge_base: + logger.error("参数不完整: conversations或knowledge_base为空") + return False + + if not conversations: + logger.warning("没有邮件对话可保存") + return {"conversation_id": None, "success": False, "error": "没有邮件对话可保存"} + + # 查找现有的对话ID - 优先使用talent_email查找 + conversation_id = None + + # 1. 
首先尝试通过talent_email查找现有映射 + if self.email: + existing_mapping = GmailTalentMapping.objects.filter( + user=self.user, + talent_email=self.email, + knowledge_base=knowledge_base, + is_active=True + ).first() + + if existing_mapping and existing_mapping.conversation_id: + conversation_id = existing_mapping.conversation_id + logger.info(f"通过达人邮箱 {self.email} 找到现有对话ID: {conversation_id}") + + # 2. 如果上面没找到,尝试通过知识库查找任何相关的对话 + if not conversation_id: + existing_conversation = ChatHistory.objects.filter( + knowledge_base=knowledge_base, + user=self.user, + is_deleted=False + ).values('conversation_id').distinct().first() + + if existing_conversation: + conversation_id = existing_conversation['conversation_id'] + logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}") + + # 如果有talent_email,更新或创建映射关系 + if self.email: + GmailTalentMapping.objects.update_or_create( + user=self.user, + talent_email=self.email, + defaults={ + 'knowledge_base': knowledge_base, + 'conversation_id': conversation_id, + 'is_active': True + } + ) + logger.info(f"更新Gmail达人映射: {self.email} -> 对话ID {conversation_id}") + + # 3. 
如果仍然没找到,则创建新的对话ID + if not conversation_id: + # 生成唯一的对话ID + conversation_id = str(uuid.uuid4()) + logger.info(f"创建新的对话ID: {conversation_id}") + + # 创建或更新Gmail和达人的映射关系 + if self.email: + talent_mapping, created = GmailTalentMapping.objects.update_or_create( + user=self.user, + talent_email=self.email, + defaults={ + 'knowledge_base': knowledge_base, + 'conversation_id': conversation_id, + 'is_active': True + } + ) + action = "创建" if created else "更新" + logger.info(f"已{action}Gmail达人映射: {self.email} -> {knowledge_base.name}") + + # 构建知识库文档 + logger.info(f"开始构建知识库文档,共{len(conversations)}个邮件对话") + document = { + "title": f"与{self.email or '联系人'}的Gmail邮件对话", + "url": "", + "source_type": "gmail", + "paragraphs": [] + } + + # 对话按时间顺序排序 + conversations.sort(key=lambda x: x.get('date', '')) + + # 将对话添加到文档 + previous_message_dict = {} # 用于存储邮件时间与消息ID的映射,以便找到正确的parent_id + + # 首先查找现有的聊天记录 + existing_messages = ChatHistory.objects.filter( + conversation_id=conversation_id, + is_deleted=False + ).order_by('created_at') + + # 如果已有消息,跳过重复导入 + existing_gmail_ids = set() + if existing_messages.exists(): + for msg in existing_messages: + if msg.metadata and 'gmail_message_id' in msg.metadata: + existing_gmail_ids.add(msg.metadata['gmail_message_id']) + logger.info(f"发现已存在 {len(existing_gmail_ids)} 条消息记录,将跳过重复导入") + + for message in conversations: + try: + # 跳过已导入的消息 + if message.get('id') in existing_gmail_ids: + logger.info(f"跳过已导入的消息: {message.get('id')}") + continue + + # 提取邮件内容 + sender = message.get('from', '未知发件人') + date_str = message.get('date', '未知时间') + subject = message.get('subject', '无主题') + body = message.get('body', '').strip() + + logger.info(f"处理邮件: {date_str}, {sender}, {subject[:20]}...") + + # 判断角色 + # 从发件人地址中提取邮箱部分 + sender_email = '' + if '<' in sender and '>' in sender: + # 格式如 "姓名 " + sender_email = sender.split('<')[1].split('>')[0] + else: + # 格式可能直接是邮箱 + sender_email = sender + + # 根据邮箱判断角色:检查发件人与用户邮箱或者目标邮箱是否匹配 + is_user_email = self.user.email.lower() == 
sender_email.lower() + + # 检查是否是目标邮箱(talent_email) + is_talent_email = False + if self.email and sender_email.lower() == self.email.lower(): + is_talent_email = True + + # 如果是用户邮箱或目标邮箱,则为user角色,否则为assistant + role = 'user' if is_user_email or is_talent_email else 'assistant' + + logger.info(f"设置消息角色: {role},发件人: {sender_email},用户邮箱: {self.user.email},目标邮箱: {self.email}") + + # 将邮件添加到文档 + paragraph = { + "id": f"msg_{len(document['paragraphs']) + 1}", + "title": f"{date_str} - {sender} - {subject}", + "content": body, + "meta": { + "sender": sender, + "date": date_str, + "subject": subject, + "has_attachments": len(message.get('attachments', [])) > 0, + "message_id": message.get('id', '') + } + } + document['paragraphs'].append(paragraph) + + # 解析邮件日期为datetime对象 + try: + date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') + except ValueError: + # 如果解析失败,使用当前时间 + logger.warning(f"无法解析邮件日期: {date_str},使用当前时间") + date_obj = datetime.now() + + # 查找parent_id(查找日期早于当前邮件且最接近的消息) + parent_id = None + closest_date = None + + for d, mid in previous_message_dict.items(): + if d < date_obj and (closest_date is None or d > closest_date): + closest_date = d + parent_id = mid + + # 保存消息到聊天历史,使用邮件实际日期 + from django.utils import timezone + aware_date = timezone.make_aware(date_obj) if not timezone.is_aware(date_obj) else date_obj + + # 创建消息记录 + chat_message = ChatHistory.objects.create( + user=self.user, + knowledge_base=knowledge_base, + conversation_id=conversation_id, + parent_id=parent_id, + role=role, + content=f"{subject}\n\n{body}", + metadata={ + 'gmail_message_id': message.get('id', ''), + 'from': sender, + 'date': date_str, + 'subject': subject, + 'dataset_id_list': [str(knowledge_base.id)], + 'dataset_names': [knowledge_base.name] + }, + created_at=aware_date # 设置正确的创建时间 + ) + + # 更新previous_message_dict + previous_message_dict[date_obj] = str(chat_message.id) + + # 处理附件 + attachments = message.get('attachments', []) + if attachments: + for attachment in 
attachments: + if 'attachmentId' in attachment and 'filename' in attachment: + try: + # 下载附件 + filepath = self.download_attachment( + message_id=message['id'], + attachment_id=attachment['attachmentId'], + filename=attachment['filename'] + ) + + if filepath: + # 记录附件信息 + GmailAttachment.objects.create( + chat_message=chat_message, + gmail_message_id=message['id'], + filename=attachment['filename'], + filepath=filepath, + mimetype=attachment.get('mimeType', ''), + filesize=attachment.get('size', 0) + ) + logger.info(f"已保存附件: {attachment['filename']}") + except Exception as e: + logger.error(f"保存附件失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + + except Exception as e: + logger.error(f"处理邮件时出错: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + + # 将文档添加到知识库 + logger.info(f"将Gmail对话添加到知识库: {knowledge_base.name}") + + # 准备文档数据结构 + doc_data = { + "name": f"Gmail对话与{self.email or '联系人'}.txt", + "paragraphs": [] + } + + # 将所有对话转换为段落 + for paragraph in document['paragraphs']: + doc_data["paragraphs"].append({ + "title": paragraph['title'], + "content": paragraph['content'], + "is_active": True, + "problem_list": [] + }) + + # 调用知识库的文档上传API + from .views import KnowledgeBaseViewSet + kb_viewset = KnowledgeBaseViewSet() + + # 上传文档 + upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data) + + if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): + # 上传成功,保存记录到数据库 + document_id = upload_response['data']['id'] + + # 创建文档记录 + from .models import KnowledgeBaseDocument + doc_record = KnowledgeBaseDocument.objects.create( + knowledge_base=knowledge_base, + document_id=document_id, + document_name=doc_data["name"], + external_id=document_id + ) + + logger.info(f"Gmail对话文档上传成功,ID: {document_id}") + + # 上传所有附件 + self._upload_attachments_to_knowledge_base(knowledge_base, conversations) + else: + # 上传失败,记录错误信息 + error_msg = upload_response.get('message', '未知错误') if 
upload_response else '上传API调用失败' + logger.error(f"Gmail对话文档上传失败: {error_msg}") + + # 更新知识库文档计数 + knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( + knowledge_base=knowledge_base, + status='active' + ).count() + knowledge_base.save() + + logger.info(f"Gmail对话已保存到知识库: {knowledge_base.name}") + + return { + "conversation_id": conversation_id, + "success": True, + "message_count": len(conversations), + "knowledge_base_id": str(knowledge_base.id) + } + + except Exception as e: + logger.error(f"保存Gmail对话到知识库失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return {"conversation_id": None, "success": False, "error": str(e)} + + def send_email(self, to_email, subject, body, conversation_id=None, attachments=None): + """发送邮件并保存到聊天记录""" + try: + # 构建邮件内容 + if attachments and len(attachments) > 0: + message = self._create_message_with_attachment(to_email, subject, body, attachments) + else: + message = self._create_message(to_email, subject, body) + + # 发送邮件 + sent_message = self.gmail_service.users().messages().send( + userId='me', body=message + ).execute() + + message_id = sent_message['id'] + logger.info(f"邮件发送成功, ID: {message_id}") + + # 获取邮件详情,包括服务器时间戳 + try: + message_detail = self.gmail_service.users().messages().get( + userId='me', id=message_id + ).execute() + + # 从消息详情中提取时间戳 + internal_date = message_detail.get('internalDate') + if internal_date: + # 转换毫秒时间戳为datetime,不使用timezone-aware + email_date = datetime.fromtimestamp(int(internal_date)/1000) + # 如果系统设置了USE_TZ,再转换为timezone-aware + if hasattr(timezone, 'is_aware') and not timezone.is_aware(email_date): + email_date = timezone.make_aware(email_date) + date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') + else: + email_date = datetime.now() + date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') + except Exception as e: + logger.warning(f"获取邮件时间戳失败: {str(e)}, 使用当前时间") + email_date = datetime.now() + date_str = email_date.strftime('%Y-%m-%d %H:%M:%S') + + # 
如果有conversation_id,保存到聊天记录 + if conversation_id: + # 查找现有的聊天记录 + from django.db.models import Q + existing_messages = ChatHistory.objects.filter( + conversation_id=conversation_id, + is_deleted=False + ).order_by('created_at') + + if not existing_messages.exists(): + logger.warning(f"找不到对话ID: {conversation_id},无法保存消息") + return message_id + + # 查找关联的知识库 + first_message = existing_messages.first() + if first_message.knowledge_base: + knowledge_base = first_message.knowledge_base + else: + # 如果第一条消息没有知识库,尝试从metadata获取 + if first_message.metadata and 'dataset_id_list' in first_message.metadata: + kb_id = first_message.metadata.get('dataset_id_list', [])[0] + knowledge_base = KnowledgeBase.objects.get(id=kb_id) + else: + logger.error(f"找不到关联的知识库,无法保存消息") + return message_id + + # 查找parent_id:时间早于当前邮件且最接近的消息,避免时区问题 + parent_id = None + try: + # 使用简单查询,不依赖时区 + previous_messages = ChatHistory.objects.filter( + conversation_id=conversation_id, + is_deleted=False + ).order_by('-id')[:1] # 使用ID降序排序而非时间 + + if previous_messages: + parent_id = str(previous_messages[0].id) + except Exception as e: + logger.error(f"查找父消息失败: {str(e)}") + logger.error(traceback.format_exc()) + + # 构建metadata + metadata = { + 'gmail_message_id': message_id, + 'from': self.user.email, + 'to': to_email, + 'date': date_str, + 'subject': subject, + 'dataset_id_list': [str(knowledge_base.id)], + 'dataset_names': [knowledge_base.name] + } + + # 如果现有消息有dataset_id_list,保留它 + if first_message.metadata and 'dataset_id_list' in first_message.metadata: + metadata['dataset_id_list'] = first_message.metadata['dataset_id_list'] + # 尝试获取对应的dataset_names + if 'dataset_names' in first_message.metadata: + metadata['dataset_names'] = first_message.metadata['dataset_names'] + + # 创建聊天记录,使用当前时间而不是邮件的实际时间 + chat_message = ChatHistory.objects.create( + user=self.user, + knowledge_base=knowledge_base, + conversation_id=conversation_id, + parent_id=parent_id, + role='user', # 用户发送的消息,角色固定为'user' + content=f"[{subject}] 
{body}", + metadata=metadata + # 不设置created_at,使用数据库默认时间 + ) + + # 如果有附件,保存附件信息 + if attachments and len(attachments) > 0: + for att_path in attachments: + file_name = os.path.basename(att_path) + file_size = os.path.getsize(att_path) + + # 获取MIME类型 + mime_type, _ = mimetypes.guess_type(att_path) + if not mime_type: + mime_type = 'application/octet-stream' + + # 保存附件信息到数据库 + gmail_attachment = GmailAttachment.objects.create( + chat_message=chat_message, + gmail_message_id=message_id, + filename=file_name, + filepath=att_path, + mimetype=mime_type, + filesize=file_size + ) + + # 更新知识库文档 + self._append_to_knowledge_base_document(knowledge_base, subject, body, to_email) + + return message_id + + except Exception as e: + logger.error(f"发送邮件失败: {str(e)}") + logger.error(traceback.format_exc()) + raise + + def _create_message(self, to, subject, body): + """创建邮件消息""" + message = MIMEText(body) + message['to'] = to + message['subject'] = subject + + # 编码为JSON安全的字符串 + raw = base64.urlsafe_b64encode(message.as_bytes()).decode() + return {'raw': raw} + + def _create_message_with_attachment(self, to, subject, body, attachment_files): + """创建带附件的邮件消息""" + message = MIMEMultipart() + message['to'] = to + message['subject'] = subject + + # 添加邮件正文 + msg = MIMEText(body) + message.attach(msg) + + # 添加附件 + for file in attachment_files: + try: + with open(file, 'rb') as f: + part = MIMEBase('application', 'octet-stream') + part.set_payload(f.read()) + + # 编码附件内容 + encoders.encode_base64(part) + + # 添加头部信息 + filename = os.path.basename(file) + part.add_header( + 'Content-Disposition', + f'attachment; filename="{filename}"' + ) + message.attach(part) + except Exception as e: + logger.error(f"添加附件 {file} 失败: {str(e)}") + + # 编码为JSON安全的字符串 + raw = base64.urlsafe_b64encode(message.as_bytes()).decode() + return {'raw': raw} + + def _append_to_knowledge_base_document(self, knowledge_base, subject, body, recipient): + """将新发送的邮件内容追加到知识库文档中""" + try: + # 准备文档数据结构 + doc_data = { + "name": 
f"Gmail回复给{recipient}_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt", + "paragraphs": [ + { + "title": f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {subject}", + "content": f"发件人: {self.user.email}\n收件人: {recipient}\n\n{body}", + "is_active": True, + "problem_list": [] + } + ] + } + + # 调用知识库的文档上传API + from .views import KnowledgeBaseViewSet + kb_viewset = KnowledgeBaseViewSet() + + # 验证知识库是否有external_id + if not knowledge_base.external_id: + logger.error(f"知识库没有external_id,无法上传文档: {knowledge_base.name}") + return None + + # 上传文档 + upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, doc_data) + + if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): + # 上传成功,保存记录到数据库 + document_id = upload_response['data']['id'] + + # 创建文档记录 + from .models import KnowledgeBaseDocument + doc_record = KnowledgeBaseDocument.objects.create( + knowledge_base=knowledge_base, + document_id=document_id, + document_name=doc_data["name"], + external_id=document_id + ) + + logger.info(f"Gmail回复文档上传成功,ID: {document_id}") + + # 更新知识库文档计数 + knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( + knowledge_base=knowledge_base, + status='active' + ).count() + knowledge_base.save() + + return upload_response + else: + # 上传失败,记录错误信息 + error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败' + logger.error(f"Gmail回复文档上传失败: {error_msg}") + return None + + except Exception as e: + logger.error(f"追加邮件内容到知识库失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return None + + def setup_watch(self, topic_name=None): + """设置Gmail监听""" + try: + # 如果没有指定topic,使用配置的主题名称 + if not topic_name: + # 从settings获取项目配置的主题名称 + topic_name = getattr(settings, 'GMAIL_TOPIC_NAME', 'gmail-watch-topic') + + # 直接使用settings中配置的项目ID,不再从client_secret.json获取 + project_id = getattr(settings, 'GOOGLE_CLOUD_PROJECT', 'knowledge-454905') + logger.info(f"使用settings中配置的项目ID: {project_id}") + + # 
注意:不再使用用户邮箱作为判断依据,Gmail API总是使用'me'作为userId + # Gmail认证是通过OAuth流程,与系统用户邮箱无关 + logger.info(f"系统用户: {self.user.email},Gmail API使用OAuth认证的邮箱 (userId='me')") + + # 构建完整的webhook URL + webhook_url = getattr(settings, 'GMAIL_WEBHOOK_URL', None) + if not webhook_url: + # 使用默认值,确保这是一个完全限定的URL + domain = getattr(settings, 'ALLOWED_HOSTS', ['localhost'])[0] + protocol = 'https' if getattr(settings, 'SECURE_SSL_REDIRECT', False) else 'http' + if domain == 'localhost' or domain.startswith('127.0.0.1'): + # 本地开发环境需要公网可访问的URL,可以用ngrok等工具暴露本地服务 + webhook_url = f"{protocol}://{domain}:8000/api/user/gmail/webhook/" + logger.warning("使用本地开发环境URL作为webhook回调,Google可能无法访问。建议使用ngrok等工具创建公网地址。") + else: + webhook_url = f"{protocol}://{domain}/api/user/gmail/webhook/" + + logger.info(f"使用Gmail webhook URL: {webhook_url}") + + # 如果想在Google Cloud控制台中手动配置webhook,打印明确的说明 + logger.info("如需手动配置Gmail推送通知,请确保在Google Cloud控制台的Pub/Sub主题配置中设置以下URL:") + logger.info(f"推送端点: {webhook_url}") + logger.info("权限: 为Gmail API服务账号(gmail-api-push@system.gserviceaccount.com)和您的Gmail账号授予发布权限") + + # 请求监听 + request = { + 'labelIds': ['INBOX', 'SENT', 'IMPORTANT', 'UNREAD'], # 监听更多标签 + 'topicName': f"projects/{project_id}/topics/{topic_name}", + 'labelFilterAction': 'include' + } + + logger.info(f"设置Gmail监听: {request}") + + # 执行watch请求 + response = self.gmail_service.users().watch(userId='me', body=request).execute() + + # 获取historyId,用于后续同步 + history_id = response.get('historyId') + expiration = response.get('expiration') + + logger.info(f"Gmail监听设置成功: historyId={history_id}, expiration={expiration}") + + # 保存监听信息到数据库 + if self.user: + credential = GmailCredential.objects.filter(user=self.user, is_active=True).first() + if credential: + # 转换时间戳为datetime + expiration_time = None + if expiration: + # 将毫秒时间戳转换为timezone-aware的datetime + naive_time = datetime.fromtimestamp(int(expiration)/1000) + expiration_time = timezone.make_aware(naive_time) + + credential.last_history_id = history_id + credential.watch_expiration 
= expiration_time + credential.save() + + logger.info(f"更新Gmail监听信息: {self.user.username}, history_id: {history_id}") + + return { + 'historyId': history_id, + 'expiration': expiration + } + + except Exception as e: + logger.error(f"设置Gmail监听失败: {str(e)}") + logger.error(traceback.format_exc()) + raise + + def get_history(self, start_history_id): + """获取历史变更""" + try: + logger.info(f"获取历史记录,起始ID: {start_history_id}") + response = self.gmail_service.users().history().list( + userId='me', startHistoryId=start_history_id + ).execute() + + logger.info(f"历史记录响应: {response}") + + history_list = [] + if 'history' in response: + history_list.extend(response['history']) + logger.info(f"找到 {len(response['history'])} 个历史记录") + + # 获取所有页 + while 'nextPageToken' in response: + page_token = response['nextPageToken'] + response = self.gmail_service.users().history().list( + userId='me', startHistoryId=start_history_id, pageToken=page_token + ).execute() + if 'history' in response: + history_list.extend(response['history']) + logger.info(f"加载额外 {len(response['history'])} 个历史记录") + else: + logger.info(f"没有新的历史记录,最新historyId: {response.get('historyId', 'N/A')}") + + # 提取新消息ID + new_message_ids = set() + for history in history_list: + logger.info(f"处理历史记录: {history}") + if 'messagesAdded' in history: + for message in history['messagesAdded']: + message_id = message['message']['id'] + new_message_ids.add(message_id) + logger.info(f"新增消息ID: {message_id}") + + if 'labelsAdded' in history: + for label in history['labelsAdded']: + message_id = label['message']['id'] + if 'INBOX' in label.get('labelIds', []): + new_message_ids.add(message_id) + logger.info(f"标签变更的消息ID: {message_id}, 添加了INBOX标签") + + if new_message_ids: + logger.info(f"总共找到 {len(new_message_ids)} 个新消息") + else: + logger.info("没有新增消息") + + return list(new_message_ids) + + except Exception as e: + logger.error(f"获取Gmail历史记录失败: {str(e)}") + return [] + + def process_notification(self, notification_data): + """处理Gmail推送通知""" + 
try: + # 提取通知数据 + logger.info(f"处理Gmail通知: {notification_data}") + + # 处理Google Pub/Sub消息格式 + if isinstance(notification_data, dict) and 'message' in notification_data and 'data' in notification_data['message']: + try: + import base64 + import json + logger.info("检测到Google Pub/Sub消息格式") + + # Base64解码data字段 + encoded_data = notification_data['message']['data'] + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + logger.info(f"解码后的数据: {decoded_data}") + + # 解析JSON获取email和historyId + json_data = json.loads(decoded_data) + email = json_data.get('emailAddress') + history_id = json_data.get('historyId') + logger.info(f"从Pub/Sub消息中提取: email={email}, historyId={history_id}") + except Exception as decode_error: + logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}") + logger.error(traceback.format_exc()) + return False + else: + # 原始格式处理 + email = notification_data.get('emailAddress') + history_id = notification_data.get('historyId') + + if not email or not history_id: + logger.error("Gmail通知数据不完整: 找不到emailAddress或historyId") + return False + + # 查找关联用户 + from .models import User + user = User.objects.filter(email=email).first() + + # 如果找不到用户,尝试使用gmail_email字段查找 + if not user: + logger.info(f"找不到email={email}的用户,尝试使用gmail_email查找") + from .models import GmailCredential + credential = GmailCredential.objects.filter(gmail_email=email, is_active=True).first() + if credential: + user = credential.user + logger.info(f"通过gmail_email找到用户: {user.email}") + + if not user: + logger.error(f"找不到与 {email} 关联的用户") + return False + + # 初始化Gmail集成 + gmail_integration = GmailIntegration(user) + if not gmail_integration.authenticate(): + logger.error(f"Gmail认证失败: {email}") + return False + + # 首先尝试使用历史记录API获取新消息 + message_ids = gmail_integration.get_history(history_id) + if message_ids: + logger.info(f"从历史记录找到 {len(message_ids)} 个新消息") + # 处理每个新消息 + for message_id in message_ids: + self._process_new_message(gmail_integration, message_id) + return True + + # 
如果历史记录API没有返回新消息,尝试获取最近的对话 + logger.info("历史记录API没有返回新消息,尝试获取达人对话") + + # 查找所有与该用户关联的达人映射 + from .models import GmailTalentMapping + mappings = GmailTalentMapping.objects.filter( + user=user, + is_active=True + ) + + if not mappings.exists(): + logger.info(f"用户 {user.email} 没有达人映射记录") + return False + + # 处理每个达人映射 + for mapping in mappings: + talent_email = mapping.talent_email + logger.info(f"处理达人 {talent_email} 的对话") + + # 获取达人最近的邮件 + recent_emails = gmail_integration.get_recent_emails( + from_email=talent_email, + max_results=5 # 限制获取最近5封 + ) + + if not recent_emails: + logger.info(f"没有找到来自 {talent_email} 的最近邮件") + continue + + logger.info(f"找到 {len(recent_emails)} 封来自 {talent_email} 的最近邮件") + + # 创建或获取知识库 + knowledge_base, created = gmail_integration.create_talent_knowledge_base(talent_email) + kb_action = "创建" if created else "获取" + logger.info(f"知识库{kb_action}成功: {knowledge_base.name}") + + # 保存对话 + result = gmail_integration.save_conversations_to_knowledge_base(recent_emails, knowledge_base) + logger.info(f"保存达人对话结果: {result}") + + return True + + except Exception as e: + logger.error(f"处理Gmail通知失败: {str(e)}") + logger.error(traceback.format_exc()) + return False + + def _process_new_message(self, gmail_integration, message_id): + """处理新收到的邮件""" + try: + # 导入所需模型 + from .models import GmailTalentMapping, GmailAttachment, KnowledgeBase, ChatHistory + + # 获取邮件详情 + message = gmail_integration.gmail_service.users().messages().get( + userId='me', id=message_id + ).execute() + + # 提取邮件内容 + email_data = gmail_integration._extract_email_content(message) + if not email_data: + logger.error(f"提取邮件内容失败: {message_id}") + return False + + # 获取发件人邮箱 + from_email = email_data.get('from', '') + sender_email = '' + if '<' in from_email and '>' in from_email: + # 格式如 "姓名 " + sender_email = from_email.split('<')[1].split('>')[0] + else: + # 格式可能直接是邮箱 + sender_email = from_email + + # 根据邮箱判断角色:检查发件人与用户邮箱或者映射的talent邮箱是否匹配 + is_user_email = gmail_integration.user.email.lower() == 
sender_email.lower() + + # 检查是否有与当前用户关联的talent邮箱映射 + is_mapped_talent = False + talent_mapping = GmailTalentMapping.objects.filter( + user=gmail_integration.user, + talent_email=sender_email, + is_active=True + ).first() # 修改为first()以获取实际的对象而不是布尔值 + + # 如果是用户邮箱或映射的talent邮箱,则为user角色,否则为assistant + role = 'user' if is_user_email or talent_mapping else 'assistant' + + logger.info(f"设置消息角色: {role}, 发件人: {sender_email}, 用户邮箱: {gmail_integration.user.email}, 是否映射达人: {talent_mapping}") + + # 查找是否有关联的达人知识库 + kb_name = f"Gmail-{sender_email.split('@')[0]}" + knowledge_base = KnowledgeBase.objects.filter(name=kb_name).first() + + # 如果没有以发件人邮箱命名的知识库,尝试查找自定义知识库 + if not knowledge_base: + # 查找映射关系 + mapping = GmailTalentMapping.objects.filter( + talent_email=sender_email, + user=gmail_integration.user, + is_active=True + ).first() + + if mapping and mapping.knowledge_base: + knowledge_base = mapping.knowledge_base + logger.info(f"使用映射的知识库: {knowledge_base.name}") + else: + logger.info(f"收到新邮件,但没有找到关联的达人知识库: {sender_email}") + return False + + # 查找关联的对话ID + conversation_id = None + + # 1. 首先通过talent_mapping查找 + if talent_mapping and talent_mapping.conversation_id: + conversation_id = talent_mapping.conversation_id + logger.info(f"通过达人映射找到对话ID: {conversation_id}") + + # 2. 
如果没有找到,尝试通过知识库查找任何相关对话 + if not conversation_id: + existing_conversation = ChatHistory.objects.filter( + knowledge_base=knowledge_base, + user=gmail_integration.user, + is_deleted=False + ).values('conversation_id').distinct().first() + + if existing_conversation: + conversation_id = existing_conversation['conversation_id'] + logger.info(f"通过知识库 {knowledge_base.name} 找到现有对话ID: {conversation_id}") + + # 更新或创建映射关系 + if not talent_mapping: + GmailTalentMapping.objects.update_or_create( + user=gmail_integration.user, + talent_email=sender_email, + defaults={ + 'knowledge_base': knowledge_base, + 'conversation_id': conversation_id, + 'is_active': True + } + ) + logger.info(f"更新Gmail达人映射: {sender_email} -> 对话ID {conversation_id}") + + # 3. 如果仍没找到,创建新的对话ID + if not conversation_id: + conversation_id = str(uuid.uuid4()) + logger.info(f"创建新的对话ID: {conversation_id}") + + # 保存映射关系 + if not talent_mapping: + GmailTalentMapping.objects.create( + user=gmail_integration.user, + talent_email=sender_email, + knowledge_base=knowledge_base, + conversation_id=conversation_id, + is_active=True + ) + logger.info(f"已创建新的Gmail达人映射: {sender_email} -> {knowledge_base.name}") + + # 检查消息是否已经处理过 + if ChatHistory.objects.filter( + conversation_id=conversation_id, + metadata__gmail_message_id=message_id, + is_deleted=False + ).exists(): + logger.info(f"邮件已处理过,跳过: {message_id}") + return True + + # 解析邮件日期 - 使用普通datetime而非timezone-aware + date_str = email_data.get('date', '') + try: + date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') + # 不应用时区转换 + aware_date = date_obj + except (ValueError, TypeError): + logger.warning(f"无法解析邮件日期: {date_str},使用当前时间") + aware_date = datetime.now() + + # 查找适合的parent_id: 使用ID排序而非时间 + try: + previous_messages = ChatHistory.objects.filter( + conversation_id=conversation_id, + is_deleted=False + ).order_by('-id')[:1] + + parent_id = None + if previous_messages: + parent_id = str(previous_messages[0].id) + logger.info(f"找到父消息ID: {parent_id}") + except Exception 
as e: + logger.error(f"查找父消息失败: {str(e)}") + logger.error(traceback.format_exc()) + parent_id = None + + # 下载附件 + attachment_records = [] + for attachment in email_data.get('attachments', []): + if 'attachmentId' in attachment: + filepath = gmail_integration.download_attachment( + message_id, + attachment['attachmentId'], + attachment['filename'] + ) + + if filepath: + attachment_records.append({ + 'filepath': filepath, + 'filename': attachment['filename'], + 'message_id': message_id, + 'date': date_str + }) + + # 构建metadata + metadata = { + 'gmail_message_id': message_id, + 'from': email_data.get('from', ''), + 'date': date_str, + 'subject': email_data.get('subject', ''), + 'dataset_id_list': [str(knowledge_base.id)], + 'dataset_names': [knowledge_base.name] + } + + if attachment_records: + metadata['message_attachments'] = attachment_records + + # 创建聊天记录,不指定时间 + chat_message = ChatHistory.objects.create( + user=gmail_integration.user, + knowledge_base=knowledge_base, + conversation_id=conversation_id, + parent_id=parent_id, + role=role, # 使用上面确定的role变量,而不是硬编码为'assistant' + content=f"[{email_data['subject']}] {email_data['body']}", + metadata=metadata + # 不设置created_at,使用数据库默认时间 + ) + + # 更新知识库文档 + gmail_integration._append_to_knowledge_base_document( + knowledge_base, + email_data['subject'], + email_data['body'], + gmail_integration.user.email + ) + + # 如果有附件,上传到知识库 + if attachment_records: + gmail_integration._upload_message_attachments_to_knowledge_base( + knowledge_base, + attachment_records + ) + + # 添加WebSocket通知功能 + try: + # 导入必要的模块 + from channels.layers import get_channel_layer + from asgiref.sync import async_to_sync + from django.conf import settings + + # 检查是否有WebSocket通道层配置 + channel_layer = get_channel_layer() + if channel_layer: + # 创建通知数据 + notification_data = { + "type": "notification", + "data": { + "message_type": "new_gmail", + "conversation_id": conversation_id, + "message": { + "id": str(chat_message.id), + "role": role, + "content": 
f"[{email_data['subject']}] {email_data['body'][:100]}{'...' if len(email_data['body']) > 100 else ''}", + "sender": sender_email, + "subject": email_data['subject'], + "has_attachments": len(attachment_records) > 0 + } + } + } + + # 发送WebSocket消息 + async_to_sync(channel_layer.group_send)( + f"notification_user_{gmail_integration.user.id}", + notification_data + ) + logger.info(f"已发送WebSocket通知: 用户 {gmail_integration.user.id} 收到新Gmail消息") + + # 创建系统通知记录 + try: + from .models import Notification + Notification.objects.create( + sender=gmail_integration.user, + receiver=gmail_integration.user, + title="新Gmail消息", + content=f"您收到了来自 {sender_email} 的新邮件: {email_data['subject']}", + type="system_notice", + related_resource=conversation_id + ) + logger.info(f"已创建系统通知记录: 用户 {gmail_integration.user.id} 的新Gmail消息") + except Exception as notification_error: + logger.error(f"创建系统通知记录失败: {str(notification_error)}") + except Exception as ws_error: + logger.error(f"发送WebSocket通知失败: {str(ws_error)}") + logger.error(traceback.format_exc()) + # 通知失败不影响消息处理流程,继续执行 + + logger.info(f"成功处理新邮件: {message_id} 从 {sender_email}") + return True + + except Exception as e: + logger.error(f"处理新邮件失败: {str(e)}") + logger.error(traceback.format_exc()) + return False + + def get_attachment_by_conversation(self, conversation_id): + """获取特定对话的所有附件""" + try: + # 查找对话记录 + records = ChatHistory.objects.filter( + conversation_id=conversation_id, + user=self.user, + is_deleted=False + ) + + if not records: + logger.warning(f"找不到对话记录: {conversation_id}") + return [] + + # 使用GmailAttachment数据模型查询附件 + attachments = [] + records_ids = [record.id for record in records] + + gmail_attachments = GmailAttachment.objects.filter( + chat_message_id__in=records_ids + ).select_related('chat_message') + + for attachment in gmail_attachments: + chat_message = attachment.chat_message + attachments.append({ + 'id': str(attachment.id), + 'filename': attachment.filename, + 'filepath': attachment.filepath, + 'mimetype': 
attachment.mimetype, + 'filesize': attachment.filesize, + 'message_id': str(chat_message.id), + 'gmail_message_id': attachment.gmail_message_id, + 'role': chat_message.role, + 'created_at': attachment.created_at.strftime('%Y-%m-%d %H:%M:%S'), + 'content_preview': chat_message.content[:100] + '...' if len(chat_message.content) > 100 else chat_message.content + }) + + return attachments + + except Exception as e: + logger.error(f"获取对话附件失败: {str(e)}") + return [] + + def handle_auth_code(self, auth_code): + """ + 处理授权码并完成OAuth2授权流程,使用与quickstart.py相同的简化流程 + + Args: + auth_code (str): 从Google授权页面获取的授权码 + + Returns: + bool: 授权是否成功 + """ + try: + logger.info("开始处理Gmail授权码...") + + # 确保client_secret_json已提供 + if not self.client_secret: + logger.error("未提供client_secret_json,无法处理授权码") + return False + + # 创建临时文件存储client_secret + client_secret_path = 'client_secret.json' + with open(client_secret_path, 'w') as f: + if isinstance(self.client_secret, str): + logger.info("client_secret是字符串,解析为JSON") + try: + # 确保是有效的JSON + json_data = json.loads(self.client_secret) + json.dump(json_data, f) + except json.JSONDecodeError as e: + logger.error(f"client_secret不是有效的JSON: {str(e)}") + return False + else: + logger.info("client_secret是字典,直接写入文件") + json.dump(self.client_secret, f) + + logger.info(f"已将client_secret写入临时文件: {client_secret_path}") + + try: + # 确认token目录存在 + token_dir = os.path.dirname(self.token_storage_path) + if token_dir and not os.path.exists(token_dir): + logger.info(f"创建token目录: {token_dir}") + os.makedirs(token_dir) + + # 设置token存储 + logger.info(f"设置token存储: {self.token_storage_path}") + store = file.Storage(self.token_storage_path) + + # 提取重定向URI + redirect_uri = None + if isinstance(self.client_secret, dict): + for key in ['web', 'installed']: + if key in self.client_secret and 'redirect_uris' in self.client_secret[key]: + redirect_uri = self.client_secret[key]['redirect_uris'][0] + break + elif isinstance(self.client_secret, str): + try: + json_data = 
json.loads(self.client_secret) + for key in ['web', 'installed']: + if key in json_data and 'redirect_uris' in json_data[key]: + redirect_uri = json_data[key]['redirect_uris'][0] + break + except: + pass + + # 如果找不到重定向URI,使用默认值 + if not redirect_uri: + redirect_uri = 'urn:ietf:wg:oauth:2.0:oob' + + logger.info(f"使用重定向URI: {redirect_uri}") + + # 从client_secret创建flow + logger.info("从client_secret创建授权流程") + flow = client.flow_from_clientsecrets( + client_secret_path, + self.SCOPES, + redirect_uri=redirect_uri + ) + + # 使用授权码交换token + logger.info("使用授权码交换访问令牌") + credentials = flow.step2_exchange(auth_code) + logger.info("成功获取到访问令牌") + + # 保存到文件 + logger.info(f"保存凭证到文件: {self.token_storage_path}") + store.put(credentials) + + # 保存到实例变量 + self.credentials = credentials + + # 初始化Gmail服务 + logger.info("初始化Gmail服务") + self.gmail_service = discovery.build('gmail', 'v1', http=credentials.authorize(Http())) + logger.info("Gmail服务初始化成功") + + # 保存到数据库 + from django.utils import timezone + + logger.info("保存凭证到数据库") + # 将凭证对象序列化 + credentials_data = pickle.dumps(credentials) + + gmail_credential, created = GmailCredential.objects.update_or_create( + user=self.user, + defaults={ + 'credentials': credentials_data, + 'token_path': self.token_storage_path, + 'updated_at': timezone.now(), + 'is_active': True + } + ) + + action = "创建" if created else "更新" + logger.info(f"已{action}用户 {self.user.username} 的Gmail凭证记录") + + # 成功获取凭证后更新单例 + if self.user and self.gmail_service and self.credentials: + GmailServiceManager.update_instance(self.user, self.credentials, self.gmail_service) + + return True + + except client.FlowExchangeError as e: + logger.error(f"授权码交换失败: {str(e)}") + return False + except Exception as e: + logger.error(f"处理授权码时发生错误: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return False + finally: + # 删除临时文件 + if os.path.exists(client_secret_path): + logger.info(f"删除临时文件: {client_secret_path}") + os.unlink(client_secret_path) + + except Exception as 
e: + logger.error(f"处理授权码过程中发生异常: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return False + + def _upload_attachments_to_knowledge_base(self, knowledge_base, conversations): + """上传所有邮件附件到知识库""" + try: + # 收集所有需要上传的附件 + attachments_to_upload = [] + + # 从conversations中提取所有附件信息 + for message in conversations: + if 'attachments' in message and message['attachments']: + message_id = message.get('id', '') + sender = message.get('from', '未知发件人') + subject = message.get('subject', '无主题') + date_str = message.get('date', '未知时间') + + for attachment in message['attachments']: + if 'attachmentId' in attachment and 'filename' in attachment: + # 下载附件(如果尚未下载) + filepath = None + + # 检查数据库中是否已有记录 + existing_attachment = GmailAttachment.objects.filter( + gmail_message_id=message_id, + filename=attachment['filename'] + ).first() + + if existing_attachment and os.path.exists(existing_attachment.filepath): + filepath = existing_attachment.filepath + else: + # 下载附件 + filepath = self.download_attachment( + message_id=message_id, + attachment_id=attachment['attachmentId'], + filename=attachment['filename'] + ) + + if filepath: + attachments_to_upload.append({ + 'filepath': filepath, + 'filename': attachment['filename'], + 'message_id': message_id, + 'sender': sender, + 'subject': subject, + 'date': date_str + }) + + # 上传收集到的所有附件 + if attachments_to_upload: + logger.info(f"开始上传 {len(attachments_to_upload)} 个附件到知识库") + + from .views import KnowledgeBaseViewSet + import django.core.files.uploadedfile as uploadedfile + + # 导入FileUploadParser + from rest_framework.parsers import FileUploadParser + + # 创建视图集实例 + kb_viewset = KnowledgeBaseViewSet() + + # 批量上传附件 + for i in range(0, len(attachments_to_upload), 10): # 每批最多10个文件 + batch = attachments_to_upload[i:i+10] + files = [] + + for att in batch: + filepath = att['filepath'] + filename = att['filename'] + + # 确认文件存在 + if not os.path.exists(filepath): + logger.warning(f"附件文件不存在: {filepath}") + continue + + # 
读取文件并创建UploadedFile对象 + with open(filepath, 'rb') as f: + file_content = f.read() + files.append(uploadedfile.SimpleUploadedFile( + name=filename, + content=file_content, + content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream' + )) + + # 如果这批有文件,调用_call_split_api_multiple上传 + if files: + # 直接调用文档分割API + split_response = kb_viewset._call_split_api_multiple(files) + + if not split_response or split_response.get('code') != 200: + logger.error(f"附件批量分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}") + continue + + # 处理分割后的文档 + documents_data = split_response.get('data', []) + + # 对每个文档调用上传API + for doc in documents_data: + doc_name = doc.get('name', 'Gmail附件') + doc_content = doc.get('content', []) + + # 准备文档数据 + upload_doc_data = { + "name": doc_name, + "paragraphs": [] + } + + # 将所有段落添加到文档中 + for paragraph in doc_content: + upload_doc_data["paragraphs"].append({ + "content": paragraph.get('content', ''), + "title": paragraph.get('title', ''), + "is_active": True, + "problem_list": [] + }) + + # 调用文档上传API + upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data) + + if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): + # 上传成功,保存记录到数据库 + document_id = upload_response['data']['id'] + + # 创建文档记录 + from .models import KnowledgeBaseDocument + doc_record = KnowledgeBaseDocument.objects.create( + knowledge_base=knowledge_base, + document_id=document_id, + document_name=doc_name, + external_id=document_id + ) + + logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}") + + # 更新知识库文档计数 + knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( + knowledge_base=knowledge_base, + status='active' + ).count() + knowledge_base.save() + + logger.info(f"完成附件上传,共 {len(attachments_to_upload)} 个文件") + + except Exception as e: + logger.error(f"上传附件到知识库失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + + def 
_upload_message_attachments_to_knowledge_base(self, knowledge_base, attachment_records): + """上传单个邮件的附件到知识库""" + try: + # 检查是否有附件需要上传 + if not attachment_records: + return + + logger.info(f"开始上传 {len(attachment_records)} 个附件到知识库") + + from .views import KnowledgeBaseViewSet + import django.core.files.uploadedfile as uploadedfile + + # 创建视图集实例 + kb_viewset = KnowledgeBaseViewSet() + + # 准备文件列表 + files = [] + + for att in attachment_records: + filepath = att.get('filepath') + filename = att.get('filename') + + # 确认文件存在 + if not os.path.exists(filepath): + logger.warning(f"附件文件不存在: {filepath}") + continue + + # 读取文件并创建UploadedFile对象 + with open(filepath, 'rb') as f: + file_content = f.read() + files.append(uploadedfile.SimpleUploadedFile( + name=filename, + content=file_content, + content_type=mimetypes.guess_type(filepath)[0] or 'application/octet-stream' + )) + + # 如果有文件,调用_call_split_api_multiple上传 + if files: + # 直接调用文档分割API + split_response = kb_viewset._call_split_api_multiple(files) + + if not split_response or split_response.get('code') != 200: + logger.error(f"附件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}") + return + + # 处理分割后的文档 + documents_data = split_response.get('data', []) + + # 对每个文档调用上传API + for doc in documents_data: + doc_name = doc.get('name', 'Gmail附件') + doc_content = doc.get('content', []) + + # 准备文档数据 + upload_doc_data = { + "name": doc_name, + "paragraphs": [] + } + + # 将所有段落添加到文档中 + for paragraph in doc_content: + upload_doc_data["paragraphs"].append({ + "content": paragraph.get('content', ''), + "title": paragraph.get('title', ''), + "is_active": True, + "problem_list": [] + }) + + # 调用文档上传API + upload_response = kb_viewset._call_upload_api(knowledge_base.external_id, upload_doc_data) + + if upload_response and upload_response.get('code') == 200 and upload_response.get('data'): + # 上传成功,保存记录到数据库 + document_id = upload_response['data']['id'] + + # 创建文档记录 + from .models import KnowledgeBaseDocument + doc_record 
= KnowledgeBaseDocument.objects.create( + knowledge_base=knowledge_base, + document_id=document_id, + document_name=doc_name, + external_id=document_id + ) + + logger.info(f"Gmail附件文档上传成功,ID: {document_id}, 文件名: {doc_name}") + + # 更新知识库文档计数 + knowledge_base.document_count = KnowledgeBaseDocument.objects.filter( + knowledge_base=knowledge_base, + status='active' + ).count() + knowledge_base.save() + + logger.info(f"完成附件上传,共 {len(files)} 个文件") + + except Exception as e: + logger.error(f"上传附件到知识库失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + + def get_recent_emails(self, from_email=None, max_results=10): + """ + 获取最近的邮件(不依赖history_id) + + Args: + from_email (str, optional): 发件人邮箱过滤 + max_results (int, optional): 最大结果数,默认10封 + + Returns: + list: 邮件列表 + """ + try: + # 构建查询 + query = f"from:{from_email}" if from_email else None + logger.info(f"查询最近邮件: query={query}, max_results={max_results}") + + # 查询邮件列表 + response = self.gmail_service.users().messages().list( + userId='me', + q=query, + maxResults=max_results + ).execute() + + if 'messages' not in response: + logger.info("未找到匹配邮件") + return [] + + # 获取邮件详情 + messages = [] + for msg in response['messages']: + try: + message = self.gmail_service.users().messages().get( + userId='me', id=msg['id'] + ).execute() + + email_data = self._extract_email_content(message) + if email_data: + messages.append(email_data) + except Exception as msg_error: + logger.error(f"处理邮件 {msg['id']} 失败: {str(msg_error)}") + + logger.info(f"获取到 {len(messages)} 封邮件") + return messages + + except Exception as e: + logger.error(f"获取最近邮件失败: {str(e)}") + logger.error(traceback.format_exc()) + return [] diff --git a/user_management/management/__init__.py b/user_management/management/__init__.py index e69de29..31bbb3b 100644 --- a/user_management/management/__init__.py +++ b/user_management/management/__init__.py @@ -0,0 +1 @@ +# 管理命令包 diff --git a/user_management/management/commands/__init__.py 
b/user_management/management/commands/__init__.py index e69de29..e266d78 100644 --- a/user_management/management/commands/__init__.py +++ b/user_management/management/commands/__init__.py @@ -0,0 +1 @@ +# Gmail管理命令 diff --git a/user_management/management/commands/update_gmail_credentials.py b/user_management/management/commands/update_gmail_credentials.py new file mode 100644 index 0000000..f113118 --- /dev/null +++ b/user_management/management/commands/update_gmail_credentials.py @@ -0,0 +1,80 @@ +from django.core.management.base import BaseCommand, CommandError +from user_management.models import GmailCredential, User +from user_management.gmail_integration import GmailIntegration +import logging +import pickle + +logger = logging.getLogger(__name__) + +class Command(BaseCommand): + help = '更新Gmail凭证中的邮箱信息' + + def add_arguments(self, parser): + parser.add_argument('--email', type=str, help='指定用户邮箱') + parser.add_argument('--gmail', type=str, help='指定要设置的Gmail邮箱') + parser.add_argument('--all', action='store_true', help='更新所有凭证') + + def handle(self, *args, **options): + email = options.get('email') + gmail = options.get('gmail') + update_all = options.get('all') + + if update_all: + # 更新所有凭证 + credentials = GmailCredential.objects.filter(is_active=True) + self.stdout.write(f"找到 {credentials.count()} 个活跃的Gmail凭证") + + for credential in credentials: + self._update_credential(credential, gmail) + elif email: + # 更新指定用户的凭证 + try: + user = User.objects.get(email=email) + credentials = GmailCredential.objects.filter(user=user, is_active=True) + + if not credentials.exists(): + raise CommandError(f"未找到用户 {email} 的Gmail凭证") + + for credential in credentials: + self._update_credential(credential, gmail) + except User.DoesNotExist: + raise CommandError(f"未找到用户 {email}") + else: + self.stdout.write("请提供--email参数或--all参数") + + def _update_credential(self, credential, gmail=None): + """更新单个凭证""" + user = credential.user + self.stdout.write(f"正在更新用户 {user.email} 的Gmail凭证...") 
+ + if gmail: + # 如果指定了Gmail邮箱,直接使用 + credential.gmail_email = gmail + credential.save() + self.stdout.write(self.style.SUCCESS(f"已手动设置Gmail邮箱为: {gmail}")) + return + + # 尝试使用API获取Gmail邮箱 + try: + # 从凭证数据中恢复服务 + creds = pickle.loads(credential.credentials) + + if creds and not creds.invalid: + # 创建Gmail集成实例 + integration = GmailIntegration(user=user) + integration.credentials = creds + + # 尝试调用API获取用户资料 + profile = integration.gmail_service.users().getProfile(userId='me').execute() + gmail_email = profile.get('emailAddress') + + if gmail_email: + credential.gmail_email = gmail_email + credential.save() + self.stdout.write(self.style.SUCCESS(f"已更新Gmail邮箱为: {gmail_email}")) + else: + self.stdout.write(self.style.WARNING("无法从API获取Gmail邮箱")) + else: + self.stdout.write(self.style.ERROR("凭证无效,请重新授权")) + except Exception as e: + self.stdout.write(self.style.ERROR(f"更新失败: {str(e)}")) \ No newline at end of file diff --git a/user_management/migrations/0005_gmailattachment_gmailcredential_gmailtalentmapping.py b/user_management/migrations/0005_gmailattachment_gmailcredential_gmailtalentmapping.py new file mode 100644 index 0000000..847fb6e --- /dev/null +++ b/user_management/migrations/0005_gmailattachment_gmailcredential_gmailtalentmapping.py @@ -0,0 +1,71 @@ +# Generated by Django 5.1.5 on 2025-04-09 15:16 + +import django.db.models.deletion +import uuid +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('user_management', '0004_knowledgebasedocument'), + ] + + operations = [ + migrations.CreateModel( + name='GmailAttachment', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('gmail_message_id', models.CharField(max_length=100, verbose_name='Gmail消息ID')), + ('filename', models.CharField(max_length=255, verbose_name='文件名')), + ('filepath', models.CharField(max_length=500, verbose_name='文件路径')), + ('mimetype', 
models.CharField(max_length=100, verbose_name='MIME类型')), + ('filesize', models.IntegerField(default=0, verbose_name='文件大小')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')), + ('chat_message', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='gmail_attachments', to='user_management.chathistory')), + ], + options={ + 'verbose_name': 'Gmail附件', + 'verbose_name_plural': 'Gmail附件', + 'db_table': 'gmail_attachments', + }, + ), + migrations.CreateModel( + name='GmailCredential', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('token_path', models.CharField(max_length=255, verbose_name='Token存储路径')), + ('last_history_id', models.CharField(blank=True, max_length=100, null=True, verbose_name='上次同步历史ID')), + ('watch_expiration', models.DateTimeField(blank=True, null=True, verbose_name='监听过期时间')), + ('is_active', models.BooleanField(default=True, verbose_name='是否激活')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='更新时间')), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='gmail_credentials', to=settings.AUTH_USER_MODEL)), + ], + options={ + 'verbose_name': 'Gmail认证凭据', + 'verbose_name_plural': 'Gmail认证凭据', + 'db_table': 'gmail_credentials', + }, + ), + migrations.CreateModel( + name='GmailTalentMapping', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('talent_email', models.EmailField(max_length=254, verbose_name='达人邮箱')), + ('conversation_id', models.CharField(max_length=100, verbose_name='对话ID')), + ('is_active', models.BooleanField(default=True, verbose_name='是否激活')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='更新时间')), + ('knowledge_base', 
models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='gmail_mappings', to='user_management.knowledgebase')), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='gmail_talent_mappings', to=settings.AUTH_USER_MODEL)), + ], + options={ + 'verbose_name': 'Gmail达人映射', + 'verbose_name_plural': 'Gmail达人映射', + 'db_table': 'gmail_talent_mappings', + 'unique_together': {('user', 'talent_email')}, + }, + ), + ] diff --git a/user_management/migrations/0006_gmailcredential_credentials.py b/user_management/migrations/0006_gmailcredential_credentials.py new file mode 100644 index 0000000..d788c92 --- /dev/null +++ b/user_management/migrations/0006_gmailcredential_credentials.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.5 on 2025-04-09 16:45 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('user_management', '0005_gmailattachment_gmailcredential_gmailtalentmapping'), + ] + + operations = [ + migrations.AddField( + model_name='gmailcredential', + name='credentials', + field=models.BinaryField(blank=True, null=True, verbose_name='序列化的凭证数据'), + ), + ] diff --git a/user_management/migrations/0007_alter_gmailcredential_options_and_more.py b/user_management/migrations/0007_alter_gmailcredential_options_and_more.py new file mode 100644 index 0000000..239058d --- /dev/null +++ b/user_management/migrations/0007_alter_gmailcredential_options_and_more.py @@ -0,0 +1,57 @@ +# Generated by Django 5.1.5 on 2025-04-10 09:17 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('user_management', '0006_gmailcredential_credentials'), + ] + + operations = [ + migrations.AlterModelOptions( + name='gmailcredential', + options={}, + ), + migrations.AlterField( + model_name='gmailcredential', + name='created_at', + field=models.DateTimeField(auto_now_add=True), + ), + migrations.AlterField( + model_name='gmailcredential', + 
name='credentials', + field=models.BinaryField(default=b''), + preserve_default=False, + ), + migrations.AlterField( + model_name='gmailcredential', + name='is_active', + field=models.BooleanField(default=True), + ), + migrations.AlterField( + model_name='gmailcredential', + name='last_history_id', + field=models.CharField(blank=True, max_length=255, null=True), + ), + migrations.AlterField( + model_name='gmailcredential', + name='token_path', + field=models.CharField(max_length=255), + ), + migrations.AlterField( + model_name='gmailcredential', + name='updated_at', + field=models.DateTimeField(auto_now=True), + ), + migrations.AlterField( + model_name='gmailcredential', + name='watch_expiration', + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AlterModelTable( + name='gmailcredential', + table='gmail_credential', + ), + ] diff --git a/user_management/models.py b/user_management/models.py index 04e387d..e7d8137 100644 --- a/user_management/models.py +++ b/user_management/models.py @@ -707,3 +707,61 @@ class KnowledgeBaseDocument(models.Model): def __str__(self): return f"{self.knowledge_base.name} - {self.document_name}" + +class GmailCredential(models.Model): + """Gmail认证信息""" + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='gmail_credentials') + gmail_email = models.EmailField(max_length=255, null=True, blank=True, help_text="实际授权的Gmail账号,可能与user.email不同") + credentials = models.BinaryField() # 序列化的凭证对象 + token_path = models.CharField(max_length=255) # token存储路径 + last_history_id = models.CharField(max_length=255, null=True, blank=True) # 最后处理的historyId + watch_expiration = models.DateTimeField(null=True, blank=True) # 监听过期时间 + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + is_active = models.BooleanField(default=True) + + class Meta: + db_table = 'gmail_credential' + + def 
__str__(self): + return f"{self.user.username}的Gmail认证" + +class GmailTalentMapping(models.Model): + """Gmail达人映射关系模型""" + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='gmail_talent_mappings') + talent_email = models.EmailField(verbose_name='达人邮箱') + knowledge_base = models.ForeignKey(KnowledgeBase, on_delete=models.CASCADE, related_name='gmail_mappings') + conversation_id = models.CharField(max_length=100, verbose_name='对话ID') + is_active = models.BooleanField(default=True, verbose_name='是否激活') + created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间') + updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间') + + class Meta: + db_table = 'gmail_talent_mappings' + unique_together = ['user', 'talent_email'] + verbose_name = 'Gmail达人映射' + verbose_name_plural = 'Gmail达人映射' + + def __str__(self): + return f"{self.user.username} - {self.talent_email}" + +class GmailAttachment(models.Model): + """Gmail附件模型""" + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + chat_message = models.ForeignKey(ChatHistory, on_delete=models.CASCADE, related_name='gmail_attachments') + gmail_message_id = models.CharField(max_length=100, verbose_name='Gmail消息ID') + filename = models.CharField(max_length=255, verbose_name='文件名') + filepath = models.CharField(max_length=500, verbose_name='文件路径') + mimetype = models.CharField(max_length=100, verbose_name='MIME类型') + filesize = models.IntegerField(default=0, verbose_name='文件大小') + created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间') + + class Meta: + db_table = 'gmail_attachments' + verbose_name = 'Gmail附件' + verbose_name_plural = 'Gmail附件' + + def __str__(self): + return f"{self.filename} ({self.gmail_message_id})" diff --git a/user_management/urls.py b/user_management/urls.py index f07c09b..68eb23c 100644 --- a/user_management/urls.py +++ b/user_management/urls.py 
@@ -13,7 +13,19 @@ from .views import ( RegisterView, LoginView, LogoutView, - ChatHistoryViewSet + ChatHistoryViewSet, + user_profile, + user_register, + setup_gmail_integration, + send_gmail_message, + gmail_webhook, + get_gmail_attachments, + download_gmail_attachment, + get_gmail_talents, + refresh_gmail_watch, + check_gmail_auth, + import_gmail_from_sender, + sync_talent_emails ) # 创建路由器 @@ -42,4 +54,16 @@ urlpatterns = [ path('users//', user_detail, name='user-detail'), path('users//update/', user_update, name='user-update'), path('users//delete/', user_delete, name='user-delete'), + + # Gmail集成API + path('gmail/setup/', setup_gmail_integration, name='setup_gmail_integration'), + path('gmail/send/', send_gmail_message, name='send_gmail_message'), + path('gmail/webhook/', gmail_webhook, name='gmail_webhook'), + path('gmail/attachments/', get_gmail_attachments, name='get_gmail_attachments'), + path('gmail/download/', download_gmail_attachment, name='download_gmail_attachment'), + path('gmail/talents/', get_gmail_talents, name='get_gmail_talents'), + path('gmail/refresh-watch/', refresh_gmail_watch, name='refresh_gmail_watch'), + path('gmail/check-auth/', check_gmail_auth, name='check_gmail_auth'), + path('gmail/import-from-sender/', import_gmail_from_sender, name='import_gmail_from_sender'), + path('gmail/sync-talent/', sync_talent_emails, name='sync_talent_emails'), ] diff --git a/user_management/views.py b/user_management/views.py index f735e12..b96e24e 100644 --- a/user_management/views.py +++ b/user_management/views.py @@ -26,7 +26,7 @@ import os from rest_framework.test import APIRequestFactory from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.fields import GenericForeignKey -from django.http import Http404, HttpResponse, StreamingHttpResponse +from django.http import Http404, HttpResponse, StreamingHttpResponse, FileResponse from django.db import IntegrityError from channels.exceptions import ChannelFull from 
django.conf import settings @@ -43,6 +43,7 @@ import traceback import requests import json import threading +import re @@ -55,7 +56,10 @@ from .models import ( KnowledgeBase, Notification, KnowledgeBasePermission as KBPermissionModel, - KnowledgeBaseDocument + KnowledgeBaseDocument, + GmailCredential, + GmailTalentMapping, + GmailAttachment ) from .serializers import ( UserSerializer, @@ -253,11 +257,31 @@ class ChatHistoryViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet): queryset = ChatHistory.objects.all() def get_queryset(self): - """确保用户只能看到自己的未删除的聊天记录""" - return ChatHistory.objects.filter( - user=self.request.user, + """确保用户只能看到自己的未删除的聊天记录以及有权限的知识库关联的聊天记录""" + user = self.request.user + + # 当前用户的聊天记录 + user_records = ChatHistory.objects.filter( + user=user, is_deleted=False ) + + # 获取用户有权限的知识库ID列表 + accessible_kb_ids = [] + for kb in KnowledgeBase.objects.all(): + if self.check_knowledge_base_permission(kb, user, 'read'): + accessible_kb_ids.append(kb.id) + + # 其他用户创建的、但当前用户有权限访问的知识库的聊天记录 + others_records = ChatHistory.objects.filter( + knowledge_base_id__in=accessible_kb_ids, + is_deleted=False + ).exclude(user=user) # 排除用户自己的记录,避免重复 + + # 合并两个查询集 + combined_queryset = user_records | others_records + + return combined_queryset def list(self, request): """获取对话列表概览""" @@ -349,7 +373,7 @@ class ChatHistoryViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet): 'data': None }, status=status.HTTP_400_BAD_REQUEST) - # 获取对话历史 + # 获取对话历史,确保按时间顺序排序 messages = self.get_queryset().filter( conversation_id=conversation_id ).order_by('created_at') @@ -380,18 +404,26 @@ class ChatHistoryViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet): 'type': ds.type } for ds in accessible_datasets] + # 构建消息列表,包含parent_id信息 + message_list = [] + for msg in messages: + message_data = { + 'id': str(msg.id), + 'parent_id': msg.parent_id, # 添加parent_id + 'role': msg.role, + 'content': msg.content, + 'created_at': msg.created_at.strftime('%Y-%m-%d %H:%M:%S'), + 
'metadata': msg.metadata # 添加metadata + } + message_list.append(message_data) + return Response({ 'code': 200, 'message': '获取成功', 'data': { 'conversation_id': conversation_id, 'datasets': dataset_info, - 'messages': [{ - 'id': str(msg.id), - 'role': msg.role, - 'content': msg.content, - 'created_at': msg.created_at.strftime('%Y-%m-%d %H:%M:%S') - } for msg in messages] + 'messages': message_list } }) @@ -2111,23 +2143,30 @@ class KnowledgeBaseViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet): "data": None }, status=status.HTTP_403_FORBIDDEN) - with transaction.atomic(): - # 删除外部知识库 - if instance.external_id: - try: - self._delete_external_dataset(instance.external_id) - logger.info(f"外部知识库删除成功: {instance.external_id}") - except ExternalAPIError as e: - logger.error(f"删除外部知识库失败: {str(e)}") - return Response({ - "code": 500, - "message": f"删除外部知识库失败: {str(e)}", - "data": None - }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + # 删除外部知识库(如果存在) + external_delete_success = True + external_error_message = None + if instance.external_id: + try: + self._delete_external_dataset(instance.external_id) + logger.info(f"外部知识库删除成功: {instance.external_id}") + except ExternalAPIError as e: + # 记录错误但继续执行本地删除 + external_delete_success = False + external_error_message = str(e) + logger.warning(f"外部知识库删除失败,将继续删除本地知识库: {str(e)}") - # 删除本地知识库 - self.perform_destroy(instance) - logger.info(f"本地知识库删除成功: id={instance.id}, name={instance.name}") + # 删除本地知识库 + self.perform_destroy(instance) + logger.info(f"本地知识库删除成功: id={instance.id}, name={instance.name}") + + # 如果外部知识库删除失败,返回警告消息 + if not external_delete_success: + return Response({ + "code": 200, + "message": f"知识库已删除,但外部知识库删除失败: {external_error_message}", + "data": None + }) return Response({ "code": 200, @@ -2150,6 +2189,59 @@ class KnowledgeBaseViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet): "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + def _delete_external_dataset(self, external_id): + 
"""删除外部知识库""" + try: + if not external_id: + logger.warning("外部知识库ID为空,跳过删除") + return True + + response = requests.delete( + f'{settings.API_BASE_URL}/api/dataset/{external_id}', + headers={'Content-Type': 'application/json'}, + timeout=30 + ) + + logger.info(f"删除外部知识库响应: status_code={response.status_code}, response={response.text}") + + # 检查响应状态码 + if response.status_code == 404: + logger.warning(f"外部知识库不存在: {external_id}") + return True # 如果知识库不存在,也视为删除成功 + elif response.status_code not in [200, 204]: + logger.warning(f"删除外部知识库状态码异常: {response.status_code}, {response.text}") + return True # 即使状态码异常,也允许继续删除本地知识库 + + # 检查业务状态码 + try: + api_response = response.json() + if api_response.get('code') != 200: + # 如果是因为ID不存在,也视为成功 + if "不存在" in api_response.get('message', ''): + logger.warning(f"外部知识库ID不存在,视为删除成功: {external_id}") + return True + logger.warning(f"业务处理返回非200状态码: {api_response.get('code')}, {api_response.get('message')}") + return True # 不再抛出异常,允许本地删除继续 + logger.info(f"外部知识库删除成功: {external_id}") + return True + except ValueError: + # 如果无法解析 JSON,但状态码是 200,也认为成功 + logger.warning(f"外部知识库删除响应无法解析JSON,但状态码为200,视为成功: {external_id}") + return True + + except requests.exceptions.Timeout: + logger.error(f"删除外部知识库超时: {external_id}") + # 不再抛出异常,允许本地删除继续 + return False + except requests.exceptions.RequestException as e: + logger.error(f"删除外部知识库请求异常: {external_id}, error={str(e)}") + # 不再抛出异常,允许本地删除继续 + return False + except Exception as e: + logger.error(f"删除外部知识库其他错误: {external_id}, error={str(e)}") + # 不再抛出异常,允许本地删除继续 + return False + @action(detail=True, methods=['get']) def permissions(self, request, pk=None): """获取用户对特定知识库的权限""" @@ -4703,3 +4795,968 @@ def user_delete(request, pk): except User.DoesNotExist: return Response({'message': '用户不存在'}, status=404) +from rest_framework.decorators import api_view, permission_classes +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response +from rest_framework import status +from 
django.http import FileResponse +import os +from .gmail_integration import GmailIntegration +import traceback +import logging +from django.utils import timezone # 导入timezone模块 +from django.conf import settings # 导入settings以检查USE_TZ + +logger = logging.getLogger(__name__) + +@api_view(['POST']) +@permission_classes([IsAuthenticated]) +def setup_gmail_integration(request): + """ + 设置Gmail集成并加载邮件到知识库 + """ + user = request.user + logger.info(f"用户 {user.username} 请求设置Gmail集成") + + request_data = request.data + + # 检查必填字段 + required_fields = ['client_secret_json', 'talent_gmail'] + for field in required_fields: + if field not in request_data: + logger.warning(f"缺少必填字段: {field}") + return Response({"error": f"缺少必填字段: {field}"}, status=status.HTTP_400_BAD_REQUEST) + + client_secret_json = request_data.get('client_secret_json') + talent_gmail = request_data.get('talent_gmail') + logger.info(f"目标Gmail: {talent_gmail}") + + # 可选的授权码(如果用户已经获取了授权码) + auth_code = request_data.get('auth_code') + has_auth_code = bool(auth_code) + + # 获取可选的代理设置 + use_proxy = request_data.get('use_proxy', True) + proxy_url = request_data.get('proxy_url', 'http://127.0.0.1:7890') + + # 初始化Gmail集成 + gmail_integration = GmailIntegration( + user=user, + email=talent_gmail, + client_secret_json=client_secret_json, + use_proxy=use_proxy, + proxy_url=proxy_url + ) + + try: + if auth_code: + # 处理授权码完成OAuth流程 + logger.info("使用授权码完成Gmail认证") + auth_success = gmail_integration.handle_auth_code(auth_code) + + if not auth_success: + logger.error("授权码认证失败") + return Response( + {"error": "授权码认证失败,请确保授权码正确"}, + status=status.HTTP_400_BAD_REQUEST + ) + + logger.info("授权码认证成功") + else: + # 尝试常规认证 + try: + gmail_integration.authenticate() + logger.info("Gmail认证成功") + except Exception as e: + # 如果异常中包含授权URL,返回给前端引导用户授权 + error_message = str(e) + + if "Please visit this URL" in error_message: + # 从错误消息中提取URL + auth_url = error_message.split("Please visit this URL to authorize: ")[1] + logger.info("需要用户授权,返回授权URL") + + 
return Response( + { + "status": "authorization_required", + "auth_url": auth_url, + "message": "请访问提供的URL获取授权码,然后与client_secret_json和talent_gmail一起提交" + }, + status=status.HTTP_202_ACCEPTED + ) + else: + # 其他错误 + logger.error(f"Gmail认证失败: {error_message}") + return Response( + {"error": f"Gmail认证失败: {error_message}"}, + status=status.HTTP_400_BAD_REQUEST + ) + + # 创建或获取talent知识库 + logger.info(f"创建或获取Gmail知识库: {talent_gmail}") + knowledge_base, created = gmail_integration.create_talent_knowledge_base(talent_gmail) + kb_action = "创建" if created else "获取" + logger.info(f"知识库{kb_action}成功: {knowledge_base.id}") + + # 获取邮件对话 + logger.info(f"获取与Gmail: {talent_gmail} 的邮件对话") + conversations = gmail_integration.get_conversations(talent_gmail) + + conversation_count = len(conversations) + logger.info(f"获取到 {conversation_count} 个邮件对话") + + if not conversations: + logger.warning("没有找到Gmail邮件对话") + + # 生成随机会话ID + conversation_id = f"conv_{uuid.uuid4().hex[:10]}" + return Response( + { + "message": f"没有找到与 {talent_gmail} 的邮件对话,请确保您有与该地址的邮件往来", + "knowledge_base_id": str(knowledge_base.id), + "conversation_id": conversation_id, + "troubleshooting": { + "check_emails": f"请确认Gmail中是否有与 {talent_gmail} 的往来邮件", + "verify_address": "请确认邮箱地址拼写正确", + "check_permissions": "请确保已授权完整的邮箱访问权限" + } + }, + status=status.HTTP_200_OK + ) + + # 保存对话到知识库 + logger.info(f"将 {conversation_count} 个邮件对话保存到知识库") + result = gmail_integration.save_conversations_to_knowledge_base(conversations, knowledge_base) + conversation_id = result.get('conversation_id') + logger.info(f"对话已保存到知识库,ID: {conversation_id}") + + return Response( + { + "message": f"Gmail集成成功。已加载与 {talent_gmail} 的 {conversation_count} 个邮件对话到知识库。", + "knowledge_base_id": str(knowledge_base.id), + "conversation_id": conversation_id + }, + status=status.HTTP_200_OK + ) + + except Exception as e: + logger.error(f"设置Gmail集成失败: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + + return Response( + {"error": f"设置Gmail集成失败: 
{str(e)}"}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + +@api_view(['POST']) +@permission_classes([IsAuthenticated]) +def send_gmail_message(request): + """通过Gmail发送消息给达人""" + try: + data = request.data + conversation_id = data.get('conversation_id') + to_email = data.get('to_email') + subject = data.get('subject') + body = data.get('body') + + # 验证必填字段 + if not all([conversation_id, to_email, subject, body]): + return Response({ + 'code': 400, + 'message': '缺少必填字段: conversation_id, to_email, subject, body', + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + # 查找对话记录,确认用户有权限 - 确保不使用timezone-aware字段进行过滤 + chat_records = ChatHistory.objects.filter( + conversation_id=conversation_id, + user=request.user, + is_deleted=False + ).first() + + if not chat_records: + return Response({ + 'code': 404, + 'message': '聊天记录不存在或无权访问', + 'data': None + }, status=status.HTTP_404_NOT_FOUND) + + # 处理附件上传 + attachment_files = [] + if 'attachments' in request.FILES: + uploaded_files = request.FILES.getlist('attachments') + + # 创建临时目录保存上传的文件 + temp_dir = os.path.join(settings.MEDIA_ROOT, 'temp_attachments') + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + + # 保存上传的附件 + for file in uploaded_files: + filepath = os.path.join(temp_dir, file.name) + with open(filepath, 'wb+') as destination: + for chunk in file.chunks(): + destination.write(chunk) + attachment_files.append(filepath) + + # 获取可选的代理设置 + use_proxy = data.get('use_proxy', True) + proxy_url = data.get('proxy_url', 'http://127.0.0.1:7890') + + # 创建Gmail集成实例并认证 + gmail_integration = GmailIntegration( + user=request.user, + use_proxy=use_proxy, + proxy_url=proxy_url + ) + + if not gmail_integration.authenticate(): + return Response({ + 'code': 400, + 'message': 'Gmail认证失败,请先完成认证', + 'data': None + }, status=status.HTTP_400_BAD_REQUEST) + + # 发送邮件 + message_id = gmail_integration.send_email( + to_email=to_email, + subject=subject, + body=body, + conversation_id=conversation_id, + 
attachments=attachment_files + ) + + # 清理临时文件 + for file in attachment_files: + if os.path.exists(file): + os.remove(file) + + return Response({ + 'code': 200, + 'message': '邮件发送成功', + 'data': { + 'message_id': message_id, + 'conversation_id': conversation_id + } + }) + + except Exception as e: + logger.error(f"发送Gmail消息失败: {str(e)}") + logger.error(traceback.format_exc()) + return Response({ + 'code': 500, + 'message': f'发送Gmail消息失败: {str(e)}', + 'data': None + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + +@api_view(['POST']) +@permission_classes([]) +def gmail_webhook(request): + """Gmail推送通知webhook""" + try: + # 添加更详细的日志 + logger.info(f"接收到Gmail webhook请求: 路径={request.path}, 方法={request.method}") + logger.info(f"请求头: {dict(request.headers)}") + logger.info(f"请求数据: {request.data}") + + # 验证请求来源(可以添加额外的安全校验) + data = request.data + + if not data: + return Response({ + 'status': 'error', + 'message': '无效的请求数据' + }, status=status.HTTP_400_BAD_REQUEST) + + # 处理数据 + email_address = None + history_id = None + + # 处理Google Pub/Sub消息格式 + if isinstance(data, dict) and 'message' in data and 'data' in data['message']: + try: + import base64 + import json + logger.info("检测到Google Pub/Sub消息格式") + + # Base64解码data字段 + encoded_data = data['message']['data'] + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + logger.info(f"解码后的数据: {decoded_data}") + + # 解析JSON获取email和historyId + json_data = json.loads(decoded_data) + email_address = json_data.get('emailAddress') + history_id = json_data.get('historyId') + logger.info(f"从Pub/Sub消息中提取: email={email_address}, historyId={history_id}") + except Exception as decode_error: + logger.error(f"解析Pub/Sub消息失败: {str(decode_error)}") + logger.error(traceback.format_exc()) + # 处理其他格式的数据 + elif isinstance(data, dict): + # 直接使用JSON格式数据 + logger.info("接收到JSON格式数据") + email_address = data.get('emailAddress') + history_id = data.get('historyId') + elif hasattr(data, 'decode'): + # 尝试解析原始数据 + logger.info("接收到原始数据格式,尝试解析") + try: + 
import json + json_data = json.loads(data.decode('utf-8')) + email_address = json_data.get('emailAddress') + history_id = json_data.get('historyId') + except Exception as parse_error: + logger.error(f"解析请求数据失败: {str(parse_error)}") + email_address = None + history_id = None + else: + # 尝试从请求参数获取 + logger.info("尝试从请求参数获取数据") + email_address = request.GET.get('emailAddress') or request.POST.get('emailAddress') + history_id = request.GET.get('historyId') or request.POST.get('historyId') + + logger.info(f"提取的邮箱: {email_address}, 历史ID: {history_id}") + + if not email_address or not history_id: + return Response({ + 'status': 'error', + 'message': '缺少必要的参数' + }, status=status.HTTP_400_BAD_REQUEST) + + # 查找用户和认证信息 + user = User.objects.filter(email=email_address).first() + if not user: + logger.info(f"没有找到email={email_address}的用户,尝试使用gmail_email查找") + # 尝试使用gmail_email字段查找 + credential = GmailCredential.objects.filter(gmail_email=email_address, is_active=True).first() + if credential: + user = credential.user + logger.info(f"通过gmail_email找到用户: {user.email}") + else: + logger.error(f"无法找到与{email_address}关联的用户") + return Response({ + 'status': 'error', + 'message': f'找不到与 {email_address} 关联的用户' + }, status=status.HTTP_404_NOT_FOUND) + + # 查找认证信息 + credential = GmailCredential.objects.filter(user=user, is_active=True).first() + if not credential: + return Response({ + 'status': 'error', + 'message': f'找不到用户 {email_address} 的Gmail认证信息' + }, status=status.HTTP_404_NOT_FOUND) + + # 更新history_id + credential.last_history_id = history_id + credential.save() + + # 如果请求中包含达人邮箱,直接处理特定达人的邮件 + talent_email = data.get('talent_email') or request.GET.get('talent_email') + + if talent_email and user: + logger.info(f"检测到特定达人邮箱: {talent_email},将直接处理其最近邮件") + try: + # 创建Gmail集成实例 + integration = GmailIntegration(user=user, email=talent_email) + if integration.authenticate(): + # 获取达人最近的邮件 + recent_emails = integration.get_recent_emails( + from_email=talent_email, + max_results=5 # 限制获取最近5封 + 
) + + if recent_emails: + logger.info(f"找到 {len(recent_emails)} 封来自 {talent_email} 的最近邮件") + # 创建或获取知识库 + knowledge_base, created = integration.create_talent_knowledge_base(talent_email) + # 保存对话 + result = integration.save_conversations_to_knowledge_base(recent_emails, knowledge_base) + logger.info(f"已处理达人 {talent_email} 的最近邮件: {result}") + else: + logger.info(f"没有找到来自 {talent_email} 的最近邮件") + else: + logger.error("Gmail认证失败") + except Exception as talent_error: + logger.error(f"处理达人邮件失败: {str(talent_error)}") + logger.error(traceback.format_exc()) + + # 异步处理通知 + # 在生产环境中,应该使用Celery等异步任务队列处理 + try: + integration = GmailIntegration(user=user) + if integration.authenticate(): + result = integration.process_notification(data) + + # 如果处理成功,尝试通过WebSocket发送通知 + # 注意: WebSocket通知已集成到_process_new_message方法中,这里只是额外的备份通知 + if result: + try: + from channels.layers import get_channel_layer + from asgiref.sync import async_to_sync + + # 获取Channel Layer + channel_layer = get_channel_layer() + if channel_layer: + # 发送WebSocket消息 + async_to_sync(channel_layer.group_send)( + f"notification_user_{user.id}", + { + "type": "notification", + "data": { + "message_type": "gmail_update", + "message": "您的Gmail有新消息,已自动处理", + "history_id": history_id, + "timestamp": timezone.now().isoformat() + } + } + ) + except Exception as ws_error: + logger.error(f"发送WebSocket通知失败: {str(ws_error)}") + logger.error(traceback.format_exc()) + else: + logger.error(f"Gmail认证失败: {email_address}") + except Exception as process_error: + logger.error(f"处理Gmail通知失败: {str(process_error)}") + logger.error(traceback.format_exc()) + + return Response({ + 'status': 'success', + 'message': '通知已处理', + 'data': { + 'user_id': str(user.id), + 'history_id': history_id + } + }) + + except Exception as e: + logger.error(f"处理Gmail webhook失败: {str(e)}") + logger.error(traceback.format_exc()) + return Response({ + 'status': 'error', + 'message': f'处理通知失败: {str(e)}' + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + 

def _parse_max_results(raw_value, default=10):
    """Return *raw_value* as a positive int, ``default`` when it is ``None``, or ``None`` when invalid."""
    if raw_value is None:
        return default
    try:
        parsed = int(raw_value)
    except (TypeError, ValueError):
        return None
    return parsed if parsed > 0 else None


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_gmail_attachments(request):
    """List the Gmail attachments of one conversation.

    Query parameters: ``conversation_id`` (required), ``use_proxy``
    (the string ``true`` enables it, the default) and ``proxy_url``.

    Responses: 400 when ``conversation_id`` is missing, 200 with the
    attachment list and its length, 500 on any other failure.
    """
    try:
        params = request.query_params

        conversation_id = params.get('conversation_id')
        if not conversation_id:
            return Response({"error": "缺少conversation_id参数"}, status=status.HTTP_400_BAD_REQUEST)

        # Optional proxy settings; use_proxy arrives as text in the query string.
        gmail_integration = GmailIntegration(
            user=request.user,
            use_proxy=params.get('use_proxy', 'true').lower() == 'true',
            proxy_url=params.get('proxy_url', 'http://127.0.0.1:7890')
        )

        attachments = gmail_integration.get_attachment_by_conversation(conversation_id)

        return Response({
            'code': 200,
            'message': '获取附件列表成功',
            'data': {
                'attachments': attachments,
                'count': len(attachments)
            }
        })

    except Exception as e:
        logger.error(f"获取Gmail附件列表失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取Gmail附件列表失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def download_gmail_attachment(request):
    """Stream a stored Gmail attachment back to the client.

    Query parameter ``filepath`` (required) names the file. Only files that
    resolve to a location inside ``MEDIA_ROOT/gmail_attachments`` are served.

    Responses:
        400 - ``filepath`` missing.
        403 - the resolved path lies outside the attachments directory.
        404 - the path is inside the directory but is not an existing file.
        200 - the file, sent as an attachment.
        500 - any other failure.

    NOTE(review): nothing here checks that the attachment belongs to the
    requesting user; any authenticated user who knows a path can fetch it.
    """
    try:
        filepath = request.query_params.get('filepath')
        if not filepath:
            return Response({
                'code': 400,
                'message': '缺少必填参数: filepath',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        # Resolve symlinks and ".." segments before comparing, and compare
        # whole path components; a plain string-prefix test is bypassed by
        # "gmail_attachments/../x" or a sibling like "gmail_attachments_old".
        attachments_dir = os.path.normcase(
            os.path.realpath(os.path.join(settings.MEDIA_ROOT, 'gmail_attachments'))
        )
        resolved_path = os.path.realpath(filepath)
        try:
            inside_dir = os.path.commonpath(
                [attachments_dir, os.path.normcase(resolved_path)]
            ) == attachments_dir
        except ValueError:
            # commonpath() raises when the paths cannot be compared
            # (for example different drives on Windows).
            inside_dir = False

        # Authorize first so the endpoint does not reveal whether arbitrary
        # paths outside the attachments directory exist.
        if not inside_dir:
            return Response({
                'code': 403,
                'message': '无权访问该文件',
                'data': None
            }, status=status.HTTP_403_FORBIDDEN)

        if not os.path.isfile(resolved_path):
            return Response({
                'code': 404,
                'message': '附件文件不存在',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        # FileResponse closes the file when the response is finished and
        # builds a correctly escaped Content-Disposition header itself.
        return FileResponse(
            open(resolved_path, 'rb'),
            as_attachment=True,
            filename=os.path.basename(resolved_path)
        )

    except Exception as e:
        logger.error(f"下载Gmail附件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'下载Gmail附件失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_gmail_talents(request):
    """Return every active talent/Gmail mapping of the requesting user.

    Each entry carries the mapping's ids and timestamps, the newest
    non-deleted chat message of its conversation (empty strings when there
    is none) and the number of attachments linked to that conversation.

    Responses: 200 with ``talents`` and ``count``; 500 on failure.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    try:
        from .models import GmailTalentMapping

        mappings = GmailTalentMapping.objects.filter(
            user=request.user,
            is_active=True
        ).select_related('knowledge_base')

        talents = []
        for mapping in mappings:
            # Newest message of the mapped conversation, if any.
            latest_message = ChatHistory.objects.filter(
                conversation_id=mapping.conversation_id,
                is_deleted=False
            ).order_by('-created_at').first()

            # Attachments linked to messages of that conversation.
            attachment_count = GmailAttachment.objects.filter(
                chat_message__conversation_id=mapping.conversation_id
            ).count()

            if latest_message:
                latest_summary = {
                    'content': latest_message.content,
                    'time': latest_message.created_at.strftime(time_format),
                    'role': latest_message.role
                }
            else:
                latest_summary = {'content': '', 'time': '', 'role': ''}

            talents.append({
                'id': str(mapping.id),
                'talent_email': mapping.talent_email,
                'knowledge_base_id': str(mapping.knowledge_base.id),
                'knowledge_base_name': mapping.knowledge_base.name,
                'conversation_id': mapping.conversation_id,
                'created_at': mapping.created_at.strftime(time_format),
                'updated_at': mapping.updated_at.strftime(time_format),
                'latest_message': latest_summary,
                'attachment_count': attachment_count
            })

        return Response({
            'code': 200,
            'message': '获取成功',
            'data': {
                'talents': talents,
                'count': len(talents)
            }
        })

    except Exception as e:
        logger.error(f"获取Gmail达人映射失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取Gmail达人映射失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def refresh_gmail_watch(request):
    """Re-register the Gmail watch for the requesting user.

    Request body (optional): ``use_proxy``, ``proxy_url``.

    Responses:
        404 - the user has no active Gmail credential.
        400 - Gmail authentication failed.
        200 - watch refreshed; the body carries the new history id and
              expiration, whether the previous watch had expired, and the
              Gmail address.
        500 - setting up the watch failed (with a configuration hint when the
              error text mentions ``settings`` or ``google_cloud_project``),
              or any other failure.
    """
    try:
        from .models import GmailCredential

        credential = GmailCredential.objects.filter(user=request.user, is_active=True).first()
        if not credential:
            return Response({
                'code': 404,
                'message': '找不到Gmail认证信息,请先设置Gmail集成',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        # Log which system user and which Gmail account are involved.
        gmail_email = credential.gmail_email or "未知Gmail账号"
        logger.info(f"刷新Gmail监听: 系统用户={request.user.email}, Gmail账号={gmail_email}")
        logger.info(f"请确保Gmail账号 {gmail_email} 和 gmail-api-push@system.gserviceaccount.com 都有 Pub/Sub 发布权限")

        # A watch without a stored expiration is reported as not expired.
        is_expired = bool(credential.watch_expiration) and credential.watch_expiration < timezone.now()

        gmail_integration = GmailIntegration(
            user=request.user,
            use_proxy=request.data.get('use_proxy', True),
            proxy_url=request.data.get('proxy_url', 'http://127.0.0.1:7890')
        )

        if not gmail_integration.authenticate():
            return Response({
                'code': 400,
                'message': 'Gmail认证失败',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            watch_result = gmail_integration.setup_watch()
        except Exception as watch_error:
            logger.error(f"设置监听失败: {str(watch_error)}")
            logger.error(traceback.format_exc())

            # Errors mentioning the project configuration get a dedicated hint.
            error_text = str(watch_error).lower()
            if "settings" in error_text or "google_cloud_project" in error_text:
                return Response({
                    'code': 500,
                    'message': 'Gmail监听设置失败: 服务器配置错误,请联系管理员配置Google Cloud Project',
                    'data': {
                        'error': str(watch_error),
                        'hint': '请在settings.py中设置GOOGLE_CLOUD_PROJECT'
                    }
                }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

            return Response({
                'code': 500,
                'message': f'Gmail监听设置失败: {str(watch_error)}',
                'data': None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        return Response({
            'code': 200,
            'message': 'Gmail监听已刷新',
            'data': {
                'history_id': watch_result['historyId'],
                'expiration': watch_result['expiration'],
                'was_expired': is_expired,
                'gmail_email': gmail_email
            }
        })

    except Exception as e:
        logger.error(f"刷新Gmail监听失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'刷新Gmail监听失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def check_gmail_auth(request):
    """Report the Gmail authentication state of the requesting user.

    Query parameters (optional): ``use_proxy`` (the string ``true`` enables
    it, the default) and ``proxy_url``.

    When no active credential exists the body carries ``code`` 404 with
    ``authenticated: False`` / ``needs_setup: True``; no HTTP status is
    passed to the response in that case. Otherwise authentication is tried
    and the body reports the result, whether the watch has expired (a
    missing expiration counts as expired), the last history id and the
    formatted expiration time. Failures yield HTTP 500.
    """
    try:
        from .models import GmailCredential

        credential = GmailCredential.objects.filter(user=request.user, is_active=True).first()
        if not credential:
            return Response({
                'code': 404,
                'message': '未找到Gmail认证信息',
                'data': {
                    'authenticated': False,
                    'needs_setup': True
                }
            })

        params = request.query_params
        gmail_integration = GmailIntegration(
            user=request.user,
            use_proxy=params.get('use_proxy', 'true').lower() == 'true',
            proxy_url=params.get('proxy_url', 'http://127.0.0.1:7890')
        )

        # Probe the stored credentials.
        auth_valid = gmail_integration.authenticate()

        expiration = credential.watch_expiration
        watch_expired = expiration < timezone.now() if expiration else True

        return Response({
            'code': 200,
            'message': '认证信息获取成功',
            'data': {
                'authenticated': auth_valid,
                'needs_setup': not auth_valid,
                'watch_expired': watch_expired,
                'last_history_id': credential.last_history_id,
                'watch_expiration': expiration.strftime('%Y-%m-%d %H:%M:%S') if expiration else None
            }
        })

    except Exception as e:
        logger.error(f"检查Gmail认证状态失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'检查Gmail认证状态失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@api_view(['POST'])
@permission_classes([IsAdminUser])  # administrators only
def refresh_all_gmail_watches(request):
    """Refresh every active Gmail watch that expires within the next two days.

    Credentials whose ``watch_expiration`` is earlier than now + 2 days are
    re-registered one by one; a failure for one credential is recorded and
    does not stop the others. The response lists, per credential, the user's
    email, a success flag and either the new expiration or the error.
    """
    # Imported locally because no module-level import of timedelta is visible
    # in this part of the file; a duplicate import is harmless.
    from datetime import timedelta

    # Look two days ahead so watches are renewed before they lapse.
    expire_time = timezone.now() + timedelta(days=2)

    credentials = GmailCredential.objects.filter(
        is_active=True,
        watch_expiration__lt=expire_time
    ).select_related('user')

    results = []
    for credential in credentials:
        try:
            user = credential.user
            gmail_integration = GmailIntegration(user=user)

            if not gmail_integration.authenticate():
                results.append({
                    'user': user.email,
                    'success': False,
                    'error': '认证失败'
                })
                continue

            watch_result = gmail_integration.setup_watch()
            results.append({
                'user': user.email,
                'success': True,
                'expiration': watch_result['expiration']
            })
        except Exception as e:
            results.append({
                'user': credential.user.email if credential.user else 'Unknown',
                'success': False,
                'error': str(e)
            })

    return Response({
        'code': 200,
        'message': f'已刷新 {len(results)} 个Gmail监听',
        'data': results
    })


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def import_gmail_from_sender(request):
    """Manually import the Gmail messages sent by one address.

    Request body: ``sender_email`` (required), ``max_results`` (positive
    integer, default 10), ``use_proxy`` and ``proxy_url`` (optional).

    Responses:
        400 - ``sender_email`` missing, ``max_results`` invalid, or Gmail
              authentication failed.
        404 - no messages from the sender, or none could be extracted.
        200 - messages saved; the body carries the conversation id, the
              knowledge base id and the number of imported messages.
        500 - any other failure.
    """
    try:
        sender_email = request.data.get('sender_email')
        if not sender_email:
            return Response({
                'code': 400,
                'message': '缺少必填参数: sender_email',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        # A malformed max_results is a client error, not a server error.
        max_results = _parse_max_results(request.data.get('max_results'))
        if max_results is None:
            return Response({
                'code': 400,
                'message': 'max_results 必须为正整数',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        gmail_integration = GmailIntegration(
            user=request.user,
            email=sender_email,  # the talent's mailbox
            use_proxy=request.data.get('use_proxy', True),
            proxy_url=request.data.get('proxy_url', 'http://127.0.0.1:7890')
        )

        if not gmail_integration.authenticate():
            return Response({
                'code': 400,
                'message': 'Gmail认证失败',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        # Search for messages from the sender.
        messages_api = gmail_integration.gmail_service.users().messages()
        response = messages_api.list(
            userId='me',
            q=f"from:{sender_email}",
            maxResults=max_results
        ).execute()

        if 'messages' not in response:
            return Response({
                'code': 404,
                'message': f'未找到来自 {sender_email} 的邮件',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        message_ids = [message['id'] for message in response['messages']]
        logger.info(f"找到 {len(message_ids)} 封来自 {sender_email} 的邮件")

        # Fetch each message and keep the ones whose content could be extracted.
        conversations = []
        for message_id in message_ids:
            message = messages_api.get(userId='me', id=message_id).execute()
            email_data = gmail_integration._extract_email_content(message)
            if email_data:
                conversations.append(email_data)

        if not conversations:
            return Response({
                'code': 404,
                'message': f'无法提取来自 {sender_email} 的邮件内容',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        # Create or fetch the knowledge base, then store the conversations.
        knowledge_base, created = gmail_integration.create_talent_knowledge_base(sender_email)
        kb_action = "创建" if created else "获取"
        logger.info(f"知识库{kb_action}成功: {knowledge_base.id}")

        result = gmail_integration.save_conversations_to_knowledge_base(conversations, knowledge_base)

        return Response({
            'code': 200,
            'message': f'已成功导入 {len(conversations)} 封来自 {sender_email} 的邮件',
            'data': {
                'conversation_id': result.get('conversation_id'),
                'knowledge_base_id': str(knowledge_base.id),
                'message_count': len(conversations)
            }
        })

    except Exception as e:
        logger.error(f"导入Gmail邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'导入Gmail邮件失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def sync_talent_emails(request):
    """Manually synchronise the recent Gmail messages of one talent.

    Request body: ``talent_email`` (required), ``max_results`` (positive
    integer, default 10), ``use_proxy`` and ``proxy_url`` (optional).

    Responses:
        400 - ``talent_email`` missing, ``max_results`` invalid, or Gmail
              authentication failed.
        404 - no recent messages from the talent.
        200 - messages saved; the body carries the conversation id, the
              knowledge base id and the number of synchronised messages.
        500 - any other failure.
    """
    try:
        talent_email = request.data.get('talent_email')
        if not talent_email:
            return Response({
                'code': 400,
                'message': '缺少必填参数: talent_email',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        # A malformed max_results is a client error, not a server error.
        max_results = _parse_max_results(request.data.get('max_results'))
        if max_results is None:
            return Response({
                'code': 400,
                'message': 'max_results 必须为正整数',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        gmail_integration = GmailIntegration(
            user=request.user,
            email=talent_email,
            use_proxy=request.data.get('use_proxy', True),
            proxy_url=request.data.get('proxy_url', 'http://127.0.0.1:7890')
        )

        if not gmail_integration.authenticate():
            return Response({
                'code': 400,
                'message': 'Gmail认证失败',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        recent_emails = gmail_integration.get_recent_emails(
            from_email=talent_email,
            max_results=max_results
        )

        if not recent_emails:
            return Response({
                'code': 404,
                'message': f'未找到来自 {talent_email} 的最近邮件',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        # Create or fetch the knowledge base, then store the messages.
        knowledge_base, created = gmail_integration.create_talent_knowledge_base(talent_email)
        kb_action = "创建" if created else "获取"
        logger.info(f"知识库{kb_action}成功: {knowledge_base.name}")

        result = gmail_integration.save_conversations_to_knowledge_base(recent_emails, knowledge_base)

        return Response({
            'code': 200,
            'message': f'已成功同步 {len(recent_emails)} 封来自 {talent_email} 的邮件',
            'data': {
                'conversation_id': result.get('conversation_id'),
                'knowledge_base_id': str(knowledge_base.id),
                'message_count': len(recent_emails)
            }
        })

    except Exception as e:
        logger.error(f"同步达人邮件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'同步达人邮件失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
