ungana

Unnamed repository; edit this file 'description' to name the repository.
Info | Log | Files | Refs | README

commit c42363cd0b46814ba05bd33603e8f0fa0a644a82
parent d9755f43205d01115ee9293e0bbda3f3f6ffc3a8
Author: lash <dev@holbrook.no>
Date:   Thu,  4 Sep 2025 13:42:55 +0100

Merge branch 'master' into lash/gui

Diffstat:
Mungana/cmd/args_parser.py | 155+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
Aungana/ical/ical_helper.py | 117+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mungana/ical/ical_manager.py | 182++++++++++++++-----------------------------------------------------------------
Aungana/utils.py | 56++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 293 insertions(+), 217 deletions(-)

diff --git a/ungana/cmd/args_parser.py b/ungana/cmd/args_parser.py @@ -6,9 +6,12 @@ import logging import sys from ungana.attachment.attachment_manager import AttachmentManager from ungana.ical.ical_manager import ICalManager +from ungana.ical.ical_helper import ICalHelper from ungana.logging.logging_manager import LoggingManager from dateutil import tz +from ungana.utils import validate_datetime, validate_duration + class ArgsParser: EMAIL_RE = re.compile(r"^[^@]+@[^@]+\.[^@]+$") @@ -26,31 +29,11 @@ class ArgsParser: return Path(file_path).read_text(encoding="utf-8").strip() except Exception as e: raise SystemExit(f"Error: Could not read file '{file_path}': {e}") - - def _validate_datetime(self, dt_str: str) -> str: - try: - dt = datetime.fromisoformat(dt_str) - return dt.isoformat() - except ValueError: - pass - try: - dt = datetime.strptime(dt_str, "%d-%m-%Y %H:%M") - return dt.isoformat() - except ValueError: - pass - raise argparse.ArgumentTypeError( - f"Invalid datetime format: '{dt_str}'. " - "Expected ISO format (YYYY-MM-DDTHH:MM:SS+ZZ:ZZ) or DD-MM-YYYY HH:MM" - ) - - def _validate_duration(self, duration_str: str) -> str: - """Validate duration format (e.g., '2h' or '30m').""" - if not re.match(r'^(\d+h)?(\d+m)?$', duration_str): - raise argparse.ArgumentTypeError( - f"Invalid duration format: '{duration_str}'. Expected format like '2h' or '30m'" - ) - return duration_str + def _ensure_no_multiline_input(self,value: str) -> str: + if "\n" in value or "\r" in value: + raise argparse.ArgumentTypeError("Multiline input is not allowed for this argument.") + return value def _add_command_arguments(self): subparsers = self.parser.add_subparsers(dest='command', required=True) @@ -73,17 +56,17 @@ class ArgsParser: parser.add_argument("-i", "--interactive", action="store_true", help="Run interactive calendar creation") - parser.add_argument("-s", "--summary", required=required, help="Event summary") - parser.add_argument("--start", type=self._validate_datetime, required=required,help="Event start time (ISO format or DD-MM-YYYY HH:MM)") - parser.add_argument("-d", "--description",required=required, help="Event description") - parser.add_argument("-l", "--location",required=required,help="Event location") - parser.add_argument("-o", "--organizer",required=required,help="Event organizer") - - parser.add_argument("--sf", "--summary-file", dest="summary_file", help="File containing event summary") - parser.add_argument("--df", "--description-file", dest="description_file", help="File containing event description") + parser.add_argument("-s", "--summary",type = self._ensure_no_multiline_input, required=required, help="Event summary") + parser.add_argument("--start", type=validate_datetime, required=required,help="Event start time (ISO format or DD-MM-YYYY HH:MM)") + parser.add_argument("-d", "--description",type=self._ensure_no_multiline_input,required=required, help="Event description") + parser.add_argument("-l", "--location",type= self._ensure_no_multiline_input,required=required,help="Event location") + parser.add_argument("-o", "--organizer",type=self._ensure_no_multiline_input,required=required,help="Event organizer") + + parser.add_argument("--summary-file", dest="summary_file", help="File containing event summary") + parser.add_argument("--description-file", dest="description_file", help="File containing event description") parser.add_argument("--tzid", help="Time zone ID") - parser.add_argument("--duration", type=self._validate_duration, help="Event duration") - parser.add_argument("--end", type=self._validate_datetime, help="Event end time") + parser.add_argument("--duration", type=validate_duration, help="Event duration") + parser.add_argument("--end", type=validate_datetime, help="Event end time") def add_create_args(self, parser): @@ -95,29 +78,33 @@ class ArgsParser: ) non_interactive = parser.add_argument_group("non-interactive arguments") - non_interactive.add_argument("-s", "--summary", help="Event summary") - non_interactive.add_argument("--start", type=self._validate_datetime, help="Event start time (ISO format or DD-MM-YYYY HH:MM)") - non_interactive.add_argument("-d", "--description", help="Event description") - non_interactive.add_argument("-l", "--location", help="Event location") - non_interactive.add_argument("-o", "--organizer", help="Event organizer") - - non_interactive.add_argument("--sf", "--summary-file", dest="summary_file", help="File containing event summary") - non_interactive.add_argument("--df", "--description-file", dest="description_file", help="File containing event description") + non_interactive.add_argument("-s", "--summary",type=self._ensure_no_multiline_input, help="Event summary") + non_interactive.add_argument("--start", type=validate_datetime, help="Event start time (ISO format or DD-MM-YYYY HH:MM)") + non_interactive.add_argument("-d", "--description",type=self._ensure_no_multiline_input, help="Event description") + non_interactive.add_argument("-l", "--location",type=self._ensure_no_multiline_input, help="Event location") + non_interactive.add_argument("-o", "--organizer",type=self._ensure_no_multiline_input, help="Event organizer") + + non_interactive.add_argument("--summary-file", dest="summary_file", help="File containing event summary") + non_interactive.add_argument("--description-file", dest="description_file", help="File containing event description") non_interactive.add_argument("--tzid", help="Time zone ID") + non_interactive.add_argument("-p", "--poster", help="Event headline image") + non_interactive.add_argument("--long", type= self._ensure_no_multiline_input,help="Exhaustive description of the event") + non_interactive.add_argument("-c", "--contact",type=self._ensure_no_multiline_input, help="Contact details") + parser.add_argument("ics_filename", nargs="?", help="Output .ics filename (default: event_<date>.ics)") - parser.add_argument("--domain", help="Domain used to generate event UID (default: ungana.local)",default="ungana.local") + parser.add_argument("--domain", type=self._ensure_no_multiline_input,help="Domain used to generate event UID (default: ungana.local)",default="ungana.local") event_end_time_group = non_interactive.add_mutually_exclusive_group(required=False) - event_end_time_group.add_argument("--end", type=self._validate_datetime,help="Event end time (ISO format or DD-MM-YYYY HH:MM). ""Required if no --duration is specified.",) - event_end_time_group.add_argument( "--duration",type=self._validate_duration, help="Event duration (e.g shorthand like '1h30m'). Required if no --end is specified.",) + event_end_time_group.add_argument("--end", type=validate_datetime,help="Event end time (ISO format or DD-MM-YYYY HH:MM). ""Required if no --duration is specified.",) + event_end_time_group.add_argument( "--duration",type=validate_duration, help="Event duration (e.g shorthand like '1h30m'). Required if no --end is specified.",) def add_edit_args(self, parser): self.add_common_args(parser, required=False) parser.add_argument("-p", "--poster", help="Event headline image") - parser.add_argument("-ld", "--long", help="Exhaustive description of the event") - parser.add_argument("-c", "--contact", help="Contact details") + parser.add_argument("--long", type= self._ensure_no_multiline_input,help="Exhaustive description of the event") + parser.add_argument("-c", "--contact",type=self._ensure_no_multiline_input, help="Contact details") @@ -163,7 +150,7 @@ class ArgsParser: sys.exit(1) - def prompt_compulsory_event_fields(self): + def prompt_compulsory_event_fields(self,args): try: summary = input("Enter event summary (title): ").strip() description = input("Enter description: ").strip() @@ -184,11 +171,16 @@ class ArgsParser: organizer = input("Enter organizer (name/email): ").strip() tzid = self._prompt_timezone() + domain = args.domain + if args.domain is None: + domain = "ungana.local" + return { "summary": summary, "description": description, "start": start_dt, "end": end_dt, + "domain": domain, "duration": duration if duration else None, "location": location, "organizer": organizer, @@ -220,7 +212,7 @@ class ArgsParser: ics_filename = args.ics_filename or f"event_{datetime.now().strftime('%Y%m%d_%H%M%S')}.ics" cal = self.ical_manager.load_ical_file(ics_filename) if cal: - event_data = self.prompt_compulsory_event_fields() + event_data = self.prompt_compulsory_event_fields(args) event = self.ical_manager.create_event(event_data) self.ical_manager.save_ical_file(event, ics_filename) return @@ -230,7 +222,7 @@ class ArgsParser: if args.ics_filename: filename = args.ics_filename cal = self.ical_manager.load_ical_file(filename) - exists = self.ical_manager.check_existing_event(cal, event_data) + exists = ICalHelper.check_existing_event(cal, event_data) if exists: details = ( f"Summary='{event_data['summary']}', " @@ -261,20 +253,24 @@ class ArgsParser: self.parser.error("corrupted or missing ical file") if args.all: - return self._edit_multiple_events(cal, ical_file_path) + return self._edit_multiple_events(cal, ical_file_path,args) updates, attachments = self._get_user_updates_from_args(args) if any([updates, attachments]): # Non-interactive mode - self.ical_manager.update_event(cal, updates, attachments, ical_file_path) - logging.info("Calendar updated successfully") + try: + ICalHelper.update_event(cal, updates, attachments, ical_file_path) + logging.info("Calendar updated successfully") + except Exception as e: + self.parser.error(f"Failed to update calendar: {e}") + return else: # Interactive mode self._edit_most_recent_event(cal, args, ical_file_path) def _edit_most_recent_event(self, cal, args, ical_file_path): - event = self.ical_manager.get_first_event(cal) + event = ICalHelper.get_first_event(cal) # only interactive prompts updates = self._get_user_event_updates(event) @@ -286,9 +282,9 @@ class ArgsParser: logging.info("No changes made to calendar file") - def _edit_multiple_events(self, cal, ical_file_path): + def _edit_multiple_events(self, cal, ical_file_path,args): """Edit a specific event from multiple events (requires --all).""" - events = self.ical_manager.get_all_events(cal) + events = ICalHelper.get_all_events(cal) if not events: self.parser.error("no events found for the selected ical file") return @@ -311,16 +307,15 @@ class ArgsParser: except KeyboardInterrupt: self.parser.error("Operation cancelled by user") return - - event = events[choice - 1] - updates = self._get_user_event_updates(event) - - if updates: - self.ical_manager.update_event(cal, updates, None, ical_file_path) - logging.info("Calendar updated successfully") - else: - logging.info("No changes made to calendar file") - + + updates, attachments = self._get_user_updates_from_args(args) + if any([updates, attachments]): + try: + ICalHelper.update_event(cal, updates, attachments, ical_file_path) + logging.info("Calendar updated successfully") + except Exception as e: + self.parser.error(f"Failed to update calendar: {e}") + return def _get_user_event_updates(self, event) -> dict: updates = {} @@ -410,6 +405,7 @@ class ArgsParser: def _validate_and_get_create_event_args(self, args): + attachments = [] if args.summary_file: args.summary = self._read_file_or_exit(args.summary_file) if args.description_file: @@ -454,6 +450,29 @@ class ArgsParser: if args.domain: domain = args.domain + + if args.contact: + value, params = self._process_contact_arg(args.contact) + if not params and self.EMAIL_RE.match(value): + params = {"ALTREP": f"mailto:{value}"} + contact = (value, params) + elif params: + contact = (value, params) + else: + contact = value + else: + contact = None, + + + for ctx_name in ("poster", "long"): + arg_val = getattr(args, ctx_name, None) + if arg_val: + try: + prop, value, params = self.attachment_manager.create_attachment(arg_val, ctx=ctx_name) + attachments.append((prop, value, params)) + except Exception as e: + self.parser.error(f"Unexpected error: {e}") + event_data = { 'summary': args.summary, 'description': args.description, @@ -462,7 +481,9 @@ class ArgsParser: 'start': args.start_dt, 'duration': args.duration, 'tzid': args.tzid, - 'domain': domain + 'contact': contact, + 'domain': domain, + 'attachments': attachments } return event_data diff --git a/ungana/ical/ical_helper.py b/ungana/ical/ical_helper.py @@ -0,0 +1,117 @@ +from datetime import datetime +from typing import Any, Dict, Optional +from zoneinfo import ZoneInfo +from icalendar import Calendar, Event + + +class ICalHelper: + + @staticmethod + def update_event(cal: Calendar, updates: Dict[str, Any], attachments: list = None, filename: str = None) -> Calendar: + if cal is None or not hasattr(cal, "walk"): + raise ValueError("Invalid calendar object: missing structure or corrupted.") + + events = [c for c in cal.walk("VEVENT")] + if not events: + raise ValueError("Calendar does not contain any VEVENT entries.") + + uid = cal.walk("VEVENT")[0].get("UID") + event_found = False + + for component in cal.walk(): + if component.name == "VEVENT" and str(component.get("UID")) == uid: + event_found = True + for key, value in updates.items(): + if key in component: + component.pop(key) + + if isinstance(value, tuple) and len(value) == 2: + v, params = value + component.add(key, v, parameters=params) + else: + component[key] = value + + # Deduplicate attachments + if attachments: + existing_attachments = { + str(a) for a in component.get("ATTACH", []) + } + + for attachment in attachments: + if isinstance(attachment, tuple) and len(attachment) == 3: + prop, value, params = attachment + if value not in existing_attachments: + component.add(prop, value, parameters=params) + existing_attachments.add(value) + else: + value = str(attachment) + if value not in existing_attachments: + component.add("ATTACH", value) + existing_attachments.add(value) + + break + + if not event_found: + raise ValueError(f"No event with UID {uid} found.") + if filename: + ICalHelper.update_ical_file(cal, filename) + + return cal + + @staticmethod + def update_ical_file(cal: Calendar, filename: str) -> None: + ical_bytes = cal.to_ical() + unfolded = ical_bytes.replace(b"\r\n ", b"").replace(b"\n ", b"") + with open(filename, 'wb') as f: + f.write(unfolded) + + @staticmethod + def get_first_event(cal): + """Return the first VEVENT component in the calendar (or None if no events exist).""" + for component in cal.walk(): + if component.name == "VEVENT": + return component + return None + + @staticmethod + def get_all_events(cal): + """Return a list of all VEVENT components in the calendar.""" + return [component for component in cal.walk() if component.name == "VEVENT"] + + @staticmethod + def normalize_ical_field(value, tzid: Optional[str] = None): + if isinstance(value, list): + value = value[0] + if hasattr(value, "dt"): + value = value.dt + + if isinstance(value, datetime): + if value.tzinfo is None and tzid: + return value.replace(tzinfo=ZoneInfo(tzid)) + return value + + if hasattr(value, "to_ical"): + return value.to_ical().decode() + + return str(value) + + @staticmethod + def check_existing_event(cal: Calendar, event_data: Event) -> bool: + tzid = event_data.get("tzid") + + candidate_key = ( + ICalHelper.normalize_ical_field(event_data.get("start"), tzid), + ICalHelper.normalize_ical_field(event_data.get("end"), tzid), + ICalHelper.normalize_ical_field(event_data.get("summary")), + ) + for component in cal.walk("VEVENT"): + existing_key = ( + ICalHelper.normalize_ical_field(component.get("DTSTART"), tzid), + ICalHelper.normalize_ical_field(component.get("DTEND"), tzid), + ICalHelper.normalize_ical_field(component.get("SUMMARY")), + ) + if existing_key == candidate_key: + return True + return False + + diff --git a/ungana/ical/ical_manager.py b/ungana/ical/ical_manager.py @@ -1,11 +1,10 @@ import os -import uuid -from zoneinfo import ZoneInfo -from icalendar import Calendar, Event, vDatetime,prop -from datetime import datetime, timedelta, timezone +from icalendar import Calendar, Event, vDatetime, vText +from datetime import datetime,timezone from typing import Dict, Any from ungana.logging.logging_manager import LoggingManager +from ungana.utils import generate_uid, parse_duration class ICalManager: @@ -42,14 +41,14 @@ class ICalManager: if 'duration' in event_data: if isinstance(event_data['duration'], str): - event.add('duration', self._parse_duration(event_data['duration'])) + event.add('duration', parse_duration(event_data['duration'])) else: event.add('duration', event_data['duration']) if 'uid' in event_data: event.add('uid', event_data['uid']) else: - event.add('uid', self._generate_uid(domain)) + event.add('uid', generate_uid(domain)) ## should be tz based?? dtstamp = datetime.now(timezone.utc) @@ -60,88 +59,34 @@ class ICalManager: event_status = "CONFIRMED" event.add("status", event_status) + + if event_data.get("contact"): + contact = event_data["contact"] + if isinstance(contact, tuple): + value, params = contact + event.add("contact", vText(value), encode=0) + for k, v in params.items(): + event["contact"].params[k] = v + else: + event.add("contact", vText(contact), encode=0) + + if event_data.get("attachments"): + existing_attachments = set() + for attachment in event_data["attachments"]: + if isinstance(attachment, tuple) and len(attachment) == 3: + prop, value, params = attachment + if value not in existing_attachments: + event.add(prop, value, parameters=params) + existing_attachments.add(value) + else: + value = str(attachment) + if value not in existing_attachments: + event.add("ATTACH", value) + existing_attachments.add(value) + return event - def update_event(self,cal: Calendar,updates: Dict[str, Any],attachments: list = None,filename: str = None) -> Calendar: - uid = cal.walk("VEVENT")[0].get("UID") - event_found = False - for component in cal.walk(): - if component.name == "VEVENT" and str(component.get("UID")) == uid: - event_found = True - for key, value in updates.items(): - if key in component: - component.pop(key) - - if isinstance(value, tuple) and len(value) == 2: - v, params = value - component.add(key, v, parameters=params) - else: - component[key] = value - if attachments: - existing = component.get("ATTACH") - if not existing: - existing = [] - elif not isinstance(existing, list): - existing = [existing] - - preserved = [] - for e in existing: - ctx = None - if hasattr(e, "params"): - ctx = e.params.get("CTX") - if ctx not in ("poster", "long"): - preserved.append(e) - - if "ATTACH" in component: - component.pop("ATTACH") - - for e in preserved: - component.add("ATTACH", e) - - for attachment in attachments: - if isinstance(attachment, tuple) and len(attachment) == 3: - prop, value, params = attachment - component.add(prop, value, parameters=params) - else: - component.add("ATTACH", str(attachment)) - break - - if not event_found: - raise ValueError(f"No event with UID {uid} found.") - if filename: - self.update_ical_file(cal, filename) - - return cal - - def _parse_duration(self, duration_str: str): - duration_str = duration_str.replace(",", "").strip() - hours, minutes = 0, 0 - if "h" in duration_str: - parts = duration_str.split("h") - hours = int(parts[0]) if parts[0] else 0 - if len(parts) > 1 and "m" in parts[1]: - minutes = int(parts[1].replace("m", "").strip() or 0) - elif "m" in duration_str: - minutes = int(duration_str.replace("m", "").strip() or 0) - - return timedelta(hours=hours, minutes=minutes) - - def parse_datetime(self,dt_str: str) -> datetime: - formats = ["%Y-%m-%d %H:%M", "%d-%m-%Y %H:%M"] - for fmt in formats: - try: - return datetime.strptime(dt_str, fmt) - except ValueError: - continue - raise ValueError(f"Invalid datetime format: {dt_str}. Expected one of {formats}") - - - def _generate_uid(self,domain: str) -> str: - """Generate a globally unique UID using UUID + domain style.""" - return f"{uuid.uuid4()}@{domain}" - - def load_ical_file(self, filename: str) -> Calendar: """Load an iCalendar file from disk, or return a new Calendar if missing.""" if not os.path.exists(filename): @@ -158,75 +103,11 @@ class ICalManager: return self.calendar except Exception as e: raise ValueError(f"Failed to parse iCalendar file '{filename}': {e}") - - - def get_all_events(self,cal): - """ - Return a list of all VEVENT components in the calendar. - """ - events = [] - for component in cal.walk(): - if component.name == "VEVENT": - events.append(component) - return events - - def get_first_event(self, cal): - """ - Return the first VEVENT component in the calendar (or None if no events exist). - """ - for component in cal.walk(): - if component.name == "VEVENT": - return component - return None - - def check_existing_event(self,cal: Calendar,event_data: Event) -> bool: - tzid = event_data.get("tzid") - - candidate_key = ( - self.normalize_ical_field(event_data.get("start"), tzid), - self.normalize_ical_field(event_data.get("end"), tzid), - self.normalize_ical_field(event_data.get("summary")), - ) - for component in cal.walk("VEVENT"): - existing_key = ( - self.normalize_ical_field(component.get("DTSTART"), tzid), - self.normalize_ical_field(component.get("DTEND"), tzid), - self.normalize_ical_field(component.get("SUMMARY")), - ) - if existing_key == candidate_key: - return True - return False - - - def normalize_ical_field(self, value, tzid: str | None = None): - if isinstance(value, list): - value = value[0] - if hasattr(value, "dt"): - value = value.dt - - if isinstance(value, datetime): - if value.tzinfo is None and tzid: - return value.replace(tzinfo=ZoneInfo(tzid)) - return value - - if hasattr(value, "to_ical"): - return value.to_ical().decode() - - return str(value) - def save_ical_file(self, event: Event, filename: str) -> None: """Save calendar with event to .ics file.""" self.calendar.add_component(event) with open(filename, 'wb') as f: - f.write(self.calendar.to_ical()) - - - def update_ical_file(self, cal: Calendar, filename: str) -> None: - ical_bytes = cal.to_ical() - unfolded = ical_bytes.replace(b"\r\n ", b"").replace(b"\n ", b"") - with open(filename, 'wb') as f: - f.write(unfolded) - + f.write(self.calendar.to_ical()) +\ No newline at end of file diff --git a/ungana/utils.py b/ungana/utils.py @@ -0,0 +1,55 @@ +import argparse +from datetime import datetime, timedelta +import re +import uuid + + +def parse_duration(duration_str: str): + duration_str = duration_str.replace(",", "").strip() + hours, minutes = 0, 0 + if "h" in duration_str: + parts = duration_str.split("h") + hours = int(parts[0]) if parts[0] else 0 + if len(parts) > 1 and "m" in parts[1]: + minutes = int(parts[1].replace("m", "").strip() or 0) + elif "m" in duration_str: + minutes = int(duration_str.replace("m", "").strip() or 0) + + return timedelta(hours=hours, minutes=minutes) + +def parse_datetime(dt_str: str) -> datetime: + formats = ["%Y-%m-%d %H:%M", "%d-%m-%Y %H:%M"] + for fmt in formats: + try: + return datetime.strptime(dt_str, fmt) + except ValueError: + continue + raise ValueError(f"Invalid datetime format: {dt_str}. Expected one of {formats}") + + +def generate_uid(domain: str) -> str: + """Generate a globally unique UID using UUID + domain style.""" + return f"{uuid.uuid4()}@{domain}" + + +def validate_datetime(dt_str: str) -> str: + try: + dt = datetime.fromisoformat(dt_str) + return dt.isoformat() + except ValueError: + pass + try: + dt = datetime.strptime(dt_str, "%d-%m-%Y %H:%M") + return dt.isoformat() + except ValueError: + pass + + raise argparse.ArgumentTypeError(f"Invalid datetime format: '{dt_str}'. ""Expected ISO format (YYYY-MM-DDTHH:MM:SS+ZZ:ZZ) or DD-MM-YYYY HH:MM") + + + +def validate_duration(duration_str: str) -> str: + """Validate duration format (e.g., '2h' or '30m').""" + if not re.match(r'^(\d+h)?(\d+m)?$', duration_str): + raise argparse.ArgumentTypeError(f"Invalid duration format: '{duration_str}'. Expected format like '2h' or '30m'" ) + return duration_str +\ No newline at end of file