From 1a514fd8dabbf86d8700d3acf77bd1085fb94188 Mon Sep 17 00:00:00 2001 From: Anton Shadrin <xsetthe@gmail.com> Date: Sun, 16 Mar 2025 13:28:21 +0100 Subject: [PATCH] Add S3 storage integration for crawler and evaluation; Infrastructure example; Documentation; --- .env | 17 +-- README.md | 182 +++++++++++++++++++--------- crawler.py => crawler/crawler.py | 198 ++++++++++++++++++++++++------- crawler/message_queue.py | 89 ++++++++++++++ crawler/storage.py | 85 +++++++++++++ evaluation/evaluate.py | 128 +++++++++++--------- evaluation/requirements.txt | 6 - evaluation/storage.py | 101 ++++++++++++++++ infra/docker-compose.yml | 22 ++++ infra/minio/Dockerfile | 6 + infra/minio/entrypoint.sh | 22 ++++ pypi_malregistry/README.md | 0 requirements.txt | 17 ++- runner.py | 24 +++- sandbox/.ssh/id_pub.rsa | 1 - sandbox/Dockerfile | 23 ++-- scheduler/scheduler.py | 169 ++++++++++++++++++++++++++ scheduler/storage.py | 85 +++++++++++++ worker/Dockerfile | 2 +- worker/__init__.py | 0 worker/src/storage.py | 23 ++++ worker/src/worker.py | 8 +- 22 files changed, 1013 insertions(+), 195 deletions(-) rename crawler.py => crawler/crawler.py (53%) create mode 100644 crawler/message_queue.py create mode 100644 crawler/storage.py delete mode 100644 evaluation/requirements.txt create mode 100644 evaluation/storage.py create mode 100644 infra/docker-compose.yml create mode 100644 infra/minio/Dockerfile create mode 100644 infra/minio/entrypoint.sh delete mode 100644 pypi_malregistry/README.md delete mode 100644 sandbox/.ssh/id_pub.rsa create mode 100644 scheduler/scheduler.py create mode 100644 scheduler/storage.py delete mode 100644 worker/__init__.py diff --git a/.env b/.env index 31a4ad2..c156dc8 100644 --- a/.env +++ b/.env @@ -1,11 +1,12 @@ # S3-compatible storage (ex. MinIO) -S3_ENDPOINT= -S3_ACCESS_KEY= -S3_SECRET_KEY= -S3_BUCKET_NAME= +S3_ENDPOINT=http://localhost:9000 +S3_ACCESS_KEY=sampleuser +S3_SECRET_KEY=samplepass +S3_BUCKET_NAME=analysis +S3_METADATA_BUCKET_NAME=metadata # AMQP broker (ex. RabbitMQ) -BROKER_HOST= -BROKER_LOGIN= -BROKER_PASSWORD="" -BROKER_QUEUE_NAME= \ No newline at end of file +BROKER_HOST=localhost +BROKER_LOGIN=admin +BROKER_PASSWORD=admin +BROKER_QUEUE_NAME=task_queue \ No newline at end of file diff --git a/README.md b/README.md index 68bca34..d7a32e7 100644 --- a/README.md +++ b/README.md @@ -1,93 +1,157 @@ -# Bachelorarbeit +# Dynamic PyPI Package Analysis +> This project contains scripts that enable dynamic behavior analysis of Python packages and the calculation of their +> risk scores. +## Table of Contents -## Getting started +1. [Overview](#overview) +2. [Modules](#modules) + - [Crawler](#crawler) + - [Scheduler](#scheduler) + - [Worker](#worker) + - [Evaluation](#evaluation) +3. [Environment Variables](#environment-variables) +4. [Usage Guide](#quick-start-guide) + - [Prerequisites](#prerequisites) + - [Basic Infrastructure](#basic-infrastructure) + - [Crawler](#crawler-1) + - [Scheduler](#scheduler-1) + - [Workers](#workers) + - [Evaluation](#evaluation-1) -To make it easy for you to get started with GitLab, here's a list of recommended next steps. +--- -Already a pro? Just edit this README.md and make it your own. Want to make it easy? [Use the template at the bottom](#editing-this-readme)! 
+## Overview
-## Add your files
+This project is designed to:
-- [ ] [Create](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#create-a-file) or [upload](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#upload-a-file) files
-- [ ] [Add files using the command line](https://docs.gitlab.com/ee/gitlab-basics/add-file.html#add-a-file-using-the-command-line) or push an existing Git repository with the following command:
+- **Fetch** metadata of packages from PyPI.
+- **Schedule** these packages for dynamic runtime analysis.
+- **Execute** them in a containerized sandbox for thorough examination.
+- **Store** all analysis results and metadata in an S3-compatible backend.
+- **Evaluate** the outcome of the dynamic analysis for further insights.
-```
-cd existing_repo
-git remote add origin https://gitlab.fachschaften.org/anton.shadrin/bachelorarbeit.git
-git branch -M main
-git push -uf origin main
-```
+
+---
+
+## Modules
+
+### Crawler
+
+- Retrieves a list of all packages using the PyPI API.
+- Obtains detailed metadata (versions, release files, etc.) for each package that hasn't already been collected.
+- Saves the retrieved information to an S3-compatible repository (e.g. MinIO).
+- When “scheduler mode” (with `--scheduler-mode`) is enabled, publishes messages to a broker (e.g. RabbitMQ), allowing
+  further processing (analysis) of packages in asynchronous mode.
-## Integrate with your tools
+### Scheduler
-- [ ] [Set up project integrations](https://gitlab.fachschaften.org/anton.shadrin/bachelorarbeit/-/settings/integrations)
+Reads metadata from the S3 bucket and submits tasks to the RabbitMQ queue for analysis.
-## Collaborate with your team
+### Worker
-- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/)
-- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html)
-- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically)
-- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/)
-- [ ] [Set auto-merge](https://docs.gitlab.com/ee/user/project/merge_requests/merge_when_pipeline_succeeds.html)
+- Waits for messages from RabbitMQ.
+- Once a message arrives, it spawns an **isolated environment** (via Podman + [gVisor](https://gvisor.dev/) runtime) for
+  the target package.
+- Collects **runtime data** (DNS requests, system calls, Python function calls, installed packages, etc.).
+- Pushes the results (packaged into a JSON summary) back to the configured **S3-compatible storage**.
-## Test and Deploy
+### Evaluation
-Use the built-in continuous integration in GitLab.
+Processes the results of the dynamic analysis from the S3 storage, providing insights or additional
+analytics.
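+
+For reference, the task message that ties the crawler, the scheduler and the workers together is a small JSON
+document. A minimal sketch of its shape (the package name and version below are illustrative, not real data):
+
+```python
+import orjson
+
+# Illustrative values; real messages are built from the PyPI metadata stored in S3
+# and are published to the queue as persistent application/json messages.
+task = {"type": "analyze", "package_name": "example-package", "version": "1.0.0"}
+body = orjson.dumps(task)
+print(orjson.loads(body)["package_name"])
+```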
-- [ ] [Get started with GitLab CI/CD](https://docs.gitlab.com/ee/ci/quick_start/) -- [ ] [Analyze your code for known vulnerabilities with Static Application Security Testing (SAST)](https://docs.gitlab.com/ee/user/application_security/sast/) -- [ ] [Deploy to Kubernetes, Amazon EC2, or Amazon ECS using Auto Deploy](https://docs.gitlab.com/ee/topics/autodevops/requirements.html) -- [ ] [Use pull-based deployments for improved Kubernetes management](https://docs.gitlab.com/ee/user/clusters/agent/) -- [ ] [Set up protected environments](https://docs.gitlab.com/ee/ci/environments/protected_environments.html) +--- -*** +## Environment Variables -# Editing this README +All relevant environment variables for the system should be declared in a `.env` file (or provided in another manner for +the container runtime). A typical `.env` might look like: -When you're ready to make this README your own, just edit this file and use the handy template below (or feel free to structure it however you want - this is just a starting point!). Thanks to [makeareadme.com](https://www.makeareadme.com/) for this template. +```bash +# S3-compatible storage (ex. MinIO) +S3_ENDPOINT=http://localhost:9000 +S3_ACCESS_KEY=sampleuser +S3_SECRET_KEY=samplepass +S3_BUCKET_NAME=analysis +S3_METADATA_BUCKET_NAME=metadata -## Suggestions for a good README +# AMQP broker (ex. RabbitMQ) +BROKER_HOST=localhost +BROKER_LOGIN=admin +BROKER_PASSWORD=admin +BROKER_QUEUE_NAME=task_queue +``` + +--- -Every project is different, so consider which of these sections apply to yours. The sections used in the template are suggestions for most open source projects. Also keep in mind that while a README can be too long and detailed, too long is better than too short. If you think your README is too long, consider utilizing another form of documentation rather than cutting out information. +## Quick Start Guide -## Name -Choose a self-explaining name for your project. +### Prerequisites -## Description -Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors. +1. **Docker** installed (for container builds). +2. **Python 3.9+** and requirements from `requirements.txt` (`pip install -r requirements.txt`). +3. **AMQP broker** instance (e.g. RabbitMQ). +4. **S3-Compatible Storage** (e.g. MinIO). -## Badges -On some READMEs, you may see small images that convey metadata, such as whether or not all the tests are passing for the project. You can use Shields to add some to your README. Many services also have instructions for adding a badge. +Before running each script, populate your `.env` with valid configurations ( +see [Environment Variables](#environment-variables)). -## Visuals -Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method. +### Basic Infrastructure -## Installation -Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. 
If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection.
+You can run a pre-built infrastructure that includes RabbitMQ and MinIO using Docker.
+To do this, navigate to the `infra` folder and run:
-## Usage
-Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README.
+```bash
+cd infra && docker compose up
+```
-## Support
-Tell people where they can go to for help. It can be any combination of an issue tracker, a chat room, an email address, etc.
+### Crawler
-## Roadmap
-If you have ideas for releases in the future, it is a good idea to list them in the README.
+Run:
+
+```bash
+cd crawler && python crawler.py [--scheduler-mode]
+```
-## Contributing
-State if you are open to contributions and what your requirements are for accepting them.
+- `--scheduler-mode` (optional): when specified, the script also publishes a message about each processed package to
+  the broker.
-For people who want to make changes to your project, it's helpful to have some documentation on how to get started. Perhaps there is a script that they should run or some environment variables that they need to set. Make these steps explicit. These instructions could also be useful to your future self.
+### Scheduler
-You can also document commands to lint the code or run tests. These steps help to ensure high code quality and reduce the likelihood that the changes inadvertently break something. Having instructions for running tests is especially helpful if it requires external setup, such as starting a Selenium server for testing in a browser.
+Run:
-## Authors and acknowledgment
-Show your appreciation to those who have contributed to the project.
+```bash
+cd scheduler && python scheduler.py
+```
+
+This script will:
+
+- Read collected metadata from the S3-compatible storage.
+- Submit tasks to the AMQP broker for analysis.
+
+### Workers
+
+1. Ensure Docker is running.
+2. Run:
+   ```bash
+   python runner.py [worker_count]
+   ```
+   This script will:
+   - Build the `sandbox` Docker image.
+   - Build the `worker` Docker image.
+   - Launch the worker container(s) to consume tasks from RabbitMQ.
+
+### Evaluation
+
+Run:
+
+```bash
+cd evaluation && python evaluate.py
+```
-## License
-For open source projects, say how it is licensed.
+This script will:
-## Project status
-If you have run out of energy or time for your project, put a note at the top of the README saying that development has slowed down or stopped completely. Someone may choose to fork your project or volunteer to step in as a maintainer or owner, allowing your project to keep going. You can also make an explicit request for maintainers.
+- Read the resulting package analysis data from the S3-compatible storage.
+- Analyze that data and generate statistics in the `stats/` directory.
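+
+As a quick sanity check before running the pipeline, you can verify that the buckets from your `.env` are reachable.
+A minimal sketch (bucket names follow the sample `.env` above; adjust if yours differ):
+
+```python
+import os
+
+import boto3
+from dotenv import load_dotenv
+
+load_dotenv()
+s3 = boto3.client(
+    "s3",
+    endpoint_url=os.getenv("S3_ENDPOINT"),
+    aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
+    aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
+)
+for bucket in (os.getenv("S3_METADATA_BUCKET_NAME"), os.getenv("S3_BUCKET_NAME")):
+    s3.head_bucket(Bucket=bucket)  # raises ClientError if the bucket is missing or unreachable
+    print(f"{bucket}: OK")
+```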
\ No newline at end of file diff --git a/crawler.py b/crawler/crawler.py similarity index 53% rename from crawler.py rename to crawler/crawler.py index 0ab4169..1182bd6 100644 --- a/crawler.py +++ b/crawler/crawler.py @@ -1,17 +1,31 @@ import asyncio import logging import os +import sys import time from dataclasses import dataclass, field +from datetime import datetime from logging.handlers import RotatingFileHandler from pathlib import Path from typing import Dict, List, Optional, Set -import aiofiles import httpx import orjson +from dotenv import load_dotenv from tenacity import retry, stop_after_attempt, wait_exponential +from message_queue import MessageQueueConfig, AsyncMessageQueue +from storage import AsyncStorage + + +@dataclass +class StorageConfig: + """Holds S3 connection details.""" + endpoint: str + access_key: str + secret_key: str + bucket_name: str + def setup_logger( name: str, @@ -22,6 +36,7 @@ def setup_logger( log_path = Path(log_file) log_path.parent.mkdir(parents=True, exist_ok=True) + """Creates and returns a configured logger with both file and console handlers.""" logger = logging.getLogger(name) logger.setLevel(level) @@ -46,11 +61,12 @@ def setup_logger( return logger -logger = setup_logger('crawler', 'crawler.log') +logger = setup_logger('crawler', './logs/crawler.log') @dataclass class ReleaseFile: + """Represents a single file within a particular release.""" url: str filename: str package_type: str @@ -59,6 +75,7 @@ class ReleaseFile: python_version: str def to_dict(self) -> dict: + """Converts ReleaseFile to a dictionary.""" return { "url": self.url, "filename": self.filename, @@ -69,8 +86,37 @@ class ReleaseFile: } +def get_latest_release_by_upload_time(releases: Dict[str, List[ReleaseFile]]): + """ + Return the release version with the newest upload time and its files. + Each value in 'releases' is a list of file dicts, each having 'upload_time'. 
+    """
+    best_version = None
+    best_files = []
+    best_time = None
+
+    for version_str, files in releases.items():
+        if not files:
+            continue
+
+        sorted_files = sorted(
+            files,
+            key=lambda f: datetime.fromisoformat(f.upload_time),
+            reverse=True
+        )
+
+        version_latest_dt = datetime.fromisoformat(sorted_files[0].upload_time)
+        if best_time is None or version_latest_dt > best_time:
+            best_time = version_latest_dt
+            best_version = version_str
+            best_files = sorted_files
+
+    return best_version, best_files
+
+
 @dataclass
 class Package:
+    """Represents metadata for a Python package."""
     name: str
     author_name: str
     author_email: str
@@ -78,6 +124,7 @@ class Package:
     releases: Dict[str, List[ReleaseFile]] = field(default_factory=dict)
 
     def to_dict(self) -> dict:
+        """Converts Package to a dictionary, including its releases."""
         return {
             "name": self.name,
             "author_name": self.author_name,
@@ -91,20 +138,28 @@ class Package:
 
 
 class PyPICrawler:
+    """Crawls PyPI to retrieve package metadata and stores it in an S3-compatible storage."""
+
     def __init__(
             self,
-            output_dir: str = "pypi_metadata",
+            storage_config: StorageConfig,
+            message_queue_config: MessageQueueConfig | None,
+            is_scheduler_mode_enabled: bool,
            batch_size: int = 100,
            max_concurrent_requests: int = 20,
            request_delay: float = 0.1
     ):
-        self.output_dir = Path(output_dir)
-        self.output_dir.mkdir(parents=True, exist_ok=True)
+        """Initializes the crawler with storage config and crawl settings."""
+        self.is_scheduler_mode_enabled = is_scheduler_mode_enabled
+
         self.batch_size = batch_size
-        self.processed_packages: Set[str] = set()
         self.max_concurrent_requests = max_concurrent_requests
         self.request_delay = request_delay
+
+        self.processed_packages: Set[str] = set()
+
+        self.semaphore = asyncio.Semaphore(max_concurrent_requests)
+
         self.client = httpx.AsyncClient(
             headers={
                 "User-Agent": "PyPICrawler/1.0 (Uni Project)",
@@ -113,53 +168,67 @@ class PyPICrawler:
             timeout=httpx.Timeout(15.0),
             follow_redirects=True
         )
-        self._load_processed_packages()
+
+        self.message_queue = None
+        if self.is_scheduler_mode_enabled and message_queue_config:
+            self.message_queue = AsyncMessageQueue(message_queue_config)
+
+        self.storage = AsyncStorage(storage_config)
 
     async def __aenter__(self):
+        """Connects to S3, starts the message queue (if enabled) and loads the already processed packages."""
+        await self.storage.connect()
+        if self.message_queue:
+            await self.message_queue.start(10)
+        await self._load_processed_packages()
         return self
 
     async def __aexit__(self, exc_type, exc, tb):
+        """Closes the HTTP client, the message queue and the S3 connection."""
         await self.client.aclose()
+        if self.message_queue:
+            await self.message_queue.cleanup()
+        await self.storage.close()
 
-    def _load_processed_packages(self):
-        self.processed_packages = {f.stem for f in self.output_dir.glob("*.json")}
+    async def _load_processed_packages(self):
+        """Retrieves the list of JSON files in the bucket to identify processed packages."""
+        try:
+            keys = await self.storage.list_objects()
+            for k in keys:
+                if k.endswith(".json"):
+                    pkg_name = Path(k).stem
+                    self.processed_packages.add(pkg_name)
+            logger.info(f"Already processed packages: {len(self.processed_packages)}")
+        except Exception as e:
+            logger.warning(f"Could not load the list of objects from S3: {e}")
 
     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
     async def _fetch_package_list(self) -> List[str]:
+        """Fetches the complete list of packages from PyPI."""
         try:
             response = await self.client.get("https://pypi.org/simple/", follow_redirects=True)
             response.raise_for_status()
-            return [p["name"] for p in response.json().get("projects", [])]
+            data = response.json()
+
return [p["name"] for p in data.get("projects", [])] except Exception as e: logger.error(f"Failed to fetch package list: {str(e)}") raise @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) async def _fetch_package_data(self, package_name: str) -> Optional[dict]: + """Fetches metadata for a specific package from PyPI.""" async with self.semaphore: try: main_url = f"https://pypi.org/pypi/{package_name}/json" main_res = await self.client.get(main_url) if main_res.status_code != 200: - # logger.warning(f"Error fetching {package_name}: {str(main_res.status_code)}") return None await asyncio.sleep(self.request_delay) - return main_res.json() except Exception as e: logger.warning(f"Error fetching {package_name}: {str(e)}") return None - def _determine_package_type(self, filename: str) -> str: - if filename.endswith(".whl"): - return "bdist_wheel" - elif filename.endswith((".tar.gz", ".zip")): - return "sdist" - return "unknown" - def _process_package_data(self, main_data: dict) -> Optional[Package]: + """Transforms raw JSON data from PyPI into a Package object.""" try: info = main_data.get("info", {}) releases = main_data.get("releases", {}) @@ -172,14 +241,9 @@ class PyPICrawler: ) version_set = set(releases) - - for version, releases in releases.items(): - for release_info in releases: + for version, release_files in releases.items(): + for release_info in release_files: filename = release_info.get("filename", "") - - if version is None: - print(f"Version extraction failed for {package.name}: {filename}") - if version and version in version_set: release_file = ReleaseFile( url=release_info["url"], @@ -189,37 +253,60 @@ class PyPICrawler: upload_time=release_info.get("upload_time", ""), python_version=release_info.get("requires_python", release_info.get("python_version", "")) ) - if version not in package.releases: package.releases[version] = [] - package.releases[version].append(release_file) - return package if package.releases else None + if not package.releases: + return None + + return package except Exception as e: - logger.error(f"Error processing {main_data.get('info', {}).get('name')}: {str(e)}") + logger.error(f"Error processing package data: {str(e)}") return None async def _save_package_data(self, package: Package): - file_path = self.output_dir / f"{package.name}.json" + """Serializes and uploads package data to S3.""" + try: + data_bytes = orjson.dumps(package.to_dict()) + key = f"{package.name}.json" + await self.storage.put(key, data_bytes, content_type='application/json') + self.processed_packages.add(package.name) + + # publish package to the queue if scheduler mode is enabled + if self.is_scheduler_mode_enabled and self.message_queue: + package_name = package.name + latest_version, _ = get_latest_release_by_upload_time(package.releases) - async with aiofiles.open(file_path, "wb") as f: - await f.write(orjson.dumps(package.to_dict())) + await self.message_queue.publish( + orjson.dumps({"type": "analyze", "package_name": package_name, "version": latest_version}) + ) - self.processed_packages.add(package.name) + except Exception as e: + logger.error(f"Error saving package {package.name} to S3: {str(e)}") async def process_batch(self, package_names: List[str]): + """Processes and stores package metadata for a list of packages.""" names_to_process = [n for n in package_names if n not in self.processed_packages] - if not names_to_process: return - results = await asyncio.gather(*[self._fetch_package_data(n) for n in names_to_process]) - packages = 
[self._process_package_data(result) for result in results if result]
+        results = await asyncio.gather(
+            *[self._fetch_package_data(n) for n in names_to_process],
+            return_exceptions=False
+        )
 
-        await asyncio.gather(*[self._save_package_data(p) for p in packages if p])
+        packages = []
+        for result in results:
+            if result:
+                p = self._process_package_data(result)
+                if p:
+                    packages.append(p)
+
+        await asyncio.gather(*[self._save_package_data(p) for p in packages])
 
     async def crawl_all_packages(self):
+        """Retrieves and processes all packages from PyPI in batches."""
         start_time = time.time()
         logger.info("Starting PyPI package crawling")
 
@@ -228,11 +315,9 @@ class PyPICrawler:
         packages_to_process = []
 
         for package in all_packages:
-            if not os.path.exists(Path(self.output_dir) / (package + ".json")):
+            if package not in self.processed_packages:
                 packages_to_process.append(package)
 
-        all_packages.clear()
-
         total = len(packages_to_process)
         logger.info(f"Total packages to process: {total}")
 
@@ -249,8 +334,37 @@ class PyPICrawler:
 
 
 async def main():
+    args = list(sys.argv)
+    args.pop(0)
+
+    is_scheduler_mode_enabled = False
+
+    if len(args) > 0:
+        is_scheduler_mode_enabled = args.pop(0) == '--scheduler-mode'
+
+    load_dotenv()
+
+    storage_config = StorageConfig(
+        endpoint=os.getenv("S3_ENDPOINT", "not in env defined"),
+        access_key=os.getenv("S3_ACCESS_KEY", "not in env defined"),
+        secret_key=os.getenv("S3_SECRET_KEY", "not in env defined"),
+        bucket_name=os.getenv("S3_METADATA_BUCKET_NAME", "not in env defined"),
+    )
+
+    message_queue_config = None
+
+    if is_scheduler_mode_enabled:
+        message_queue_config = MessageQueueConfig(
+            host=os.getenv("BROKER_HOST", "not in env defined"),
+            login=os.getenv("BROKER_LOGIN", "not in env defined"),
+            password=os.getenv("BROKER_PASSWORD", "not in env defined"),
+            queue_name=os.getenv("BROKER_QUEUE_NAME", "not in env defined"),
+        )
+
     async with PyPICrawler(
-            output_dir="pypi_metadata",
+            is_scheduler_mode_enabled=is_scheduler_mode_enabled,
+            storage_config=storage_config,
+            message_queue_config=message_queue_config,
             batch_size=500,
             max_concurrent_requests=100,
             request_delay=0.05
diff --git a/crawler/message_queue.py b/crawler/message_queue.py
new file mode 100644
index 0000000..e9b3da9
--- /dev/null
+++ b/crawler/message_queue.py
@@ -0,0 +1,89 @@
+import asyncio
+import json
+from dataclasses import dataclass
+from typing import Callable, Dict
+
+import pika
+from aio_pika import connect_robust, IncomingMessage, DeliveryMode, Message
+
+
+@dataclass
+class MessageQueueConfig:
+    host: str
+    login: str
+    password: str
+    queue_name: str
+
+
+class AsyncMessageQueue:
+    def __init__(self, message_queue_config: MessageQueueConfig, on_connected: Callable | None = None):
+        self.__message_queue_config = message_queue_config
+
+        self.__connection = None
+        self.__channel = None
+        self.__handlers: Dict[str, Callable] = {}
+        self.__on_connected = on_connected
+
+    def register_handler(self, message_type: str, handler_func: Callable):
+        self.__handlers[message_type] = handler_func
+
+    async def __process_message(self, message: IncomingMessage):
+        async with message.process():
+            try:
+                body = message.body.decode()
+                message_data = json.loads(body)
+                message_type = message_data.get('type')
+
+                if message_type in self.__handlers:
+                    del message_data['type']
+                    handler = self.__handlers[message_type]
+
+                    if asyncio.iscoroutinefunction(handler):
+                        await handler(message_data)
+                    else:
+                        handler(message_data)
+
+            except json.JSONDecodeError as e:
+                print(f"JSON decode error: {e}")
+
+
except Exception as e: + print(f"Error processing message: {e}") + + async def setup(self, prefetch_count: int): + self.__connection = await connect_robust( + host=self.__message_queue_config.host, + login=self.__message_queue_config.login, + password=self.__message_queue_config.password, + heartbeat=600 + ) + + self.__channel = await self.__connection.channel() + + await self.__channel.set_qos(prefetch_count) + await self.__channel.declare_queue(self.__message_queue_config.queue_name, durable=True) + + async def cleanup(self): + if self.__channel and not self.__channel.is_closed: + await self.__channel.close() + + if self.__connection and not self.__connection.is_closed: + await self.__connection.close() + + async def start(self, prefetch_count: int): + try: + await self.setup(prefetch_count) + except Exception as e: + print(f"Connection error: {e}") + await self.cleanup() + + async def publish(self, body: bytes): + message = Message( + body, + content_type='application/json', + delivery_mode=DeliveryMode.PERSISTENT + ) + + await self.__channel.default_exchange.publish( + message, + routing_key=self.__message_queue_config.queue_name, + ) diff --git a/crawler/storage.py b/crawler/storage.py new file mode 100644 index 0000000..f5bfb89 --- /dev/null +++ b/crawler/storage.py @@ -0,0 +1,85 @@ +from contextlib import AsyncExitStack +from dataclasses import dataclass +from io import BytesIO +from typing import List + +import aiobotocore +from aiobotocore.session import AioSession + + +@dataclass +class StorageConfig: + endpoint: str + access_key: str + secret_key: str + bucket_name: str + + +class AsyncStorage: + def __init__(self, storage_config: StorageConfig): + self._exit_stack = AsyncExitStack() + self.__storage_config = storage_config + self.__client = None + + async def connect(self): + try: + session = AioSession() + + self.__client = await self._exit_stack.enter_async_context(session.create_client( + service_name='s3', + use_ssl=False, + endpoint_url=self.__storage_config.endpoint, + aws_access_key_id=self.__storage_config.access_key, + aws_secret_access_key=self.__storage_config.secret_key, + config=aiobotocore.config.AioConfig( + max_pool_connections=10, + connect_timeout=5000, + read_timeout=5000, + retries={'max_attempts': 10} + ) + )) + + except Exception as e: + await self.close() + raise + + async def close(self) -> None: + if self.__client: + await self._exit_stack.__aexit__(None, None, None) + + async def put(self, destination_file_name: str, data: bytes, content_type: str = 'application/json') -> None: + if not self.__client: + raise RuntimeError("Storage is not initialized.") + + data_io = BytesIO(data) + length = len(data) + + await self.__client.put_object( + Bucket=self.__storage_config.bucket_name, + Key=destination_file_name, + Body=data_io, + ContentLength=length, + ContentType=content_type + ) + + async def get(self, key: str) -> any: + if not self.__client: + raise RuntimeError("Storage is not initialized.") + + response = await self.__client.get_object( + Bucket=self.__storage_config.bucket_name, + Key=key, + ) + + return response + + async def list_objects(self, prefix: str = "") -> List[str]: + if not self.__client: + raise RuntimeError("Storage is not initialized.") + + response = await self.__client.list_objects_v2( + Bucket=self.__storage_config.bucket_name, + Prefix=prefix + ) + contents = response.get("Contents", []) + return [obj["Key"] for obj in contents] diff --git a/evaluation/evaluate.py b/evaluation/evaluate.py index 840789b..3b35063 100644 --- 
a/evaluation/evaluate.py +++ b/evaluation/evaluate.py @@ -18,14 +18,30 @@ import mpmath import numpy as np import orjson import seaborn +from dotenv import load_dotenv from upsetplot import UpSet, from_memberships from function_arguments_parser import parse_python_tuple +from storage import StorageConfig, Storage -DYNAMIC_ANALYSIS_RESULTS_DIRECTORY = "/Users/xset/PycharmProjects/03-04-2025-14-53-40_files_list" +load_dotenv() -STATISTICS_DIRECTORY = Path("./stats/") -STATISTICS_DIRECTORY.mkdir(parents=True, exist_ok=True) +analysis_storage = Storage(StorageConfig( + endpoint=os.getenv("S3_ENDPOINT", "not in env defined"), + access_key=os.getenv("S3_ACCESS_KEY", "not in env defined"), + secret_key=os.getenv("S3_SECRET_KEY", "not in env defined"), + bucket_name=os.getenv("S3_BUCKET_NAME", "not in env defined"), +)) + +metadata_storage = Storage(StorageConfig( + endpoint=os.getenv("S3_ENDPOINT", "not in env defined"), + access_key=os.getenv("S3_ACCESS_KEY", "not in env defined"), + secret_key=os.getenv("S3_SECRET_KEY", "not in env defined"), + bucket_name=os.getenv("S3_METADATA_BUCKET_NAME", "not in env defined"), +)) + +STATISTICS_OUTPUT_DIRECTORY = Path("./stats/") +STATISTICS_OUTPUT_DIRECTORY.mkdir(parents=True, exist_ok=True) ############################################################################### @@ -66,7 +82,7 @@ def setup_logger( return logger -logger = setup_logger('evaluate', 'evaluate.log', +logger = setup_logger('evaluate', './logs/evaluate.log', log_format='%(asctime)s | %(levelname)s | %(filename)s:%(lineno)d | %(message)s') @@ -593,7 +609,7 @@ def is_suspicious_command( return command_with_args_str, False -def analyze_single_json(json_path: Path) -> dict: +def analyze_single_json(json_bytes: bytes, pkg_name: str, pkg_version: str) -> dict: """ Analyze a single dynamic_summary.json file and return a dictionary of results. Includes storing all read files, write files, commands, and function calls in lists. @@ -601,26 +617,19 @@ def analyze_single_json(json_path: Path) -> dict: package_result = make_empty_result() # Identify package - root = json_path.parent - parts = root.as_posix().split("/") - platform = parts[-1] - pkg_version = parts[-2] - pkg_name = parts[-3] - package_result["name"] = pkg_name package_result["version"] = pkg_version # Read JSON try: - with open(json_path, "r", errors='replace') as f: - data = orjson.loads(f.read()) + data = orjson.loads(json_bytes.decode(encoding='utf-8', errors='replace')) except Exception as e: - logger.error(f"Error reading {json_path}: {e}") - package_result["warnings"].append(f"Error reading {json_path}: {e}") + logger.error(f"Error reading json {pkg_name} {pkg_version}: {e}") + package_result["warnings"].append(f"Error reading {pkg_name} {pkg_version}: {e}") return package_result if not isinstance(data, dict): - logger.warning(f"Unexpected JSON structure in {json_path}") + logger.warning(f"Unexpected JSON structure in {pkg_name} {pkg_version}") return package_result # Extract fields @@ -756,24 +765,28 @@ def analyze_single_json(json_path: Path) -> dict: return package_result -def process_json_chunk(paths: List[Path]) -> List[dict]: +def process_json_chunk(chunks: List[tuple]) -> List[dict]: """Process a list of JSON files (a chunk). 
Return a list of results (one dict per JSON).""" - chunk_result = [] - for p in paths: + results = [] + for key, pkg_name, pkg_version in chunks: try: - result = analyze_single_json(p) - chunk_result.append(result) + json_bytes = analysis_storage.get_object_bytes(key) + + if not json_bytes: + continue + + result = analyze_single_json(json_bytes, pkg_name, pkg_version) + results.append(result) except Exception as e: traceback.print_exc() - logger.error(f"Error processing {p}: {e}") - return chunk_result + logger.error(f"Error processing {key}: {e}") + return results ############################################################################### # EVALUATION AND RESULTS AGGREGATION ############################################################################### def evaluate( - directory: Path, max_workers: int, chunk_size: int ) -> None: @@ -787,32 +800,29 @@ def evaluate( current_chunk = [] total_count = 0 - # Collect unique package-version-locations - results_to_process = {} - for root, dirs, files in os.walk(directory): - if "dynamic_summary.json" not in files: + all_keys = analysis_storage.list_objects(prefix="") + + results_to_process = [] + + for key in all_keys: + if not key.endswith("dynamic_summary.json"): continue - root_path = Path(root) - parts = root_path.as_posix().split("/") - platform = parts[-1] - if platform != "linux": + parts = key.split("/") + + if len(parts) < 3: continue - pkg_version = parts[-2] + pkg_name = parts[-3] + pkg_version = parts[-2] + platform = parts[-1].replace("dynamic_summary.json", "") - json_file = root_path / "dynamic_summary.json" - results_to_process[(pkg_name, pkg_version)] = json_file + results_to_process.append((key, pkg_name, pkg_version)) # Dispatch tasks with ProcessPoolExecutor(max_workers=max_workers) as executor: - for (pkg_name, pkg_version), json_file in results_to_process.items(): - - if not json_file.exists(): - logger.error(f"File not found: {json_file}") - continue - - current_chunk.append(json_file) + for entry in results_to_process: + current_chunk.append(entry) total_count += 1 if len(current_chunk) % chunk_size == 0: @@ -851,7 +861,7 @@ def evaluate( memberships = [] # SINGLE CSV for packages - packages_csv = STATISTICS_DIRECTORY / "packages.csv" + packages_csv = STATISTICS_OUTPUT_DIRECTORY / "packages.csv" common_commands = Counter() common_read_files = Counter() @@ -920,16 +930,22 @@ def evaluate( # Build membership for UpSet subset_list = [] + if write_count > 0: subset_list.append("Suspicious Write") + if read_count > 0: subset_list.append("Suspicious Read") + if cmd_count > 0: subset_list.append("Suspicious Commands") + if func_count > 0: subset_list.append("Suspicious Functions") + if (hostname_count + socket_count) > 0: subset_list.append("Suspicious Network") + memberships.append(subset_list) common_read_files.update(pkg_res["suspicious_read_files"]) @@ -966,22 +982,22 @@ def evaluate( functions_json, ]) - # Build the UpSet plot from memberships + # Build the UpSet plot from memberships if possible data = from_memberships(memberships) - upset = UpSet(data, subset_size='count', show_counts=True, sort_categories_by=None) - plot = upset.plot() - plot["intersections"].set_ylabel("Intersection Size") - plt.savefig(STATISTICS_DIRECTORY / "upset.png", dpi=300, bbox_inches='tight') - plt.close() + + if data.index.nlevels > 1: + upset = UpSet(data, subset_size='count', show_counts=True, sort_categories_by=None) + plot = upset.plot() + plot["intersections"].set_ylabel("Intersection Size") + plt.savefig(STATISTICS_OUTPUT_DIRECTORY / 
"upset.png", dpi=300, bbox_inches='tight') + plt.close() # Plot risk factor histogram seaborn.set_context("paper", font_scale=1.2) seaborn.set_style("whitegrid") - plt.figure(figsize=(7, 5), dpi=300) bins = np.arange(0, 1.01, 0.01) seaborn.histplot(risk_factor_list, bins=bins, color="RoyalBlue", kde=False) - plt.xlim(0, 1) plt.xticks(np.arange(0, 1.01, 0.1)) plt.xlabel("Risk Factor", fontsize=12) @@ -989,7 +1005,7 @@ def evaluate( seaborn.color_palette("colorblind") seaborn.despine() plt.tight_layout() - plt.savefig(STATISTICS_DIRECTORY / "rist_factor.png", dpi=300, bbox_inches='tight') + plt.savefig(STATISTICS_OUTPUT_DIRECTORY / "rist_factor.png", dpi=300, bbox_inches='tight') plt.close() write_counter("read_files", common_read_files) @@ -998,7 +1014,7 @@ def evaluate( write_counter("functions", common_functions) write_counter("commands", common_commands) - function_stats_csv = STATISTICS_DIRECTORY / "overall_stats.csv" + function_stats_csv = STATISTICS_OUTPUT_DIRECTORY / "overall_stats.csv" with open(function_stats_csv, mode="w", newline="", encoding="utf-8") as stats_csv: stats_writer = csv.writer(stats_csv) @@ -1033,7 +1049,7 @@ def evaluate( def write_counter(name: str, counter: Counter): - function_stats_csv = STATISTICS_DIRECTORY / f"{name}_stats.csv" + function_stats_csv = STATISTICS_OUTPUT_DIRECTORY / f"{name}_stats.csv" with open(function_stats_csv, mode="w", newline="", encoding="utf-8") as stats_csv: stats_writer = csv.writer(stats_csv) @@ -1044,12 +1060,10 @@ def write_counter(name: str, counter: Counter): def main(): - """Main entry point.""" logger.debug("Starting analysis...") start = datetime.now() - base_dir = Path(DYNAMIC_ANALYSIS_RESULTS_DIRECTORY) - evaluate(base_dir, max_workers=6, chunk_size=2000) + evaluate(max_workers=6, chunk_size=2000) logger.info(f"Finished in {datetime.now() - start}") diff --git a/evaluation/requirements.txt b/evaluation/requirements.txt deleted file mode 100644 index 68d0d94..0000000 --- a/evaluation/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -upsetplot -seaborn -orjson -numpy -matplotlib -mpmath \ No newline at end of file diff --git a/evaluation/storage.py b/evaluation/storage.py new file mode 100644 index 0000000..352e036 --- /dev/null +++ b/evaluation/storage.py @@ -0,0 +1,101 @@ +from dataclasses import dataclass +from typing import List, Optional + +import boto3 +from botocore.config import Config +from botocore.exceptions import BotoCoreError, ClientError +from dotenv import load_dotenv + +load_dotenv() # Подтянем переменные из .env, если нужно + + +@dataclass +class StorageConfig: + endpoint: str + access_key: str + secret_key: str + bucket_name: str + + +class Storage: + def __init__(self, storage_config: StorageConfig): + self._cfg = storage_config + self._client = boto3.client( + service_name="s3", + endpoint_url=self._cfg.endpoint, + aws_access_key_id=self._cfg.access_key, + aws_secret_access_key=self._cfg.secret_key, + use_ssl=False, + config=Config( + max_pool_connections=10, + connect_timeout=60, + read_timeout=60, + retries={'max_attempts': 5} + ) + ) + self._bucket = self._cfg.bucket_name + + def list_objects(self, prefix: str = "") -> List[str]: + """ + Returns a list of object keys in the bucket that match the specified prefix. 
+        """
+        keys = []
+        continuation_token: Optional[str] = None
+
+        while True:
+            try:
+                if continuation_token:
+                    response = self._client.list_objects_v2(
+                        Bucket=self._bucket,
+                        Prefix=prefix,
+                        ContinuationToken=continuation_token
+                    )
+                else:
+                    response = self._client.list_objects_v2(
+                        Bucket=self._bucket,
+                        Prefix=prefix
+                    )
+
+                contents = response.get("Contents", [])
+                for obj in contents:
+                    keys.append(obj["Key"])
+
+                if response.get("IsTruncated"):
+                    continuation_token = response.get("NextContinuationToken")
+                else:
+                    break
+
+            except (BotoCoreError, ClientError) as e:
+                print(f"Error listing objects: {e}")
+                break
+
+        return keys
+
+    def get_object_bytes(self, key: str) -> bytes:
+        """
+        Downloads the specified S3 object and returns its content as bytes.
+        """
+        try:
+            response = self._client.get_object(Bucket=self._bucket, Key=key)
+            return response["Body"].read()
+        except (BotoCoreError, ClientError) as e:
+            print(f"Error getting object {key}: {e}")
+            return b""
+
+    def put_object_bytes(self, key: str, data: bytes, content_type: str = "application/octet-stream") -> None:
+        """
+        Uploads a bytes object to S3 with the specified content type.
+        """
+        try:
+            self._client.put_object(
+                Bucket=self._bucket,
+                Key=key,
+                Body=data,
+                ContentType=content_type,
+                ContentLength=len(data)
+            )
+        except (BotoCoreError, ClientError) as e:
+            print(f"Error putting object {key}: {e}")
+
+    def close(self):
+        self._client.close()
diff --git a/infra/docker-compose.yml b/infra/docker-compose.yml
new file mode 100644
index 0000000..603363a
--- /dev/null
+++ b/infra/docker-compose.yml
@@ -0,0 +1,22 @@
+version: "3.8"
+
+services:
+  minio:
+    build: ./minio/
+    ports:
+      - "9000:9000"
+      - "9001:9001"
+    environment:
+      MINIO_ROOT_USER: "sampleuser"
+      MINIO_ROOT_PASSWORD: "samplepass"
+    volumes:
+      - ./minio/data:/data
+
+  rabbitmq:
+    image: rabbitmq:3-management
+    container_name: rabbitmq
+    ports:
+      - "5672:5672"
+    environment:
+      RABBITMQ_DEFAULT_USER: "admin"
+      RABBITMQ_DEFAULT_PASS: "admin"
\ No newline at end of file
diff --git a/infra/minio/Dockerfile b/infra/minio/Dockerfile
new file mode 100644
index 0000000..0223641
--- /dev/null
+++ b/infra/minio/Dockerfile
@@ -0,0 +1,6 @@
+FROM quay.io/minio/minio
+
+COPY entrypoint.sh /usr/bin/entrypoint.sh
+RUN chmod +x /usr/bin/entrypoint.sh
+
+ENTRYPOINT ["/usr/bin/entrypoint.sh"]
\ No newline at end of file
diff --git a/infra/minio/entrypoint.sh b/infra/minio/entrypoint.sh
new file mode 100644
index 0000000..37ab460
--- /dev/null
+++ b/infra/minio/entrypoint.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+
+minio server /data --console-address ":9001" &
+MINIO_PID=$!
+
+# Wait until MinIO has started
+sleep 5
+
+# Register an alias for mc
+mc alias set main http://127.0.0.1:9000 "$MINIO_ROOT_USER" "$MINIO_ROOT_PASSWORD"
+
+# Check whether the "metadata" bucket already exists
+if ! mc ls main/metadata >/dev/null 2>&1; then
+    mc mb main/metadata
+fi
+
+# Check whether the "analysis" bucket already exists
+if !
mc ls main/analysis >/dev/null 2>&1; then + mc mb main/analysis +fi + +wait $MINIO_PID \ No newline at end of file diff --git a/pypi_malregistry/README.md b/pypi_malregistry/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/requirements.txt b/requirements.txt index 41a6045..19a9995 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,18 @@ -mathplot numpy mpmath seaborn -upsetplot \ No newline at end of file +upsetplot +orjson +asyncio +python-dotenv +aiobotocore +scapy +aio-pika +httpx +tenacity +docker +pika +packaging +boto3 +botocore +matplotlib \ No newline at end of file diff --git a/runner.py b/runner.py index d26923a..c5e4aa9 100644 --- a/runner.py +++ b/runner.py @@ -1,5 +1,6 @@ import logging import os.path +import sys import threading from logging.handlers import RotatingFileHandler from pathlib import Path @@ -64,10 +65,11 @@ def create_docker_container(client: DockerClient, image: Image, environment: dic privileged=True, cgroupns="host", environment=environment, + network_mode="host", ) -logger = setup_logger('runner', 'runner.log') +logger = setup_logger('runner', './logs/runner.log') class Worker: @@ -148,6 +150,22 @@ SANDBOX_DOCKER_IMAGE_TAG = "sandbox:latest" def main(): load_dotenv() + args = list(sys.argv) + args.pop(0) + + worker_count = 1 + + try: + if len(args) > 0: + worker_count = int(args[0]) + + if worker_count <= 0: + print("Worker count must be > 0") + return + except: + print("Invalid worker count") + return + docker_client = docker.from_env(timeout=300) logger.debug("Building docker images...") @@ -190,7 +208,9 @@ def main(): try: logger.debug("Starting workers...") - runner.start_new_worker(worker_environment) + for _ in range(worker_count): + runner.start_new_worker(worker_environment) + runner.wait() logger.debug("Workers were finished!") diff --git a/sandbox/.ssh/id_pub.rsa b/sandbox/.ssh/id_pub.rsa deleted file mode 100644 index 679aa53..0000000 --- a/sandbox/.ssh/id_pub.rsa +++ /dev/null @@ -1 +0,0 @@ -sample text \ No newline at end of file diff --git a/sandbox/Dockerfile b/sandbox/Dockerfile index 91cf6de..cff2ab7 100644 --- a/sandbox/Dockerfile +++ b/sandbox/Dockerfile @@ -5,13 +5,9 @@ WORKDIR /setup ARG USERNAME="user" -ENV DEBIAN_FRONTEND noninteractive +ENV DEBIAN_FRONTEND=noninteractive ENV DEBCONF_NOWARNINGS="yes" -# install keys for powershell -RUN curl -fsSL "https://packages.microsoft.com/config/ubuntu/22.04/packages-microsoft-prod.deb" -o /setup/packages-microsoft-prod.deb && \ - dpkg -i /setup/packages-microsoft-prod.deb - # Extra packages for realistic runtime RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-recommends \ apt-transport-https \ @@ -39,17 +35,18 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-reco zip \ unzip -RUN useradd -m user && adduser user sudo && \ +RUN useradd -m "${USERNAME}" && adduser "${USERNAME}" sudo && \ echo "ALL ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers && \ - mkdir -p /etc/sudoers.d && echo "user ALL=(ALL:ALL) NOPASSWD: ALL" > /etc/sudoers.d/user + mkdir -p /etc/sudoers.d && \ + echo "${USERNAME} ALL=(ALL:ALL) NOPASSWD: ALL" > "/etc/sudoers.d/${USERNAME}" -RUN mkdir -p /home/user/.local/bin && \ - chown -R user:user /home/user +RUN mkdir -p "/home/${USERNAME}/.local/bin" && \ + chown -R "${USERNAME}:${USERNAME}" "/home/${USERNAME}" RUN mkdir -m 0700 /root/.ssh && \ - mkdir -p /home/user/.ssh && \ - chmod 700 /home/user/.ssh && \ - chown user:user /home/user/.ssh + mkdir -p "/home/${USERNAME}/.ssh" && \ + chmod 700 
"/home/${USERNAME}/.ssh" && \ + chown "${USERNAME}:${USERNAME}" "/home/${USERNAME}/.ssh" # # Python setup @@ -76,7 +73,7 @@ COPY dynamic-analysis.py /tools/ RUN chmod 755 /tools/dynamic-analysis.py -WORKDIR /home/user +WORKDIR "/home/${USERNAME}" # Set max analysis execution time using sleep ENTRYPOINT [ "sleep" ] diff --git a/scheduler/scheduler.py b/scheduler/scheduler.py new file mode 100644 index 0000000..e764101 --- /dev/null +++ b/scheduler/scheduler.py @@ -0,0 +1,169 @@ +import asyncio +import os +import traceback +from datetime import datetime +from typing import Dict, List + +import orjson +import pika +from dotenv import load_dotenv +from packaging.specifiers import SpecifierSet +from packaging.version import Version + +from storage import AsyncStorage, StorageConfig + + +def is_python_310_supported(python_version: str) -> bool: + """Check if 'python_version' spec includes Python 3.10.""" + try: + normalized = python_version.replace(".*", ".0").replace(".x", "0") + specifier = SpecifierSet(normalized) + return Version("3.10") in specifier + except Exception: + return False + + +def get_latest_release_by_upload_time(releases: Dict[str, List[dict]]): + """ + Return the release version with the newest upload time and its files. + Each value in 'releases' is a list of file dicts, each having 'upload_time'. + """ + best_version = None + best_files = [] + best_time = None + + for version_str, files in releases.items(): + if not files: + continue + + sorted_files = sorted( + files, + key=lambda f: datetime.fromisoformat(f["upload_time"]), + reverse=True + ) + + version_latest_dt = datetime.fromisoformat(sorted_files[0]["upload_time"]) + if best_time is None or version_latest_dt > best_time: + best_time = version_latest_dt + best_version = version_str + best_files = sorted_files + + return best_version, best_files + + +async def process_single_key( + storage: AsyncStorage, + key: str +) -> dict: + """ + Fetch and analyze package metadata (JSON) from S3 for a single key. + Returns a dict with analysis results. 
+ """ + result = { + "package_name": key, + "version": None, + } + + try: + storage_object = await storage.get(key=key) + data = await storage_object["Body"].read() + package_metadata = orjson.loads(data) + except Exception as e: + traceback.print_exc() + return result + + # Update package name if present + if "name" in package_metadata: + result["package_name"] = package_metadata["name"] + + releases = package_metadata.get("releases", {}) + release_count = len(releases) + + if release_count == 0: + return result + + latest_version, latest_files = get_latest_release_by_upload_time(releases) + result["version"] = latest_version + + if latest_files: + # Check if at least one file in the latest release supports 3.10 + supports_310 = any( + (f.get("python_version") is None) + or is_python_310_supported(f["python_version"]) + for f in latest_files + ) + + result["is_python_3_10_supported"] = supports_310 + + return result + + +async def main(): + load_dotenv() + + # Configure S3 async storage + storage_config = StorageConfig( + endpoint=os.getenv("S3_ENDPOINT", "not in env defined"), + access_key=os.getenv("S3_ACCESS_KEY", "not in env defined"), + secret_key=os.getenv("S3_SECRET_KEY", "not in env defined"), + bucket_name=os.getenv("S3_METADATA_BUCKET_NAME", "not in env defined"), + ) + + storage = AsyncStorage(storage_config) + await storage.connect() + + connection_params = pika.ConnectionParameters( + host=os.getenv("BROKER_HOST"), + + credentials=pika.PlainCredentials( + os.getenv("BROKER_LOGIN"), + os.getenv("BROKER_PASSWORD") + ), + + heartbeat=600, + blocked_connection_timeout=300 + ) + + queue_name = os.getenv("BROKER_QUEUE_NAME") + + connection = pika.BlockingConnection(connection_params) + channel = connection.channel() + + # List all keys from S3 + contents = await storage.list_objects() + object_keys = [k for k in contents if k.endswith(".json")] + + print(f"Found {len(object_keys)} JSON objects in S3 bucket.") + + tasks = [ + process_single_key(storage, key) + for key in object_keys + ] + + results = await asyncio.gather(*tasks) + print(f"Total processed packages: {len(results)}") + + channel.queue_declare(queue=queue_name, durable=True) + + for result in results: + package_name = result.get("package_name") + version = result.get("version") + + channel.basic_publish( + exchange='', + routing_key=queue_name, + body=orjson.dumps({"type": "analyze", "package_name": package_name, "version": version}), + properties=pika.BasicProperties( + content_type='application/json', + delivery_mode=pika.DeliveryMode.Persistent, + ), + ) + + unsupported_version = sum(1 for result in results if not result["is_python_3_10_supported"]) + print(f"Packages that do NOT support Python 3.10: {unsupported_version}") + + await storage.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scheduler/storage.py b/scheduler/storage.py new file mode 100644 index 0000000..f5bfb89 --- /dev/null +++ b/scheduler/storage.py @@ -0,0 +1,85 @@ +from contextlib import AsyncExitStack +from dataclasses import dataclass +from io import BytesIO +from typing import List + +import aiobotocore +from aiobotocore.session import AioSession + + +@dataclass +class StorageConfig: + endpoint: str + access_key: str + secret_key: str + bucket_name: str + + +class AsyncStorage: + def __init__(self, storage_config: StorageConfig): + self._exit_stack = AsyncExitStack() + self.__storage_config = storage_config + self.__client = None + + async def connect(self): + try: + session = AioSession() + + self.__client = await 
self._exit_stack.enter_async_context(session.create_client( + service_name='s3', + use_ssl=False, + endpoint_url=self.__storage_config.endpoint, + aws_access_key_id=self.__storage_config.access_key, + aws_secret_access_key=self.__storage_config.secret_key, + config=aiobotocore.config.AioConfig( + max_pool_connections=10, + connect_timeout=5000, + read_timeout=5000, + retries={'max_attempts': 10} + ) + )) + + except Exception as e: + await self.close() + raise + + async def close(self) -> None: + if self.__client: + await self._exit_stack.__aexit__(None, None, None) + + async def put(self, destination_file_name: str, data: bytes, content_type: str = 'application/json') -> None: + if not self.__client: + raise RuntimeError("Storage is not initialized.") + + data_io = BytesIO(data) + length = len(data) + + await self.__client.put_object( + Bucket=self.__storage_config.bucket_name, + Key=destination_file_name, + Body=data_io, + ContentLength=length, + ContentType=content_type + ) + + async def get(self, key: str) -> any: + if not self.__client: + raise RuntimeError("Storage is not initialized.") + + response = await self.__client.get_object( + Bucket=self.__storage_config.bucket_name, + Key=key, + ) + + return response + + async def list_objects(self, prefix: str = "") -> List[str]: + if not self.__client: + raise RuntimeError("Storage is not initialized.") + + response = await self.__client.list_objects_v2( + Bucket=self.__storage_config.bucket_name, + Prefix=prefix + ) + contents = response.get("Contents", []) + return [obj["Key"] for obj in contents] diff --git a/worker/Dockerfile b/worker/Dockerfile index 3d8cc8a..5a0a579 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -5,7 +5,7 @@ RUN cargo build --release FROM ubuntu:22.04 -ENV DEBIAN_FRONTEND noninteractive +ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get upgrade -y && \ apt-get install -y \ diff --git a/worker/__init__.py b/worker/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/worker/src/storage.py b/worker/src/storage.py index 55be8d2..f5bfb89 100644 --- a/worker/src/storage.py +++ b/worker/src/storage.py @@ -1,6 +1,7 @@ from contextlib import AsyncExitStack from dataclasses import dataclass from io import BytesIO +from typing import List import aiobotocore from aiobotocore.session import AioSession @@ -60,3 +61,25 @@ class AsyncStorage: ContentLength=length, ContentType=content_type ) + + async def get(self, key: str) -> any: + if not self.__client: + raise RuntimeError("Storage is not initialized.") + + response = await self.__client.get_object( + Bucket=self.__storage_config.bucket_name, + Key=key, + ) + + return response + + async def list_objects(self, prefix: str = "") -> List[str]: + if not self.__client: + raise RuntimeError("Storage is not initialized.") + + response = await self.__client.list_objects_v2( + Bucket=self.__storage_config.bucket_name, + Prefix=prefix + ) + contents = response.get("Contents", []) + return [obj["Key"] for obj in contents] diff --git a/worker/src/worker.py b/worker/src/worker.py index f245653..d049133 100644 --- a/worker/src/worker.py +++ b/worker/src/worker.py @@ -59,7 +59,7 @@ def setup_logger( return logger -logger = setup_logger("worker", "worker.log") +logger = setup_logger("worker", "./logs/worker.log") async def run_async_command(command: str, check: bool = True) -> Tuple[bytes, bytes]: @@ -362,10 +362,8 @@ class Sandbox: try: size = os.stat(log_file).st_size if size < 1024 * 1024 * 128: # <128MB - logger.debug("Collecting using native 
Python strace parser") return StraceParser.parse(str(log_file)) - logger.debug("Collecting using external strace parser (file is large)") result_json, _ = await run_async_command(f"strace_parser {log_file}") return orjson.loads(result_json) except Exception as exc: @@ -832,19 +830,21 @@ async def main() -> None: """ load_dotenv() args = list(sys.argv) - args.pop(0) # remove script name + args.pop(0) worker_id = 0 worker_count = 1 if args: worker_id = int(args.pop(0)) + if worker_id < 0: print("Worker ID must be >= 0") return if args: worker_count = int(args.pop(0)) + if worker_count <= 0: logger.error("Worker count must be > 0") return -- GitLab
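As a quick sanity check of the release-selection helper introduced in `scheduler/scheduler.py` (and mirrored for `ReleaseFile` objects in `crawler/crawler.py`), a small script like the following can be used; the versions and upload times are made up, and the import assumes it is run from the `scheduler/` directory:

```python
from scheduler import get_latest_release_by_upload_time

# Two fake releases; the helper should pick the version whose newest file
# has the most recent upload time.
releases = {
    "1.0.0": [{"upload_time": "2024-01-01T10:00:00"}],
    "1.1.0": [{"upload_time": "2025-02-03T09:30:00"}],
}
version, files = get_latest_release_by_upload_time(releases)
assert version == "1.1.0"
assert files[0]["upload_time"] == "2025-02-03T09:30:00"
```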