Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • konstantin/akplanning
  • matedealer/akplanning
  • kif/akplanning
  • mirco/akplanning
  • lordofthevoid/akplanning
  • voidptr/akplanning
  • xayomer/akplanning-fork
  • mollux/akplanning
  • neumantm/akplanning
  • mmarx/akplanning
  • nerf/akplanning
  • felix_bonn/akplanning
  • sebastian.uschmann/akplanning
13 results
Select Git revision
Show changes
Showing
with 1870 additions and 106 deletions
# Generated by Django 5.1.6 on 2025-03-29 22:05
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("AKModel", "0063_field_validators"),
]
operations = [
migrations.AddField(
model_name="event",
name="export_slot",
field=models.DecimalField(
decimal_places=2,
default=1,
help_text="Slot duration in hours that is used in the timeslot discretization, when this event is exported for the solver.",
max_digits=4,
verbose_name="Export Slot Length",
),
),
]
import decimal
import itertools
from datetime import timedelta
import json
import math
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Generator, Iterable
from django.db import models
from django.apps import apps
from django.core.validators import RegexValidator
from django.db import models, transaction
from django.db.models import Count
from django.urls import reverse_lazy
from django.utils import timezone
from django.utils.datetime_safe import datetime
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from simple_history.models import HistoricalRecords
from timezone_field import TimeZoneField
# Custom validators to be used for some of the fields
# Prevent inclusion of the quotation marks ' " ´ `
# This may be necessary to prevent javascript issues
no_quotation_marks_validator = RegexValidator(regex=r"['\"´`]+", inverse_match=True,
message=_('May not contain quotation marks'))
# Enforce that the field contains of at least one letter or digit (and not just special characters
# This prevents issues when autogenerating slugs from that field
slugable_validator = RegexValidator(regex=r"[\w\s]+", message=_('Must contain at least one letter or digit'))
@dataclass
class OptimizerTimeslot:
"""Class describing a discrete timeslot. Used to interface with an optimizer."""
avail: "Availability"
"""The availability object corresponding to this timeslot."""
idx: int
"""The unique index of this optimizer timeslot."""
constraints: set[str]
"""The set of time constraints fulfilled by this object."""
def merge(self, other: "OptimizerTimeslot") -> "OptimizerTimeslot":
"""Merge with other OptimizerTimeslot.
Creates a new OptimizerTimeslot object.
Its availability is constructed by merging the availabilities of self and other,
its constraints by taking the union of both constraint sets.
As an index, the index of self is used.
"""
avail = self.avail.merge_with(other.avail)
constraints = self.constraints.union(other.constraints)
return OptimizerTimeslot(
avail=avail, idx=self.idx, constraints=constraints
)
def __repr__(self) -> str:
return f"({self.avail.simplified}, {self.idx}, {self.constraints})"
TimeslotBlock = list[OptimizerTimeslot]
def merge_blocks(
blocks: Iterable[TimeslotBlock]
) -> Iterable[TimeslotBlock]:
"""Merge iterable of blocks together.
The timeslots of all blocks are grouped into maximal blocks.
Timeslots with the same start and end are identified with each other
and merged (cf `OptimizerTimeslot.merge`).
Throws a ValueError if any timeslots are overlapping but do not
share the same start and end, i.e. partial overlap is not allowed.
:param blocks: iterable of blocks to merge.
:return: iterable of merged blocks.
:rtype: iterable over lists of OptimizerTimeslot objects
"""
if not blocks:
return []
# flatten timeslot iterables to single chain
timeslot_chain = itertools.chain.from_iterable(blocks)
# sort timeslots according to start
timeslots = sorted(
timeslot_chain,
key=lambda slot: slot.avail.start
)
if not timeslots:
return []
all_blocks = []
current_block = [timeslots[0]]
timeslots = timeslots[1:]
for slot in timeslots:
if current_block and slot.avail.overlaps(current_block[-1].avail, strict=True):
if (
slot.avail.start == current_block[-1].avail.start
and slot.avail.end == current_block[-1].avail.end
):
# the same timeslot -> merge
current_block[-1] = current_block[-1].merge(slot)
else:
# partial overlap of interiors -> not supported
raise ValueError(
"Partially overlapping timeslots are not supported!"
f" ({current_block[-1].avail.simplified}, {slot.avail.simplified})"
)
elif not current_block or slot.avail.overlaps(current_block[-1].avail, strict=False):
# only endpoints in intersection -> same block
current_block.append(slot)
else:
# no overlap at all -> new block
all_blocks.append(current_block)
current_block = [slot]
if current_block:
all_blocks.append(current_block)
return all_blocks
class Event(models.Model):
"""
......@@ -32,7 +141,10 @@ class Event(models.Model):
help_text=_('When should AKs with intention to submit a resolution be done?'))
interest_start = models.DateTimeField(verbose_name=_('Interest Window Start'), blank=True, null=True,
help_text=_('Opening time for expression of interest.'))
help_text=
_('Opening time for expression of interest. When left blank, no interest '
'indication will be possible.'))
interest_end = models.DateTimeField(verbose_name=_('Interest Window End'), blank=True, null=True,
help_text=_('Closing time for expression of interest.'))
......@@ -43,16 +155,22 @@ class Event(models.Model):
plan_hidden = models.BooleanField(verbose_name=_('Plan Hidden'), help_text=_('Hides plan for non-staff users'),
default=True)
plan_published_at = models.DateTimeField(verbose_name=_('Plan published at'), blank=True, null=True,
help_text=_('Timestamp at which the plan was published'))
help_text=_('Timestamp at which the plan was published'))
base_url = models.URLField(verbose_name=_("Base URL"), help_text=_("Prefix for wiki link construction"), blank=True)
wiki_export_template_name = models.CharField(verbose_name=_("Wiki Export Template Name"), blank=True, max_length=50)
default_slot = models.DecimalField(max_digits=4, decimal_places=2, default=2, verbose_name=_('Default Slot Length'),
help_text=_('Default length in hours that is assumed for AKs in this event.'))
export_slot = models.DecimalField(max_digits=4, decimal_places=2, default=1, verbose_name=_('Export Slot Length'),
help_text=_(
'Slot duration in hours that is used in the timeslot discretization, '
'when this event is exported for the solver.'
))
contact_email = models.EmailField(verbose_name=_("Contact email address"), blank=True,
help_text=_("An email address that is displayed on every page "
"and can be used for all kinds of questions"))
help_text=_("An email address that is displayed on every page "
"and can be used for all kinds of questions"))
class Meta:
verbose_name = _('Event')
......@@ -83,7 +201,7 @@ class Event(models.Model):
event = Event.objects.filter(active=True).order_by('start').first()
# No active event? Return the next event taking place
if event is None:
event = Event.objects.order_by('start').filter(start__gt=datetime.now()).first()
event = Event.objects.order_by('start').filter(start__gt=datetime.now().astimezone()).first()
return event
def get_categories_with_aks(self, wishes_seperately=False,
......@@ -160,11 +278,394 @@ class Event(models.Model):
.filter(availabilities__count=0, owners__count__gt=0)
)
def _generate_slots_from_block(
self,
start: datetime,
end: datetime,
slot_duration: timedelta,
*,
slot_index: int = 0,
constraints: set[str] | None = None,
) -> Generator[TimeslotBlock, None, int]:
"""Discretize a time range into timeslots.
Uses a uniform discretization into discrete slots of length `slot_duration`,
starting at `start`. No incomplete timeslots are generated, i.e.
if (`end` - `start`) is not a whole number multiple of `slot_duration`
then the last incomplete timeslot is dropped.
:param start: Start of the time range.
:param end: Start of the time range.
:param slot_duration: Duration of a single timeslot in the discretization.
:param slot_index: index of the first timeslot. Defaults to 0.
:yield: Block of optimizer timeslots as the discretization result.
:ytype: list of OptimizerTimeslot
:return: The first slot index after the yielded blocks, i.e.
`slot_index` + total # generated timeslots
:rtype: int
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
current_slot_start = start
previous_slot_start: datetime | None = None
if constraints is None:
constraints = set()
current_block = []
room_availabilities = list({
availability
for room in Room.objects.filter(event=self)
for availability in room.availabilities.all()
})
while current_slot_start + slot_duration <= end:
slot = Availability(
event=self,
start=current_slot_start,
end=current_slot_start + slot_duration,
)
if any((availability.contains(slot) for availability in room_availabilities)):
# no gap in a block
if (
previous_slot_start is not None
and previous_slot_start + slot_duration < current_slot_start
):
yield current_block
current_block = []
current_block.append(
OptimizerTimeslot(avail=slot, idx=slot_index, constraints=constraints)
)
previous_slot_start = current_slot_start
slot_index += 1
current_slot_start += slot_duration
if current_block:
yield current_block
return slot_index
def uniform_time_slots(self, *, slots_in_an_hour: float) -> Iterable[TimeslotBlock]:
"""Uniformly discretize the entire event into blocks of timeslots.
Discretizes entire event uniformly. May not necessarily result in a single block
as slots with no room availability are dropped.
:param slots_in_an_hour: The percentage of an hour covered by a single slot.
Determines the discretization granularity.
:yield: Block of optimizer timeslots as the discretization result.
:ytype: list of OptimizerTimeslot
"""
all_category_constraints = AKCategory.create_category_optimizer_constraints(
AKCategory.objects.filter(event=self).all()
)
yield from self._generate_slots_from_block(
start=self.start,
end=self.end,
slot_duration=timedelta(hours=1.0 / slots_in_an_hour),
constraints=all_category_constraints,
)
def default_time_slots(self, *, slots_in_an_hour: float) -> Iterable[TimeslotBlock]:
"""Discretize all default slots into blocks of timeslots.
In the discretization each default slot corresponds to one block.
:param slots_in_an_hour: The percentage of an hour covered by a single slot.
Determines the discretization granularity.
:yield: Block of optimizer timeslots as the discretization result.
:ytype: list of TimeslotBlock
"""
slot_duration = timedelta(hours=1.0 / slots_in_an_hour)
slot_index = 0
for block_slot in DefaultSlot.objects.filter(event=self).order_by("start", "end"):
category_constraints = AKCategory.create_category_optimizer_constraints(
block_slot.primary_categories.all()
)
slot_index = yield from self._generate_slots_from_block(
start=block_slot.start,
end=block_slot.end,
slot_duration=slot_duration,
slot_index=slot_index,
constraints=category_constraints,
)
def discretize_timeslots(self, *, slots_in_an_hour: float | None = None) -> Iterable[TimeslotBlock]:
""""Choose discretization scheme.
Uses default_time_slots if the event has any DefaultSlot, otherwise uniform_time_slots.
:param slots_in_an_hour: The percentage of an hour covered by a single slot.
Determines the discretization granularity.
:yield: Block of optimizer timeslots as the discretization result.
:ytype: list of TimeslotBlock
"""
if slots_in_an_hour is None:
slots_in_an_hour = 1.0 / float(self.export_slot)
if DefaultSlot.objects.filter(event=self).exists():
# discretize default slots if they exists
yield from merge_blocks(self.default_time_slots(slots_in_an_hour=slots_in_an_hour))
else:
yield from self.uniform_time_slots(slots_in_an_hour=slots_in_an_hour)
@transaction.atomic
def schedule_from_json(
self, schedule: str | dict[str, Any], *, check_for_data_inconsistency: bool = True
) -> int:
"""Load AK schedule from a json string.
:param schedule: A string that can be decoded to json, describing
the AK schedule. The json data is assumed to be constructed
following the output specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
"""
if isinstance(schedule, str):
schedule = json.loads(schedule)
if "input" not in schedule or "scheduled_aks" not in schedule:
raise ValueError(_("Cannot parse malformed JSON input."))
if check_for_data_inconsistency:
export_dict = self.as_json_dict()
if schedule["input"] != export_dict:
raise ValueError(_("Data has changed since the export. Reexport and run the solver again."))
slots_in_an_hour = schedule["input"]["timeslots"]["info"]["duration"]
timeslot_dict = {
timeslot.idx: timeslot
for block in self.discretize_timeslots(slots_in_an_hour=slots_in_an_hour)
for timeslot in block
}
slots_updated = 0
for scheduled_slot in schedule["scheduled_aks"]:
scheduled_slot["timeslot_ids"] = list(map(int, scheduled_slot["timeslot_ids"]))
slot = AKSlot.objects.get(id=int(scheduled_slot["ak_id"]))
if not scheduled_slot["timeslot_ids"]:
raise ValueError(
_("AK {ak_name} is not assigned any timeslot by the solver").format(ak_name=slot.ak.name)
)
start_timeslot = timeslot_dict[min(scheduled_slot["timeslot_ids"])].avail
end_timeslot = timeslot_dict[max(scheduled_slot["timeslot_ids"])].avail
solver_duration = (end_timeslot.end - start_timeslot.start).total_seconds() / 3600.0
if solver_duration + 2e-4 < slot.duration:
raise ValueError(
_(
"Duration of AK {ak_name} assigned by solver ({solver_duration} hours) "
"is less than the duration required by the slot ({slot_duration} hours)"
).format(
ak_name=slot.ak.name,
solver_duration=solver_duration,
slot_duration=slot.duration,
)
)
if slot.fixed:
solver_room = Room.objects.get(id=int(scheduled_slot["room_id"]))
if slot.room != solver_room:
raise ValueError(
_(
"Fixed AK {ak_name} assigned by solver to room {solver_room} "
"is fixed to room {slot_room}"
).format(
ak_name=slot.ak.name,
solver_room=solver_room.name,
slot_room=slot.room.name,
)
)
if slot.start != start_timeslot.start:
raise ValueError(
_(
"Fixed AK {ak_name} assigned by solver to start at {solver_start} "
"is fixed to start at {slot_start}"
).format(
ak_name=slot.ak.name,
solver_start=start_timeslot.start,
slot_start=slot.start,
)
)
else:
slot.room = Room.objects.get(id=int(scheduled_slot["room_id"]))
slot.start = start_timeslot.start
slot.save()
slots_updated += 1
return slots_updated
def as_json_dict(self) -> dict[str, Any]:
"""Return the json representation of this Event.
:return: The json dict representation is constructed
following the input specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
:rtype: dict[str, Any]
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
def _check_event_not_covered(availabilities: list[Availability]) -> bool:
"""Test if event is not covered by availabilities."""
return not Availability.is_event_covered(self, availabilities)
def _check_akslot_fixed_in_timeslot(ak_slot: AKSlot, timeslot: Availability) -> bool:
"""Test if an AKSlot is fixed to overlap a timeslot slot."""
if not ak_slot.fixed or ak_slot.start is None:
return False
fixed_avail = Availability(event=self, start=ak_slot.start, end=ak_slot.end)
return fixed_avail.overlaps(timeslot, strict=True)
def _check_add_constraint(slot: Availability, availabilities: list[Availability]) -> bool:
"""Test if object is not available for whole event and may happen during slot."""
return (
_check_event_not_covered(availabilities) and slot.is_covered(availabilities)
)
def _generate_time_constraints(
avail_label: str,
avail_dict: dict,
timeslot_avail: Availability,
prefix: str = "availability",
) -> list[str]:
return [
f"{prefix}-{avail_label}-{pk}"
for pk, availabilities in avail_dict.items()
if _check_add_constraint(timeslot_avail, availabilities)
]
timeslots = {
"info": {"duration": float(self.export_slot)},
"blocks": [],
}
rooms = Room.objects.filter(event=self).order_by()
slots = AKSlot.objects.filter(event=self).order_by()
ak_availabilities = {
ak.pk: Availability.union(ak.availabilities.all())
for ak in AK.objects.filter(event=self).all()
}
room_availabilities = {
room.pk: Availability.union(room.availabilities.all())
for room in rooms
}
person_availabilities = {
person.pk: Availability.union(person.availabilities.all())
for person in AKOwner.objects.filter(event=self)
}
blocks = list(self.discretize_timeslots())
block_names = []
for block_idx, block in enumerate(blocks):
current_block = []
if not block:
continue
block_start = block[0].avail.start.astimezone(self.timezone)
block_end = block[-1].avail.end.astimezone(self.timezone)
start_day = block_start.strftime("%A, %d. %b")
if block_start.date() == block_end.date():
# same day
time_str = block_start.strftime("%H:%M") + " - " + block_end.strftime("%H:%M")
else:
# different days
time_str = block_start.strftime("%a %H:%M") + " - " + block_end.strftime("%a %H:%M")
block_names.append([start_day, time_str])
block_timeconstraints = [f"notblock{idx}" for idx in range(len(blocks)) if idx != block_idx]
for timeslot in block:
time_constraints = []
# if reso_deadline is set and timeslot ends before it,
# add fulfilled time constraint 'resolution'
if self.reso_deadline is None or timeslot.avail.end < self.reso_deadline:
time_constraints.append("resolution")
# add fulfilled time constraints for all AKs that cannot happen during full event
time_constraints.extend(
_generate_time_constraints("ak", ak_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all persons that are not available for full event
time_constraints.extend(
_generate_time_constraints("person", person_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all rooms that are not available for full event
time_constraints.extend(
_generate_time_constraints("room", room_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all AKSlots fixed to happen during timeslot
time_constraints.extend([
f"fixed-akslot-{slot.id}"
for slot in AKSlot.objects.filter(event=self, fixed=True).exclude(start__isnull=True)
if _check_akslot_fixed_in_timeslot(slot, timeslot.avail)
])
time_constraints.extend(timeslot.constraints)
time_constraints.extend(block_timeconstraints)
current_block.append({
"id": timeslot.idx,
"info": {
"start": timeslot.avail.start.astimezone(self.timezone).strftime("%Y-%m-%d %H:%M"),
"end": timeslot.avail.end.astimezone(self.timezone).strftime("%Y-%m-%d %H:%M"),
},
"fulfilled_time_constraints": sorted(time_constraints),
})
timeslots["blocks"].append(current_block)
timeslots["info"]["blocknames"] = block_names
info_dict = {
"title": self.name,
"slug": self.slug
}
for attr in ["contact_email", "place"]:
if hasattr(self, attr) and getattr(self, attr):
info_dict[attr] = getattr(self, attr)
return {
"participants": [],
"rooms": [r.as_json_dict() for r in rooms],
"timeslots": timeslots,
"info": info_dict,
"aks": [ak.as_json_dict() for ak in slots],
}
class AKOwner(models.Model):
""" An AKOwner describes the person organizing/holding an AK.
"""
name = models.CharField(max_length=64, verbose_name=_('Nickname'), help_text=_('Name to identify an AK owner by'))
name = models.CharField(max_length=64, verbose_name=_('Nickname'),
validators=[no_quotation_marks_validator, slugable_validator],
help_text=_('Name to identify an AK owner by'))
slug = models.SlugField(max_length=64, blank=True, verbose_name=_('Slug'), help_text=_('Slug for URL generation'))
institution = models.CharField(max_length=128, blank=True, verbose_name=_('Institution'), help_text=_('Uni etc.'))
link = models.URLField(blank=True, verbose_name=_('Web Link'), help_text=_('Link to Homepage'))
......@@ -243,8 +744,8 @@ class AKCategory(models.Model):
description = models.TextField(blank=True, verbose_name=_("Description"),
help_text=_("Short description of this AK Category"))
present_by_default = models.BooleanField(blank=True, default=True, verbose_name=_("Present by default"),
help_text=_("Present AKs of this category by default "
"if AK owner did not specify whether this AK should be presented?"))
help_text=_("Present AKs of this category by default if AK owner did not "
"specify whether this AK should be presented?"))
event = models.ForeignKey(to=Event, on_delete=models.CASCADE, verbose_name=_('Event'),
help_text=_('Associated event'))
......@@ -258,6 +759,20 @@ class AKCategory(models.Model):
def __str__(self):
return self.name
@staticmethod
def create_category_optimizer_constraints(categories: Iterable["AKCategory"]) -> set[str]:
"""Create a set of constraint strings from an AKCategory iterable.
:param categories: The iterable of categories to derive the constraint strings from.
:return: A set of category constraint strings, i.e. strings of the form
'availability-cat-<cat.name>'.
:rtype: set of strings.
"""
return {
f"availability-cat-{cat.name}"
for cat in categories
}
class AKTrack(models.Model):
""" An AKTrack describes a set of semantically related AKs.
......@@ -304,23 +819,46 @@ class AKRequirement(models.Model):
return self.name
class AKType(models.Model):
""" An AKType allows to associate one or multiple types with an AK, e.g., to better describe the format of that AK
or to which group of people it is addressed. Types are specified per event and are an optional feature.
"""
name = models.CharField(max_length=128, verbose_name=_('Name'), help_text=_('Name describing the type'))
event = models.ForeignKey(to=Event, on_delete=models.CASCADE, verbose_name=_('Event'),
help_text=_('Associated event'))
class Meta:
verbose_name = _('AK Type')
verbose_name_plural = _('AK Types')
ordering = ['name']
unique_together = ['event', 'name']
def __str__(self):
return self.name
class AK(models.Model):
""" An AK is a slot-based activity to be scheduled during an event.
"""
name = models.CharField(max_length=256, verbose_name=_('Name'), help_text=_('Name of the AK'))
name = models.CharField(max_length=256, verbose_name=_('Name'), help_text=_('Name of the AK'),
validators=[no_quotation_marks_validator, slugable_validator])
short_name = models.CharField(max_length=64, blank=True, verbose_name=_('Short Name'),
validators=[no_quotation_marks_validator],
help_text=_('Name displayed in the schedule'))
description = models.TextField(blank=True, verbose_name=_('Description'), help_text=_('Description of the AK'))
owners = models.ManyToManyField(to=AKOwner, blank=True, verbose_name=_('Owners'),
help_text=_('Those organizing the AK'))
# TODO generate automatically
# Will be automatically generated in save method if not set
link = models.URLField(blank=True, verbose_name=_('Web Link'), help_text=_('Link to wiki page'))
protocol_link = models.URLField(blank=True, verbose_name=_('Protocol Link'), help_text=_('Link to protocol'))
category = models.ForeignKey(to=AKCategory, on_delete=models.PROTECT, verbose_name=_('Category'),
help_text=_('Category of the AK'))
types = models.ManyToManyField(to=AKType, blank=True, verbose_name=_('Types'),
help_text=_("This AK is"))
track = models.ForeignKey(to=AKTrack, blank=True, on_delete=models.SET_NULL, null=True, verbose_name=_('Track'),
help_text=_('Track the AK belongs to'))
......@@ -338,8 +876,8 @@ class AK(models.Model):
help_text=_('AKs that should precede this AK in the schedule'))
notes = models.TextField(blank=True, verbose_name=_('Organizational Notes'), help_text=_(
'Notes to organizers. These are public. For private notes, please use the button for private messages '
'on the detail page of this AK (after creation/editing).'))
'Notes to organizers. These are public. For private notes, please use the button for private messages '
'on the detail page of this AK (after creation/editing).'))
interest = models.IntegerField(default=-1, verbose_name=_('Interest'), help_text=_('Expected number of people'))
interest_counter = models.IntegerField(default=0, verbose_name=_('Interest Counter'),
......@@ -351,7 +889,7 @@ class AK(models.Model):
include_in_export = models.BooleanField(default=True, verbose_name=_('Export?'),
help_text=_("Include AK in wiki export?"))
history = HistoricalRecords(excluded_fields=['interest_counter', 'include_in_export'])
history = HistoricalRecords(excluded_fields=['interest', 'interest_counter', 'include_in_export'])
class Meta:
verbose_name = _('AK')
......@@ -367,7 +905,7 @@ class AK(models.Model):
@property
def details(self):
"""
Generate a detailled string representation, e.g., for usage in scheduling
Generate a detailed string representation, e.g., for usage in scheduling
:return: string representation of that AK with all details
:rtype: str
"""
......@@ -376,15 +914,35 @@ class AK(models.Model):
from AKModel.availability.models import Availability
availabilities = ', \n'.join(f'{a.simplified}' for a in Availability.objects.select_related('event')
.filter(ak=self))
return f"""{self.name}{" (R)" if self.reso else ""}:
detail_string = f"""{self.name}{" (R)" if self.reso else ""}:
{self.owners_list}
{_('Interest')}: {self.interest}
{_("Requirements")}: {", ".join(str(r) for r in self.requirements.all())}
{_("Conflicts")}: {", ".join(str(c) for c in self.conflicts.all())}
{_("Prerequisites")}: {", ".join(str(p) for p in self.prerequisites.all())}
{_("Availabilities")}: \n{availabilities}"""
{_('Interest')}: {self.interest}"""
if self.requirements.count() > 0:
detail_string += f"\n{_('Requirements')}: {', '.join(str(r) for r in self.requirements.all())}"
if self.types.count() > 0:
detail_string += f"\n{_('Types')}: {', '.join(str(r) for r in self.types.all())}"
# Find conflicts
# (both directions, those specified for this AK and those were this AK was specified as conflict)
# Deduplicate and order list alphabetically
conflicts = set()
if self.conflicts.count() > 0:
for c in self.conflicts.all():
conflicts.add(str(c))
if self.conflict.count() > 0:
for c in self.conflict.all():
conflicts.add(str(c))
if len(conflicts) > 0:
conflicts = list(conflicts)
conflicts.sort()
detail_string += f"\n{_('Conflicts')}: {', '.join(conflicts)}"
if self.prerequisites.count() > 0:
detail_string += f"\n{_('Prerequisites')}: {', '.join(str(p) for p in self.prerequisites.all())}"
detail_string += f"\n{_('Availabilities')}: \n{availabilities}"
return detail_string
@property
def owners_list(self):
......@@ -392,7 +950,7 @@ class AK(models.Model):
Get a list of stringified representations of all owners
:return: list of owners
:rtype: List[str]
:rtype: list[str]
"""
return ", ".join(str(owner) for owner in self.owners.all())
......@@ -402,7 +960,7 @@ class AK(models.Model):
Get a list of stringified representations of all durations of associated slots
:return: list of durations
:rtype: List[str]
:rtype: list[str]
"""
return ", ".join(str(slot.duration_simplified) for slot in self.akslot_set.select_related('event').all())
......@@ -460,6 +1018,18 @@ class AK(models.Model):
return reverse_lazy('submit:ak_detail', kwargs={'event_slug': self.event.slug, 'pk': self.id})
return self.edit_url
def save(self, *args, force_insert=False, force_update=False, using=None, update_fields=None):
# Auto-Generate Link if not set yet
if self.link == "":
link = self.event.base_url + self.name.replace(" ", "_")
# Truncate links longer than 200 characters (default length of URL fields in django)
self.link = link[:200]
# Tell Django that we have updated the link field
if update_fields is not None:
update_fields = {"link"}.union(update_fields)
super().save(*args,
force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields)
class Room(models.Model):
""" A room describes where an AK can be held.
......@@ -496,6 +1066,44 @@ class Room(models.Model):
def __str__(self):
return self.title
def as_json_dict(self) -> dict[str, Any]:
"""Return a json representation of this room object.
:return: The json dict representation is constructed
following the input specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
:rtype: dict[str, Any]
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
# check if room is available for the whole event
# -> no time constraint needs to be introduced
if Availability.is_event_covered(self.event, self.availabilities.all()):
time_constraints = []
else:
time_constraints = [f"availability-room-{self.pk}"]
data = {
"id": self.pk,
"info": {
"name": self.name,
},
"capacity": self.capacity,
"fulfilled_room_constraints": [constraint.name
for constraint in self.properties.all()],
"time_constraints": time_constraints
}
data["fulfilled_room_constraints"].append(f"fixed-room-{self.pk}")
if not any(constr.startswith("proxy") for constr in data["fulfilled_room_constraints"]):
data["fulfilled_room_constraints"].append("no-proxy")
data["fulfilled_room_constraints"].sort()
return data
class AKSlot(models.Model):
""" An AK Mapping matches an AK to a room during a certain time.
......@@ -575,7 +1183,7 @@ class AKSlot(models.Model):
def overlaps(self, other: "AKSlot"):
"""
Check wether two slots overlap
Check whether two slots overlap
:param other: second slot to compare with
:return: true if they overlap, false if not:
......@@ -583,6 +1191,93 @@ class AKSlot(models.Model):
"""
return self.start < other.end <= self.end or self.start <= other.start < self.end
def save(self, *args, force_insert=False, force_update=False, using=None, update_fields=None):
# Make sure duration is not longer than the event
if update_fields is None or 'duration' in update_fields:
event_duration = self.event.end - self.event.start
event_duration_hours = event_duration.days * 24 + event_duration.seconds // 3600
self.duration = min(self.duration, event_duration_hours)
super().save(*args,
force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields)
def as_json_dict(self) -> dict[str, Any]:
"""Return a json representation of the AK object of this slot.
:return: The json dict representation is constructed
following the input specification of the KoMa conference optimizer, cf.
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
:rtype: dict[str, Any]
"""
# local import to prevent cyclic import
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
# check if ak resp. owner is available for the whole event
# -> no time constraint needs to be introduced
if self.fixed and self.start is not None:
ak_time_constraints = [f"fixed-akslot-{self.id}"]
elif not Availability.is_event_covered(self.event, self.ak.availabilities.all()):
ak_time_constraints = [f"availability-ak-{self.ak.pk}"]
else:
ak_time_constraints = []
def _owner_time_constraints(owner: AKOwner):
owner_avails = owner.availabilities.all()
if not owner_avails or Availability.is_event_covered(self.event, owner_avails):
return []
return [f"availability-person-{owner.pk}"]
conflict_slots = AKSlot.objects.filter(ak__in=self.ak.conflicts.all())
dependency_slots = AKSlot.objects.filter(ak__in=self.ak.prerequisites.all())
other_ak_slots = AKSlot.objects.filter(ak=self.ak).exclude(pk=self.pk)
ceil_offet_eps = decimal.Decimal(1e-4)
data = {
"id": self.pk,
"duration": math.ceil(self.duration / self.event.export_slot - ceil_offet_eps),
"properties": {
"conflicts":
sorted(
[conflict.pk for conflict in conflict_slots.all()]
+ [second_slot.pk for second_slot in other_ak_slots.all()]
),
"dependencies": sorted([dep.pk for dep in dependency_slots.all()]),
},
"room_constraints": [constraint.name
for constraint in self.ak.requirements.all()],
"time_constraints": ["resolution"] if self.ak.reso else [],
"info": {
"name": self.ak.name,
"head": ", ".join([str(owner)
for owner in self.ak.owners.all()]),
"description": self.ak.description,
"reso": self.ak.reso,
"duration_in_hours": float(self.duration),
"django_ak_id": self.ak.pk,
"types": list(self.ak.types.values_list("name", flat=True).order_by()),
},
}
data["time_constraints"].extend(ak_time_constraints)
for owner in self.ak.owners.all():
data["time_constraints"].extend(_owner_time_constraints(owner))
if self.ak.category:
category_constraints = AKCategory.create_category_optimizer_constraints([self.ak.category])
data["time_constraints"].extend(category_constraints)
if self.fixed and self.room is not None:
data["room_constraints"].append(f"fixed-room-{self.room.pk}")
if not any(constr.startswith("proxy") for constr in data["room_constraints"]):
data["room_constraints"].append("no-proxy")
data["room_constraints"].sort()
data["time_constraints"].sort()
return data
class AKOrgaMessage(models.Model):
"""
......@@ -595,6 +1290,8 @@ class AKOrgaMessage(models.Model):
timestamp = models.DateTimeField(auto_now_add=True)
event = models.ForeignKey(to=Event, on_delete=models.CASCADE, verbose_name=_('Event'),
help_text=_('Associated event'))
resolved = models.BooleanField(verbose_name=_('Resolved'), default=False,
help_text=_('This message has been resolved (no further action needed)'))
class Meta:
verbose_name = _('AK Orga Message')
......@@ -614,6 +1311,7 @@ class ConstraintViolation(models.Model):
Depending on the type, different fields (references to other models) will be filled. Each violation should always
be related to an event and at least on other instance of a causing entity
"""
class Meta:
verbose_name = _('Constraint Violation')
verbose_name_plural = _('Constraint Violations')
......@@ -630,7 +1328,7 @@ class ConstraintViolation(models.Model):
AK_CONFLICT_COLLISION = 'acc', _('AK Slot is scheduled at the same time as an AK listed as a conflict')
AK_BEFORE_PREREQUISITE = 'abp', _('AK Slot is scheduled before an AK listed as a prerequisite')
AK_AFTER_RESODEADLINE = 'aar', _(
'AK Slot for AK with intention to submit a resolution is scheduled after resolution deadline')
'AK Slot for AK with intention to submit a resolution is scheduled after resolution deadline')
AK_CATEGORY_MISMATCH = 'acm', _('AK Slot in a category is outside that categories availabilities')
AK_SLOT_COLLISION = 'asc', _('Two AK Slots for the same AK scheduled at the same time')
ROOM_CAPACITY_EXCEEDED = 'rce', _('Room does not have enough space for interest in scheduled AK Slot')
......@@ -831,6 +1529,7 @@ class DefaultSlot(models.Model):
Model representing a default slot,
i.e., a prefered slot to use for typical AKs in the schedule to guarantee enough breaks etc.
"""
class Meta:
verbose_name = _('Default Slot')
verbose_name_plural = _('Default Slots')
......@@ -843,7 +1542,8 @@ class DefaultSlot(models.Model):
help_text=_('Associated event'))
primary_categories = models.ManyToManyField(to=AKCategory, verbose_name=_('Primary categories'), blank=True,
help_text=_('Categories that should be assigned to this slot primarily'))
help_text=_(
'Categories that should be assigned to this slot primarily'))
@property
def start_simplified(self) -> str:
......
......@@ -4,8 +4,8 @@
{% block content %}
<pre>
title;duration;who;requirements;prerequisites;conflicts;availabilities;category;track;reso;notes;
{% for slot in slots %}{{ slot.ak.short_name }};{{ slot.duration }};{{ slot.ak.owners.all|join:", " }};{{ slot.ak.requirements.all|join:", " }};{{ slot.ak.prerequisites.all|join:", " }};{{ slot.ak.conflicts.all|join:", " }};{% for a in slot.ak.availabilities.all %}{{ a.start | timezone:event.timezone | date:"l H:i" }} - {{ a.end | timezone:event.timezone | date:"l H:i" }}, {% endfor %};{{ slot.ak.category }};{{ slot.ak.track }};{{ slot.ak.reso }};{{ slot.ak.notes }};
title;duration;who;requirements;prerequisites;conflicts;availabilities;category;types;track;reso;notes;
{% for slot in slots %}{{ slot.ak.short_name }};{{ slot.duration }};{{ slot.ak.owners.all|join:", " }};{{ slot.ak.requirements.all|join:", " }};{{ slot.ak.prerequisites.all|join:", " }};{{ slot.ak.conflicts.all|join:", " }};{% for a in slot.ak.availabilities.all %}{{ a.start | timezone:event.timezone | date:"l H:i" }} - {{ a.end | timezone:event.timezone | date:"l H:i" }}, {% endfor %};{{ slot.ak.category }};{{ slot.ak.types.all|join:", " }};{{ slot.ak.track }};{{ slot.ak.reso }};{{ slot.ak.notes }};
{% endfor %}
</pre>
{% endblock %}
{% extends "admin/base_site.html" %}
{% load tz %}
{% block content %}
<div class="mb-3">
<label for="export_single_line" class="form-label">Exported JSON:</label>
<input readonly type="text" name="export_single_line" class="form-control form-control-sm" value="{{ json_data_oneline }}" style="font-family:var(--font-family-monospace);">
</div>
<div class="mb-3">
<label class="form-label">Exported JSON (indented for better readability):</label>
<pre class="prettyprint border rounded p-2">{{ json_data }}</pre>
</div>
<script>
$(document).ready(function() {
// JSON highlighting.
prettyPrint();
}
)
</script>
{% endblock %}
{% extends "admin/base_site.html" %}
{% load tags_AKModel %}
{% load i18n %}
{% load tz %}
{% load fontawesome_6 %}
{% block title %}{% trans "AKs by Owner" %}: {{owner}}{% endblock %}
{% block content %}
{% timezone event.timezone %}
<h2>[{{event}}] <a href="{% url 'admin:AKModel_akowner_change' owner.pk %}">{{owner}}</a> - {% trans "AKs" %}</h2>
<div class="row mt-4">
<table class="table table-striped">
{% for ak in owner.ak_set.all %}
<tr>
<td>{{ ak }}</td>
{% if "AKSubmission"|check_app_installed %}
<td class="text-end">
<a href="{{ ak.detail_url }}" data-bs-toggle="tooltip"
title="{% trans 'Details' %}"
class="btn btn-primary">{% fa6_icon 'info' 'fas' %}</a>
{% if event.active %}
<a href="{{ ak.edit_url }}" data-bs-toggle="tooltip"
title="{% trans 'Edit' %}"
class="btn btn-success">{% fa6_icon 'pencil-alt' 'fas' %}</a>
{% endif %}
{% endif %}
</td>
</tr>
{% empty %}
<tr><td>{% trans "This user does not have any AKs currently" %}</td></tr>
{% endfor %}
</table>
</div>
{% endtimezone %}
{% endblock %}
......@@ -8,6 +8,11 @@
{% block title %}{% trans "New event wizard" %}: {{ wizard_step_text }}{% endblock %}
{% block extrahead %}
{{ block.super }}
{{ form.media }}
{% endblock %}
{% block content %}
{% include "admin/AKModel/event_wizard/wizard_steps.html" %}
......@@ -17,8 +22,6 @@
<h5 class="mb-3">{% trans "Successfully imported.<br><br>Do you want to activate your event now?" %}</h5>
{{ form.media }}
<form method="post">{% csrf_token %}
{% bootstrap_form form %}
......
......@@ -8,6 +8,11 @@
{% block title %}{% trans "New event wizard" %}: {{ wizard_step_text }}{% endblock %}
{% block extrahead %}
{{ block.super }}
{{ form.media }}
{% endblock %}
{% block content %}
{% include "admin/AKModel/event_wizard/wizard_steps.html" %}
......@@ -29,8 +34,6 @@
<h5 class="mb-3">{% trans "Your event was created and can now be further configured." %}</h5>
{{ form.media }}
<form method="post">{% csrf_token %}
{% bootstrap_form form %}
......
......@@ -8,11 +8,14 @@
{% block title %}{% trans "New event wizard" %}: {{ wizard_step_text }}{% endblock %}
{% block extrahead %}
{{ block.super }}
{{ form.media }}
{% endblock %}
{% block content %}
{% include "admin/AKModel/event_wizard/wizard_steps.html" %}
{{ form.media }}
<form method="post">{% csrf_token %}
{% bootstrap_form form %}
......
......@@ -8,11 +8,14 @@
{% block title %}{% trans "New event wizard" %}: {{ wizard_step_text }}{% endblock %}
{% block extrahead %}
{{ block.super }}
{{ form.media }}
{% endblock %}
{% block content %}
{% include "admin/AKModel/event_wizard/wizard_steps.html" %}
{{ form.media }}
{% timezone timezone %}
<form method="post">{% csrf_token %}
......
......@@ -7,6 +7,11 @@
{% block title %}{% trans "New event wizard" %}: {{ wizard_step_text }}{% endblock %}
{% block extrahead %}
{{ block.super }}
{{ form.media }}
{% endblock %}
{% block content %}
{% include "admin/AKModel/event_wizard/wizard_steps.html" %}
......
{% extends "admin/base_site.html" %}
{% load tags_AKModel %}
{% load i18n %}
{% load django_bootstrap5 %}
{% load fontawesome_6 %}
{% block title %}{{event}}: {{ title }}{% endblock %}
{% block content %}
{% block action_preview %}
<p>
{{ preview|linebreaksbr }}
</p>
{% endblock %}
<form enctype="multipart/form-data" method="post">{% csrf_token %}
{% bootstrap_form form %}
<div class="float-end">
<button type="submit" class="save btn btn-success" value="Submit">
{% fa6_icon "check" 'fas' %} {% trans "Confirm" %}
</button>
</div>
<a href="javascript:history.back()" class="btn btn-info">
{% fa6_icon "times" 'fas' %} {% trans "Cancel" %}
</a>
</form>
{% endblock %}
{% load tz %}
{% load fontawesome_6 %}
{% timezone event.timezone %}
<table class="table table-striped">
......@@ -7,7 +8,10 @@
<span class="text-secondary float-end">
{{ message.timestamp|date:"Y-m-d H:i:s" }}
</span>
<h5><a href="{{ message.ak.detail_url }}">{{ message.ak }}</a></h5>
<h5><a href="{{ message.ak.detail_url }}">
{% if message.resolved %}{% fa6_icon "check-circle" %} {% endif %}
{{ message.ak }}
</a></h5>
<p>{{ message.text }}</p>
</td></tr>
{% endfor %}
......
......@@ -3,8 +3,11 @@ from django.apps import apps
from django.conf import settings
from django.utils.html import format_html, mark_safe, conditional_escape
from django.templatetags.static import static
from django.template.defaultfilters import date
from fontawesome_6.app_settings import get_css
from AKModel.models import Event
register = template.Library()
......@@ -55,8 +58,7 @@ def wiki_owners_export(owners, event):
but external links when owner specified a non-wikilink. This is applied to the full list of owners
:param owners: list of owners
:param event: event this owner belongs to and that is currently exported
(specifying this directly prevents unnecesary database lookups)
:param event: event this owner belongs to and that is currently exported (specifying this directly prevents unnecessary database lookups) #pylint: disable=line-too-long
:return: linkified owners list in wiki syntax
:rtype: str
"""
......@@ -72,6 +74,21 @@ def wiki_owners_export(owners, event):
return ", ".join(to_link(owner) for owner in owners.all())
@register.filter
def event_month_year(event:Event):
"""
Print rough event date (month and year)
:param event: event to print the date for
:return: string containing rough date information for event
"""
if event.start.month == event.end.month:
return f"{date(event.start, 'F')} {event.start.year}"
event_start_string = date(event.start, 'F')
if event.start.year != event.end.year:
event_start_string = f"{event_start_string} {event.start.year}"
return f"{event_start_string} - {date(event.end, 'F')} {event.end.year}"
# get list of relevant css fontawesome css files for this instance
css = get_css()
......
import json
import math
from collections import defaultdict
from collections.abc import Iterable
from datetime import datetime, timedelta
from itertools import chain
from bs4 import BeautifulSoup
from django.contrib.auth import get_user_model
from django.test import TestCase
from django.urls import reverse
from jsonschema.exceptions import best_match
from AKModel.availability.models import Availability
from AKModel.models import AK, AKCategory, AKOwner, AKSlot, DefaultSlot, Event, Room
from AKModel.utils import construct_schema_validator
class JSONExportTest(TestCase):
"""Test if JSON export is correct.
It tests if the output conforms to the KoMa specification:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
"""
fixtures = ["model.json"]
@classmethod
def setUpTestData(cls):
"""Shared set up by initializing admin user."""
cls.admin_user = get_user_model().objects.create(
username="Test Admin User",
email="testadmin@example.com",
password="adminpw",
is_staff=True,
is_superuser=True,
is_active=True,
)
cls.json_export_validator = construct_schema_validator(
"solver-input-export.schema.json"
)
def setUp(self):
self.client.force_login(self.admin_user)
self.export_dict = {}
self.export_objects = {
"aks": {},
"rooms": {},
"participants": {},
}
self.ak_slots: Iterable[AKSlot] = []
self.rooms: Iterable[Room] = []
self.slots_in_an_hour: float = 1.0
self.event: Event | None = None
def set_up_event(self, event: Event) -> None:
"""Set up by retrieving json export and initializing data."""
export_url = reverse("admin:ak_json_export", kwargs={"event_slug": event.slug})
response = self.client.get(export_url)
self.assertEqual(response.status_code, 200, "Export not working at all")
soup = BeautifulSoup(response.content, features="lxml")
self.export_dict = json.loads(soup.find("pre").string)
self.export_objects["aks"] = {ak["id"]: ak for ak in self.export_dict["aks"]}
self.export_objects["rooms"] = {
room["id"]: room for room in self.export_dict["rooms"]
}
self.export_objects["participants"] = {
participant["id"]: participant
for participant in self.export_dict["participants"]
}
self.ak_slots = (
AKSlot.objects.filter(event__slug=event.slug)
.select_related("ak")
.prefetch_related("ak__conflicts")
.prefetch_related("ak__prerequisites")
.all()
)
self.rooms = Room.objects.filter(event__slug=event.slug).all()
self.slots_in_an_hour = 1 / self.export_dict["timeslots"]["info"]["duration"]
self.event = event
def test_all_aks_exported(self):
"""Test if exported AKs match AKSlots of Event."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self.assertEqual(
{slot.pk for slot in self.ak_slots},
self.export_objects["aks"].keys(),
"Exported AKs does not match the AKSlots of the event",
)
def _check_uniqueness(self, lst, name: str, key: str | None = "id"):
if key is not None:
lst = [entry[key] for entry in lst]
self.assertEqual(len(lst), len(set(lst)), f"{name} IDs not unique!")
def _check_type(self, attr, cls, name: str, item: str) -> None:
self.assertTrue(isinstance(attr, cls), f"{item} {name} not a {cls}")
def _check_lst(
self, lst: list[str], name: str, item: str, contained_type=str
) -> None:
self.assertTrue(isinstance(lst, list), f"{item} {name} not a list")
self.assertTrue(
all(isinstance(c, contained_type) for c in lst),
f"{item} has non-{contained_type} {name}",
)
if contained_type in {str, int}:
self._check_uniqueness(lst, name, key=None)
def test_conformity_to_schema(self):
"""Test if JSON structure and types conform to schema."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
error = best_match(
self.json_export_validator.iter_errors(self.export_dict)
)
msg = "" if not error else f"{error.message} at {error.json_path}"
self.assertFalse(error, msg)
def test_id_uniqueness(self):
"""Test if objects are only exported once."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self._check_uniqueness(self.export_dict["aks"], "AKs")
self._check_uniqueness(self.export_dict["rooms"], "Rooms")
self._check_uniqueness(self.export_dict["participants"], "Participants")
self._check_uniqueness(
chain.from_iterable(self.export_dict["timeslots"]["blocks"]),
"Timeslots",
)
def test_timeslot_ids_consecutive(self):
"""Test if Timeslots ids are chronologically consecutive."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
prev_id = None
for timeslot in chain.from_iterable(
self.export_dict["timeslots"]["blocks"]
):
if prev_id is not None:
self.assertLess(
prev_id,
timeslot["id"],
"timeslot ids must be increasing",
)
prev_id = timeslot["id"]
def test_general_conformity_to_spec(self):
"""Test if rest of JSON structure and types conform to standard."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self.assertEqual(
self.export_dict["participants"],
[],
"Empty participant list expected",
)
info_keys = {"title": "name", "slug": "slug"}
for attr in ["contact_email", "place"]:
if hasattr(self.event, attr) and getattr(self.event, attr):
info_keys[attr] = attr
self.assertEqual(
self.export_dict["info"].keys(),
info_keys.keys(),
"info keys not as expected",
)
for attr, attr_field in info_keys.items():
self.assertEqual(
getattr(self.event, attr_field), self.export_dict["info"][attr]
)
def test_ak_durations(self):
"""Test if all AK durations are correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
self.assertLessEqual(
float(slot.duration) * self.slots_in_an_hour - 1e-4,
ak["duration"],
"Slot duration is too short",
)
self.assertEqual(
math.ceil(float(slot.duration) * self.slots_in_an_hour - 1e-4),
ak["duration"],
"Slot duration is wrong",
)
self.assertEqual(
float(slot.duration),
ak["info"]["duration_in_hours"],
"Slot duration_in_hours is wrong",
)
def test_ak_conflicts(self):
"""Test if all AK conflicts are correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
conflict_slots = set(
self.ak_slots.filter(
ak__in=slot.ak.conflicts.all()
).values_list("pk", flat=True)
)
other_ak_slots = (
self.ak_slots.filter(ak=slot.ak)
.exclude(pk=slot.pk)
.values_list("pk", flat=True)
)
conflict_slots.update(other_ak_slots)
self.assertEqual(
conflict_slots,
set(ak["properties"]["conflicts"]),
f"Conflicts for slot {slot.pk} not as expected",
)
def test_ak_depenedencies(self):
"""Test if all AK dependencies are correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
dependency_slots = self.ak_slots.filter(
ak__in=slot.ak.prerequisites.all()
).values_list("pk", flat=True)
self.assertEqual(
set(dependency_slots),
set(ak["properties"]["dependencies"]),
f"Dependencies for slot {slot.pk} not as expected",
)
def test_ak_reso(self):
"""Test if resolution intent of AKs is correctly exported."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
self.assertEqual(slot.ak.reso, ak["info"]["reso"])
self.assertEqual(
slot.ak.reso, "resolution" in ak["time_constraints"]
)
def test_ak_info(self):
"""Test if contents of AK info dict is correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
self.assertEqual(ak["info"]["name"], slot.ak.name)
self.assertEqual(
ak["info"]["head"], ", ".join(map(str, slot.ak.owners.all()))
)
self.assertEqual(ak["info"]["description"], slot.ak.description)
self.assertEqual(ak["info"]["django_ak_id"], slot.ak.pk)
self.assertEqual(
ak["info"]["types"],
list(slot.ak.types.values_list("name", flat=True).order_by()),
)
def test_ak_room_constraints(self):
"""Test if AK room constraints are exported as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
requirements = list(
slot.ak.requirements.values_list("name", flat=True)
)
# proxy rooms
if not any(constr.startswith("proxy") for constr in requirements):
requirements.append("no-proxy")
# fixed slot
if slot.fixed and slot.room is not None:
requirements.append(f"fixed-room-{slot.room.pk}")
self.assertEqual(
set(ak["room_constraints"]),
set(requirements),
f"Room constraints for slot {slot.pk} not as expected",
)
def test_ak_time_constraints(self):
"""Test if AK time constraints are exported as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
time_constraints = set()
# add time constraints for AK category
if slot.ak.category:
category_constraints = AKCategory.create_category_optimizer_constraints(
[slot.ak.category]
)
time_constraints |= category_constraints
if slot.fixed and slot.start is not None:
# fixed slot
time_constraints.add(f"fixed-akslot-{slot.pk}")
elif not Availability.is_event_covered(
slot.event, slot.ak.availabilities.all()
):
# restricted AK availability
time_constraints.add(f"availability-ak-{slot.ak.pk}")
for owner in slot.ak.owners.all():
# restricted owner availability
if not owner.availabilities.all():
# no availability for owner -> assume full event is covered
continue
if not Availability.is_event_covered(
slot.event, owner.availabilities.all()
):
time_constraints.add(f"availability-person-{owner.pk}")
ak = self.export_objects["aks"][slot.pk]
self.assertEqual(
set(ak["time_constraints"]),
time_constraints,
f"Time constraints for slot {slot.pk} not as expected",
)
def test_all_rooms_exported(self):
"""Test if exported Rooms match the rooms of Event."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self.assertEqual(
{room.pk for room in self.rooms},
self.export_objects["rooms"].keys(),
"Exported Rooms do not match the Rooms of the event",
)
def test_room_capacity(self):
"""Test if room capacity is exported correctly."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for room in self.rooms:
export_room = self.export_objects["rooms"][room.pk]
self.assertEqual(room.capacity, export_room["capacity"])
def test_room_info(self):
"""Test if contents of Room info dict is correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for room in self.rooms:
export_room = self.export_objects["rooms"][room.pk]
self.assertEqual(room.name, export_room["info"]["name"])
def test_room_timeconstraints(self):
"""Test if Room time constraints are exported as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for room in self.rooms:
time_constraints = set()
# test if time availability of room is restricted
if not Availability.is_event_covered(
event, room.availabilities.all()
):
time_constraints.add(f"availability-room-{room.pk}")
export_room = self.export_objects["rooms"][room.pk]
self.assertEqual(
time_constraints, set(export_room["time_constraints"])
)
def test_room_fulfilledroomconstraints(self):
"""Test if room constraints fulfilled by Room are correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for room in self.rooms:
# room properties
fulfilled_room_constraints = set(
room.properties.values_list("name", flat=True)
)
# proxy rooms
if not any(
constr.startswith("proxy")
for constr in fulfilled_room_constraints
):
fulfilled_room_constraints.add("no-proxy")
fulfilled_room_constraints.add(f"fixed-room-{room.pk}")
export_room = self.export_objects["rooms"][room.pk]
self.assertEqual(
fulfilled_room_constraints,
set(export_room["fulfilled_room_constraints"]),
)
def _get_timeslot_start_end(self, timeslot):
start = datetime.strptime(timeslot["info"]["start"], "%Y-%m-%d %H:%M").replace(
tzinfo=self.event.timezone
)
end = datetime.strptime(timeslot["info"]["end"], "%Y-%m-%d %H:%M").replace(
tzinfo=self.event.timezone
)
return start, end
def _get_cat_availability_in_export(self):
export_slot_cat_avails = defaultdict(list)
for timeslot in chain.from_iterable(self.export_dict["timeslots"]["blocks"]):
for constr in timeslot["fulfilled_time_constraints"]:
if constr.startswith("availability-cat-"):
cat_name = constr[len("availability-cat-") :]
start, end = self._get_timeslot_start_end(timeslot)
export_slot_cat_avails[cat_name].append(
Availability(event=self.event, start=start, end=end)
)
return {
cat_name: Availability.union(avail_lst)
for cat_name, avail_lst in export_slot_cat_avails.items()
}
def _get_cat_availability(self):
if DefaultSlot.objects.filter(event=self.event).exists():
# Event has default slots -> use them for category availability
default_slots_avails = defaultdict(list)
for def_slot in DefaultSlot.objects.filter(event=self.event).all():
avail = Availability(
event=self.event,
start=def_slot.start.astimezone(self.event.timezone),
end=def_slot.end.astimezone(self.event.timezone),
)
for cat in def_slot.primary_categories.all():
default_slots_avails[cat.name].append(avail)
return {
cat_name: Availability.union(avail_lst)
for cat_name, avail_lst in default_slots_avails.items()
}
# Event has no default slots -> all categories available through whole event
start = self.event.start.astimezone(self.event.timezone)
end = self.event.end.astimezone(self.event.timezone)
delta = (end - start).total_seconds()
# tweak event end
# 1. shorten event to match discrete slot grid
slot_seconds = 3600 / self.slots_in_an_hour
remainder_seconds = delta % slot_seconds
remainder_seconds += 1 # add a second to compensate rounding errs
end -= timedelta(seconds=remainder_seconds)
# set seconds and microseconds to 0 as they are not exported to the json
start -= timedelta(seconds=start.second, microseconds=start.microsecond)
end -= timedelta(seconds=end.second, microseconds=end.microsecond)
event_avail = Availability(event=self.event, start=start, end=end)
category_names = AKCategory.objects.filter(event=self.event).values_list(
"name", flat=True
)
return {cat_name: [event_avail] for cat_name in category_names}
def test_timeslots_consecutive(self):
"""Test if consecutive timeslots in JSON are in fact consecutive."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
prev_end = None
for timeslot in chain.from_iterable(
self.export_dict["timeslots"]["blocks"]
):
start, end = self._get_timeslot_start_end(timeslot)
self.assertLess(start, end)
delta = end - start
self.assertAlmostEqual(
delta.total_seconds() / (3600), 1 / self.slots_in_an_hour
)
if prev_end is not None:
self.assertLessEqual(prev_end, start)
prev_end = end
def test_block_cover_categories(self):
"""Test if blocks covers all default slot resp. whole event per category."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
category_names = AKCategory.objects.filter(event=event).values_list(
"name", flat=True
)
export_cat_avails = self._get_cat_availability_in_export()
cat_avails = self._get_cat_availability()
for cat_name in category_names:
for avail in cat_avails[cat_name]:
# check that all category availabilities are covered
self.assertTrue(
avail.is_covered(export_cat_avails[cat_name]),
f"AKCategory {cat_name}: avail ({avail.start} - {avail.end}) "
f"not covered by {[f'({a.start} - {a.end})' for a in export_cat_avails[cat_name]]}",
)
def _is_restricted_and_contained_slot(
self, slot: Availability, availabilities: list[Availability]
) -> bool:
"""Test if object is not available for whole event and may happen during slot."""
return slot.is_covered(availabilities) and not Availability.is_event_covered(
self.event, availabilities
)
def _is_ak_fixed_in_slot(
self,
ak_slot: AKSlot,
timeslot_avail: Availability,
) -> bool:
if not ak_slot.fixed or ak_slot.start is None:
return False
ak_slot_avail = Availability(
event=self.event,
start=ak_slot.start.astimezone(self.event.timezone),
end=ak_slot.end.astimezone(self.event.timezone),
)
return timeslot_avail.overlaps(ak_slot_avail, strict=True)
def test_timeslot_fulfilledconstraints(self):
"""Test if fulfilled time constraints by timeslot are as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
cat_avails = self._get_cat_availability()
num_blocks = len(self.export_dict["timeslots"]["blocks"])
for block_idx, block in enumerate(
self.export_dict["timeslots"]["blocks"]
):
for timeslot in block:
start, end = self._get_timeslot_start_end(timeslot)
timeslot_avail = Availability(
event=self.event, start=start, end=end
)
fulfilled_time_constraints = set()
# reso deadline
if self.event.reso_deadline is not None:
# timeslot ends before deadline
if end < self.event.reso_deadline.astimezone(
self.event.timezone
):
fulfilled_time_constraints.add("resolution")
# add category constraints
fulfilled_time_constraints |= (
AKCategory.create_category_optimizer_constraints(
[
cat
for cat in AKCategory.objects.filter(
event=self.event
).all()
if timeslot_avail.is_covered(cat_avails[cat.name])
]
)
)
# add owner constraints
fulfilled_time_constraints |= {
f"availability-person-{owner.id}"
for owner in AKOwner.objects.filter(event=self.event).all()
if self._is_restricted_and_contained_slot(
timeslot_avail,
Availability.union(owner.availabilities.all()),
)
}
# add room constraints
fulfilled_time_constraints |= {
f"availability-room-{room.id}"
for room in self.rooms
if self._is_restricted_and_contained_slot(
timeslot_avail,
Availability.union(room.availabilities.all()),
)
}
# add ak constraints
fulfilled_time_constraints |= {
f"availability-ak-{ak.id}"
for ak in AK.objects.filter(event=event)
if self._is_restricted_and_contained_slot(
timeslot_avail,
Availability.union(ak.availabilities.all()),
)
}
fulfilled_time_constraints |= {
f"fixed-akslot-{slot.id}"
for slot in self.ak_slots
if self._is_ak_fixed_in_slot(slot, timeslot_avail)
}
fulfilled_time_constraints |= {
f"notblock{idx}"
for idx in range(num_blocks)
if idx != block_idx
}
self.assertEqual(
fulfilled_time_constraints,
set(timeslot["fulfilled_time_constraints"]),
)
def test_timeslots_info(self):
"""Test timeslots info dict"""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self.assertAlmostEqual(
self.export_dict["timeslots"]["info"]["duration"],
float(self.event.export_slot),
)
block_names = []
for block in self.export_dict["timeslots"]["blocks"]:
if not block:
continue
block_start, _ = self._get_timeslot_start_end(block[0])
_, block_end = self._get_timeslot_start_end(block[-1])
start_day = block_start.strftime("%A, %d. %b")
if block_start.date() == block_end.date():
# same day
time_str = (
block_start.strftime("%H:%M")
+ " - "
+ block_end.strftime("%H:%M")
)
else:
# different days
time_str = (
block_start.strftime("%a %H:%M")
+ " - "
+ block_end.strftime("%a %H:%M")
)
block_names.append([start_day, time_str])
self.assertEqual(
block_names, self.export_dict["timeslots"]["info"]["blocknames"]
)
......@@ -7,8 +7,19 @@ from django.contrib.messages.storage.base import Message
from django.test import TestCase
from django.urls import reverse_lazy, reverse
from AKModel.models import Event, AKOwner, AKCategory, AKTrack, AKRequirement, AK, Room, AKSlot, AKOrgaMessage, \
ConstraintViolation, DefaultSlot
from AKModel.models import (
Event,
AKOwner,
AKCategory,
AKTrack,
AKRequirement,
AK,
Room,
AKSlot,
AKOrgaMessage,
ConstraintViolation,
DefaultSlot,
)
class BasicViewTests:
......@@ -29,9 +40,10 @@ class BasicViewTests:
since the test framework does not understand the concept of abstract test definitions and would handle this class
as real test case otherwise, distorting the test results.
"""
# pylint: disable=no-member
VIEWS = []
APP_NAME = ''
APP_NAME = ""
VIEWS_STAFF_ONLY = []
EDIT_TESTCASES = []
......@@ -41,16 +53,26 @@ class BasicViewTests:
"""
user_model = get_user_model()
self.staff_user = user_model.objects.create(
username='Test Staff User', email='teststaff@example.com', password='staffpw',
is_staff=True, is_active=True
username="Test Staff User",
email="teststaff@example.com",
password="staffpw",
is_staff=True,
is_active=True,
)
self.admin_user = user_model.objects.create(
username='Test Admin User', email='testadmin@example.com', password='adminpw',
is_staff=True, is_superuser=True, is_active=True
username="Test Admin User",
email="testadmin@example.com",
password="adminpw",
is_staff=True,
is_superuser=True,
is_active=True,
)
self.deactivated_user = user_model.objects.create(
username='Test Deactivated User', email='testdeactivated@example.com', password='deactivatedpw',
is_staff=True, is_active=False
username="Test Deactivated User",
email="testdeactivated@example.com",
password="deactivatedpw",
is_staff=True,
is_active=False,
)
def _name_and_url(self, view_name):
......@@ -62,7 +84,9 @@ class BasicViewTests:
:return: full view name with prefix if applicable, url of the view
:rtype: str, str
"""
view_name_with_prefix = f"{self.APP_NAME}:{view_name[0]}" if self.APP_NAME != "" else view_name[0]
view_name_with_prefix = (
f"{self.APP_NAME}:{view_name[0]}" if self.APP_NAME != "" else view_name[0]
)
url = reverse(view_name_with_prefix, kwargs=view_name[1])
return view_name_with_prefix, url
......@@ -74,7 +98,7 @@ class BasicViewTests:
:param expected_message: message that should be shown
:param msg_prefix: prefix for the error message when test fails
"""
messages:List[Message] = list(get_messages(response.wsgi_request))
messages: List[Message] = list(get_messages(response.wsgi_request))
msg_count = "No message shown to user"
msg_content = "Wrong message, expected '{expected_message}'"
......@@ -95,10 +119,16 @@ class BasicViewTests:
view_name_with_prefix, url = self._name_and_url(view_name)
try:
response = self.client.get(url)
self.assertEqual(response.status_code, 200, msg=f"{view_name_with_prefix} ({url}) broken")
except Exception: # pylint: disable=broad-exception-caught
self.fail(f"An error occurred during rendering of {view_name_with_prefix} ({url}):"
f"\n\n{traceback.format_exc()}")
self.assertEqual(
response.status_code,
200,
msg=f"{view_name_with_prefix} ({url}) broken",
)
except Exception: # pylint: disable=broad-exception-caught
self.fail(
f"An error occurred during rendering of {view_name_with_prefix} ({url}):"
f"\n\n{traceback.format_exc()}"
)
def test_access_control_staff_only(self):
"""
......@@ -107,11 +137,16 @@ class BasicViewTests:
# Not logged in? Views should not be visible
self.client.logout()
for view_name_info in self.VIEWS_STAFF_ONLY:
expected_response_code = 302 if len(view_name_info) == 2 else view_name_info[2]
expected_response_code = (
302 if len(view_name_info) == 2 else view_name_info[2]
)
view_name_with_prefix, url = self._name_and_url(view_name_info)
response = self.client.get(url)
self.assertEqual(response.status_code, expected_response_code,
msg=f"{view_name_with_prefix} ({url}) accessible by non-staff")
self.assertEqual(
response.status_code,
expected_response_code,
msg=f"{view_name_with_prefix} ({url}) accessible by non-staff",
)
# Logged in? Views should be visible
self.client.force_login(self.staff_user)
......@@ -119,20 +154,30 @@ class BasicViewTests:
view_name_with_prefix, url = self._name_and_url(view_name_info)
try:
response = self.client.get(url)
self.assertEqual(response.status_code, 200,
msg=f"{view_name_with_prefix} ({url}) should be accessible for staff (but isn't)")
self.assertEqual(
response.status_code,
200,
msg=f"{view_name_with_prefix} ({url}) should be accessible for staff (but isn't)",
)
except Exception: # pylint: disable=broad-exception-caught
self.fail(f"An error occurred during rendering of {view_name_with_prefix} ({url}):"
f"\n\n{traceback.format_exc()}")
self.fail(
f"An error occurred during rendering of {view_name_with_prefix} ({url}):"
f"\n\n{traceback.format_exc()}"
)
# Disabled user? Views should not be visible
self.client.force_login(self.deactivated_user)
for view_name_info in self.VIEWS_STAFF_ONLY:
expected_response_code = 302 if len(view_name_info) == 2 else view_name_info[2]
expected_response_code = (
302 if len(view_name_info) == 2 else view_name_info[2]
)
view_name_with_prefix, url = self._name_and_url(view_name_info)
response = self.client.get(url)
self.assertEqual(response.status_code, expected_response_code,
msg=f"{view_name_with_prefix} ({url}) still accessible for deactivated user")
self.assertEqual(
response.status_code,
expected_response_code,
msg=f"{view_name_with_prefix} ({url}) still accessible for deactivated user",
)
def _to_sendable_value(self, val):
"""
......@@ -182,16 +227,26 @@ class BasicViewTests:
self.client.logout()
response = self.client.get(url)
self.assertEqual(response.status_code, 200, msg=f"{name}: Could not load edit form via GET ({url})")
self.assertEqual(
response.status_code,
200,
msg=f"{name}: Could not load edit form via GET ({url})",
)
form = response.context[form_name]
data = {k:self._to_sendable_value(v) for k,v in form.initial.items()}
data = {k: self._to_sendable_value(v) for k, v in form.initial.items()}
response = self.client.post(url, data=data)
if expected_code == 200:
self.assertEqual(response.status_code, 200, msg=f"{name}: Did not return 200 ({url}")
self.assertEqual(
response.status_code, 200, msg=f"{name}: Did not return 200 ({url}"
)
elif expected_code == 302:
self.assertRedirects(response, target_url, msg_prefix=f"{name}: Did not redirect ({url} -> {target_url}")
self.assertRedirects(
response,
target_url,
msg_prefix=f"{name}: Did not redirect ({url} -> {target_url}",
)
if expected_message != "":
self._assert_message(response, expected_message, msg_prefix=f"{name}")
......@@ -200,30 +255,44 @@ class ModelViewTests(BasicViewTests, TestCase):
"""
Basic view test cases for views from AKModel plus some custom tests
"""
fixtures = ['model.json']
fixtures = ["model.json"]
ADMIN_MODELS = [
(Event, 'event'), (AKOwner, 'akowner'), (AKCategory, 'akcategory'), (AKTrack, 'aktrack'),
(AKRequirement, 'akrequirement'), (AK, 'ak'), (Room, 'room'), (AKSlot, 'akslot'),
(AKOrgaMessage, 'akorgamessage'), (ConstraintViolation, 'constraintviolation'),
(DefaultSlot, 'defaultslot')
(Event, "event"),
(AKOwner, "akowner"),
(AKCategory, "akcategory"),
(AKTrack, "aktrack"),
(AKRequirement, "akrequirement"),
(AK, "ak"),
(Room, "room"),
(AKSlot, "akslot"),
(AKOrgaMessage, "akorgamessage"),
(ConstraintViolation, "constraintviolation"),
(DefaultSlot, "defaultslot"),
]
VIEWS_STAFF_ONLY = [
('admin:index', {}),
('admin:event_status', {'event_slug': 'kif42'}),
('admin:event_requirement_overview', {'event_slug': 'kif42'}),
('admin:ak_csv_export', {'event_slug': 'kif42'}),
('admin:ak_wiki_export', {'slug': 'kif42'}),
('admin:ak_delete_orga_messages', {'event_slug': 'kif42'}),
('admin:ak_slide_export', {'event_slug': 'kif42'}),
('admin:default-slots-editor', {'event_slug': 'kif42'}),
('admin:room-import', {'event_slug': 'kif42'}),
('admin:new_event_wizard_start', {}),
("admin:index", {}),
("admin:event_status", {"event_slug": "kif42"}),
("admin:event_requirement_overview", {"event_slug": "kif42"}),
("admin:ak_csv_export", {"event_slug": "kif42"}),
("admin:ak_json_export", {"event_slug": "kif42"}),
("admin:ak_wiki_export", {"slug": "kif42"}),
("admin:ak_schedule_json_import", {"event_slug": "kif42"}),
("admin:ak_delete_orga_messages", {"event_slug": "kif42"}),
("admin:ak_slide_export", {"event_slug": "kif42"}),
("admin:default-slots-editor", {"event_slug": "kif42"}),
("admin:room-import", {"event_slug": "kif42"}),
("admin:new_event_wizard_start", {}),
]
EDIT_TESTCASES = [
{'view': 'admin:default-slots-editor', 'kwargs': {'event_slug': 'kif42'}, "admin": True},
{
"view": "admin:default-slots-editor",
"kwargs": {"event_slug": "kif42"},
"admin": True,
},
]
def test_admin(self):
......@@ -234,24 +303,32 @@ class ModelViewTests(BasicViewTests, TestCase):
for model in self.ADMIN_MODELS:
# Special treatment for a subset of views (where we exchanged default functionality, e.g., create views)
if model[1] == "event":
_, url = self._name_and_url(('admin:new_event_wizard_start', {}))
_, url = self._name_and_url(("admin:new_event_wizard_start", {}))
elif model[1] == "room":
_, url = self._name_and_url(('admin:room-new', {}))
_, url = self._name_and_url(("admin:room-new", {}))
# Otherwise, just call the creation form view
else:
_, url = self._name_and_url((f'admin:AKModel_{model[1]}_add', {}))
_, url = self._name_and_url((f"admin:AKModel_{model[1]}_add", {}))
response = self.client.get(url)
self.assertEqual(response.status_code, 200, msg=f"Add form for model {model[1]} ({url}) broken")
self.assertEqual(
response.status_code,
200,
msg=f"Add form for model {model[1]} ({url}) broken",
)
for model in self.ADMIN_MODELS:
# Test the update view using the first existing instance of each model
m = model[0].objects.first()
if m is not None:
_, url = self._name_and_url(
(f'admin:AKModel_{model[1]}_change', {'object_id': m.pk})
(f"admin:AKModel_{model[1]}_change", {"object_id": m.pk})
)
response = self.client.get(url)
self.assertEqual(response.status_code, 200, msg=f"Edit form for model {model[1]} ({url}) broken")
self.assertEqual(
response.status_code,
200,
msg=f"Edit form for model {model[1]} ({url}) broken",
)
def test_wiki_export(self):
"""
......@@ -260,17 +337,27 @@ class ModelViewTests(BasicViewTests, TestCase):
"""
self.client.force_login(self.admin_user)
export_url = reverse_lazy("admin:ak_wiki_export", kwargs={'slug': 'kif42'})
export_url = reverse_lazy("admin:ak_wiki_export", kwargs={"slug": "kif42"})
response = self.client.get(export_url)
self.assertEqual(response.status_code, 200, "Export not working at all")
export_count = 0
for _, aks in response.context["categories_with_aks"]:
for ak in aks:
self.assertEqual(ak.include_in_export, True,
f"AK with export flag set to False (pk={ak.pk}) included in export")
self.assertNotEqual(ak.pk, 1, "AK known to be excluded from export (PK 1) included in export")
self.assertEqual(
ak.include_in_export,
True,
f"AK with export flag set to False (pk={ak.pk}) included in export",
)
self.assertNotEqual(
ak.pk,
1,
"AK known to be excluded from export (PK 1) included in export",
)
export_count += 1
self.assertEqual(export_count, AK.objects.filter(event_id=2, include_in_export=True).count(),
"Wiki export contained the wrong number of AKs")
self.assertEqual(
export_count,
AK.objects.filter(event_id=2, include_in_export=True).count(),
"Wiki export contained the wrong number of AKs",
)
......@@ -4,8 +4,10 @@ from django.urls import include, path
from rest_framework.routers import DefaultRouter
import AKModel.views.api
from AKModel.views.manage import ExportSlidesView, PlanPublishView, PlanUnpublishView, DefaultSlotEditorView
from AKModel.views.ak import AKRequirementOverview, AKCSVExportView, AKWikiExportView, AKMessageDeleteView
from AKModel.views.manage import ExportSlidesView, PlanPublishView, PlanUnpublishView, DefaultSlotEditorView, \
AKsByUserView, AKScheduleJSONImportView
from AKModel.views.ak import AKRequirementOverview, AKCSVExportView, AKJSONExportView, AKWikiExportView, \
AKMessageDeleteView
from AKModel.views.event_wizard import NewEventWizardStartView, NewEventWizardPrepareImportView, \
NewEventWizardImportView, NewEventWizardActivateView, NewEventWizardFinishView, NewEventWizardSettingsView
from AKModel.views.room import RoomBatchCreationView
......@@ -91,8 +93,14 @@ def get_admin_urls_event(admin_site):
path('<slug:event_slug>/status/', admin_site.admin_view(EventStatusView.as_view()), name="event_status"),
path('<slug:event_slug>/requirements/', admin_site.admin_view(AKRequirementOverview.as_view()),
name="event_requirement_overview"),
path('<slug:event_slug>/aks/owner/<pk>/', admin_site.admin_view(AKsByUserView.as_view()),
name="aks_by_owner"),
path('<slug:event_slug>/ak-csv-export/', admin_site.admin_view(AKCSVExportView.as_view()),
name="ak_csv_export"),
path('<slug:event_slug>/ak-json-export/', admin_site.admin_view(AKJSONExportView.as_view()),
name="ak_json_export"),
path('<slug:event_slug>/ak-schedule-json-import/', admin_site.admin_view(AKScheduleJSONImportView.as_view()),
name="ak_schedule_json_import"),
path('<slug:slug>/ak-wiki-export/', admin_site.admin_view(AKWikiExportView.as_view()),
name="ak_wiki_export"),
path('<slug:event_slug>/delete-orga-messages/', admin_site.admin_view(AKMessageDeleteView.as_view()),
......
from pathlib import Path
import referencing.retrieval
from jsonschema import Draft202012Validator
from jsonschema.protocols import Validator
from referencing import Registry
from AKPlanning import settings
def _construct_schema_path(uri: str | Path) -> Path:
"""Construct a schema URI.
This function also checks for unallowed directory traversals
out of the 'schema' subfolder.
"""
schema_base_path = Path(settings.BASE_DIR).resolve()
uri_path = (schema_base_path / uri).resolve()
if not uri_path.is_relative_to(schema_base_path / "schemas"):
raise ValueError("Unallowed dictionary traversal")
return uri_path
@referencing.retrieval.to_cached_resource()
def retrieve_schema_from_disk(uri: str) -> str:
"""Retrieve schemas from disk by URI."""
uri_path = _construct_schema_path(uri)
with uri_path.open("r") as ff:
return ff.read()
def construct_schema_validator(schema: str | dict) -> Validator:
"""Construct a validator for a JSON schema.
In particular, all schemas from the 'schemas' directory
are loaded into the registry.
"""
registry = Registry(retrieve=retrieve_schema_from_disk)
if isinstance(schema, str):
schema_uri = str(Path("schemas") / schema)
schema = registry.get_or_retrieve(schema_uri).value.contents
return Draft202012Validator(schema=schema, registry=registry)
import json
from django.contrib import messages
from django.shortcuts import redirect
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import ListView, DetailView
......@@ -37,6 +40,44 @@ class AKCSVExportView(AdminViewMixin, FilterByEventSlugMixin, ListView):
return super().get_queryset().order_by("ak__track")
class AKJSONExportView(AdminViewMixin, DetailView):
"""
View: Export all AK slots of this event in JSON format ordered by tracks
"""
template_name = "admin/AKModel/ak_json_export.html"
model = Event
context_object_name = "event"
title = _("AK JSON Export")
slug_url_kwarg = "event_slug"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
try:
data = context["event"].as_json_dict()
context["json_data_oneline"] = json.dumps(data, ensure_ascii=False)
context["json_data"] = json.dumps(data, indent=2, ensure_ascii=False)
context["is_valid"] = True
except ValueError as ex:
messages.add_message(
self.request,
messages.ERROR,
_("Exporting AKs for the solver failed! Reason: ") + str(ex),
)
return context
def get(self, request, *args, **kwargs):
# as this code is adapted from BaseDetailView::get
# pylint: disable=attribute-defined-outside-init
self.object = self.get_object()
context = self.get_context_data(object=self.object)
# if serialization failed in `get_context_data` we redirect to
# the status page and show a message instead
if not context.get("is_valid", False):
return redirect("admin:event_status", context["event"].slug)
return self.render_to_response(context)
class AKWikiExportView(AdminViewMixin, DetailView):
"""
View: Export AKs of this event in wiki syntax
......
......@@ -4,17 +4,19 @@ import os
import tempfile
from itertools import zip_longest
from django.contrib import messages
from django.db.models.functions import Now
from django.shortcuts import redirect
from django.utils.dateparse import parse_datetime
from django.utils.translation import gettext_lazy as _
from django.views.generic import TemplateView
from django.views.generic import TemplateView, DetailView
from django_tex.core import render_template_with_context, run_tex_in_directory
from django_tex.response import PDFResponse
from AKModel.forms import SlideExportForm, DefaultSlotEditorForm
from AKModel.metaviews.admin import EventSlugMixin, IntermediateAdminView, IntermediateAdminActionView
from AKModel.models import ConstraintViolation, Event, DefaultSlot
from AKModel.forms import SlideExportForm, DefaultSlotEditorForm, JSONScheduleImportForm
from AKModel.metaviews.admin import EventSlugMixin, IntermediateAdminView, IntermediateAdminActionView, AdminViewMixin
from AKModel.models import ConstraintViolation, Event, DefaultSlot, AKOwner
class UserView(TemplateView):
......@@ -58,7 +60,7 @@ class ExportSlidesView(EventSlugMixin, IntermediateAdminView):
Create a list of tuples cosisting of an AK and a list of upcoming AKs (list length depending on setting)
"""
next_aks_list = zip_longest(*[ak_list[i + 1:] for i in range(NEXT_AK_LIST_LENGTH)], fillvalue=None)
return [(ak, next_aks) for ak, next_aks in zip_longest(ak_list, next_aks_list, fillvalue=[])]
return list(zip_longest(ak_list, next_aks_list, fillvalue=[]))
# Get all relevant AKs (wishes separately, and either all AKs or only those who should directly or indirectly
# be presented when restriction setting was chosen)
......@@ -236,3 +238,38 @@ class DefaultSlotEditorView(EventSlugMixin, IntermediateAdminView):
.format(u=str(updated_count), c=str(created_count), d=str(deleted_count))
)
return super().form_valid(form)
class AKsByUserView(AdminViewMixin, EventSlugMixin, DetailView):
"""
View: Show all AKs of a given user
"""
model = AKOwner
context_object_name = 'owner'
template_name = "admin/AKModel/aks_by_user.html"
class AKScheduleJSONImportView(EventSlugMixin, IntermediateAdminView):
"""
View: Import an AK schedule from a json file that can be pasted into this view.
"""
template_name = "admin/AKModel/import_json.html"
form_class = JSONScheduleImportForm
title = _("AK Schedule JSON Import")
def form_valid(self, form):
try:
number_of_slots_changed = self.event.schedule_from_json(form.cleaned_data["data"])
messages.add_message(
self.request,
messages.SUCCESS,
_("Successfully imported {n} slot(s)").format(n=number_of_slots_changed)
)
except ValueError as ex:
messages.add_message(
self.request,
messages.ERROR,
_("Importing an AK schedule failed! Reason: ") + str(ex),
)
return redirect("admin:event_status", self.event.slug)