commit 99aafa73a0107eae7f22a69b4132651894457315
parent 33422b8d62e35eb239fdfdc4ae17a94351547ba1
Author: Carlosokumu <carlosokumu254@gmail.com>
Date: Mon, 1 Sep 2025 12:55:07 +0300
move methods that require explicit cal argument to IcalHelper class
Diffstat:
1 file changed, 6 insertions(+), 142 deletions(-)
diff --git a/ungana/ical/ical_manager.py b/ungana/ical/ical_manager.py
@@ -1,11 +1,10 @@
import os
-import uuid
-from zoneinfo import ZoneInfo
from icalendar import Calendar, Event, vDatetime
-from datetime import datetime, timedelta, timezone
+from datetime import datetime,timezone
from typing import Dict, Any
from ungana.logging.logging_manager import LoggingManager
+from ungana.utils import generate_uid, parse_duration
class ICalManager:
@@ -42,14 +41,14 @@ class ICalManager:
if 'duration' in event_data:
if isinstance(event_data['duration'], str):
- event.add('duration', self._parse_duration(event_data['duration']))
+ event.add('duration', parse_duration(event_data['duration']))
else:
event.add('duration', event_data['duration'])
if 'uid' in event_data:
event.add('uid', event_data['uid'])
else:
- event.add('uid', self._generate_uid(domain))
+ event.add('uid', generate_uid(domain))
## should be tz based??
dtstamp = datetime.now(timezone.utc)
@@ -63,78 +62,6 @@ class ICalManager:
return event
- def update_event(self,cal: Calendar,updates: Dict[str, Any],attachments: list = None,filename: str = None) -> Calendar:
- uid = cal.walk("VEVENT")[0].get("UID")
- event_found = False
-
- for component in cal.walk():
- if component.name == "VEVENT" and str(component.get("UID")) == uid:
- event_found = True
- for key, value in updates.items():
- if key in component:
- component.pop(key)
-
- if isinstance(value, tuple) and len(value) == 2:
- v, params = value
- component.add(key, v, parameters=params)
- else:
- component[key] = value
-
- # Deduplicate attachments
- if attachments:
- existing_attachments = {
- str(a) for a in component.get("ATTACH", [])
- }
-
- for attachment in attachments:
- if isinstance(attachment, tuple) and len(attachment) == 3:
- prop, value, params = attachment
- if value not in existing_attachments:
- component.add(prop, value, parameters=params)
- existing_attachments.add(value)
- else:
- value = str(attachment)
- if value not in existing_attachments:
- component.add("ATTACH", value)
- existing_attachments.add(value)
-
- break
-
- if not event_found:
- raise ValueError(f"No event with UID {uid} found.")
- if filename:
- self.update_ical_file(cal, filename)
-
- return cal
-
- def _parse_duration(self, duration_str: str):
- duration_str = duration_str.replace(",", "").strip()
- hours, minutes = 0, 0
- if "h" in duration_str:
- parts = duration_str.split("h")
- hours = int(parts[0]) if parts[0] else 0
- if len(parts) > 1 and "m" in parts[1]:
- minutes = int(parts[1].replace("m", "").strip() or 0)
- elif "m" in duration_str:
- minutes = int(duration_str.replace("m", "").strip() or 0)
-
- return timedelta(hours=hours, minutes=minutes)
-
- def parse_datetime(self,dt_str: str) -> datetime:
- formats = ["%Y-%m-%d %H:%M", "%d-%m-%Y %H:%M"]
- for fmt in formats:
- try:
- return datetime.strptime(dt_str, fmt)
- except ValueError:
- continue
- raise ValueError(f"Invalid datetime format: {dt_str}. Expected one of {formats}")
-
-
- def _generate_uid(self,domain: str) -> str:
- """Generate a globally unique UID using UUID + domain style."""
- return f"{uuid.uuid4()}@{domain}"
-
-
def load_ical_file(self, filename: str) -> Calendar:
"""Load an iCalendar file from disk, or return a new Calendar if missing."""
if not os.path.exists(filename):
@@ -151,75 +78,11 @@ class ICalManager:
return self.calendar
except Exception as e:
raise ValueError(f"Failed to parse iCalendar file '{filename}': {e}")
-
- def get_all_events(self,cal):
- """
- Return a list of all VEVENT components in the calendar.
- """
- events = []
- for component in cal.walk():
- if component.name == "VEVENT":
- events.append(component)
- return events
-
- def get_first_event(self, cal):
- """
- Return the first VEVENT component in the calendar (or None if no events exist).
- """
- for component in cal.walk():
- if component.name == "VEVENT":
- return component
- return None
-
-
- def check_existing_event(self,cal: Calendar,event_data: Event) -> bool:
- tzid = event_data.get("tzid")
-
- candidate_key = (
- self.normalize_ical_field(event_data.get("start"), tzid),
- self.normalize_ical_field(event_data.get("end"), tzid),
- self.normalize_ical_field(event_data.get("summary")),
- )
- for component in cal.walk("VEVENT"):
- existing_key = (
- self.normalize_ical_field(component.get("DTSTART"), tzid),
- self.normalize_ical_field(component.get("DTEND"), tzid),
- self.normalize_ical_field(component.get("SUMMARY")),
- )
- if existing_key == candidate_key:
- return True
- return False
-
-
- def normalize_ical_field(self, value, tzid: str | None = None):
- if isinstance(value, list):
- value = value[0]
- if hasattr(value, "dt"):
- value = value.dt
-
- if isinstance(value, datetime):
- if value.tzinfo is None and tzid:
- return value.replace(tzinfo=ZoneInfo(tzid))
- return value
-
- if hasattr(value, "to_ical"):
- return value.to_ical().decode()
-
- return str(value)
-
def save_ical_file(self, event: Event, filename: str) -> None:
"""Save calendar with event to .ics file."""
self.calendar.add_component(event)
with open(filename, 'wb') as f:
- f.write(self.calendar.to_ical())
-
-
- def update_ical_file(self, cal: Calendar, filename: str) -> None:
- ical_bytes = cal.to_ical()
- unfolded = ical_bytes.replace(b"\r\n ", b"").replace(b"\n ", b"")
- with open(filename, 'wb') as f:
- f.write(unfolded)
-
+ f.write(self.calendar.to_ical())
+\ No newline at end of file