Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .envrc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export LAUNCHPAD_HOST="0.0.0.0"
export LAUNCHPAD_PORT="2218"
export LAUNCHPAD_RPC_SHARED_SECRET="launchpad-also-very-long-value-haha"
export SENTRY_BASE_URL="http://localhost:8000"

export LAUNCHPAD_WORKER_RPC_HOST="localhost:50051"
export LAUNCHPAD_WORKER_CONCURRENCY="32"

# STATSD_HOST=... # defaults to 127.0.0.1
# STATSD_PORT=... # defaults to 8125

Expand Down Expand Up @@ -38,6 +42,8 @@ if ! command -v "$DEVENV" >/dev/null; then
fi

PATH_add "${PWD}/.devenv/all/bin"
PATH_add "${PWD}/.devenv/bin"

case $(uname -s) in
Darwin) PATH_add "${PWD}/.devenv/aarch64-darwin/bin";;
*) PATH_add "${PWD}/.devenv/x86_64-linux/bin";;
Expand Down
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.12.12
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ devservices up
launchpad serve
```

And finally use the `sentry-cli` to upload to your local machine:
And finally use the `sentry-cli` (version 3.0.1 or higher) to upload to your local machine:

```bash
sentry-cli --log-level DEBUG \
Expand Down
20 changes: 10 additions & 10 deletions devenv/config.ini
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
[venv.launchpad]
python = 3.13.1
python = 3.12.12
path = .venv
requirements = requirements-dev.txt
editable =
.

[python3.13.1]
darwin_x86_64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-x86_64-apple-darwin-install_only.tar.gz
darwin_x86_64_sha256 = 4c4dafe2d59bb58e8d3ad26af637b7ae9c8141bb79738966752976861bdb103d
darwin_arm64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-aarch64-apple-darwin-install_only.tar.gz
darwin_arm64_sha256 = bbfc96038d0b6922fd783f6eb2c9bf9abb648531d23d236bc1a0c16bdd061944
linux_x86_64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-x86_64-unknown-linux-gnu-install_only.tar.gz
linux_x86_64_sha256 = bb4696825039a2b5dc7fea2c6aeb085c89fd397016b44165ec73b4224ccc83e2
linux_arm64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-aarch64-unknown-linux-gnu-install_only.tar.gz
linux_arm64_sha256 = d37aef7bdf5c27f7d006918f7cedb31f4ba07c88f61baac4ffbe0bee6d4b5248
[python3.12.12]
darwin_x86_64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-x86_64-apple-darwin-install_only.tar.gz
darwin_x86_64_sha256 = 7a453d2773d0ffbc8f8ca45bb20fa305815aff60b8072361451c3674c17ff5ef
darwin_arm64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-aarch64-apple-darwin-install_only.tar.gz
darwin_arm64_sha256 = 95d7666718239b7b2fc94937453ff6689dc4db0daf42263c21ec1f9f41eefb31
linux_x86_64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-x86_64-unknown-linux-gnu-install_only.tar.gz
linux_x86_64_sha256 = da2b48a8ba9969d57b75544b818210c5f574e86640776cd885ffc17b50c3569a
linux_arm64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-aarch64-unknown-linux-gnu-install_only.tar.gz
linux_arm64_sha256 = 738a1f8a187bbf4b6c6e5af0ffb92f61b3e8aded8916e093c4711c00dbafb601

[uv]
darwin_arm64 = https://github.com/astral-sh/uv/releases/download/0.7.21/uv-aarch64-apple-darwin.tar.gz
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ classifiers = [
"Operating System :: MacOS",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Build Tools",
"Topic :: System :: Archiving",
]
requires-python = ">=3.13"
requires-python = ">=3.12.12"

[project.scripts]
launchpad = "launchpad.cli:main"
Expand All @@ -38,7 +38,7 @@ where = ["src"]

[tool.ruff]
line-length = 120
target-version = "py313"
target-version = "py312"

[tool.ruff.lint]
# Enable pycodestyle (E), Pyflakes (F), and isort (I) by default
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ty==0.0.1a20
# Testing web endpoints
sortedcontainers-stubs>=2.4.0
kafka-python>=2.0.0
sentry-protos==0.2.0
sentry-protos>=0.4.11

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ sentry-arroyo==2.34.0
sentry-kafka-schemas==2.1.2
sentry-sdk>=2.36.0
sortedcontainers>=2.4.0
taskbroker-client @ git+https://github.com/getsentry/taskbroker.git@main#subdirectory=clients/python
typing-extensions>=4.15.0
zipfile-zstd==0.0.4
22 changes: 9 additions & 13 deletions src/launchpad/artifact_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, cast
from typing import Any, Dict, Iterator, List, cast

import sentry_sdk

Expand All @@ -17,9 +17,6 @@
from objectstore_client import (
Usecase,
)
from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import (
PreprodArtifactEvents,
)

from launchpad.api.update_api_models import AndroidAppInfo as AndroidAppInfoModel
from launchpad.api.update_api_models import AppleAppInfo as AppleAppInfoModel
Expand Down Expand Up @@ -66,7 +63,10 @@ def __init__(

@staticmethod
def process_message(
payload: PreprodArtifactEvents,
artifact_id: str,
project_id: str,
organization_id: str,
requested_features: List[PreprodFeature],
service_config=None,
artifact_processor=None,
statsd=None,
Expand All @@ -84,10 +84,6 @@ def process_message(

initialize_sentry_sdk()

organization_id = payload["organization_id"]
project_id = payload["project_id"]
artifact_id = payload["artifact_id"]

if statsd is None:
statsd = get_statsd()
if artifact_processor is None:
Expand All @@ -97,10 +93,10 @@ def process_message(
objectstore_client = ObjectstoreClient(service_config.objectstore_url)
artifact_processor = ArtifactProcessor(sentry_client, statsd, objectstore_client)

requested_features = []
for feature in payload.get("requested_features", []):
features = []
for feature in requested_features:
try:
requested_features.append(PreprodFeature(feature))
features.append(PreprodFeature(feature))
except ValueError:
logger.exception(f"Unknown feature {feature}")

Expand All @@ -127,7 +123,7 @@ def process_message(
statsd.increment("artifact.processing.started")
logger.info(f"Processing artifact {artifact_id} (project: {project_id}, org: {organization_id})")
try:
artifact_processor.process_artifact(organization_id, project_id, artifact_id, requested_features)
artifact_processor.process_artifact(organization_id, project_id, artifact_id, features)
except Exception:
statsd.increment("artifact.processing.failed")
duration = time.time() - start_time
Expand Down
1 change: 1 addition & 0 deletions src/launchpad/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ArtifactType(Enum):
# This should match exactly with the definition in:
# src/sentry/preprod/producer.py
class PreprodFeature(Enum):
PRE_PROCESS = "pre_process"
SIZE_ANALYSIS = "size_analysis"
BUILD_DISTRIBUTION = "build_distribution"

Expand Down
23 changes: 23 additions & 0 deletions src/launchpad/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ class ServerConfig:
log_level: str
access_log: bool

# The address of the taskbroker the worker connects to
worker_rpc_host: str

# The number of child processes the worker should have
worker_concurrency: int


def get_server_config() -> ServerConfig:
"""Get server configuration from environment."""
Expand All @@ -226,13 +232,30 @@ def get_server_config() -> ServerConfig:
f"LAUNCHPAD_PORT must be a valid integer, got: {port_str}"
)

worker_rpc_host = os.getenv("LAUNCHPAD_WORKER_RPC_HOST")
if not worker_rpc_host:
raise ValueError("LAUNCHPAD_WORKER_RPC_HOST environment variable required")

worker_concurrency_str = os.getenv("LAUNCHPAD_WORKER_CONCURRENCY")
if not worker_concurrency_str:
raise ValueError("LAUNCHPAD_WORKER_CONCURRENCY environment variable required")

try:
worker_concurrency = int(worker_concurrency_str)
except ValueError:
raise ValueError( # noqa: E501
f"LAUNCHPAD_WORKER_CONCURRENCY must be a valid integer, got: {worker_concurrency_str}"
)

sentry_region = os.getenv("SENTRY_REGION", "unknown")

return ServerConfig(
environment=environment,
sentry_region=sentry_region,
host=host,
port=port,
worker_rpc_host=worker_rpc_host,
worker_concurrency=worker_concurrency,
debug=not is_production,
log_level="WARNING" if is_production else "DEBUG",
access_log=not is_production, # Disable access logs in prod
Expand Down
43 changes: 33 additions & 10 deletions src/launchpad/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

from dataclasses import dataclass

from taskbroker_client.worker import TaskWorker

from launchpad.sentry_client import SentryClient
from launchpad.utils.logging import get_logger
from launchpad.utils.statsd import NullStatsd, StatsdInterface, get_statsd

from .kafka import LaunchpadKafkaConsumer, create_kafka_consumer
from .sentry_sdk_init import initialize_sentry_sdk
from .server import LaunchpadServer, get_server_config

Expand All @@ -29,14 +30,14 @@ class LaunchpadService:

def __init__(self, statsd: StatsdInterface | None = None) -> None:
self.server: LaunchpadServer | None = None
self.kafka: LaunchpadKafkaConsumer | None = None
self._server_thread: threading.Thread | None = None
self._server_loop: asyncio.AbstractEventLoop | None = None
self._statsd = statsd or NullStatsd()
self._healthcheck_file: str | None = None
self._service_config: ServiceConfig | None = None
self._sentry_client: SentryClient | None = None
self._shutdown_requested = False
self.worker: TaskWorker | None = None

def setup(self) -> None:
initialize_sentry_sdk()
Expand All @@ -51,11 +52,26 @@ def setup(self) -> None:
statsd=self._statsd,
)

self.kafka = create_kafka_consumer()
# Initialize the worker
worker_rpc_host = server_config.worker_rpc_host
worker_concurrency = server_config.worker_concurrency

self.worker = TaskWorker(
app_module="launchpad.worker.app:app",
broker_hosts=[worker_rpc_host],
max_child_task_count=100,
concurrency=worker_concurrency,
child_tasks_queue_maxsize=worker_concurrency * 2,
result_queue_maxsize=worker_concurrency * 2,
rebalance_after=32,
processing_pool_name="launchpad",
process_type="forkserver",
)

logger.info("Service components initialized")

def start(self) -> None:
if not self.server or not self.kafka:
if not self.server or not self.worker:
raise RuntimeError("Service not properly initialized. Call setup() first.")

logger.info("Starting Launchpad service...")
Expand All @@ -67,8 +83,6 @@ def signal_handler(signum: int, frame) -> None:

logger.info(f"Received signal {signum}, initiating shutdown...")
self._shutdown_requested = True
if self.kafka:
self.kafka.stop()

signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
Expand All @@ -83,21 +97,30 @@ def signal_handler(signum: int, frame) -> None:

logger.info("Launchpad service started successfully")

exitcode = 1
try:
# Run Kafka consumer in main thread (blocking)
self.kafka.run()
exitcode = self.worker.start()
finally:
logger.info("Cleaning up service resources...")
self._shutdown_server()
logger.info("Service cleanup completed")

raise SystemExit(exitcode)

def is_healthy(self) -> bool:
"""Get overall service health status."""
# PRECONDITION - Assume server exists
assert self.server

is_server_healthy = self.server.is_healthy()
is_kafka_healthy = self.kafka.is_healthy()
return is_server_healthy and is_kafka_healthy

# TODO - Report worker health too
return is_server_healthy

def _run_http_server_thread(self) -> None:
# PRECONDITION - Assume server exists
assert self.server

self._server_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._server_loop)

Expand Down
32 changes: 32 additions & 0 deletions src/launchpad/worker/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from arroyo.backends.kafka import KafkaProducer
from taskbroker_client.app import TaskbrokerApp
from taskbroker_client.router import TaskRouter

from .store import StubAtMostOnce


class CustomRouter(TaskRouter):
"""Custom router that routes all namespaces to the 'taskworker' topic."""

def route_namespace(self, name: str) -> str:
return "taskworker"


def producer_factory(topic: str) -> KafkaProducer:
# TODO use env vars for kafka host/port
config = {
"bootstrap.servers": "127.0.0.1:9092",
"compression.type": "lz4",
"message.max.bytes": 50000000, # 50MB
}
return KafkaProducer(config)


app = TaskbrokerApp(
name="launchpad",
producer_factory=producer_factory,
router_class=CustomRouter(),
at_most_once_store=StubAtMostOnce(),
)

app.set_modules(["launchpad.worker.tasks"])
12 changes: 12 additions & 0 deletions src/launchpad/worker/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from taskbroker_client.types import AtMostOnceStore


class StubAtMostOnce(AtMostOnceStore):
def __init__(self) -> None:
self._keys: dict[str, str] = {}

def add(self, key: str, value: str, timeout: int) -> bool:
if key in self._keys:
return False
self._keys[key] = value
return True
21 changes: 21 additions & 0 deletions src/launchpad/worker/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

from typing import List

from launchpad.artifact_processor import ArtifactProcessor
from launchpad.constants import PreprodFeature

from .app import app

# Create a namespace and register tasks
default = app.taskregistry.create_namespace("default")


@default.register(name="process_artifact")
def process_artifact(
artifact_id: str, project_id: str, organization_id: str, requested_features: List[PreprodFeature], **kwargs
) -> None:
print("Processing artifact...")

ArtifactProcessor.process_message(artifact_id, project_id, organization_id, requested_features)

print("Processed artifact 🎉")
Loading