diff --git a/.env b/.env
index 31a4ad25c400fd115b2be6eed43c9a42db021f80..c156dc8f49e1b123a6f082d26b541bfcdeb7f3fb 100644
--- a/.env
+++ b/.env
@@ -1,11 +1,12 @@
 # S3-compatible storage (ex. MinIO)
-S3_ENDPOINT=
-S3_ACCESS_KEY=
-S3_SECRET_KEY=
-S3_BUCKET_NAME=
+S3_ENDPOINT=http://localhost:9000
+S3_ACCESS_KEY=sampleuser
+S3_SECRET_KEY=samplepass
+S3_BUCKET_NAME=analysis
+S3_METADATA_BUCKET_NAME=metadata
 
 # AMQP broker (ex. RabbitMQ)
-BROKER_HOST=
-BROKER_LOGIN=
-BROKER_PASSWORD=""
-BROKER_QUEUE_NAME=
\ No newline at end of file
+BROKER_HOST=localhost
+BROKER_LOGIN=admin
+BROKER_PASSWORD=admin
+BROKER_QUEUE_NAME=task_queue
\ No newline at end of file
diff --git a/README.md b/README.md
index 68bca34700ca7a0bfd9650ebbba2f1665fc07a79..d7a32e79a039333191ee051dc8c021638d41a673 100644
--- a/README.md
+++ b/README.md
@@ -1,93 +1,157 @@
-# Bachelorarbeit
+# Dynamic PyPI Package Analysis
 
+> This project contains scripts that enable dynamic behavior analysis of Python packages and the calculation of their
+> risk scores.
 
+## Table of Contents
 
-## Getting started
+1. [Overview](#overview)
+2. [Modules](#modules)
+    - [Crawler](#crawler)
+    - [Scheduler](#scheduler)
+    - [Worker](#worker)
+    - [Evaluation](#evaluation)
+3. [Environment Variables](#environment-variables)
+4. [Quick Start Guide](#quick-start-guide)
+    - [Prerequisites](#prerequisites)
+    - [Basic Infrastructure](#basic-infrastructure)
+    - [Crawler](#crawler-1)
+    - [Scheduler](#scheduler-1)
+    - [Workers](#workers)
+    - [Evaluation](#evaluation-1)
 
-To make it easy for you to get started with GitLab, here's a list of recommended next steps.
+---
 
-Already a pro? Just edit this README.md and make it your own. Want to make it easy? [Use the template at the bottom](#editing-this-readme)!
+## Overview
 
-## Add your files
+This project is designed to:
 
-- [ ] [Create](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#create-a-file) or [upload](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#upload-a-file) files
-- [ ] [Add files using the command line](https://docs.gitlab.com/ee/gitlab-basics/add-file.html#add-a-file-using-the-command-line) or push an existing Git repository with the following command:
+- **Fetch** metadata of packages from PyPI.
+- **Schedule** these packages for dynamic runtime analysis.
+- **Execute** them in a containerized sandbox for thorough examination.
+- **Store** all analysis results and metadata in an S3-compatible backend.
+- **Evaluate** the outcome of the dynamic analysis for further insights.
 
-```
-cd existing_repo
-git remote add origin https://gitlab.fachschaften.org/anton.shadrin/bachelorarbeit.git
-git branch -M main
-git push -uf origin main
-```
+---
+
+## Modules
+
+### Crawler
+
+- Retrieves the list of all packages via the PyPI API.
+- Obtains detailed metadata (versions, release files, etc.) for each package that hasn't already been collected.
+- Saves the retrieved information to an S3-compatible repository (e.g. MinIO).
+- When scheduler mode is enabled (via `--scheduler-mode`), publishes a message to a broker (e.g. RabbitMQ) for each
+  processed package so that it can be analyzed asynchronously (see the message sketch below).
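+
+  In scheduler mode, each processed package results in one JSON message on the queue. A minimal sketch of the payload
+  (the package name and version here are made-up values):
+
+  ```python
+  import orjson
+
+  # Shape of the message body the crawler publishes per package in scheduler mode
+  # ("example-package" and "1.0.0" are hypothetical values).
+  body = orjson.dumps({
+      "type": "analyze",
+      "package_name": "example-package",
+      "version": "1.0.0",
+  })
+  ```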
 
-## Integrate with your tools
+### Scheduler
 
-- [ ] [Set up project integrations](https://gitlab.fachschaften.org/anton.shadrin/bachelorarbeit/-/settings/integrations)
+Reads metadata from the S3 bucket and submits tasks to the RabbitMQ queue for analysis.
 
-## Collaborate with your team
+### Worker
 
-- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/)
-- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html)
-- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically)
-- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/)
-- [ ] [Set auto-merge](https://docs.gitlab.com/ee/user/project/merge_requests/merge_when_pipeline_succeeds.html)
+- Waits for messages from RabbitMQ.
+- Once a message arrives, it spawns an **isolated environment** (via Podman + [gVisor](https://gvisor.dev/) runtime) for
+  the target package.
+- Collects **runtime data** (DNS requests, system calls, Python function calls, installed packages, etc.).
+- Pushes the results (packaged into a JSON summary) back to the configured **S3-compatible storage**.
 
-## Test and Deploy
+### Evaluation
 
-Use the built-in continuous integration in GitLab.
+Processes the results of the dynamic analysis from the S3 storage and generates aggregate statistics and plots.
 
-- [ ] [Get started with GitLab CI/CD](https://docs.gitlab.com/ee/ci/quick_start/)
-- [ ] [Analyze your code for known vulnerabilities with Static Application Security Testing (SAST)](https://docs.gitlab.com/ee/user/application_security/sast/)
-- [ ] [Deploy to Kubernetes, Amazon EC2, or Amazon ECS using Auto Deploy](https://docs.gitlab.com/ee/topics/autodevops/requirements.html)
-- [ ] [Use pull-based deployments for improved Kubernetes management](https://docs.gitlab.com/ee/user/clusters/agent/)
-- [ ] [Set up protected environments](https://docs.gitlab.com/ee/ci/environments/protected_environments.html)
+---
 
-***
+## Environment Variables
 
-# Editing this README
+All relevant environment variables should be declared in a `.env` file (or provided to the container runtime by other
+means). A typical `.env` might look like:
 
-When you're ready to make this README your own, just edit this file and use the handy template below (or feel free to structure it however you want - this is just a starting point!). Thanks to [makeareadme.com](https://www.makeareadme.com/) for this template.
+```bash
+# S3-compatible storage (ex. MinIO)
+S3_ENDPOINT=http://localhost:9000
+S3_ACCESS_KEY=sampleuser
+S3_SECRET_KEY=samplepass
+S3_BUCKET_NAME=analysis
+S3_METADATA_BUCKET_NAME=metadata
 
-## Suggestions for a good README
+# AMQP broker (ex. RabbitMQ)
+BROKER_HOST=localhost
+BROKER_LOGIN=admin
+BROKER_PASSWORD=admin
+BROKER_QUEUE_NAME=task_queue
+```
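+
+Each script loads these values at startup via `python-dotenv`. A minimal sketch of how that looks (not the exact code
+from the repository):
+
+```python
+import os
+
+from dotenv import load_dotenv
+
+# Read .env into the process environment, then pick up individual settings.
+load_dotenv()
+
+s3_endpoint = os.getenv("S3_ENDPOINT")
+broker_host = os.getenv("BROKER_HOST")
+```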
+
+---
 
-Every project is different, so consider which of these sections apply to yours. The sections used in the template are suggestions for most open source projects. Also keep in mind that while a README can be too long and detailed, too long is better than too short. If you think your README is too long, consider utilizing another form of documentation rather than cutting out information.
+## Quick Start Guide
 
-## Name
-Choose a self-explaining name for your project.
+### Prerequisites
 
-## Description
-Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors.
+1. **Docker** installed (for container builds).
+2. **Python 3.9+** and requirements from `requirements.txt` (`pip install -r requirements.txt`).
+3. **AMQP broker** instance (e.g. RabbitMQ).
+4. **S3-Compatible Storage** (e.g. MinIO).
 
-## Badges
-On some READMEs, you may see small images that convey metadata, such as whether or not all the tests are passing for the project. You can use Shields to add some to your README. Many services also have instructions for adding a badge.
+Before running any of the scripts, populate your `.env` with valid values
+(see [Environment Variables](#environment-variables)).
 
-## Visuals
-Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method.
+### Basic Infrastructure
 
-## Installation
-Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection.
+You can bring up a pre-built infrastructure that includes RabbitMQ and MinIO using Docker.
+To do this, navigate to the `infra` folder and run:
 
-## Usage
-Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README.
+```bash
+cd infra && docker compose up
+```
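+
+This brings up MinIO on ports 9000 (API) and 9001 (web console) with the `metadata` and `analysis` buckets pre-created
+(see `infra/minio/entrypoint.sh`), and RabbitMQ on port 5672 with the default `admin`/`admin` credentials.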
 
-## Support
-Tell people where they can go to for help. It can be any combination of an issue tracker, a chat room, an email address, etc.
+### Crawler
 
-## Roadmap
-If you have ideas for releases in the future, it is a good idea to list them in the README.
+Run:
+
+```bash
+cd crawler && python crawler.py [--scheduler-mode]
+```
 
-## Contributing
-State if you are open to contributions and what your requirements are for accepting them.
+- `--scheduler-mode` (optional): when specified, the crawler publishes a message about each processed package to the
+  broker.
 
-For people who want to make changes to your project, it's helpful to have some documentation on how to get started. Perhaps there is a script that they should run or some environment variables that they need to set. Make these steps explicit. These instructions could also be useful to your future self.
+### Scheduler
 
-You can also document commands to lint the code or run tests. These steps help to ensure high code quality and reduce the likelihood that the changes inadvertently break something. Having instructions for running tests is especially helpful if it requires external setup, such as starting a Selenium server for testing in a browser.
+Run:
 
-## Authors and acknowledgment
-Show your appreciation to those who have contributed to the project.
+```bash
+cd scheduler && python scheduler.py
+```
+
+This script will:
+
+- Read collected metadata from the S3-compatible storage.
+- Submit tasks to the AMQP broker for analysis.
+
+### Workers
+
+1. Ensure Docker is running.
+2. Run:
+   ```bash
+   python runner.py [worker_count]
+   ```
+   This script will:
+    - Build the `sandbox` Docker image.
+    - Build the `worker` Docker image.
+    - Launch the worker container(s) to consume tasks from RabbitMQ.
+
+### Evaluation
+
+Run:
+
+```bash
+cd evaluation && python evaluate.py
+```
 
-## License
-For open source projects, say how it is licensed.
+This script will:
 
-## Project status
-If you have run out of energy or time for your project, put a note at the top of the README saying that development has slowed down or stopped completely. Someone may choose to fork your project or volunteer to step in as a maintainer or owner, allowing your project to keep going. You can also make an explicit request for maintainers.
+- Read the resulting package analysis data from the S3-compatible storage.
+- Analyze that data and generate statistics in the `stats/` directory.
\ No newline at end of file
diff --git a/crawler.py b/crawler/crawler.py
similarity index 53%
rename from crawler.py
rename to crawler/crawler.py
index 0ab4169f9fbc5930022aa17dec40ab4d2779fe2b..1182bd61c2f8a8fb26e873db2ea068dd31458ca5 100644
--- a/crawler.py
+++ b/crawler/crawler.py
@@ -1,17 +1,31 @@
 import asyncio
 import logging
 import os
+import sys
 import time
 from dataclasses import dataclass, field
+from datetime import datetime
 from logging.handlers import RotatingFileHandler
 from pathlib import Path
 from typing import Dict, List, Optional, Set
 
-import aiofiles
 import httpx
 import orjson
+from dotenv import load_dotenv
 from tenacity import retry, stop_after_attempt, wait_exponential
 
+from message_queue import MessageQueueConfig, AsyncMessageQueue
+from storage import AsyncStorage
+
+
+@dataclass
+class StorageConfig:
+    """Holds S3 connection details."""
+    endpoint: str
+    access_key: str
+    secret_key: str
+    bucket_name: str
+
 
 def setup_logger(
         name: str,
@@ -22,6 +36,7 @@ def setup_logger(
     log_path = Path(log_file)
     log_path.parent.mkdir(parents=True, exist_ok=True)
 
+    """Creates and returns a configured logger with both file and console handlers."""
     logger = logging.getLogger(name)
     logger.setLevel(level)
 
@@ -46,11 +61,12 @@ def setup_logger(
     return logger
 
 
-logger = setup_logger('crawler', 'crawler.log')
+logger = setup_logger('crawler', './logs/crawler.log')
 
 
 @dataclass
 class ReleaseFile:
+    """Represents a single file within a particular release."""
     url: str
     filename: str
     package_type: str
@@ -59,6 +75,7 @@ class ReleaseFile:
     python_version: str
 
     def to_dict(self) -> dict:
+        """Converts ReleaseFile to a dictionary."""
         return {
             "url": self.url,
             "filename": self.filename,
@@ -69,8 +86,37 @@ class ReleaseFile:
         }
 
 
+def get_latest_release_by_upload_time(releases: Dict[str, List[ReleaseFile]]):
+    """
+    Return the release version with the newest upload time and its files.
+    Each value in 'releases' is a list of file dicts, each having 'upload_time'.
+    """
+    best_version = None
+    best_files = []
+    best_time = None
+
+    for version_str, files in releases.items():
+        if not files:
+            continue
+
+        sorted_files = sorted(
+            files,
+            key=lambda f: datetime.fromisoformat(f.upload_time),
+            reverse=True
+        )
+
+        version_latest_dt = datetime.fromisoformat(sorted_files[0].upload_time)
+        if best_time is None or version_latest_dt > best_time:
+            best_time = version_latest_dt
+            best_version = version_str
+            best_files = sorted_files
+
+    return best_version, best_files
+
+
 @dataclass
 class Package:
+    """Represents metadata for a Python package."""
     name: str
     author_name: str
     author_email: str
@@ -78,6 +124,7 @@ class Package:
     releases: Dict[str, List[ReleaseFile]] = field(default_factory=dict)
 
     def to_dict(self) -> dict:
+        """Converts Package to a dictionary, including its releases."""
         return {
             "name": self.name,
             "author_name": self.author_name,
@@ -91,20 +138,28 @@ class Package:
 
 
 class PyPICrawler:
+    """Crawls PyPI to retrieve package metadata and stores it in an S3-compatible storage."""
+
     def __init__(
             self,
-            output_dir: str = "pypi_metadata",
+            storage_config: StorageConfig,
+            message_queue_config: MessageQueueConfig | None,
+            is_scheduler_mode_enabled: bool,
             batch_size: int = 100,
             max_concurrent_requests: int = 20,
             request_delay: float = 0.1
     ):
-        self.output_dir = Path(output_dir)
-        self.output_dir.mkdir(parents=True, exist_ok=True)
+        """Initializes the crawler with storage config and crawl settings."""
+        self.is_scheduler_mode_enabled = is_scheduler_mode_enabled
+
         self.batch_size = batch_size
-        self.processed_packages: Set[str] = set()
         self.max_concurrent_requests = max_concurrent_requests
         self.request_delay = request_delay
+
+        self.processed_packages: Set[str] = set()
+
         self.semaphore = asyncio.Semaphore(max_concurrent_requests)
+
         self.client = httpx.AsyncClient(
             headers={
                 "User-Agent": "PyPICrawler/1.0 (Uni Project)",
@@ -113,53 +168,67 @@ class PyPICrawler:
             timeout=httpx.Timeout(15.0),
             follow_redirects=True
         )
-        self._load_processed_packages()
+
+        self.message_queue = None
+        if self.is_scheduler_mode_enabled and message_queue_config:
+            self.message_queue = AsyncMessageQueue(message_queue_config)
+
+        self.storage = AsyncStorage(storage_config)
 
     async def __aenter__(self):
+        """Connects to S3 and loads the list of already processed package files."""
+        await self.storage.connect()
+        if self.message_queue:
+            await self.message_queue.start(10)
+        await self._load_processed_packages()
         return self
 
     async def __aexit__(self, exc_type, exc, tb):
+        """Closes the HTTP client and S3 connection."""
         await self.client.aclose()
+        await self.storage.close()
+        if self.message_queue:
+            await self.message_queue.cleanup()
 
-    def _load_processed_packages(self):
-        self.processed_packages = {f.stem for f in self.output_dir.glob("*.json")}
+    async def _load_processed_packages(self):
+        """Retrieves the list of JSON files in the bucket to identify processed packages."""
+        try:
+            keys = await self.storage.list_objects()
+            for k in keys:
+                if k.endswith(".json"):
+                    pkg_name = Path(k).stem
+                    self.processed_packages.add(pkg_name)
+            logger.info(f"Already processed packages: {len(self.processed_packages)}")
+        except Exception as e:
+            logger.warning(f"Could not load the list of objects from S3: {e}")
 
     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
     async def _fetch_package_list(self) -> List[str]:
+        """Fetches the complete list of packages from PyPI."""
         try:
             response = await self.client.get("https://pypi.org/simple/", follow_redirects=True)
             response.raise_for_status()
-            return [p["name"] for p in response.json().get("projects", [])]
+            data = response.json()
+            return [p["name"] for p in data.get("projects", [])]
         except Exception as e:
             logger.error(f"Failed to fetch package list: {str(e)}")
             raise
 
     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
     async def _fetch_package_data(self, package_name: str) -> Optional[dict]:
+        """Fetches metadata for a specific package from PyPI."""
         async with self.semaphore:
             try:
                 main_url = f"https://pypi.org/pypi/{package_name}/json"
                 main_res = await self.client.get(main_url)
 
                 if main_res.status_code != 200:
-                    # logger.warning(f"Error fetching {package_name}: {str(main_res.status_code)}")
                     return None
 
                 await asyncio.sleep(self.request_delay)
-
                 return main_res.json()
             except Exception as e:
                 logger.warning(f"Error fetching {package_name}: {str(e)}")
                 return None
 
-    def _determine_package_type(self, filename: str) -> str:
-        if filename.endswith(".whl"):
-            return "bdist_wheel"
-        elif filename.endswith((".tar.gz", ".zip")):
-            return "sdist"
-        return "unknown"
-
     def _process_package_data(self, main_data: dict) -> Optional[Package]:
+        """Transforms raw JSON data from PyPI into a Package object."""
         try:
             info = main_data.get("info", {})
             releases = main_data.get("releases", {})
@@ -172,14 +241,9 @@ class PyPICrawler:
             )
 
             version_set = set(releases)
-
-            for version, releases in releases.items():
-                for release_info in releases:
+            for version, release_files in releases.items():
+                for release_info in release_files:
                     filename = release_info.get("filename", "")
-
-                    if version is None:
-                        print(f"Version extraction failed for {package.name}: {filename}")
-
                     if version and version in version_set:
                         release_file = ReleaseFile(
                             url=release_info["url"],
@@ -189,37 +253,60 @@ class PyPICrawler:
                             upload_time=release_info.get("upload_time", ""),
                             python_version=release_info.get("requires_python", release_info.get("python_version", ""))
                         )
-
                         if version not in package.releases:
                             package.releases[version] = []
-
                         package.releases[version].append(release_file)
 
-            return package if package.releases else None
+            if not package.releases:
+                return None
+
+            return package
         except Exception as e:
-            logger.error(f"Error processing {main_data.get('info', {}).get('name')}: {str(e)}")
+            logger.error(f"Error processing package data: {str(e)}")
             return None
 
     async def _save_package_data(self, package: Package):
-        file_path = self.output_dir / f"{package.name}.json"
+        """Serializes and uploads package data to S3."""
+        try:
+            data_bytes = orjson.dumps(package.to_dict())
+            key = f"{package.name}.json"
+            await self.storage.put(key, data_bytes, content_type='application/json')
+            self.processed_packages.add(package.name)
+
+            # publish package to the queue if scheduler mode is enabled
+            if self.is_scheduler_mode_enabled and self.message_queue:
+                package_name = package.name
+                latest_version, _ = get_latest_release_by_upload_time(package.releases)
 
-        async with aiofiles.open(file_path, "wb") as f:
-            await f.write(orjson.dumps(package.to_dict()))
+                await self.message_queue.publish(
+                    orjson.dumps({"type": "analyze", "package_name": package_name, "version": latest_version})
+                )
 
-        self.processed_packages.add(package.name)
+        except Exception as e:
+            logger.error(f"Error saving package {package.name} to S3: {str(e)}")
 
     async def process_batch(self, package_names: List[str]):
+        """Processes and stores package metadata for a list of packages."""
         names_to_process = [n for n in package_names if n not in self.processed_packages]
-
         if not names_to_process:
             return
 
-        results = await asyncio.gather(*[self._fetch_package_data(n) for n in names_to_process])
-        packages = [self._process_package_data(result) for result in results if result]
+        results = await asyncio.gather(
+            *[self._fetch_package_data(n) for n in names_to_process],
+            return_exceptions=False
+        )
 
-        await asyncio.gather(*[self._save_package_data(p) for p in packages if p])
+        packages = []
+        for result in results:
+            if result:
+                p = self._process_package_data(result)
+                if p:
+                    packages.append(p)
+
+        await asyncio.gather(*[self._save_package_data(p) for p in packages])
 
     async def crawl_all_packages(self):
+        """Retrieves and processes all packages from PyPI in batches."""
         start_time = time.time()
         logger.info("Starting PyPI package crawling")
 
@@ -228,11 +315,9 @@ class PyPICrawler:
             packages_to_process = []
 
             for package in all_packages:
-                if not os.path.exists(Path(self.output_dir) / (package + ".json")):
+                if package not in self.processed_packages:
                     packages_to_process.append(package)
 
-            all_packages.clear()
-
             total = len(packages_to_process)
             logger.info(f"Total packages to process: {total}")
 
@@ -249,8 +334,37 @@ class PyPICrawler:
 
 
 async def main():
+    args = list(sys.argv)
+    args.pop(0)
+
+    is_scheduler_mode_enabled = False
+
+    if args:
+        is_scheduler_mode_enabled = args.pop(0) == '--scheduler-mode'
+
+    load_dotenv()
+
+    storage_config = StorageConfig(
+        endpoint=os.getenv("S3_ENDPOINT", "not in env defined"),
+        access_key=os.getenv("S3_ACCESS_KEY", "not in env defined"),
+        secret_key=os.getenv("S3_SECRET_KEY", "not in env defined"),
+        bucket_name=os.getenv("S3_METADATA_BUCKET_NAME", "not in env defined"),
+    )
+
+    message_queue_config = None
+
+    if is_scheduler_mode_enabled:
+        message_queue_config = MessageQueueConfig(
+            host=os.getenv("BROKER_HOST", "not in env defined"),
+            login=os.getenv("BROKER_LOGIN", "not in env defined"),
+            password=os.getenv("BROKER_PASSWORD", "not in env defined"),
+            queue_name=os.getenv("BROKER_QUEUE_NAME", "not in env defined"),
+        )
+
     async with PyPICrawler(
-            output_dir="pypi_metadata",
+            is_scheduler_mode_enabled=is_scheduler_mode_enabled,
+            storage_config=storage_config,
+            message_queue_config=message_queue_config,
             batch_size=500,
             max_concurrent_requests=100,
             request_delay=0.05
diff --git a/crawler/message_queue.py b/crawler/message_queue.py
new file mode 100644
index 0000000000000000000000000000000000000000..e9b3da91173342b59d251abc434b655d66965428
--- /dev/null
+++ b/crawler/message_queue.py
@@ -0,0 +1,89 @@
+import asyncio
+import json
+from dataclasses import dataclass
+from typing import Callable, Dict
+
+from aio_pika import connect_robust, IncomingMessage, DeliveryMode, Message
+
+
+@dataclass
+class MessageQueueConfig:
+    host: str
+    login: str
+    password: str
+    queue_name: str
+
+
+class AsyncMessageQueue:
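+    """Thin async wrapper around aio_pika for publishing to and consuming from a single durable queue."""
+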
+    def __init__(self, message_queue_config: MessageQueueConfig, on_connected: Callable | None = None):
+        self.__message_queue_config = message_queue_config
+
+        self.__connection = None
+        self.__channel = None
+        self.__handlers: Dict[str, Callable] = {}
+        self.__on_connected = on_connected
+
+    def register_handler(self, message_type: str, handler_func: Callable):
+        self.__handlers[message_type] = handler_func
+
+    async def __process_message(self, message: IncomingMessage):
+        async with message.process():
+            try:
+                body = message.body.decode()
+                message_data = json.loads(body)
+                message_type = message_data.get('type')
+
+                if message_type in self.__handlers:
+                    del message_data['type']
+                    handler = self.__handlers[message_type]
+
+                    if asyncio.iscoroutinefunction(handler):
+                        await handler(message_data)
+                    else:
+                        handler(message_data)
+
+            except json.JSONDecodeError as e:
+                print(f"JSON decode error: {e}")
+
+            except Exception as e:
+                print(f"Error processing message: {e}")
+
+    async def setup(self, prefetch_count: int):
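+        # Open a robust (auto-reconnecting) connection, apply the prefetch limit and declare the durable task queue.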
+        self.__connection = await connect_robust(
+            host=self.__message_queue_config.host,
+            login=self.__message_queue_config.login,
+            password=self.__message_queue_config.password,
+            heartbeat=600
+        )
+
+        self.__channel = await self.__connection.channel()
+
+        await self.__channel.set_qos(prefetch_count)
+        await self.__channel.declare_queue(self.__message_queue_config.queue_name, durable=True)
+
+    async def cleanup(self):
+        if self.__channel and not self.__channel.is_closed:
+            await self.__channel.close()
+
+        if self.__connection and not self.__connection.is_closed:
+            await self.__connection.close()
+
+    async def start(self, prefetch_count: int):
+        try:
+            await self.setup(prefetch_count)
+        except Exception as e:
+            print(f"Connection error: {e}")
+            await self.cleanup()
+
+    async def publish(self, body: bytes):
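+        # Mark messages as persistent so they survive a broker restart (the queue is declared durable in setup()).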
+        message = Message(
+            body,
+            content_type='application/json',
+            delivery_mode=DeliveryMode.PERSISTENT
+        )
+
+        await self.__channel.default_exchange.publish(
+            message,
+            routing_key=self.__message_queue_config.queue_name,
+        )
diff --git a/crawler/storage.py b/crawler/storage.py
new file mode 100644
index 0000000000000000000000000000000000000000..f5bfb89908cda1f46b43ed2d0d12de50236ba469
--- /dev/null
+++ b/crawler/storage.py
@@ -0,0 +1,85 @@
+from contextlib import AsyncExitStack
+from dataclasses import dataclass
+from io import BytesIO
+from typing import Any, List
+
+import aiobotocore
+from aiobotocore.session import AioSession
+
+
+@dataclass
+class StorageConfig:
+    endpoint: str
+    access_key: str
+    secret_key: str
+    bucket_name: str
+
+
+class AsyncStorage:
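+    """Minimal async S3 client (aiobotocore) providing put/get/list for a single configured bucket."""
+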
+    def __init__(self, storage_config: StorageConfig):
+        self._exit_stack = AsyncExitStack()
+        self.__storage_config = storage_config
+        self.__client = None
+
+    async def connect(self):
+        try:
+            session = AioSession()
+
+            self.__client = await self._exit_stack.enter_async_context(session.create_client(
+                service_name='s3',
+                use_ssl=False,
+                endpoint_url=self.__storage_config.endpoint,
+                aws_access_key_id=self.__storage_config.access_key,
+                aws_secret_access_key=self.__storage_config.secret_key,
+                config=aiobotocore.config.AioConfig(
+                    max_pool_connections=10,
+                    connect_timeout=5000,
+                    read_timeout=5000,
+                    retries={'max_attempts': 10}
+                )
+            ))
+
+        except Exception as e:
+            await self.close()
+            raise
+
+    async def close(self) -> None:
+        if self.__client:
+            await self._exit_stack.__aexit__(None, None, None)
+
+    async def put(self, destination_file_name: str, data: bytes, content_type: str = 'application/json') -> None:
+        if not self.__client:
+            raise RuntimeError("Storage is not initialized.")
+
+        data_io = BytesIO(data)
+        length = len(data)
+
+        await self.__client.put_object(
+            Bucket=self.__storage_config.bucket_name,
+            Key=destination_file_name,
+            Body=data_io,
+            ContentLength=length,
+            ContentType=content_type
+        )
+
+    async def get(self, key: str) -> Any:
+        if not self.__client:
+            raise RuntimeError("Storage is not initialized.")
+
+        response = await self.__client.get_object(
+            Bucket=self.__storage_config.bucket_name,
+            Key=key,
+        )
+
+        return response
+
+    async def list_objects(self, prefix: str = "") -> List[str]:
+        if not self.__client:
+            raise RuntimeError("Storage is not initialized.")
+
+        response = await self.__client.list_objects_v2(
+            Bucket=self.__storage_config.bucket_name,
+            Prefix=prefix
+        )
+        contents = response.get("Contents", [])
+        return [obj["Key"] for obj in contents]
diff --git a/evaluation/evaluate.py b/evaluation/evaluate.py
index 840789b39804b4e52b99f0bcf506e2b537bb244f..3b3506336466c6846cbd5c63d9fc9690fd021c33 100644
--- a/evaluation/evaluate.py
+++ b/evaluation/evaluate.py
@@ -18,14 +18,30 @@ import mpmath
 import numpy as np
 import orjson
 import seaborn
+from dotenv import load_dotenv
 from upsetplot import UpSet, from_memberships
 
 from function_arguments_parser import parse_python_tuple
+from storage import StorageConfig, Storage
 
-DYNAMIC_ANALYSIS_RESULTS_DIRECTORY = "/Users/xset/PycharmProjects/03-04-2025-14-53-40_files_list"
+load_dotenv()
 
-STATISTICS_DIRECTORY = Path("./stats/")
-STATISTICS_DIRECTORY.mkdir(parents=True, exist_ok=True)
+analysis_storage = Storage(StorageConfig(
+    endpoint=os.getenv("S3_ENDPOINT", "not in env defined"),
+    access_key=os.getenv("S3_ACCESS_KEY", "not in env defined"),
+    secret_key=os.getenv("S3_SECRET_KEY", "not in env defined"),
+    bucket_name=os.getenv("S3_BUCKET_NAME", "not in env defined"),
+))
+
+metadata_storage = Storage(StorageConfig(
+    endpoint=os.getenv("S3_ENDPOINT", "not in env defined"),
+    access_key=os.getenv("S3_ACCESS_KEY", "not in env defined"),
+    secret_key=os.getenv("S3_SECRET_KEY", "not in env defined"),
+    bucket_name=os.getenv("S3_METADATA_BUCKET_NAME", "not in env defined"),
+))
+
+STATISTICS_OUTPUT_DIRECTORY = Path("./stats/")
+STATISTICS_OUTPUT_DIRECTORY.mkdir(parents=True, exist_ok=True)
 
 
 ###############################################################################
@@ -66,7 +82,7 @@ def setup_logger(
     return logger
 
 
-logger = setup_logger('evaluate', 'evaluate.log',
+logger = setup_logger('evaluate', './logs/evaluate.log',
                       log_format='%(asctime)s | %(levelname)s | %(filename)s:%(lineno)d | %(message)s')
 
 
@@ -593,7 +609,7 @@ def is_suspicious_command(
     return command_with_args_str, False
 
 
-def analyze_single_json(json_path: Path) -> dict:
+def analyze_single_json(json_bytes: bytes, pkg_name: str, pkg_version: str) -> dict:
     """
     Analyze a single dynamic_summary.json file and return a dictionary of results.
     Includes storing all read files, write files, commands, and function calls in lists.
@@ -601,26 +617,19 @@ def analyze_single_json(json_path: Path) -> dict:
     package_result = make_empty_result()
 
     # Identify package
-    root = json_path.parent
-    parts = root.as_posix().split("/")
-    platform = parts[-1]
-    pkg_version = parts[-2]
-    pkg_name = parts[-3]
-
     package_result["name"] = pkg_name
     package_result["version"] = pkg_version
 
     # Read JSON
     try:
-        with open(json_path, "r", errors='replace') as f:
-            data = orjson.loads(f.read())
+        data = orjson.loads(json_bytes.decode(encoding='utf-8', errors='replace'))
     except Exception as e:
-        logger.error(f"Error reading {json_path}: {e}")
-        package_result["warnings"].append(f"Error reading {json_path}: {e}")
+        logger.error(f"Error reading json {pkg_name} {pkg_version}: {e}")
+        package_result["warnings"].append(f"Error reading {pkg_name} {pkg_version}: {e}")
         return package_result
 
     if not isinstance(data, dict):
-        logger.warning(f"Unexpected JSON structure in {json_path}")
+        logger.warning(f"Unexpected JSON structure in {pkg_name} {pkg_version}")
         return package_result
 
     # Extract fields
@@ -756,24 +765,28 @@ def analyze_single_json(json_path: Path) -> dict:
     return package_result
 
 
-def process_json_chunk(paths: List[Path]) -> List[dict]:
+def process_json_chunk(chunk: List[tuple]) -> List[dict]:
     """Process a list of JSON files (a chunk). Return a list of results (one dict per JSON)."""
-    chunk_result = []
-    for p in paths:
+    results = []
+    for key, pkg_name, pkg_version in chunk:
         try:
-            result = analyze_single_json(p)
-            chunk_result.append(result)
+            json_bytes = analysis_storage.get_object_bytes(key)
+
+            if not json_bytes:
+                continue
+
+            result = analyze_single_json(json_bytes, pkg_name, pkg_version)
+            results.append(result)
         except Exception as e:
             traceback.print_exc()
-            logger.error(f"Error processing {p}: {e}")
-    return chunk_result
+            logger.error(f"Error processing {key}: {e}")
+    return results
 
 
 ###############################################################################
 # EVALUATION AND RESULTS AGGREGATION
 ###############################################################################
 def evaluate(
-        directory: Path,
         max_workers: int,
         chunk_size: int
 ) -> None:
@@ -787,32 +800,29 @@ def evaluate(
     current_chunk = []
     total_count = 0
 
-    # Collect unique package-version-locations
-    results_to_process = {}
-    for root, dirs, files in os.walk(directory):
-        if "dynamic_summary.json" not in files:
+    all_keys = analysis_storage.list_objects(prefix="")
+
+    results_to_process = []
+
+    for key in all_keys:
+        if not key.endswith("dynamic_summary.json"):
             continue
 
-        root_path = Path(root)
-        parts = root_path.as_posix().split("/")
-        platform = parts[-1]
-        if platform != "linux":
+        parts = key.split("/")
+
+        if len(parts) < 3:
             continue
-        pkg_version = parts[-2]
+
         pkg_name = parts[-3]
+        pkg_version = parts[-2]
+        platform = parts[-1].replace("dynamic_summary.json", "")
 
-        json_file = root_path / "dynamic_summary.json"
-        results_to_process[(pkg_name, pkg_version)] = json_file
+        results_to_process.append((key, pkg_name, pkg_version))
 
     # Dispatch tasks
     with ProcessPoolExecutor(max_workers=max_workers) as executor:
-        for (pkg_name, pkg_version), json_file in results_to_process.items():
-
-            if not json_file.exists():
-                logger.error(f"File not found: {json_file}")
-                continue
-
-            current_chunk.append(json_file)
+        for entry in results_to_process:
+            current_chunk.append(entry)
             total_count += 1
 
             if len(current_chunk) % chunk_size == 0:
@@ -851,7 +861,7 @@ def evaluate(
     memberships = []
 
     # SINGLE CSV for packages
-    packages_csv = STATISTICS_DIRECTORY / "packages.csv"
+    packages_csv = STATISTICS_OUTPUT_DIRECTORY / "packages.csv"
 
     common_commands = Counter()
     common_read_files = Counter()
@@ -920,16 +930,22 @@ def evaluate(
 
             # Build membership for UpSet
             subset_list = []
+
             if write_count > 0:
                 subset_list.append("Suspicious Write")
+
             if read_count > 0:
                 subset_list.append("Suspicious Read")
+
             if cmd_count > 0:
                 subset_list.append("Suspicious Commands")
+
             if func_count > 0:
                 subset_list.append("Suspicious Functions")
+
             if (hostname_count + socket_count) > 0:
                 subset_list.append("Suspicious Network")
+
             memberships.append(subset_list)
 
             common_read_files.update(pkg_res["suspicious_read_files"])
@@ -966,22 +982,22 @@ def evaluate(
                 functions_json,
             ])
 
-    # Build the UpSet plot from memberships
+    # Build the UpSet plot from memberships if possible
     data = from_memberships(memberships)
-    upset = UpSet(data, subset_size='count', show_counts=True, sort_categories_by=None)
-    plot = upset.plot()
-    plot["intersections"].set_ylabel("Intersection Size")
-    plt.savefig(STATISTICS_DIRECTORY / "upset.png", dpi=300, bbox_inches='tight')
-    plt.close()
+
+    if data.index.nlevels > 1:
+        upset = UpSet(data, subset_size='count', show_counts=True, sort_categories_by=None)
+        plot = upset.plot()
+        plot["intersections"].set_ylabel("Intersection Size")
+        plt.savefig(STATISTICS_OUTPUT_DIRECTORY / "upset.png", dpi=300, bbox_inches='tight')
+        plt.close()
 
     # Plot risk factor histogram
     seaborn.set_context("paper", font_scale=1.2)
     seaborn.set_style("whitegrid")
-
     plt.figure(figsize=(7, 5), dpi=300)
     bins = np.arange(0, 1.01, 0.01)
     seaborn.histplot(risk_factor_list, bins=bins, color="RoyalBlue", kde=False)
-
     plt.xlim(0, 1)
     plt.xticks(np.arange(0, 1.01, 0.1))
     plt.xlabel("Risk Factor", fontsize=12)
@@ -989,7 +1005,7 @@ def evaluate(
     seaborn.color_palette("colorblind")
     seaborn.despine()
     plt.tight_layout()
-    plt.savefig(STATISTICS_DIRECTORY / "rist_factor.png", dpi=300, bbox_inches='tight')
+    plt.savefig(STATISTICS_OUTPUT_DIRECTORY / "rist_factor.png", dpi=300, bbox_inches='tight')
     plt.close()
 
     write_counter("read_files", common_read_files)
@@ -998,7 +1014,7 @@ def evaluate(
     write_counter("functions", common_functions)
     write_counter("commands", common_commands)
 
-    function_stats_csv = STATISTICS_DIRECTORY / "overall_stats.csv"
+    function_stats_csv = STATISTICS_OUTPUT_DIRECTORY / "overall_stats.csv"
 
     with open(function_stats_csv, mode="w", newline="", encoding="utf-8") as stats_csv:
         stats_writer = csv.writer(stats_csv)
@@ -1033,7 +1049,7 @@ def evaluate(
 
 
 def write_counter(name: str, counter: Counter):
-    function_stats_csv = STATISTICS_DIRECTORY / f"{name}_stats.csv"
+    function_stats_csv = STATISTICS_OUTPUT_DIRECTORY / f"{name}_stats.csv"
 
     with open(function_stats_csv, mode="w", newline="", encoding="utf-8") as stats_csv:
         stats_writer = csv.writer(stats_csv)
@@ -1044,12 +1060,10 @@ def write_counter(name: str, counter: Counter):
 
 
 def main():
-    """Main entry point."""
     logger.debug("Starting analysis...")
     start = datetime.now()
 
-    base_dir = Path(DYNAMIC_ANALYSIS_RESULTS_DIRECTORY)
-    evaluate(base_dir, max_workers=6, chunk_size=2000)
+    evaluate(max_workers=6, chunk_size=2000)
 
     logger.info(f"Finished in {datetime.now() - start}")
 
diff --git a/evaluation/requirements.txt b/evaluation/requirements.txt
deleted file mode 100644
index 68d0d9483df28f60a28425b99305576c2d386ab2..0000000000000000000000000000000000000000
--- a/evaluation/requirements.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-upsetplot
-seaborn
-orjson
-numpy
-matplotlib
-mpmath
\ No newline at end of file
diff --git a/evaluation/storage.py b/evaluation/storage.py
new file mode 100644
index 0000000000000000000000000000000000000000..352e036c6d8a5a03b0b4842dc32e8a2a1b49a3cf
--- /dev/null
+++ b/evaluation/storage.py
@@ -0,0 +1,101 @@
+from dataclasses import dataclass
+from typing import List, Optional
+
+import boto3
+from botocore.config import Config
+from botocore.exceptions import BotoCoreError, ClientError
+from dotenv import load_dotenv
+
+load_dotenv()  # Load variables from .env if needed
+
+
+@dataclass
+class StorageConfig:
+    endpoint: str
+    access_key: str
+    secret_key: str
+    bucket_name: str
+
+
+class Storage:
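+    """Synchronous boto3-based S3 helper used by the evaluation step."""
+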
+    def __init__(self, storage_config: StorageConfig):
+        self._cfg = storage_config
+        self._client = boto3.client(
+            service_name="s3",
+            endpoint_url=self._cfg.endpoint,
+            aws_access_key_id=self._cfg.access_key,
+            aws_secret_access_key=self._cfg.secret_key,
+            use_ssl=False,
+            config=Config(
+                max_pool_connections=10,
+                connect_timeout=60,
+                read_timeout=60,
+                retries={'max_attempts': 5}
+            )
+        )
+        self._bucket = self._cfg.bucket_name
+
+    def list_objects(self, prefix: str = "") -> List[str]:
+        """
+        Returns a list of object keys in the bucket that match the specified prefix.
+        """
+        keys = []
+        continuation_token: Optional[str] = None
+
+        while True:
+            try:
+                if continuation_token:
+                    response = self._client.list_objects_v2(
+                        Bucket=self._bucket,
+                        Prefix=prefix,
+                        ContinuationToken=continuation_token
+                    )
+                else:
+                    response = self._client.list_objects_v2(
+                        Bucket=self._bucket,
+                        Prefix=prefix
+                    )
+
+                contents = response.get("Contents", [])
+                for obj in contents:
+                    keys.append(obj["Key"])
+
+                if response.get("IsTruncated"):
+                    continuation_token = response.get("NextContinuationToken")
+                else:
+                    break
+
+            except (BotoCoreError, ClientError) as e:
+                print(f"Error listing objects: {e}")
+                break
+
+        return keys
+
+    def get_object_bytes(self, key: str) -> bytes:
+        """
+        Downloads the specified S3 object and returns its content as bytes.
+        """
+        try:
+            response = self._client.get_object(Bucket=self._bucket, Key=key)
+            return response["Body"].read()
+        except (BotoCoreError, ClientError) as e:
+            print(f"Error getting object {key}: {e}")
+            return b""
+
+    def put_object_bytes(self, key: str, data: bytes, content_type: str = "application/octet-stream") -> None:
+        """
+        Uploads a bytes object to S3 with the specified content type.
+        """
+        try:
+            self._client.put_object(
+                Bucket=self._bucket,
+                Key=key,
+                Body=data,
+                ContentType=content_type,
+                ContentLength=len(data)
+            )
+        except (BotoCoreError, ClientError) as e:
+            print(f"Error putting object {key}: {e}")
+
+    def close(self):
+        self._client.close()
diff --git a/infra/docker-compose.yml b/infra/docker-compose.yml
new file mode 100644
index 0000000000000000000000000000000000000000..603363ab74f154ca2c48c843035093f51180f9fe
--- /dev/null
+++ b/infra/docker-compose.yml
@@ -0,0 +1,22 @@
+version: "3.8"
+
+services:
+  minio:
+    build: ./minio/
+    ports:
+      - "9000:9000"
+      - "9001:9001"
+    environment:
+      MINIO_ROOT_USER: "sampleuser"
+      MINIO_ROOT_PASSWORD: "samplepass"
+    volumes:
+      - ./minio/data:/data
+
+  rabbitmq:
+    image: rabbitmq:3-management
+    container_name: rabbitmq
+    ports:
+      - "5672:5672"
+    environment:
+      RABBITMQ_DEFAULT_USER: "admin"
+      RABBITMQ_DEFAULT_PASS: "admin"
\ No newline at end of file
diff --git a/infra/minio/Dockerfile b/infra/minio/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..02236413dae2e3f09a53a0731be1036888802600
--- /dev/null
+++ b/infra/minio/Dockerfile
@@ -0,0 +1,6 @@
+FROM quay.io/minio/minio
+
+COPY entrypoint.sh /usr/bin/entrypoint.sh
+RUN chmod +x /usr/bin/entrypoint.sh
+
+ENTRYPOINT ["/usr/bin/entrypoint.sh"]
\ No newline at end of file
diff --git a/infra/minio/entrypoint.sh b/infra/minio/entrypoint.sh
new file mode 100644
index 0000000000000000000000000000000000000000..37ab460f14fb9a40bb2ac23ed8ff020ba8666749
--- /dev/null
+++ b/infra/minio/entrypoint.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+
+minio server /data --console-address ":9001" &
+MINIO_PID=$!
+
+# Wait for MinIO to start up
+sleep 5
+
+# Add an alias for mc
+mc alias set main http://127.0.0.1:9000 "$MINIO_ROOT_USER" "$MINIO_ROOT_PASSWORD"
+
+# Проверяем, есть ли уже бакет "metadata"
+if ! mc ls main/metadata >/dev/null 2>&1; then
+  mc mb main/metadata
+fi
+
+# Проверяем, есть ли уже бакет "analysis"
+if ! mc ls main/analysis >/dev/null 2>&1; then
+  mc mb main/analysis
+fi
+
+wait $MINIO_PID
\ No newline at end of file
diff --git a/pypi_malregistry/README.md b/pypi_malregistry/README.md
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/requirements.txt b/requirements.txt
index 41a60451092d81fe8bd49656702cc37f6d72deea..19a9995b303b20cea69fed623521b6971329e361 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,18 @@
-mathplot
 numpy
 mpmath
 seaborn
-upsetplot
\ No newline at end of file
+upsetplot
+orjson
+python-dotenv
+aiobotocore
+scapy
+aio-pika
+httpx
+tenacity
+docker
+pika
+packaging
+boto3
+botocore
+matplotlib
\ No newline at end of file
diff --git a/runner.py b/runner.py
index d26923ab300e0995f491e229fff54b23e1126a8c..c5e4aa933e9cce3e128a67262b7ad74d1cfe0b39 100644
--- a/runner.py
+++ b/runner.py
@@ -1,5 +1,6 @@
 import logging
 import os.path
+import sys
 import threading
 from logging.handlers import RotatingFileHandler
 from pathlib import Path
@@ -64,10 +65,11 @@ def create_docker_container(client: DockerClient, image: Image, environment: dic
         privileged=True,
         cgroupns="host",
         environment=environment,
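+        # Host networking lets the worker container reach the broker and MinIO via the localhost endpoints from .env.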
+        network_mode="host",
     )
 
 
-logger = setup_logger('runner', 'runner.log')
+logger = setup_logger('runner', './logs/runner.log')
 
 
 class Worker:
@@ -148,6 +150,22 @@ SANDBOX_DOCKER_IMAGE_TAG = "sandbox:latest"
 def main():
     load_dotenv()
 
+    args = list(sys.argv)
+    args.pop(0)
+
+    worker_count = 1
+
+    try:
+        if len(args) > 0:
+            worker_count = int(args[0])
+
+        if worker_count <= 0:
+            print("Worker count must be > 0")
+            return
+    except ValueError:
+        print("Invalid worker count")
+        return
+
     docker_client = docker.from_env(timeout=300)
 
     logger.debug("Building docker images...")
@@ -190,7 +208,9 @@ def main():
     try:
         logger.debug("Starting workers...")
 
-        runner.start_new_worker(worker_environment)
+        for _ in range(worker_count):
+            runner.start_new_worker(worker_environment)
+
         runner.wait()
 
         logger.debug("Workers were finished!")
diff --git a/sandbox/.ssh/id_pub.rsa b/sandbox/.ssh/id_pub.rsa
deleted file mode 100644
index 679aa535f6a97ef99b13ae04725c8a6e75710f56..0000000000000000000000000000000000000000
--- a/sandbox/.ssh/id_pub.rsa
+++ /dev/null
@@ -1 +0,0 @@
-sample text
\ No newline at end of file
diff --git a/sandbox/Dockerfile b/sandbox/Dockerfile
index 91cf6deb19557814fd31c6119e9d41ab6cdecaa9..cff2ab761baffc05b56f58e83a155d8b0b40c231 100644
--- a/sandbox/Dockerfile
+++ b/sandbox/Dockerfile
@@ -5,13 +5,9 @@ WORKDIR /setup
 
 ARG USERNAME="user"
 
-ENV DEBIAN_FRONTEND noninteractive
+ENV DEBIAN_FRONTEND=noninteractive
 ENV DEBCONF_NOWARNINGS="yes"
 
-# install keys for powershell
-RUN curl -fsSL "https://packages.microsoft.com/config/ubuntu/22.04/packages-microsoft-prod.deb" -o /setup/packages-microsoft-prod.deb && \
-	dpkg -i /setup/packages-microsoft-prod.deb
-
 # Extra packages for realistic runtime
 RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-recommends \
 	apt-transport-https \
@@ -39,17 +35,18 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-reco
 	zip \
     unzip
 
-RUN useradd -m user && adduser user sudo && \
+RUN useradd -m "${USERNAME}" && adduser "${USERNAME}" sudo && \
     echo "ALL ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers && \
-    mkdir -p /etc/sudoers.d && echo "user ALL=(ALL:ALL) NOPASSWD: ALL" > /etc/sudoers.d/user
+    mkdir -p /etc/sudoers.d && \
+    echo "${USERNAME} ALL=(ALL:ALL) NOPASSWD: ALL" > "/etc/sudoers.d/${USERNAME}"
 
-RUN mkdir -p /home/user/.local/bin && \
-    chown -R user:user /home/user
+RUN mkdir -p "/home/${USERNAME}/.local/bin" && \
+    chown -R "${USERNAME}:${USERNAME}" "/home/${USERNAME}"
 
 RUN mkdir -m 0700 /root/.ssh && \
-    mkdir -p /home/user/.ssh && \
-    chmod 700 /home/user/.ssh && \
-    chown user:user /home/user/.ssh
+    mkdir -p "/home/${USERNAME}/.ssh" && \
+    chmod 700 "/home/${USERNAME}/.ssh" && \
+    chown "${USERNAME}:${USERNAME}" "/home/${USERNAME}/.ssh"
 
 #
 # Python setup
@@ -76,7 +73,7 @@ COPY dynamic-analysis.py /tools/
 
 RUN chmod 755 /tools/dynamic-analysis.py
 
-WORKDIR /home/user
+WORKDIR "/home/${USERNAME}"
 
 # Set max analysis execution time using sleep
 ENTRYPOINT [ "sleep" ]
diff --git a/scheduler/scheduler.py b/scheduler/scheduler.py
new file mode 100644
index 0000000000000000000000000000000000000000..e76410131d7e1e913a388a3269c4b9ac28e61999
--- /dev/null
+++ b/scheduler/scheduler.py
@@ -0,0 +1,169 @@
+import asyncio
+import os
+import traceback
+from datetime import datetime
+from typing import Dict, List
+
+import orjson
+import pika
+from dotenv import load_dotenv
+from packaging.specifiers import SpecifierSet
+from packaging.version import Version
+
+from storage import AsyncStorage, StorageConfig
+
+
+def is_python_310_supported(python_version: str) -> bool:
+    """Check if 'python_version' spec includes Python 3.10."""
+    try:
+        normalized = python_version.replace(".*", ".0").replace(".x", ".0")
+        specifier = SpecifierSet(normalized)
+        return Version("3.10") in specifier
+    except Exception:
+        return False
+
+
+def get_latest_release_by_upload_time(releases: Dict[str, List[dict]]):
+    """
+    Return the release version with the newest upload time and its files.
+    Each value in 'releases' is a list of file dicts, each having 'upload_time'.
+    """
+    best_version = None
+    best_files = []
+    best_time = None
+
+    for version_str, files in releases.items():
+        if not files:
+            continue
+
+        sorted_files = sorted(
+            files,
+            key=lambda f: datetime.fromisoformat(f["upload_time"]),
+            reverse=True
+        )
+
+        version_latest_dt = datetime.fromisoformat(sorted_files[0]["upload_time"])
+        if best_time is None or version_latest_dt > best_time:
+            best_time = version_latest_dt
+            best_version = version_str
+            best_files = sorted_files
+
+    return best_version, best_files
+
+
+async def process_single_key(
+        storage: AsyncStorage,
+        key: str
+) -> dict:
+    """
+    Fetch and analyze package metadata (JSON) from S3 for a single key.
+    Returns a dict with analysis results.
+    """
+    result = {
+        "package_name": key,
+        "version": None,
+        "is_python_3_10_supported": False,
+    }
+
+    try:
+        storage_object = await storage.get(key=key)
+        data = await storage_object["Body"].read()
+        package_metadata = orjson.loads(data)
+    except Exception as e:
+        traceback.print_exc()
+        return result
+
+    # Update package name if present
+    if "name" in package_metadata:
+        result["package_name"] = package_metadata["name"]
+
+    releases = package_metadata.get("releases", {})
+    release_count = len(releases)
+
+    if release_count == 0:
+        return result
+
+    latest_version, latest_files = get_latest_release_by_upload_time(releases)
+    result["version"] = latest_version
+
+    if latest_files:
+        # Check if at least one file in the latest release supports 3.10
+        supports_310 = any(
+            (f.get("python_version") is None)
+            or is_python_310_supported(f["python_version"])
+            for f in latest_files
+        )
+
+        result["is_python_3_10_supported"] = supports_310
+
+    return result
+
+
+async def main():
+    load_dotenv()
+
+    # Configure S3 async storage
+    storage_config = StorageConfig(
+        endpoint=os.getenv("S3_ENDPOINT", "not in env defined"),
+        access_key=os.getenv("S3_ACCESS_KEY", "not in env defined"),
+        secret_key=os.getenv("S3_SECRET_KEY", "not in env defined"),
+        bucket_name=os.getenv("S3_METADATA_BUCKET_NAME", "not in env defined"),
+    )
+
+    storage = AsyncStorage(storage_config)
+    await storage.connect()
+
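+    # Publishing goes through a blocking pika connection; asyncio is only used for the S3 reads.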
+    connection_params = pika.ConnectionParameters(
+        host=os.getenv("BROKER_HOST"),
+
+        credentials=pika.PlainCredentials(
+            os.getenv("BROKER_LOGIN"),
+            os.getenv("BROKER_PASSWORD")
+        ),
+
+        heartbeat=600,
+        blocked_connection_timeout=300
+    )
+
+    queue_name = os.getenv("BROKER_QUEUE_NAME")
+
+    connection = pika.BlockingConnection(connection_params)
+    channel = connection.channel()
+
+    # List all keys from S3
+    contents = await storage.list_objects()
+    object_keys = [k for k in contents if k.endswith(".json")]
+
+    print(f"Found {len(object_keys)} JSON objects in S3 bucket.")
+
+    tasks = [
+        process_single_key(storage, key)
+        for key in object_keys
+    ]
+
+    results = await asyncio.gather(*tasks)
+    print(f"Total processed packages: {len(results)}")
+
+    channel.queue_declare(queue=queue_name, durable=True)
+
+    for result in results:
+        package_name = result.get("package_name")
+        version = result.get("version")
+
+        channel.basic_publish(
+            exchange='',
+            routing_key=queue_name,
+            body=orjson.dumps({"type": "analyze", "package_name": package_name, "version": version}),
+            properties=pika.BasicProperties(
+                content_type='application/json',
+                delivery_mode=pika.DeliveryMode.Persistent,
+            ),
+        )
+
+    unsupported_version = sum(1 for result in results if not result["is_python_3_10_supported"])
+    print(f"Packages that do NOT support Python 3.10: {unsupported_version}")
+
+    await storage.close()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/scheduler/storage.py b/scheduler/storage.py
new file mode 100644
index 0000000000000000000000000000000000000000..f5bfb89908cda1f46b43ed2d0d12de50236ba469
--- /dev/null
+++ b/scheduler/storage.py
@@ -0,0 +1,85 @@
+from contextlib import AsyncExitStack
+from dataclasses import dataclass
+from io import BytesIO
+from typing import Any, List
+
+import aiobotocore
+from aiobotocore.session import AioSession
+
+
+@dataclass
+class StorageConfig:
+    endpoint: str
+    access_key: str
+    secret_key: str
+    bucket_name: str
+
+
+class AsyncStorage:
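+    """Minimal async S3 client (aiobotocore) providing put/get/list for a single configured bucket."""
+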
+    def __init__(self, storage_config: StorageConfig):
+        self._exit_stack = AsyncExitStack()
+        self.__storage_config = storage_config
+        self.__client = None
+
+    async def connect(self):
+        try:
+            session = AioSession()
+
+            self.__client = await self._exit_stack.enter_async_context(session.create_client(
+                service_name='s3',
+                use_ssl=False,
+                endpoint_url=self.__storage_config.endpoint,
+                aws_access_key_id=self.__storage_config.access_key,
+                aws_secret_access_key=self.__storage_config.secret_key,
+                config=aiobotocore.config.AioConfig(
+                    max_pool_connections=10,
+                    connect_timeout=5000,
+                    read_timeout=5000,
+                    retries={'max_attempts': 10}
+                )
+            ))
+
+        except Exception as e:
+            await self.close()
+            raise
+
+    async def close(self) -> None:
+        if self.__client:
+            await self._exit_stack.__aexit__(None, None, None)
+
+    async def put(self, destination_file_name: str, data: bytes, content_type: str = 'application/json') -> None:
+        if not self.__client:
+            raise RuntimeError("Storage is not initialized.")
+
+        data_io = BytesIO(data)
+        length = len(data)
+
+        await self.__client.put_object(
+            Bucket=self.__storage_config.bucket_name,
+            Key=destination_file_name,
+            Body=data_io,
+            ContentLength=length,
+            ContentType=content_type
+        )
+
+    async def get(self, key: str) -> Any:
+        if not self.__client:
+            raise RuntimeError("Storage is not initialized.")
+
+        response = await self.__client.get_object(
+            Bucket=self.__storage_config.bucket_name,
+            Key=key,
+        )
+
+        return response
+
+    async def list_objects(self, prefix: str = "") -> List[str]:
+        if not self.__client:
+            raise RuntimeError("Storage is not initialized.")
+
+        response = await self.__client.list_objects_v2(
+            Bucket=self.__storage_config.bucket_name,
+            Prefix=prefix
+        )
+        contents = response.get("Contents", [])
+        return [obj["Key"] for obj in contents]
diff --git a/worker/Dockerfile b/worker/Dockerfile
index 3d8cc8aa9621f1351aab6b73966aec8fb388cf7e..5a0a57991fcbf400dd5402d69e645ef5b2f39587 100644
--- a/worker/Dockerfile
+++ b/worker/Dockerfile
@@ -5,7 +5,7 @@ RUN cargo build --release
 
 FROM ubuntu:22.04
 
-ENV DEBIAN_FRONTEND noninteractive
+ENV DEBIAN_FRONTEND=noninteractive
 
 RUN apt-get update && apt-get upgrade -y && \
     apt-get install -y \
diff --git a/worker/__init__.py b/worker/__init__.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/worker/src/storage.py b/worker/src/storage.py
index 55be8d299e36c43bdfab06557af9efcd8534148b..f5bfb89908cda1f46b43ed2d0d12de50236ba469 100644
--- a/worker/src/storage.py
+++ b/worker/src/storage.py
@@ -1,6 +1,7 @@
 from contextlib import AsyncExitStack
 from dataclasses import dataclass
 from io import BytesIO
+from typing import Any, List
 
 import aiobotocore
 from aiobotocore.session import AioSession
@@ -60,3 +61,25 @@ class AsyncStorage:
             ContentLength=length,
             ContentType=content_type
         )
+
+    async def get(self, key: str) -> Any:
+        if not self.__client:
+            raise RuntimeError("Storage is not initialized.")
+
+        response = await self.__client.get_object(
+            Bucket=self.__storage_config.bucket_name,
+            Key=key,
+        )
+
+        return response
+
+    async def list_objects(self, prefix: str = "") -> List[str]:
+        if not self.__client:
+            raise RuntimeError("Storage is not initialized.")
+
+        response = await self.__client.list_objects_v2(
+            Bucket=self.__storage_config.bucket_name,
+            Prefix=prefix
+        )
+        contents = response.get("Contents", [])
+        return [obj["Key"] for obj in contents]
diff --git a/worker/src/worker.py b/worker/src/worker.py
index f245653b6f6cfb58766e12dfd69a3068b960347f..d049133026adac248fb7035530dd9beaa7b05669 100644
--- a/worker/src/worker.py
+++ b/worker/src/worker.py
@@ -59,7 +59,7 @@ def setup_logger(
     return logger
 
 
-logger = setup_logger("worker", "worker.log")
+logger = setup_logger("worker", "./logs/worker.log")
 
 
 async def run_async_command(command: str, check: bool = True) -> Tuple[bytes, bytes]:
@@ -362,10 +362,8 @@ class Sandbox:
         try:
             size = os.stat(log_file).st_size
             if size < 1024 * 1024 * 128:  # <128MB
-                logger.debug("Collecting using native Python strace parser")
                 return StraceParser.parse(str(log_file))
 
-            logger.debug("Collecting using external strace parser (file is large)")
             result_json, _ = await run_async_command(f"strace_parser {log_file}")
             return orjson.loads(result_json)
         except Exception as exc:
@@ -832,19 +830,21 @@ async def main() -> None:
     """
     load_dotenv()
     args = list(sys.argv)
-    args.pop(0)  # remove script name
+    args.pop(0)
 
     worker_id = 0
     worker_count = 1
 
     if args:
         worker_id = int(args.pop(0))
+
     if worker_id < 0:
         print("Worker ID must be >= 0")
         return
 
     if args:
         worker_count = int(args.pop(0))
+
     if worker_count <= 0:
         logger.error("Worker count must be > 0")
         return