From 7140368303f5bb94bc3278f663fb7a731c4887ba Mon Sep 17 00:00:00 2001 From: Felix Blanke <s6feblan@uni-bonn.de> Date: Thu, 6 Feb 2025 12:43:45 +0000 Subject: [PATCH] Add tests on json export --- .gitlab-ci.yml | 4 +- AKDashboard/tests.py | 2 +- AKModel/availability/models.py | 13 +- AKModel/fixtures/model.json | 191 +++++- AKModel/tests/__init__.py | 0 AKModel/tests/test_json_export.py | 777 ++++++++++++++++++++++ AKModel/{tests.py => tests/test_views.py} | 203 ++++-- AKModel/views/ak.py | 11 +- AKPlan/tests.py | 2 +- AKScheduling/api.py | 4 +- AKScheduling/models.py | 4 +- AKScheduling/tests.py | 2 +- AKSubmission/tests.py | 2 +- 13 files changed, 1136 insertions(+), 79 deletions(-) create mode 100644 AKModel/tests/__init__.py create mode 100644 AKModel/tests/test_json_export.py rename AKModel/{tests.py => tests/test_views.py} (60%) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 55ef5901..5855c87c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -38,7 +38,7 @@ test: script: - source venv/bin/activate - echo "GRANT ALL on *.* to '${MYSQL_USER}';"| mysql -u root --password="${MYSQL_ROOT_PASSWORD}" -h mysql - - pip install pytest-cov unittest-xml-reporting + - pip install pytest-cov unittest-xml-reporting beautifulsoup4 - coverage run --source='.' manage.py test --settings AKPlanning.settings_ci after_script: - source venv/bin/activate @@ -56,6 +56,8 @@ lint: extends: .before_script_template stage: test script: + - source venv/bin/activate + - pip install beautifulsoup4 - pylint --load-plugins pylint_django --django-settings-module=AKPlanning.settings_ci --rcfile pylintrc --exit-zero --output-format=text AK* | tee /tmp/pylint.txt - sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' /tmp/pylint.txt > public/badges/$CI_JOB_NAME.score - pylint --load-plugins pylint_django --django-settings-module=AKPlanning.settings_ci --rcfile pylintrc --exit-zero --output-format=pylint_gitlab.GitlabCodeClimateReporter AK* > codeclimate.json diff --git a/AKDashboard/tests.py b/AKDashboard/tests.py index f96af9a1..de25ffbc 100644 --- a/AKDashboard/tests.py +++ b/AKDashboard/tests.py @@ -6,7 +6,7 @@ from django.utils.timezone import now from AKDashboard.models import DashboardButton from AKModel.models import Event, AK, AKCategory -from AKModel.tests import BasicViewTests +from AKModel.tests.test_views import BasicViewTests class DashboardTests(TestCase): diff --git a/AKModel/availability/models.py b/AKModel/availability/models.py index 27a6c228..e2a64b22 100644 --- a/AKModel/availability/models.py +++ b/AKModel/availability/models.py @@ -280,6 +280,16 @@ class Availability(models.Model): return Availability(start=timeframe_start, end=timeframe_end, event=event, person=person, room=room, ak=ak, ak_category=ak_category) + def is_covered(self, availabilities: List['Availability']): + """Check if list of availibilities cover this object. + + :param availabilities: availabilities to check. + :return: whether the availabilities cover full event. + :rtype: bool + """ + avail_union = Availability.union(availabilities) + return any(avail.contains(self) for avail in avail_union) + @classmethod def is_event_covered(cls, event: Event, availabilities: List['Availability']) -> bool: """Check if list of availibilities cover whole event. @@ -292,8 +302,7 @@ class Availability(models.Model): # NOTE: Cannot use `Availability.with_event_length` as its end is the # event end + 1 day full_event = Availability(event=event, start=event.start, end=event.end) - avail_union = Availability.union(availabilities) - return any(avail.contains(full_event) for avail in avail_union) + return full_event.is_covered(availabilities) class Meta: verbose_name = _('Availability') diff --git a/AKModel/fixtures/model.json b/AKModel/fixtures/model.json index d848041d..42cf6c81 100644 --- a/AKModel/fixtures/model.json +++ b/AKModel/fixtures/model.json @@ -93,7 +93,7 @@ "model": "AKModel.akcategory", "pk": 1, "fields": { - "name": "Spa▀", + "name": "Spaß", "color": "275246", "description": "", "present_by_default": true, @@ -115,7 +115,7 @@ "model": "AKModel.akcategory", "pk": 3, "fields": { - "name": "Spa▀/Kultur", + "name": "Spaß/Kultur", "color": "333333", "description": "", "present_by_default": true, @@ -436,6 +436,62 @@ ] } }, +{ + "model": "AKModel.ak", + "pk": 4, + "fields": { + "name": "Test AK fixed slots", + "short_name": "testfixed", + "description": "", + "link": "", + "protocol_link": "", + "category": 4, + "track": null, + "reso": false, + "present": true, + "notes": "", + "interest": -1, + "interest_counter": 0, + "include_in_export": false, + "event": 2, + "owners": [ + 1 + ], + "requirements": [ + 3 + ], + "conflicts": [], + "prerequisites": [] + } +}, +{ + "model": "AKModel.ak", + "pk": 5, + "fields": { + "name": "Test AK Ernst", + "short_name": "testernst", + "description": "", + "link": "", + "protocol_link": "", + "category": 2, + "track": null, + "reso": false, + "present": true, + "notes": "", + "interest": -1, + "interest_counter": 0, + "include_in_export": false, + "event": 1, + "owners": [ + 3 + ], + "requirements": [ + 2 + ], + "conflicts": [], + "prerequisites": [] + } +}, { "model": "AKModel.room", "pk": 1, @@ -460,6 +516,19 @@ "properties": [] } }, +{ + "model": "AKModel.room", + "pk": 3, + "fields": { + "name": "BBB Session 1", + "location": "", + "capacity": -1, + "event": 1, + "properties": [ + 2 + ] + } +}, { "model": "AKModel.akslot", "pk": 1, @@ -525,6 +594,58 @@ "updated": "2022-12-02T12:23:11.856Z" } }, +{ + "model": "AKModel.akslot", + "pk": 6, + "fields": { + "ak": 4, + "room": null, + "start": "2020-11-08T18:30:00Z", + "duration": "2.00", + "fixed": true, + "event": 2, + "updated": "2022-12-02T12:23:11.856Z" + } +}, +{ + "model": "AKModel.akslot", + "pk": 7, + "fields": { + "ak": 4, + "room": 2, + "start": null, + "duration": "2.00", + "fixed": true, + "event": 2, + "updated": "2022-12-02T12:23:11.856Z" + } +}, +{ + "model": "AKModel.akslot", + "pk": 8, + "fields": { + "ak": 4, + "room": 2, + "start": "2020-11-07T16:00:00Z", + "duration": "2.00", + "fixed": true, + "event": 2, + "updated": "2022-12-02T12:23:11.856Z" + } +}, +{ + "model": "AKModel.akslot", + "pk": 9, + "fields": { + "ak": 5, + "room": null, + "start": null, + "duration": "2.00", + "fixed": false, + "event": 1, + "updated": "2022-12-02T12:23:11.856Z" + } +}, { "model": "AKModel.constraintviolation", "pk": 1, @@ -668,5 +789,71 @@ "start": "2020-11-07T18:30:00Z", "end": "2020-11-07T21:30:00Z" } +}, +{ + "model": "AKModel.availability", + "pk": 7, + "fields": { + "event": 1, + "person": null, + "room": null, + "ak": 5, + "ak_category": null, + "start": "2020-10-01T17:41:22Z", + "end": "2020-10-04T17:41:30Z" + } +}, +{ + "model": "AKModel.availability", + "pk": 8, + "fields": { + "event": 1, + "person": null, + "room": 3, + "ak": null, + "ak_category": null, + "start": "2020-10-01T17:41:22Z", + "end": "2020-10-04T17:41:30Z" + } +}, +{ + "model": "AKModel.defaultslot", + "pk": 1, + "fields": { + "event": 2, + "start": "2020-11-07T08:00:00Z", + "end": "2020-11-07T12:00:00Z", + "primary_categories": [5] + } +}, +{ + "model": "AKModel.defaultslot", + "pk": 2, + "fields": { + "event": 2, + "start": "2020-11-07T14:00:00Z", + "end": "2020-11-07T17:00:00Z", + "primary_categories": [4] + } +}, +{ + "model": "AKModel.defaultslot", + "pk": 3, + "fields": { + "event": 2, + "start": "2020-11-08T08:00:00Z", + "end": "2020-11-08T19:00:00Z", + "primary_categories": [4, 5] + } +}, +{ + "model": "AKModel.defaultslot", + "pk": 4, + "fields": { + "event": 2, + "start": "2020-11-09T17:00:00Z", + "end": "2020-11-10T01:00:00Z", + "primary_categories": [4, 5, 3] + } } ] diff --git a/AKModel/tests/__init__.py b/AKModel/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/AKModel/tests/test_json_export.py b/AKModel/tests/test_json_export.py new file mode 100644 index 00000000..d53cd4c5 --- /dev/null +++ b/AKModel/tests/test_json_export.py @@ -0,0 +1,777 @@ +import json +import math + +from collections import defaultdict +from collections.abc import Iterable +from datetime import datetime, timedelta +from itertools import chain + +from bs4 import BeautifulSoup +from django.contrib.auth import get_user_model +from django.test import TestCase +from django.urls import reverse + +from AKModel.availability.models import Availability +from AKModel.models import ( + Event, + AKOwner, + AKCategory, + AK, + Room, + AKSlot, + DefaultSlot, +) + + +class JSONExportTest(TestCase): + """Test if JSON export is correct. + + It tests if the output conforms to the KoMa specification: + https://github.com/Die-KoMa/ak-plan-optimierung/wiki/Input-&-output-format + """ + + fixtures = ["model.json"] + + @classmethod + def setUpTestData(cls): + """Shared set up by initializing admin user.""" + cls.admin_user = get_user_model().objects.create( + username="Test Admin User", + email="testadmin@example.com", + password="adminpw", + is_staff=True, + is_superuser=True, + is_active=True, + ) + + def setUp(self): + self.client.force_login(self.admin_user) + self.export_dict = {} + self.export_objects = { + "aks": {}, + "rooms": {}, + "participants": {}, + } + + self.ak_slots: Iterable[AKSlot] = [] + self.rooms: Iterable[Room] = [] + self.slots_in_an_hour: float = 1.0 + self.event: Event | None = None + + def set_up_event(self, event: Event) -> None: + """Set up by retrieving json export and initializing data.""" + + export_url = reverse("admin:ak_json_export", kwargs={"event_slug": event.slug}) + response = self.client.get(export_url) + + self.assertEqual(response.status_code, 200, "Export not working at all") + + soup = BeautifulSoup(response.content, features="lxml") + self.export_dict = json.loads(soup.find("pre").string) + + self.export_objects["aks"] = {ak["id"]: ak for ak in self.export_dict["aks"]} + self.export_objects["rooms"] = { + room["id"]: room for room in self.export_dict["rooms"] + } + self.export_objects["participants"] = { + participant["id"]: participant + for participant in self.export_dict["participants"] + } + + self.ak_slots = AKSlot.objects.filter(event__slug=event.slug).all() + self.rooms = Room.objects.filter(event__slug=event.slug).all() + self.slots_in_an_hour = 1 / self.export_dict["timeslots"]["info"]["duration"] + self.event = event + + def test_all_aks_exported(self): + """Test if exported AKs match AKSlots of Event.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + self.assertEqual( + {str(slot.pk) for slot in self.ak_slots}, + self.export_objects["aks"].keys(), + "Exported AKs does not match the AKSlots of the event", + ) + + def _check_uniqueness(self, lst, name: str, key: str | None = "id"): + if key is not None: + lst = [entry[key] for entry in lst] + self.assertEqual(len(lst), len(set(lst)), f"{name} IDs not unique!") + + def _check_type(self, attr, cls, name: str, item: str) -> None: + self.assertTrue(isinstance(attr, cls), f"{item} {name} not a {cls}") + + def _check_lst( + self, lst: list[str], name: str, item: str, contained_type=str + ) -> None: + self.assertTrue(isinstance(lst, list), f"{item} {name} not a list") + self.assertTrue( + all(isinstance(c, contained_type) for c in lst), + f"{item} has non-{contained_type} {name}", + ) + if contained_type in {str, int}: + self._check_uniqueness(lst, name, key=None) + + def test_ak_conformity_to_spec(self): + """Test if AK JSON structure and types conform to standard.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + self._check_uniqueness(self.export_dict["aks"], "AK") + for ak in self.export_dict["aks"]: + item = f"AK {ak['id']}" + self.assertEqual( + ak.keys(), + { + "id", + "duration", + "properties", + "room_constraints", + "time_constraints", + "info", + }, + f"{item} keys not as expected", + ) + self.assertEqual( + ak["info"].keys(), + { + "name", + "head", + "description", + "reso", + "duration_in_hours", + "django_ak_id", + }, + f"{item} info keys not as expected", + ) + self.assertEqual( + ak["properties"].keys(), + {"conflicts", "dependencies"}, + f"{item} properties keys not as expected", + ) + + self._check_type(ak["id"], str, "id", item=item) + self._check_type(ak["duration"], int, "duration", item=item) + self._check_type(ak["info"]["name"], str, "info/name", item=item) + self._check_type(ak["info"]["head"], str, "info/head", item=item) + self._check_type( + ak["info"]["description"], str, "info/description", item=item + ) + self._check_type(ak["info"]["reso"], bool, "info/reso", item=item) + self._check_type( + ak["info"]["duration_in_hours"], + float, + "info/duration_in_hours", + item=item, + ) + self._check_type( + ak["info"]["django_ak_id"], + str, + "info/django_ak_id", + item=item, + ) + + self._check_lst( + ak["properties"]["conflicts"], "conflicts", item=item + ) + self._check_lst( + ak["properties"]["dependencies"], "dependencies", item=item + ) + self._check_lst( + ak["time_constraints"], "time_constraints", item=item + ) + self._check_lst( + ak["room_constraints"], "room_constraints", item=item + ) + + def test_room_conformity_to_spec(self): + """Test if Room JSON structure and types conform to standard.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + self._check_uniqueness(self.export_dict["rooms"], "Room") + for room in self.export_dict["rooms"]: + item = f"Room {room['id']}" + self.assertEqual( + room.keys(), + { + "id", + "info", + "capacity", + "fulfilled_room_constraints", + "time_constraints", + }, + f"{item} keys not as expected", + ) + self.assertEqual( + room["info"].keys(), + {"name"}, + f"{item} info keys not as expected", + ) + + self._check_type(room["id"], str, "id", item=item) + self._check_type(room["capacity"], int, "capacity", item=item) + self._check_type(room["info"]["name"], str, "info/name", item=item) + + self.assertTrue( + room["capacity"] > 0 or room["capacity"] == -1, + "invalid room capacity", + ) + + self._check_lst( + room["time_constraints"], "time_constraints", item=item + ) + self._check_lst( + room["fulfilled_room_constraints"], + "fulfilled_room_constraints", + item=item, + ) + + def test_timeslots_conformity_to_spec(self): + """Test if Timeslots JSON structure and types conform to standard.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + self._check_uniqueness( + chain.from_iterable(self.export_dict["timeslots"]["blocks"]), + "Timeslots", + ) + item = "timeslots" + self.assertEqual( + self.export_dict["timeslots"].keys(), + {"info", "blocks"}, + "timeslot keys not as expected", + ) + self.assertEqual( + self.export_dict["timeslots"]["info"].keys(), + {"duration"}, + "timeslot info keys not as expected", + ) + self._check_type( + self.export_dict["timeslots"]["info"]["duration"], + float, + "info/duration", + item=item, + ) + self._check_lst( + self.export_dict["timeslots"]["blocks"], + "blocks", + item=item, + contained_type=list, + ) + + prev_id = None + for timeslot in chain.from_iterable( + self.export_dict["timeslots"]["blocks"] + ): + item = f"timeslot {timeslot['id']}" + self.assertEqual( + timeslot.keys(), + {"id", "info", "fulfilled_time_constraints"}, + f"{item} keys not as expected", + ) + self.assertEqual( + timeslot["info"].keys(), + {"start", "end"}, + f"{item} info keys not as expected", + ) + self._check_type(timeslot["id"], str, "id", item=item) + self._check_type( + timeslot["info"]["start"], str, "info/start", item=item + ) + self._check_lst( + timeslot["fulfilled_time_constraints"], + "fulfilled_time_constraints", + item=item, + ) + + if prev_id is not None: + self.assertLess( + prev_id, + int(timeslot["id"]), + "timeslot ids must be increasing", + ) + prev_id = int(timeslot["id"]) + + def test_general_conformity_to_spec(self): + """Test if rest of JSON structure and types conform to standard.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + self.assertEqual( + self.export_dict["participants"], + [], + "Empty participant list expected", + ) + + info_keys = {"title": "name", "slug": "slug"} + for attr in ["contact_email", "place"]: + if hasattr(self.event, attr) and getattr(self.event, attr): + info_keys[attr] = attr + self.assertEqual( + self.export_dict["info"].keys(), + info_keys.keys(), + "info keys not as expected", + ) + for attr, attr_field in info_keys.items(): + self.assertEqual( + getattr(self.event, attr_field), self.export_dict["info"][attr] + ) + + self._check_uniqueness(self.export_dict["participants"], "Participants") + + def test_ak_durations(self): + """Test if all AK durations are correct.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for slot in self.ak_slots: + ak = self.export_objects["aks"][str(slot.pk)] + + self.assertLessEqual( + float(slot.duration) * self.slots_in_an_hour - 1e-4, + ak["duration"], + "Slot duration is too short", + ) + + self.assertEqual( + math.ceil(float(slot.duration) * self.slots_in_an_hour - 1e-4), + ak["duration"], + "Slot duration is wrong", + ) + + self.assertEqual( + float(slot.duration), + ak["info"]["duration_in_hours"], + "Slot duration_in_hours is wrong", + ) + + def test_ak_conflicts(self): + """Test if all AK conflicts are correct.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for slot in self.ak_slots: + ak = self.export_objects["aks"][str(slot.pk)] + conflict_slots = self.ak_slots.filter( + ak__in=slot.ak.conflicts.all() + ).values_list("pk", flat=True) + conflict_pks = {str(conflict_pk) for conflict_pk in conflict_slots} + + other_ak_slots = ( + self.ak_slots.filter(ak=slot.ak) + .exclude(pk=slot.pk) + .values_list("pk", flat=True) + ) + conflict_pks.update( + str(other_slot_pk) for other_slot_pk in other_ak_slots + ) + + self.assertEqual( + conflict_pks, + set(ak["properties"]["conflicts"]), + f"Conflicts for slot {slot.pk} not as expected", + ) + + def test_ak_depenedencies(self): + """Test if all AK dependencies are correct.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for slot in self.ak_slots: + ak = self.export_objects["aks"][str(slot.pk)] + dependency_slots = self.ak_slots.filter( + ak__in=slot.ak.prerequisites.all() + ).values_list("pk", flat=True) + + self.assertEqual( + {str(dep_pk) for dep_pk in dependency_slots}, + set(ak["properties"]["dependencies"]), + f"Dependencies for slot {slot.pk} not as expected", + ) + + def test_ak_reso(self): + """Test if resolution intent of AKs is correctly exported.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for slot in self.ak_slots: + ak = self.export_objects["aks"][str(slot.pk)] + self.assertEqual(slot.ak.reso, ak["info"]["reso"]) + self.assertEqual( + slot.ak.reso, "resolution" in ak["time_constraints"] + ) + + def test_ak_info(self): + """Test if contents of AK info dict is correct.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for slot in self.ak_slots: + ak = self.export_objects["aks"][str(slot.pk)] + self.assertEqual(ak["info"]["name"], slot.ak.name) + self.assertEqual( + ak["info"]["head"], ", ".join(map(str, slot.ak.owners.all())) + ) + self.assertEqual(ak["info"]["description"], slot.ak.description) + self.assertEqual(ak["info"]["django_ak_id"], str(slot.ak.pk)) + + def test_ak_room_constraints(self): + """Test if AK room constraints are exported as expected.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for slot in self.ak_slots: + ak = self.export_objects["aks"][str(slot.pk)] + requirements = list( + slot.ak.requirements.values_list("name", flat=True) + ) + + # proxy rooms + if not any(constr.startswith("proxy") for constr in requirements): + requirements.append("no-proxy") + + # fixed slot + if slot.fixed and slot.room is not None: + requirements.append(f"fixed-room-{slot.room.pk}") + + self.assertEqual( + set(ak["room_constraints"]), + set(requirements), + f"Room constraints for slot {slot.pk} not as expected", + ) + + def test_ak_time_constraints(self): + """Test if AK time constraints are exported as expected.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for slot in self.ak_slots: + time_constraints = set() + + # add time constraints for AK category + if slot.ak.category: + category_constraints = AKCategory.create_category_constraints( + [slot.ak.category] + ) + time_constraints |= category_constraints + + if slot.fixed and slot.start is not None: + # fixed slot + time_constraints.add(f"fixed-akslot-{slot.pk}") + elif not Availability.is_event_covered( + slot.event, slot.ak.availabilities.all() + ): + # restricted AK availability + time_constraints.add(f"availability-ak-{slot.ak.pk}") + + for owner in slot.ak.owners.all(): + # restricted owner availability + if not Availability.is_event_covered( + slot.event, owner.availabilities.all() + ): + time_constraints.add(f"availability-person-{owner.pk}") + + ak = self.export_objects["aks"][str(slot.pk)] + self.assertEqual( + set(ak["time_constraints"]), + time_constraints, + f"Time constraints for slot {slot.pk} not as expected", + ) + + def test_all_rooms_exported(self): + """Test if exported Rooms match the rooms of Event.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + self.assertEqual( + {str(room.pk) for room in self.rooms}, + self.export_objects["rooms"].keys(), + "Exported Rooms do not match the Rooms of the event", + ) + + def test_room_capacity(self): + """Test if room capacity is exported correctly.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for room in self.rooms: + export_room = self.export_objects["rooms"][str(room.pk)] + self.assertEqual(room.capacity, export_room["capacity"]) + + def test_room_info(self): + """Test if contents of Room info dict is correct.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for room in self.rooms: + export_room = self.export_objects["rooms"][str(room.pk)] + self.assertEqual(room.name, export_room["info"]["name"]) + + def test_room_timeconstraints(self): + """Test if Room time constraints are exported as expected.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for room in self.rooms: + time_constraints = set() + + # test if time availability of room is restricted + if not Availability.is_event_covered( + room.event, room.availabilities.all() + ): + time_constraints.add(f"availability-room-{room.pk}") + + export_room = self.export_objects["rooms"][str(room.pk)] + self.assertEqual( + time_constraints, set(export_room["time_constraints"]) + ) + + def test_room_fulfilledroomconstraints(self): + """Test if room constraints fulfilled by Room are correct.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + for room in self.rooms: + # room properties + fulfilled_room_constraints = set( + room.properties.values_list("name", flat=True) + ) + + # proxy rooms + if not any( + constr.startswith("proxy") + for constr in fulfilled_room_constraints + ): + fulfilled_room_constraints.add("no-proxy") + + fulfilled_room_constraints.add(f"fixed-room-{room.pk}") + + export_room = self.export_objects["rooms"][str(room.pk)] + self.assertEqual( + fulfilled_room_constraints, + set(export_room["fulfilled_room_constraints"]), + ) + + def _get_timeslot_start_end(self, timeslot): + start = datetime.strptime(timeslot["info"]["start"], "%Y-%m-%d %H:%M").replace( + tzinfo=self.event.timezone + ) + end = datetime.strptime(timeslot["info"]["end"], "%Y-%m-%d %H:%M").replace( + tzinfo=self.event.timezone + ) + return start, end + + def _get_cat_availability_in_export(self): + export_slot_cat_avails = defaultdict(list) + for timeslot in chain.from_iterable(self.export_dict["timeslots"]["blocks"]): + for constr in timeslot["fulfilled_time_constraints"]: + if constr.startswith("availability-cat-"): + cat_name = constr[len("availability-cat-") :] + start, end = self._get_timeslot_start_end(timeslot) + export_slot_cat_avails[cat_name].append( + Availability(event=self.event, start=start, end=end) + ) + return { + cat_name: Availability.union(avail_lst) + for cat_name, avail_lst in export_slot_cat_avails.items() + } + + def _get_cat_availability(self): + if DefaultSlot.objects.filter(event=self.event).exists(): + # Event has default slots -> use them for category availability + default_slots_avails = defaultdict(list) + for def_slot in DefaultSlot.objects.filter(event=self.event).all(): + avail = Availability( + event=self.event, + start=def_slot.start.astimezone(self.event.timezone), + end=def_slot.end.astimezone(self.event.timezone), + ) + for cat in def_slot.primary_categories.all(): + default_slots_avails[cat.name].append(avail) + + return { + cat_name: Availability.union(avail_lst) + for cat_name, avail_lst in default_slots_avails.items() + } + + # Event has no default slots -> all categories available through whole event + start = self.event.start.astimezone(self.event.timezone) + end = self.event.end.astimezone(self.event.timezone) + delta = (end - start).total_seconds() + + # tweak event end + # 1. shorten event to match discrete slot grid + slot_seconds = 3600 / self.slots_in_an_hour + remainder_seconds = delta % slot_seconds + remainder_seconds += 1 # add a second to compensate rounding errs + end -= timedelta(seconds=remainder_seconds) + + # set seconds and microseconds to 0 as they are not exported to the json + start -= timedelta(seconds=start.second, microseconds=start.microsecond) + end -= timedelta(seconds=end.second, microseconds=end.microsecond) + event_avail = Availability(event=self.event, start=start, end=end) + + category_names = AKCategory.objects.filter(event=self.event).values_list( + "name", flat=True + ) + return {cat_name: [event_avail] for cat_name in category_names} + + def test_timeslots_consecutive(self): + """Test if consecutive timeslots in JSON are in fact consecutive.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + prev_end = None + for timeslot in chain.from_iterable( + self.export_dict["timeslots"]["blocks"] + ): + start, end = self._get_timeslot_start_end(timeslot) + self.assertLess(start, end) + + delta = end - start + self.assertAlmostEqual( + delta.total_seconds() / (3600), 1 / self.slots_in_an_hour + ) + + if prev_end is not None: + self.assertLessEqual(prev_end, start) + prev_end = end + + def test_block_cover_categories(self): + """Test if blocks covers all default slot resp. whole event per category.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + category_names = AKCategory.objects.filter(event=event).values_list( + "name", flat=True + ) + + export_cat_avails = self._get_cat_availability_in_export() + cat_avails = self._get_cat_availability() + + for cat_name in category_names: + for avail in cat_avails[cat_name]: + # check that all category availabilities are covered + self.assertTrue( + avail.is_covered(export_cat_avails[cat_name]), + f"AKCategory {cat_name}: avail ({avail.start} – {avail.end}) " + f"not covered by {[f'({a.start} – {a.end})' for a in export_cat_avails[cat_name]]}", + ) + + def _is_restricted_and_contained_slot( + self, slot: Availability, availabilities: list[Availability] + ) -> bool: + """Test if object is not available for whole event and may happen during slot.""" + return slot.is_covered(availabilities) and not Availability.is_event_covered( + self.event, availabilities + ) + + def _is_ak_fixed_in_slot( + self, + ak_slot: AKSlot, + timeslot_avail: Availability, + ) -> bool: + if not ak_slot.fixed or ak_slot.start is None: + return False + ak_slot_avail = Availability( + event=self.event, + start=ak_slot.start.astimezone(self.event.timezone), + end=ak_slot.end.astimezone(self.event.timezone), + ) + return timeslot_avail.overlaps(ak_slot_avail, strict=True) + + def test_timeslot_fulfilledconstraints(self): + """Test if fulfilled time constraints by timeslot are as expected.""" + for event in Event.objects.all(): + with self.subTest(event=event): + self.set_up_event(event=event) + + cat_avails = self._get_cat_availability() + for timeslot in chain.from_iterable( + self.export_dict["timeslots"]["blocks"] + ): + start, end = self._get_timeslot_start_end(timeslot) + timeslot_avail = Availability( + event=self.event, start=start, end=end + ) + + fulfilled_time_constraints = set() + + # reso deadline + if self.event.reso_deadline is not None: + # timeslot ends before deadline + if end < self.event.reso_deadline.astimezone( + self.event.timezone + ): + fulfilled_time_constraints.add("resolution") + + # add category constraints + fulfilled_time_constraints |= ( + AKCategory.create_category_constraints( + [ + cat + for cat in AKCategory.objects.filter( + event=self.event + ).all() + if timeslot_avail.is_covered(cat_avails[cat.name]) + ] + ) + ) + + # add owner constraints + fulfilled_time_constraints |= { + f"availability-person-{owner.id}" + for owner in AKOwner.objects.filter(event=self.event).all() + if self._is_restricted_and_contained_slot( + timeslot_avail, + Availability.union(owner.availabilities.all()), + ) + } + + # add room constraints + fulfilled_time_constraints |= { + f"availability-room-{room.id}" + for room in self.rooms + if self._is_restricted_and_contained_slot( + timeslot_avail, + Availability.union(room.availabilities.all()), + ) + } + + # add ak constraints + fulfilled_time_constraints |= { + f"availability-ak-{ak.id}" + for ak in AK.objects.filter(event=event) + if self._is_restricted_and_contained_slot( + timeslot_avail, Availability.union(ak.availabilities.all()) + ) + } + fulfilled_time_constraints |= { + f"fixed-akslot-{slot.id}" + for slot in self.ak_slots + if self._is_ak_fixed_in_slot(slot, timeslot_avail) + } + + self.assertEqual( + fulfilled_time_constraints, + set(timeslot["fulfilled_time_constraints"]), + ) diff --git a/AKModel/tests.py b/AKModel/tests/test_views.py similarity index 60% rename from AKModel/tests.py rename to AKModel/tests/test_views.py index 6730315d..63984745 100644 --- a/AKModel/tests.py +++ b/AKModel/tests/test_views.py @@ -7,8 +7,19 @@ from django.contrib.messages.storage.base import Message from django.test import TestCase from django.urls import reverse_lazy, reverse -from AKModel.models import Event, AKOwner, AKCategory, AKTrack, AKRequirement, AK, Room, AKSlot, AKOrgaMessage, \ - ConstraintViolation, DefaultSlot +from AKModel.models import ( + Event, + AKOwner, + AKCategory, + AKTrack, + AKRequirement, + AK, + Room, + AKSlot, + AKOrgaMessage, + ConstraintViolation, + DefaultSlot, +) class BasicViewTests: @@ -29,9 +40,10 @@ class BasicViewTests: since the test framework does not understand the concept of abstract test definitions and would handle this class as real test case otherwise, distorting the test results. """ + # pylint: disable=no-member VIEWS = [] - APP_NAME = '' + APP_NAME = "" VIEWS_STAFF_ONLY = [] EDIT_TESTCASES = [] @@ -41,16 +53,26 @@ class BasicViewTests: """ user_model = get_user_model() self.staff_user = user_model.objects.create( - username='Test Staff User', email='teststaff@example.com', password='staffpw', - is_staff=True, is_active=True + username="Test Staff User", + email="teststaff@example.com", + password="staffpw", + is_staff=True, + is_active=True, ) self.admin_user = user_model.objects.create( - username='Test Admin User', email='testadmin@example.com', password='adminpw', - is_staff=True, is_superuser=True, is_active=True + username="Test Admin User", + email="testadmin@example.com", + password="adminpw", + is_staff=True, + is_superuser=True, + is_active=True, ) self.deactivated_user = user_model.objects.create( - username='Test Deactivated User', email='testdeactivated@example.com', password='deactivatedpw', - is_staff=True, is_active=False + username="Test Deactivated User", + email="testdeactivated@example.com", + password="deactivatedpw", + is_staff=True, + is_active=False, ) def _name_and_url(self, view_name): @@ -62,7 +84,9 @@ class BasicViewTests: :return: full view name with prefix if applicable, url of the view :rtype: str, str """ - view_name_with_prefix = f"{self.APP_NAME}:{view_name[0]}" if self.APP_NAME != "" else view_name[0] + view_name_with_prefix = ( + f"{self.APP_NAME}:{view_name[0]}" if self.APP_NAME != "" else view_name[0] + ) url = reverse(view_name_with_prefix, kwargs=view_name[1]) return view_name_with_prefix, url @@ -74,7 +98,7 @@ class BasicViewTests: :param expected_message: message that should be shown :param msg_prefix: prefix for the error message when test fails """ - messages:List[Message] = list(get_messages(response.wsgi_request)) + messages: List[Message] = list(get_messages(response.wsgi_request)) msg_count = "No message shown to user" msg_content = "Wrong message, expected '{expected_message}'" @@ -95,10 +119,16 @@ class BasicViewTests: view_name_with_prefix, url = self._name_and_url(view_name) try: response = self.client.get(url) - self.assertEqual(response.status_code, 200, msg=f"{view_name_with_prefix} ({url}) broken") - except Exception: # pylint: disable=broad-exception-caught - self.fail(f"An error occurred during rendering of {view_name_with_prefix} ({url}):" - f"\n\n{traceback.format_exc()}") + self.assertEqual( + response.status_code, + 200, + msg=f"{view_name_with_prefix} ({url}) broken", + ) + except Exception: # pylint: disable=broad-exception-caught + self.fail( + f"An error occurred during rendering of {view_name_with_prefix} ({url}):" + f"\n\n{traceback.format_exc()}" + ) def test_access_control_staff_only(self): """ @@ -107,11 +137,16 @@ class BasicViewTests: # Not logged in? Views should not be visible self.client.logout() for view_name_info in self.VIEWS_STAFF_ONLY: - expected_response_code = 302 if len(view_name_info) == 2 else view_name_info[2] + expected_response_code = ( + 302 if len(view_name_info) == 2 else view_name_info[2] + ) view_name_with_prefix, url = self._name_and_url(view_name_info) response = self.client.get(url) - self.assertEqual(response.status_code, expected_response_code, - msg=f"{view_name_with_prefix} ({url}) accessible by non-staff") + self.assertEqual( + response.status_code, + expected_response_code, + msg=f"{view_name_with_prefix} ({url}) accessible by non-staff", + ) # Logged in? Views should be visible self.client.force_login(self.staff_user) @@ -119,20 +154,30 @@ class BasicViewTests: view_name_with_prefix, url = self._name_and_url(view_name_info) try: response = self.client.get(url) - self.assertEqual(response.status_code, 200, - msg=f"{view_name_with_prefix} ({url}) should be accessible for staff (but isn't)") + self.assertEqual( + response.status_code, + 200, + msg=f"{view_name_with_prefix} ({url}) should be accessible for staff (but isn't)", + ) except Exception: # pylint: disable=broad-exception-caught - self.fail(f"An error occurred during rendering of {view_name_with_prefix} ({url}):" - f"\n\n{traceback.format_exc()}") + self.fail( + f"An error occurred during rendering of {view_name_with_prefix} ({url}):" + f"\n\n{traceback.format_exc()}" + ) # Disabled user? Views should not be visible self.client.force_login(self.deactivated_user) for view_name_info in self.VIEWS_STAFF_ONLY: - expected_response_code = 302 if len(view_name_info) == 2 else view_name_info[2] + expected_response_code = ( + 302 if len(view_name_info) == 2 else view_name_info[2] + ) view_name_with_prefix, url = self._name_and_url(view_name_info) response = self.client.get(url) - self.assertEqual(response.status_code, expected_response_code, - msg=f"{view_name_with_prefix} ({url}) still accessible for deactivated user") + self.assertEqual( + response.status_code, + expected_response_code, + msg=f"{view_name_with_prefix} ({url}) still accessible for deactivated user", + ) def _to_sendable_value(self, val): """ @@ -182,16 +227,26 @@ class BasicViewTests: self.client.logout() response = self.client.get(url) - self.assertEqual(response.status_code, 200, msg=f"{name}: Could not load edit form via GET ({url})") + self.assertEqual( + response.status_code, + 200, + msg=f"{name}: Could not load edit form via GET ({url})", + ) form = response.context[form_name] - data = {k:self._to_sendable_value(v) for k,v in form.initial.items()} + data = {k: self._to_sendable_value(v) for k, v in form.initial.items()} response = self.client.post(url, data=data) if expected_code == 200: - self.assertEqual(response.status_code, 200, msg=f"{name}: Did not return 200 ({url}") + self.assertEqual( + response.status_code, 200, msg=f"{name}: Did not return 200 ({url}" + ) elif expected_code == 302: - self.assertRedirects(response, target_url, msg_prefix=f"{name}: Did not redirect ({url} -> {target_url}") + self.assertRedirects( + response, + target_url, + msg_prefix=f"{name}: Did not redirect ({url} -> {target_url}", + ) if expected_message != "": self._assert_message(response, expected_message, msg_prefix=f"{name}") @@ -200,32 +255,44 @@ class ModelViewTests(BasicViewTests, TestCase): """ Basic view test cases for views from AKModel plus some custom tests """ - fixtures = ['model.json'] + + fixtures = ["model.json"] ADMIN_MODELS = [ - (Event, 'event'), (AKOwner, 'akowner'), (AKCategory, 'akcategory'), (AKTrack, 'aktrack'), - (AKRequirement, 'akrequirement'), (AK, 'ak'), (Room, 'room'), (AKSlot, 'akslot'), - (AKOrgaMessage, 'akorgamessage'), (ConstraintViolation, 'constraintviolation'), - (DefaultSlot, 'defaultslot') + (Event, "event"), + (AKOwner, "akowner"), + (AKCategory, "akcategory"), + (AKTrack, "aktrack"), + (AKRequirement, "akrequirement"), + (AK, "ak"), + (Room, "room"), + (AKSlot, "akslot"), + (AKOrgaMessage, "akorgamessage"), + (ConstraintViolation, "constraintviolation"), + (DefaultSlot, "defaultslot"), ] VIEWS_STAFF_ONLY = [ - ('admin:index', {}), - ('admin:event_status', {'event_slug': 'kif42'}), - ('admin:event_requirement_overview', {'event_slug': 'kif42'}), - ('admin:ak_csv_export', {'event_slug': 'kif42'}), - ('admin:ak_json_export', {'event_slug': 'kif42'}), - ('admin:ak_wiki_export', {'slug': 'kif42'}), - ('admin:ak_schedule_json_import', {'event_slug': 'kif42'}), - ('admin:ak_delete_orga_messages', {'event_slug': 'kif42'}), - ('admin:ak_slide_export', {'event_slug': 'kif42'}), - ('admin:default-slots-editor', {'event_slug': 'kif42'}), - ('admin:room-import', {'event_slug': 'kif42'}), - ('admin:new_event_wizard_start', {}), + ("admin:index", {}), + ("admin:event_status", {"event_slug": "kif42"}), + ("admin:event_requirement_overview", {"event_slug": "kif42"}), + ("admin:ak_csv_export", {"event_slug": "kif42"}), + ("admin:ak_json_export", {"event_slug": "kif42"}), + ("admin:ak_wiki_export", {"slug": "kif42"}), + ("admin:ak_schedule_json_import", {"event_slug": "kif42"}), + ("admin:ak_delete_orga_messages", {"event_slug": "kif42"}), + ("admin:ak_slide_export", {"event_slug": "kif42"}), + ("admin:default-slots-editor", {"event_slug": "kif42"}), + ("admin:room-import", {"event_slug": "kif42"}), + ("admin:new_event_wizard_start", {}), ] EDIT_TESTCASES = [ - {'view': 'admin:default-slots-editor', 'kwargs': {'event_slug': 'kif42'}, "admin": True}, + { + "view": "admin:default-slots-editor", + "kwargs": {"event_slug": "kif42"}, + "admin": True, + }, ] def test_admin(self): @@ -236,24 +303,32 @@ class ModelViewTests(BasicViewTests, TestCase): for model in self.ADMIN_MODELS: # Special treatment for a subset of views (where we exchanged default functionality, e.g., create views) if model[1] == "event": - _, url = self._name_and_url(('admin:new_event_wizard_start', {})) + _, url = self._name_and_url(("admin:new_event_wizard_start", {})) elif model[1] == "room": - _, url = self._name_and_url(('admin:room-new', {})) + _, url = self._name_and_url(("admin:room-new", {})) # Otherwise, just call the creation form view else: - _, url = self._name_and_url((f'admin:AKModel_{model[1]}_add', {})) + _, url = self._name_and_url((f"admin:AKModel_{model[1]}_add", {})) response = self.client.get(url) - self.assertEqual(response.status_code, 200, msg=f"Add form for model {model[1]} ({url}) broken") + self.assertEqual( + response.status_code, + 200, + msg=f"Add form for model {model[1]} ({url}) broken", + ) for model in self.ADMIN_MODELS: # Test the update view using the first existing instance of each model m = model[0].objects.first() if m is not None: _, url = self._name_and_url( - (f'admin:AKModel_{model[1]}_change', {'object_id': m.pk}) + (f"admin:AKModel_{model[1]}_change", {"object_id": m.pk}) ) response = self.client.get(url) - self.assertEqual(response.status_code, 200, msg=f"Edit form for model {model[1]} ({url}) broken") + self.assertEqual( + response.status_code, + 200, + msg=f"Edit form for model {model[1]} ({url}) broken", + ) def test_wiki_export(self): """ @@ -262,17 +337,27 @@ class ModelViewTests(BasicViewTests, TestCase): """ self.client.force_login(self.admin_user) - export_url = reverse_lazy("admin:ak_wiki_export", kwargs={'slug': 'kif42'}) + export_url = reverse_lazy("admin:ak_wiki_export", kwargs={"slug": "kif42"}) response = self.client.get(export_url) self.assertEqual(response.status_code, 200, "Export not working at all") export_count = 0 for _, aks in response.context["categories_with_aks"]: for ak in aks: - self.assertEqual(ak.include_in_export, True, - f"AK with export flag set to False (pk={ak.pk}) included in export") - self.assertNotEqual(ak.pk, 1, "AK known to be excluded from export (PK 1) included in export") + self.assertEqual( + ak.include_in_export, + True, + f"AK with export flag set to False (pk={ak.pk}) included in export", + ) + self.assertNotEqual( + ak.pk, + 1, + "AK known to be excluded from export (PK 1) included in export", + ) export_count += 1 - self.assertEqual(export_count, AK.objects.filter(event_id=2, include_in_export=True).count(), - "Wiki export contained the wrong number of AKs") + self.assertEqual( + export_count, + AK.objects.filter(event_id=2, include_in_export=True).count(), + "Wiki export contained the wrong number of AKs", + ) diff --git a/AKModel/views/ak.py b/AKModel/views/ak.py index 5e642dd5..bca1c5b3 100644 --- a/AKModel/views/ak.py +++ b/AKModel/views/ak.py @@ -50,11 +50,6 @@ class AKJSONExportView(AdminViewMixin, FilterByEventSlugMixin, ListView): context_object_name = "slots" title = _("AK JSON Export") - - def _test_slot_contained(self, slot: Availability, availabilities: List[Availability]) -> bool: - """Test if slot is contained in any member of availabilities.""" - return any(availability.contains(slot) for availability in availabilities) - def _test_event_not_covered(self, availabilities: List[Availability]) -> bool: """Test if event is not covered by availabilities.""" return not Availability.is_event_covered(self.event, availabilities) @@ -70,8 +65,7 @@ class AKJSONExportView(AdminViewMixin, FilterByEventSlugMixin, ListView): def _test_add_constraint(self, slot: Availability, availabilities: List[Availability]) -> bool: """Test if object is not available for whole event and may happen during slot.""" return ( - self._test_event_not_covered(availabilities) - and self._test_slot_contained(slot, availabilities) + self._test_event_not_covered(availabilities) and slot.is_covered(availabilities) ) def _generate_time_constraints( @@ -161,7 +155,8 @@ class AKJSONExportView(AdminViewMixin, FilterByEventSlugMixin, ListView): current_block.append({ "id": str(timeslot.idx), "info": { - "start": timeslot.avail.simplified, + "start": timeslot.avail.start.astimezone(self.event.timezone).strftime("%Y-%m-%d %H:%M"), + "end": timeslot.avail.end.astimezone(self.event.timezone).strftime("%Y-%m-%d %H:%M"), }, "fulfilled_time_constraints": time_constraints, }) diff --git a/AKPlan/tests.py b/AKPlan/tests.py index 69365c2b..3f00061a 100644 --- a/AKPlan/tests.py +++ b/AKPlan/tests.py @@ -1,6 +1,6 @@ from django.test import TestCase -from AKModel.tests import BasicViewTests +from AKModel.tests.test_views import BasicViewTests class PlanViewTests(BasicViewTests, TestCase): diff --git a/AKScheduling/api.py b/AKScheduling/api.py index e78fda78..cfd476ec 100644 --- a/AKScheduling/api.py +++ b/AKScheduling/api.py @@ -55,7 +55,9 @@ class EventsView(LoginRequiredMixin, EventSlugMixin, ListView): model = AKSlot def get_queryset(self): - return super().get_queryset().select_related('ak').filter(event=self.event, room__isnull=False) + return super().get_queryset().select_related('ak').filter( + event=self.event, room__isnull=False, start__isnull=False + ) def render_to_response(self, context, **response_kwargs): return JsonResponse( diff --git a/AKScheduling/models.py b/AKScheduling/models.py index aa6be3d0..8f34c151 100644 --- a/AKScheduling/models.py +++ b/AKScheduling/models.py @@ -365,8 +365,8 @@ def akslot_changed_handler(sender, instance: AKSlot, **kwargs): new_violations = [] # For all slots in this room... - if instance.room: - for other_slot in instance.room.akslot_set.all(): + if instance.room and instance.start: + for other_slot in instance.room.akslot_set.filter(start__isnull=False): if other_slot != instance: # ... find overlapping slots... if instance.overlaps(other_slot): diff --git a/AKScheduling/tests.py b/AKScheduling/tests.py index 0996eedd..44f25719 100644 --- a/AKScheduling/tests.py +++ b/AKScheduling/tests.py @@ -4,7 +4,7 @@ from datetime import timedelta from django.test import TestCase from django.utils import timezone -from AKModel.tests import BasicViewTests +from AKModel.tests.test_views import BasicViewTests from AKModel.models import AKSlot, Event, Room class ModelViewTests(BasicViewTests, TestCase): diff --git a/AKSubmission/tests.py b/AKSubmission/tests.py index 018289aa..423a9776 100644 --- a/AKSubmission/tests.py +++ b/AKSubmission/tests.py @@ -5,7 +5,7 @@ from django.urls import reverse_lazy from django.utils.datetime_safe import datetime from AKModel.models import AK, AKSlot, Event -from AKModel.tests import BasicViewTests +from AKModel.tests.test_views import BasicViewTests class ModelViewTests(BasicViewTests, TestCase): -- GitLab