From aefde5013aa28902ed77f4cd9ddc27ca31c9c8de Mon Sep 17 00:00:00 2001 From: Mike Date: Tue, 27 Dec 2022 21:28:53 +0100 Subject: [PATCH] Initial Commit --- main.py | 374 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 374 insertions(+) create mode 100644 main.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..52ca529 --- /dev/null +++ b/main.py @@ -0,0 +1,374 @@ +import datetime, imaplib, email, time, utm, requests, json, os, smtplib, logging, sys +from bs4 import BeautifulSoup + +if os.environ.get("DEBUG", False): + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s [%(levelname)s]: %(message)s') +else: + logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s [%(levelname)s]: %(message)s') + +logger = logging.getLogger() + +class Timer(): + _start: time = None + _end: time = None + took: float + + def __init__(self): + self.start() + + def start(self): + self._start = time.time() + + def end(self): + self._end = time.time() + self.took = self._end - self._start + logger.debug(f"Execution took {self.took} seconds") + +class PrometheusExporter(): + + metrics = [] + + def append_gauge_metric(self, metric_name, metric, help_text=""): + if help_text != "": + self.metrics.append(f"#HELP {metric_name} {help_text}") + self.metrics.append(f"#TYPE {metric_name} gauge") + self.metrics.append(f"{metric_name} {metric}") + + def write(self): + with open("permalaert.prom", "w") as file: + for metric in self.metrics: + file.write(f"{metric}\n") + self.metrics = [] + + +class FetchEmail(): + + connection = None + is_close: bool = False + error = None + + def __init__(self, mail_server, username, password): + logger.debug(f"Connection to imap server {mail_server}") + try: + self.connection = imaplib.IMAP4_SSL(mail_server) + self.connection.login(username, password) + except Exception as err: + logger.error(f"Couldn't connect to imap server. Got: {err}") + + def __del__(self): + pass + + def select_mailbox(self, mailbox="Inbox"): + logger.debug(f"Selecting mailbox '{mailbox}'") + self.connection.select(mailbox=mailbox, readonly=False) # so we can mark mails as read + + def close_connection(self): + """ + Close the connection to the IMAP server + """ + logger.debug(f"Closing IMAP Connection") + self.connection.close() + + def get_headers(self, email_msg): + logger.debug("Checking email headers") + from email.parser import HeaderParser + parser = HeaderParser() + h = parser.parsestr(email_msg.as_string()) + + return h + + def fetch_messages_content(self, searchTerm = "(UNSEEN)"): + """ + Retrieve unread messages + """ + logger.debug(f"Using search term '{searchTerm}'") + (result, messages) = self.connection.search(None, searchTerm) + body = None + + if result == "OK": + mail_ids = messages[0] + id_list = mail_ids.split() + + if mail_ids == b"": + logger.debug(f"No Email found") + self.close_connection() + return None + + try: + #ret, data = self.connection.fetch(id_list[-1], '(UID BODY[TEXT])') + status, data = self.connection.fetch(id_list[-1], '(RFC822)') + email_msg = email.message_from_bytes(data[0][1]) # email.message_from_string(data[0][1]) + headers = self.get_headers(email_msg) + # Make sure that email is send from etat.lu mailservers + if "mail.etat.lu" not in headers.get("Received-SPF"): + logger.warning(f"Strange Email Header: {headers.get('Received-SPF')}") + raise RuntimeError("Email headers are wrong, will not proceed") + + # If message is multi part we only want the text version of the body, this walks the message and gets the body + if email_msg.is_multipart(): + for part in email_msg.walk(): + if part.get_content_type() == "text/html": + body = part.get_payload(decode=True) + body = body.decode() + + elif part.get_content_type() == "text/plain": + body = part.get_payload(decode=True) + body = body.decode() + except Exception as err: + logger.error(f"Could not retrieve email. Got: {err}", exc_info=True) + self.close_connection() + return None + + logger.debug(f"Found an email") + return str(body) + + def parse_email_address(self, email_address): + """ + Helper function to parse out the email address from the message + return: tuple (name, address). Eg. ('John Doe', 'jdoe@example.com') + """ + return email.utils.parseaddr(email_address) + +class Alarmdepesche(): + + id: str + timestamp: datetime.datetime + engine: str = "" + intervention_number: str = None + intervention_code: str = "" + intervention_message: str = "" + intervention_description: str = "" + + city: str = "" + info: str = "" + geo_point: tuple + + raw: BeautifulSoup + + def get_string(self, td, clean=True): + string = td.parent.find_all('td')[-1].string + if clean: + string = string.replace("\r\n", "").strip() + + return string + + def get_members_emails(self): + logger.info("Getting duty from API") + members = [] + url = os.environ.get("DUTY_API_URL") + f"?vehicle={self.engine}" + logger.debug(f"Using api url {url}") + res = requests.get(url) + if not res.status_code == 200: + logger.error(f"The Api returned an error, got status code {res.status_code}") + return members + + try: + json_res = res.json() + logger.debug(f"Got JSON result: {json_res}") + for member in json_res["vehicles"][0]["contacts"]: + members.append(member["mail"].lower()) + + except requests.exceptions.JSONDecodeError as e: + logger.error(f"Cannot get data from perma api. Got {e}") + except IndexError as e: + logger.error(f"The API changed, got an Index Error: {e}") + + return members + + def from_html(self, alarmdepesche_html): + logger.info("Stating parsing alarmdepesche") + self.raw = BeautifulSoup(alarmdepesche_html, 'html.parser') + tds = self.raw.find_all('td') + for td in tds: + key = str(str(td.string).strip()) + + if key == "Druckdatum:": + self.timestamp = datetime.datetime.strptime(self.get_string(td), "%d.%m.%Y %H:%M:%S") + if key == "Einsatzstichwort:": + self.intervention_code = self.get_string(td) + if key == "Sachverhalt:": + self.intervention_message = self.get_string(td) + if key == "Alarmiertes Einsatzmittel:": + engine = self.get_string(td) + if "," in engine: + logger.warning(f"Unusual engine format, got {engine} only using last entry") + engine = engine.split(",")[-1].strip() + self.engine = engine + if key == "Einsatznummer:": + self.intervention_number = self.get_string(td) + if key == "Stadt:": + self.city = self.get_string(td) + if key == "Info:": + self.info = self.get_string(td) + if key == "UTM - Koordinaten:": + utm_string = self.get_string(td).split(" ") + self.geo_point = utm.to_latlon(int(utm_string[1]), int(utm_string[2]), int(utm_string[0][:2]), + utm_string[0][2]) + + logger.info("Alarmepesche parsed successfully") + logger.debug(self) + + + def __init__(self): + logger.debug(f"Object {self} was created") + + def toText(self): + logger.debug(f"Returing {self} as Text") + text = f"{self.intervention_code} / {self.engine} / {self.city} / {self.intervention_number} / {self.intervention_message} / {self.info}" + return text + + def toHtml(self): + logger.debug(f"Returning {self} as HTML") + return self.raw + + + def toDict(self): + logger.debug(f"Creating dictionary for {self}") + return { + "timestamp": self.timestamp.isoformat(), + "engine": self.engine, + "intervention_number": self.intervention_number, + "intervention_code": self.intervention_code, + "intervention_message": self.intervention_message, + "intervention_description": self.intervention_description, + "intervention_city": self.city, + "intervention_info": self.info, + "intervention_coordinates": self.geo_point + + } + + def fromDict(self, json_data): + logger.debug("Creating new object") + self.timestamp = datetime.datetime.fromisoformat(json_data["timestamp"]) + self.engine = json_data["engine"] + self.intervention_number = json_data["intervention_number"] + self.intervention_code = json_data["intervention_code"] + self.intervention_message = json_data["intervention_message"] + self.intervention_description = json_data["intervention_description"] + self.city = json_data["intervention_city"] + self.info = json_data["intervention_info"] + self.geo_point = json_data["intervention_coordinates"] + + + def loadFromDisk(self, file_path="last_alarmdepesche.json"): + logger.debug(f"Loading last Alarmdepesche from disk") + with open(file_path, "r") as file: + self.fromDict(json.load(file)) + + def saveToDisk(self, file_path="last_alarmdepesche.json"): + logger.debug(f"Writing {self} to disk") + with open(file_path, "w") as file: + json.dump(self.toDict(), file, indent=3) + + def __repr__(self): + if self.intervention_number: + return f"" + return "" + +class Email(): + + smtp_client : smtplib.SMTP + + def __init__(self, smtp_server, smtp_port, smtp_user, smtp_password): + logger.debug(f"Connecting to SMTP Server {smtp_server}") + try: + self.smtp_client = smtplib.SMTP(smtp_server, smtp_port) + self.smtp_client.ehlo() + if os.environ.get("MAIL_SMTP_STARTTLS", True): + self.smtp_client.starttls() + self.smtp_client.login(smtp_user, smtp_password) + logger.debug("Connection to SMTP Sever was successfull") + except BaseException as err: + logger.error(f"Connection to SMTP Sevrer failed. Got {err}", exc_info=True) + + def __del__(self): + logger.debug("Closing SMTP Connection") + self.smtp_client.close() + + + def sendmail(self, alarmdepesche, msg_from, msg_to): + from email.mime.multipart import MIMEMultipart + from email.mime.text import MIMEText + + logger.debug(f"Preparing email for {msg_to}") + + # Create message container - the correct MIME type is multipart/alternative. + msg = MIMEMultipart('alternative') + msg['Subject'] = f"ALARMDEPESCHE {alarmdepesche.intervention_code} >> {alarmdepesche.engine}" + msg['From'] = f"PermaAlert <{msg_from}>" + msg['To'] = msg_to + msg['Date'] = alarmdepesche.timestamp.isoformat() + msg["Message-id"] = email.utils.make_msgid(None, os.environ.get('MAIL_SERVER')) + + # Record the MIME types of both parts - text/plain and text/html. + part1 = MIMEText(alarmdepesche.toText(), 'plain') + part2 = MIMEText(alarmdepesche.toHtml(), 'html') + + # Attach parts into message container. + # According to RFC 2046, the last part of a multipart message, in this case + # the HTML message, is best and preferred. + msg.attach(part1) + msg.attach(part2) + + logger.info(f"Sending email to {msg_to}") + + # sendmail function takes 3 arguments: sender's address, recipient's address + # and message to send - here it is sent as one string. + try: + self.smtp_client.sendmail(msg_from, msg_to, msg.as_string()) + logger.info(f"Email was send successfully to {msg_to}") + except Exception as err: + logger.error(f"Cannot not email to {msg_to}: {err}", exc_info=True) + + + +if __name__ == '__main__': + + mail_server = os.environ.get('MAIL_SERVER', None) + mail_user = os.environ.get('MAIL_USER', None) + mail_from = os.environ.get('MAIL_FROM', mail_user) + mail_pass = os.environ.get('MAIL_PASS', None) + + if not mail_server or not mail_user or not mail_pass: + logger.error("Environment variables not set!, exiting...") + exit(1) + + try: + while True: + timer_global = Timer() + prometheus = PrometheusExporter() + + now = datetime.datetime.now() + if os.path.exists("last_alarmdepesche.json"): + last_alarmdepsche = Alarmdepesche() + last_alarmdepsche.loadFromDisk() + last_alarmdepsche_in_seconds = (now - last_alarmdepsche.timestamp).total_seconds() + prometheus.append_gauge_metric("permaalert_last_received_alarmdepsche", last_alarmdepsche_in_seconds, + help_text="Last received alarmdepsche time in seconds") + + fetchmail = FetchEmail(mail_server, mail_user, mail_pass) + fetchmail.select_mailbox() + html_message = fetchmail.fetch_messages_content("FROM els.112@cgdis.lu (UNSEEN)") + if html_message: + current_alarmdepsche = Alarmdepesche() + current_alarmdepsche.from_html(html_message) + current_alarmdepsche.saveToDisk() + members = current_alarmdepsche.get_members_emails() + sendmail = Email(mail_server, 587, mail_user, mail_pass) + for email_addr in members: + sendmail.sendmail(current_alarmdepsche, mail_from, email_addr) + else: + logger.info("Message could not be loaded, exiting...") + + timer_global.end() + prometheus.append_gauge_metric("permaalert_execution_time", timer_global.took, + help_text="Program execution time in seconds") + prometheus.write() + + del prometheus + + logger.debug("Sleeping for 20 seconds") + time.sleep(20) + except KeyboardInterrupt as err: + logger.info(f"Stopping Programm: {err}") + exit()