3 changes: 3 additions & 0 deletions src/art/dev/__init__.py
@@ -10,6 +10,7 @@
)
from .openai_server import OpenAIServerConfig, ServerArgs, get_openai_server_config
from .train import TrainConfig, TrainSFTConfig
from .validate import is_dedicated_mode, validate_dedicated_config

__all__ = [
"EngineArgs",
@@ -21,8 +22,10 @@
"TinkerTrainingClientArgs",
"TrainerArgs",
"get_openai_server_config",
"is_dedicated_mode",
"OpenAIServerConfig",
"ServerArgs",
"TrainSFTConfig",
"TrainConfig",
"validate_dedicated_config",
]
21 changes: 18 additions & 3 deletions src/art/dev/get_model_config.py
@@ -1,5 +1,6 @@
from .engine import EngineArgs
from .model import InitArgs, InternalModelConfig, PeftArgs, TrainerArgs
from .validate import is_dedicated_mode


def get_model_config(
@@ -12,13 +13,22 @@ def get_model_config(
if config is None:
config = InternalModelConfig()

enable_sleep_mode = config.get("engine_args", {}).get("enable_sleep_mode", True)
dedicated = is_dedicated_mode(config)

if dedicated:
enable_sleep_mode = False
else:
enable_sleep_mode = config.get("engine_args", {}).get("enable_sleep_mode", True)

init_args = InitArgs(
fast_inference=False,
load_in_4bit=True,
max_seq_length=32768,
model_name=base_model,
)
# fast_inference triggers in-process vLLM via Unsloth; dedicated mode runs vLLM as a subprocess
if not dedicated:
init_args["fast_inference"] = False

engine_args = EngineArgs(
allowed_local_media_path="/tmp",
enable_sleep_mode=enable_sleep_mode,
@@ -63,10 +73,15 @@ def get_model_config(
weight_decay=0.1,
)
trainer_args.update(config.get("trainer_args", {}))
return InternalModelConfig(
result = InternalModelConfig(
init_args=init_args,
engine_args=engine_args,
peft_args=peft_args,
tinker_args=config.get("tinker_args"),
trainer_args=trainer_args,
)
if "trainer_gpu_ids" in config:
result["trainer_gpu_ids"] = config["trainer_gpu_ids"]
if "inference_gpu_ids" in config:
result["inference_gpu_ids"] = config["inference_gpu_ids"]
return result
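
For illustration, a minimal sketch (not part of this diff) of what the dedicated branch above produces. The import paths are assumed from the file layout in the headers, and the base model name and output directory are placeholders:

# Sketch only; placeholder model name and paths.
from art.dev.get_model_config import get_model_config
from art.dev.model import InternalModelConfig

cfg = get_model_config(
    base_model="Qwen/Qwen2.5-7B-Instruct",   # placeholder
    output_dir="/tmp/art/example-model",      # placeholder
    config=InternalModelConfig(trainer_gpu_ids=[0], inference_gpu_ids=[1]),
)
assert cfg["engine_args"]["enable_sleep_mode"] is False  # forced off in dedicated mode
assert "fast_inference" not in cfg["init_args"]          # left unset; vLLM runs as a subprocess
assert cfg["trainer_gpu_ids"] == [0] and cfg["inference_gpu_ids"] == [1]  # passed through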
7 changes: 7 additions & 0 deletions src/art/dev/model.py
@@ -115,6 +115,11 @@ class InternalModelConfig(TypedDict, total=False):
peft: Arguments for creating an Unsloth PEFT model wrapper.
tinker: Arguments for the Tinker training client.
trainer: Arguments for the GRPO trainer.
trainer_gpu_ids: GPU IDs for training (e.g., [0]). When set with
inference_gpu_ids, enables dedicated mode where training and
inference run on separate GPUs.
inference_gpu_ids: GPU IDs for vLLM inference (e.g., [1]). When set
with trainer_gpu_ids, enables dedicated mode.
"""

init_args: "InitArgs"
@@ -123,6 +128,8 @@
tinker_args: "TinkerArgs | None"
tinker_native_args: "TinkerNativeArgs | None"
trainer_args: "TrainerArgs"
trainer_gpu_ids: list[int]
inference_gpu_ids: list[int]


class TinkerArgs(TypedDict, total=False):
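As a quick reference (example values only), the two config shapes the docstring above distinguishes:

# Example values only; the new TypedDict keys are the ones documented above.
from art.dev.model import InternalModelConfig

shared = InternalModelConfig()                                                # neither field set: shared mode
dedicated = InternalModelConfig(trainer_gpu_ids=[0], inference_gpu_ids=[1])  # both set: dedicated mode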
67 changes: 67 additions & 0 deletions src/art/dev/validate.py
@@ -0,0 +1,67 @@
"""Validation functions for model configuration."""

from .model import InternalModelConfig


def is_dedicated_mode(config: InternalModelConfig) -> bool:
"""Return True if the config specifies dedicated mode (separate training and inference GPUs)."""
return "trainer_gpu_ids" in config and "inference_gpu_ids" in config


def validate_dedicated_config(config: InternalModelConfig) -> None:
"""Validate dedicated mode GPU configuration.

Raises ValueError if the configuration is invalid.
Does nothing if neither trainer_gpu_ids nor inference_gpu_ids is set (shared mode).
"""
has_trainer = "trainer_gpu_ids" in config
has_inference = "inference_gpu_ids" in config

if has_trainer != has_inference:
raise ValueError(
"trainer_gpu_ids and inference_gpu_ids must both be set or both unset"
)

if not has_trainer:
return

trainer_gpu_ids = config["trainer_gpu_ids"]
inference_gpu_ids = config["inference_gpu_ids"]

if not trainer_gpu_ids:
raise ValueError("trainer_gpu_ids must be non-empty")

if not inference_gpu_ids:
raise ValueError("inference_gpu_ids must be non-empty")

if set(trainer_gpu_ids) & set(inference_gpu_ids):
raise ValueError("trainer_gpu_ids and inference_gpu_ids must not overlap")

if len(inference_gpu_ids) > 1:
raise ValueError(
"Multi-GPU inference not yet supported; inference_gpu_ids must have exactly one GPU"
)

if trainer_gpu_ids[0] != 0:
raise ValueError(
"trainer_gpu_ids must start at GPU 0 (training runs in-process)"
)

expected = list(range(len(trainer_gpu_ids)))
if trainer_gpu_ids != expected:
raise ValueError(
"trainer_gpu_ids must be contiguous starting from 0 (e.g., [0], [0,1])"
)

# Reject settings that are incompatible with dedicated mode
if config.get("init_args", {}).get("fast_inference"):
raise ValueError(
"fast_inference is incompatible with dedicated mode "
"(dedicated mode runs vLLM as a subprocess, not in-process)"
)

if config.get("engine_args", {}).get("enable_sleep_mode"):
raise ValueError(
"enable_sleep_mode is incompatible with dedicated mode "
"(dedicated mode runs vLLM on a separate GPU, sleep/wake is not needed)"
)
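
A short usage sketch of the checks above (GPU IDs are arbitrary examples, not values from this PR):

# Illustrative only; the configs below are made up to exercise each branch.
from art.dev.model import InternalModelConfig
from art.dev.validate import is_dedicated_mode, validate_dedicated_config

ok = InternalModelConfig(trainer_gpu_ids=[0, 1], inference_gpu_ids=[2])
assert is_dedicated_mode(ok)
validate_dedicated_config(ok)  # passes: contiguous trainer GPUs from 0, a single inference GPU

overlapping = InternalModelConfig(trainer_gpu_ids=[0], inference_gpu_ids=[0])
try:
    validate_dedicated_config(overlapping)
except ValueError as err:
    print(err)  # "trainer_gpu_ids and inference_gpu_ids must not overlap"

validate_dedicated_config(InternalModelConfig())  # shared mode: no-op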
52 changes: 47 additions & 5 deletions src/art/local/backend.py
@@ -1,5 +1,6 @@
import asyncio
import json
import logging
import math
import os
import shutil
@@ -9,6 +10,8 @@
from typing import AsyncIterator, Iterable, Literal, cast
import warnings

logger = logging.getLogger(__name__)

import aiohttp
import numpy as np
from openai import AsyncOpenAI
@@ -97,6 +100,9 @@ async def close(self) -> None:

def _close(self) -> None:
for _, service in self._services.items():
close = getattr(service, "close", None)
if close is not None:
close()
close_proxy(service)

async def register(
@@ -140,18 +146,39 @@ def _model_inference_name(self, model: Model, step: int | None = None) -> str:

# For LocalBackend, vLLM always serves LoRA adapters with @step suffix
# Default to step 0 when not specified (the initial checkpoint created at registration)
actual_step = step if step is not None else self.__get_step(model)
return f"{model.name}@{actual_step}"
if step is not None:
actual_step = step
elif model.name in self._services:
# In dedicated mode the service tracks which adapter vLLM has
# actually loaded. Reading the filesystem would race: the
# checkpoint directory appears before the HTTP reload completes.
svc = self._services[model.name]
loaded_step = getattr(svc, "_latest_step", None)
actual_step = (
loaded_step if loaded_step is not None else self.__get_step(model)
)
else:
actual_step = self.__get_step(model)
name = f"{model.name}@{actual_step}"
logger.debug(
f"[BACKEND] _model_inference_name: step_arg={step} "
f"actual_step={actual_step} -> {name}"
)
return name

async def _get_service(self, model: TrainableModel) -> ModelService:
from ..dev.get_model_config import get_model_config
from ..dev.validate import is_dedicated_mode, validate_dedicated_config

if model.name not in self._services:
config = get_model_config(
base_model=model.base_model,
output_dir=get_model_dir(model=model, art_path=self._path),
config=model._internal_config,
)
validate_dedicated_config(config)
dedicated = is_dedicated_mode(config)

is_tinker = config.get("tinker_args") is not None
if is_tinker:
from ..tinker.service import TinkerService
@@ -164,13 +191,19 @@ async def _get_service(self, model: TrainableModel) -> ModelService:
# When moving the service to a child process, import unsloth
# early to maximize optimizations
os.environ["IMPORT_UNSLOTH"] = "1"

if dedicated:
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
str(g) for g in config["trainer_gpu_ids"]
)

self._services[model.name] = service_class(
model_name=model.name,
base_model=model.base_model,
config=config,
output_dir=get_model_dir(model=model, art_path=self._path),
)
if not self._in_process:
if not dedicated and not self._in_process:
# Kill all "model-service" processes to free up GPU memory
subprocess.run(["pkill", "-9", "model-service"])
self._services[model.name] = move_to_child_process(
@@ -585,6 +618,10 @@ async def _train_model(
# Still advance the step by renaming the checkpoint directory
current_step = self.__get_step(model)
next_step = current_step + 1
logger.info(
f"[BACKEND] _train_model SKIP: current_step={current_step} "
f"next_step={next_step} (all rewards equal)"
)
current_checkpoint_dir = get_step_checkpoint_dir(
get_model_dir(model=model, art_path=self._path), current_step
)
@@ -599,8 +636,9 @@
next_checkpoint_dir,
dirs_exist_ok=True,
)
print(
f"Advanced step from {current_step} to {next_step} (no training occurred)"
logger.info(
f"[BACKEND] _train_model SKIP: copied checkpoint "
f"{current_step} -> {next_step}, calling register_lora_for_step..."
)

try:
@@ -610,6 +648,10 @@
await service.register_lora_for_step( # type: ignore[attr-defined]
next_step, next_checkpoint_dir
)
logger.info(
f"[BACKEND] _train_model SKIP: register_lora_for_step "
f"completed for step {next_step}"
)
except ModuleNotFoundError:
pass # Unsloth is not installed

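
For orientation, a rough sketch of the GPU split implied by the backend change. Setting CUDA_VISIBLE_DEVICES from trainer_gpu_ids mirrors _get_service above; how the inference side receives inference_gpu_ids is not shown in this diff, so that half is a hypothetical illustration:

# Sketch, not code from this PR; GPU IDs and the serve command are placeholders.
import os
import subprocess

trainer_gpu_ids = [0]
inference_gpu_ids = [1]

# From _get_service: the in-process trainer only ever sees its own GPUs.
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(g) for g in trainer_gpu_ids)

# Hypothetical: a dedicated-mode service could scope its vLLM subprocess the same way.
vllm_env = {**os.environ, "CUDA_VISIBLE_DEVICES": ",".join(str(g) for g in inference_gpu_ids)}
subprocess.Popen(["vllm", "serve", "some-base-model"], env=vllm_env)  # placeholder command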