ungana

Unnamed repository; edit this file 'description' to name the repository.
Info | Log | Files | Refs | README

commit 930ce2abbf74140451ff3adcc2d941843d70bdf2
parent f0f8c87a84cb7140f43729d881b69ec4d742bb0d
Author: Carlosokumu <carlosokumu254@gmail.com>
Date:   Tue, 11 Nov 2025 12:46:18 +0300

feat: implement merge function

Diffstat:
Mungana/ical/ical_helper.py | 83+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 81 insertions(+), 2 deletions(-)

diff --git a/ungana/ical/ical_helper.py b/ungana/ical/ical_helper.py @@ -1,7 +1,12 @@ -from datetime import datetime, timedelta +import datetime +import logging +import mimetypes +import os +from pathlib import Path from typing import Any, Dict, Optional from zoneinfo import ZoneInfo from icalendar import Calendar, Event, vUri +import icalendar class ICalHelper: @@ -185,7 +190,7 @@ class ICalHelper: if val is None: return None dur = val.dt - if isinstance(dur, timedelta): + if isinstance(dur, datetime.timedelta): hours, remainder = divmod(dur.total_seconds(), 3600) minutes, _ = divmod(remainder, 60) return f"{int(hours)}h {int(minutes)}m" @@ -195,6 +200,80 @@ class ICalHelper: @staticmethod def get_ical_classes(): return ["PUBLIC", "PRIVATE", "CONFIDENTIAL"] + + + @staticmethod + def merge_calendars(events_dir: str = "./events", output_file: str = "merged_events.ics") -> tuple[bytes, str]: + now = datetime.datetime.now(datetime.UTC) + events = {} + + events_dir = Path(events_dir) + if not events_dir.exists() or not events_dir.is_dir(): + raise FileNotFoundError(f"Input directory not found: {events_dir}") + + for (r, _, files) in os.walk(events_dir): + for f in files: + fp = os.path.join(r, f) + mime_type, _ = mimetypes.guess_type(fp) + if mime_type != 'text/calendar': + continue + + logging.info(f"processing: {fp}") + try: + cal = icalendar.Calendar.from_ical(Path(fp).read_bytes()) + except Exception as e: + logging.warning(f"skipping: {fp}") + continue + + for ev in cal.walk("vevent"): + dtstart = ev.get("DTSTART") + if not dtstart: + continue + + if isinstance(dtstart, list): + d = None + for item in dtstart: + if hasattr(item, "dt"): + d = item.dt + break + if d is None: + continue + else: + d = getattr(dtstart, "dt", None) + if d is None: + continue + + if not isinstance(d, datetime.datetime): + continue + + zone = getattr(d.tzinfo, "tzname", lambda _: None)(d) + if zone == "UTC" and d < now: + continue + + uid = str(ev.get("UID", d.isoformat())) + events[uid] = ev + + + merged_cal = icalendar.Calendar() + merged_cal.add('prodid', '-//Ungana//mxm.dk//') + merged_cal.add('version', '2.0') + + + logging.debug(f"processed events: {len(events)}") + for k in dict(sorted(events.items())): + merged_cal.add_component(events[k]) + + output_path = Path(output_file) + with open(output_path, "wb") as f: + f.write(merged_cal.to_ical()) + + logging.info(f"Merged calendar saved to: {output_path.resolve()}") + return merged_cal.to_ical(),str(output_path.resolve()) + + + + +