Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • feature-type-filters
  • komasolver
  • main
  • renovate/django-5.x
  • renovate/django_csp-4.x
  • renovate/jsonschema-4.x
  • renovate/uwsgi-2.x
7 results

Target

Select target project
  • konstantin/akplanning
  • matedealer/akplanning
  • kif/akplanning
  • mirco/akplanning
  • lordofthevoid/akplanning
  • voidptr/akplanning
  • xayomer/akplanning-fork
  • mollux/akplanning
  • neumantm/akplanning
  • mmarx/akplanning
  • nerf/akplanning
  • felix_bonn/akplanning
  • sebastian.uschmann/akplanning
13 results
Select Git revision
  • ak-import
  • feature/clear-schedule-button
  • feature/json-export-via-rest-framework
  • feature/json-schedule-import-tests
  • feature/preference-polling-form
  • fix/add-room-import-only-once
  • main
  • renovate/django-5.x
  • renovate/django-debug-toolbar-4.x
  • renovate/django-simple-history-3.x
  • renovate/mysqlclient-2.x
11 results
Show changes
Showing
with 2194 additions and 7 deletions
{% load i18n %}
<div class="text-center">
<a href="{% url 'admin:constraint-violations' slug=event.slug %}">
<h1>{{ constraint_violations_count }}</h1>
{% blocktrans count constraint_violations_count=constraint_violations_count %}
<h3>Constraint Violation</h3>
{% plural %}
<h3>Constraint Violations</h3>
{% endblocktrans %}
</a>
</div>
......@@ -17,7 +17,7 @@
<li>
{% with group.grouper as ak %}
{% if "AKSubmission"|check_app_installed %}
<a href="{% url 'submit:ak_detail' event_slug=ak.event.slug pk=ak.pk %}">{{ ak }}</a>
<a href="{{ ak.detail_url }}">{{ ak }}</a>
{% else %}
{{ ak }}
{% endif %}
......
# Create your tests here.
import json
from datetime import timedelta
from django.test import TestCase
from django.utils import timezone
from AKModel.tests.test_views import BasicViewTests
from AKModel.models import AKSlot, Event, Room
class ModelViewTests(BasicViewTests, TestCase):
"""
Tests for AKScheduling
"""
fixtures = ['model.json']
VIEWS_STAFF_ONLY = [
# Views
('admin:schedule', {'event_slug': 'kif42'}),
('admin:slots_unscheduled', {'event_slug': 'kif42'}),
('admin:constraint-violations', {'slug': 'kif42'}),
('admin:special-attention', {'slug': 'kif42'}),
('admin:cleanup-wish-slots', {'event_slug': 'kif42'}),
('admin:autocreate-availabilities', {'event_slug': 'kif42'}),
('admin:tracks_manage', {'event_slug': 'kif42'}),
('admin:enter-interest', {'event_slug': 'kif42', 'pk': 1}),
# API (Read)
('model:scheduling-resources-list', {'event_slug': 'kif42'}, 403),
('model:scheduling-constraint-violations-list', {'event_slug': 'kif42'}, 403),
('model:scheduling-events', {'event_slug': 'kif42'}),
('model:scheduling-room-availabilities', {'event_slug': 'kif42'}),
('model:scheduling-default-slots', {'event_slug': 'kif42'}),
]
def test_scheduling_of_slot_update(self):
"""
Test rescheduling a slot to a different time or room
"""
self.client.force_login(self.admin_user)
event = Event.get_by_slug('kif42')
# Get the first already scheduled slot belonging to this event
slot = event.akslot_set.filter(start__isnull=False).first()
pk = slot.pk
room_id = slot.room_id
events_api_url = f"/kif42/api/scheduling-event/{pk}/"
# Create updated time
offset = timedelta(hours=1)
new_start_time = slot.start + offset
new_end_time = slot.end + offset
new_start_time_string = timezone.localtime(new_start_time, event.timezone).strftime("%Y-%m-%d %H:%M:%S")
new_end_time_string = timezone.localtime(new_end_time, event.timezone).strftime("%Y-%m-%d %H:%M:%S")
# Try API call
response = self.client.put(
events_api_url,
json.dumps({
'start': new_start_time_string,
'end': new_end_time_string,
'roomId': room_id,
}),
content_type = 'application/json'
)
self.assertEqual(response.status_code, 200, "PUT to API endpoint did not work")
# Make sure API call did update the slot as expected
slot = AKSlot.objects.get(pk=pk)
self.assertEqual(new_start_time, slot.start, "Update did not work")
# Test updating room
new_room = Room.objects.exclude(pk=room_id).first()
# Try second API call
response = self.client.put(
events_api_url,
json.dumps({
'start': new_start_time_string,
'end': new_end_time_string,
'roomId': new_room.pk,
}),
content_type = 'application/json'
)
self.assertEqual(response.status_code, 200, "Second PUT to API endpoint did not work")
# Make sure API call did update the slot as expected
slot = AKSlot.objects.get(pk=pk)
self.assertEqual(new_room.pk, slot.room.pk, "Update did not work")
from django.urls import path
from AKScheduling.views import SchedulingAdminView, UnscheduledSlotsAdminView, TrackAdminView, \
ConstraintViolationsAdminView, SpecialAttentionAKsAdminView, InterestEnteringAdminView, WishSlotCleanupView, \
AvailabilityAutocreateView
def get_admin_urls_scheduling(admin_site):
return [
path('<slug:event_slug>/schedule/', admin_site.admin_view(SchedulingAdminView.as_view()),
name="schedule"),
path('<slug:event_slug>/unscheduled/', admin_site.admin_view(UnscheduledSlotsAdminView.as_view()),
name="slots_unscheduled"),
path('<slug:slug>/constraint-violations/', admin_site.admin_view(ConstraintViolationsAdminView.as_view()),
name="constraint-violations"),
path('<slug:slug>/special-attention/', admin_site.admin_view(SpecialAttentionAKsAdminView.as_view()),
name="special-attention"),
path('<slug:event_slug>/cleanup-wish-slots/', admin_site.admin_view(WishSlotCleanupView.as_view()),
name="cleanup-wish-slots"),
path('<slug:event_slug>/autocreate-availabilities/', admin_site.admin_view(AvailabilityAutocreateView.as_view()),
name="autocreate-availabilities"),
path('<slug:event_slug>/tracks/', admin_site.admin_view(TrackAdminView.as_view()),
name="tracks_manage"),
path('<slug:event_slug>/enter-interest/<int:pk>', admin_site.admin_view(InterestEnteringAdminView.as_view()),
name="enter-interest"),
]
from django.views.generic import ListView
from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
from django.db.models import Count
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import ListView, DetailView, UpdateView
from AKModel.models import AKSlot, AKTrack
from AKModel.views import AdminViewMixin, FilterByEventSlugMixin
from AKModel.metaviews import status_manager
from AKModel.metaviews.status import TemplateStatusWidget
from AKModel.models import AKSlot, AKTrack, Event, AK, AKCategory
from AKModel.metaviews.admin import EventSlugMixin, FilterByEventSlugMixin, AdminViewMixin, IntermediateAdminView
from AKScheduling.forms import AKInterestForm, AKAddSlotForm
class UnscheduledSlotsAdminView(AdminViewMixin, FilterByEventSlugMixin, ListView):
"""
Admin view: Get a list of all unscheduled slots
"""
template_name = "admin/AKScheduling/unscheduled.html"
model = AKSlot
context_object_name = "akslots"
......@@ -20,12 +30,20 @@ class UnscheduledSlotsAdminView(AdminViewMixin, FilterByEventSlugMixin, ListView
class SchedulingAdminView(AdminViewMixin, FilterByEventSlugMixin, ListView):
"""
Admin view: Scheduler
View and adapt the schedule of an event. This view heavily uses JavaScript to display a calendar view plus
a list of unscheduled slots and to allow dragging slots in and into the calendar
"""
template_name = "admin/AKScheduling/scheduling.html"
model = AKSlot
context_object_name = "slots_unscheduled"
def get_queryset(self):
return super().get_queryset().filter(start__isnull=True)
return super().get_queryset().filter(start__isnull=True).select_related('event', 'ak', 'ak__track',
'ak__category').prefetch_related('ak__types', 'ak__owners', 'ak__conflicts', 'ak__prerequisites',
'ak__requirements', 'ak__conflict').order_by('ak__track', 'ak')
def get_context_data(self, *, object_list=None, **kwargs):
context = super().get_context_data(object_list=object_list, **kwargs)
......@@ -35,15 +53,257 @@ class SchedulingAdminView(AdminViewMixin, FilterByEventSlugMixin, ListView):
context["start"] = self.event.start
context["end"] = self.event.end
context["akSlotAddForm"] = AKAddSlotForm(self.event)
return context
class TrackAdminView(AdminViewMixin, FilterByEventSlugMixin, ListView):
"""
Admin view: Distribute AKs to tracks
Again using JavaScript, the user can here see a list of all AKs split-up by tracks and can move them to other or
even new tracks using drag and drop. The state is then automatically synchronized via API calls in the background
"""
template_name = "admin/AKScheduling/manage_tracks.html"
model = AKTrack
context_object_name = "tracks"
def get_context_data(self, *, object_list=None, **kwargs):
context = super().get_context_data(object_list=object_list, **kwargs)
context["aks_without_track"] = self.event.ak_set.filter(track=None)
context["aks_without_track"] = self.event.ak_set.select_related('category').filter(track=None)
return context
class ConstraintViolationsAdminView(AdminViewMixin, DetailView):
"""
Admin view: Inspect and adjust all constraint violations of the event
This view populates a table of constraint violations via background API call (JavaScript), offers the option to
see details or edit each of them and provides an auto-reload feature.
"""
template_name = "admin/AKScheduling/constraint_violations.html"
model = Event
context_object_name = "event"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["title"] = f"{_('Constraint violations for')} {context['event']}"
return context
class SpecialAttentionAKsAdminView(AdminViewMixin, DetailView):
"""
Admin view: List all AKs that require special attention via scheduling, e.g., because of free-form comments,
since there are slots even though it is a wish, or no slots even though it is an AK etc.
"""
template_name = "admin/AKScheduling/special_attention.html"
model = Event
context_object_name = "event"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["title"] = f"{_('AKs requiring special attention for')} {context['event']}"
# Load all "special" AKs from the database using annotations to reduce the amount of necessary queries
aks = (AK.objects.filter(event=context["event"]).annotate(Count('owners', distinct=True))
.annotate(Count('akslot', distinct=True)).annotate(Count('availabilities', distinct=True)))
aks_with_comment = []
ak_wishes_with_slots = []
aks_without_availabilities = []
aks_without_slots = []
# Loop over all AKs of this event and identify all relevant factors that make the AK "special" and add them to
# the respective lists if the AK fullfills an condition
for ak in aks:
if ak.notes != "":
aks_with_comment.append(ak)
if ak.owners__count == 0:
if ak.akslot__count > 0:
ak_wishes_with_slots.append(ak)
else:
if ak.akslot__count == 0:
aks_without_slots.append(ak)
if ak.availabilities__count == 0:
aks_without_availabilities.append(ak)
context["aks_with_comment"] = aks_with_comment
context["ak_wishes_with_slots"] = ak_wishes_with_slots
context["aks_without_slots"] = aks_without_slots
context["aks_without_availabilities"] = aks_without_availabilities
return context
class InterestEnteringAdminView(SuccessMessageMixin, AdminViewMixin, EventSlugMixin, UpdateView):
"""
Admin view: Form view to quickly store information about the interest in an AK
(e.g., during presentation of the AK list)
The view offers a field to update interest and manually set a comment for the current AK, but also features links
to the AKs before and probably coming up next, as well as links to other AKs sorted by category, for quick
and hazzle-free navigation during the AK presentation
"""
template_name = "admin/AKScheduling/interest.html"
model = AK
context_object_name = "ak"
form_class = AKInterestForm
success_message = _("Interest updated")
def get_success_url(self):
return self.request.path
def form_valid(self, form):
# Don't create a history entry for this change
form.instance.skip_history_when_saving = True
r = super().form_valid(form)
del form.instance.skip_history_when_saving
return r
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["title"] = f"{_('Enter interest')}"
# Sort AKs into different lists (by their category)
ak_wishes = []
categories_with_aks = []
context["previous_ak"] = None
context["next_ak"] = None
last_ak = None
next_is_next = False
# Building the right navigation is a bit tricky since wishes have to be treated as an own category here
# Hence, depending on the AK we are currently at (displaying the form for) we need to either:
# Find other AK wishes (regardless of the category)...
if context['ak'].wish:
other_aks = [ak for ak in context['event'].ak_set.prefetch_related('owners').all() if ak.wish]
# or other AKs of this category
else:
other_aks = [ak for ak in context['ak'].category.ak_set.prefetch_related('owners').all() if not ak.wish]
# Use that list of other AKs belonging to this category to identify the previous and next AK (if any)
for other_ak in other_aks:
if next_is_next:
context['next_ak'] = other_ak
next_is_next = False
elif other_ak.pk == context['ak'].pk :
context['previous_ak'] = last_ak
next_is_next = True
last_ak = other_ak
# Gather information for link lists for all categories (and wishes)
for category in context['event'].akcategory_set.prefetch_related('ak_set').all():
aks_for_category = []
for ak in category.ak_set.prefetch_related('owners').all():
if ak.wish:
ak_wishes.append(ak)
else:
aks_for_category.append(ak)
categories_with_aks.append((category, aks_for_category))
# Make sure wishes have the right order (since the list was filled category by category before, this requires
# explicitly reordering them by their primary key)
ak_wishes.sort(key=lambda x: x.pk)
categories_with_aks.append(
(AKCategory(name=_("Wishes"), pk=0, description="-"), ak_wishes))
context["categories_with_aks"] = categories_with_aks
return context
class WishSlotCleanupView(EventSlugMixin, IntermediateAdminView):
"""
Admin action view: Allow to delete all unscheduled slots for wishes
The view will render a preview of all slots that are affected by this. It is not possible to manually choose
which slots should be deleted (either all or none) and the functionality will therefore delete slots that were
created in the time between rendering of the preview and running the action ofter confirmation as well.
Due to the automated slot cleanup functionality for wishes in the AKSubmission app, this functionality should be
rarely needed/used
"""
title = _('Cleanup: Delete unscheduled slots for wishes')
def get_success_url(self):
return reverse_lazy('admin:special-attention', kwargs={'slug': self.event.slug})
def get_preview(self):
slots = self.event.get_unscheduled_wish_slots()
return _("The following {count} unscheduled slots of wishes will be deleted:\n\n {slots}").format(
count=len(slots),
slots=", ".join(str(s.ak) for s in slots)
)
def form_valid(self, form):
self.event.get_unscheduled_wish_slots().delete()
messages.add_message(self.request, messages.SUCCESS, _("Unscheduled slots for wishes successfully deleted"))
return super().form_valid(form)
class AvailabilityAutocreateView(EventSlugMixin, IntermediateAdminView):
"""
Admin action view: Allow to automatically create default availabilities (event start to end) for all AKs without
any manually specified availability information
The view will render a preview of all AKs that are affected by this. It is not possible to manually choose
which AKs should be affected (either all or none) and the functionality will therefore create availability entries
for AKs that were created in the time between rendering of the preview and running the action ofter confirmation
as well.
"""
title = _('Create default availabilities for AKs')
def get_success_url(self):
return reverse_lazy('admin:special-attention', kwargs={'slug': self.event.slug})
def get_preview(self):
aks = self.event.get_aks_without_availabilities()
return _("The following {count} AKs don't have any availability information. "
"Create default availability for them:\n\n {aks}").format(
count=len(aks),
aks=", ".join(str(ak) for ak in aks)
)
def form_valid(self, form):
# Local import to prevent cyclic imports
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
success_count = 0
for ak in self.event.get_aks_without_availabilities():
try:
availability = Availability.with_event_length(event=self.event, ak=ak)
availability.save()
success_count += 1
except: # pylint: disable=bare-except
messages.add_message(
self.request, messages.WARNING,
_("Could not create default availabilities for AK: {ak}").format(ak=ak)
)
messages.add_message(
self.request, messages.SUCCESS,
_("Created default availabilities for {count} AKs").format(count=success_count)
)
return super().form_valid(form)
@status_manager.register(name="scheduling_constraint_violations")
class CVWidget(TemplateStatusWidget):
"""
Status page widget: Constraint violations
"""
required_context_type = "event"
title = _("Constraint Violations")
template_name = "admin/AKScheduling/status/cvs.html"
def render_status(self, context: {}) -> str:
return "success" if context["constraint_violations_count"] == 0 else "warning"
def get_context_data(self, context) -> dict:
context = super().get_context_data(context)
context["constraint_violations_count"] = (context["event"].constraintviolation_set
.filter(manually_resolved=False).count())
return context
from rest_framework import permissions, viewsets
from rest_framework.response import Response
from AKModel.models import Event
from AKSolverInterface.serializers import ExportEventSerializer
class ExportEventForSolverViewSet(viewsets.GenericViewSet):
"""
API View: Current event, formatted to be consumed by a solver.
Read-only
Follows the format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
permission_classes = (permissions.DjangoModelPermissions,)
serializer_class = ExportEventSerializer
lookup_url_kwarg = "event_slug"
lookup_field = "slug"
# only allow exporting of active events
queryset = Event.objects.filter(active=True)
# rename view name to avoid 'List' suffix
def get_view_name(self):
return "JSON-Export for scheduling solver"
# somewhat hacky solution: TODO: FIXME
def list(self, request, *args, **kwargs):
"""Construct HTTP response showing serialized event export."""
# Below is the code of mixins.RetrieveModelMixin::retrieve
# which would usually be used to serve this detail API page.
# However, we do not specify the event to serve at the end of the URL
# but instead prepend the URL with it, i.e.
# `<slug>/api/<export_endpoint>` instead of the usual `api/<export_endpoint>/<slug>`.
# To make this work with the DefaultRouter, we serve this page as a list instead
# of a detail page. Hence, we use the method `list` instead of `retrieve`.
instance = self.get_object()
serializer = self.get_serializer(instance)
return Response(serializer.data)
from django.apps import AppConfig
class AksolverinterfaceConfig(AppConfig):
"""
App configuration for the solver interface (default)
"""
name = "AKSolverInterface"
import json
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from jsonschema.exceptions import best_match
from AKModel.forms import AdminIntermediateForm
from AKSolverInterface.utils import construct_schema_validator
class JSONScheduleImportForm(AdminIntermediateForm):
"""Form to import an AK schedule from a json file."""
json_data = forms.CharField(
required=False,
widget=forms.Textarea,
label=_("JSON data"),
help_text=_("JSON data from the scheduling solver"),
)
json_file = forms.FileField(
required=False,
label=_("File with JSON data"),
help_text=_("File with JSON data from the scheduling solver"),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.json_schema_validator = construct_schema_validator(
schema="solver-output.schema.json"
)
def _check_json_data(self, data: str):
"""Validate `data` against our JSON schema.
:param data: The JSON string to validate using `self.json_schema_validator`.
:type data: str
:raises ValidationError: if the validation fails, with a description of the cause.
:return: The parsed JSON dict, if validation is successful.
"""
try:
schedule = json.loads(data)
except json.JSONDecodeError as ex:
raise ValidationError(_("Cannot decode as JSON"), "invalid") from ex
error = best_match(self.json_schema_validator.iter_errors(schedule))
if error:
raise ValidationError(
_("Invalid JSON format: %(msg)s at %(error_path)s"),
"invalid",
params={"msg": error.message, "error_path": error.json_path},
) from error
return schedule
def clean(self):
"""Extract and validate entered JSON data.
We allow entering of the schedule from two sources:
1. from an uploaded file
2. from a text field.
This function checks that data is entered from exactly one source.
If so, the entered JSON string is validated against our schema.
Any errors are reported at the corresponding form field.
"""
cleaned_data = super().clean()
if cleaned_data.get("json_file") and cleaned_data.get("json_data"):
err = ValidationError(
_("Please enter data as a file OR via text, not both."), "invalid"
)
self.add_error("json_data", err)
self.add_error("json_file", err)
elif not (cleaned_data.get("json_file") or cleaned_data.get("json_data")):
err = ValidationError(
_("No data entered. Please enter data as a file or via text."),
"invalid",
)
self.add_error("json_data", err)
self.add_error("json_file", err)
else:
source_field = "json_data"
data = cleaned_data.get(source_field)
if not data:
source_field = "json_file"
with cleaned_data.get(source_field).open() as ff:
data = ff.read()
try:
cleaned_data["data"] = self._check_json_data(data)
except ValidationError as ex:
self.add_error(source_field, ex)
return cleaned_data
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-03-25 12:03+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"Language: \n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
#: AKSolverInterface/forms.py:18
msgid "JSON data"
msgstr "JSON-Daten"
#: AKSolverInterface/forms.py:19
msgid "JSON data from the scheduling solver"
msgstr "JSON-Daten, die der scheduling-solver produziert hat"
#: AKSolverInterface/forms.py:24
msgid "File with JSON data"
msgstr "Datei mit JSON-Daten"
#: AKSolverInterface/forms.py:25
msgid "File with JSON data from the scheduling solver"
msgstr "Datei mit JSON-Daten, die der scheduling-solver produziert hat"
#: AKSolverInterface/forms.py:38
msgid "Cannot decode as JSON"
msgstr "Dekodierung als JSON fehlgeschlagen"
#: AKSolverInterface/forms.py:43
#, python-format
msgid "Invalid JSON format: %(msg)s at %(error_path)s"
msgstr "Ungültige JSON-Eingabe: %(msg)s bei %(error_path)s"
#: AKSolverInterface/forms.py:54
msgid "Please enter data as a file OR via text, not both."
msgstr "Gib die Daten bitte als Datei oder als Text ein, nicht beides."
#: AKSolverInterface/forms.py:60
msgid "No data entered. Please enter data as a file or via text."
msgstr ""
"Keine Daten eingegeben. Gib die Daten bitte als Datei oder als Text ein."
#: AKSolverInterface/templates/admin/AKSolverInterface/import_json.html:23
msgid "Confirm"
msgstr "Bestätigen"
#: AKSolverInterface/templates/admin/AKSolverInterface/import_json.html:27
msgid "Cancel"
msgstr "Abbrechen"
#: AKSolverInterface/views.py:26
msgid "AK JSON Export"
msgstr "AK-JSON-Export"
#: AKSolverInterface/views.py:40
msgid "Exporting AKs for the solver failed! Reason: "
msgstr "Daten für den Solver exportieren fehlgeschlagen! Grund: "
#: AKSolverInterface/views.py:48
msgid "AK Schedule JSON Import"
msgstr "AK-Plan JSON-Import"
#: AKSolverInterface/views.py:58
#, python-brace-format
msgid "Successfully imported {n} slot(s)"
msgstr "Erfolgreich {n} Slot(s) importiert"
#: AKSolverInterface/views.py:66
msgid "Importing an AK schedule failed! Reason: "
msgstr "AK-Plan importieren fehlgeschlagen! Grund: "
#, python-format
#~ msgid "Invalid JSON format: field '%(field)s' is missing"
#~ msgstr "Ungültige JSON-Eingabe: das Feld '%(field)s' fehlt"
from rest_framework import serializers
from AKModel.models import (
AK,
AKPreference,
AKSlot,
Event,
EventParticipant,
Room,
)
class StringListField(serializers.ListField):
"""List field containing strings."""
child = serializers.CharField()
class IntListField(serializers.ListField):
"""List field containing integers."""
child = serializers.IntegerField()
class ExportRoomInfoSerializer(serializers.ModelSerializer):
"""Serializer of Room objects for the 'info' field.
Used in `ExportRoomSerializer` to serialize Room objects
for the export to a solver. Part of the implementation of the
format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
class Meta:
model = Room
fields = ["name"]
read_only_fields = ["name"]
class ExportRoomSerializer(serializers.ModelSerializer):
"""Export serializer for Room objects.
Used to serialize Room objects for the export to a solver.
Part of the implementation of the format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
time_constraints = StringListField(source="get_time_constraints", read_only=True)
fulfilled_room_constraints = StringListField(
source="get_fulfilled_room_constraints", read_only=True
)
info = ExportRoomInfoSerializer(source="*")
class Meta:
model = Room
fields = [
"id",
"capacity",
"time_constraints",
"fulfilled_room_constraints",
"info",
]
read_only_fields = [
"id",
"capacity",
"time_constraints",
"fulfilled_room_constraints",
"info",
]
class ExportAKSlotInfoSerializer(serializers.ModelSerializer):
"""Serializer of AKSlot objects for the 'info' field.
Used in `ExportAKSlotSerializer` to serialize AKSlot objects
for the export to a solver. Part of the implementation of the
format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
name = serializers.CharField(source="ak.name")
head = serializers.SerializerMethodField()
reso = serializers.BooleanField(source="ak.reso")
description = serializers.CharField(source="ak.description")
duration_in_hours = serializers.FloatField(source="duration")
django_ak_id = serializers.IntegerField(source="ak.pk")
types = StringListField(source="type_names")
def get_head(self, slot: AKSlot) -> str:
"""Get string representation for 'head' field."""
return ", ".join([str(owner) for owner in slot.ak.owners.all()])
class Meta:
model = AKSlot
fields = [
"name",
"head",
"description",
"reso",
"duration_in_hours",
"django_ak_id",
"types",
]
read_only_fields = [
"name",
"head",
"description",
"reso",
"duration_in_hours",
"django_ak_id",
"types",
]
class ExportAKSlotPropertiesSerializer(serializers.ModelSerializer):
"""Serializer of AKSlot objects for the 'properties' field.
Used in `ExportAKSlotSerializer` to serialize AKSlot objects
for the export to a solver. Part of the implementation of the
format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
conflicts = IntListField(source="conflict_pks")
dependencies = IntListField(source="depencency_pks")
class Meta:
model = AKSlot
fields = ["conflicts", "dependencies"]
read_only_fields = ["conflicts", "dependencies"]
class ExportAKSlotSerializer(serializers.ModelSerializer):
"""Export serializer for AKSlot objects.
Used to serialize AKSlot objects for the export to a solver.
Part of the implementation of the format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
duration = serializers.IntegerField(source="export_duration")
room_constraints = StringListField(source="get_room_constraints")
time_constraints = StringListField(source="get_time_constraints")
info = ExportAKSlotInfoSerializer(source="*")
properties = ExportAKSlotPropertiesSerializer(source="*")
class Meta:
model = AKSlot
fields = [
"id",
"duration",
"properties",
"room_constraints",
"time_constraints",
"info",
]
read_only_fields = [
"id",
"duration",
"properties",
"room_constraints",
"time_constraints",
"info",
]
class ExportAKPreferenceSerializer(serializers.ModelSerializer):
"""Export serializer for AKPreference objects.
Used to serialize AKPreference objects for the export to a solver.
Part of the implementation of the format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
ak_id = serializers.IntegerField(source="slot.pk")
class Meta:
model = AKPreference
fields = ["ak_id", "required", "preference_score"]
read_only_fields = ["ak_id", "required", "preference_score"]
class ExportParticipantInfoSerializer(serializers.ModelSerializer):
"""Serializer of EventParticipant objects for the 'info' field.
Used in `ExportParticipantSerializer` to serialize EventParticipant objects
for the export to a solver. Part of the implementation of the
format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
name = serializers.CharField(source="__str__")
class Meta:
model = EventParticipant
fields = ["name"]
read_only_fields = ["name"]
class ExportParticipantSerializer(serializers.ModelSerializer):
"""Export serializer for EventParticipant objects.
Used to serialize EventParticipant objects for the export to a solver.
Part of the implementation of the format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
room_constraints = StringListField(source="get_room_constraints")
time_constraints = StringListField(source="get_time_constraints")
preferences = ExportAKPreferenceSerializer(source="export_preferences", many=True)
info = ExportParticipantInfoSerializer(source="*")
class Meta:
model = EventParticipant
fields = ["id", "info", "room_constraints", "time_constraints", "preferences"]
read_only_fields = ["id", "info", "room_constraints", "time_constraints", "preferences"]
class ExportParticipantAndDummiesSerializer(serializers.BaseSerializer):
"""Export serializer for EventParticipant objects that includes 'dummy' participants.
This serializer is a work-around to make the solver compatible with the AKOwner model.
Internally, `ExportParticipantSerializer` is used to serialize all EventParticipants of
the event to serialize. To avoid scheduling conflicts, a 'dummy' participant is then added
to the list for each AKOwner of the event. These dummy participants only have 'required'
preference for all AKs of the owner, so the target of the optimization is not impacted.
Part of the implementation of the format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
def create(self, validated_data):
raise ValueError("`ExportParticipantAndDummiesSerializer` is read-only.")
def to_internal_value(self, data):
raise ValueError("`ExportParticipantAndDummiesSerializer` is read-only.")
def update(self, instance, validated_data):
raise ValueError("`ExportParticipantAndDummiesSerializer` is read-only.")
def to_representation(self, instance: Event):
event = instance
real_participants = ExportParticipantSerializer(event.participants, many=True).data
dummies = []
if EventParticipant.objects.exists():
next_participant_pk = EventParticipant.objects.latest("pk").pk + 1
else:
next_participant_pk = 1
# add one dummy participant per owner
# this ensures that the hard constraints from each owner are considered
for new_pk, owner in enumerate(event.owners, next_participant_pk):
owned_slots = event.slots.filter(ak__owners=owner).order_by().all()
if not owned_slots:
continue
new_participant_data = {
"id": new_pk,
"info": {"name": f"{owner} [AKOwner]"},
"room_constraints": [],
"time_constraints": [],
"preferences": [
{"ak_id": slot.pk, "required": True, "preference_score": -1}
for slot in owned_slots
]
}
dummies.append(new_participant_data)
return real_participants + dummies
class ExportEventInfoSerializer(serializers.ModelSerializer):
"""Serializer of an Event object for the 'info' field.
Used in `ExportEventSerializer` to serialize an Event object
for the export to a solver. Part of the implementation of the
format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
title = serializers.CharField(source="name")
contact_email = serializers.EmailField(required=False)
place = serializers.CharField(required=False)
class Meta:
model = Event
fields = ["title", "slug", "contact_email", "place"]
class ExportTimeslotBlockSerializer(serializers.BaseSerializer):
"""Read-only serializer for timeslots.
Used to serialize timeslots for the export to a solver.
Part of the implementation of the format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
def create(self, validated_data):
raise ValueError("`ExportTimeslotBlockSerializer` is read-only.")
def to_internal_value(self, data):
raise ValueError("`ExportTimeslotBlockSerializer` is read-only.")
def update(self, instance, validated_data):
raise ValueError("`ExportTimeslotBlockSerializer` is read-only.")
def to_representation(self, instance: Event):
"""Construct serialized representation of the timeslots of an event."""
# pylint: disable=import-outside-toplevel
from AKModel.availability.models import Availability
event = instance
blocks = list(event.discretize_timeslots())
def _check_event_not_covered(availabilities: list[Availability]) -> bool:
"""Test if event is not covered by availabilities."""
return not Availability.is_event_covered(event, availabilities)
def _check_akslot_fixed_in_timeslot(
ak_slot: AKSlot, timeslot: Availability
) -> bool:
"""Test if an AKSlot is fixed to overlap a timeslot slot."""
if not ak_slot.fixed or ak_slot.start is None:
return False
fixed_avail = Availability(
event=event, start=ak_slot.start, end=ak_slot.end
)
return fixed_avail.overlaps(timeslot, strict=True)
def _check_add_constraint(
slot: Availability, availabilities: list[Availability]
) -> bool:
"""Test if object is not available for whole event and may happen during slot."""
return _check_event_not_covered(availabilities) and slot.is_covered(
availabilities
)
def _generate_time_constraints(
avail_label: str,
avail_dict: dict,
timeslot_avail: Availability,
prefix: str = "availability",
) -> list[str]:
return [
f"{prefix}-{avail_label}-{pk}"
for pk, availabilities in avail_dict.items()
if _check_add_constraint(timeslot_avail, availabilities)
]
timeslots = {
"info": {"duration": float(event.export_slot)},
"blocks": [],
}
ak_availabilities = {
ak.pk: Availability.union(ak.availabilities.all())
for ak in AK.objects.filter(event=event).all()
}
room_availabilities = {
room.pk: Availability.union(room.availabilities.all()) for room in event.rooms
}
person_availabilities = {
person.pk: Availability.union(person.availabilities.all())
for person in event.owners
}
participant_availabilities = {
participant.pk: Availability.union(participant.availabilities.all())
for participant in event.participants
}
block_names = []
for block_idx, block in enumerate(blocks):
current_block = []
if not block:
continue
block_start = block[0].avail.start.astimezone(event.timezone)
block_end = block[-1].avail.end.astimezone(event.timezone)
start_day = block_start.strftime("%A, %d. %b")
if block_start.date() == block_end.date():
# same day
time_str = (
block_start.strftime("%H:%M") + " - " + block_end.strftime("%H:%M")
)
else:
# different days
time_str = (
block_start.strftime("%a %H:%M")
+ " - "
+ block_end.strftime("%a %H:%M")
)
block_names.append([start_day, time_str])
block_timeconstraints = [
f"notblock{idx}" for idx in range(len(blocks)) if idx != block_idx
]
for timeslot in block:
time_constraints = []
# if reso_deadline is set and timeslot ends before it,
# add fulfilled time constraint 'resolution'
if (
event.reso_deadline is None
or timeslot.avail.end < event.reso_deadline
):
time_constraints.append("resolution")
# add fulfilled time constraints for all AKs that cannot happen during full event
time_constraints.extend(
_generate_time_constraints("ak", ak_availabilities, timeslot.avail)
)
# add fulfilled time constraints for all persons that are not available for full event
time_constraints.extend(
_generate_time_constraints(
"person", person_availabilities, timeslot.avail
)
)
# add fulfilled time constraints for all rooms that are not available for full event
time_constraints.extend(
_generate_time_constraints(
"room", room_availabilities, timeslot.avail
)
)
# add fulfilled time constraints for all participants that are not available for full event
time_constraints.extend(
_generate_time_constraints(
"participant", participant_availabilities, timeslot.avail
)
)
# add fulfilled time constraints for all AKSlots fixed to happen during timeslot
time_constraints.extend(
[
f"fixed-akslot-{slot.id}"
for slot in AKSlot.objects.filter(
event=event, fixed=True
).exclude(start__isnull=True)
if _check_akslot_fixed_in_timeslot(slot, timeslot.avail)
]
)
time_constraints.extend(timeslot.constraints)
time_constraints.extend(block_timeconstraints)
time_constraints.sort()
current_block.append(
{
"id": timeslot.idx,
"info": {
"start": timeslot.avail.start.astimezone(
event.timezone
).strftime("%Y-%m-%d %H:%M"),
"end": timeslot.avail.end.astimezone(
event.timezone
).strftime("%Y-%m-%d %H:%M"),
},
"fulfilled_time_constraints": time_constraints,
}
)
timeslots["blocks"].append(current_block)
timeslots["info"]["blocknames"] = block_names
return timeslots
class ExportEventSerializer(serializers.ModelSerializer):
"""Export serializer for an Event object.
Used to serialize an Event for the export to a solver.
Part of the implementation of the format of the KoMa solver:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format#input--output-format
"""
info = ExportEventInfoSerializer(source="*")
rooms = ExportRoomSerializer(many=True)
aks = ExportAKSlotSerializer(source="slots", many=True)
participants = ExportParticipantAndDummiesSerializer(source="*")
timeslots = ExportTimeslotBlockSerializer(source="*")
class Meta:
model = Event
fields = ["participants", "rooms", "timeslots", "info", "aks"]
read_only_fields = ["participants", "rooms", "timeslots", "info", "aks"]
{% extends "admin/base_site.html" %}
{% load tz %}
{% block content %}
<div class="mb-3">
<label for="export_single_line" class="form-label">Exported JSON:</label>
<input readonly type="text" name="export_single_line" class="form-control form-control-sm" value="{{ json_data_oneline }}" style="font-family:var(--font-family-monospace);">
</div>
<div class="mb-3">
<label class="form-label">Exported JSON (indented for better readability):</label>
<pre class="prettyprint border rounded p-2">{{ json_data }}</pre>
</div>
<script>
$(document).ready(function() {
// JSON highlighting.
prettyPrint();
}
)
</script>
{% endblock %}
{% extends "admin/base_site.html" %}
{% load tags_AKModel %}
{% load i18n %}
{% load django_bootstrap5 %}
{% load fontawesome_6 %}
{% block title %}{{event}}: {{ title }}{% endblock %}
{% block content %}
{% block action_preview %}
<p>
{{ preview|linebreaksbr }}
</p>
{% endblock %}
<form enctype="multipart/form-data" method="post">{% csrf_token %}
{% bootstrap_form form %}
<div class="float-end">
<button type="submit" class="save btn btn-success" value="Submit">
{% fa6_icon "check" 'fas' %} {% trans "Confirm" %}
</button>
</div>
<a href="javascript:history.back()" class="btn btn-info">
{% fa6_icon "times" 'fas' %} {% trans "Cancel" %}
</a>
</form>
{% endblock %}
import json
import math
from collections import defaultdict
from collections.abc import Iterable
from datetime import datetime, timedelta
from itertools import chain
from bs4 import BeautifulSoup
from django.contrib.auth import get_user_model
from django.test import TestCase
from django.urls import reverse
from jsonschema.exceptions import best_match
from AKModel.availability.models import Availability
from AKModel.models import (
AK,
AKCategory,
AKOwner,
AKPreference,
AKSlot,
DefaultSlot,
Event,
EventParticipant,
Room,
)
from AKSolverInterface.utils import construct_schema_validator
class JSONExportTest(TestCase):
"""Test if JSON export is correct.
It tests if the output conforms to the KoMa specification:
https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format
"""
fixtures = ["model.json"]
@classmethod
def setUpTestData(cls):
"""Shared set up by initializing admin user."""
cls.admin_user = get_user_model().objects.create(
username="Test Admin User",
email="testadmin@example.com",
password="adminpw",
is_staff=True,
is_superuser=True,
is_active=True,
)
cls.json_export_validator = construct_schema_validator(
"solver-input-export.schema.json"
)
def setUp(self):
self.client.force_login(self.admin_user)
self.export_dict = {}
self.export_objects = {
"aks": {},
"rooms": {},
"participants": {},
}
self.ak_slots: Iterable[AKSlot] = []
self.rooms: Iterable[Room] = []
self.participants: Iterable[EventParticipant] = []
self.owners: Iterable[AKOwner] = []
self.slots_in_an_hour: float = 1.0
self.max_participant_pk = 0
self.event: Event | None = None
def set_up_event(self, event: Event) -> None:
"""Set up by retrieving json export and initializing data."""
export_url = reverse("admin:ak_json_export", kwargs={"event_slug": event.slug})
response = self.client.get(export_url)
self.assertEqual(response.status_code, 200, "Export not working at all")
soup = BeautifulSoup(response.content, features="lxml")
self.export_dict = json.loads(soup.find("pre").string)
self.export_objects["aks"] = {ak["id"]: ak for ak in self.export_dict["aks"]}
self.export_objects["rooms"] = {
room["id"]: room for room in self.export_dict["rooms"]
}
self.export_objects["participants"] = {
participant["id"]: participant
for participant in self.export_dict["participants"]
}
self.ak_slots = (
AKSlot.objects.filter(event__slug=event.slug)
.select_related("ak")
.prefetch_related("ak__conflicts")
.prefetch_related("ak__prerequisites")
.all()
)
self.rooms = Room.objects.filter(event__slug=event.slug).all()
self.participants = EventParticipant.objects.filter(
event__slug=event.slug
).all()
self.owners = AKOwner.objects.filter(event__slug=event.slug).all()
self.max_participant_pk = (
self.participants.latest("pk").pk if self.participants else 0
)
self.slots_in_an_hour = 1 / self.export_dict["timeslots"]["info"]["duration"]
self.event = event
def test_all_aks_exported(self):
"""Test if exported AKs match AKSlots of Event."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self.assertEqual(
{slot.pk for slot in self.ak_slots},
self.export_objects["aks"].keys(),
"Exported AKs does not match the AKSlots of the event",
)
def _check_uniqueness(self, lst, name: str, key: str | None = "id"):
if key is not None:
lst = [entry[key] for entry in lst]
self.assertEqual(len(lst), len(set(lst)), f"{name} IDs not unique!")
def _check_type(self, attr, cls, name: str, item: str) -> None:
self.assertTrue(isinstance(attr, cls), f"{item} {name} not a {cls}")
def _check_lst(
self, lst: list[str], name: str, item: str, contained_type=str
) -> None:
self.assertTrue(isinstance(lst, list), f"{item} {name} not a list")
self.assertTrue(
all(isinstance(c, contained_type) for c in lst),
f"{item} has non-{contained_type} {name}",
)
if contained_type in {str, int}:
self._check_uniqueness(lst, name, key=None)
def test_conformity_to_schema(self):
"""Test if JSON structure and types conform to schema."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
error = best_match(
self.json_export_validator.iter_errors(self.export_dict)
)
msg = "" if not error else f"{error.message} at {error.json_path}"
self.assertFalse(error, msg)
def test_id_uniqueness(self):
"""Test if objects are only exported once."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self._check_uniqueness(self.export_dict["aks"], "AKs")
self._check_uniqueness(self.export_dict["rooms"], "Rooms")
self._check_uniqueness(self.export_dict["participants"], "Participants")
self._check_uniqueness(
chain.from_iterable(self.export_dict["timeslots"]["blocks"]),
"Timeslots",
)
def test_timeslot_ids_consecutive(self):
"""Test if Timeslots ids are chronologically consecutive."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
prev_id = None
for timeslot in chain.from_iterable(
self.export_dict["timeslots"]["blocks"]
):
if prev_id is not None:
self.assertLess(
prev_id,
timeslot["id"],
"timeslot ids must be increasing",
)
prev_id = timeslot["id"]
def test_general_conformity_to_spec(self):
"""Test if rest of JSON structure and types conform to standard."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
info_keys = {"title": "name", "slug": "slug"}
for attr in ["contact_email", "place"]:
if hasattr(self.event, attr) and getattr(self.event, attr):
info_keys[attr] = attr
self.assertEqual(
self.export_dict["info"].keys(),
info_keys.keys(),
"info keys not as expected",
)
for attr, attr_field in info_keys.items():
self.assertEqual(
getattr(self.event, attr_field), self.export_dict["info"][attr]
)
def test_ak_durations(self):
"""Test if all AK durations are correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
self.assertLessEqual(
float(slot.duration) * self.slots_in_an_hour - 1e-4,
ak["duration"],
"Slot duration is too short",
)
self.assertEqual(
math.ceil(float(slot.duration) * self.slots_in_an_hour - 1e-4),
ak["duration"],
"Slot duration is wrong",
)
self.assertEqual(
float(slot.duration),
ak["info"]["duration_in_hours"],
"Slot duration_in_hours is wrong",
)
def test_ak_conflicts(self):
"""Test if all AK conflicts are correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
conflict_slots = set(
self.ak_slots.filter(
ak__in=slot.ak.conflicts.all()
).values_list("pk", flat=True)
)
other_ak_slots = (
self.ak_slots.filter(ak=slot.ak)
.exclude(pk=slot.pk)
.values_list("pk", flat=True)
)
conflict_slots.update(other_ak_slots)
self.assertEqual(
conflict_slots,
set(ak["properties"]["conflicts"]),
f"Conflicts for slot {slot.pk} not as expected",
)
def test_ak_depenedencies(self):
"""Test if all AK dependencies are correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
dependency_slots = self.ak_slots.filter(
ak__in=slot.ak.prerequisites.all()
).values_list("pk", flat=True)
self.assertEqual(
set(dependency_slots),
set(ak["properties"]["dependencies"]),
f"Dependencies for slot {slot.pk} not as expected",
)
def test_ak_reso(self):
"""Test if resolution intent of AKs is correctly exported."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
self.assertEqual(slot.ak.reso, ak["info"]["reso"])
self.assertEqual(
slot.ak.reso, "resolution" in ak["time_constraints"]
)
def test_ak_info(self):
"""Test if contents of AK info dict is correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
self.assertEqual(ak["info"]["name"], slot.ak.name)
self.assertEqual(
ak["info"]["head"], ", ".join(map(str, slot.ak.owners.all()))
)
self.assertEqual(ak["info"]["description"], slot.ak.description)
self.assertEqual(ak["info"]["django_ak_id"], slot.ak.pk)
self.assertEqual(
ak["info"]["types"],
list(slot.ak.types.values_list("name", flat=True).order_by()),
)
def test_ak_room_constraints(self):
"""Test if AK room constraints are exported as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
ak = self.export_objects["aks"][slot.pk]
requirements = list(
slot.ak.requirements.values_list("name", flat=True)
)
# proxy rooms
if not any(constr.startswith("proxy") for constr in requirements):
requirements.append("no-proxy")
# fixed slot
if slot.fixed and slot.room is not None:
requirements.append(f"fixed-room-{slot.room.pk}")
self.assertEqual(
set(ak["room_constraints"]),
set(requirements),
f"Room constraints for slot {slot.pk} not as expected",
)
def test_ak_time_constraints(self):
"""Test if AK time constraints are exported as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for slot in self.ak_slots:
time_constraints = set()
# add time constraints for AK category
if slot.ak.category:
category_constraints = AKCategory.create_category_optimizer_constraints(
[slot.ak.category]
)
time_constraints |= category_constraints
if slot.fixed and slot.start is not None:
# fixed slot
time_constraints.add(f"fixed-akslot-{slot.pk}")
elif not Availability.is_event_covered(
slot.event, slot.ak.availabilities.all()
):
# restricted AK availability
time_constraints.add(f"availability-ak-{slot.ak.pk}")
for owner in slot.ak.owners.all():
# restricted owner availability
if not owner.availabilities.all():
# no availability for owner -> assume full event is covered
continue
if not Availability.is_event_covered(
slot.event, owner.availabilities.all()
):
time_constraints.add(f"availability-person-{owner.pk}")
ak = self.export_objects["aks"][slot.pk]
self.assertEqual(
set(ak["time_constraints"]),
time_constraints,
f"Time constraints for slot {slot.pk} not as expected",
)
def test_all_rooms_exported(self):
"""Test if exported Rooms match the rooms of Event."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self.assertEqual(
{room.pk for room in self.rooms},
self.export_objects["rooms"].keys(),
"Exported Rooms do not match the Rooms of the event",
)
def test_room_capacity(self):
"""Test if room capacity is exported correctly."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for room in self.rooms:
export_room = self.export_objects["rooms"][room.pk]
self.assertEqual(room.capacity, export_room["capacity"])
def test_room_info(self):
"""Test if contents of Room info dict is correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for room in self.rooms:
export_room = self.export_objects["rooms"][room.pk]
self.assertEqual(room.name, export_room["info"]["name"])
def test_room_timeconstraints(self):
"""Test if Room time constraints are exported as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for room in self.rooms:
time_constraints = set()
# test if time availability of room is restricted
if not Availability.is_event_covered(
event, room.availabilities.all()
):
time_constraints.add(f"availability-room-{room.pk}")
export_room = self.export_objects["rooms"][room.pk]
self.assertEqual(
time_constraints, set(export_room["time_constraints"])
)
def test_room_fulfilledroomconstraints(self):
"""Test if room constraints fulfilled by Room are correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for room in self.rooms:
# room properties
fulfilled_room_constraints = set(
room.properties.values_list("name", flat=True)
)
# proxy rooms
if not any(
constr.startswith("proxy")
for constr in fulfilled_room_constraints
):
fulfilled_room_constraints.add("no-proxy")
fulfilled_room_constraints.add(f"fixed-room-{room.pk}")
export_room = self.export_objects["rooms"][room.pk]
self.assertEqual(
fulfilled_room_constraints,
set(export_room["fulfilled_room_constraints"]),
)
def _get_timeslot_start_end(self, timeslot):
start = datetime.strptime(timeslot["info"]["start"], "%Y-%m-%d %H:%M").replace(
tzinfo=self.event.timezone
)
end = datetime.strptime(timeslot["info"]["end"], "%Y-%m-%d %H:%M").replace(
tzinfo=self.event.timezone
)
return start, end
def _get_cat_availability_in_export(self):
export_slot_cat_avails = defaultdict(list)
for timeslot in chain.from_iterable(self.export_dict["timeslots"]["blocks"]):
for constr in timeslot["fulfilled_time_constraints"]:
if constr.startswith("availability-cat-"):
cat_name = constr[len("availability-cat-") :]
start, end = self._get_timeslot_start_end(timeslot)
export_slot_cat_avails[cat_name].append(
Availability(event=self.event, start=start, end=end)
)
return {
cat_name: Availability.union(avail_lst)
for cat_name, avail_lst in export_slot_cat_avails.items()
}
def _get_cat_availability(self):
if DefaultSlot.objects.filter(event=self.event).exists():
# Event has default slots -> use them for category availability
default_slots_avails = defaultdict(list)
for def_slot in DefaultSlot.objects.filter(event=self.event).all():
avail = Availability(
event=self.event,
start=def_slot.start.astimezone(self.event.timezone),
end=def_slot.end.astimezone(self.event.timezone),
)
for cat in def_slot.primary_categories.all():
default_slots_avails[cat.name].append(avail)
return {
cat_name: Availability.union(avail_lst)
for cat_name, avail_lst in default_slots_avails.items()
}
# Event has no default slots -> all categories available through whole event
start = self.event.start.astimezone(self.event.timezone)
end = self.event.end.astimezone(self.event.timezone)
delta = (end - start).total_seconds()
# tweak event end
# 1. shorten event to match discrete slot grid
slot_seconds = 3600 / self.slots_in_an_hour
remainder_seconds = delta % slot_seconds
remainder_seconds += 1 # add a second to compensate rounding errs
end -= timedelta(seconds=remainder_seconds)
# set seconds and microseconds to 0 as they are not exported to the json
start -= timedelta(seconds=start.second, microseconds=start.microsecond)
end -= timedelta(seconds=end.second, microseconds=end.microsecond)
event_avail = Availability(event=self.event, start=start, end=end)
category_names = AKCategory.objects.filter(event=self.event).values_list(
"name", flat=True
)
return {cat_name: [event_avail] for cat_name in category_names}
def test_timeslots_consecutive(self):
"""Test if consecutive timeslots in JSON are in fact consecutive."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
prev_end = None
for timeslot in chain.from_iterable(
self.export_dict["timeslots"]["blocks"]
):
start, end = self._get_timeslot_start_end(timeslot)
self.assertLess(start, end)
delta = end - start
self.assertAlmostEqual(
delta.total_seconds() / (3600), 1 / self.slots_in_an_hour
)
if prev_end is not None:
self.assertLessEqual(prev_end, start)
prev_end = end
def test_block_cover_categories(self):
"""Test if blocks covers all default slot resp. whole event per category."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
category_names = AKCategory.objects.filter(event=event).values_list(
"name", flat=True
)
export_cat_avails = self._get_cat_availability_in_export()
cat_avails = self._get_cat_availability()
for cat_name in category_names:
for avail in cat_avails[cat_name]:
# check that all category availabilities are covered
self.assertTrue(
avail.is_covered(export_cat_avails[cat_name]),
f"AKCategory {cat_name}: avail ({avail.start} - {avail.end}) "
f"not covered by {[f'({a.start} - {a.end})' for a in export_cat_avails[cat_name]]}",
)
def _is_restricted_and_contained_slot(
self, slot: Availability, availabilities: list[Availability]
) -> bool:
"""Test if object is not available for whole event and may happen during slot."""
return slot.is_covered(availabilities) and not Availability.is_event_covered(
self.event, availabilities
)
def _is_ak_fixed_in_slot(
self,
ak_slot: AKSlot,
timeslot_avail: Availability,
) -> bool:
if not ak_slot.fixed or ak_slot.start is None:
return False
ak_slot_avail = Availability(
event=self.event,
start=ak_slot.start.astimezone(self.event.timezone),
end=ak_slot.end.astimezone(self.event.timezone),
)
return timeslot_avail.overlaps(ak_slot_avail, strict=True)
def test_timeslot_fulfilledconstraints(self):
"""Test if fulfilled time constraints by timeslot are as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
cat_avails = self._get_cat_availability()
num_blocks = len(self.export_dict["timeslots"]["blocks"])
for block_idx, block in enumerate(
self.export_dict["timeslots"]["blocks"]
):
for timeslot in block:
start, end = self._get_timeslot_start_end(timeslot)
timeslot_avail = Availability(
event=self.event, start=start, end=end
)
fulfilled_time_constraints = set()
# reso deadline
if self.event.reso_deadline is not None:
# timeslot ends before deadline
if end < self.event.reso_deadline.astimezone(
self.event.timezone
):
fulfilled_time_constraints.add("resolution")
# add category constraints
fulfilled_time_constraints |= (
AKCategory.create_category_optimizer_constraints(
[
cat
for cat in AKCategory.objects.filter(
event=self.event
).all()
if timeslot_avail.is_covered(cat_avails[cat.name])
]
)
)
# add owner constraints
fulfilled_time_constraints |= {
f"availability-person-{owner.id}"
for owner in self.owners
if self._is_restricted_and_contained_slot(
timeslot_avail,
Availability.union(owner.availabilities.all()),
)
}
# add room constraints
fulfilled_time_constraints |= {
f"availability-room-{room.id}"
for room in self.rooms
if self._is_restricted_and_contained_slot(
timeslot_avail,
Availability.union(room.availabilities.all()),
)
}
# add participant constraints
fulfilled_time_constraints |= {
f"availability-participant-{participant.id}"
for participant in self.participants
if self._is_restricted_and_contained_slot(
timeslot_avail,
Availability.union(participant.availabilities.all()),
)
}
# add ak constraints
fulfilled_time_constraints |= {
f"availability-ak-{ak.id}"
for ak in AK.objects.filter(event=event)
if self._is_restricted_and_contained_slot(
timeslot_avail,
Availability.union(ak.availabilities.all()),
)
}
fulfilled_time_constraints |= {
f"fixed-akslot-{slot.id}"
for slot in self.ak_slots
if self._is_ak_fixed_in_slot(slot, timeslot_avail)
}
fulfilled_time_constraints |= {
f"notblock{idx}"
for idx in range(num_blocks)
if idx != block_idx
}
self.assertEqual(
fulfilled_time_constraints,
set(timeslot["fulfilled_time_constraints"]),
)
def test_timeslots_info(self):
"""Test timeslots info dict"""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
self.assertAlmostEqual(
self.export_dict["timeslots"]["info"]["duration"],
float(self.event.export_slot),
)
block_names = []
for block in self.export_dict["timeslots"]["blocks"]:
if not block:
continue
block_start, _ = self._get_timeslot_start_end(block[0])
_, block_end = self._get_timeslot_start_end(block[-1])
start_day = block_start.strftime("%A, %d. %b")
if block_start.date() == block_end.date():
# same day
time_str = (
block_start.strftime("%H:%M")
+ " - "
+ block_end.strftime("%H:%M")
)
else:
# different days
time_str = (
block_start.strftime("%a %H:%M")
+ " - "
+ block_end.strftime("%a %H:%M")
)
block_names.append([start_day, time_str])
self.assertEqual(
block_names, self.export_dict["timeslots"]["info"]["blocknames"]
)
def _owner_has_ak(self, owner: AKOwner) -> bool:
owned_aks = self.ak_slots.filter(ak__owners=owner).all()
return bool(owned_aks)
def test_all_participants_exported(self):
"""Test if exported Rooms match the rooms of Event."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
participant_ids = set(self.participants.values_list("pk", flat=True))
for idx, owner in enumerate(self.owners, self.max_participant_pk + 1):
if self._owner_has_ak(owner):
participant_ids.add(idx)
self.assertEqual(
participant_ids,
self.export_objects["participants"].keys(),
"Exported Participants does not match the Participants of the event",
)
def test_participant_info(self):
"""Test if contents of participants info dict is correct."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for participant in self.participants:
export_participant = self.export_objects["participants"][
participant.pk
]
self.assertEqual(
str(participant), export_participant["info"]["name"]
)
for idx, owner in enumerate(self.owners, self.max_participant_pk + 1):
if not self._owner_has_ak(owner):
continue
export_participant = self.export_objects["participants"][idx]
self.assertEqual(
str(owner) + " [AKOwner]", export_participant["info"]["name"]
)
def test_participant_timeconstraints(self):
"""Test if participant time constraints are exported as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for participant in self.participants:
export_participant = self.export_objects["participants"][
participant.pk
]
time_constraints = set()
participant_avails = participant.availabilities.all()
if participant_avails and not Availability.is_event_covered(
self.event, participant_avails
):
# participant has restricted availability
if AKPreference.objects.filter(
event=self.event,
participant=participant,
preference=AKPreference.PreferenceLevel.REQUIRED,
):
# partipant is actually required for AKs
time_constraints.add(
f"availability-participant-{participant.pk}"
)
self.assertEqual(
set(export_participant["time_constraints"]), time_constraints
)
# dummy participants have no time constraints
for idx, owner in enumerate(self.owners, self.max_participant_pk + 1):
if not self._owner_has_ak(owner):
continue
export_participant = self.export_objects["participants"][idx]
self.assertEqual(export_participant["time_constraints"], [])
def test_participant_roomconstraints(self):
"""Test if participant room constraints are exported as expected."""
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for participant in self.participants:
export_participant = self.export_objects["participants"][
participant.pk
]
room_constraints = [
constr.name for constr in participant.requirements.all()
]
self.assertCountEqual(
export_participant["room_constraints"], room_constraints
)
for idx, owner in enumerate(self.owners, self.max_participant_pk + 1):
if not self._owner_has_ak(owner):
continue
export_participant = self.export_objects["participants"][idx]
self.assertEqual(export_participant["room_constraints"], [])
def test_preferences(self):
"""Test if preferences are exported as expected."""
def _preference_json(pref: AKPreference):
return {
"ak_id": pref.slot.pk,
"required": pref.preference == AKPreference.PreferenceLevel.REQUIRED,
"preference_score": (
pref.preference
if pref.preference != AKPreference.PreferenceLevel.REQUIRED
else -1
),
}
for event in Event.objects.all():
with self.subTest(event=event):
self.set_up_event(event=event)
for participant in self.participants:
export_participant = self.export_objects["participants"][
participant.pk
]
preferences = [
_preference_json(pref)
for pref in AKPreference.objects.filter(
participant=participant, preference__gt=0
).select_related("slot")
]
self.assertCountEqual(
export_participant["preferences"], preferences
)
for idx, owner in enumerate(self.owners, self.max_participant_pk + 1):
owned_slots = self.ak_slots.filter(ak__owners=owner).all()
if not owned_slots:
continue
preferences = [
{
"ak_id": slot.pk,
"required": True,
"preference_score": -1,
}
for slot in owned_slots
]
export_participant = self.export_objects["participants"][idx]
self.assertCountEqual(
export_participant["preferences"], preferences
)
from django.test import TestCase
from AKModel.tests.test_views import BasicViewTests
class ModelViewTests(BasicViewTests, TestCase):
"""
Tests for AKSolverInterface
"""
fixtures = ["model.json"]
VIEWS_STAFF_ONLY = [
("admin:ak_json_export", {"event_slug": "kif42"}),
("admin:ak_schedule_json_import", {"event_slug": "kif42"}),
]
from django.urls import path
from .views import AKJSONExportView, AKScheduleJSONImportView
def get_admin_urls_solver_interface(admin_site):
return [
path(
"<slug:event_slug>/ak-json-export/",
admin_site.admin_view(AKJSONExportView.as_view()),
name="ak_json_export",
),
path(
"<slug:event_slug>/ak-schedule-json-import/",
admin_site.admin_view(AKScheduleJSONImportView.as_view()),
name="ak_schedule_json_import",
),
]
from pathlib import Path
import referencing.retrieval
from jsonschema import Draft202012Validator
from jsonschema.protocols import Validator
from referencing import Registry
from AKPlanning import settings
def _construct_schema_path(uri: str | Path) -> Path:
"""Construct a schema URI.
This function also checks for unallowed directory traversals
out of the 'schema' subfolder.
"""
schema_base_path = Path(settings.BASE_DIR).resolve()
uri_path = (schema_base_path / uri).resolve()
if not uri_path.is_relative_to(schema_base_path / "schemas"):
raise ValueError("Unallowed dictionary traversal")
return uri_path
@referencing.retrieval.to_cached_resource()
def retrieve_schema_from_disk(uri: str) -> str:
"""Retrieve schemas from disk by URI."""
uri_path = _construct_schema_path(uri)
with uri_path.open("r") as ff:
return ff.read()
def construct_schema_validator(schema: str | dict) -> Validator:
"""Construct a validator for a JSON schema.
In particular, all schemas from the 'schemas' directory
are loaded into the registry.
"""
registry = Registry(retrieve=retrieve_schema_from_disk)
if isinstance(schema, str):
schema_uri = str(Path("schemas") / schema)
schema = registry.get_or_retrieve(schema_uri).value.contents
return Draft202012Validator(schema=schema, registry=registry)
import json
from django.contrib import messages
from django.shortcuts import redirect
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView
from AKModel.metaviews.admin import (
AdminViewMixin,
EventSlugMixin,
IntermediateAdminView,
)
from AKModel.models import Event
from AKSolverInterface.forms import JSONScheduleImportForm
from AKSolverInterface.serializers import ExportEventSerializer
class AKJSONExportView(AdminViewMixin, DetailView):
"""
View: Export all AK slots of this event in JSON format ordered by tracks
"""
template_name = "admin/AKSolverInterface/ak_json_export.html"
model = Event
context_object_name = "event"
title = _("AK JSON Export")
slug_url_kwarg = "event_slug"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
try:
serialized_event = ExportEventSerializer(context["event"])
context["json_data_oneline"] = json.dumps(serialized_event.data, ensure_ascii=False)
context["json_data"] = json.dumps(serialized_event.data, indent=2, ensure_ascii=False)
context["is_valid"] = True
except ValueError as ex:
messages.add_message(
self.request,
messages.ERROR,
_("Exporting AKs for the solver failed! Reason: ") + str(ex),
)
return context
def get(self, request, *args, **kwargs):
# as this code is adapted from BaseDetailView::get
# pylint: disable=attribute-defined-outside-init
self.object = self.get_object()
context = self.get_context_data(object=self.object)
# if serialization failed in `get_context_data` we redirect to
# the status page and show a message instead
if not context.get("is_valid", False):
return redirect("admin:event_status", context["event"].slug)
return self.render_to_response(context)
class AKScheduleJSONImportView(EventSlugMixin, IntermediateAdminView):
"""
View: Import an AK schedule from a json file that can be pasted into this view.
"""
template_name = "admin/AKSolverInterface/import_json.html"
form_class = JSONScheduleImportForm
title = _("AK Schedule JSON Import")
def form_valid(self, form):
try:
number_of_slots_changed = self.event.schedule_from_json(
form.cleaned_data["data"]
)
messages.add_message(
self.request,
messages.SUCCESS,
_("Successfully imported {n} slot(s)").format(
n=number_of_slots_changed
),
)
except ValueError as ex:
messages.add_message(
self.request,
messages.ERROR,
_("Importing an AK schedule failed! Reason: ") + str(ex),
)
return redirect("admin:event_status", self.event.slug)