diff --git a/AKModel/utils.py b/AKModel/utils.py index 3a0b3f0d57cb646954cd34a9a76c5903ee74cc10..1f9e821915cabb2da2c98caeba2a7ce58f9b8417 100644 --- a/AKModel/utils.py +++ b/AKModel/utils.py @@ -4,24 +4,36 @@ from pathlib import Path from jsonschema import Draft202012Validator from jsonschema.protocols import Validator from referencing import Registry, Resource +import referencing.retrieval from AKPlanning import settings +def _construct_schema_path(uri: str | Path) -> Path: + schema_base_path = Path(settings.BASE_DIR).resolve() + uri_path = (schema_base_path / uri).resolve() + if not uri_path.is_relative_to(schema_base_path / "schemas"): + raise ValueError("Unallowed dictionary traversal") + return uri_path + + +@referencing.retrieval.to_cached_resource() +def retrieve_schema(uri: str) -> str: + # avoid dictionary traversals + uri_path = _construct_schema_path(uri) + with uri_path.open("r") as ff: + return ff.read() + + def construct_schema_validator(schema: str | dict) -> Validator: """Construct a validator for a JSON schema. In particular, all schemas from the 'schemas' directory are loaded into the registry. """ - schema_base_path = Path(settings.BASE_DIR) / "schemas" - resources = [] - for schema_path in schema_base_path.glob("**/*.schema.json"): - with schema_path.open("r") as ff: - res = Resource.from_contents(json.load(ff)) - resources.append((res.id(), res)) - registry = Registry().with_resources(resources) + registry = Registry(retrieve=retrieve_schema) + if isinstance(schema, str): - with (schema_base_path / schema).open("r") as ff: - schema = json.load(ff) + schema_uri = str(Path("schemas") / schema) + schema = registry.get_or_retrieve(schema_uri).value.contents return Draft202012Validator(schema=schema, registry=registry)