这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 100 additions & 34 deletions collector/scripts/watch/convert/as_mbox.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,127 @@
import os
import datetime
import email.utils
from mailbox import mbox
import re
import quopri
import base64
from mailbox import mbox, mboxMessage
from slugify import slugify
from ..utils import guid, file_creation_time, write_to_server_documents, move_source
from ...utils import tokenize
from bs4 import BeautifulSoup
from scripts.watch.utils import (
guid,
file_creation_time,
write_to_server_documents,
move_source,
)
from scripts.utils import tokenize


def get_content(message: mboxMessage) -> str:
    """Extract the readable body text of an mbox message.

    Every MIME part of ``message`` is visited via ``walk()``. The first
    ``text/plain`` part wins and stops the search; until one is found, the
    text of the most recently seen ``text/html`` part (markup stripped with
    BeautifulSoup) is kept as a fallback.

    Args:
        message: The message whose body should be extracted.

    Returns:
        The decoded body text, or an empty string when the message has no
        ``text/plain`` or ``text/html`` part, so callers can detect and skip
        content-less mail with a simple emptiness check.
    """
    content = ""
    declared_charset = None

    for part in message.walk():
        content_type = part.get_content_type()
        if content_type == "text/plain":
            # decode=True undoes the transfer encoding (base64 / quoted-printable)
            # and yields raw bytes; the character set is handled below.
            content = part.get_payload(decode=True) or b""
            declared_charset = part.get_content_charset()
            break
        if content_type == "text/html":
            soup = BeautifulSoup(part.get_payload(decode=True) or b"", "html.parser")
            content = soup.get_text()

    if isinstance(content, bytes):
        # Honour the charset declared by the sender first; fall back to UTF-8
        # when it is missing, unknown to Python, or does not fit the bytes.
        candidates = [declared_charset] if declared_charset else []
        candidates.append("utf-8")
        for encoding in candidates:
            try:
                return content.decode(encoding)
            except (LookupError, UnicodeDecodeError):
                continue
        # latin-1 maps every byte to a code point, so this cannot fail.
        content = content.decode("latin-1")

    return content


def parse_subject(subject: str) -> str:
    """Decode an RFC 2047 encoded ``Subject`` header into plain text.

    Handles Q- and B-encoded words, several encoded words in one header,
    encoded words mixed with plain text, and the header-specific rule that
    ``_`` inside a Q-encoded word stands for a space.

    Args:
        subject: The raw header value. A plain, unencoded subject is
            returned unchanged.

    Returns:
        The decoded subject. If the header is malformed (bad base64, unknown
        charset, bytes that do not match the declared charset) the raw value
        is returned as a string instead of raising.
    """
    # Imported here so the function does not rely on the module import block.
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    try:
        return str(make_header(decode_header(subject)))
    except (HeaderParseError, LookupError, UnicodeError, ValueError):
        # Best effort: an undecodable subject is still better than no document.
        return str(subject)


# Process all mbox-related documents.
def as_mbox(**kwargs):
    """Convert every message of an mbox file into a server document.

    Keyword Args:
        directory: Folder containing the mbox file. Defaults to ``"hotdir"``.
        filename: Name of the mbox file without extension. Required.
        ext: File extension including the dot. Defaults to ``".mbox"``.
        remove_on_complete: Passed to ``move_source`` as ``remove``.
            Defaults to ``False``.

    Returns:
        ``(True, None)`` once all messages are processed, or
        ``(False, "No filename provided.")`` when ``filename`` is missing.
    """
    parent_dir = kwargs.get("directory", "hotdir")
    filename = kwargs.get("filename")
    ext = kwargs.get("ext", ".mbox")
    remove = kwargs.get("remove_on_complete", False)

    if filename is None:
        print("[ERROR]: No filename provided.")
        return (False, "No filename provided.")
    filename = str(filename)

    fullpath = f"{parent_dir}/{filename}{ext}"

    print(f"-- Working {fullpath} --")
    box = mbox(fullpath)

    try:
        for message in box:
            content = get_content(message)
            content = content.strip().replace("\r\n", "\n")

            if not content:
                print("[WARNING]: Mail with no content. Ignored.")
                continue

            date_sent = None
            date_tuple = email.utils.parsedate_tz(message["Date"])
            if date_tuple:
                try:
                    local_date = datetime.datetime.fromtimestamp(
                        email.utils.mktime_tz(date_tuple)
                    )
                    date_sent = local_date.strftime("%a, %d %b %Y %H:%M:%S")
                except (OverflowError, ValueError, OSError):
                    # A nonsensical Date header must not abort the conversion.
                    date_sent = None

            subject = message["Subject"]

            if subject is None:
                print("[WARNING]: Mail with no subject. But has content.")
                subject = "None"
            else:
                subject = parse_subject(subject)

            abs_path = os.path.abspath(
                f"{parent_dir}/processed/{slugify(filename)}-{guid()}{ext}"
            )
            data = {
                "id": guid(),
                "url": f"file://{abs_path}",
                "title": subject,
                "docAuthor": message["From"],
                "description": f"email from {message['From']} to {message['To']}",
                "docSource": "mbox file uploaded by the user.",
                "published": file_creation_time(fullpath),
                "sender": message["From"],
                "recipient": message["To"],
                "subject": subject,
                "date_sent": date_sent,
                "wordCount": len(content),
                "pageContent": content,
                "token_count_estimate": len(tokenize(content)),
            }

            write_to_server_documents(data, f"{slugify(filename)}-{data.get('id')}")
    finally:
        # Release the file handle before the source file is moved or removed.
        box.close()

    move_source(parent_dir, f"{filename}{ext}", remove=remove)
    print(f"[SUCCESS]: {filename}{ext} converted & ready for embedding.\n")
    return (True, None)