From d25c5b9da94395452ff9104500fc8054461222b0 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 29 Jan 2026 11:20:52 -0800 Subject: [PATCH 1/6] Add `.devenv/bin` to Path --- .envrc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.envrc b/.envrc index 943710d0..503963ed 100644 --- a/.envrc +++ b/.envrc @@ -38,6 +38,8 @@ if ! command -v "$DEVENV" >/dev/null; then fi PATH_add "${PWD}/.devenv/all/bin" +PATH_add "${PWD}/.devenv/bin" + case $(uname -s) in Darwin) PATH_add "${PWD}/.devenv/aarch64-darwin/bin";; *) PATH_add "${PWD}/.devenv/x86_64-linux/bin";; From 8b7eece6768744e4a2ddc6b58bc507452b2086e8 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 29 Jan 2026 12:45:27 -0800 Subject: [PATCH 2/6] Mention Required `sentry-cli` Version in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dddf9217..9e204f90 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ devservices up launchpad serve ``` -And finally use the `sentry-cli` to upload to your local machine: +And finally use the `sentry-cli` (version 3.10 or greater) to upload to your local machine: ```bash sentry-cli --log-level DEBUG \ From bdecba03084930576ef8aa9b5aff7f7144c91b9b Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Sun, 1 Feb 2026 22:09:11 -0800 Subject: [PATCH 3/6] Use Taskworker Package --- .python-version | 1 + README.md | 2 +- devenv/config.ini | 20 ++++++++++---------- pyproject.toml | 6 +++--- requirements-dev.txt | 2 +- requirements.txt | 1 + src/launchpad/server.py | 23 +++++++++++++++++++++++ src/launchpad/service.py | 31 ++++++++++++++++++++++++++----- src/launchpad/worker/app.py | 32 ++++++++++++++++++++++++++++++++ src/launchpad/worker/store.py | 12 ++++++++++++ src/launchpad/worker/tasks.py | 16 ++++++++++++++++ 11 files changed, 126 insertions(+), 20 deletions(-) create mode 100644 .python-version create mode 100644 src/launchpad/worker/app.py create mode 100644 src/launchpad/worker/store.py create mode 100644 src/launchpad/worker/tasks.py diff --git a/.python-version b/.python-version new file mode 100644 index 00000000..763b6264 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12.12 diff --git a/README.md b/README.md index 9e204f90..205ab9e1 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ devservices up launchpad serve ``` -And finally use the `sentry-cli` (version 3.10 or greater) to upload to your local machine: +And finally use the `sentry-cli` (version 3.0.1 or higher) to upload to your local machine: ```bash sentry-cli --log-level DEBUG \ diff --git a/devenv/config.ini b/devenv/config.ini index 9da4e15f..d31b1f27 100644 --- a/devenv/config.ini +++ b/devenv/config.ini @@ -1,19 +1,19 @@ [venv.launchpad] -python = 3.13.1 +python = 3.12.12 path = .venv requirements = requirements-dev.txt editable = . -[python3.13.1] -darwin_x86_64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-x86_64-apple-darwin-install_only.tar.gz -darwin_x86_64_sha256 = 4c4dafe2d59bb58e8d3ad26af637b7ae9c8141bb79738966752976861bdb103d -darwin_arm64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-aarch64-apple-darwin-install_only.tar.gz -darwin_arm64_sha256 = bbfc96038d0b6922fd783f6eb2c9bf9abb648531d23d236bc1a0c16bdd061944 -linux_x86_64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-x86_64-unknown-linux-gnu-install_only.tar.gz -linux_x86_64_sha256 = bb4696825039a2b5dc7fea2c6aeb085c89fd397016b44165ec73b4224ccc83e2 -linux_arm64 = https://github.com/indygreg/python-build-standalone/releases/download/20250106/cpython-3.13.1+20250106-aarch64-unknown-linux-gnu-install_only.tar.gz -linux_arm64_sha256 = d37aef7bdf5c27f7d006918f7cedb31f4ba07c88f61baac4ffbe0bee6d4b5248 +[python3.12.12] +darwin_x86_64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-x86_64-apple-darwin-install_only.tar.gz +darwin_x86_64_sha256 = 7a453d2773d0ffbc8f8ca45bb20fa305815aff60b8072361451c3674c17ff5ef +darwin_arm64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-aarch64-apple-darwin-install_only.tar.gz +darwin_arm64_sha256 = 95d7666718239b7b2fc94937453ff6689dc4db0daf42263c21ec1f9f41eefb31 +linux_x86_64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-x86_64-unknown-linux-gnu-install_only.tar.gz +linux_x86_64_sha256 = da2b48a8ba9969d57b75544b818210c5f574e86640776cd885ffc17b50c3569a +linux_arm64 = https://github.com/astral-sh/python-build-standalone/releases/download/20260127/cpython-3.12.12+20260127-aarch64-unknown-linux-gnu-install_only.tar.gz +linux_arm64_sha256 = 738a1f8a187bbf4b6c6e5af0ffb92f61b3e8aded8916e093c4711c00dbafb601 [uv] darwin_arm64 = https://github.com/astral-sh/uv/releases/download/0.7.21/uv-aarch64-apple-darwin.tar.gz diff --git a/pyproject.toml b/pyproject.toml index eb41d806..20380c62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,11 +20,11 @@ classifiers = [ "Operating System :: MacOS", "Operating System :: POSIX :: Linux", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.12", "Topic :: Software Development :: Build Tools", "Topic :: System :: Archiving", ] -requires-python = ">=3.13" +requires-python = ">=3.12.12" [project.scripts] launchpad = "launchpad.cli:main" @@ -38,7 +38,7 @@ where = ["src"] [tool.ruff] line-length = 120 -target-version = "py313" +target-version = "py312" [tool.ruff.lint] # Enable pycodestyle (E), Pyflakes (F), and isort (I) by default diff --git a/requirements-dev.txt b/requirements-dev.txt index 89cf9dd6..7a4eb8cb 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -24,5 +24,5 @@ ty==0.0.1a20 # Testing web endpoints sortedcontainers-stubs>=2.4.0 kafka-python>=2.0.0 -sentry-protos==0.2.0 +sentry-protos>=0.4.11 diff --git a/requirements.txt b/requirements.txt index e398dd58..8416c1b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,5 +19,6 @@ sentry-arroyo==2.34.0 sentry-kafka-schemas==2.1.2 sentry-sdk>=2.36.0 sortedcontainers>=2.4.0 +taskbroker-client @ git+https://github.com/getsentry/taskbroker.git@main#subdirectory=clients/python typing-extensions>=4.15.0 zipfile-zstd==0.0.4 diff --git a/src/launchpad/server.py b/src/launchpad/server.py index 48b76d7d..38e88386 100644 --- a/src/launchpad/server.py +++ b/src/launchpad/server.py @@ -201,6 +201,12 @@ class ServerConfig: log_level: str access_log: bool + # The address of the taskbroker the worker connects to + worker_rpc_host: str + + # The number of child processes the worker should have + worker_concurrency: int + def get_server_config() -> ServerConfig: """Get server configuration from environment.""" @@ -226,6 +232,21 @@ def get_server_config() -> ServerConfig: f"LAUNCHPAD_PORT must be a valid integer, got: {port_str}" ) + worker_rpc_host = os.getenv("LAUNCHPAD_WORKER_RPC_HOST") + if not worker_rpc_host: + raise ValueError("LAUNCHPAD_WORKER_RPC_HOST environment variable required") + + worker_concurrency_str = os.getenv("LAUNCHPAD_WORKER_CONCURRENCY") + if not worker_concurrency_str: + raise ValueError("LAUNCHPAD_WORKER_CONCURRENCY environment variable required") + + try: + worker_concurrency = int(worker_concurrency_str) + except ValueError: + raise ValueError( # noqa: E501 + f"LAUNCHPAD_WORKER_CONCURRENCY must be a valid integer, got: {worker_concurrency_str}" + ) + sentry_region = os.getenv("SENTRY_REGION", "unknown") return ServerConfig( @@ -233,6 +254,8 @@ def get_server_config() -> ServerConfig: sentry_region=sentry_region, host=host, port=port, + worker_rpc_host=worker_rpc_host, + worker_concurrency=worker_concurrency, debug=not is_production, log_level="WARNING" if is_production else "DEBUG", access_log=not is_production, # Disable access logs in prod diff --git a/src/launchpad/service.py b/src/launchpad/service.py index 84a2722b..a6c504bc 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -9,11 +9,13 @@ from dataclasses import dataclass +from taskbroker_client.worker import TaskWorker + from launchpad.sentry_client import SentryClient from launchpad.utils.logging import get_logger from launchpad.utils.statsd import NullStatsd, StatsdInterface, get_statsd -from .kafka import LaunchpadKafkaConsumer, create_kafka_consumer +from .kafka import LaunchpadKafkaConsumer from .sentry_sdk_init import initialize_sentry_sdk from .server import LaunchpadServer, get_server_config @@ -37,6 +39,7 @@ def __init__(self, statsd: StatsdInterface | None = None) -> None: self._service_config: ServiceConfig | None = None self._sentry_client: SentryClient | None = None self._shutdown_requested = False + self.worker: TaskWorker | None = None def setup(self) -> None: initialize_sentry_sdk() @@ -51,11 +54,27 @@ def setup(self) -> None: statsd=self._statsd, ) - self.kafka = create_kafka_consumer() + # Initialize the worker + worker_rpc_host = server_config.worker_rpc_host + worker_concurrency = server_config.worker_concurrency + + self.worker = TaskWorker( + app_module="worker:app", + broker_hosts=[worker_rpc_host], + max_child_task_count=100, + concurrency=worker_concurrency, + child_tasks_queue_maxsize=worker_concurrency * 2, + result_queue_maxsize=worker_concurrency * 2, + rebalance_after=32, + processing_pool_name="examples", # TODO - I don't know what this is honestly + process_type="forkserver", + ) + + # self.kafka = create_kafka_consumer() logger.info("Service components initialized") def start(self) -> None: - if not self.server or not self.kafka: + if not self.server or not self.kafka or not self.worker: raise RuntimeError("Service not properly initialized. Call setup() first.") logger.info("Starting Launchpad service...") @@ -83,14 +102,16 @@ def signal_handler(signum: int, frame) -> None: logger.info("Launchpad service started successfully") + exitcode = 1 try: - # Run Kafka consumer in main thread (blocking) - self.kafka.run() + exitcode = self.worker.start() finally: logger.info("Cleaning up service resources...") self._shutdown_server() logger.info("Service cleanup completed") + raise SystemExit(exitcode) + def is_healthy(self) -> bool: """Get overall service health status.""" is_server_healthy = self.server.is_healthy() diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py new file mode 100644 index 00000000..cc0baffe --- /dev/null +++ b/src/launchpad/worker/app.py @@ -0,0 +1,32 @@ +from arroyo.backends.kafka import KafkaProducer +from taskbroker_client.app import TaskbrokerApp +from taskbroker_client.router import TaskRouter + +from .store import StubAtMostOnce + + +class CustomRouter(TaskRouter): + """Custom router that routes all namespaces to the 'taskworker' topic.""" + + def route_namespace(self, name: str) -> str: + return "taskworker" + + +def producer_factory(topic: str) -> KafkaProducer: + # TODO use env vars for kafka host/port + config = { + "bootstrap.servers": "127.0.0.1:9092", + "compression.type": "lz4", + "message.max.bytes": 50000000, # 50MB + } + return KafkaProducer(config) + + +app = TaskbrokerApp( + name="launchpad", + producer_factory=producer_factory, + router_class=CustomRouter(), + at_most_once_store=StubAtMostOnce(), +) + +app.set_modules(["tasks"]) diff --git a/src/launchpad/worker/store.py b/src/launchpad/worker/store.py new file mode 100644 index 00000000..3e7fc996 --- /dev/null +++ b/src/launchpad/worker/store.py @@ -0,0 +1,12 @@ +from taskbroker_client.types import AtMostOnceStore + + +class StubAtMostOnce(AtMostOnceStore): + def __init__(self) -> None: + self._keys: dict[str, str] = {} + + def add(self, key: str, value: str, timeout: int) -> bool: + if key in self._keys: + return False + self._keys[key] = value + return True diff --git a/src/launchpad/worker/tasks.py b/src/launchpad/worker/tasks.py new file mode 100644 index 00000000..15275655 --- /dev/null +++ b/src/launchpad/worker/tasks.py @@ -0,0 +1,16 @@ +import logging + +from typing import Any + +from .app import app + +logger = logging.getLogger(__name__) + + +# Create a namespace and register tasks +default = app.taskregistry.create_namespace("default") + + +@default.register(name="process_something") +def process_something(*args: list[Any], **kwargs: dict[str, Any]) -> None: + print("Received task 🎉") From 65ff82a8551cbf59125038cd220f5cee6b2f2836 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 2 Feb 2026 08:59:06 -0800 Subject: [PATCH 4/6] Fix Taskworker Setup Issues (Working Locally) --- .envrc | 4 ++++ src/launchpad/service.py | 16 ++++++---------- src/launchpad/worker/app.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/.envrc b/.envrc index 503963ed..10a7d825 100644 --- a/.envrc +++ b/.envrc @@ -10,6 +10,10 @@ export LAUNCHPAD_HOST="0.0.0.0" export LAUNCHPAD_PORT="2218" export LAUNCHPAD_RPC_SHARED_SECRET="launchpad-also-very-long-value-haha" export SENTRY_BASE_URL="http://localhost:8000" + +export LAUNCHPAD_WORKER_RPC_HOST="localhost:50051" +export LAUNCHPAD_WORKER_CONCURRENCY="32" + # STATSD_HOST=... # defaults to 127.0.0.1 # STATSD_PORT=... # defaults to 8125 diff --git a/src/launchpad/service.py b/src/launchpad/service.py index a6c504bc..ff180b46 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -15,7 +15,6 @@ from launchpad.utils.logging import get_logger from launchpad.utils.statsd import NullStatsd, StatsdInterface, get_statsd -from .kafka import LaunchpadKafkaConsumer from .sentry_sdk_init import initialize_sentry_sdk from .server import LaunchpadServer, get_server_config @@ -31,7 +30,6 @@ class LaunchpadService: def __init__(self, statsd: StatsdInterface | None = None) -> None: self.server: LaunchpadServer | None = None - self.kafka: LaunchpadKafkaConsumer | None = None self._server_thread: threading.Thread | None = None self._server_loop: asyncio.AbstractEventLoop | None = None self._statsd = statsd or NullStatsd() @@ -59,22 +57,21 @@ def setup(self) -> None: worker_concurrency = server_config.worker_concurrency self.worker = TaskWorker( - app_module="worker:app", + app_module="launchpad.worker.app:app", broker_hosts=[worker_rpc_host], max_child_task_count=100, concurrency=worker_concurrency, child_tasks_queue_maxsize=worker_concurrency * 2, result_queue_maxsize=worker_concurrency * 2, rebalance_after=32, - processing_pool_name="examples", # TODO - I don't know what this is honestly + processing_pool_name="launchpad", process_type="forkserver", ) - # self.kafka = create_kafka_consumer() logger.info("Service components initialized") def start(self) -> None: - if not self.server or not self.kafka or not self.worker: + if not self.server or not self.worker: raise RuntimeError("Service not properly initialized. Call setup() first.") logger.info("Starting Launchpad service...") @@ -86,8 +83,6 @@ def signal_handler(signum: int, frame) -> None: logger.info(f"Received signal {signum}, initiating shutdown...") self._shutdown_requested = True - if self.kafka: - self.kafka.stop() signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) @@ -115,8 +110,9 @@ def signal_handler(signum: int, frame) -> None: def is_healthy(self) -> bool: """Get overall service health status.""" is_server_healthy = self.server.is_healthy() - is_kafka_healthy = self.kafka.is_healthy() - return is_server_healthy and is_kafka_healthy + + # TODO - Report worker health too + return is_server_healthy def _run_http_server_thread(self) -> None: self._server_loop = asyncio.new_event_loop() diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index cc0baffe..4203675d 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -29,4 +29,4 @@ def producer_factory(topic: str) -> KafkaProducer: at_most_once_store=StubAtMostOnce(), ) -app.set_modules(["tasks"]) +app.set_modules(["launchpad.worker.tasks"]) From a4c85a3f6dc7c0a753082f756ad11d50f56d16dc Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 2 Feb 2026 12:00:06 -0800 Subject: [PATCH 5/6] Trigger Message Processing from Task (WIP) --- src/launchpad/artifact_processor.py | 13 ++++++------- src/launchpad/worker/tasks.py | 10 +++++++--- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/launchpad/artifact_processor.py b/src/launchpad/artifact_processor.py index c45beb65..eead8b8f 100644 --- a/src/launchpad/artifact_processor.py +++ b/src/launchpad/artifact_processor.py @@ -7,7 +7,7 @@ from datetime import datetime from pathlib import Path -from typing import Any, Dict, Iterator, cast +from typing import Any, Dict, Iterator, List, cast import sentry_sdk @@ -66,7 +66,10 @@ def __init__( @staticmethod def process_message( - payload: PreprodArtifactEvents, + artifact_id: str, + project_id: str, + organization_id: str, + requested_features: List[PreprodFeature], service_config=None, artifact_processor=None, statsd=None, @@ -84,10 +87,6 @@ def process_message( initialize_sentry_sdk() - organization_id = payload["organization_id"] - project_id = payload["project_id"] - artifact_id = payload["artifact_id"] - if statsd is None: statsd = get_statsd() if artifact_processor is None: @@ -98,7 +97,7 @@ def process_message( artifact_processor = ArtifactProcessor(sentry_client, statsd, objectstore_client) requested_features = [] - for feature in payload.get("requested_features", []): + for feature in requested_features: try: requested_features.append(PreprodFeature(feature)) except ValueError: diff --git a/src/launchpad/worker/tasks.py b/src/launchpad/worker/tasks.py index 15275655..238e2686 100644 --- a/src/launchpad/worker/tasks.py +++ b/src/launchpad/worker/tasks.py @@ -1,6 +1,9 @@ import logging -from typing import Any +from typing import Any, List + +from launchpad.artifact_processor import ArtifactProcessor +from launchpad.constants import PreprodFeature from .app import app @@ -11,6 +14,7 @@ default = app.taskregistry.create_namespace("default") -@default.register(name="process_something") -def process_something(*args: list[Any], **kwargs: dict[str, Any]) -> None: +@default.register(name="process_artifact") +def process_artifact(artifact_id: str, project_id: str, organization_id: str, requested_features: List[PreprodFeature], **kwargs) -> None: print("Received task 🎉") + ArtifactProcessor.process_message(artifact_id, project_id, organization_id, requested_features) From 5958eab5cc3d46be45f868148c429625d058a97e Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 2 Feb 2026 13:57:37 -0800 Subject: [PATCH 6/6] Perform Artifact Processing in Worker Properly (Tested Locally Only) --- src/launchpad/artifact_processor.py | 9 +++------ src/launchpad/constants.py | 1 + src/launchpad/service.py | 6 ++++++ src/launchpad/worker/tasks.py | 15 ++++++++------- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/launchpad/artifact_processor.py b/src/launchpad/artifact_processor.py index eead8b8f..af5dd2bb 100644 --- a/src/launchpad/artifact_processor.py +++ b/src/launchpad/artifact_processor.py @@ -17,9 +17,6 @@ from objectstore_client import ( Usecase, ) -from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import ( - PreprodArtifactEvents, -) from launchpad.api.update_api_models import AndroidAppInfo as AndroidAppInfoModel from launchpad.api.update_api_models import AppleAppInfo as AppleAppInfoModel @@ -96,10 +93,10 @@ def process_message( objectstore_client = ObjectstoreClient(service_config.objectstore_url) artifact_processor = ArtifactProcessor(sentry_client, statsd, objectstore_client) - requested_features = [] + features = [] for feature in requested_features: try: - requested_features.append(PreprodFeature(feature)) + features.append(PreprodFeature(feature)) except ValueError: logger.exception(f"Unknown feature {feature}") @@ -126,7 +123,7 @@ def process_message( statsd.increment("artifact.processing.started") logger.info(f"Processing artifact {artifact_id} (project: {project_id}, org: {organization_id})") try: - artifact_processor.process_artifact(organization_id, project_id, artifact_id, requested_features) + artifact_processor.process_artifact(organization_id, project_id, artifact_id, features) except Exception: statsd.increment("artifact.processing.failed") duration = time.time() - start_time diff --git a/src/launchpad/constants.py b/src/launchpad/constants.py index 4eb84c0f..b95ec974 100644 --- a/src/launchpad/constants.py +++ b/src/launchpad/constants.py @@ -28,6 +28,7 @@ class ArtifactType(Enum): # This should match exactly with the definition in: # src/sentry/preprod/producer.py class PreprodFeature(Enum): + PRE_PROCESS = "pre_process" SIZE_ANALYSIS = "size_analysis" BUILD_DISTRIBUTION = "build_distribution" diff --git a/src/launchpad/service.py b/src/launchpad/service.py index ff180b46..7a9c5492 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -109,12 +109,18 @@ def signal_handler(signum: int, frame) -> None: def is_healthy(self) -> bool: """Get overall service health status.""" + # PRECONDITION - Assume server exists + assert self.server + is_server_healthy = self.server.is_healthy() # TODO - Report worker health too return is_server_healthy def _run_http_server_thread(self) -> None: + # PRECONDITION - Assume server exists + assert self.server + self._server_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._server_loop) diff --git a/src/launchpad/worker/tasks.py b/src/launchpad/worker/tasks.py index 238e2686..7a4369ae 100644 --- a/src/launchpad/worker/tasks.py +++ b/src/launchpad/worker/tasks.py @@ -1,20 +1,21 @@ -import logging -from typing import Any, List +from typing import List from launchpad.artifact_processor import ArtifactProcessor from launchpad.constants import PreprodFeature from .app import app -logger = logging.getLogger(__name__) - - # Create a namespace and register tasks default = app.taskregistry.create_namespace("default") @default.register(name="process_artifact") -def process_artifact(artifact_id: str, project_id: str, organization_id: str, requested_features: List[PreprodFeature], **kwargs) -> None: - print("Received task 🎉") +def process_artifact( + artifact_id: str, project_id: str, organization_id: str, requested_features: List[PreprodFeature], **kwargs +) -> None: + print("Processing artifact...") + ArtifactProcessor.process_message(artifact_id, project_id, organization_id, requested_features) + + print("Processed artifact 🎉")