Merge pull request #3 from keithah/enhance-logging-verify-ics

Enhanced visitor logging and cross-system verification closes #2
main
Keith Herrington 2024-12-11 12:17:50 -08:00 committed by GitHub
commit bf7c0fde39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 94 additions and 17 deletions

View File

@ -17,7 +17,8 @@ class ICSParser:
end = event.get("DTEND").dt
description = event.get("DESCRIPTION", "")
if not description:
self.logger.debug(f"Skipping event with start date {start.date()} due to missing description")
start_date = start if isinstance(start, datetime.date) else start.date()
self.logger.debug(f"Skipping event with start date {start_date} due to missing description")
continue
pin_code = ""
for line in description.split("\n"):
@ -25,8 +26,8 @@ class ICSParser:
pin_code = line.split(": ")[1].strip()
break
reservations.append({
"check_in_date": start.date() if isinstance(start, datetime.datetime) else start,
"check_out_date": end.date() if isinstance(end, datetime.datetime) else end,
"check_in_date": start if isinstance(start, datetime.date) else start.date(),
"check_out_date": end if isinstance(end, datetime.date) else end.date(),
"guests": [{"name": "Airbnb Guest", "phone": pin_code}],
"status": "accepted"
})

98
main.py
View File

@ -1,6 +1,7 @@
import argparse
import logging
import urllib3
import datetime
from config import load_config
from unifi_access import UnifiAccessManager
from hostex_api import HostexManager
@ -11,6 +12,48 @@ from utils import setup_logging
# Suppress InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def verify_across_systems(hostex_reservations, ics_reservations, unifi_visitors):
discrepancies = []
today = datetime.date.today()
next_month = today + datetime.timedelta(days=30)
# Filter relevant Hostex reservations
relevant_hostex = [
r for r in hostex_reservations
if today <= datetime.datetime.strptime(r["check_in_date"], "%Y-%m-%d").date() <= next_month
and r["status"] == "accepted"
]
# Create lookup dictionaries by date range
hostex_lookup = {
(r["check_in_date"], r["check_out_date"]): r
for r in relevant_hostex
}
ics_lookup = {
(r["check_in_date"].strftime("%Y-%m-%d"), r["check_out_date"].strftime("%Y-%m-%d")): r
for r in ics_reservations
}
# Check Hostex entries against ICS
for dates, hostex_res in hostex_lookup.items():
if dates not in ics_lookup:
discrepancies.append(
f"Hostex reservation for {hostex_res['guests'][0]['name']} "
f"({dates[0]} to {dates[1]}) not found in ICS calendar"
)
else:
# Verify phone number last 4 digits match
hostex_phone = hostex_res['guests'][0].get('phone', '')[-4:]
ics_phone = ics_lookup[dates]['guests'][0].get('phone', '')[-4:]
if hostex_phone and ics_phone and hostex_phone != ics_phone:
discrepancies.append(
f"Phone number mismatch for {hostex_res['guests'][0]['name']}: "
f"Hostex: {hostex_phone}, ICS: {ics_phone}"
)
return discrepancies
def main():
parser = argparse.ArgumentParser(description="UniFi Access Visitor Management")
parser.add_argument('-v', '--verbose', action='store_true', help="Increase output verbosity")
@ -49,27 +92,60 @@ def main():
if config['use_hostex']:
logger.info("Fetching reservations from Hostex")
reservations = hostex_manager.fetch_reservations()
elif config['use_ics']:
logger.info("Parsing ICS file")
reservations = ics_parser.parse_ics()
hostex_reservations = hostex_manager.fetch_reservations()
else:
logger.error("No valid reservation source configured")
return
logger.info(f"Processing {len(reservations)} reservations")
unifi_manager.process_reservations(reservations)
hostex_reservations = []
if config['use_ics']:
logger.info("Parsing ICS file")
ics_reservations = ics_parser.parse_ics()
else:
ics_reservations = []
# Filter and log relevant reservations
today = datetime.date.today()
next_month = today + datetime.timedelta(days=30)
relevant_reservations = [
r for r in hostex_reservations
if today <= datetime.datetime.strptime(r["check_in_date"], "%Y-%m-%d").date() <= next_month
and r["status"] == "accepted"
]
logger.info(f"Found {len(relevant_reservations)} entries in Hostex API within the next 30 days")
for res in relevant_reservations:
guest_name = res["guests"][0]["name"] if res["guests"] else "Guest"
phone_number = res["guests"][0].get("phone", "") if res["guests"] else ""
logger.debug(
f"Hostex Guest: {guest_name}, "
f"Stay: {res['check_in_date']} to {res['check_out_date']}, "
f"Phone: {phone_number}"
)
# Verify consistency across systems
discrepancies = verify_across_systems(
hostex_reservations,
ics_reservations,
unifi_manager.fetch_visitors()
)
# Process reservations
logger.info(f"Processing {len(relevant_reservations)} reservations")
unifi_manager.process_reservations(relevant_reservations)
logger.info("Checking and updating PINs for existing visitors")
unifi_manager.check_and_update_pins()
summary = unifi_manager.generate_summary()
if discrepancies:
summary += "\n\nDiscrepancies Found:\n" + "\n".join(discrepancies)
logger.info(summary)
total_visitors = len(unifi_manager.fetch_visitors())
logger.info(f"Total visitors remaining after cleanup: {total_visitors}")
logger.info(f"Total UniFi Access visitors remaining after cleanup: {total_visitors}")
if config['simplepush_enabled'] and unifi_manager.has_changes():
if config['simplepush_enabled'] and (unifi_manager.has_changes() or discrepancies):
notification_manager.send_notification("UniFi Access Update", summary)
logger.info("Simplepush notification sent")
else:

View File

@ -220,13 +220,13 @@ class UnifiAccessManager:
def generate_summary(self):
summary = "Hostex-UniFi Access Summary:\n"
unchanged_names = ", ".join(self.changes['unchanged'])
summary += f"{len(self.changes['unchanged'])} existing visitors unchanged ({unchanged_names})\n"
summary += f"{len(self.changes['unchanged'])} existing UniFi Access visitors unchanged ({unchanged_names})\n"
if self.changes['deleted']:
deleted_names = ", ".join(self.changes['deleted'])
summary += f"{len(self.changes['deleted'])} visitor(s) deleted ({deleted_names})\n"
summary += f"{len(self.changes['deleted'])} UniFi Access visitor(s) deleted ({deleted_names})\n"
if self.changes['added']:
added_names = ", ".join(self.changes['added'])
summary += f"{len(self.changes['added'])} visitor(s) added ({added_names})\n"
summary += f"{len(self.changes['added'])} UniFi Access visitor(s) added ({added_names})\n"
return summary.strip()
def has_changes(self):