diff --git a/.envrc b/.envrc index 943710d0..10a7d825 100644 --- a/.envrc +++ b/.envrc @@ -10,6 +10,10 @@ export LAUNCHPAD_HOST="0.0.0.0" export LAUNCHPAD_PORT="2218" export LAUNCHPAD_RPC_SHARED_SECRET="launchpad-also-very-long-value-haha" export SENTRY_BASE_URL="http://localhost:8000" + +export LAUNCHPAD_WORKER_RPC_HOST="localhost:50051" +export LAUNCHPAD_WORKER_CONCURRENCY="32" + # STATSD_HOST=... # defaults to 127.0.0.1 # STATSD_PORT=... # defaults to 8125 @@ -38,6 +42,8 @@ if ! command -v "$DEVENV" >/dev/null; then fi PATH_add "${PWD}/.devenv/all/bin" +PATH_add "${PWD}/.devenv/bin" + case $(uname -s) in Darwin) PATH_add "${PWD}/.devenv/aarch64-darwin/bin";; *) PATH_add "${PWD}/.devenv/x86_64-linux/bin";; diff --git a/.python-version b/.python-version new file mode 100644 index 00000000..763b6264 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12.12 diff --git a/README.md b/README.md index dddf9217..205ab9e1 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ devservices up launchpad serve ``` -And finally use the `sentry-cli` to upload to your local machine: +And finally use the `sentry-cli` (version 3.0.1 or higher) to upload to your local machine: ```bash sentry-cli --log-level DEBUG \ diff --git a/devenv/config.ini b/devenv/config.ini index 9da4e15f..d31b1f27 100644 --- a/devenv/config.ini +++ b/devenv/config.ini @@ -1,19 +1,19 @@ [venv.launchpad] -python = 3.13.1 +python = 3.12.12 path = .venv requirements = requirements-dev.txt editable = . -[python3.13.1] -darwin_x86_64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-x86_64-apple-darwin-install_only.tar.gz -darwin_x86_64_sha256 = 4c4dafe2d59bb58e8d3ad26af637b7ae9c8141bb79738966752976861bdb103d -darwin_arm64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-aarch64-apple-darwin-install_only.tar.gz -darwin_arm64_sha256 = bbfc96038d0b6922fd783f6eb2c9bf9abb648531d23d236bc1a0c16bdd061944 -linux_x86_64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-x86_64-unknown-linux-gnu-install_only.tar.gz -linux_x86_64_sha256 = bb4696825039a2b5dc7fea2c6aeb085c89fd397016b44165ec73b4224ccc83e2 -linux_arm64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-aarch64-unknown-linux-gnu-install_only.tar.gz -linux_arm64_sha256 = d37aef7bdf5c27f7d006918f7cedb31f4ba07c88f61baac4ffbe0bee6d4b5248 +[python3.12.12] +darwin_x86_64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-x86_64-apple-darwin-install_only.tar.gz +darwin_x86_64_sha256 = 7a453d2773d0ffbc8f8ca45bb20fa305815aff60b8072361451c3674c17ff5ef +darwin_arm64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-aarch64-apple-darwin-install_only.tar.gz +darwin_arm64_sha256 = 95d7666718239b7b2fc94937453ff6689dc4db0daf42263c21ec1f9f41eefb31 +linux_x86_64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-x86_64-unknown-linux-gnu-install_only.tar.gz +linux_x86_64_sha256 = da2b48a8ba9969d57b75544b818210c5f574e86640776cd885ffc17b50c3569a +linux_arm64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-aarch64-unknown-linux-gnu-install_only.tar.gz +linux_arm64_sha256 = 738a1f8a187bbf4b6c6e5af0ffb92f61b3e8aded8916e093c4711c00dbafb601 [uv] darwin_arm64 = https://github.com/astral-sh/uv/releases/download/0.7.21/uv-aarch64-apple-darwin.tar.gz diff --git a/pyproject.toml b/pyproject.toml index eb41d806..20380c62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,11 +20,11 @@ classifiers = [ "Operating System :: MacOS", "Operating System :: POSIX :: Linux", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.12", "Topic :: Software Development :: Build Tools", "Topic :: System :: Archiving", ] -requires-python = ">=3.13" +requires-python = ">=3.12.12" [project.scripts] launchpad = "launchpad.cli:main" @@ -38,7 +38,7 @@ where = ["src"] [tool.ruff] line-length = 120 -target-version = "py313" +target-version = "py312" [tool.ruff.lint] # Enable pycodestyle (E), Pyflakes (F), and isort (I) by default diff --git a/requirements-dev.txt b/requirements-dev.txt index 89cf9dd6..7a4eb8cb 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -24,5 +24,5 @@ ty==0.0.1a20 # Testing web endpoints sortedcontainers-stubs>=2.4.0 kafka-python>=2.0.0 -sentry-protos==0.2.0 +sentry-protos>=0.4.11 diff --git a/requirements.txt b/requirements.txt index e398dd58..8416c1b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,5 +19,6 @@ sentry-arroyo==2.34.0 sentry-kafka-schemas==2.1.2 sentry-sdk>=2.36.0 sortedcontainers>=2.4.0 +taskbroker-client @ git+https://github.com/getsentry/taskbroker.git@main#subdirectory=clients/python typing-extensions>=4.15.0 zipfile-zstd==0.0.4 diff --git a/src/launchpad/artifact_processor.py b/src/launchpad/artifact_processor.py index c45beb65..af5dd2bb 100644 --- a/src/launchpad/artifact_processor.py +++ b/src/launchpad/artifact_processor.py @@ -7,7 +7,7 @@ from datetime import datetime from pathlib import Path -from typing import Any, Dict, Iterator, cast +from typing import Any, Dict, Iterator, List, cast import sentry_sdk @@ -17,9 +17,6 @@ from objectstore_client import ( Usecase, ) -from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import ( - PreprodArtifactEvents, -) from launchpad.api.update_api_models import AndroidAppInfo as AndroidAppInfoModel from launchpad.api.update_api_models import AppleAppInfo as AppleAppInfoModel @@ -66,7 +63,10 @@ def __init__( @staticmethod def process_message( - payload: PreprodArtifactEvents, + artifact_id: str, + project_id: str, + organization_id: str, + requested_features: List[PreprodFeature], service_config=None, artifact_processor=None, statsd=None, @@ -84,10 +84,6 @@ def process_message( initialize_sentry_sdk() - organization_id = payload["organization_id"] - project_id = payload["project_id"] - artifact_id = payload["artifact_id"] - if statsd is None: statsd = get_statsd() if artifact_processor is None: @@ -97,10 +93,10 @@ def process_message( objectstore_client = ObjectstoreClient(service_config.objectstore_url) artifact_processor = ArtifactProcessor(sentry_client, statsd, objectstore_client) - requested_features = [] - for feature in payload.get("requested_features", []): + features = [] + for feature in requested_features: try: - requested_features.append(PreprodFeature(feature)) + features.append(PreprodFeature(feature)) except ValueError: logger.exception(f"Unknown feature {feature}") @@ -127,7 +123,7 @@ def process_message( statsd.increment("artifact.processing.started") logger.info(f"Processing artifact {artifact_id} (project: {project_id}, org: {organization_id})") try: - artifact_processor.process_artifact(organization_id, project_id, artifact_id, requested_features) + artifact_processor.process_artifact(organization_id, project_id, artifact_id, features) except Exception: statsd.increment("artifact.processing.failed") duration = time.time() - start_time diff --git a/src/launchpad/constants.py b/src/launchpad/constants.py index 4eb84c0f..b95ec974 100644 --- a/src/launchpad/constants.py +++ b/src/launchpad/constants.py @@ -28,6 +28,7 @@ class ArtifactType(Enum): # This should match exactly with the definition in: # src/sentry/preprod/producer.py class PreprodFeature(Enum): + PRE_PROCESS = "pre_process" SIZE_ANALYSIS = "size_analysis" BUILD_DISTRIBUTION = "build_distribution" diff --git a/src/launchpad/server.py b/src/launchpad/server.py index 48b76d7d..38e88386 100644 --- a/src/launchpad/server.py +++ b/src/launchpad/server.py @@ -201,6 +201,12 @@ class ServerConfig: log_level: str access_log: bool + # The address of the taskbroker the worker connects to + worker_rpc_host: str + + # The number of child processes the worker should have + worker_concurrency: int + def get_server_config() -> ServerConfig: """Get server configuration from environment.""" @@ -226,6 +232,21 @@ def get_server_config() -> ServerConfig: f"LAUNCHPAD_PORT must be a valid integer, got: {port_str}" ) + worker_rpc_host = os.getenv("LAUNCHPAD_WORKER_RPC_HOST") + if not worker_rpc_host: + raise ValueError("LAUNCHPAD_WORKER_RPC_HOST environment variable required") + + worker_concurrency_str = os.getenv("LAUNCHPAD_WORKER_CONCURRENCY") + if not worker_concurrency_str: + raise ValueError("LAUNCHPAD_WORKER_CONCURRENCY environment variable required") + + try: + worker_concurrency = int(worker_concurrency_str) + except ValueError: + raise ValueError( # noqa: E501 + f"LAUNCHPAD_WORKER_CONCURRENCY must be a valid integer, got: {worker_concurrency_str}" + ) + sentry_region = os.getenv("SENTRY_REGION", "unknown") return ServerConfig( @@ -233,6 +254,8 @@ def get_server_config() -> ServerConfig: sentry_region=sentry_region, host=host, port=port, + worker_rpc_host=worker_rpc_host, + worker_concurrency=worker_concurrency, debug=not is_production, log_level="WARNING" if is_production else "DEBUG", access_log=not is_production, # Disable access logs in prod diff --git a/src/launchpad/service.py b/src/launchpad/service.py index 84a2722b..7a9c5492 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -9,11 +9,12 @@ from dataclasses import dataclass +from taskbroker_client.worker import TaskWorker + from launchpad.sentry_client import SentryClient from launchpad.utils.logging import get_logger from launchpad.utils.statsd import NullStatsd, StatsdInterface, get_statsd -from .kafka import LaunchpadKafkaConsumer, create_kafka_consumer from .sentry_sdk_init import initialize_sentry_sdk from .server import LaunchpadServer, get_server_config @@ -29,7 +30,6 @@ class LaunchpadService: def __init__(self, statsd: StatsdInterface | None = None) -> None: self.server: LaunchpadServer | None = None - self.kafka: LaunchpadKafkaConsumer | None = None self._server_thread: threading.Thread | None = None self._server_loop: asyncio.AbstractEventLoop | None = None self._statsd = statsd or NullStatsd() @@ -37,6 +37,7 @@ def __init__(self, statsd: StatsdInterface | None = None) -> None: self._service_config: ServiceConfig | None = None self._sentry_client: SentryClient | None = None self._shutdown_requested = False + self.worker: TaskWorker | None = None def setup(self) -> None: initialize_sentry_sdk() @@ -51,11 +52,26 @@ def setup(self) -> None: statsd=self._statsd, ) - self.kafka = create_kafka_consumer() + # Initialize the worker + worker_rpc_host = server_config.worker_rpc_host + worker_concurrency = server_config.worker_concurrency + + self.worker = TaskWorker( + app_module="launchpad.worker.app:app", + broker_hosts=[worker_rpc_host], + max_child_task_count=100, + concurrency=worker_concurrency, + child_tasks_queue_maxsize=worker_concurrency * 2, + result_queue_maxsize=worker_concurrency * 2, + rebalance_after=32, + processing_pool_name="launchpad", + process_type="forkserver", + ) + logger.info("Service components initialized") def start(self) -> None: - if not self.server or not self.kafka: + if not self.server or not self.worker: raise RuntimeError("Service not properly initialized. Call setup() first.") logger.info("Starting Launchpad service...") @@ -67,8 +83,6 @@ def signal_handler(signum: int, frame) -> None: logger.info(f"Received signal {signum}, initiating shutdown...") self._shutdown_requested = True - if self.kafka: - self.kafka.stop() signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) @@ -83,21 +97,30 @@ def signal_handler(signum: int, frame) -> None: logger.info("Launchpad service started successfully") + exitcode = 1 try: - # Run Kafka consumer in main thread (blocking) - self.kafka.run() + exitcode = self.worker.start() finally: logger.info("Cleaning up service resources...") self._shutdown_server() logger.info("Service cleanup completed") + raise SystemExit(exitcode) + def is_healthy(self) -> bool: """Get overall service health status.""" + # PRECONDITION - Assume server exists + assert self.server + is_server_healthy = self.server.is_healthy() - is_kafka_healthy = self.kafka.is_healthy() - return is_server_healthy and is_kafka_healthy + + # TODO - Report worker health too + return is_server_healthy def _run_http_server_thread(self) -> None: + # PRECONDITION - Assume server exists + assert self.server + self._server_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._server_loop) diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py new file mode 100644 index 00000000..4203675d --- /dev/null +++ b/src/launchpad/worker/app.py @@ -0,0 +1,32 @@ +from arroyo.backends.kafka import KafkaProducer +from taskbroker_client.app import TaskbrokerApp +from taskbroker_client.router import TaskRouter + +from .store import StubAtMostOnce + + +class CustomRouter(TaskRouter): + """Custom router that routes all namespaces to the 'taskworker' topic.""" + + def route_namespace(self, name: str) -> str: + return "taskworker" + + +def producer_factory(topic: str) -> KafkaProducer: + # TODO use env vars for kafka host/port + config = { + "bootstrap.servers": "127.0.0.1:9092", + "compression.type": "lz4", + "message.max.bytes": 50000000, # 50MB + } + return KafkaProducer(config) + + +app = TaskbrokerApp( + name="launchpad", + producer_factory=producer_factory, + router_class=CustomRouter(), + at_most_once_store=StubAtMostOnce(), +) + +app.set_modules(["launchpad.worker.tasks"]) diff --git a/src/launchpad/worker/store.py b/src/launchpad/worker/store.py new file mode 100644 index 00000000..3e7fc996 --- /dev/null +++ b/src/launchpad/worker/store.py @@ -0,0 +1,12 @@ +from taskbroker_client.types import AtMostOnceStore + + +class StubAtMostOnce(AtMostOnceStore): + def __init__(self) -> None: + self._keys: dict[str, str] = {} + + def add(self, key: str, value: str, timeout: int) -> bool: + if key in self._keys: + return False + self._keys[key] = value + return True diff --git a/src/launchpad/worker/tasks.py b/src/launchpad/worker/tasks.py new file mode 100644 index 00000000..7a4369ae --- /dev/null +++ b/src/launchpad/worker/tasks.py @@ -0,0 +1,21 @@ + +from typing import List + +from launchpad.artifact_processor import ArtifactProcessor +from launchpad.constants import PreprodFeature + +from .app import app + +# Create a namespace and register tasks +default = app.taskregistry.create_namespace("default") + + +@default.register(name="process_artifact") +def process_artifact( + artifact_id: str, project_id: str, organization_id: str, requested_features: List[PreprodFeature], **kwargs +) -> None: + print("Processing artifact...") + + ArtifactProcessor.process_message(artifact_id, project_id, organization_id, requested_features) + + print("Processed artifact 🎉")