"""Poll an IMAP mailbox for "Alarmdepesche" mails and forward them by email.

Each poll cycle:

1. connects to the IMAP server and fetches the newest unread message matching
   the configured search term,
2. parses the HTML table of the message into an :class:`Alarmdepesche`,
3. asks the duty API which contacts are assigned to the alerted engine,
4. forwards the alarm to every contact via SMTP,
5. writes a few gauge metrics in the Prometheus text format to disk.

Configuration is read from environment variables: ``MAIL_SERVER``,
``MAIL_USER``, ``MAIL_PASS``, ``MAIL_FROM``, ``MAIL_SMTP_STARTTLS``,
``DUTY_API_URL`` and ``DEBUG``.
"""
import datetime
import email
import email.utils
import imaplib
import json
import logging
import os
import smtplib
import sys
import time
from typing import Optional

import requests
import utm
from bs4 import BeautifulSoup

# Values of an environment variable that are interpreted as "switched off".
_FALSE_VALUES = frozenset({"", "0", "false", "no", "off"})

# Upper bound for a single duty API request, so a hanging API cannot block
# the poll loop forever.
_HTTP_TIMEOUT_SECONDS = 10

# Pause between two poll cycles.
_POLL_INTERVAL_SECONDS = 20


def _env_flag(name, default=False):
    """Return the boolean value of the environment variable *name*.

    An unset variable yields *default*.  A set variable is false only when
    its value (case-insensitive, stripped) is one of ``""``, ``0``,
    ``false``, ``no`` or ``off``; every other value counts as true.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in _FALSE_VALUES


logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG if _env_flag("DEBUG") else logging.INFO,
    format='%(asctime)s [%(levelname)s]: %(message)s',
)
logger = logging.getLogger()


class Timer():
    """Stopwatch that starts on construction; ``end()`` stores the duration in ``took``."""

    _start: Optional[float] = None
    _end: Optional[float] = None
    took: float = 0.0

    def __init__(self):
        self.start()

    def start(self):
        """(Re)start the measurement."""
        # A monotonic clock keeps the duration correct across wall-clock jumps.
        self._start = time.monotonic()

    def end(self):
        """Stop the measurement and store the elapsed seconds in ``took``."""
        self._end = time.monotonic()
        self.took = self._end - self._start
        logger.debug(f"Execution took {self.took} seconds")


class PrometheusExporter():
    """Collects gauge metrics and writes them in the Prometheus text format."""

    metrics: list

    def __init__(self):
        self.metrics = []

    def append_gauge_metric(self, metric_name, metric, help_text=""):
        """Queue one gauge sample.

        :param metric_name: name of the metric
        :param metric: sample value
        :param help_text: optional description; emitted as a HELP line when non-empty
        """
        # The exposition format spells these metadata lines "# HELP" / "# TYPE".
        if help_text != "":
            self.metrics.append(f"# HELP {metric_name} {help_text}")
        self.metrics.append(f"# TYPE {metric_name} gauge")
        self.metrics.append(f"{metric_name} {metric}")

    def write(self, path="data/permalaert.prom"):
        """Write all queued metrics to *path* and clear the queue.

        The content is written to ``<path>.tmp`` first and then moved into
        place, so a reader never sees a half-written file.
        """
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "w") as file:
            file.writelines(f"{metric}\n" for metric in self.metrics)
        os.replace(tmp_path, path)
        self.metrics = []


class FetchEmail():
    """Thin wrapper around an authenticated ``imaplib.IMAP4_SSL`` connection.

    ``metric_success`` is 1 when connecting and logging in succeeded, else 0.
    When the connection could not be established ``connection`` is ``None``
    and ``error`` holds the exception; the other methods then do nothing.
    """

    metric_success = 0
    connection = None
    is_close: bool = False
    error = None

    def __init__(self, mail_server, username, password):
        logger.debug(f"Connection to imap server {mail_server}")
        try:
            self.connection = imaplib.IMAP4_SSL(mail_server)
            self.connection.login(username, password)
            self.metric_success = 1
        except (imaplib.IMAP4.error, OSError) as err:
            logger.error(f"Couldn't connect to imap server. Got: {err}")
            self.error = err
            self.metric_success = 0
            # Do not keep a half-open / unauthenticated connection around.
            self.close_connection()
            self.connection = None

    def select_mailbox(self, mailbox="Inbox"):
        """Select *mailbox* in read-write mode so fetched mails get marked as read."""
        if self.connection is None:
            logger.warning("No IMAP connection available, cannot select mailbox")
            return
        logger.debug(f"Selecting mailbox '{mailbox}'")
        self.connection.select(mailbox=mailbox, readonly=False)

    def close_connection(self):
        """
        Close the selected mailbox and log out from the IMAP server.

        Safe to call several times and when no connection exists.
        """
        if self.connection is None or self.is_close:
            return
        self.is_close = True
        logger.debug("Closing IMAP Connection")
        try:
            # CLOSE is only legal while a mailbox is selected.
            self.connection.close()
        except (imaplib.IMAP4.error, OSError) as err:
            logger.debug(f"Could not close mailbox: {err}")
        try:
            self.connection.logout()
        except (imaplib.IMAP4.error, OSError) as err:
            logger.debug(f"Could not log out from imap server: {err}")

    def get_headers(self, email_msg):
        """Return the headers of *email_msg* parsed with ``HeaderParser``."""
        logger.debug("Checking email headers")
        from email.parser import HeaderParser
        return HeaderParser().parsestr(email_msg.as_string())

    @staticmethod
    def _extract_body(email_msg):
        """Return the decoded text body of *email_msg*, or ``None``.

        The first ``text/html`` part is preferred; the first ``text/plain``
        part is the fallback.  Works for multipart and single-part messages.
        """
        html_body = None
        plain_body = None
        for part in email_msg.walk():
            content_type = part.get_content_type()
            if content_type not in ("text/html", "text/plain"):
                continue
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            charset = part.get_content_charset() or "utf-8"
            try:
                text = payload.decode(charset, errors="replace")
            except LookupError:
                # Unknown charset name announced by the sender.
                text = payload.decode("utf-8", errors="replace")
            if content_type == "text/html":
                if html_body is None:
                    html_body = text
            elif plain_body is None:
                plain_body = text
        return html_body if html_body is not None else plain_body

    def fetch_messages_content(self, searchTerm="(UNSEEN)"):
        """
        Retrieve the body of the newest message matching *searchTerm*.

        Only the last message id returned by the search is fetched.  The
        message is rejected unless its ``Received-SPF`` header contains
        ``mail.etat.lu``.

        :return: the message body as ``str``, or ``None`` when there is no
                 connection, no matching message, no text body, or an error
                 occurred (the connection is closed on the no-mail and
                 error paths).
        """
        if self.connection is None:
            logger.warning("No IMAP connection available, cannot fetch messages")
            return None

        logger.debug(f"Using search term '{searchTerm}'")
        try:
            result, messages = self.connection.search(None, searchTerm)
            if result != "OK":
                logger.error(f"IMAP search failed with status {result}")
                self.close_connection()
                return None

            id_list = messages[0].split()
            if not id_list:
                logger.debug("No Email found")
                self.close_connection()
                return None

            status, data = self.connection.fetch(id_list[-1], '(RFC822)')
            email_msg = email.message_from_bytes(data[0][1])
            headers = self.get_headers(email_msg)

            # Make sure that email is sent from etat.lu mailservers
            received_spf = headers.get("Received-SPF") or ""
            if "mail.etat.lu" not in received_spf:
                logger.warning(f"Strange Email Header: {headers.get('Received-SPF')}")
                raise RuntimeError("Email headers are wrong, will not proceed")

            body = self._extract_body(email_msg)
        except Exception as err:
            logger.error(f"Could not retrieve email. Got: {err}", exc_info=True)
            self.close_connection()
            return None

        if body is None:
            logger.warning("Email does not contain a text body")
            return None

        logger.debug("Found an email")
        return body

    def parse_email_address(self, email_address):
        """
        Helper function to parse out the email address from the message

        return: tuple (name, address). Eg. ('John Doe', 'jdoe@example.com')
        """
        return email.utils.parseaddr(email_address)


class Alarmdepesche():
    """One parsed alarm message.

    Fields are filled by :meth:`from_html` (from the mail body) or
    :meth:`fromDict` (from the JSON file written by :meth:`saveToDisk`).
    Fields missing from the input keep their default values.
    """

    id: str
    timestamp: Optional[datetime.datetime] = None
    engine: str = ""
    intervention_number: str = None
    intervention_code: str = ""
    intervention_message: str = ""
    intervention_description: str = ""
    city: str = ""
    info: str = ""
    geo_point: Optional[tuple] = None
    raw: Optional[BeautifulSoup] = None
    metric_api_up = 0

    def __init__(self):
        logger.debug(f"Object {self} was created")

    def __repr__(self):
        if self.intervention_number:
            return f"<Alarmdepesche {self.intervention_number}>"
        return "<Alarmdepesche>"

    def get_string(self, td, clean=True):
        """Return the text of the last ``<td>`` in the row that contains *td*.

        :param td: a table cell; its parent is searched for ``<td>`` elements
        :param clean: when true, remove ``\\r\\n`` and surrounding whitespace
        :return: the cell text, or ``""`` when the cell has no single string
        """
        string = td.parent.find_all('td')[-1].string
        if string is None:
            return ""
        if clean:
            string = string.replace("\r\n", "").strip()
        return string

    def get_members_emails(self):
        """Return the lower-cased mail addresses of the contacts for ``self.engine``.

        Queries ``DUTY_API_URL`` with ``?vehicle=<engine>``.  On any failure
        an empty (or partial) list is returned and ``metric_api_up`` stays 0;
        it is set to 1 only after the response was processed completely.
        """
        self.metric_api_up = 0
        logger.info("Getting duty from API")
        members = []

        base_url = os.environ.get("DUTY_API_URL")
        if not base_url:
            logger.error("Environment variable DUTY_API_URL is not set")
            return members

        url = base_url + f"?vehicle={self.engine}"
        logger.debug(f"Using api url {url}")
        try:
            res = requests.get(url, timeout=_HTTP_TIMEOUT_SECONDS)
        except requests.exceptions.RequestException as e:
            logger.error(f"Cannot reach perma api. Got {e}")
            return members

        if res.status_code != 200:
            logger.error(f"The Api returned an error, got status code {res.status_code}")
            return members

        try:
            json_res = res.json()
            logger.debug(f"Got JSON result: {json_res}")
            for member in json_res["vehicles"][0]["contacts"]:
                members.append(member["mail"].lower())
            self.metric_api_up = 1
        except ValueError as e:
            # Raised by res.json() when the body is not valid JSON.
            logger.error(f"Cannot get data from perma api. Got {e}")
        except IndexError as e:
            logger.error(f"The API changed, got an Index Error: {e}")
        except (KeyError, TypeError, AttributeError) as e:
            logger.error(f"The API changed, got unexpected data: {e!r}")
        return members

    def from_html(self, alarmdepesche_html):
        """Fill this object from the HTML body of an alarm mail.

        Every ``<td>`` whose text is a known label is looked up; its value is
        the last cell of the same row.  A value that cannot be parsed
        (timestamp, UTM coordinates) is logged and left at its default.
        """
        logger.info("Stating parsing alarmdepesche")
        self.raw = BeautifulSoup(alarmdepesche_html, 'html.parser')

        for td in self.raw.find_all('td'):
            key = str(td.string).strip()

            if key == "Druckdatum:":
                value = self.get_string(td)
                try:
                    self.timestamp = datetime.datetime.strptime(value, "%d.%m.%Y %H:%M:%S")
                except ValueError as err:
                    logger.warning(f"Cannot parse timestamp '{value}': {err}")
            elif key == "Einsatzstichwort:":
                self.intervention_code = self.get_string(td)
            elif key == "Sachverhalt:":
                self.intervention_message = self.get_string(td)
            elif key == "Alarmiertes Einsatzmittel:":
                engine = self.get_string(td)
                if "," in engine:
                    logger.warning(f"Unusual engine format, got {engine} only using last entry")
                    engine = engine.split(",")[-1].strip()
                self.engine = engine
            elif key == "Einsatznummer:":
                self.intervention_number = self.get_string(td)
            elif key == "Stadt:":
                self.city = self.get_string(td)
            elif key == "Info:":
                self.info = self.get_string(td)
            elif key == "UTM - Koordinaten:":
                # Value is split on spaces into: zone (two digits + letter),
                # easting, northing.
                utm_string = self.get_string(td).split(" ")
                try:
                    self.geo_point = utm.to_latlon(
                        int(utm_string[1]),
                        int(utm_string[2]),
                        int(utm_string[0][:2]),
                        utm_string[0][2],
                    )
                except (ValueError, IndexError) as err:
                    logger.warning(f"Cannot parse UTM coordinates {utm_string}: {err}")

        logger.info("Alarmepesche parsed successfully")
        logger.debug(self)

    def toText(self):
        """Return a one-line plain-text summary of the alarm."""
        logger.debug(f"Returing {self} as Text")
        text = f"{self.intervention_code} / {self.engine} / {self.city} / {self.intervention_number} / {self.intervention_message} / {self.info}"
        return text

    def toHtml(self):
        """Return the parsed HTML document (``None`` if not created via ``from_html``)."""
        logger.debug(f"Returning {self} as HTML")
        return self.raw

    def toDict(self):
        """Return the alarm as a JSON-serialisable dictionary."""
        logger.debug(f"Creating dictionary for {self}")
        return {
            "timestamp": self.timestamp.isoformat() if self.timestamp is not None else None,
            "engine": self.engine,
            "intervention_number": self.intervention_number,
            "intervention_code": self.intervention_code,
            "intervention_message": self.intervention_message,
            "intervention_description": self.intervention_description,
            "intervention_city": self.city,
            "intervention_info": self.info,
            "intervention_coordinates": self.geo_point,
        }

    def fromDict(self, json_data):
        """Fill this object from a dictionary produced by :meth:`toDict`.

        :raises KeyError: when an expected key is missing
        """
        logger.debug("Creating new object")
        raw_timestamp = json_data["timestamp"]
        self.timestamp = (
            datetime.datetime.fromisoformat(raw_timestamp) if raw_timestamp else None
        )
        self.engine = json_data["engine"]
        self.intervention_number = json_data["intervention_number"]
        self.intervention_code = json_data["intervention_code"]
        self.intervention_message = json_data["intervention_message"]
        self.intervention_description = json_data["intervention_description"]
        self.city = json_data["intervention_city"]
        self.info = json_data["intervention_info"]
        coordinates = json_data["intervention_coordinates"]
        # JSON has no tuple type; restore the type used by from_html.
        self.geo_point = tuple(coordinates) if coordinates is not None else None

    def loadFromDisk(self, file_path="data/last_alarmdepesche.json"):
        """Load the alarm stored as JSON in *file_path*."""
        logger.debug("Loading last Alarmdepesche from disk")
        with open(file_path, "r") as file:
            self.fromDict(json.load(file))

    def saveToDisk(self, file_path="data/last_alarmdepesche.json"):
        """Store the alarm as JSON in *file_path*."""
        logger.debug(f"Writing {self} to disk")
        with open(file_path, "w") as file:
            json.dump(self.toDict(), file, indent=3)


class Email():
    """SMTP client used to forward an :class:`Alarmdepesche`.

    STARTTLS is used unless ``MAIL_SMTP_STARTTLS`` is set to a false value.
    Connection errors are logged, not raised; ``smtp_client`` is ``None``
    when the server could not be reached at all.
    """

    smtp_client: Optional[smtplib.SMTP] = None

    def __init__(self, smtp_server, smtp_port, smtp_user, smtp_password):
        logger.debug(f"Connecting to SMTP Server {smtp_server}")
        self.smtp_client = None
        try:
            self.smtp_client = smtplib.SMTP(smtp_server, smtp_port)
            self.smtp_client.ehlo()
            if _env_flag("MAIL_SMTP_STARTTLS", True):
                self.smtp_client.starttls()
            self.smtp_client.login(smtp_user, smtp_password)
            logger.debug("Connection to SMTP Sever was successfull")
        except (smtplib.SMTPException, OSError) as err:
            logger.error(f"Connection to SMTP Sevrer failed. Got {err}", exc_info=True)

    def __del__(self):
        self.close()

    def close(self):
        """Close the SMTP connection; safe to call more than once."""
        client = self.smtp_client
        if client is None:
            return
        self.smtp_client = None
        logger.debug("Closing SMTP Connection")
        try:
            client.quit()
        except (smtplib.SMTPException, OSError):
            # Server already gone; just release the socket.
            client.close()

    def sendmail(self, alarmdepesche, msg_from, msg_to):
        """Send *alarmdepesche* as a multipart (plain + HTML) mail to *msg_to*.

        Failures are logged and not raised, so one bad recipient does not
        stop the remaining ones.
        """
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText

        if self.smtp_client is None:
            logger.error(f"Cannot not email to {msg_to}: no SMTP connection")
            return

        logger.debug(f"Preparing email for {msg_to}")

        # A naive timestamp is interpreted as local time by astimezone().
        sent_at = alarmdepesche.timestamp or datetime.datetime.now()

        # Create message container - the correct MIME type is multipart/alternative.
        msg = MIMEMultipart('alternative')
        msg['Subject'] = f"ALARMDEPESCHE {alarmdepesche.intervention_code} >> {alarmdepesche.engine}"
        msg['From'] = f"PermaAlert <{msg_from}>"
        msg['To'] = msg_to
        # The Date header must use the RFC 5322 date format, not ISO 8601.
        msg['Date'] = email.utils.format_datetime(sent_at.astimezone())
        msg["Message-id"] = email.utils.make_msgid(None, os.environ.get('MAIL_SERVER'))

        # Attach parts into message container.
        # According to RFC 2046, the last part of a multipart message, in this case
        # the HTML message, is best and preferred.
        msg.attach(MIMEText(alarmdepesche.toText(), 'plain'))
        html = alarmdepesche.toHtml()
        if html is not None:
            # MIMEText expects a str, toHtml() returns the parsed document.
            msg.attach(MIMEText(str(html), 'html'))

        logger.info(f"Sending email to {msg_to}")
        try:
            self.smtp_client.sendmail(msg_from, msg_to, msg.as_string())
            logger.info(f"Email was send successfully to {msg_to}")
        except Exception as err:
            logger.error(f"Cannot not email to {msg_to}: {err}", exc_info=True)


def _run_cycle(mail_server, mail_user, mail_pass, mail_from, metric_api_up):
    """Run one poll cycle and write the metrics file.

    :param metric_api_up: value of the duty API gauge from the previous cycle;
                          it is only updated when an alarm was processed
    :return: the (possibly updated) duty API gauge value
    """
    timer_global = Timer()
    prometheus = PrometheusExporter()
    mailserver_up = 0
    fetchmail = None

    try:
        now = datetime.datetime.now()
        if os.path.exists("data/last_alarmdepesche.json"):
            last_alarmdepsche = Alarmdepesche()
            last_alarmdepsche.loadFromDisk()
            if last_alarmdepsche.timestamp is not None:
                last_alarmdepsche_in_seconds = (now - last_alarmdepsche.timestamp).total_seconds()
                prometheus.append_gauge_metric(
                    "permaalert_last_received_alarmdepsche",
                    last_alarmdepsche_in_seconds,
                    help_text="Last received alarmdepsche time in seconds",
                )

        fetchmail = FetchEmail(mail_server, mail_user, mail_pass)
        mailserver_up = fetchmail.metric_success
        fetchmail.select_mailbox()
        html_message = fetchmail.fetch_messages_content("FROM els.112@cgdis.lu (UNSEEN)")

        if html_message:
            current_alarmdepsche = Alarmdepesche()
            current_alarmdepsche.from_html(html_message)
            current_alarmdepsche.saveToDisk()
            members = current_alarmdepsche.get_members_emails()
            # Read the gauge only after the API call that sets it.
            metric_api_up = current_alarmdepsche.metric_api_up
            if members:
                sendmail = Email(mail_server, 587, mail_user, mail_pass)
                try:
                    for email_addr in members:
                        sendmail.sendmail(current_alarmdepsche, mail_from, email_addr)
                finally:
                    sendmail.close()
            else:
                logger.warning("No recipients found, alarm was not forwarded")
        else:
            logger.info("Message could not be loaded, exiting...")
    except Exception:
        # Top-level boundary: one failed cycle must not stop the daemon.
        logger.exception("Unexpected error during poll cycle")
    finally:
        if fetchmail is not None:
            fetchmail.close_connection()

    timer_global.end()
    prometheus.append_gauge_metric("permaalert_execution_time", timer_global.took, help_text="Program execution time in seconds")
    prometheus.append_gauge_metric("permaalert_duty_api_up", metric_api_up, help_text="Boolean if api is up")
    prometheus.append_gauge_metric("permaalert_mailserver_up", mailserver_up, help_text="Boolean if mail server is up")
    prometheus.write()
    return metric_api_up


def main():
    """Read the configuration from the environment and poll until interrupted."""
    os.makedirs('data/', exist_ok=True)

    mail_server = os.environ.get('MAIL_SERVER', None)
    mail_user = os.environ.get('MAIL_USER', None)
    mail_from = os.environ.get('MAIL_FROM', mail_user)
    mail_pass = os.environ.get('MAIL_PASS', None)

    if not mail_server or not mail_user or not mail_pass:
        logger.error("Environment variables not set!, exiting...")
        sys.exit(1)

    metric_api_up = 1
    try:
        while True:
            metric_api_up = _run_cycle(mail_server, mail_user, mail_pass, mail_from, metric_api_up)
            logger.debug(f"Sleeping for {_POLL_INTERVAL_SECONDS} seconds")
            time.sleep(_POLL_INTERVAL_SECONDS)
    except KeyboardInterrupt as err:
        logger.info(f"Stopping Programm: {err}")
        sys.exit()


if __name__ == '__main__':
    main()