From b8578a65b26fd8b714be2196952eda0afefc1032 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 5 May 2021 05:49:38 +0200 Subject: [PATCH] [fix] PEP08 --- mms2mail | 203 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 107 insertions(+), 96 deletions(-) diff --git a/mms2mail b/mms2mail index 5a236d3..c7d82ea 100755 --- a/mms2mail +++ b/mms2mail @@ -1,18 +1,27 @@ #!/usr/bin/python3 -""" -An mms to mail converter for mmsd -""" -#upstream bug dirty fix https://github.com/marrow/mailer/issues/87#issuecomment-689586587 +"""An mms to mail converter for mmsd.""" +# upstream bug dirty fix +# https://github.com/marrow/mailer/issues/87#issuecomment-689586587 import sys if sys.version_info[0] == 3 and sys.version_info[1] > 7: sys.modules["cgi.parse_qsl"] = None -#upstream bug dirty fix https://github.com/marrow/mailer/issues/87#issuecomment-713319548 +# upstream bug dirty fix +# https://github.com/marrow/mailer/issues/87#issuecomment-713319548 import base64 if sys.version_info[0] == 3 and sys.version_info[1] > 8: def encodestring(value): + """ + Encode string in base64. + + :param value: the string to encode. + :type value: str + + :rtype str + :return: the base64 encoded string + """ return base64.b64encode(value) base64.encodestring = encodestring -#end bugfix +# end bugfix import argparse import configparser @@ -25,106 +34,111 @@ from pathlib import Path from messaging.mms.message import MMSMessage from marrow.mailer import Mailer, Message -from gi.repository import GObject, GLib +from gi.repository import GLib import dbus import dbus.mainloop.glib from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler + class MMS2Mail: """ - The class handling the conversion between MMS and mail format + The class handling the conversion between MMS and mail format. MMS support is provided by python-messaging Mail support is provided by marrow.mailer """ + def __init__(self): - """ - Constructor loading configuration - """ + """Return class instance.""" self.config = configparser.ConfigParser() self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini") - self.mailer = Mailer({'manager.use': 'immediate', 'transport.use': 'mbox', 'transport.file': self.config.get('mail','mailbox', fallback=f"/var/mail/{getpass.getuser()}")}) + self.domain = self.config.get('mail', 'domain', + fallback=socket.getfqdn()) + self.user = self.config.get('mail', 'user', fallback=getpass.getuser()) + mbox_file = self.config.get('mail', 'mailbox', + fallback=f"/var/mail/{self.user}") + self.mailer = Mailer({'manager.use': 'immediate', + 'transport.use': 'mbox', + 'transport.file': mbox_file}) dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) self.bus = dbus.SessionBus() def get_bus(self): """ - Return the DBus SessionBus + Return the DBus SessionBus. :rtype dbus.SessionBus() :return: an active SessionBus """ return self.bus - def mark_mms_read(self,dbus_path): + def mark_mms_read(self, dbus_path): """ - Ask MMSd to mark the mms as read + Ask MMSd to mark the mms as read. :param dbus_path: the mms dbus path - :type dbus_path: str + :type dbus_path: str """ - message = dbus.Interface(self.bus.get_object('org.ofono.mms', dbus_path), - 'org.ofono.mms.Message') + message = dbus.Interface(self.bus.get_object('org.ofono.mms', + dbus_path), + 'org.ofono.mms.Message') message.MarkRead() - def convert(self, path, dbus_path = None): - """ - Convert a provided mms file to a mail stored in a mbox - - :param path: the mms filesystem path - :type path: str - - :param dbus_path: the mms dbus path - :type dbus_path: str + def convert(self, path, dbus_path=None): """ + Convert a provided mms file to a mail stored in a mbox. + :param path: the mms filesystem path + :type path: str + + :param dbus_path: the mms dbus path + :type dbus_path: str + """ # Check if the provided file present if not Path(f"{path}").is_file(): - print(f"MMS file not found : aborting", file=sys.stderr) + print("MMS file not found : aborting", file=sys.stderr) return # Generate its dbus path, for future operation (mark as read, delete) if not dbus_path: - dbus_path=f"/org/ofono/mms/modemmanager/{path.split('/')[-1]}" + dbus_path = f"/org/ofono/mms/modemmanager/{path.split('/')[-1]}" if Path(f"{path}.mail").is_file() and args.watcher: - print(f"Already converted MMS : doing nothing ({path})", file=sys.stderr) + print(f"Already converted MMS : doing nothing ({path})", + file=sys.stderr) return # Check for mmsd status file status = configparser.ConfigParser() if not Path(f"{path}.status").is_file(): - print(f"MMS status file not found : aborting", file=sys.stderr) + print("MMS status file not found : aborting", file=sys.stderr) return status.read_file(open(f"{path}.status")) # Allow only incoming MMS for the time beeing - if status['info']['state'] == 'downloaded' or status['info']['state'] == 'received': - print(f"New incoming MMS : converting to Mail ({path})", file=sys.stderr) + if (status['info']['state'] == 'downloaded' or + status['info']['state'] == 'received'): + print(f"New incoming MMS : converting to Mail ({path})", + file=sys.stderr) else: - print(f"New outgoing MMS : doing nothing ({path})", file=sys.stderr) + print(f"New outgoing MMS : doing nothing ({path})", + file=sys.stderr) return mms = MMSMessage.from_file(path) - + self.mailer.start() message = Message() # Generate Mail Headers - if 'From' in mms.headers and mms.headers['From']: - mms_from, mms_from_type = mms.headers['From'].split('/') - else: - mms_from = "unknown" - mms_from_type = "undef" - message.author = f"{mms_from}@{self.config.get('mail','domain', fallback=socket.getfqdn())}" - if 'To' in mms.headers and mms.headers['To']: - mms_to, mms_to_type = mms.headers['To'].split('/') - else: - mms_to = "unknown" - mms_to_type = "undef" - message.to = f"{self.config.get('mail','user', fallback=getpass.getuser())}@{self.config.get('mail','domain', fallback=socket.getfqdn())}" + mms_from, mms_from_type = mms.headers.get('From', + 'unknown/undef').split('/') + message.author = f"{mms_from}@{self.domain}" + mms_from, mms_from_type = mms.headers.get('To', + 'unknown/undef').split('/') + message.to = f"{self.user}@{self.domain}" if 'Subject' in mms.headers and mms.headers['Subject']: message.subject = mms.headers['Subject'] - else: + else: message.subject = f"MMS from {mms_from}" if 'Date' in mms.headers and mms.headers['Date']: @@ -132,50 +146,54 @@ class MMS2Mail: # Recopy MMS HEADERS for header in mms.headers: - message.headers.append((f"X-MMS-{header}", f"{mms.headers[header]}")) - + message.headers.append((f"X-MMS-{header}", + f"{mms.headers[header]}")) + for data_part in mms.data_parts: - datacontent=data_part.headers['Content-Type'] + datacontent = data_part.headers['Content-Type'] if datacontent is not None: if 'text/plain' in datacontent[0]: message.plain = f"{data_part.data} \n" if 'Name' in datacontent[1]: filename = datacontent[1]['Name'] - message.attach(filename,data_part.data) + message.attach(filename, data_part.data) # Ensure a proper body content in the resulting mail if not message.plain: message.plain = " " - # Add MMS binary file, for debugging purpose or reparsing in the future - if self.config.getboolean('mail','attach_mms', fallback=False): + # Add MMS binary file, for debugging purpose or reparsing in the future + if self.config.getboolean('mail', 'attach_mms', fallback=False): message.attach(path, None, None, None, False, "mms.bin") - # Creating an empty file stating the mms as been converted + # Creating an empty file stating the mms as been converted Path(f"{path}.mail").touch() # Write the mail self.mailer.send(message) self.mailer.stop() + class FSWatcher: """ - Use OS filesystem notification to watch for new MMS (DEPRECATED) + Use OS filesystem notification to watch for new MMS (DEPRECATED). Events are send to the FSHandler class """ + # Path to modemmanager storage mms_folder = f"{Path.home()}/.mms/modemmanager" def __init__(self): + """Construct an instance.""" self.observer = Observer() self.patternold = re.compile('[0-9A-F]{40}$') self.pattern = re.compile('[0-9a-f]{36}$') - def is_mmsd_mms_file(self,path): + def is_mmsd_mms_file(self, path): """ - Test if the provided file seems to be a mms file created by mmsd + Test if the provided file seems to be a mms file created by mmsd. :param path: the mms filesystem path - :type path: str + :type path: str :rtype boolean :return: the test result @@ -186,31 +204,24 @@ class FSWatcher: return False def run(self): - """ - Run the watcher mainloop - """ + """Run the watcher mainloop.""" event_handler = FSHandler() self.observer.schedule(event_handler, self.mms_folder, recursive=False) self.observer.start() try: while True: time.sleep(5) - except: + finally: self.observer.stop() - print("Exiting") - - self.observer.join() + self.observer.join() class FSHandler(FileSystemEventHandler): - """ - Handle the FSWatcher event - """ + """Handle the FSWatcher event.""" + @staticmethod def on_any_event(event): - """ - Function triggered on event by the FSWatcher - """ + """Trigger conversion on event by the FSWatcher.""" if event.is_directory: return None elif event.event_type == 'created' or event.event_type == 'modified': @@ -222,55 +233,55 @@ class FSHandler(FileSystemEventHandler): print(f"New MMS found : {event.dest_path}.", file=sys.stderr) m.convert(event.dest_path) + class DbusWatcher(): - """ - Use DBus Signal notification to watch for new MMS - """ - def run(self): - """ - Run the watcher mainloop - """ + """Use DBus Signal notification to watch for new MMS.""" + + def run(self): + """Run the watcher mainloop.""" bus = m.get_bus() bus.add_signal_receiver(self.message_added, - bus_name="org.ofono.mms", - signal_name = "MessageAdded", - member_keyword="member", - path_keyword="path", - interface_keyword="interface") + bus_name="org.ofono.mms", + signal_name="MessageAdded", + member_keyword="member", + path_keyword="path", + interface_keyword="interface") mainloop = GLib.MainLoop() mainloop.run() - def message_added(self,name, value, member, path, interface): - """ - Function triggered on MessageAdded signal - """ + def message_added(self, name, value, member, path, interface): + """Trigger conversion on MessageAdded signal.""" if value['Status'] == 'downloaded' or value['Status'] == 'received': print(f"New incoming MMS found ({name.split('/')[-1]})") - m.convert(value['Attachments'][0][2],name) + m.convert(value['Attachments'][0][2], name) else: print(f"New outgoing MMS found ({name.split('/')[-1]})") + if __name__ == '__main__': parser = argparse.ArgumentParser() mode = parser.add_mutually_exclusive_group() - mode.add_argument("-d","--daemon", help="Use dbus signal from mmsd by default but can also watch mmsd storage folder (useful for mmsd < 1.0)", nargs="?", default="dbus", choices=['dbus','filesystem'], dest='watcher') - mode.add_argument("-f","--file", nargs='+', help="parse specified mms files", dest='files') + mode.add_argument("-d", "--daemon", + help="Use dbus signal from mmsd by default but can also \ + watch mmsd storage folder (useful for mmsd < 1.0)", + nargs="?", default="dbus", + choices=['dbus', 'filesystem'], dest='watcher') + mode.add_argument("-f", "--file", nargs='+', + help="parse specified mms files", dest='files') args = parser.parse_args() m = MMS2Mail() - + if args.files: for mms_file in args.files: m.convert(mms_file) - elif args.watcher == 'dbus': + elif args.watcher == 'dbus': print("Starting mms2mail in daemon mode with dbus watcher") w = DbusWatcher() w.run() - elif args.watcher == 'filesystem': + elif args.watcher == 'filesystem': print("Starting mms2mail in daemon mode with filesystem watcher") w = FSWatcher() w.run() else: parser.print_help() - del m -