#!/usr/bin/python3 """An mms to mail converter for mmsd.""" import argparse import configparser import getpass import socket import mimetypes import time import logging from pathlib import Path from messaging.mms.message import MMSMessage import mailbox import email from gi.repository import GLib import dbus import dbus.mainloop.glib log = logging.getLogger(__name__) class MMS2Mail: """ The class handling the conversion between MMS and mail format. MMS support is provided by python-messaging Mail support is provided by marrow.mailer """ def __init__(self, delete=False, force_read=False, force_unlock=False): """ Return class instance. :param delete: delete MMS after conversion :type delete: bool :param force_read: force converting an already read MMS (batch mode) :type force_read: bool :param disable_dbus: Disable sending dbus commands to mmsd (batch mode) :type disable_dbus: bool :param force_unlock: Force mbox unlocking after a few minutes :type force_unlock: bool """ self.delete = delete self.force_read = force_read self.force_unlock = force_unlock self.config = configparser.ConfigParser() self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini") self.attach_mms = self.config.getboolean('mail', 'attach_mms', fallback=False) self.domain = self.config.get('mail', 'domain', fallback=socket.getfqdn()) self.user = self.config.get('mail', 'user', fallback=getpass.getuser()) mbox_file = self.config.get('mail', 'mailbox', fallback=f"/var/mail/{self.user}") self.mailbox = mailbox.mbox(mbox_file) self.dbus = None def set_dbus(self, dbusmmsd): """ Return the DBus SessionBus. :param dbusmmsd: The DBus MMSd abstraction class :type dbusmmsd: DbusMMSd() """ self.dbus = dbusmmsd def check_mms(self, path): """ Check wether the provided file would be converted. :param path: the mms filesystem path :type path: str :return the mms status or None :rtype str """ # Check for mmsd data file if not Path(f"{path}").is_file(): log.error("MMS file not found : aborting") return None # Check for mmsd status file status = configparser.ConfigParser() if not Path(f"{path}.status").is_file(): log.error("MMS status file not found : aborting") return None status.read_file(open(f"{path}.status")) if not (self.force_read or not status.getboolean('info', 'read')): log.error("Already converted MMS : aborting") return None return status['info']['state'] def message_added(self, name, value, member, path, interface): """Trigger conversion on MessageAdded signal.""" if value['Status'] == 'downloaded' or value['Status'] == 'received': log.debug(f"New incoming MMS found ({name.split('/')[-1]})") self.convert(path=value['Attachments'][0][2], dbus_path=name, properties=value) else: log.debug(f"New outgoing MMS found ({name.split('/')[-1]})") def convert(self, path, dbus_path=None, properties=None): """ Convert a provided mms file to a mail stored in a mbox. :param path: the mms filesystem path :type path: str :param dbus_path: the mms dbus path :type dbus_path: str """ # Check if the provided file present status = self.check_mms(path) if not status: log.error("MMS file not convertible.") return # Generate its dbus path, for future operation (mark as read, delete) if not dbus_path: dbus_path = f"/org/ofono/mms/modemmanager/{path.split('/')[-1]}" mms = MMSMessage.from_file(path) message = email.message.EmailMessage() # Generate Mail Headers mms_h_from = mms.headers.get('From', 'unknown/undef') log.debug(f"MMS[From]: {mms_h_from}") if 'not inserted' in mms_h_from: mms_h_from = 'unknown/undef' mms_from, mms_from_type = mms_h_from.split('/') message['From'] = f"{mms_from}@{self.domain}" mms_h_to = mms.headers.get('To', 'unknown/undef') log.debug(f"MMS[To]: {mms_h_to}") if 'not inserted' in mms_h_to: mms_h_to = 'unknown/undef' mms_to, mms_to_type = mms_h_to.split('/') message['To'] = f"{mms_to}@{self.domain}" # Get other recipients from dbus signal # https://github.com/pmarti/python-messaging/issues/49 if properties: cc = "" for r in properties['Recipients']: if mms_to in r: continue log.debug(f'MMS/MAIL CC : {r}') cc += f"{r}@{self.domain}," if cc: cc = cc[:-1] message['CC'] = cc if 'Subject' in mms.headers and mms.headers['Subject']: message['Subject'] = mms.headers['Subject'] else: if status == 'sent' or status == 'draft': message['Subject'] = f"MMS to {mms_to}" else: message['Subject'] = f"MMS from {mms_from}" if 'Date' in mms.headers and mms.headers['Date']: message['Date'] = mms.headers['Date'] # Recopy MMS HEADERS for header in mms.headers: message.add_header(f"X-MMS-{header}", f"{mms.headers[header]}") message.preamble = "This mail is converted from a MMS." body = "" data_id = 1 attachments = [] for data_part in mms.data_parts: datacontent = data_part.headers['Content-Type'] if datacontent is not None: maintype, subtype = datacontent[0].split('/', 1) if 'text/plain' in datacontent[0]: encoding = datacontent[1].get('Charset', 'utf-8') body += data_part.data.decode(encoding) + '\n' continue extension = str(mimetypes.guess_extension(datacontent[0])) filename = datacontent[1].get('Name', str(data_id)) attachments.append([data_part.data, maintype, subtype, filename + extension]) data_id = data_id + 1 if body: message.set_content(body) for a in attachments: message.add_attachment(a[0], maintype=a[1], subtype=a[2], filename=a[3]) # Add MMS binary file, for debugging purpose or reparsing in the future if self.attach_mms: with open(path, 'rb') as fp: message.add_attachment(fp.read(), maintype='application', subtype='octet-stream', filename=path.split('/')[-1]) # Write the mail in case of mbox lock retry for 5 minutes # Ultimately write in an mbox in the home folder end_time = time.time() + (5 * 60) while True: try: # self.mailer.send(message) self.mailbox.lock() self.mailbox.add(mailbox.mboxMessage(message)) self.mailbox.flush() self.mailbox.unlock() break except (mailbox.ExternalClashError, FileExistsError) as e: log.warn(f"Exception Mbox lock : {e}") if time.time() > end_time: if self.force_unlock: log.error("Force removing lock") self.mailbox.unlock() else: fs_mbox_path = f"{Path.home()}/.mms/failsafembox" fs_mbox = mailbox.mbox(fs_mbox_path) log.warning(f"Writing in internal mbox {fs_mbox_path}") try: fs_mbox.unlock() fs_mbox.lock() fs_mbox.add(mailbox.mboxMessage(str(message))) fs_mbox.flush() fs_mbox.unlock() break except (mailbox.ExternalClashError, FileExistsError) as e: log.error(f"Failsafe Mbox error : {e}") log.error(f"MMS cannot be written to any mbox : \ {path.split('/')[-1]}") finally: break else: time.sleep(5) # Ask mmsd to mark message as read and delete it if properties: self.dbus.mark_mms_read(dbus_path) if self.delete: self.dbus.delete_mms(dbus_path) class DbusMMSd(): """Use DBus communication with mmsd.""" def __init__(self, mms2mail=None): """ Return a DBusWatcher instance. :param mms2mail: An mms2mail instance to convert new mms :type mms2mail: mms2mail() """ self.mms2mail = mms2mail dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) self.bus = dbus.SessionBus() def set_mms2mail(self, mms2mail): """ Set mms2mail instance handling dbus event. :param mms2mail: An mms2mail instance to convert new mms :type mms2mail: mms2mail() """ self.mms2mail = mms2mail def mark_mms_read(self, dbus_path): """ Ask mmsd to mark the mms as read. :param dbus_path: the mms dbus path :type dbus_path: str """ message = dbus.proxies.Interface(self.bus.get_object('org.ofono.mms', dbus_path), 'org.ofono.mms.Message') log.debug(f"Marking MMS as read {dbus_path}") message.MarkRead() def delete_mms(self, dbus_path): """ Ask mmsd to delete the mms. :param dbus_path: the mms dbus path :type dbus_path: str """ if self.disable_dbus: return None message = dbus.proxies.Interface(self.bus.get_object('org.ofono.mms', dbus_path), 'org.ofono.mms.Message') log.debug(f"Deleting MMS {dbus_path}") message.Delete() def add_signal_receiver(self): """Add a signal receiver to the current bus.""" if self.mms2mail: self.bus.add_signal_receiver(self.mms2mail.message_added, bus_name="org.ofono.mms", signal_name="MessageAdded", member_keyword="member", path_keyword="path", interface_keyword="interface") return True else: return False def run(self): """Run the dbus mainloop.""" mainloop = GLib.MainLoop() log.info("Starting DBus watcher mainloop") try: mainloop.run() except KeyboardInterrupt: log.info("Stopping DBus watcher mainloop") mainloop.quit() def main(): """Run the different functions handling mms and mail.""" parser = argparse.ArgumentParser() mode = parser.add_mutually_exclusive_group() mode.add_argument("-d", "--daemon", help="Use dbus signal from mmsd to trigger conversion", action='store_true', dest='watcher') mode.add_argument("-f", "--file", nargs='+', help="Parse specified mms files and quit", dest='files') parser.add_argument('--delete', action='store_true', dest='delete', help="Ask mmsd to delete the converted MMS") parser.add_argument('--force-read', action='store_true', dest='force_read', help="Force conversion even if MMS \ is marked as read") parser.add_argument('--force-unlock', action='store_true', dest='force_unlock', help="BEWARE COULD LEAD TO \ WHOLE MBOX CORRUPTION \ Force unlocking the mbox \ after a few minutes /!\\") parser.add_argument('-l', '--logging', dest='log_level', default='warning', choices=['critical', 'error', 'warning', 'info', 'debug'], help='Define the logger output level' ) args = parser.parse_args() log.setLevel(args.log_level.upper()) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' formatter = logging.Formatter(log_format) ch.setFormatter(formatter) log.addHandler(ch) d = DbusMMSd() m = MMS2Mail(delete=args.delete, force_read=args.force_read, force_unlock=args.force_unlock) m.set_dbus(d) if args.files: for mms_file in args.files: m.convert(path=mms_file) return elif args.watcher: log.info("Starting mms2mail in daemon mode") d.set_mms2mail(m) d.add_signal_receiver() else: parser.print_help() d.run() if __name__ == '__main__': main()