Compare commits

...

No commits in common. "master" and "main" have entirely different histories.
master ... main

5 changed files with 11 additions and 439 deletions

View File

@ -1,14 +0,0 @@
FROM python:3
ENV TZ=Europe/Luxembourg
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN mkdir /app
COPY requirements.txt /app
COPY main.py /app
WORKDIR /app
RUN pip install -r requirements.txt
CMD ["python", "main.py"]

9
LICENSE Normal file
View File

@ -0,0 +1,9 @@
MIT License
Copyright (c) <year> <copyright holders>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -1,4 +1,3 @@
# Perma Alert
System to forward Alarmdepesche from CGDIS
# perma-alert
System to forward Alarmdepesche from CGDIS

419
main.py
View File

@ -1,419 +0,0 @@
import datetime, imaplib, email, time, utm, requests, json, os, smtplib, logging, sys
from urllib.parse import unquote
from bs4 import BeautifulSoup
if os.environ.get("DEBUG", False):
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s [%(levelname)s]: %(message)s')
else:
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s [%(levelname)s]: %(message)s')
logger = logging.getLogger()
class Timer():
_start: time = None
_end: time = None
took: float
def __init__(self):
self.start()
def start(self):
self._start = time.time()
def end(self):
self._end = time.time()
self.took = self._end - self._start
logger.debug(f"Execution took {self.took} seconds")
class PrometheusExporter():
metrics = []
def __init__(self):
self.metrics = []
def append_gauge_metric(self, metric_name, metric, help_text=""):
if help_text != "":
self.metrics.append(f"#HELP {metric_name} {help_text}")
self.metrics.append(f"#TYPE {metric_name} gauge")
self.metrics.append(f"{metric_name} {metric}")
def write(self):
with open("data/permalaert.prom", "w") as file:
for metric in self.metrics:
file.write(f"{metric}\n")
self.metrics = []
class FetchEmail():
metric_success = 0
connection = None
is_close: bool = False
error = None
def __init__(self, mail_server, username, password):
logger.debug(f"Connection to imap server {mail_server}")
try:
self.connection = imaplib.IMAP4_SSL(mail_server)
self.connection.login(username, password)
self.metric_success = 1
except Exception as err:
logger.error(f"Couldn't connect to imap server. Got: {err}")
self.metric_success = 0
def __del__(self):
pass
def select_mailbox(self, mailbox="Inbox"):
logger.debug(f"Selecting mailbox '{mailbox}'")
self.connection.select(mailbox=mailbox, readonly=False) # so we can mark mails as read
def close_connection(self):
"""
Close the connection to the IMAP server
"""
logger.debug(f"Closing IMAP Connection")
self.connection.close()
def get_headers(self, email_msg):
logger.debug("Checking email headers")
from email.parser import HeaderParser
parser = HeaderParser()
h = parser.parsestr(email_msg.as_string())
return h
def fetch_messages_content(self, searchTerm = "(UNSEEN)"):
"""
Retrieve unread messages
"""
logger.debug(f"Using search term '{searchTerm}'")
(result, messages) = self.connection.search(None, searchTerm)
body = None
attachment = []
if result == "OK":
mail_ids = messages[0]
id_list = mail_ids.split()
if mail_ids == b"":
logger.debug(f"No Email found")
self.close_connection()
return None, attachment
try:
#ret, data = self.connection.fetch(id_list[-1], '(UID BODY[TEXT])')
status, data = self.connection.fetch(id_list[-1], '(RFC822)')
email_msg = email.message_from_bytes(data[0][1]) # email.message_from_string(data[0][1])
headers = self.get_headers(email_msg)
# Make sure that email is send from etat.lu mailservers
if "mail.etat.lu" not in headers.get("Received-SPF"):
logger.warning(f"Strange Email Header: {headers.get('Received-SPF')}")
raise RuntimeError("Email headers are wrong, will not proceed")
# If message is multi part we only want the text version of the body, this walks the message and gets the body
if email_msg.is_multipart():
for part in email_msg.walk():
if part.get_content_type() == "text/html":
logger.debug("Found HTML File in Email")
body = part.get_payload(decode=True)
body = body.decode()
elif part.get_content_type() == "text/plain":
logger.debug("Found Text in Email")
body = part.get_payload(decode=True)
body = body.decode()
if part.get_content_type() == "application/pdf":
logger.debug("Found PDF File in Email")
attachment.append({
"filename": part.get_filename(),
"type": "application/pdf",
"content": part.get_payload(decode=True)
})
except Exception as err:
logger.error(f"Could not retrieve email. Got: {err}", exc_info=True)
self.close_connection()
return None, attachment
logger.debug(f"Found an email")
return str(body), attachment
def parse_email_address(self, email_address):
"""
Helper function to parse out the email address from the message
return: tuple (name, address). Eg. ('John Doe', 'jdoe@example.com')
"""
return email.utils.parseaddr(email_address)
class Alarmdepesche():
id: str
timestamp: datetime.datetime
engine: str = ""
intervention_number: str = None
intervention_code: str = ""
intervention_message: str = ""
intervention_description: str = ""
city: str = ""
info: str = ""
geo_point: tuple
raw: BeautifulSoup
metric_api_up = 0
def get_string(self, td, clean=True):
string = td.parent.find_all('td')[-1].string
if clean:
string = string.replace("\r\n", "").strip()
return string
def get_members_emails(self):
self.metric_api_up = 0
logger.info("Getting duty from API")
members = []
url = os.environ.get("DUTY_API_URL") + f"?vehicle={self.engine}"
logger.debug(f"Using api url {url}")
res = requests.get(url)
if not res.status_code == 200:
logger.error(f"The Api returned an error, got status code {res.status_code}")
return members
try:
json_res = res.json()
logger.debug(f"Got JSON result: {json_res}")
for member in json_res["vehicles"][0]["contacts"]:
members.append(member["mail"].lower())
self.metric_api_up = 1
except requests.exceptions.JSONDecodeError as e:
logger.error(f"Cannot get data from perma api. Got {e}")
except IndexError as e:
logger.error(f"The API changed, got an Index Error: {e}")
return members
def from_html(self, alarmdepesche_html):
logger.info("Stating parsing alarmdepesche")
self.raw = BeautifulSoup(alarmdepesche_html, 'html.parser')
tds = self.raw.find_all('td')
for td in tds:
key = str(str(td.string).strip())
if "Druckdatum:" in key:
self.timestamp = datetime.datetime.strptime(self.get_string(td), "%d.%m.%Y %H:%M:%S")
if "Einsatzstichwort:" in key:
self.intervention_code = self.get_string(td)
if "Sachverhalt:" in key:
self.intervention_message = self.get_string(td)
if "Alarmiertes Einsatzmittel:" in key:
engine = self.get_string(td)
if "," in engine:
logger.warning(f"Unusual engine format, got {engine} only using last entry")
engine = engine.split(",")[-1].strip()
self.engine = engine
if "Einsatznummer:" in key:
self.intervention_number = self.get_string(td)
if "Stadt:" in key:
self.city = self.get_string(td)
if "Info:" in key:
self.info = self.get_string(td)
if "UTM - Koordinaten:" in key:
utm_string = self.get_string(td).split(" ")
self.geo_point = utm.to_latlon(int(utm_string[1]), int(utm_string[2]), int(utm_string[0][:2]),
utm_string[0][2])
logger.info("Alarmepesche parsed successfully")
logger.debug(self)
def __init__(self):
logger.debug(f"Object {self} was created")
def toText(self):
logger.debug(f"Returing {self} as Text")
text = f"{self.intervention_code} / {self.engine} / {self.city} / {self.intervention_number} / {self.intervention_message} / {self.info}"
return text
def toHtml(self):
logger.debug(f"Returning {self} as HTML")
return self.raw
def toDict(self):
logger.debug(f"Creating dictionary for {self}")
return {
"timestamp": self.timestamp.isoformat(),
"engine": self.engine,
"intervention_number": self.intervention_number,
"intervention_code": self.intervention_code,
"intervention_message": self.intervention_message,
"intervention_description": self.intervention_description,
"intervention_city": self.city,
"intervention_info": self.info,
"intervention_coordinates": self.geo_point
}
def fromDict(self, json_data):
logger.debug("Creating new object")
self.timestamp = datetime.datetime.fromisoformat(json_data["timestamp"])
self.engine = json_data["engine"]
self.intervention_number = json_data["intervention_number"]
self.intervention_code = json_data["intervention_code"]
self.intervention_message = json_data["intervention_message"]
self.intervention_description = json_data["intervention_description"]
self.city = json_data["intervention_city"]
self.info = json_data["intervention_info"]
self.geo_point = json_data["intervention_coordinates"]
def loadFromDisk(self, file_path="data/last_alarmdepesche.json"):
logger.debug(f"Loading last Alarmdepesche from disk")
with open(file_path, "r") as file:
self.fromDict(json.load(file))
def saveToDisk(self, file_path="data/last_alarmdepesche.json"):
logger.debug(f"Writing {self} to disk")
with open(file_path, "w") as file:
json.dump(self.toDict(), file, indent=3)
def __repr__(self):
if self.intervention_number:
return f"<Alarmdepesche {self.intervention_number} (engine='{self.engine}', code='{self.intervention_code}', time='{self.timestamp.isoformat()}')>"
return "<Alarmdepesche>"
class Email():
smtp_client : smtplib.SMTP
def __init__(self, smtp_server, smtp_port, smtp_user, smtp_password):
if os.environ.get("SMTP_SERVER", None):
smtp_server = os.environ.get("SMTP_SERVER")
if os.environ.get("SMTP_PORT", None):
smtp_port = os.environ.get("SMTP_PORT")
logger.debug(f"Connecting to SMTP Server {smtp_server}")
try:
self.smtp_client = smtplib.SMTP(smtp_server, smtp_port)
self.smtp_client.ehlo()
if os.environ.get("MAIL_SMTP_STARTTLS", True):
self.smtp_client.starttls()
if os.environ.get("SMTP_AUTH", True):
self.smtp_client.login(smtp_user, smtp_password)
logger.debug("Connection to SMTP Sever was successfull")
except BaseException as err:
logger.error(f"Connection to SMTP Sevrer failed. Got {err}", exc_info=True)
def __del__(self):
logger.debug("Closing SMTP Connection")
self.smtp_client.close()
def sendmail(self, alarmdepesche, msg_from, msg_to, attachment=None):
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
logger.debug(f"Preparing email for {msg_to}")
# Create message container - the correct MIME type is multipart/alternative.
msg = MIMEMultipart('alternative')
msg['Subject'] = f"ALARMDEPESCHE {alarmdepesche.intervention_code} >> {alarmdepesche.engine}"
msg['From'] = f"PermaAlert <{msg_from}>"
msg['To'] = msg_to
msg['Date'] = alarmdepesche.timestamp.isoformat()
msg["Message-id"] = email.utils.make_msgid(None, os.environ.get('MAIL_SERVER'))
# Record the MIME types of both parts - text/plain and text/html.
part1 = MIMEText(alarmdepesche.toText(), 'plain')
part2 = MIMEText(alarmdepesche.toHtml(), 'html')
# Attach parts into message container.
# According to RFC 2046, the last part of a multipart message, in this case
# the HTML message, is best and preferred.
msg.attach(part1)
msg.attach(part2)
if attachment:
part3 = MIMEApplication(attachment[0]["content"], Name=unquote(attachment[0]["filename"]), _subtype="pdf")
part3['Content-Disposition'] = f'attachment; filename="{unquote(attachment[0]["filename"])}"'
msg.attach(part3)
logger.info(f"Sending email to {msg_to}")
# sendmail function takes 3 arguments: sender's address, recipient's address
# and message to send - here it is sent as one string.
try:
self.smtp_client.sendmail(msg_from, msg_to, msg.as_string())
logger.info(f"Email was send successfully to {msg_to}")
except Exception as err:
logger.error(f"Cannot not email to {msg_to}: {err}", exc_info=True)
if __name__ == '__main__':
os.makedirs('data/', exist_ok=True)
mail_server = os.environ.get('MAIL_SERVER', None)
mail_user = os.environ.get('MAIL_USER', None)
mail_from = os.environ.get('MAIL_FROM', mail_user)
mail_pass = os.environ.get('MAIL_PASS', None)
mail_port = os.environ.get('MAIL_PORT', 587)
if not mail_server or not mail_user or not mail_pass:
logger.error("Environment variables not set!, exiting...")
exit(1)
metric_api_up = 1
try:
while True:
timer_global = Timer()
prometheus = PrometheusExporter()
now = datetime.datetime.now()
if os.path.exists("data/last_alarmdepesche.json"):
last_alarmdepsche = Alarmdepesche()
last_alarmdepsche.loadFromDisk()
last_alarmdepsche_in_seconds = (now - last_alarmdepsche.timestamp).total_seconds()
prometheus.append_gauge_metric("permaalert_last_received_alarmdepsche", last_alarmdepsche_in_seconds,
help_text="Last received alarmdepsche time in seconds")
fetchmail = FetchEmail(mail_server, mail_user, mail_pass)
fetchmail.select_mailbox()
html_message, attachment = fetchmail.fetch_messages_content("FROM els.112@cgdis.lu (UNSEEN)")
if html_message:
current_alarmdepsche = Alarmdepesche()
current_alarmdepsche.from_html(html_message)
current_alarmdepsche.saveToDisk()
metric_api_up = current_alarmdepsche.metric_api_up
members = current_alarmdepsche.get_members_emails()
sendmail = Email(mail_server, mail_port, mail_user, mail_pass)
for email_addr in members:
sendmail.sendmail(current_alarmdepsche, mail_from, email_addr, attachment)
else:
logger.info("No message found, exiting...")
timer_global.end()
prometheus.append_gauge_metric("permaalert_execution_time", timer_global.took,
help_text="Program execution time in seconds")
prometheus.append_gauge_metric("permaalert_duty_api_up", metric_api_up,
help_text="Boolean if api is up")
prometheus.append_gauge_metric("permaalert_mailserver_up", fetchmail.metric_success,
help_text="Boolean if mail server is up")
prometheus.write()
del prometheus
logger.debug("Sleeping for 20 seconds")
time.sleep(20)
except KeyboardInterrupt as err:
logger.info(f"Stopping Programm: {err}")
exit()

View File

@ -1,3 +0,0 @@
beautifulsoup4==4.11.1
requests==2.28.1
utm==0.7.0