-
Notifications
You must be signed in to change notification settings - Fork 0
/
email2llama.py
246 lines (203 loc) · 10.4 KB
/
email2llama.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import imaplib
import email
from email.header import decode_header
import requests
import logging
from logging.handlers import RotatingFileHandler
import sys
import traceback
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dotenv import load_dotenv
import os
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.optim import AdamW
# Charger les variables d'environnement
load_dotenv()
# Configuration du logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = RotatingFileHandler('log.txt', maxBytes=10000000, backupCount=5)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Vérification des variables d'environnement
required_env_vars = ['IMAP_SERVER', 'SMTP_SERVER', 'SMTP_PORT', 'EMAIL', 'PASSWORD']
for var in required_env_vars:
if not os.getenv(var):
logger.error(f"La variable d'environnement {var} n'est pas définie")
sys.exit(1)
def lire_emails(imap_server, email_address, password):
logger.info(f"Tentative de connexion au serveur IMAP: {imap_server}")
try:
imap = imaplib.IMAP4_SSL(imap_server)
logger.info("Connexion SSL établie, tentative de login")
imap.login(email_address, password)
logger.info("Login réussi")
# Sélectionner la boîte de réception
status, messages = imap.select("INBOX")
if status != "OK":
logger.error(f"Impossible de sélectionner la boîte de réception: {messages}")
return
logger.info("Boîte de réception sélectionnée avec succès")
_, message_numbers = imap.search(None, "UNSEEN")
for num in message_numbers[0].split():
_, msg_data = imap.fetch(num, "(RFC822)")
email_body = msg_data[0][1]
email_message = email.message_from_bytes(email_body)
sujet = decode_header(email_message["Subject"])[0][0]
if isinstance(sujet, bytes):
sujet = sujet.decode()
contenu = ""
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == "text/plain":
contenu = part.get_payload(decode=True).decode()
else:
contenu = email_message.get_payload(decode=True).decode()
yield sujet, contenu, email_message
imap.close()
imap.logout()
except imaplib.IMAP4.error as e:
logger.error(f"Erreur IMAP lors de la lecture des emails: {str(e)}")
except Exception as e:
logger.error(f"Erreur inattendue lors de la lecture des emails: {str(e)}")
logger.error(traceback.format_exc())
def envoyer_email(smtp_server, smtp_port, sender_email, sender_password, recipient, subject, body):
try:
msg = MIMEMultipart()
msg['From'] = sender_email
msg['To'] = recipient
msg['Subject'] = f"Re: {subject}"
msg['Bcc'] = sender_email
msg.attach(MIMEText(body, 'plain'))
context = ssl.create_default_context()
logger.info(f"Tentative d'envoi de l'email à {recipient} en utilisant SSL")
try:
with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
logger.info(f"Réponse envoyée avec succès à {recipient} en utilisant SSL")
except Exception as e:
logger.warning(f"Échec de l'envoi avec SSL: {str(e)}. Tentative avec STARTTLS...")
# Essai avec STARTTLS
try:
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.ehlo()
if server.has_extn('STARTTLS'):
server.starttls(context=context)
server.ehlo()
server.login(sender_email, sender_password)
server.send_message(msg)
logger.info(f"Réponse envoyée avec succès à {recipient} en utilisant STARTTLS")
except Exception as e:
logger.warning(f"Échec de l'envoi avec STARTTLS: {str(e)}. Tentative sans chiffrement...")
# Si STARTTLS échoue, essayer sans chiffrement
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
logger.info(f"Réponse envoyée avec succès à {recipient} sans chiffrement")
except Exception as e:
logger.error(f"Erreur lors de l'envoi de l'email à {recipient}: {str(e)}")
logger.error(traceback.format_exc())
def creer_fichier_contextuel(email_address, reponse_generee, contenu):
try:
# Vérifie si le dossier des emails existe, sinon le crée
chemin_dossier = os.path.join("./emails/", email_address)
if not os.path.exists(chemin_dossier):
os.makedirs(chemin_dossier)
# Crée le fichier contextuel avec la réponse générée et le contenu de l'email
with open(os.path.join(chemin_dossier, "context.txt"), 'a') as file:
file.write("Contenu de l'email : \n")
file.write(contenu)
file.write("\n\nRéponse générée : \n")
file.write(reponse_generee)
logger.info(f"Fichier contextuel créé pour {email_address}")
except Exception as e:
logger.error(f"Erreur lors de la création du fichier contextuel : {str(e)}")
def generer_reponse(expediteur, sujet, contenu, model_name):
try:
ollama_api_url = os.getenv("OLLAMA_API_URL")
if not ollama_api_url:
raise ValueError("La variable d'environnement OLLAMA_API_URL n'est pas définie.")
# Générer l'embedding
embedding_data = {
"model": model_name,
"prompt": contenu
}
embedding_response = requests.post(f"{ollama_api_url}/api/embeddings", json=embedding_data)
# Vérifiez que la réponse contient bien les embeddings
if 'embedding' not in embedding_response.json():
raise KeyError("La clé 'embedding' n'est pas présente dans la réponse de l'API.")
embedding = embedding_response.json()["embedding"]
# Lire le contenu du fichier contextuel défini dans .env
CONTEXT = os.getenv("CONTEXT")
if CONTEXT is None:
raise ValueError("La variable d'environnement CONTEXT n'est pas définie.")
with open(CONTEXT, 'r') as file:
contexte_env = file.read()
# Initialiser le contexte avec le contenu du fichier .env
contexte_complet = contexte_env
# Ajouter le contenu du fichier contextuel spécifique à l'expéditeur
CONTEXT_FILE = os.path.join("./emails/", expediteur, "context.txt")
if os.path.exists(CONTEXT_FILE):
with open(CONTEXT_FILE, 'r') as file:
contexte_expediteur = file.read()
contexte_complet += f"\n\nHistorique de conversation avec l'expéditeur:\n{contexte_expediteur}"
prompt = f"Contexte général et spécifique:\n{contexte_complet}\n\nEmail actuel:\nSujet: {sujet}\nContenu: {contenu}\n\nRéponse:"
logger.debug(f"prompt : {prompt}")
# Générer la réponse
generate_data = {
"model": model_name,
"prompt": prompt,
"system": "Vous êtes ASTRO un assistant intelligent qui lit et réponds aux messages. NE PAS UTILISER de variable entre crochet dans ta répaonse (ex: [Votre nom]). Utilise les exemples précédents et le contexte fourni pour générer une réponse pertinente. Termine en signalant que tu es un 'assistant numérique' mis au point par le G1FabLab. Signe 'ASTRO, le petit robot.'",
"stream": False
}
response = requests.post(f"{ollama_api_url}/api/generate", json=generate_data)
logger.debug(f"Réponse brute de l'API Ollama : {response.text}")
# Tentative de décodage JSON
try:
response_json = response.json()
return response_json['response']
except json.JSONDecodeError as json_error:
logger.error(f"Erreur de décodage JSON : {str(json_error)}")
logger.error(f"Contenu de la réponse : {response.text}")
return "Désolé, une erreur s'est produite lors de la génération de la réponse."
except Exception as e:
logger.error(f"Erreur lors de la génération de la réponse par le modèle {model_name}: {str(e)}")
logger.error(traceback.format_exc())
return "Désolé, une erreur s'est produite lors de la génération de la réponse."
def traiter_emails_et_appliquer_rag(imap_server, email_address, password, smtp_server, smtp_port, model_name):
emails_traites = 0
try:
for sujet, contenu, email_message in lire_emails(imap_server, email_address, password):
expediteur = email.utils.parseaddr(email_message['From'])[1]
logger.info(f"Traitement par {model_name} de l'email de {expediteur} avec le sujet: {sujet} et le contenu {contenu}")
reponse_generee = generer_reponse(expediteur, sujet, contenu, model_name)
logger.info(f"Reponse : {reponse_generee}")
# Créer le fichier contextuel
creer_fichier_contextuel(expediteur, reponse_generee, contenu)
envoyer_email(smtp_server, smtp_port, email_address, password, expediteur, sujet, reponse_generee)
except Exception as e:
logger.error(f"Erreur générale dans le processus de traitement des emails: {str(e)}")
logger.error(traceback.format_exc())
if __name__ == "__main__":
try:
logger.info("Démarrage du processus de traitement des emails")
IMAP_SERVER = os.getenv("IMAP_SERVER")
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT"))
EMAIL = os.getenv("EMAIL")
PASSWORD = os.getenv("PASSWORD")
MODEL = os.getenv("MODEL")
CONTEXT = os.getenv("CONTEXT")
traiter_emails_et_appliquer_rag(IMAP_SERVER, EMAIL, PASSWORD, SMTP_SERVER, SMTP_PORT, MODEL)
logger.info("Fin du processus de traitement des emails")
except KeyboardInterrupt:
logger.info("Processus interrompu par l'utilisateur")
except Exception as e:
logger.critical(f"Erreur critique: {str(e)}")
logger.critical(traceback.format_exc())