4 changed files with 149 additions and 0 deletions
@ -1,2 +1,30 @@
|
||||
# mms2mail |
||||
|
||||
Convert MMSd MMS file to mbox. |
||||
|
||||
## usage |
||||
|
||||
mms2mail [-h] [-d | -f FILE] |
||||
|
||||
optional arguments: |
||||
-h, --help show this help message and exit |
||||
-d, --daemon watch for new mms in the mmsd storage folder |
||||
-f FILE, --file FILE parse a single mms file |
||||
|
||||
## dependancy |
||||
|
||||
- python3 |
||||
- python3-watchdog (apt install python3-watchdog) |
||||
- python-messaging (https://www.github.com/davegermiquet/python-messaging.git) |
||||
- marrow.mailer (pip install marrow.mailer) |
||||
|
||||
## config |
||||
```$HOME/.mms/modemmanager/mms2mail.ini``` |
||||
|
||||
``` |
||||
[mail] |
||||
mailbox = /var/mail/mobian |
||||
account = mobian |
||||
domain = "mobian.lan" |
||||
attach_mms = false |
||||
``` |
||||
|
@ -0,0 +1,103 @@
|
||||
#!/usr/bin/python3 |
||||
import argparse |
||||
import configparser |
||||
import time |
||||
import getpass |
||||
import socket |
||||
from pathlib import Path |
||||
from watchdog.observers import Observer |
||||
from watchdog.events import FileSystemEventHandler |
||||
from messaging.mms.message import MMSMessage |
||||
from marrow.mailer import Mailer, Message |
||||
|
||||
class Watcher: |
||||
DIRECTORY_TO_WATCH = "/home/alex/tmp" |
||||
mms_folder = f"{Path.home()}/.mms/modemmanager" |
||||
|
||||
def __init__(self): |
||||
self.observer = Observer() |
||||
|
||||
def run(self): |
||||
event_handler = Handler() |
||||
self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=False) |
||||
self.observer.start() |
||||
try: |
||||
while True: |
||||
time.sleep(5) |
||||
except: |
||||
self.observer.stop() |
||||
print("Exiting") |
||||
|
||||
self.observer.join() |
||||
|
||||
|
||||
class Handler(FileSystemEventHandler): |
||||
|
||||
@staticmethod |
||||
def on_any_event(event): |
||||
if event.is_directory: |
||||
return None |
||||
|
||||
elif event.event_type == 'created': |
||||
if '.status' in event.src_path: |
||||
mms_path = event.src_path[:-7] |
||||
print(f"New MMS found : {event.src_path}.") |
||||
m.convert(mms_path) |
||||
|
||||
class MMS2Mail: |
||||
|
||||
def __init__(self): |
||||
self.config = configparser.ConfigParser() |
||||
self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini") |
||||
self.mailer = Mailer({'manager.use': 'immediate', 'transport.use': 'mbox', 'transport.file': self.config.get('mail','mailbox', fallback=f"/var/mail/{getpass.getuser()}")}) |
||||
self.mailer.start() |
||||
|
||||
def __del__(self): |
||||
self.mailer.stop() |
||||
|
||||
def convert(self, path): |
||||
status = configparser.ConfigParser() |
||||
status.read_file(open(f"{path}.status")) |
||||
if 'downloaded' in status['info']['state'] or 'received' in status['info']['state']: |
||||
print(f"New incomming MMS : converting to Mail ({path})") |
||||
else: |
||||
print(f"New outgoing MMS : doing nothing ({path})") |
||||
return |
||||
mms = MMSMessage.from_file(path) |
||||
|
||||
mms_from, mms_from_type = mms.headers['From'].split('/') |
||||
mms_to, mms_to_type = mms.headers['To'].split('/') |
||||
|
||||
message = Message(author=f"{mms_from}@{self.config.get('mail','domain', fallback=socket.getfqdn())}", to=f"{self.config.get('mail','user', fallback=getpass.getuser())}@{self.config.get('mail','domain', fallback=socket.getfqdn())}") |
||||
message.subject = f"MMS from {mms_from}" |
||||
message.date = mms.headers['Date'] |
||||
message.headers = [('X-MMS-From', mms.headers['From']), ('X-MMS-To', mms.headers['To']), ('X-MMS-ID', mms.headers['Message-ID'])] |
||||
message.plain = f"MMS from {mms_from}" |
||||
if self.config.getboolean('mail','attach_mms', fallback=False): |
||||
message.attach(path, None, None, None, False, "mms.bin") |
||||
for data_part in mms.data_parts: |
||||
datacontent=data_part.headers['Content-Type'] |
||||
if datacontent is not None: |
||||
if 'text/plain' in datacontent[0]: |
||||
message.plain = data_part.data |
||||
if 'Name' in datacontent[1]: |
||||
filename = datacontent[1]['Name'] |
||||
message.attach(filename,data_part.data) |
||||
self.mailer.send(message) |
||||
|
||||
if __name__ == '__main__': |
||||
parser = argparse.ArgumentParser() |
||||
mode = parser.add_mutually_exclusive_group() |
||||
mode.add_argument("-d","--daemon", help="watch for new mms in the mmsd storage folder", action="store_true") |
||||
mode.add_argument("-f","--file", nargs=1, help="parse a single mms file") |
||||
args = parser.parse_args() |
||||
|
||||
m = MMS2Mail() |
||||
if args.daemon: |
||||
w = Watcher() |
||||
w.run() |
||||
elif args.file: |
||||
m.convert(args.file[0]) |
||||
else: |
||||
parser.print_help() |
||||
del m |
@ -0,0 +1,5 @@
|
||||
[mail] |
||||
mailbox = /var/mail/mobian |
||||
account = mobian |
||||
domain = "mobian.lan" |
||||
attach_mms = false |
Loading…
Reference in new issue