perma-alert/main.py

395 lines
15 KiB
Python

import datetime, imaplib, email, time, utm, requests, json, os, smtplib, logging, sys
from bs4 import BeautifulSoup
# Configure root logging once. DEBUG arrives as an environment *string*, so a
# plain truthiness test would enable debug output for DEBUG=0 or DEBUG=false;
# the common "off" spellings are therefore treated as disabled explicitly.
_debug_enabled = os.environ.get("DEBUG", "").strip().lower() not in ("", "0", "false", "no", "off")
logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG if _debug_enabled else logging.INFO,
    format='%(asctime)s [%(levelname)s]: %(message)s',
)
# Root logger, shared by every class in this module.
logger = logging.getLogger()
class Timer():
    """
    Small stopwatch used to measure how long one run took.

    The timer starts as soon as it is created. Call end() to stop it; the
    elapsed seconds are then available in `took` (0.0 until end() was called).
    """
    _start: float = None
    _end: float = None
    took: float = 0.0

    def __init__(self):
        self.start()

    def start(self):
        """(Re)start the measurement."""
        # perf_counter() is a monotonic clock: unlike time.time() it cannot
        # jump when the system clock is adjusted during the measurement.
        self._start = time.perf_counter()
        self._end = None

    def end(self):
        """
        Stop the measurement, store the elapsed seconds in `took` and log them.
        return: the elapsed time in seconds
        """
        self._end = time.perf_counter()
        self.took = self._end - self._start
        logger.debug(f"Execution took {self.took} seconds")
        return self.took
class PrometheusExporter():
    """
    Collects gauge metrics and writes them to a file in the Prometheus text
    exposition format.
    """
    # Declared for documentation only; every instance gets its own list in
    # __init__, so no mutable list is shared on the class.
    metrics: list

    def __init__(self):
        self.metrics = []

    def append_gauge_metric(self, metric_name, metric, help_text=""):
        """
        Queue one gauge sample.
        metric_name: name of the metric
        metric: value of the sample
        help_text: optional description, emitted as a HELP line when not empty
        """
        if help_text != "":
            # The exposition format requires backslash and line feed to be
            # escaped inside HELP text.
            escaped_help = str(help_text).replace("\\", "\\\\").replace("\n", "\\n")
            # "# HELP" / "# TYPE" need the space after '#': strict parsers
            # treat "#HELP" as an ordinary comment and drop the metadata.
            self.metrics.append(f"# HELP {metric_name} {escaped_help}")
        self.metrics.append(f"# TYPE {metric_name} gauge")
        self.metrics.append(f"{metric_name} {metric}")

    def write(self, path="data/permalaert.prom"):
        """
        Write all queued metric lines to `path` and clear the queue.
        path: target file; defaults to the location this program always used
        """
        # Write to a sibling temp file and rename it over the target so a
        # reader never sees a half-written file.
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "w") as file:
            file.writelines(f"{metric}\n" for metric in self.metrics)
        os.replace(tmp_path, path)
        self.metrics = []
class FetchEmail():
    """
    Thin wrapper around an IMAP connection that retrieves the body of the
    newest message matching a search term.

    metric_success is 1 when connecting and logging in worked, 0 otherwise.
    When the connection could not be established the other methods log an
    error and return None instead of raising.
    """
    metric_success = 0
    connection = None
    is_close: bool = False
    error = None

    def __init__(self, mail_server, username, password):
        logger.debug(f"Connection to imap server {mail_server}")
        try:
            self.connection = imaplib.IMAP4_SSL(mail_server)
            self.connection.login(username, password)
            self.metric_success = 1
        except Exception as err:
            logger.error(f"Couldn't connect to imap server. Got: {err}")
            self.metric_success = 0
            self.error = err
            # Drop a half-open (connected but not logged in) session so the
            # other methods can detect the failure via `connection is None`.
            if self.connection is not None:
                try:
                    self.connection.shutdown()
                except Exception as shutdown_err:
                    logger.debug(f"Ignoring error while dropping IMAP socket: {shutdown_err}")
                self.connection = None

    def __del__(self):
        pass

    def _is_connected(self):
        """Return True while a logged-in, not yet closed connection exists."""
        return self.connection is not None and not self.is_close

    def select_mailbox(self, mailbox="Inbox"):
        """
        Select the mailbox subsequent searches operate on.
        Does nothing (besides logging) when there is no usable connection.
        """
        logger.debug(f"Selecting mailbox '{mailbox}'")
        if not self._is_connected():
            logger.error("No usable IMAP connection, cannot select mailbox")
            return
        self.connection.select(mailbox=mailbox, readonly=False)  # so we can mark mails as read

    def close_connection(self):
        """
        Close the connection to the IMAP server
        """
        if not self._is_connected():
            return
        logger.debug(f"Closing IMAP Connection")
        self.is_close = True
        # close() only leaves the selected mailbox and fails when none is
        # selected; logout() is what actually ends the session, so always
        # attempt it afterwards.
        try:
            self.connection.close()
        except (imaplib.IMAP4.error, OSError) as err:
            logger.debug(f"Could not close mailbox: {err}")
        try:
            self.connection.logout()
        except (imaplib.IMAP4.error, OSError) as err:
            logger.debug(f"Could not log out from IMAP server: {err}")

    def get_headers(self, email_msg):
        """Return the headers of `email_msg` as a parsed message object."""
        logger.debug("Checking email headers")
        from email.parser import HeaderParser
        parser = HeaderParser()
        return parser.parsestr(email_msg.as_string())

    @staticmethod
    def _extract_body(email_msg):
        """
        Return the decoded text body of `email_msg`, or None when it has none.
        The HTML part is preferred over the plain text part; parts marked as
        attachments are ignored.
        """
        html_body = None
        plain_body = None
        parts = email_msg.walk() if email_msg.is_multipart() else [email_msg]
        for part in parts:
            content_type = part.get_content_type()
            if content_type not in ("text/html", "text/plain"):
                continue
            if part.get_content_disposition() == "attachment":
                continue
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            # Honour the charset declared by the sender instead of assuming
            # UTF-8; undecodable bytes are replaced rather than aborting.
            charset = part.get_content_charset() or "utf-8"
            try:
                text = payload.decode(charset, errors="replace")
            except LookupError:
                text = payload.decode("utf-8", errors="replace")
            if content_type == "text/html":
                html_body = text
            else:
                plain_body = text
        return html_body if html_body is not None else plain_body

    def fetch_messages_content(self, searchTerm = "(UNSEEN)"):
        """
        Retrieve the body of the newest message matching `searchTerm`
        (unread messages by default).
        The connection is closed afterwards in every case.
        return: the body as str, or None when nothing usable was found
        """
        logger.debug(f"Using search term '{searchTerm}'")
        if not self._is_connected():
            logger.error("No usable IMAP connection, cannot fetch messages")
            return None
        (result, messages) = self.connection.search(None, searchTerm)
        if result != "OK":
            logger.warning(f"IMAP search failed with status {result}")
            self.close_connection()
            return None
        id_list = messages[0].split() if messages and messages[0] else []
        if not id_list:
            logger.debug(f"No Email found")
            self.close_connection()
            return None
        try:
            # Only the newest matching message is fetched.
            status, data = self.connection.fetch(id_list[-1], '(RFC822)')
            email_msg = email.message_from_bytes(data[0][1])
            headers = self.get_headers(email_msg)
            # Make sure that email is send from etat.lu mailservers.
            # A missing header is treated like a wrong one.
            spf_header = headers.get("Received-SPF") or ""
            if "mail.etat.lu" not in spf_header:
                logger.warning(f"Strange Email Header: {headers.get('Received-SPF')}")
                raise RuntimeError("Email headers are wrong, will not proceed")
            body = self._extract_body(email_msg)
        except Exception as err:
            logger.error(f"Could not retrieve email. Got: {err}", exc_info=True)
            self.close_connection()
            return None
        self.close_connection()
        if body is None:
            # Previously this case returned the string "None", which callers
            # then tried to parse as an alert.
            logger.warning("Email has no text body, ignoring it")
            return None
        logger.debug(f"Found an email")
        return body

    def parse_email_address(self, email_address):
        """
        Helper function to parse out the email address from the message
        return: tuple (name, address). Eg. ('John Doe', 'jdoe@example.com')
        """
        from email.utils import parseaddr
        return parseaddr(email_address)
class Alarmdepesche():
    """
    One alert dispatch ("Alarmdepesche"): the fields parsed from the HTML
    mail plus helpers to serialise it and to look up who has to be notified.
    """
    id: str
    timestamp: datetime.datetime = None
    engine: str = ""
    intervention_number: str = None
    intervention_code: str = ""
    intervention_message: str = ""
    intervention_description: str = ""
    city: str = ""
    info: str = ""
    geo_point: tuple = None
    # Quoted so the annotation is not evaluated when the class is created.
    raw: "BeautifulSoup" = None
    metric_api_up = 0

    # Table labels whose value is copied unchanged into the named attribute.
    _SIMPLE_FIELDS = {
        "Einsatzstichwort:": "intervention_code",
        "Sachverhalt:": "intervention_message",
        "Einsatznummer:": "intervention_number",
        "Stadt:": "city",
        "Info:": "info",
    }

    def __init__(self):
        logger.debug(f"Object {self} was created")

    def get_string(self, td, clean=True):
        """
        Return the text of the last cell in the table row `td` belongs to.
        clean: remove "\\r\\n" sequences and surrounding whitespace
        """
        cell = td.parent.find_all('td')[-1]
        string = cell.string
        if string is None:
            # .string is None for empty cells and cells with nested markup;
            # fall back to the concatenated text instead of crashing.
            string = cell.get_text()
        if clean:
            string = string.replace("\r\n", "").strip()
        return string

    def get_members_emails(self, timeout=10):
        """
        Ask the duty API which contacts belong to this dispatch's engine.
        Sets metric_api_up to 1 only when the response could be processed.
        timeout: seconds to wait for the API before giving up
        return: list of lower-cased email addresses (empty on any failure)
        """
        self.metric_api_up = 0
        logger.info("Getting duty from API")
        members = []
        base_url = os.environ.get("DUTY_API_URL")
        if not base_url:
            logger.error("DUTY_API_URL is not set, cannot look up members on duty")
            return members
        # Passing the vehicle via `params` lets requests URL-encode it.
        params = {"vehicle": self.engine}
        logger.debug(f"Using api url {base_url} with params {params}")
        try:
            # Without a timeout a hanging API would block the whole program.
            res = requests.get(base_url, params=params, timeout=timeout)
        except requests.exceptions.RequestException as e:
            logger.error(f"Cannot reach perma api. Got {e}")
            return members
        if not res.status_code == 200:
            logger.error(f"The Api returned an error, got status code {res.status_code}")
            return members
        try:
            json_res = res.json()
            logger.debug(f"Got JSON result: {json_res}")
            for member in json_res["vehicles"][0]["contacts"]:
                members.append(member["mail"].lower())
            self.metric_api_up = 1
        except requests.exceptions.JSONDecodeError as e:
            logger.error(f"Cannot get data from perma api. Got {e}")
        except IndexError as e:
            logger.error(f"The API changed, got an Index Error: {e}")
        except (KeyError, TypeError, AttributeError) as e:
            logger.error(f"The API changed, got unexpected data: {e!r}")
        return members

    def _parse_timestamp(self, value):
        """Set `timestamp` from a "dd.mm.YYYY HH:MM:SS" string, if valid."""
        try:
            self.timestamp = datetime.datetime.strptime(value, "%d.%m.%Y %H:%M:%S")
        except ValueError as err:
            logger.warning(f"Could not parse print date '{value}': {err}")

    def _parse_utm(self, value):
        """Set `geo_point` from a "<zone><letter> <easting> <northing>" string, if valid."""
        parts = value.split()
        try:
            zone = parts[0]
            self.geo_point = utm.to_latlon(int(parts[1]), int(parts[2]), int(zone[:2]), zone[2])
        except (IndexError, ValueError) as err:
            # Coordinates are optional information; a malformed field must
            # not prevent the alert from being forwarded.
            logger.warning(f"Could not parse UTM coordinates '{value}': {err}")

    def from_html(self, alarmdepesche_html):
        """
        Fill this object from the HTML of an alert mail.
        Every table cell is checked against the known labels; the value is
        taken from the last cell of the same row.
        """
        logger.info("Stating parsing alarmdepesche")
        self.raw = BeautifulSoup(alarmdepesche_html, 'html.parser')
        for td in self.raw.find_all('td'):
            key = str(td.string).strip()
            if key in self._SIMPLE_FIELDS:
                setattr(self, self._SIMPLE_FIELDS[key], self.get_string(td))
            elif key == "Druckdatum:":
                self._parse_timestamp(self.get_string(td))
            elif key == "Alarmiertes Einsatzmittel:":
                engine = self.get_string(td)
                if "," in engine:
                    logger.warning(f"Unusual engine format, got {engine} only using last entry")
                    engine = engine.split(",")[-1].strip()
                self.engine = engine
            elif "UTM - Koordinaten:" in key:
                self._parse_utm(self.get_string(td))
        if self.timestamp is None:
            # Keep the object usable (repr, toDict, mail Date header) even
            # when the mail carries no valid print date.
            logger.warning("No valid print date found, using the current time instead")
            self.timestamp = datetime.datetime.now()
        logger.info("Alarmepesche parsed successfully")
        logger.debug(self)

    def toText(self):
        """Return a one-line plain text summary of the dispatch."""
        logger.debug(f"Returing {self} as Text")
        text = f"{self.intervention_code} / {self.engine} / {self.city} / {self.intervention_number} / {self.intervention_message} / {self.info}"
        return text

    def toHtml(self):
        """Return the parsed HTML document (None when not built via from_html)."""
        logger.debug(f"Returning {self} as HTML")
        return self.raw

    def toDict(self):
        """Return the dispatch as a dictionary of JSON-serialisable values."""
        logger.debug(f"Creating dictionary for {self}")
        return {
            "timestamp": self.timestamp.isoformat() if self.timestamp is not None else None,
            "engine": self.engine,
            "intervention_number": self.intervention_number,
            "intervention_code": self.intervention_code,
            "intervention_message": self.intervention_message,
            "intervention_description": self.intervention_description,
            "intervention_city": self.city,
            "intervention_info": self.info,
            "intervention_coordinates": self.geo_point
        }

    def fromDict(self, json_data):
        """Fill this object from a dictionary produced by toDict()."""
        logger.debug("Creating new object")
        stored_timestamp = json_data["timestamp"]
        self.timestamp = datetime.datetime.fromisoformat(stored_timestamp) if stored_timestamp else None
        self.engine = json_data["engine"]
        self.intervention_number = json_data["intervention_number"]
        self.intervention_code = json_data["intervention_code"]
        self.intervention_message = json_data["intervention_message"]
        self.intervention_description = json_data["intervention_description"]
        self.city = json_data["intervention_city"]
        self.info = json_data["intervention_info"]
        self.geo_point = json_data["intervention_coordinates"]

    def loadFromDisk(self, file_path="data/last_alarmdepesche.json"):
        """Fill this object from the JSON file at `file_path`."""
        logger.debug(f"Loading last Alarmdepesche from disk")
        with open(file_path, "r") as file:
            self.fromDict(json.load(file))

    def saveToDisk(self, file_path="data/last_alarmdepesche.json"):
        """Write this object as JSON to `file_path`."""
        logger.debug(f"Writing {self} to disk")
        # Serialise first and replace the file in one step: opening the
        # target for writing before serialising would leave an empty,
        # unloadable file behind whenever serialisation fails.
        payload = json.dumps(self.toDict(), indent=3)
        tmp_path = f"{file_path}.tmp"
        with open(tmp_path, "w") as file:
            file.write(payload)
        os.replace(tmp_path, file_path)

    def __repr__(self):
        if self.intervention_number:
            # timestamp may still be unset while the object is being filled.
            when = self.timestamp.isoformat() if self.timestamp is not None else None
            return f"<Alarmdepesche {self.intervention_number} (engine='{self.engine}', code='{self.intervention_code}', time='{when}')>"
        return "<Alarmdepesche>"
class Email():
    """
    SMTP client used to forward an alert dispatch as a multipart mail
    (plain text plus HTML).
    """
    smtp_client: smtplib.SMTP = None

    @staticmethod
    def _starttls_enabled():
        """
        Tell whether STARTTLS should be used (default: yes).
        MAIL_SMTP_STARTTLS is an environment string, so the usual "off"
        spellings have to be recognised explicitly; a truthiness test would
        treat "false" and "0" as enabled.
        """
        raw_value = os.environ.get("MAIL_SMTP_STARTTLS")
        if raw_value is None:
            return True
        return raw_value.strip().lower() not in ("", "0", "false", "no", "off")

    def __init__(self, smtp_server, smtp_port, smtp_user, smtp_password, timeout=30):
        """
        Connect and log in to the SMTP server. Failures are logged, not raised.
        timeout: seconds after which a blocking socket operation gives up
        """
        logger.debug(f"Connecting to SMTP Server {smtp_server}")
        self.smtp_client = None
        try:
            self.smtp_client = smtplib.SMTP(smtp_server, smtp_port, timeout=timeout)
            self.smtp_client.ehlo()
            if self._starttls_enabled():
                self.smtp_client.starttls()
            self.smtp_client.login(smtp_user, smtp_password)
            logger.debug("Connection to SMTP Sever was successfull")
        except Exception as err:
            # Exception instead of BaseException so Ctrl-C and SystemExit
            # are not swallowed here.
            logger.error(f"Connection to SMTP Sevrer failed. Got {err}", exc_info=True)

    def __del__(self):
        client = getattr(self, "smtp_client", None)
        if client is None:
            # The constructor failed before a connection existed.
            return
        logger.debug("Closing SMTP Connection")
        try:
            client.quit()
        except (smtplib.SMTPException, OSError):
            client.close()

    def sendmail(self, alarmdepesche, msg_from, msg_to):
        """
        Send `alarmdepesche` to a single recipient.
        Errors while sending are logged, not raised.
        """
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.utils import formatdate, make_msgid
        logger.debug(f"Preparing email for {msg_to}")
        if self.smtp_client is None:
            logger.error(f"Cannot not email to {msg_to}: no SMTP connection")
            return
        # Create message container - the correct MIME type is multipart/alternative.
        msg = MIMEMultipart('alternative')
        msg['Subject'] = f"ALARMDEPESCHE {alarmdepesche.intervention_code} >> {alarmdepesche.engine}"
        msg['From'] = f"PermaAlert <{msg_from}>"
        msg['To'] = msg_to
        # The Date header must use the RFC 5322 date format, not ISO 8601.
        # NOTE(review): the naive timestamp is interpreted as local time - confirm.
        timestamp = getattr(alarmdepesche, "timestamp", None)
        if timestamp is not None:
            msg['Date'] = formatdate(timestamp.timestamp(), localtime=True)
        else:
            msg['Date'] = formatdate(localtime=True)
        msg["Message-id"] = make_msgid(None, os.environ.get('MAIL_SERVER'))
        # Record the MIME types of both parts - text/plain and text/html.
        part1 = MIMEText(alarmdepesche.toText(), 'plain')
        # MIMEText expects a str; toHtml() hands back the parsed document.
        part2 = MIMEText(str(alarmdepesche.toHtml()), 'html')
        # Attach parts into message container.
        # According to RFC 2046, the last part of a multipart message, in this case
        # the HTML message, is best and preferred.
        msg.attach(part1)
        msg.attach(part2)
        logger.info(f"Sending email to {msg_to}")
        # sendmail function takes 3 arguments: sender's address, recipient's address
        # and message to send - here it is sent as one string.
        try:
            self.smtp_client.sendmail(msg_from, msg_to, msg.as_string())
            logger.info(f"Email was send successfully to {msg_to}")
        except Exception as err:
            logger.error(f"Cannot not email to {msg_to}: {err}", exc_info=True)
if __name__ == '__main__':
    # Poll the mailbox every 20 seconds, forward new alert dispatches to the
    # members on duty and export a few gauges for Prometheus on every pass.
    os.makedirs('data/', exist_ok=True)
    mail_server = os.environ.get('MAIL_SERVER', None)
    mail_user = os.environ.get('MAIL_USER', None)
    mail_from = os.environ.get('MAIL_FROM', mail_user)
    mail_pass = os.environ.get('MAIL_PASS', None)
    if not mail_server or not mail_user or not mail_pass:
        logger.error("Environment variables not set!, exiting...")
        # sys.exit instead of the site-provided exit(), which is meant for
        # the interactive interpreter.
        sys.exit(1)
    # Carried over between passes: the API is only queried when an alert arrives.
    metric_api_up = 1
    try:
        while True:
            timer_global = Timer()
            prometheus = PrometheusExporter()
            now = datetime.datetime.now()
            if os.path.exists("data/last_alarmdepesche.json"):
                try:
                    last_alarmdepsche = Alarmdepesche()
                    last_alarmdepsche.loadFromDisk()
                    last_alarmdepsche_in_seconds = (now - last_alarmdepsche.timestamp).total_seconds()
                    prometheus.append_gauge_metric("permaalert_last_received_alarmdepsche", last_alarmdepsche_in_seconds,
                                                   help_text="Last received alarmdepsche time in seconds")
                except (OSError, ValueError, KeyError, TypeError) as err:
                    # An unreadable state file only costs this one metric; it
                    # must not stop alerts from being forwarded.
                    logger.warning(f"Could not load last alarmdepesche from disk: {err}")
            fetchmail = FetchEmail(mail_server, mail_user, mail_pass)
            html_message = None
            # Only talk to the server when connecting and logging in worked.
            if fetchmail.metric_success:
                fetchmail.select_mailbox()
                html_message = fetchmail.fetch_messages_content("FROM els.112@cgdis.lu (UNSEEN)")
            if html_message:
                current_alarmdepsche = Alarmdepesche()
                current_alarmdepsche.from_html(html_message)
                current_alarmdepsche.saveToDisk()
                members = current_alarmdepsche.get_members_emails()
                # Read the flag after the API call that sets it; reading it
                # before always reported the initial value 0.
                metric_api_up = current_alarmdepsche.metric_api_up
                if members:
                    sendmail = Email(mail_server, 587, mail_user, mail_pass)
                    for email_addr in members:
                        sendmail.sendmail(current_alarmdepsche, mail_from, email_addr)
                    # Drop the reference so the SMTP connection is closed now
                    # rather than when the next alert arrives.
                    del sendmail
                else:
                    logger.warning("No members found for this alarmdepesche, nobody was notified")
            else:
                logger.info("Message could not be loaded, exiting...")
            timer_global.end()
            prometheus.append_gauge_metric("permaalert_execution_time", timer_global.took,
                                           help_text="Program execution time in seconds")
            prometheus.append_gauge_metric("permaalert_duty_api_up", metric_api_up,
                                           help_text="Boolean if api is up")
            prometheus.append_gauge_metric("permaalert_mailserver_up", fetchmail.metric_success,
                                           help_text="Boolean if mail server is up")
            prometheus.write()
            del prometheus
            logger.debug("Sleeping for 20 seconds")
            time.sleep(20)
    except KeyboardInterrupt as err:
        logger.info(f"Stopping Programm: {err}")
        sys.exit()