Commit 70fd1ba

feat: rfc5322 email processing helpers added
1 parent 2924d56 commit 70fd1ba

2 files changed (+346, −3 lines)


2 files changed

+346
-3
lines changed
Lines changed: 7 additions & 3 deletions

```diff
@@ -1,5 +1,9 @@
-"""SOAR SDK email processing extras."""
-
 from soar_sdk.extras.email.processor import EmailProcessor, ProcessEmailContext
+from soar_sdk.extras.email.rfc5322 import RFC5322EmailData, extract_rfc5322_email_data
 
-__all__ = ["EmailProcessor", "ProcessEmailContext"]
+__all__ = [
+    "EmailProcessor",
+    "ProcessEmailContext",
+    "RFC5322EmailData",
+    "extract_rfc5322_email_data",
+]
```
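
With this change, the new RFC 5322 helpers are re-exported next to the existing processor classes. A minimal sketch of the resulting import surface (illustrative only; the sample message string is invented):

```python
# Illustrative: the names re-exported by the updated __init__ above.
# EmailProcessor and ProcessEmailContext are listed only to show the surface.
from soar_sdk.extras.email import (
    EmailProcessor,
    ProcessEmailContext,
    RFC5322EmailData,
    extract_rfc5322_email_data,
)

# A tiny RFC 5322 message: headers, blank line, body.
data: RFC5322EmailData = extract_rfc5322_email_data(
    "From: alice@example.com\nSubject: hi\n\nSee https://example.com/doc"
)
print(data.headers.subject)  # -> hi
```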
Lines changed: 339 additions & 0 deletions (new file)

```python
import email
import re
from dataclasses import dataclass, field
from email.header import decode_header, make_header
from email.message import Message
from html import unescape
from typing import Any
from urllib.parse import urlparse

from bs4 import BeautifulSoup, UnicodeDammit  # type: ignore[attr-defined]

from soar_sdk.extras.email.utils import clean_url, decode_uni_string, is_ip
from soar_sdk.logging import getLogger

logger = getLogger()

URI_REGEX = r"[Hh][Tt][Tt][Pp][Ss]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+#]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
EMAIL_REGEX = r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b"

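# Note (editorial): URI_REGEX matches scheme-prefixed URLs with a case-insensitive
# scheme, e.g. "HtTpS://example.com/a%20b". EMAIL_REGEX is applied with
# re.IGNORECASE below, so it matches addresses such as "User.Name+tag@Example.co.uk".
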

@dataclass
class EmailHeaders:
    """Extracted email headers from an RFC 5322 message."""

    email_id: str | None = None
    message_id: str | None = None
    to: str | None = None
    from_address: str | None = None
    subject: str | None = None
    date: str | None = None
    received: list[str] = field(default_factory=list)
    cc: str | None = None
    bcc: str | None = None
    x_mailer: str | None = None
    x_priority: str | None = None
    reply_to: str | None = None
    content_type: str | None = None
    raw_headers: dict[str, Any] = field(default_factory=dict)


@dataclass
class EmailBody:
    """Extracted email body content."""

    plain_text: str | None = None
    html: str | None = None
    charset: str | None = None


@dataclass
class EmailAttachment:
    """Extracted email attachment metadata."""

    filename: str
    content_type: str | None = None
    size: int = 0
    content_id: str | None = None
    content: bytes | None = None
    is_inline: bool = False


@dataclass
class RFC5322EmailData:
    """Complete extracted data from an RFC 5322 email message."""

    raw_email: str
    headers: EmailHeaders
    body: EmailBody
    urls: list[str] = field(default_factory=list)
    attachments: list[EmailAttachment] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation."""
        return {
            "raw_email": self.raw_email,
            "headers": {
                "email_id": self.headers.email_id,
                "message_id": self.headers.message_id,
                "to": self.headers.to,
                "from": self.headers.from_address,
                "subject": self.headers.subject,
                "date": self.headers.date,
                "received": self.headers.received,
                "cc": self.headers.cc,
                "bcc": self.headers.bcc,
                "x_mailer": self.headers.x_mailer,
                "x_priority": self.headers.x_priority,
                "reply_to": self.headers.reply_to,
                "content_type": self.headers.content_type,
                "raw_headers": self.headers.raw_headers,
            },
            "body": {
                "plain_text": self.body.plain_text,
                "html": self.body.html,
                "charset": self.body.charset,
            },
            "urls": self.urls,
            "attachments": [
                {
                    "filename": att.filename,
                    "content_type": att.content_type,
                    "size": att.size,
                    "content_id": att.content_id,
                    "is_inline": att.is_inline,
                }
                for att in self.attachments
            ],
        }
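
# Illustrative to_dict() output shape (editorial; values invented, abridged):
# {
#     "raw_email": "From: ...",
#     "headers": {"message_id": "<id@host>", "from": "alice@example.com", ...},
#     "body": {"plain_text": "hello", "html": None, "charset": "utf-8"},
#     "urls": ["https://example.com/report"],
#     "attachments": [{"filename": "report.pdf", "content_type": "application/pdf",
#                      "size": 1024, "content_id": None, "is_inline": False}],
# }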


def _decode_header_value(value: str | None) -> str | None:
    if not value:
        return None
    try:
        return str(make_header(decode_header(value)))
    except Exception:
        return decode_uni_string(value, value)


def _get_charset(part: Message) -> str:
    charset = part.get_content_charset()
    return charset if charset else "utf-8"


def _decode_payload(payload: bytes, charset: str) -> str:
    try:
        return UnicodeDammit(payload).unicode_markup.encode("utf-8").decode("utf-8")
    except Exception:
        try:
            return payload.decode(charset)
        except Exception:
            return payload.decode("utf-8", errors="replace")


def _extract_urls_from_content(content: str, urls: set[str], is_html: bool) -> None:
    if is_html:
        # First pass (HTML only): collect URLs from href/src attributes.
        try:
            soup = BeautifulSoup(content, "html.parser")
            for link in soup.find_all(href=True):
                href = link["href"]
                if href and not href.startswith("mailto:"):
                    cleaned = clean_url(href)
                    if cleaned.startswith("http"):
                        urls.add(cleaned)
            for src in soup.find_all(src=True):
                src_val = src["src"]
                if src_val:
                    cleaned = clean_url(src_val)
                    if cleaned.startswith("http"):
                        urls.add(cleaned)
        except Exception as e:
            logger.debug(f"Error parsing HTML for URLs: {e}")

    # Second pass: regex sweep over the unescaped text catches bare URLs.
    content = unescape(content)
    uri_matches = re.findall(URI_REGEX, content)
    for uri in uri_matches:
        cleaned = clean_url(uri)
        if cleaned.startswith("http"):
            urls.add(cleaned)


def extract_email_headers(mail: Message, email_id: str | None = None) -> EmailHeaders:
    """Extract headers from a parsed email Message."""
    headers = EmailHeaders()
    headers.email_id = email_id
    headers.message_id = mail.get("Message-ID")
    headers.to = _decode_header_value(mail.get("To"))
    headers.from_address = _decode_header_value(mail.get("From"))
    headers.subject = _decode_header_value(mail.get("Subject"))
    headers.date = mail.get("Date")
    headers.cc = _decode_header_value(mail.get("CC"))
    headers.bcc = _decode_header_value(mail.get("BCC"))
    headers.x_mailer = mail.get("X-Mailer")
    headers.x_priority = mail.get("X-Priority")
    headers.reply_to = _decode_header_value(mail.get("Reply-To"))
    headers.content_type = mail.get("Content-Type")

    received_headers = mail.get_all("Received") or []
    headers.received = [str(r) for r in received_headers]

    # "Received" is kept as a separate list; everything else lands in raw_headers.
    for key, value in mail.items():
        if key.lower() == "received":
            continue
        headers.raw_headers[key] = _decode_header_value(str(value)) if value else None

    return headers


def extract_email_body(mail: Message) -> EmailBody:
    """Extract plain text and HTML body from a parsed email Message."""
    body = EmailBody()
    charset = _get_charset(mail)
    body.charset = charset

    if not mail.is_multipart():
        payload = mail.get_payload(decode=True)
        if payload and isinstance(payload, bytes):
            content_type = mail.get_content_type()
            decoded = _decode_payload(payload, charset)
            if content_type == "text/html":
                body.html = decoded
            else:
                body.plain_text = decoded
        return body

    # Multipart: walk the MIME tree; the first text/plain and first text/html win.
    for part in mail.walk():
        if part.is_multipart():
            continue

        content_type = part.get_content_type()
        content_disp = str(part.get("Content-Disposition") or "")

        if "attachment" in content_disp.lower():
            continue

        payload = part.get_payload(decode=True)
        if not payload or not isinstance(payload, bytes):
            continue

        part_charset = _get_charset(part)
        decoded = _decode_payload(payload, part_charset)

        if content_type == "text/plain" and not body.plain_text:
            body.plain_text = decoded
            if not body.charset:
                body.charset = part_charset
        elif content_type == "text/html" and not body.html:
            body.html = decoded

    return body


def extract_email_urls(mail: Message) -> list[str]:
    """Extract all URLs from email body content."""
    urls: set[str] = set()
    body = extract_email_body(mail)

    if body.html:
        _extract_urls_from_content(body.html, urls, is_html=True)
    if body.plain_text:
        _extract_urls_from_content(body.plain_text, urls, is_html=False)

    return sorted(urls)


def extract_email_attachments(
    mail: Message, include_content: bool = False
) -> list[EmailAttachment]:
    """Extract attachment metadata from a parsed email Message."""
    attachments: list[EmailAttachment] = []

    if not mail.is_multipart():
        return attachments

    for part in mail.walk():
        if part.is_multipart():
            continue

        content_disp = str(part.get("Content-Disposition") or "")
        content_type = part.get_content_type()
        content_id = part.get("Content-ID")

        # Unnamed parts count as attachments only when Content-Disposition says so.
        filename = part.get_filename()
        if not filename:
            if "attachment" not in content_disp.lower():
                continue
            filename = "unnamed_attachment"

        filename = _decode_header_value(filename) or filename
        is_inline = "inline" in content_disp.lower()
        raw_payload = part.get_payload(decode=True)
        payload = raw_payload if isinstance(raw_payload, bytes) else None

        attachment = EmailAttachment(
            filename=filename,
            content_type=content_type,
            size=len(payload) if payload else 0,
            content_id=content_id.strip("<>") if content_id else None,
            is_inline=is_inline,
        )

        if include_content and payload:
            attachment.content = payload

        attachments.append(attachment)

    return attachments


def extract_rfc5322_email_data(
    rfc822_email: str,
    email_id: str | None = None,
    include_attachment_content: bool = False,
) -> RFC5322EmailData:
    """Extract all components from an RFC 5322 email string."""
    mail = email.message_from_string(rfc822_email)

    return RFC5322EmailData(
        raw_email=rfc822_email,
        headers=extract_email_headers(mail, email_id),
        body=extract_email_body(mail),
        urls=extract_email_urls(mail),
        attachments=extract_email_attachments(mail, include_attachment_content),
    )


def extract_domains_from_urls(urls: list[str]) -> list[str]:
    """Extract unique domains from a list of URLs."""
    domains: set[str] = set()

    for url in urls:
        try:
            parsed = urlparse(url)
            if parsed.netloc and not is_ip(parsed.netloc):
                domain = parsed.netloc.split(":")[0]
                domains.add(domain)
        except Exception as e:
            logger.debug(f"Failed to parse URL for domain extraction: {e}")
            continue

    return sorted(domains)


def extract_email_addresses_from_body(mail: Message) -> list[str]:
    """Extract email addresses found in the email body."""
    addresses: set[str] = set()
    body = extract_email_body(mail)

    content = ""
    if body.plain_text:
        content += body.plain_text
    if body.html:
        content += body.html

    if content:
        matches = re.findall(EMAIL_REGEX, content, re.IGNORECASE)
        addresses.update(m.lower() for m in matches)

    return sorted(addresses)
```
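
For a sense of how the new helpers compose, here is a hedged usage sketch. The message, the `email_id` value, and the printed results are invented for illustration, and it assumes `clean_url` from `soar_sdk.extras.email.utils` returns an already-clean URL unchanged:

```python
from soar_sdk.extras.email.rfc5322 import (
    extract_domains_from_urls,
    extract_rfc5322_email_data,
)

# A made-up single-part HTML message: headers, blank line, body.
sample = "\n".join([
    "Message-ID: <123@mail.example.com>",
    "From: Alice <alice@example.com>",
    "To: bob@example.com",
    "Subject: Quarterly report",
    "Content-Type: text/html; charset=utf-8",
    "",
    '<html><body><a href="https://example.com/report">report</a></body></html>',
])

data = extract_rfc5322_email_data(sample, email_id="vault-001")
print(data.headers.subject)                  # Quarterly report
print(data.urls)                             # ['https://example.com/report']
print(extract_domains_from_urls(data.urls))  # ['example.com']
print(data.to_dict()["headers"]["from"])     # Alice <alice@example.com>
```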
