Skip to content
Snippets Groups Projects

Merge fork for interoperability of KoMa solver

Closed Felix Blanke requested to merge felix_bonn/akplanning:main into main
1 file
+ 2
2
Compare changes
  • Side-by-side
  • Inline
+ 622
6
import decimal
import itertools
import json
import math
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Generator, Iterable
from django.core.validators import RegexValidator
from django.apps import apps
from django.db import models
from django.core.validators import RegexValidator
from django.db import models, transaction
from django.db.models import Count
from django.urls import reverse_lazy
from django.utils import timezone
@@ -12,7 +17,6 @@ from django.utils.translation import gettext_lazy as _
from simple_history.models import HistoricalRecords
from timezone_field import TimeZoneField
# Custom validators to be used for some of the fields
# Prevent inclusion of the quotation marks ' " ´ `
# This may be necessary to prevent javascript issues
@@ -23,6 +27,102 @@ no_quotation_marks_validator = RegexValidator(regex=r"['\"´`]+", inverse_match=
slugable_validator = RegexValidator(regex=r"[\w\s]+", message=_('Must contain at least one letter or digit'))
@dataclass
class OptimizerTimeslot:
"""Class describing a discrete timeslot. Used to interface with an optimizer."""
avail: "Availability"
"""The availability object corresponding to this timeslot."""
idx: int
"""The unique index of this optimizer timeslot."""
constraints: set[str]
"""The set of time constraints fulfilled by this object."""
def merge(self, other: "OptimizerTimeslot") -> "OptimizerTimeslot":
"""Merge with other OptimizerTimeslot.
Creates a new OptimizerTimeslot object.
Its availability is constructed by merging the availabilities of self and other,
its constraints by taking the union of both constraint sets.
As an index, the index of self is used.
"""
avail = self.avail.merge_with(other.avail)
constraints = self.constraints.union(other.constraints)
return OptimizerTimeslot(
avail=avail, idx=self.idx, constraints=constraints
)
def __repr__(self) -> str:
return f"({self.avail.simplified}, {self.idx}, {self.constraints})"
TimeslotBlock = list[OptimizerTimeslot]
def merge_blocks(
blocks: Iterable[TimeslotBlock]
) -> Iterable[TimeslotBlock]:
"""Merge iterable of blocks together.
The timeslots of all blocks are grouped into maximal blocks.
Timeslots with the same start and end are identified with each other
and merged (cf `OptimizerTimeslot.merge`).
Throws a ValueError if any timeslots are overlapping but do not
share the same start and end, i.e. partial overlap is not allowed.
:param blocks: iterable of blocks to merge.
:return: iterable of merged blocks.
:rtype: iterable over lists of OptimizerTimeslot objects
"""
if not blocks:
return []
# flatten timeslot iterables to single chain
timeslot_chain = itertools.chain.from_iterable(blocks)
# sort timeslots according to start
timeslots = sorted(
timeslot_chain,
key=lambda slot: slot.avail.start
)
if not timeslots:
return []
all_blocks = []
current_block = [timeslots[0]]
timeslots = timeslots[1:]
for slot in timeslots:
if current_block and slot.avail.overlaps(current_block[-1].avail, strict=True):
if (
slot.avail.start == current_block[-1].avail.start
and slot.avail.end == current_block[-1].avail.end
):
# the same timeslot -> merge
current_block[-1] = current_block[-1].merge(slot)
else:
# partial overlap of interiors -> not supported
# TODO: Show comprehensive message in production
raise ValueError(
"Partially overlapping timeslots are not supported!"
f" ({current_block[-1].avail.simplified}, {slot.avail.simplified})"
)
elif not current_block or slot.avail.overlaps(current_block[-1].avail, strict=False):
# only endpoints in intersection -> same block
current_block.append(slot)
else:
# no overlap at all -> new block
all_blocks.append(current_block)
current_block = [slot]
if current_block:
all_blocks.append(current_block)
return all_blocks
class Event(models.Model):
"""
An event supplies the frame for all Aks.
@@ -62,6 +162,12 @@ class Event(models.Model):
wiki_export_template_name = models.CharField(verbose_name=_("Wiki Export Template Name"), blank=True, max_length=50)
default_slot = models.DecimalField(max_digits=4, decimal_places=2, default=2, verbose_name=_('Default Slot Length'),
help_text=_('Default length in hours that is assumed for AKs in this event.'))
export_slot = models.DecimalField(max_digits=4, decimal_places=2, default=1, verbose_name=_('Export Slot Length'),
help_text=_(
'Slot duration in hours that is used in the timeslot discretization, '
'when this event is exported for the solver.'
))
contact_email = models.EmailField(verbose_name=_("Contact email address"), blank=True,
help_text=_("An email address that is displayed on every page "
@@ -173,6 +279,386 @@ class Event(models.Model):
.filter(availabilities__count=0, owners__count__gt=0)
)
def _generate_slots_from_block(
self,
start: datetime,
end: datetime,
slot_duration: timedelta,
*,
slot_index: int = 0,
constraints: set[str] | None = None,
) -> Generator[TimeslotBlock, None, int]:
"""Discretize a time range into timeslots.
Uses a uniform discretization into discrete slots of length `slot_duration`,
starting at `start`. No incomplete timeslots are generated, i.e.
if (`end` - `start`) is not a whole number multiple of `slot_duration`
then the last incomplete timeslot is dropped.
:param start: Start of the time range.
:param end: Start of the time range.
:param slot_duration: Duration of a single timeslot in the discretization.
:param slot_index: index of the first timeslot. Defaults to 0.
:yield: Block of optimizer timeslots as the discretization result.
:ytype: list of OptimizerTimeslot
:return: The first slot index after the yielded blocks, i.e.
`slot_index` + total # generated timeslots
:rtype: int
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
current_slot_start = start
previous_slot_start: datetime | None = None
if constraints is None:
constraints = set()
current_block = []
room_availabilities = list({
availability
for room in Room.objects.filter(event=self)
for availability in room.availabilities.all()
})
while current_slot_start + slot_duration <= end:
slot = Availability(
event=self,
start=current_slot_start,
end=current_slot_start + slot_duration,
)
if any((availability.contains(slot) for availability in room_availabilities)):
# no gap in a block
if (
previous_slot_start is not None
and previous_slot_start + slot_duration < current_slot_start
):
yield current_block
current_block = []
current_block.append(
OptimizerTimeslot(avail=slot, idx=slot_index, constraints=constraints)
)
previous_slot_start = current_slot_start
slot_index += 1
current_slot_start += slot_duration
if current_block:
yield current_block
return slot_index
def uniform_time_slots(self, *, slots_in_an_hour: float) -> Iterable[TimeslotBlock]:
"""Uniformly discretize the entire event into blocks of timeslots.
Discretizes entire event uniformly. May not necessarily result in a single block
as slots with no room availability are dropped.
:param slots_in_an_hour: The percentage of an hour covered by a single slot.
Determines the discretization granularity.
:yield: Block of optimizer timeslots as the discretization result.
:ytype: list of OptimizerTimeslot
"""
all_category_constraints = AKCategory.create_category_constraints(
AKCategory.objects.filter(event=self).all()
)
yield from self._generate_slots_from_block(
start=self.start,
end=self.end,
slot_duration=timedelta(hours=1.0 / slots_in_an_hour),
constraints=all_category_constraints,
)
def default_time_slots(self, *, slots_in_an_hour: float) -> Iterable[TimeslotBlock]:
"""Discretize all default slots into blocks of timeslots.
In the discretization each default slot corresponds to one block.
:param slots_in_an_hour: The percentage of an hour covered by a single slot.
Determines the discretization granularity.
:yield: Block of optimizer timeslots as the discretization result.
:ytype: list of TimeslotBlock
"""
slot_duration = timedelta(hours=1.0 / slots_in_an_hour)
slot_index = 0
for block_slot in DefaultSlot.objects.filter(event=self).order_by("start", "end"):
category_constraints = AKCategory.create_category_constraints(
block_slot.primary_categories.all()
)
slot_index = yield from self._generate_slots_from_block(
start=block_slot.start,
end=block_slot.end,
slot_duration=slot_duration,
slot_index=slot_index,
constraints=category_constraints,
)
def discretize_timeslots(self, *, slots_in_an_hour: float | None = None) -> Iterable[TimeslotBlock]:
""""Choose discretization scheme.
Uses default_time_slots if the event has any DefaultSlot, otherwise uniform_time_slots.
:param slots_in_an_hour: The percentage of an hour covered by a single slot.
Determines the discretization granularity.
:yield: Block of optimizer timeslots as the discretization result.
:ytype: list of TimeslotBlock
"""
if slots_in_an_hour is None:
slots_in_an_hour = 1.0 / float(self.export_slot)
if DefaultSlot.objects.filter(event=self).exists():
# discretize default slots if they exists
yield from merge_blocks(self.default_time_slots(slots_in_an_hour=slots_in_an_hour))
else:
yield from self.uniform_time_slots(slots_in_an_hour=slots_in_an_hour)
@transaction.atomic
def schedule_from_json(
self, schedule: str | dict[str, Any], *, check_for_data_inconsistency: bool = True
) -> int:
"""Load AK schedule from a json string.
:param schedule: A string that can be decoded to json, describing
the AK schedule. The json data is assumed to be constructed
following the output specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
"""
if isinstance(schedule, str):
schedule = json.loads(schedule)
export_dict = self.as_json_dict()
if "input" not in schedule or "scheduled_aks" not in schedule:
raise ValueError(_("Cannot parse malformed JSON input."))
if check_for_data_inconsistency and schedule["input"] != export_dict:
raise ValueError(_("Data has changed since the export. Reexport and run the solver again."))
slots_in_an_hour = schedule["input"]["timeslots"]["info"]["duration"]
timeslot_dict = {
timeslot.idx: timeslot
for block in self.discretize_timeslots(slots_in_an_hour=slots_in_an_hour)
for timeslot in block
}
slots_updated = 0
for scheduled_slot in schedule["scheduled_aks"]:
scheduled_slot["timeslot_ids"] = list(map(int, scheduled_slot["timeslot_ids"]))
slot = AKSlot.objects.get(id=int(scheduled_slot["ak_id"]))
if not scheduled_slot["timeslot_ids"]:
raise ValueError(
_("AK {ak_name} is not assigned any timeslot by the solver").format(ak_name=slot.ak.name)
)
start_timeslot = timeslot_dict[min(scheduled_slot["timeslot_ids"])].avail
end_timeslot = timeslot_dict[max(scheduled_slot["timeslot_ids"])].avail
solver_duration = (end_timeslot.end - start_timeslot.start).total_seconds() / 3600.0
if solver_duration + 2e-4 < slot.duration:
raise ValueError(
_(
"Duration of AK {ak_name} assigned by solver ({solver_duration} hours) "
"is less than the duration required by the slot ({slot_duration} hours)"
).format(
ak_name=slot.ak.name,
solver_duration=solver_duration,
slot_duration=slot.duration,
)
)
if slot.fixed:
solver_room = Room.objects.get(id=int(scheduled_slot["room_id"]))
if slot.room != solver_room:
raise ValueError(
_(
"Fixed AK {ak_name} assigned by solver to room {solver_room} "
"is fixed to room {slot_room}"
).format(
ak_name=slot.ak.name,
solver_room=solver_room.name,
slot_room=slot.room.name,
)
)
if slot.start != start_timeslot.start:
raise ValueError(
_(
"Fixed AK {ak_name} assigned by solver to start at {solver_start} "
"is fixed to start at {slot_start}"
).format(
ak_name=slot.ak.name,
solver_start=start_timeslot.start,
slot_start=slot.start,
)
)
else:
slot.room = Room.objects.get(id=int(scheduled_slot["room_id"]))
slot.start = start_timeslot.start
slot.save()
slots_updated += 1
return slots_updated
def as_json_dict(self) -> dict[str, Any]:
"""Return the json representation of this Event.
:return: The json dict representation is constructed
following the input specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
:rtype: dict[str, Any]
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
def _test_event_not_covered(availabilities: list[Availability]) -> bool:
"""Test if event is not covered by availabilities."""
return not Availability.is_event_covered(self, availabilities)
def _test_akslot_fixed_in_timeslot(ak_slot: AKSlot, timeslot: Availability) -> bool:
"""Test if an AKSlot is fixed to overlap a timeslot slot."""
if not ak_slot.fixed or ak_slot.start is None:
return False
fixed_avail = Availability(event=self, start=ak_slot.start, end=ak_slot.end)
return fixed_avail.overlaps(timeslot, strict=True)
def _test_add_constraint(slot: Availability, availabilities: list[Availability]) -> bool:
"""Test if object is not available for whole event and may happen during slot."""
return (
_test_event_not_covered(availabilities) and slot.is_covered(availabilities)
)
def _generate_time_constraints(
avail_label: str,
avail_dict: dict,
timeslot_avail: Availability,
prefix: str = "availability",
) -> list[str]:
return [
f"{prefix}-{avail_label}-{pk}"
for pk, availabilities in avail_dict.items()
if _test_add_constraint(timeslot_avail, availabilities)
]
timeslots = {
"info": {"duration": float(self.export_slot)},
"blocks": [],
}
rooms = Room.objects.filter(event=self).order_by()
slots = AKSlot.objects.filter(event=self).order_by()
ak_availabilities = {
ak.pk: Availability.union(ak.availabilities.all())
for ak in AK.objects.filter(event=self).all()
}
room_availabilities = {
room.pk: Availability.union(room.availabilities.all())
for room in rooms
}
person_availabilities = {
person.pk: Availability.union(person.availabilities.all())
for person in AKOwner.objects.filter(event=self)
}
blocks = list(self.discretize_timeslots())
block_names = []
for block_idx, block in enumerate(blocks):
current_block = []
if not block:
continue
block_start = block[0].avail.start.astimezone(self.timezone)
block_end = block[-1].avail.end.astimezone(self.timezone)
start_day = block_start.strftime("%A, %d. %b")
if block_start.date() == block_end.date():
# same day
time_str = block_start.strftime("%H:%M") + "" + block_end.strftime("%H:%M")
else:
# different days
time_str = block_start.strftime("%a %H:%M") + "" + block_end.strftime("%a %H:%M")
block_names.append([start_day, time_str])
block_timeconstraints = [f"notblock{idx}" for idx in range(len(blocks)) if idx != block_idx]
for timeslot in block:
time_constraints = []
# if reso_deadline is set and timeslot ends before it,
# add fulfilled time constraint 'resolution'
if self.reso_deadline is None or timeslot.avail.end < self.reso_deadline:
time_constraints.append("resolution")
# add fulfilled time constraints for all AKs that cannot happen during full event
time_constraints.extend(
_generate_time_constraints("ak", ak_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all persons that are not available for full event
time_constraints.extend(
_generate_time_constraints("person", person_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all rooms that are not available for full event
time_constraints.extend(
_generate_time_constraints("room", room_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all AKSlots fixed to happen during timeslot
time_constraints.extend([
f"fixed-akslot-{slot.id}"
for slot in AKSlot.objects.filter(event=self, fixed=True).exclude(start__isnull=True)
if _test_akslot_fixed_in_timeslot(slot, timeslot.avail)
])
time_constraints.extend(timeslot.constraints)
time_constraints.extend(block_timeconstraints)
current_block.append({
"id": timeslot.idx,
"info": {
"start": timeslot.avail.start.astimezone(self.timezone).strftime("%Y-%m-%d %H:%M"),
"end": timeslot.avail.end.astimezone(self.timezone).strftime("%Y-%m-%d %H:%M"),
},
"fulfilled_time_constraints": sorted(time_constraints),
})
timeslots["blocks"].append(current_block)
timeslots["info"]["blocknames"] = block_names
info_dict = {
"title": self.name,
"slug": self.slug
}
for attr in ["contact_email", "place"]:
if hasattr(self, attr) and getattr(self, attr):
info_dict[attr] = getattr(self, attr)
return {
"participants": [],
"rooms": [r.as_json_dict() for r in rooms],
"timeslots": timeslots,
"info": info_dict,
"aks": [ak.as_json_dict() for ak in slots],
}
class AKOwner(models.Model):
""" An AKOwner describes the person organizing/holding an AK.
@@ -273,6 +759,20 @@ class AKCategory(models.Model):
def __str__(self):
return self.name
@staticmethod
def create_category_constraints(categories: Iterable["AKCategory"]) -> set[str]:
"""Create a set of constraint strings from an AKCategory iterable.
:param categories: The iterable of categories to derive the constraint strings from.
:return: A set of category constraint strings, i.e. strings of the form
'availability-cat-<cat.name>'.
:rtype: set of strings.
"""
return {
f"availability-cat-{cat.name}"
for cat in categories
}
class AKTrack(models.Model):
""" An AKTrack describes a set of semantically related AKs.
@@ -415,7 +915,7 @@ class AK(models.Model):
availabilities = ', \n'.join(f'{a.simplified}' for a in Availability.objects.select_related('event')
.filter(ak=self))
detail_string = f"""{self.name}{" (R)" if self.reso else ""}:
{self.owners_list}
{_('Interest')}: {self.interest}"""
@@ -450,7 +950,7 @@ class AK(models.Model):
Get a list of stringified representations of all owners
:return: list of owners
:rtype: List[str]
:rtype: list[str]
"""
return ", ".join(str(owner) for owner in self.owners.all())
@@ -460,7 +960,7 @@ class AK(models.Model):
Get a list of stringified representations of all durations of associated slots
:return: list of durations
:rtype: List[str]
:rtype: list[str]
"""
return ", ".join(str(slot.duration_simplified) for slot in self.akslot_set.select_related('event').all())
@@ -566,6 +1066,44 @@ class Room(models.Model):
def __str__(self):
return self.title
def as_json_dict(self) -> dict[str, Any]:
"""Return a json representation of this room object.
:return: The json dict representation is constructed
following the input specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
:rtype: dict[str, Any]
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
# check if room is available for the whole event
# -> no time constraint needs to be introduced
if Availability.is_event_covered(self.event, self.availabilities.all()):
time_constraints = []
else:
time_constraints = [f"availability-room-{self.pk}"]
data = {
"id": self.pk,
"info": {
"name": self.name,
},
"capacity": self.capacity,
"fulfilled_room_constraints": [constraint.name
for constraint in self.properties.all()],
"time_constraints": time_constraints
}
data["fulfilled_room_constraints"].append(f"fixed-room-{self.pk}")
if not any(constr.startswith("proxy") for constr in data["fulfilled_room_constraints"]):
data["fulfilled_room_constraints"].append("no-proxy")
data["fulfilled_room_constraints"].sort()
return data
class AKSlot(models.Model):
""" An AK Mapping matches an AK to a room during a certain time.
@@ -662,6 +1200,84 @@ class AKSlot(models.Model):
super().save(*args,
force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields)
def as_json_dict(self) -> dict[str, Any]:
"""Return a json representation of the AK object of this slot.
:return: The json dict representation is constructed
following the input specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
:rtype: dict[str, Any]
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
# check if ak resp. owner is available for the whole event
# -> no time constraint needs to be introduced
if self.fixed and self.start is not None:
ak_time_constraints = [f"fixed-akslot-{self.id}"]
elif not Availability.is_event_covered(self.event, self.ak.availabilities.all()):
ak_time_constraints = [f"availability-ak-{self.ak.pk}"]
else:
ak_time_constraints = []
def _owner_time_constraints(owner: AKOwner):
owner_avails = owner.availabilities.all()
if not owner_avails or Availability.is_event_covered(self.event, owner_avails):
return []
return [f"availability-person-{owner.pk}"]
conflict_slots = AKSlot.objects.filter(ak__in=self.ak.conflicts.all())
dependency_slots = AKSlot.objects.filter(ak__in=self.ak.prerequisites.all())
other_ak_slots = AKSlot.objects.filter(ak=self.ak).exclude(pk=self.pk)
ceil_offet_eps = decimal.Decimal(1e-4)
data = {
"id": self.pk,
"duration": math.ceil(self.duration / self.event.export_slot - ceil_offet_eps),
"properties": {
"conflicts":
sorted(
[conflict.pk for conflict in conflict_slots.all()]
+ [second_slot.pk for second_slot in other_ak_slots.all()]
),
"dependencies": sorted([dep.pk for dep in dependency_slots.all()]),
},
"room_constraints": [constraint.name
for constraint in self.ak.requirements.all()],
"time_constraints": ["resolution"] if self.ak.reso else [],
"info": {
"name": self.ak.name,
"head": ", ".join([str(owner)
for owner in self.ak.owners.all()]),
"description": self.ak.description,
"reso": self.ak.reso,
"duration_in_hours": float(self.duration),
"django_ak_id": self.ak.pk,
"types": list(self.ak.types.values_list("name", flat=True).order_by()),
},
}
data["time_constraints"].extend(ak_time_constraints)
for owner in self.ak.owners.all():
data["time_constraints"].extend(_owner_time_constraints(owner))
if self.ak.category:
category_constraints = AKCategory.create_category_constraints([self.ak.category])
data["time_constraints"].extend(category_constraints)
if self.fixed and self.room is not None:
data["room_constraints"].append(f"fixed-room-{self.room.pk}")
if not any(constr.startswith("proxy") for constr in data["room_constraints"]):
data["room_constraints"].append("no-proxy")
data["room_constraints"].sort()
data["time_constraints"].sort()
return data
class AKOrgaMessage(models.Model):
"""
Loading