420 lines
16 KiB
Python
420 lines
16 KiB
Python
import datetime, imaplib, email, time, utm, requests, json, os, smtplib, logging, sys
|
|
from urllib.parse import unquote
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Configure the root logger to write to stdout. Setting the DEBUG environment
# variable to any non-empty value (even "0" or "false") selects DEBUG level;
# otherwise INFO is used.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG if os.environ.get("DEBUG", False) else logging.INFO,
    format='%(asctime)s [%(levelname)s]: %(message)s',
)

# Root logger used by the classes and the main loop of this module.
logger = logging.getLogger()
|
|
|
|
class Timer():
    """Stopwatch measuring the time between ``start()`` and ``end()``.

    The timer starts automatically on construction.

    Attributes:
        took: Seconds between the last ``start()`` and the last ``end()``.
            It is ``0.0`` until ``end()`` has been called for the first time.
    """

    _start = None  # clock reading taken by start()
    _end = None  # clock reading taken by end(); None until end() has run
    took: float = 0.0

    def __init__(self):
        self.start()

    def start(self):
        """Record the starting point of the measurement."""
        # time.monotonic() never goes backwards when the system clock is
        # adjusted, so the measured interval cannot become negative.
        self._start = time.monotonic()

    def end(self):
        """Stop the measurement, store the duration in ``took`` and log it."""
        self._end = time.monotonic()
        self.took = self._end - self._start
        logger.debug(f"Execution took {self.took} seconds")
|
|
|
|
class PrometheusExporter():
    """Collect gauge metrics and write them to a file in Prometheus text format.

    Metrics are buffered as already formatted lines in ``self.metrics`` and
    flushed to disk by ``write()``.
    """

    def __init__(self):
        # Per-instance buffer; a class-level list would be shared by all
        # instances.
        self.metrics = []

    def append_gauge_metric(self, metric_name, metric, help_text=""):
        """Queue one gauge sample.

        Args:
            metric_name: Name of the metric.
            metric: Value of the sample; it is formatted with ``str()``.
            help_text: Optional description. A HELP line is only emitted
                when this is not empty.
        """
        # The text exposition format spells the metadata tokens as
        # "# HELP" / "# TYPE" (with a space after the hash).
        if help_text != "":
            self.metrics.append(f"# HELP {metric_name} {help_text}")
        self.metrics.append(f"# TYPE {metric_name} gauge")
        self.metrics.append(f"{metric_name} {metric}")

    def write(self, file_path="data/permalaert.prom"):
        """Write all queued lines to ``file_path`` and clear the buffer.

        Args:
            file_path: Destination file; it is overwritten. The default is
                the path this exporter has always written to.
        """
        with open(file_path, "w") as file:
            file.writelines(f"{metric}\n" for metric in self.metrics)
        self.metrics = []
|
|
|
|
|
|
class FetchEmail():
    """IMAP-over-SSL client that retrieves one matching email.

    Attributes:
        metric_success: 1 when connecting and logging in succeeded, else 0.
        connection: The ``imaplib.IMAP4_SSL`` session, or ``None`` when no
            usable session could be established.
        is_close: ``True`` once ``close_connection()`` has shut the session.
        error: The exception raised while connecting, if any.
    """

    metric_success = 0

    connection = None
    is_close: bool = False
    error = None

    def __init__(self, mail_server, username, password):
        """Connect to ``mail_server`` and log in.

        Connection problems are logged instead of raised; callers can check
        ``metric_success`` afterwards.
        """
        logger.debug(f"Connection to imap server {mail_server}")
        try:
            self.connection = imaplib.IMAP4_SSL(mail_server)
            self.connection.login(username, password)
            self.metric_success = 1
        except Exception as err:
            logger.error(f"Couldn't connect to imap server. Got: {err}")
            self.metric_success = 0
            self.error = err
            # A session that failed to log in is unusable. Drop it so the
            # other methods can detect the failure instead of crashing on it.
            if self.connection is not None:
                try:
                    self.connection.shutdown()
                except Exception as shutdown_err:
                    logger.debug(f"Could not shut down IMAP socket: {shutdown_err}")
            self.connection = None

    def __del__(self):
        # Best-effort cleanup for sessions that were never closed explicitly.
        # Errors cannot be handled meaningfully during finalisation.
        try:
            self.close_connection()
        except Exception:
            pass

    def select_mailbox(self, mailbox="Inbox"):
        """Select ``mailbox`` in read-write mode (does nothing without a connection)."""
        if self.connection is None:
            logger.error(f"Cannot select mailbox '{mailbox}': no IMAP connection")
            return
        logger.debug(f"Selecting mailbox '{mailbox}'")
        self.connection.select(mailbox=mailbox, readonly=False)  # so we can mark mails as read

    def close_connection(self):
        """
        Close the connection to the IMAP server

        Closes the selected mailbox and logs out. Calling it again, or
        calling it without a connection, is a no-op.
        """
        if self.connection is None or self.is_close:
            return
        logger.debug(f"Closing IMAP Connection")
        self.is_close = True
        try:
            # close() is only valid while a mailbox is selected.
            self.connection.close()
        except Exception as err:
            logger.debug(f"Could not close mailbox: {err}")
        try:
            self.connection.logout()
        except Exception as err:
            logger.debug(f"Could not log out from IMAP server: {err}")

    def get_headers(self, email_msg):
        """Return the headers of ``email_msg`` parsed with ``HeaderParser``."""
        logger.debug("Checking email headers")
        from email.parser import HeaderParser
        parser = HeaderParser()
        h = parser.parsestr(email_msg.as_string())

        return h

    @staticmethod
    def _decode_text(part):
        """Decode the payload of a text part using the charset it declares.

        Falls back to UTF-8 when no (or an unknown) charset is declared and
        replaces undecodable bytes instead of failing. Returns ``None`` when
        the part has no decodable payload.
        """
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            return payload.decode("utf-8", errors="replace")

    def fetch_messages_content(self, searchTerm = "(UNSEEN)"):
        """
        Retrieve unread messages

        Searches the selected mailbox with ``searchTerm`` and fetches the
        message with the last id in the search result. The connection is
        closed before returning, whatever the outcome.

        Returns:
            tuple ``(body, attachments)``. ``body`` is the decoded HTML part
            (or the plain-text part when there is no HTML part), or ``None``
            when nothing usable was found or an error occurred.
            ``attachments`` is a list of dicts with the keys ``filename``,
            ``type`` and ``content`` for every PDF part.
        """
        logger.debug(f"Using search term '{searchTerm}'")
        attachment = []

        if self.connection is None or self.is_close:
            logger.error("No open IMAP connection, cannot fetch messages")
            return None, attachment

        html_body = None
        text_body = None
        try:
            (result, messages) = self.connection.search(None, searchTerm)
            if result != "OK":
                logger.error(f"IMAP search failed with status {result}")
                return None, attachment

            id_list = messages[0].split()
            if not id_list:
                logger.debug(f"No Email found")
                return None, attachment

            status, data = self.connection.fetch(id_list[-1], '(RFC822)')
            email_msg = email.message_from_bytes(data[0][1])
            headers = self.get_headers(email_msg)

            # Make sure that the email was sent from etat.lu mail servers.
            # NOTE(review): this is a substring match on the first
            # Received-SPF header only - confirm the receiving server strips
            # forged Received-SPF headers.
            received_spf = headers.get("Received-SPF") or ""
            if "mail.etat.lu" not in received_spf:
                logger.warning(f"Strange Email Header: {headers.get('Received-SPF')}")
                raise RuntimeError("Email headers are wrong, will not proceed")

            # walk() also yields the message itself, so single-part mails
            # are handled by the same loop as multipart ones.
            for part in email_msg.walk():
                content_type = part.get_content_type()
                if content_type == "text/html":
                    logger.debug("Found HTML File in Email")
                    html_body = self._decode_text(part)
                elif content_type == "text/plain":
                    logger.debug("Found Text in Email")
                    text_body = self._decode_text(part)
                elif content_type == "application/pdf":
                    logger.debug("Found PDF File in Email")
                    attachment.append({
                        "filename": part.get_filename(),
                        "type": "application/pdf",
                        "content": part.get_payload(decode=True)
                    })
        except Exception as err:
            logger.error(f"Could not retrieve email. Got: {err}", exc_info=True)
            return None, attachment
        finally:
            self.close_connection()

        # Prefer the HTML part: a text/plain part that happens to come later
        # must not replace it.
        body = html_body if html_body is not None else text_body
        if body is None:
            logger.warning("Email did not contain a text or HTML body")
            return None, attachment

        logger.debug(f"Found an email")
        return body, attachment

    def parse_email_address(self, email_address):
        """
        Helper function to parse out the email address from the message

        return: tuple (name, address). Eg. ('John Doe', 'jdoe@example.com')
        """
        from email.utils import parseaddr
        return parseaddr(email_address)
|
|
|
|
class Alarmdepesche():
    """One alarm dispatch ("Alarmdepesche") parsed from an HTML email.

    The fields are filled by ``from_html()`` from the table cells of the
    email, or by ``fromDict()`` / ``loadFromDisk()`` from a JSON snapshot.

    Attributes:
        timestamp: Value of the "Druckdatum" field; ``None`` until parsed.
        geo_point: Result of ``utm.to_latlon`` for the "UTM - Koordinaten"
            field, or ``None`` when the email carried no usable coordinates.
        raw: The BeautifulSoup document built by ``from_html()``, else ``None``.
        metric_api_up: 1 after a successful duty API lookup, else 0.
    """

    # Timeout in seconds for the duty API request, so a hanging API cannot
    # block the forwarding loop forever.
    API_TIMEOUT_SECONDS = 10

    id: str
    timestamp: datetime.datetime = None
    engine: str = ""
    intervention_number: str = None
    intervention_code: str = ""
    intervention_message: str = ""
    intervention_description: str = ""

    city: str = ""
    info: str = ""
    geo_point: tuple = None

    raw = None  # BeautifulSoup document, set by from_html()

    metric_api_up = 0

    def __init__(self):
        logger.debug(f"Object {self} was created")

    @staticmethod
    def _parse_timestamp(value):
        """Parse a ``dd.mm.YYYY HH:MM:SS`` string; return ``None`` if it does not match."""
        try:
            return datetime.datetime.strptime(value, "%d.%m.%Y %H:%M:%S")
        except (ValueError, TypeError):
            return None

    def get_string(self, td, clean=True):
        """Return the text of the last ``<td>`` in the row that contains ``td``.

        Args:
            td: A table cell; its parent element is searched for cells.
            clean: When true, remove ``"\\r\\n"`` sequences and strip
                surrounding whitespace.
        """
        cell = td.parent.find_all('td')[-1]
        string = cell.string
        if string is None:
            # .string is None for empty cells and for cells with several
            # children; fall back to the concatenated text.
            string = cell.get_text()
        if clean:
            string = string.replace("\r\n", "").strip()

        return string

    def get_members_emails(self):
        """Ask the duty API who is assigned to ``self.engine``.

        The base URL is read from the ``DUTY_API_URL`` environment variable.
        ``metric_api_up`` is set to 1 only when the response could be read
        completely.

        Returns:
            List of lower-cased email addresses. It is empty (or partial)
            when the API could not be queried or answered unexpectedly.
        """
        self.metric_api_up = 0
        logger.info("Getting duty from API")
        members = []

        url = os.environ.get("DUTY_API_URL")
        if not url:
            logger.error("DUTY_API_URL is not set, cannot query the duty API")
            return members

        logger.debug(f"Using api url {url}?vehicle={self.engine}")
        try:
            # params= lets requests URL-encode the vehicle name.
            res = requests.get(url, params={"vehicle": self.engine}, timeout=self.API_TIMEOUT_SECONDS)
        except requests.exceptions.RequestException as e:
            logger.error(f"Cannot reach perma api. Got {e}")
            return members

        if not res.status_code == 200:
            logger.error(f"The Api returned an error, got status code {res.status_code}")
            return members

        try:
            json_res = res.json()
            logger.debug(f"Got JSON result: {json_res}")
            for member in json_res["vehicles"][0]["contacts"]:
                members.append(member["mail"].lower())
            self.metric_api_up = 1
        except ValueError as e:
            # Every JSON decode error raised by requests is a ValueError.
            logger.error(f"Cannot get data from perma api. Got {e}")
        except (IndexError, KeyError, TypeError, AttributeError) as e:
            logger.error(f"The API changed, got an unexpected structure: {e!r}")

        return members

    def from_html(self, alarmdepesche_html):
        """Fill this object from the HTML body of an alarm email.

        Every ``<td>`` is inspected; when its text contains one of the known
        German field labels, the value is taken from the last cell of the
        same row. Fields with unparsable values are skipped with a warning
        instead of aborting the whole parse.
        """
        logger.info("Stating parsing alarmdepesche")
        self.raw = BeautifulSoup(alarmdepesche_html, 'html.parser')

        for td in self.raw.find_all('td'):
            key = str(td.string).strip()

            if "Druckdatum:" in key:
                value = self.get_string(td)
                parsed = self._parse_timestamp(value)
                if parsed is None:
                    logger.warning(f"Unexpected Druckdatum format: '{value}'")
                else:
                    self.timestamp = parsed
            if "Einsatzstichwort:" in key:
                self.intervention_code = self.get_string(td)
            if "Sachverhalt:" in key:
                self.intervention_message = self.get_string(td)
            if "Alarmiertes Einsatzmittel:" in key:
                engine = self.get_string(td)
                if "," in engine:
                    logger.warning(f"Unusual engine format, got {engine} only using last entry")
                    engine = engine.split(",")[-1].strip()
                self.engine = engine
            if "Einsatznummer:" in key:
                self.intervention_number = self.get_string(td)
            if "Stadt:" in key:
                self.city = self.get_string(td)
            if "Info:" in key:
                self.info = self.get_string(td)
            if "UTM - Koordinaten:" in key:
                value = self.get_string(td)
                utm_string = value.split(" ")
                try:
                    # First token: two-digit zone number followed by the zone
                    # letter; second and third token: easting and northing.
                    self.geo_point = utm.to_latlon(int(utm_string[1]), int(utm_string[2]),
                                                   int(utm_string[0][:2]), utm_string[0][2])
                except (ValueError, IndexError) as err:
                    # presumably utm's range errors derive from ValueError - TODO confirm
                    logger.warning(f"Could not parse UTM coordinates '{value}': {err}")

        if self.timestamp is None:
            # Without a timestamp the alarm could neither be stored nor
            # forwarded, so fall back to the time of parsing.
            logger.warning("No valid Druckdatum found, using the current time instead")
            self.timestamp = datetime.datetime.now()

        logger.info("Alarmepesche parsed successfully")
        logger.debug(self)

    def toText(self):
        """Return the main fields as one ``" / "``-separated line."""
        logger.debug(f"Returing {self} as Text")
        text = f"{self.intervention_code} / {self.engine} / {self.city} / {self.intervention_number} / {self.intervention_message} / {self.info}"
        return text

    def toHtml(self):
        """Return the parsed document (``self.raw``)."""
        logger.debug(f"Returning {self} as HTML")
        return self.raw

    def toDict(self):
        """Return a JSON-serialisable dict; requires ``timestamp`` to be set."""
        logger.debug(f"Creating dictionary for {self}")
        return {
            "timestamp": self.timestamp.isoformat(),
            "engine": self.engine,
            "intervention_number": self.intervention_number,
            "intervention_code": self.intervention_code,
            "intervention_message": self.intervention_message,
            "intervention_description": self.intervention_description,
            "intervention_city": self.city,
            "intervention_info": self.info,
            "intervention_coordinates": self.geo_point
        }

    def fromDict(self, json_data):
        """Fill this object from a dict produced by ``toDict()``."""
        logger.debug("Creating new object")
        self.timestamp = datetime.datetime.fromisoformat(json_data["timestamp"])
        self.engine = json_data["engine"]
        self.intervention_number = json_data["intervention_number"]
        self.intervention_code = json_data["intervention_code"]
        self.intervention_message = json_data["intervention_message"]
        self.intervention_description = json_data["intervention_description"]
        self.city = json_data["intervention_city"]
        self.info = json_data["intervention_info"]
        # JSON has no tuples; restore the tuple type the attribute declares.
        coordinates = json_data["intervention_coordinates"]
        self.geo_point = tuple(coordinates) if coordinates is not None else None

    def loadFromDisk(self, file_path="data/last_alarmdepesche.json"):
        """Load the fields from the JSON file at ``file_path``."""
        logger.debug(f"Loading last Alarmdepesche from disk")
        with open(file_path, "r") as file:
            self.fromDict(json.load(file))

    def saveToDisk(self, file_path="data/last_alarmdepesche.json"):
        """Write the fields as JSON to ``file_path``."""
        logger.debug(f"Writing {self} to disk")
        # Build the dict before opening the file: opening with "w" truncates
        # it, and a failing toDict() would otherwise leave an empty file.
        data = self.toDict()
        with open(file_path, "w") as file:
            json.dump(data, file, indent=3)

    def __repr__(self):
        if self.intervention_number:
            # The timestamp may still be unset, e.g. before parsing finished.
            time_text = self.timestamp.isoformat() if self.timestamp is not None else None
            return f"<Alarmdepesche {self.intervention_number} (engine='{self.engine}', code='{self.intervention_code}', time='{time_text}')>"
        return "<Alarmdepesche>"
|
|
|
|
class Email():
    """SMTP client that sends an ``Alarmdepesche`` as an email.

    Attributes:
        smtp_client: The connected ``smtplib.SMTP`` session, or ``None`` when
            the connection could not be created.
    """

    smtp_client: smtplib.SMTP = None

    @staticmethod
    def _env_flag(name, default=True):
        """Read environment variable ``name`` as a boolean switch.

        Returns ``default`` when the variable is unset. An empty value and
        "0", "false", "no" and "off" (case-insensitive) mean ``False``;
        every other value means ``True``.
        """
        value = os.environ.get(name)
        if value is None:
            return default
        return value.strip().lower() not in ("", "0", "false", "no", "off")

    def __init__(self, smtp_server, smtp_port, smtp_user, smtp_password):
        """Connect to the SMTP server, optionally with STARTTLS and login.

        ``SMTP_SERVER`` / ``SMTP_PORT`` from the environment override the
        arguments. ``MAIL_SMTP_STARTTLS`` and ``SMTP_AUTH`` switch STARTTLS
        and authentication off when set to a false value; both default to on.
        Connection problems are logged, not raised.
        """
        smtp_server = os.environ.get("SMTP_SERVER") or smtp_server
        smtp_port = os.environ.get("SMTP_PORT") or smtp_port

        logger.debug(f"Connecting to SMTP Server {smtp_server}")
        try:
            # Environment values are strings; 0 makes smtplib use its default port.
            port = int(smtp_port) if smtp_port else 0
            self.smtp_client = smtplib.SMTP(smtp_server, port)
            self.smtp_client.ehlo()
            if self._env_flag("MAIL_SMTP_STARTTLS", True):
                self.smtp_client.starttls()
            if self._env_flag("SMTP_AUTH", True):
                self.smtp_client.login(smtp_user, smtp_password)
            logger.debug("Connection to SMTP Sever was successfull")
        except Exception as err:
            # Exception rather than BaseException, so Ctrl+C and SystemExit
            # are not swallowed here.
            logger.error(f"Connection to SMTP Sevrer failed. Got {err}", exc_info=True)

    def __del__(self):
        # Nothing to close when the connection was never established.
        if self.smtp_client is None:
            return
        logger.debug("Closing SMTP Connection")
        self.smtp_client.close()

    def sendmail(self, alarmdepesche, msg_from, msg_to, attachment=None):
        """Send ``alarmdepesche`` to ``msg_to``.

        The mail carries a plain-text and an HTML alternative. PDF
        attachments are added next to them in a ``multipart/mixed``
        container. Failures are logged, not raised.

        Args:
            alarmdepesche: Object providing ``toText()``, ``toHtml()``,
                ``intervention_code``, ``engine`` and ``timestamp``.
            msg_from: Sender address.
            msg_to: Recipient address.
            attachment: Optional list of dicts with the keys ``filename``
                and ``content`` (PDF bytes).
        """
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.mime.application import MIMEApplication
        from email.utils import format_datetime, formatdate, make_msgid

        logger.debug(f"Preparing email for {msg_to}")

        if self.smtp_client is None:
            logger.error(f"Cannot not email to {msg_to}: no SMTP connection")
            return

        try:
            # According to RFC 2046, the last part of a multipart/alternative
            # message, in this case the HTML message, is best and preferred.
            body = MIMEMultipart('alternative')
            body.attach(MIMEText(alarmdepesche.toText(), 'plain'))
            # toHtml() returns the parsed document; MIMEText needs a string.
            body.attach(MIMEText(str(alarmdepesche.toHtml()), 'html'))

            if attachment:
                # Attachments belong next to the alternatives, not among
                # them: inside multipart/alternative a client would treat
                # the PDF as just another rendering of the body.
                msg = MIMEMultipart('mixed')
                msg.attach(body)
                for index, item in enumerate(attachment, start=1):
                    # get_filename() may have returned None for the part.
                    filename = unquote(item.get("filename") or f"attachment-{index}.pdf")
                    pdf_part = MIMEApplication(item["content"], Name=filename, _subtype="pdf")
                    pdf_part.add_header('Content-Disposition', 'attachment', filename=filename)
                    msg.attach(pdf_part)
            else:
                msg = body

            msg['Subject'] = f"ALARMDEPESCHE {alarmdepesche.intervention_code} >> {alarmdepesche.engine}"
            msg['From'] = f"PermaAlert <{msg_from}>"
            msg['To'] = msg_to
            # The Date header must be in RFC 5322 format, not ISO 8601.
            timestamp = getattr(alarmdepesche, "timestamp", None)
            msg['Date'] = format_datetime(timestamp) if timestamp is not None else formatdate(localtime=True)
            msg["Message-id"] = make_msgid(None, os.environ.get('MAIL_SERVER'))
        except Exception as err:
            logger.error(f"Cannot not email to {msg_to}: {err}", exc_info=True)
            return

        logger.info(f"Sending email to {msg_to}")

        # sendmail function takes 3 arguments: sender's address, recipient's address
        # and message to send - here it is sent as one string.
        try:
            self.smtp_client.sendmail(msg_from, msg_to, msg.as_string())
            logger.info(f"Email was send successfully to {msg_to}")
        except Exception as err:
            logger.error(f"Cannot not email to {msg_to}: {err}", exc_info=True)
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Entry point: every 20 seconds check the mailbox for an unseen alarm
    # email, forward it to the members returned by the duty API and write
    # health metrics to the Prometheus file.

    # The metrics file and the snapshot of the last alarm live in data/.
    os.makedirs('data/', exist_ok=True)

    mail_server = os.environ.get('MAIL_SERVER', None)
    mail_user = os.environ.get('MAIL_USER', None)
    mail_from = os.environ.get('MAIL_FROM', mail_user)
    mail_pass = os.environ.get('MAIL_PASS', None)
    mail_port = os.environ.get('MAIL_PORT', 587)

    if not mail_server or not mail_user or not mail_pass:
        logger.error("Environment variables not set!, exiting...")
        sys.exit(1)

    # Last known state of the duty API. It is only refreshed when an alarm
    # triggers a lookup, so it starts out optimistic.
    metric_api_up = 1

    try:
        while True:
            timer_global = Timer()
            prometheus = PrometheusExporter()
            metric_mailserver_up = 0

            # A damaged snapshot must not prevent new alarms from being
            # processed, so this step has its own error handling.
            try:
                now = datetime.datetime.now()
                if os.path.exists("data/last_alarmdepesche.json"):
                    last_alarmdepsche = Alarmdepesche()
                    last_alarmdepsche.loadFromDisk()
                    last_alarmdepsche_in_seconds = (now - last_alarmdepsche.timestamp).total_seconds()
                    prometheus.append_gauge_metric("permaalert_last_received_alarmdepsche", last_alarmdepsche_in_seconds,
                                                   help_text="Last received alarmdepsche time in seconds")
            except Exception as err:
                logger.error(f"Could not read the last alarmdepesche from disk: {err}", exc_info=True)

            # One broken email or a transient network error must not stop
            # the forwarder; log it and try again in the next cycle.
            try:
                fetchmail = FetchEmail(mail_server, mail_user, mail_pass)
                metric_mailserver_up = fetchmail.metric_success

                html_message, attachment = None, []
                # Only talk to the server when the login actually succeeded.
                if fetchmail.metric_success:
                    fetchmail.select_mailbox()
                    html_message, attachment = fetchmail.fetch_messages_content("FROM els.112@cgdis.lu (UNSEEN)")

                if html_message:
                    current_alarmdepsche = Alarmdepesche()
                    current_alarmdepsche.from_html(html_message)
                    try:
                        current_alarmdepsche.saveToDisk()
                    except Exception as err:
                        # Forwarding the alarm matters more than the snapshot.
                        logger.error(f"Could not save alarmdepesche to disk: {err}", exc_info=True)

                    members = current_alarmdepsche.get_members_emails()
                    # Read the metric after the lookup; get_members_emails()
                    # is what sets it.
                    metric_api_up = current_alarmdepsche.metric_api_up

                    sendmail = Email(mail_server, mail_port, mail_user, mail_pass)
                    for email_addr in members:
                        sendmail.sendmail(current_alarmdepsche, mail_from, email_addr, attachment)
                else:
                    logger.info("No message found, exiting...")
            except Exception as err:
                logger.error(f"Processing the mailbox failed: {err}", exc_info=True)

            timer_global.end()
            prometheus.append_gauge_metric("permaalert_execution_time", timer_global.took,
                                           help_text="Program execution time in seconds")
            prometheus.append_gauge_metric("permaalert_duty_api_up", metric_api_up,
                                           help_text="Boolean if api is up")
            prometheus.append_gauge_metric("permaalert_mailserver_up", metric_mailserver_up,
                                           help_text="Boolean if mail server is up")
            prometheus.write()

            logger.debug("Sleeping for 20 seconds")
            time.sleep(20)
    except KeyboardInterrupt as err:
        logger.info(f"Stopping Programm: {err}")
        sys.exit()
|