Initial Commit
This commit is contained in:
commit
aefde5013a
374
main.py
Normal file
374
main.py
Normal file
@ -0,0 +1,374 @@
|
||||
import datetime, imaplib, email, time, utm, requests, json, os, smtplib, logging, sys
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
if os.environ.get("DEBUG", False):
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s [%(levelname)s]: %(message)s')
|
||||
else:
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s [%(levelname)s]: %(message)s')
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
class Timer():
|
||||
_start: time = None
|
||||
_end: time = None
|
||||
took: float
|
||||
|
||||
def __init__(self):
|
||||
self.start()
|
||||
|
||||
def start(self):
|
||||
self._start = time.time()
|
||||
|
||||
def end(self):
|
||||
self._end = time.time()
|
||||
self.took = self._end - self._start
|
||||
logger.debug(f"Execution took {self.took} seconds")
|
||||
|
||||
class PrometheusExporter():
|
||||
|
||||
metrics = []
|
||||
|
||||
def append_gauge_metric(self, metric_name, metric, help_text=""):
|
||||
if help_text != "":
|
||||
self.metrics.append(f"#HELP {metric_name} {help_text}")
|
||||
self.metrics.append(f"#TYPE {metric_name} gauge")
|
||||
self.metrics.append(f"{metric_name} {metric}")
|
||||
|
||||
def write(self):
|
||||
with open("permalaert.prom", "w") as file:
|
||||
for metric in self.metrics:
|
||||
file.write(f"{metric}\n")
|
||||
self.metrics = []
|
||||
|
||||
|
||||
class FetchEmail():
|
||||
|
||||
connection = None
|
||||
is_close: bool = False
|
||||
error = None
|
||||
|
||||
def __init__(self, mail_server, username, password):
|
||||
logger.debug(f"Connection to imap server {mail_server}")
|
||||
try:
|
||||
self.connection = imaplib.IMAP4_SSL(mail_server)
|
||||
self.connection.login(username, password)
|
||||
except Exception as err:
|
||||
logger.error(f"Couldn't connect to imap server. Got: {err}")
|
||||
|
||||
def __del__(self):
|
||||
pass
|
||||
|
||||
def select_mailbox(self, mailbox="Inbox"):
|
||||
logger.debug(f"Selecting mailbox '{mailbox}'")
|
||||
self.connection.select(mailbox=mailbox, readonly=False) # so we can mark mails as read
|
||||
|
||||
def close_connection(self):
|
||||
"""
|
||||
Close the connection to the IMAP server
|
||||
"""
|
||||
logger.debug(f"Closing IMAP Connection")
|
||||
self.connection.close()
|
||||
|
||||
def get_headers(self, email_msg):
|
||||
logger.debug("Checking email headers")
|
||||
from email.parser import HeaderParser
|
||||
parser = HeaderParser()
|
||||
h = parser.parsestr(email_msg.as_string())
|
||||
|
||||
return h
|
||||
|
||||
def fetch_messages_content(self, searchTerm = "(UNSEEN)"):
|
||||
"""
|
||||
Retrieve unread messages
|
||||
"""
|
||||
logger.debug(f"Using search term '{searchTerm}'")
|
||||
(result, messages) = self.connection.search(None, searchTerm)
|
||||
body = None
|
||||
|
||||
if result == "OK":
|
||||
mail_ids = messages[0]
|
||||
id_list = mail_ids.split()
|
||||
|
||||
if mail_ids == b"":
|
||||
logger.debug(f"No Email found")
|
||||
self.close_connection()
|
||||
return None
|
||||
|
||||
try:
|
||||
#ret, data = self.connection.fetch(id_list[-1], '(UID BODY[TEXT])')
|
||||
status, data = self.connection.fetch(id_list[-1], '(RFC822)')
|
||||
email_msg = email.message_from_bytes(data[0][1]) # email.message_from_string(data[0][1])
|
||||
headers = self.get_headers(email_msg)
|
||||
# Make sure that email is send from etat.lu mailservers
|
||||
if "mail.etat.lu" not in headers.get("Received-SPF"):
|
||||
logger.warning(f"Strange Email Header: {headers.get('Received-SPF')}")
|
||||
raise RuntimeError("Email headers are wrong, will not proceed")
|
||||
|
||||
# If message is multi part we only want the text version of the body, this walks the message and gets the body
|
||||
if email_msg.is_multipart():
|
||||
for part in email_msg.walk():
|
||||
if part.get_content_type() == "text/html":
|
||||
body = part.get_payload(decode=True)
|
||||
body = body.decode()
|
||||
|
||||
elif part.get_content_type() == "text/plain":
|
||||
body = part.get_payload(decode=True)
|
||||
body = body.decode()
|
||||
except Exception as err:
|
||||
logger.error(f"Could not retrieve email. Got: {err}", exc_info=True)
|
||||
self.close_connection()
|
||||
return None
|
||||
|
||||
logger.debug(f"Found an email")
|
||||
return str(body)
|
||||
|
||||
def parse_email_address(self, email_address):
|
||||
"""
|
||||
Helper function to parse out the email address from the message
|
||||
return: tuple (name, address). Eg. ('John Doe', 'jdoe@example.com')
|
||||
"""
|
||||
return email.utils.parseaddr(email_address)
|
||||
|
||||
class Alarmdepesche():
|
||||
|
||||
id: str
|
||||
timestamp: datetime.datetime
|
||||
engine: str = ""
|
||||
intervention_number: str = None
|
||||
intervention_code: str = ""
|
||||
intervention_message: str = ""
|
||||
intervention_description: str = ""
|
||||
|
||||
city: str = ""
|
||||
info: str = ""
|
||||
geo_point: tuple
|
||||
|
||||
raw: BeautifulSoup
|
||||
|
||||
def get_string(self, td, clean=True):
|
||||
string = td.parent.find_all('td')[-1].string
|
||||
if clean:
|
||||
string = string.replace("\r\n", "").strip()
|
||||
|
||||
return string
|
||||
|
||||
def get_members_emails(self):
|
||||
logger.info("Getting duty from API")
|
||||
members = []
|
||||
url = os.environ.get("DUTY_API_URL") + f"?vehicle={self.engine}"
|
||||
logger.debug(f"Using api url {url}")
|
||||
res = requests.get(url)
|
||||
if not res.status_code == 200:
|
||||
logger.error(f"The Api returned an error, got status code {res.status_code}")
|
||||
return members
|
||||
|
||||
try:
|
||||
json_res = res.json()
|
||||
logger.debug(f"Got JSON result: {json_res}")
|
||||
for member in json_res["vehicles"][0]["contacts"]:
|
||||
members.append(member["mail"].lower())
|
||||
|
||||
except requests.exceptions.JSONDecodeError as e:
|
||||
logger.error(f"Cannot get data from perma api. Got {e}")
|
||||
except IndexError as e:
|
||||
logger.error(f"The API changed, got an Index Error: {e}")
|
||||
|
||||
return members
|
||||
|
||||
def from_html(self, alarmdepesche_html):
|
||||
logger.info("Stating parsing alarmdepesche")
|
||||
self.raw = BeautifulSoup(alarmdepesche_html, 'html.parser')
|
||||
tds = self.raw.find_all('td')
|
||||
for td in tds:
|
||||
key = str(str(td.string).strip())
|
||||
|
||||
if key == "Druckdatum:":
|
||||
self.timestamp = datetime.datetime.strptime(self.get_string(td), "%d.%m.%Y %H:%M:%S")
|
||||
if key == "Einsatzstichwort:":
|
||||
self.intervention_code = self.get_string(td)
|
||||
if key == "Sachverhalt:":
|
||||
self.intervention_message = self.get_string(td)
|
||||
if key == "Alarmiertes Einsatzmittel:":
|
||||
engine = self.get_string(td)
|
||||
if "," in engine:
|
||||
logger.warning(f"Unusual engine format, got {engine} only using last entry")
|
||||
engine = engine.split(",")[-1].strip()
|
||||
self.engine = engine
|
||||
if key == "Einsatznummer:":
|
||||
self.intervention_number = self.get_string(td)
|
||||
if key == "Stadt:":
|
||||
self.city = self.get_string(td)
|
||||
if key == "Info:":
|
||||
self.info = self.get_string(td)
|
||||
if key == "UTM - Koordinaten:":
|
||||
utm_string = self.get_string(td).split(" ")
|
||||
self.geo_point = utm.to_latlon(int(utm_string[1]), int(utm_string[2]), int(utm_string[0][:2]),
|
||||
utm_string[0][2])
|
||||
|
||||
logger.info("Alarmepesche parsed successfully")
|
||||
logger.debug(self)
|
||||
|
||||
|
||||
def __init__(self):
|
||||
logger.debug(f"Object {self} was created")
|
||||
|
||||
def toText(self):
|
||||
logger.debug(f"Returing {self} as Text")
|
||||
text = f"{self.intervention_code} / {self.engine} / {self.city} / {self.intervention_number} / {self.intervention_message} / {self.info}"
|
||||
return text
|
||||
|
||||
def toHtml(self):
|
||||
logger.debug(f"Returning {self} as HTML")
|
||||
return self.raw
|
||||
|
||||
|
||||
def toDict(self):
|
||||
logger.debug(f"Creating dictionary for {self}")
|
||||
return {
|
||||
"timestamp": self.timestamp.isoformat(),
|
||||
"engine": self.engine,
|
||||
"intervention_number": self.intervention_number,
|
||||
"intervention_code": self.intervention_code,
|
||||
"intervention_message": self.intervention_message,
|
||||
"intervention_description": self.intervention_description,
|
||||
"intervention_city": self.city,
|
||||
"intervention_info": self.info,
|
||||
"intervention_coordinates": self.geo_point
|
||||
|
||||
}
|
||||
|
||||
def fromDict(self, json_data):
|
||||
logger.debug("Creating new object")
|
||||
self.timestamp = datetime.datetime.fromisoformat(json_data["timestamp"])
|
||||
self.engine = json_data["engine"]
|
||||
self.intervention_number = json_data["intervention_number"]
|
||||
self.intervention_code = json_data["intervention_code"]
|
||||
self.intervention_message = json_data["intervention_message"]
|
||||
self.intervention_description = json_data["intervention_description"]
|
||||
self.city = json_data["intervention_city"]
|
||||
self.info = json_data["intervention_info"]
|
||||
self.geo_point = json_data["intervention_coordinates"]
|
||||
|
||||
|
||||
def loadFromDisk(self, file_path="last_alarmdepesche.json"):
|
||||
logger.debug(f"Loading last Alarmdepesche from disk")
|
||||
with open(file_path, "r") as file:
|
||||
self.fromDict(json.load(file))
|
||||
|
||||
def saveToDisk(self, file_path="last_alarmdepesche.json"):
|
||||
logger.debug(f"Writing {self} to disk")
|
||||
with open(file_path, "w") as file:
|
||||
json.dump(self.toDict(), file, indent=3)
|
||||
|
||||
def __repr__(self):
|
||||
if self.intervention_number:
|
||||
return f"<Alarmdepesche {self.intervention_number} (engine='{self.engine}', code='{self.intervention_code}', time='{self.timestamp.isoformat()}')>"
|
||||
return "<Alarmdepesche>"
|
||||
|
||||
class Email():
|
||||
|
||||
smtp_client : smtplib.SMTP
|
||||
|
||||
def __init__(self, smtp_server, smtp_port, smtp_user, smtp_password):
|
||||
logger.debug(f"Connecting to SMTP Server {smtp_server}")
|
||||
try:
|
||||
self.smtp_client = smtplib.SMTP(smtp_server, smtp_port)
|
||||
self.smtp_client.ehlo()
|
||||
if os.environ.get("MAIL_SMTP_STARTTLS", True):
|
||||
self.smtp_client.starttls()
|
||||
self.smtp_client.login(smtp_user, smtp_password)
|
||||
logger.debug("Connection to SMTP Sever was successfull")
|
||||
except BaseException as err:
|
||||
logger.error(f"Connection to SMTP Sevrer failed. Got {err}", exc_info=True)
|
||||
|
||||
def __del__(self):
|
||||
logger.debug("Closing SMTP Connection")
|
||||
self.smtp_client.close()
|
||||
|
||||
|
||||
def sendmail(self, alarmdepesche, msg_from, msg_to):
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
logger.debug(f"Preparing email for {msg_to}")
|
||||
|
||||
# Create message container - the correct MIME type is multipart/alternative.
|
||||
msg = MIMEMultipart('alternative')
|
||||
msg['Subject'] = f"ALARMDEPESCHE {alarmdepesche.intervention_code} >> {alarmdepesche.engine}"
|
||||
msg['From'] = f"PermaAlert <{msg_from}>"
|
||||
msg['To'] = msg_to
|
||||
msg['Date'] = alarmdepesche.timestamp.isoformat()
|
||||
msg["Message-id"] = email.utils.make_msgid(None, os.environ.get('MAIL_SERVER'))
|
||||
|
||||
# Record the MIME types of both parts - text/plain and text/html.
|
||||
part1 = MIMEText(alarmdepesche.toText(), 'plain')
|
||||
part2 = MIMEText(alarmdepesche.toHtml(), 'html')
|
||||
|
||||
# Attach parts into message container.
|
||||
# According to RFC 2046, the last part of a multipart message, in this case
|
||||
# the HTML message, is best and preferred.
|
||||
msg.attach(part1)
|
||||
msg.attach(part2)
|
||||
|
||||
logger.info(f"Sending email to {msg_to}")
|
||||
|
||||
# sendmail function takes 3 arguments: sender's address, recipient's address
|
||||
# and message to send - here it is sent as one string.
|
||||
try:
|
||||
self.smtp_client.sendmail(msg_from, msg_to, msg.as_string())
|
||||
logger.info(f"Email was send successfully to {msg_to}")
|
||||
except Exception as err:
|
||||
logger.error(f"Cannot not email to {msg_to}: {err}", exc_info=True)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
mail_server = os.environ.get('MAIL_SERVER', None)
|
||||
mail_user = os.environ.get('MAIL_USER', None)
|
||||
mail_from = os.environ.get('MAIL_FROM', mail_user)
|
||||
mail_pass = os.environ.get('MAIL_PASS', None)
|
||||
|
||||
if not mail_server or not mail_user or not mail_pass:
|
||||
logger.error("Environment variables not set!, exiting...")
|
||||
exit(1)
|
||||
|
||||
try:
|
||||
while True:
|
||||
timer_global = Timer()
|
||||
prometheus = PrometheusExporter()
|
||||
|
||||
now = datetime.datetime.now()
|
||||
if os.path.exists("last_alarmdepesche.json"):
|
||||
last_alarmdepsche = Alarmdepesche()
|
||||
last_alarmdepsche.loadFromDisk()
|
||||
last_alarmdepsche_in_seconds = (now - last_alarmdepsche.timestamp).total_seconds()
|
||||
prometheus.append_gauge_metric("permaalert_last_received_alarmdepsche", last_alarmdepsche_in_seconds,
|
||||
help_text="Last received alarmdepsche time in seconds")
|
||||
|
||||
fetchmail = FetchEmail(mail_server, mail_user, mail_pass)
|
||||
fetchmail.select_mailbox()
|
||||
html_message = fetchmail.fetch_messages_content("FROM els.112@cgdis.lu (UNSEEN)")
|
||||
if html_message:
|
||||
current_alarmdepsche = Alarmdepesche()
|
||||
current_alarmdepsche.from_html(html_message)
|
||||
current_alarmdepsche.saveToDisk()
|
||||
members = current_alarmdepsche.get_members_emails()
|
||||
sendmail = Email(mail_server, 587, mail_user, mail_pass)
|
||||
for email_addr in members:
|
||||
sendmail.sendmail(current_alarmdepsche, mail_from, email_addr)
|
||||
else:
|
||||
logger.info("Message could not be loaded, exiting...")
|
||||
|
||||
timer_global.end()
|
||||
prometheus.append_gauge_metric("permaalert_execution_time", timer_global.took,
|
||||
help_text="Program execution time in seconds")
|
||||
prometheus.write()
|
||||
|
||||
del prometheus
|
||||
|
||||
logger.debug("Sleeping for 20 seconds")
|
||||
time.sleep(20)
|
||||
except KeyboardInterrupt as err:
|
||||
logger.info(f"Stopping Programm: {err}")
|
||||
exit()
|
Loading…
x
Reference in New Issue
Block a user