import logging
from typing import Union, List, Dict
import re
import binascii
from os import path, listdir
from io import IOBase, BytesIO, StringIO
import zipfile
import yara
import pdftotext
from mailsuite.utils import parse_email, from_trusted_domain, decode_base64
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

__version__ = "3.2.0"

# Regex fragments for the characters that may enclose a password in an email
# body. A plain string serves as both the opening and the closing delimiter;
# a tuple is an (opening, closing) pair. The fragments are interpolated into
# patterns verbatim, so regex metacharacters must be escaped here (an
# unescaped "|" would turn the pattern into an alternation that matches the
# empty string everywhere).
delimiters = ["\"", r"'", r"`", r"\*", r"\*\*", r"_", r"\|", r"”", r"”", r"’",
              r"‚", r"＂", r"“", r"〝", r"‟", r"〞", r"”", ("❝", r"❞"),
              (r"❮", r"❯"), (r"\(", r"\)"), (r"\[", r"\]"), (r"\{", r"\}"),
              (r"<", r">"), (r">", "</"), (r"”", r"„"), (r"‘", r"’"),
              (r"‹", "›"), (r"»", "«"), (r"«", r"»"), (r"「", r"」"),
              (r"〔", r"〕"), (r"『", r"』"), (r"「", r"」"), (r"❬", "❭")]

# The first pattern carves every whitespace-separated token. Each delimiter
# then contributes two patterns: one capturing only the enclosed text, and
# one without a group so that the match keeps the delimiters themselves.
password_regex = [re.compile(r"\s*(\S+)\s*", re.MULTILINE)]
for delimiter in delimiters:
    if isinstance(delimiter, str):
        opening = closing = delimiter
    elif isinstance(delimiter, tuple):
        opening, closing = delimiter
    else:
        continue
    regex = re.compile(f"{opening}(.+){closing}", re.MULTILINE)
    password_regex.append(regex)
    regex = re.compile(f"{opening}.+{closing}", re.MULTILINE)
    password_regex.append(regex)
def _carve_passwords(content: str) -> List[str]:
    """
    Carves candidate passwords out of text

    Every pattern in ``password_regex`` is applied to ``content``. For each
    candidate that contains a space, a copy with the spaces removed is also
    added, to account for spaces introduced during markdown conversion.

    Args:
        content: The text to search, such as an email body

    Returns: A list of unique, non-empty candidate passwords, in the order
        they were first found
    """
    if not content:
        return []
    passwords = []
    for _regex in password_regex:
        # str() makes the element type clear to IDEs
        passwords += [str(found) for found in _regex.findall(content)]
    # Account for any extra spaces added during markdown conversion
    passwords += [password.replace(" ", "") for password in passwords
                  if " " in password]
    # Every candidate is later tried against every ZIP member, so duplicates
    # and empty strings only waste decryption attempts.
    return [password for password in dict.fromkeys(passwords) if password]
def _deduplicate_list(og_list: list):
new_list = []
for item in og_list:
if item not in new_list:
new_list.append(item)
return new_list
def _match_to_dict(match: Union[yara.Match,
                                List[yara.Match]]) -> Union[List[Dict],
                                                            Dict]:
    """
    Converts one ``yara.Match`` object, or a list of them, to dictionaries

    Args:
        match: A ``yara.Match`` object or a list of ``yara.Match`` objects

    Returns: A dictionary with the keys ``rule``, ``namespace``, ``tags``,
        ``meta``, ``strings``, and ``warnings`` (an empty list), or a list
        of such dictionaries when a list was passed. ``None`` is returned
        for any other input type.
    """
    def match_to_dict_(_match: yara.Match) -> Dict:
        strings = []
        # Read from the parameter, not from the enclosing ``match`` variable
        for s in _match.strings:
            if isinstance(s, tuple):
                # Already an (offset, identifier, data) tuple
                strings.append(s)
            else:
                # Object with an identifier and a list of instances; flatten
                # it to one tuple per matched instance
                for i in s.instances:
                    strings.append((i.offset, s.identifier, i.matched_data))
        strings.sort(key=lambda x: x[0])
        return dict(rule=_match.rule,
                    namespace=_match.namespace,
                    tags=_match.tags,
                    meta=_match.meta,
                    strings=strings,
                    warnings=[]
                    )

    if isinstance(match, list):
        return [_match_to_dict(item) for item in match]
    elif isinstance(match, yara.Match):
        return match_to_dict_(match)
def _is_pdf(file_bytes: bytes) -> bool:
try:
return file_bytes.startswith(b"\x25\x50\x44\x46\x2D")
except TypeError:
return False
def _is_zip(file_bytes: bytes) -> bool:
try:
return file_bytes.startswith(b"\x50\x4B\03\04")
except TypeError:
return False
def _pdf_to_markdown(pdf_bytes: bytes) -> str:
    """
    Extracts the text of a PDF using ``pdftotext``

    Args:
        pdf_bytes: The raw content of a PDF file

    Returns: The items yielded by ``pdftotext.PDF`` (presumably one string
        per page), joined by blank lines

    Raises:
        ValueError: The content does not start with the PDF magic bytes
    """
    if _is_pdf(pdf_bytes):
        with BytesIO(pdf_bytes) as pdf_stream:
            pages = pdftotext.PDF(pdf_stream)
            return "\n\n".join(pages)
    raise ValueError("Not a PDF file")
def _input_to_str_list(_input: Union[List[str], str, IOBase]) -> list:
_list = []
if _input is None:
return _list
if isinstance(_input, list):
_list = _input
if isinstance(_input, str):
if path.exists(_input):
with open(_input) as f:
_list = f.read().split("\n")
if isinstance(_input, StringIO):
_list = _input.read().split("\n")
try:
_list.remove("")
except ValueError:
pass
return _list
def _compile_rules(rules: Union[yara.Rules, IOBase, str]) -> yara.Rules:
    """
    Compiles YARA rules from any of the supported input forms

    Args:
        rules: One of:

            - A ``yara.Rules`` object, which is returned unchanged
            - A file-like object, whose content is read and then handled
              like a string
            - A string that is not an existing path, which is compiled as
              rule source
            - A path to a rules file
            - A path to a directory, in which case the content of every
              file directly inside it is combined and compiled

    Returns: A compiled ``yara.Rules`` object
    """
    if isinstance(rules, yara.Rules):
        return rules
    if isinstance(rules, IOBase):
        rules = rules.read()
    if not path.exists(rules):
        return yara.compile(source=rules)
    if not path.isdir(rules):
        return yara.compile(filepath=rules)
    sources = []
    # List the rules directory itself (not the current working directory),
    # in sorted order so that the combined source is deterministic
    for filename in sorted(listdir(rules)):
        file_path = path.join(rules, filename)
        if path.isdir(file_path):
            continue
        with open(file_path) as rules_file:
            sources.append(rules_file.read())
    # Separate the files with a newline so that the last line of one file
    # never runs into the first line of the next
    return yara.compile(source="\n".join(sources))
class MailScanner(object):
    def __init__(self, header_rules: Union[str, IOBase, yara.Rules] = None,
                 body_rules: Union[str, IOBase, yara.Rules] = None,
                 header_body_rules: Union[str, IOBase, yara.Rules] = None,
                 attachment_rules: Union[str, IOBase, yara.Rules] = None,
                 passwords: Union[List[str], IOBase, str] = None,
                 max_zip_depth: int = None,
                 implicit_safe_domains: Union[List[str], IOBase, str] = None,
                 allow_multiple_authentication_results: bool = False,
                 use_authentication_results_original: bool = False):
        """
        A YARA scanner for emails that can also check Authentication-Results
        headers.

        Args:
            header_rules: Rules that only apply to email header content
            body_rules: Rules that only apply to email body content
            header_body_rules: Rules that apply to combined email
                header and body content
            attachment_rules: Rules that only apply to file
                attachment content
            passwords: A list of passwords to use when attempting to scan
                password-protected files
            max_zip_depth: Number of times to recurse into nested ZIP files
                (``None`` means no limit)
            implicit_safe_domains: Always add the ``safe`` category to
                emails from these domains
            allow_multiple_authentication_results: Allow multiple
                ``Authentication-Results`` headers when checking
                authentication results
            use_authentication_results_original: Use the
                ``Authentication-Results-Original`` header instead of the
                ``Authentication-Results`` header

        .. note::
          Each ``_rules`` parameter can accept raw rule content, a path to a
          rules file, a file-like object, or a ``yara.Rules`` object.

        .. tip::
          Use the ``include`` directive in the YARA rule files that you
          pass to ``MailScanner`` to include rules from other files. That
          way, rules can be divided into separate files as you see fit.

        .. warning::
          Authentication results are based on the headers of the email
          sample, so only trust authentication results on emails that have
          been received by trusted mail servers, and not on third-party
          emails.

        .. warning::
          Set ``allow_multiple_authentication_results`` to ``True``
          **if and only if** the receiving mail service splits the results
          of each authentication method in separate
          ``Authentication-Results`` headers **and always** includes DMARC
          results.

        .. warning::
          Set ``use_authentication_results_original`` to ``True``
          **if and only if** you use an email security gateway that adds an
          ``Authentication-Results-Original`` header, such as Proofpoint or
          Cisco IronPort. This **does not** include API-based email security
          solutions, such as Abnormal Security.

        .. note::
          ``infected``, ``malware``, and the contents of the message body
          are always tried as passwords.

        .. note::
          Starting in version 2.1.0, the contents of the message body are
          automatically tried as passwords for password-protected ZIP
          attachments.
        """
        # Falsy rule arguments (None, "") are stored unchanged and later
        # treated as "no rules of this kind"
        self._header_rules = (_compile_rules(header_rules)
                              if header_rules else header_rules)
        self._body_rules = (_compile_rules(body_rules)
                            if body_rules else body_rules)
        self._header_body_rules = (_compile_rules(header_body_rules)
                                   if header_body_rules
                                   else header_body_rules)
        self._attachment_rules = (_compile_rules(attachment_rules)
                                  if attachment_rules else attachment_rules)
        # Build new lists instead of extending in place, so that a list
        # passed in by the caller is never modified or shared
        self.passwords = _deduplicate_list(
            list(_input_to_str_list(passwords)) + ["malware", "infected"])
        self.max_zip_depth = max_zip_depth
        self.implicit_safe_domains = list(
            _input_to_str_list(implicit_safe_domains))
        allow_multi_auth = allow_multiple_authentication_results
        self.allow_multiple_authentication_results = allow_multi_auth
        use_og_auth = use_authentication_results_original
        self.use_authentication_results_original = use_og_auth

    def _scan_pdf_text(self, payload: Union[bytes, BytesIO]) -> List[Dict]:
        """
        Scans the text extracted from a PDF with the attachment rules

        Args:
            payload: The PDF content as bytes or a ``BytesIO`` object

        Returns: A list of match dictionaries, each tagged with ``pdf2text``

        Raises:
            ValueError: The payload is not a PDF file
        """
        if isinstance(payload, BytesIO):
            payload = payload.read()
        if not _is_pdf(payload):
            raise ValueError("Payload is not a PDF file")
        pdf_markdown = _pdf_to_markdown(payload)
        markdown_matches = _match_to_dict(
            self._attachment_rules.match(data=pdf_markdown))
        for match in markdown_matches:
            match["tags"] = _deduplicate_list(match["tags"] + ["pdf2text"])
        return markdown_matches

    def _scan_zip(self, payload: Union[bytes, BytesIO, str],
                  filename: str = None, passwords: List[str] = None,
                  _current_depth: int = 0):
        """
        Scans the members of a ZIP file with the attachment rules

        Each member is opened without a password first and then with each
        supplied password until one works. PDF members are additionally
        scanned as extracted text, and nested ZIP members are scanned
        recursively, limited by ``max_zip_depth``.

        Args:
            payload: ZIP content as bytes, a ``BytesIO`` object, or a path
            filename: The name of the ZIP file; when set, it prefixes the
                location of every match
            passwords: Passwords to try on encrypted members
            _current_depth: Internal recursion counter

        Returns: A list of match dictionaries, each tagged with ``zip`` and
            carrying a ``location`` of colon-separated member names

        Raises:
            FileNotFoundError: ``payload`` is a path that does not exist
            ValueError: The content is not a ZIP file
        """
        if isinstance(payload, str):
            if not path.exists(payload):
                raise FileNotFoundError(f"{payload} not found")
            with open(payload, "rb") as f:
                payload = f.read()
        if isinstance(payload, BytesIO):
            payload = payload.read()
        if isinstance(payload, bytes):
            if not _is_zip(payload):
                raise ValueError("Payload is not a ZIP file")
            payload = BytesIO(payload)
        _current_depth += 1
        # None (no password) is always tried first. A new list is built so
        # that the caller's list is left untouched.
        candidates = [None] + [password for password in (passwords or [])
                               if password is not None]
        max_depth = self.max_zip_depth
        tags = ["zip"]
        zip_matches = []
        with zipfile.ZipFile(payload) as zip_file:
            for name in zip_file.namelist():
                location = f"{filename}:{name}" if filename else name
                member_content = None
                for password in candidates:
                    if isinstance(password, str):
                        password = password.encode("utf-8")
                    try:
                        with zip_file.open(name, pwd=password) as member:
                            member_content = member.read()
                        break
                    except RuntimeError:
                        # Wrong or missing password; try the next candidate
                        continue
                if member_content is None:
                    logger.warning("Unable to read the contents "
                                   "of the ZIP file")
                    # Stop scanning this archive, but still fall through to
                    # the tagging below for matches already collected
                    break
                member_matches = _match_to_dict(
                    self._attachment_rules.match(data=member_content))
                for match in member_matches:
                    match["location"] = location
                zip_matches += member_matches
                if _is_pdf(member_content):
                    try:
                        pdf_matches = self._scan_pdf_text(member_content)
                    except Exception as e:
                        logger.warning(
                            "Unable to convert PDF to markdown. "
                            f"{e} Scanning raw file content only"
                            ".")
                    else:
                        for match in pdf_matches:
                            match["location"] = location
                        zip_matches += pdf_matches
                elif _is_zip(member_content):
                    # _current_depth is 1 for the outermost archive, so
                    # max_zip_depth=N permits N levels of nested archives
                    if max_depth is None or _current_depth <= max_depth:
                        try:
                            # Passing the full location as the filename
                            # keeps every enclosing archive in the path
                            zip_matches += self._scan_zip(
                                member_content,
                                filename=location,
                                passwords=candidates,
                                _current_depth=_current_depth)
                        except zipfile.BadZipFile as e:
                            logger.warning(
                                f"Unable to scan {location}. {e}.")
        for match in zip_matches:
            match["tags"] = _deduplicate_list(match["tags"] + tags)
        return zip_matches

    def _scan_attachments(self, attachments: Union[List, Dict],
                          passwords: List[str] = None) -> List[Dict]:
        """
        Scans email attachments with the attachment rules

        The raw content of every attachment is scanned. Binary PDF
        attachments are also scanned as extracted text, binary ZIP
        attachments have their members scanned, and ``.eml``/``.msg``
        attachments are scanned as emails.

        Args:
            attachments: An attachment dictionary or a list of them, each
                with ``filename`` and ``payload`` keys and an optional
                ``binary`` key
            passwords: Extra passwords to try before ``self.passwords``

        Returns: A list of match dictionaries whose ``location`` starts
            with ``attachment:<filename>``
        """
        def add_location(_attachment_matches: List[Dict], _filename: str):
            base_location = f"attachment:{_filename}"
            for match in _attachment_matches:
                if "location" in match:
                    og_location = match["location"]
                    match["location"] = f"{base_location}:{og_location}"
                else:
                    match["location"] = base_location
            return _attachment_matches

        passwords = (passwords or []) + self.passwords
        combined_attachment_matches = []
        if isinstance(attachments, dict):
            attachments = [attachments]
        for attachment in attachments:
            filename = attachment["filename"]
            file_extension = ""
            if filename:
                file_extension = filename.lower().split(".")[-1]
            payload = attachment["payload"]
            is_binary = attachment.get('binary', False)
            if is_binary:
                try:
                    payload = decode_base64(attachment["payload"])
                except binascii.Error:
                    # Best effort: scan the payload as it was provided
                    pass
            raw_matches = _match_to_dict(
                self._attachment_rules.match(data=payload))
            combined_attachment_matches += add_location(raw_matches,
                                                        filename)
            if is_binary and _is_pdf(payload):
                try:
                    pdf_matches = self._scan_pdf_text(payload)
                except Exception as e:
                    logger.warning(
                        f"Unable to convert {filename} to markdown. {e}. "
                        f"Scanning raw file content only.")
                else:
                    combined_attachment_matches += add_location(
                        pdf_matches, filename)
            elif is_binary and _is_zip(payload):
                try:
                    # Kept separate from raw_matches, which were already
                    # added above. The filename is not passed on because
                    # add_location() supplies it, which avoids locations
                    # like attachment:a.zip:a.zip:member.
                    zip_matches = self._scan_zip(payload,
                                                 passwords=passwords)
                except (UserWarning, ValueError, zipfile.BadZipFile) as e:
                    logger.warning(f"Unable to scan {filename}. {e}.")
                else:
                    combined_attachment_matches += add_location(
                        zip_matches, filename)
            elif file_extension in ["eml", "msg"]:
                try:
                    nested_results = self.scan_email(parse_email(payload))
                except UserWarning as e:
                    logger.warning(f"Unable to scan {filename}. {e}.")
                else:
                    # scan_email() returns a dictionary; only its match
                    # list belongs in the combined results
                    combined_attachment_matches += add_location(
                        nested_results["matches"], filename)
        return combined_attachment_matches

    def scan_email(self, email: Union[str, IOBase, Dict],
                   use_raw_headers: bool = False,
                   use_raw_body: bool = False) -> Dict:
        """
        Scans an email using YARA rules

        Args:
            email: Email file content, a path to an email
                file, a file-like object, or output from
                ``mailsuite.utils.parse_email()``
            use_raw_headers: Scan headers with indentations included
            use_raw_body: Scan the raw email body instead of converting it
                to Markdown first

        Returns: A dictionary

        The returned dictionary contains the following key-value pairs:

        - ``matches`` - A list of YARA match dictionaries

          - ``rule`` - The name of the rule.
          - ``namespace`` - The namespace of the rule.
          - ``meta`` - A dictionary of key-value pairs from the meta
            section.
          - ``tags`` - A list of the rule's tags.
          - ``strings`` - A list of lists identifying strings or patterns
            that match.

            0. The location/offset of the identified string
            1. The variable name of the string/pattern in the rule
            2. The matching string/pattern content

          - ``warnings`` - A list of warnings. Possible warnings include:

            - ``domain-authentication-failed`` - Authentication of the
              message From domain failed
            - ``from-domain-mismatch`` - The message From domain did not
              exactly match the value of the ``meta`` key ``from_domain``
            - ``safe-rule-missing-from-domain`` - The rule is missing a
              ``from_domain`` ``meta`` key that is required for rules with
              the ``category`` meta key set to ``safe``
            - ``unexpected-attachment`` - An email with an attachment
              matched a rule with the ``meta`` key ``no_attachment`` or
              ``no_attachments`` set to ``true``

          - ``location`` - The part of the email where the match was
            found, for example:

            - ``header``
            - ``body``
            - ``header_body``
            - ``attachment:filename``
            - ``attachment:example.zip:evil.js``
            - ``attachment:first.zip:nested.zip:evil.js``
            - ``attachment:evil.eml:attachment:example.zip:evil.js``

        - ``categories`` - A list of categories of YARA matches
        - ``msg_from_domain`` - Message From domain details

          - ``domain`` - The message From domain
          - ``authenticated`` - bool: domain is authenticated
          - ``implicit_safe`` - bool: domain is in the
            implicit_safe_domains list

        - ``has_attachment`` - bool: The email sample has an attachment
        - ``verdict`` - The verdict of the scan. Possible verdicts include:

          - ``None`` - No categories matched
          - ``safe`` - The email is considered safe
          - ``ambiguous`` - Multiple categories matched
          - Any custom ``category`` specified in the ``meta`` section of a
            YARA rule
        """
        def add_warning(_match: Dict, _warning: str):
            # Matches from attached emails were already evaluated once, so
            # avoid recording the same warning twice
            if _warning not in _match["warnings"]:
                _match["warnings"].append(_warning)

        if isinstance(email, str) and path.exists(email):
            with open(email, "rb") as email_file:
                email = email_file.read()
        if isinstance(email, dict):
            parsed_email = email
        else:
            parsed_email = parse_email(email)

        # The From header may be absent; every later use of the domain goes
        # through this variable rather than indexing parsed_email again
        sender = parsed_email.get("from")
        msg_from_domain = sender["domain"] if sender else None

        if use_raw_headers:
            headers = parsed_email["raw_headers"]
        else:
            headers = parsed_email["headers_string"]
        body = ""
        if use_raw_body:
            if len(parsed_email["text_plain"]) > 0:
                body = "\n\n".join(parsed_email["text_plain"])
            # HTML parts, when present, replace the plain text parts
            if len(parsed_email["text_html"]) > 0:
                body = "\n\n".join(parsed_email["text_html"])
        else:
            body = parsed_email["body_markdown"]
        attachments = parsed_email["attachments"]

        matches = []
        if self._header_rules:
            header_matches = _match_to_dict(self._header_rules.match(
                data=headers))
            for header_match in header_matches:
                header_match["location"] = "header"
                matches.append(header_match)
        if self._body_rules:
            body_matches = _match_to_dict(self._body_rules.match(
                data=body))
            for body_match in body_matches:
                body_match["location"] = "body"
                matches.append(body_match)
        if self._header_body_rules:
            header_body_matches = _match_to_dict(
                self._header_body_rules.match(data=f"{headers}\n\n{body}"))
            for header_body_match in header_body_matches:
                header_body_match["location"] = "header_body"
                matches.append(header_body_match)
        if self._attachment_rules:
            passwords = _carve_passwords(parsed_email["body_markdown"] or "")
            matches += self._scan_attachments(attachments,
                                              passwords=passwords)

        verdict = None
        multi_auth_headers = self.allow_multiple_authentication_results
        use_og_auth_results = self.use_authentication_results_original
        if msg_from_domain is None:
            # Without a From domain there is nothing to authenticate
            implicit_safe_domain = False
            authenticated_domain = False
        else:
            implicit_safe_domain = from_trusted_domain(
                parsed_email, self.implicit_safe_domains,
                allow_multiple_authentication_results=multi_auth_headers,
                use_authentication_results_original=use_og_auth_results,
            )
            authenticated_domain = from_trusted_domain(
                parsed_email, [msg_from_domain],
                allow_multiple_authentication_results=multi_auth_headers,
                use_authentication_results_original=use_og_auth_results,
            )

        categories = []
        has_attachment = len(attachments) > 0
        for match in matches:
            meta = match["meta"]
            # The short key wins when both spellings are present
            auth_optional = meta.get(
                "auth_optional", meta.get("authentication_optional", False))
            passed_authentication = authenticated_domain or auth_optional
            no_attachment = meta.get(
                "no_attachment", meta.get("no_attachments", False))
            if no_attachment and has_attachment:
                add_warning(match, "unexpected-attachment")
            rule_from_domains = None
            if "from_domains" in meta:
                rule_from_domains = meta["from_domains"]
            elif "from_domain" in meta:
                rule_from_domains = meta["from_domain"]
            if rule_from_domains is not None:
                # split() without arguments tolerates repeated spaces
                rule_from_domains = rule_from_domains.split()
                if msg_from_domain not in rule_from_domains:
                    add_warning(match, "from-domain-mismatch")
                # Authentication is only required by rules that name the
                # expected From domain(s)
                if not passed_authentication:
                    add_warning(match, "domain-authentication-failed")
            if "category" in meta:
                # Normalize before comparing, so that "Safe" cannot bypass
                # the from_domain requirement for safe rules
                category = meta["category"].lower()
                if category == "safe" and rule_from_domains is None:
                    add_warning(match, "safe-rule-missing-from-domain")
                if len(match["warnings"]) == 0:
                    categories.append(category)
        if implicit_safe_domain:
            categories.append("safe")
        categories = _deduplicate_list(categories)
        if len(categories) == 1:
            verdict = categories[0]
        elif len(categories) > 1:
            verdict = "ambiguous"
        msg_from_domain_results = dict(
            domain=msg_from_domain,
            authenticated=authenticated_domain,
            implicit_safe=implicit_safe_domain)
        return dict(matches=matches, categories=categories,
                    msg_from_domain=msg_from_domain_results,
                    has_attachment=has_attachment,
                    verdict=verdict)