Skip to content
Snippets Groups Projects
Commit fb1905a5 authored by Felix Blanke's avatar Felix Blanke
Browse files

Move json dict assembly from view to model

parent 12bce444
No related branches found
No related tags found
2 merge requests!20Check data consistency at import,!17Add view to clear schedule
......@@ -488,6 +488,157 @@ class Event(models.Model):
return slots_updated
def as_json_dict(self) -> dict[str, Any]:
"""Return the json representation of this Event.
:return: The json dict representation is constructed
following the input specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
:rtype: dict[str, Any]
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
def _test_event_not_covered(availabilities: list[Availability]) -> bool:
"""Test if event is not covered by availabilities."""
return not Availability.is_event_covered(self, availabilities)
def _test_akslot_fixed_in_timeslot(ak_slot: AKSlot, timeslot: Availability) -> bool:
"""Test if an AKSlot is fixed to overlap a timeslot slot."""
if not ak_slot.fixed or ak_slot.start is None:
return False
fixed_avail = Availability(event=self, start=ak_slot.start, end=ak_slot.end)
return fixed_avail.overlaps(timeslot, strict=True)
def _test_add_constraint(slot: Availability, availabilities: list[Availability]) -> bool:
"""Test if object is not available for whole event and may happen during slot."""
return (
_test_event_not_covered(availabilities) and slot.is_covered(availabilities)
)
def _generate_time_constraints(
avail_label: str,
avail_dict: dict,
timeslot_avail: Availability,
prefix: str = "availability",
) -> list[str]:
return [
f"{prefix}-{avail_label}-{pk}"
for pk, availabilities in avail_dict.items()
if _test_add_constraint(timeslot_avail, availabilities)
]
timeslots = {
"info": {"duration": float(self.export_slot)},
"blocks": [],
}
rooms = Room.objects.filter(event=self)
slots = AKSlot.objects.filter(event=self)
ak_availabilities = {
ak.pk: Availability.union(ak.availabilities.all())
for ak in AK.objects.filter(event=self).all()
}
room_availabilities = {
room.pk: Availability.union(room.availabilities.all())
for room in rooms
}
person_availabilities = {
person.pk: Availability.union(person.availabilities.all())
for person in AKOwner.objects.filter(event=self)
}
blocks = list(self.discretize_timeslots())
block_names = []
for block_idx, block in enumerate(blocks):
current_block = []
if not block:
continue
block_start = block[0].avail.start.astimezone(self.timezone)
block_end = block[-1].avail.end.astimezone(self.timezone)
start_day = block_start.strftime("%A, %d. %b")
if block_start.date() == block_end.date():
# same day
time_str = block_start.strftime("%H:%M") + "" + block_end.strftime("%H:%M")
else:
# different days
time_str = block_start.strftime("%a %H:%M") + "" + block_end.strftime("%a %H:%M")
block_names.append([start_day, time_str])
block_timeconstraints = [f"notblock{idx}" for idx in range(len(blocks)) if idx != block_idx]
for timeslot in block:
time_constraints = []
# if reso_deadline is set and timeslot ends before it,
# add fulfilled time constraint 'resolution'
if self.reso_deadline is None or timeslot.avail.end < self.reso_deadline:
time_constraints.append("resolution")
# add fulfilled time constraints for all AKs that cannot happen during full event
time_constraints.extend(
_generate_time_constraints("ak", ak_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all persons that are not available for full event
time_constraints.extend(
_generate_time_constraints("person", person_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all rooms that are not available for full event
time_constraints.extend(
_generate_time_constraints("room", room_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all AKSlots fixed to happen during timeslot
time_constraints.extend([
f"fixed-akslot-{slot.id}"
for slot in AKSlot.objects.filter(event=self, fixed=True).exclude(start__isnull=True)
if _test_akslot_fixed_in_timeslot(slot, timeslot.avail)
])
time_constraints.extend(timeslot.constraints)
time_constraints.extend(block_timeconstraints)
current_block.append({
"id": timeslot.idx,
"info": {
"start": timeslot.avail.start.astimezone(self.timezone).strftime("%Y-%m-%d %H:%M"),
"end": timeslot.avail.end.astimezone(self.timezone).strftime("%Y-%m-%d %H:%M"),
},
"fulfilled_time_constraints": time_constraints,
})
timeslots["blocks"].append(current_block)
timeslots["info"]["blocknames"] = block_names
info_dict = {
"title": self.name,
"slug": self.slug
}
for attr in ["contact_email", "place"]:
if hasattr(self, attr) and getattr(self, attr):
info_dict[attr] = getattr(self, attr)
return {
"participants": [],
"rooms": [r.as_json_dict() for r in rooms],
"timeslots": timeslots,
"info": info_dict,
"aks": [ak.as_json_dict() for ak in slots],
}
class AKOwner(models.Model):
""" An AKOwner describes the person organizing/holding an AK.
"""
......
import json
from typing import List
from django.contrib import messages
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import ListView, DetailView
from AKModel.availability.models import Availability
from AKModel.metaviews.admin import AdminViewMixin, FilterByEventSlugMixin, EventSlugMixin, IntermediateAdminView, \
IntermediateAdminActionView
from AKModel.models import AKRequirement, AKSlot, Event, AKOrgaMessage, AK, Room, AKOwner
from AKModel.models import AKRequirement, AKSlot, Event, AKOrgaMessage, AK
class AKRequirementOverview(AdminViewMixin, FilterByEventSlugMixin, ListView):
......@@ -50,157 +48,19 @@ class AKJSONExportView(AdminViewMixin, FilterByEventSlugMixin, ListView):
context_object_name = "slots"
title = _("AK JSON Export")
def _test_event_not_covered(self, availabilities: List[Availability]) -> bool:
"""Test if event is not covered by availabilities."""
return not Availability.is_event_covered(self.event, availabilities)
def _test_akslot_fixed_in_timeslot(self, ak_slot: AKSlot, timeslot: Availability) -> bool:
"""Test if an AKSlot is fixed to overlap a timeslot slot."""
if not ak_slot.fixed or ak_slot.start is None:
return False
fixed_avail = Availability(event=self.event, start=ak_slot.start, end=ak_slot.end)
return fixed_avail.overlaps(timeslot, strict=True)
def _test_add_constraint(self, slot: Availability, availabilities: List[Availability]) -> bool:
"""Test if object is not available for whole event and may happen during slot."""
return (
self._test_event_not_covered(availabilities) and slot.is_covered(availabilities)
)
def _generate_time_constraints(
self,
avail_label: str,
avail_dict: dict,
timeslot_avail: Availability,
prefix: str = "availability",
) -> list[str]:
return [
f"{prefix}-{avail_label}-{pk}"
for pk, availabilities in avail_dict.items()
if self._test_add_constraint(timeslot_avail, availabilities)
]
def get_queryset(self):
return super().get_queryset().order_by("ak__track")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
timeslots = {
"info": {"duration": float(self.event.export_slot)},
"blocks": [],
}
rooms = Room.objects.filter(event=self.event)
ak_availabilities = {
ak.pk: Availability.union(ak.availabilities.all())
for ak in AK.objects.filter(event=self.event).all()
}
room_availabilities = {
room.pk: Availability.union(room.availabilities.all())
for room in rooms
}
person_availabilities = {
person.pk: Availability.union(person.availabilities.all())
for person in AKOwner.objects.filter(event=self.event)
}
blocks = list(self.event.discretize_timeslots())
block_names = []
for block_idx, block in enumerate(blocks):
current_block = []
if not block:
continue
block_start = block[0].avail.start.astimezone(self.event.timezone)
block_end = block[-1].avail.end.astimezone(self.event.timezone)
start_day = block_start.strftime("%A, %d. %b")
if block_start.date() == block_end.date():
# same day
time_str = block_start.strftime("%H:%M") + "" + block_end.strftime("%H:%M")
else:
# different days
time_str = block_start.strftime("%a %H:%M") + "" + block_end.strftime("%a %H:%M")
block_names.append([start_day, time_str])
block_timeconstraints = [f"notblock{idx}" for idx in range(len(blocks)) if idx != block_idx]
for timeslot in block:
time_constraints = []
# if reso_deadline is set and timeslot ends before it,
# add fulfilled time constraint 'resolution'
if self.event.reso_deadline is None or timeslot.avail.end < self.event.reso_deadline:
time_constraints.append("resolution")
# add fulfilled time constraints for all AKs that cannot happen during full event
time_constraints.extend(
self._generate_time_constraints("ak", ak_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all persons that are not available for full event
time_constraints.extend(
self._generate_time_constraints("person", person_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all rooms that are not available for full event
time_constraints.extend(
self._generate_time_constraints("room", room_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all AKSlots fixed to happen during timeslot
time_constraints.extend([
f"fixed-akslot-{slot.id}"
for slot in AKSlot.objects.filter(event=self.event, fixed=True)
.exclude(start__isnull=True)
if self._test_akslot_fixed_in_timeslot(slot, timeslot.avail)
])
time_constraints.extend(timeslot.constraints)
time_constraints.extend(block_timeconstraints)
current_block.append({
"id": timeslot.idx,
"info": {
"start": timeslot.avail.start.astimezone(self.event.timezone).strftime("%Y-%m-%d %H:%M"),
"end": timeslot.avail.end.astimezone(self.event.timezone).strftime("%Y-%m-%d %H:%M"),
},
"fulfilled_time_constraints": time_constraints,
})
timeslots["blocks"].append(current_block)
timeslots["info"]["blocknames"] = block_names
info_dict = {
"title": self.event.name,
"slug": self.event.slug
}
for attr in ["contact_email", "place"]:
if hasattr(self.event, attr) and getattr(self.event, attr):
info_dict[attr] = getattr(self.event, attr)
data = {
"participants": [],
"rooms": [r.as_json_dict() for r in rooms],
"timeslots": timeslots,
"info": info_dict,
"aks": [ak.as_json_dict() for ak in context["slots"]],
}
data = self.event.as_json_dict()
context["json_data_oneline"] = json.dumps(data)
context["json_data"] = json.dumps(data, indent=2)
return context
class AKWikiExportView(AdminViewMixin, DetailView):
"""
View: Export AKs of this event in wiki syntax
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment